Compare commits

46 Commits

Author SHA1 Message Date
Nadav Har'El
afa2c1b0bf materialized_views: propagate "view virtual columns" between nodes
db::schema_tables::ALL and db::schema_tables::all_tables() are both supposed
to list the same schema tables - the former is the list of their names, and
the latter is the list of their schemas. This code duplication makes it easy
to forget to update one of them, and indeed recently the new
"view_virtual_columns" was added to all_tables() but not to ALL.

This patch makes ALL a function instead of a constant vector.
The newly named all_table_names() function uses all_tables() so the list
of schema tables only appears once.

So that nobody worries about the performance impact, all_table_names()
caches the list in a per-thread vector that is only prepared once per thread.
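
A minimal sketch of the caching approach described above, not the actual
patch. Only the names all_tables() and all_table_names() come from this
commit message; the schema struct and the three table names are stand-ins
assumed for illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the real schema type.
struct schema {
    std::string name;
};

// Single source of truth: the list of schema tables appears only here.
// The table names below are illustrative, not the full real list.
std::vector<schema> all_tables() {
    return {{"keyspaces"}, {"tables"}, {"view_virtual_columns"}};
}

// Derives the names from all_tables(). The thread_local static is
// initialized once per thread, on the first call made by that thread.
const std::vector<std::string>& all_table_names() {
    static thread_local const std::vector<std::string> names = [] {
        std::vector<std::string> result;
        for (const auto& s : all_tables()) {
            result.push_back(s.name);
        }
        return result;
    }();
    return names;
}
```

Because the names are derived rather than listed a second time, adding a
table to all_tables() cannot leave the name list out of date.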

Because after this patch all_table_names() has the "view_virtual_columns"
that was previously missing, this patch also fixes #4339, which was about
virtual columns in materialized views not being propagated to other nodes.

Unfortunately, to test the fix for #4339 we need a test with multiple
nodes, so we cannot test it here in a unit test, and will instead use
the dtest framework, in a separate patch.

Fixes #4339

Branches: 3.0
Tests: all unit tests (release and debug mode), new dtest for #4339. The unit test mutation_reader_test failed in debug mode but not in release mode, but this probably has nothing to do with this patch (?).

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Message-Id: <20190320063437.32731-1-nyh@scylladb.com>
(cherry picked from commit 7c874057f5)
2020-01-06 00:37:59 +02:00
Tomasz Grabiec
ad70fe8503 cql: alter type: Format field name as text instead of hex
Fixes #4841

Message-Id: <1565702635-26214-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 64ff1b6405)
2020-01-05 18:55:40 +02:00
Gleb Natapov
3cd9c78056 cache_hitrate_calculator: do not ignore a future returned from gossiper::add_local_application_state
We should wait for a future returned from add_local_application_state() to
resolve before issuing new calculation, otherwise two
add_local_application_state() may run simultaneously for the same state.

Fixes #4838.

Message-Id: <20190812082158.GE17984@scylladb.com>
(cherry picked from commit 00c4078af3)
2020-01-05 18:50:27 +02:00
Benny Halevy
c5e5ed2775 tracing: one_session_records: keep local tracing ptr
Similar to trace_state, keep a shared_ptr<tracing> _local_tracing_ptr
in one_session_records when it is constructed, so it can be used
during shutdown.

Fixes #5243

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 7aef39e400)
2019-12-24 18:42:33 +02:00
Tomasz Grabiec
666266c3cf types: Fix abort on type alter which affects a compact storage table with no regular columns
Fixes #4837

Message-Id: <1565702247-23800-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 34cff6ed6b)
2019-12-24 17:44:40 +02:00
Dejan Mircevski
19b5d70338 tests: Add cquery_nofail() utility
Most tests await the result of cql_test_env::execute_cql().  Most
would also benefit from reporting errors with top-level location
included.

Ref #4837 (a prerequisite for backporting)

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>
(cherry picked from commit a9849ecba7)
2019-12-24 17:44:40 +02:00
Amnon Heiman
b3cdee7e27 init: do not allow replace-address for seeds
If a node is a seed node, it can not be started with
replace-address-first-boot or the replace-address flag.

The issue is that as a seed node it will generate new tokens instead of
replacing the existing one that the user expects it to replace when supplying
the flags.

This patch will throw a bad_configuration_error exception
in this case.

Fixes #3889

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 399d79fc6f)
2019-12-23 17:24:52 +02:00
Rafael Ávila de Espíndola
4c42f18d82 cql: Fix use of UDT in reversed columns
We were missing calls to underlying_type in a few locations and so the
insert would think the given literal was invalid and the select would
refuse to fetch a UDT field.

Fixes #4672

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20190708200516.59841-1-espindola@scylladb.com>
(cherry picked from commit 4e7ffb80c0)
2019-12-23 15:57:47 +02:00
Benny Halevy
ea8f8ab7a3 sstables: mc: prevent signed integer overflow
Fix runtime error: signed integer overflow
introduced by 2dc3776407

Delta-encoded values may wrap around if the encoded value is
less than the base value.  This could happen in two places:
In the mc-format serialization header itself, where the base values are implicit
Cassandra epoch time, and in the sstables data files, where the base values
are taken from the encoding_stats (later written to the serialization_header).

In these cases, when the calculation is done using signed integer/long we may see
"runtime error: signed integer overflow" messages in debug mode
(with -fsanitize=undefined / -fsanitize=signed-integer-overflow).

Overflow here is expected and harmless since we guarantee neither that
the base values in the serialization header are greater than or equal to
Cassandra's epoch nor that the delta-encoded values are always greater
than or equal to the respective base values in the serialization header.

To prevent these warnings, the subtraction/addition should be done with unsigned
(two's complement) arithmetic and the result converted to the signed type.

Note that to keep the code simple where possible, we also rely on implicit
conversion of signed integers to unsigned when one of the added values is unsigned
and the other is signed.
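
A minimal sketch of the unsigned-arithmetic approach described above.
The helper names encode_delta() and decode_delta() are hypothetical and
are not the functions touched by this patch.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper: subtracting in uint64_t wraps modulo 2^64, which is
// well defined, instead of triggering signed integer overflow.
inline uint64_t encode_delta(int64_t value, int64_t base) {
    return static_cast<uint64_t>(value) - static_cast<uint64_t>(base);
}

// Hypothetical helper: adding in uint64_t wraps the same way. The final
// conversion back to int64_t is modular since C++20; before C++20 it is
// implementation-defined, and two's complement targets behave the same.
inline int64_t decode_delta(uint64_t delta, int64_t base) {
    return static_cast<int64_t>(static_cast<uint64_t>(base) + delta);
}
```

A value smaller than its base encodes to a large unsigned delta and still
decodes back to the original value, so the wrap-around is harmless.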

Fixes: #4098

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20190120142950.15776-1-bhalevy@scylladb.com>
(cherry picked from commit 844a2de263)
2019-12-15 15:52:35 +02:00
Piotr Sarna
db6821ce8f table: Reduce read amplification in view update generation
This commit makes sure that single-partition readers for
read-before-write do not have fast-forwarding enabled,
as it may lead to huge read amplification. The observed case was:
1. Creating an index.
  CREATE INDEX index1  ON myks2.standard1 ("C1");
2. Running cassandra-stress in order to generate view updates.
cassandra-stress write no-warmup n=1000000 cl=ONE -schema \
  'replication(factor=2) compaction(strategy=LeveledCompactionStrategy)' \
  keyspace=myks2 -pop seq=4000000..8000000 -rate threads=100 -errors
  skip-read-validation -node 127.0.0.1;

Without disabling fast-forwarding, single-partition readers
were turned into scanning readers in cache, which resulted
in reading 36GB (sic!) on a workload which generates less
than 1GB of view updates. After applying the fix, the number
dropped down to less than 1GB, as expected.

Refs #5409
Fixes #4615
Fixes #5418

(cherry picked from commit 79c3a508f4)
2019-12-05 22:36:41 +02:00
Rafael Ávila de Espíndola
3c91bad0dc commitlog: make sure a file is closed
If allocate or truncate throws, we have to close the file.

Fixes #4877

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20191114174810.49004-1-espindola@scylladb.com>
(cherry picked from commit 6160b9017d)
2019-11-24 17:50:06 +02:00
Tomasz Grabiec
bbe41a82be row_cache: Fix abort on bad_alloc during cache update
Since 90d6c0b, cache will abort when trying to detach partition
entries while they're updated. This should never happen. It can happen
though, when the update fails on bad_alloc, because the cleanup guard
invalidates the cache before it releases partition snapshots (held by
"update" coroutine).

Fix by destroying the coroutine first.

Fixes #5327.

Tests:
  - row_cache_test (dev)

Message-Id: <1574360259-10132-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit e3d025d014)
2019-11-24 17:44:30 +02:00
Nadav Har'El
6fb42269e9 merge: row_marker: correct row expiry condition
Merged patch set by Piotr Dulikowski:

This change corrects the condition under which a row was considered expired by its
TTL.

The logic that decides when a row becomes expired was inconsistent with the
logic that decides if a single cell is expired. A single cell becomes expired
when expiry_timestamp <= now, while a row became expired when
expiry_timestamp < now (notice the strict inequality). For rows inserted
with TTL, this caused non-key cells to expire (change their values to null)
one second before the row disappeared. Now, row expiry logic uses non-strict
inequality.
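
The difference between the two conditions can be sketched as follows. The
function names are hypothetical; only the comparisons are taken from the
description above.

```cpp
#include <cassert>
#include <cstdint>

// Cell expiry, as described above: expired when expiry_timestamp <= now.
inline bool cell_is_expired(int64_t expiry_timestamp, int64_t now) {
    return expiry_timestamp <= now;
}

// Row expiry before this change: strict inequality.
inline bool row_is_expired_before_fix(int64_t expiry_timestamp, int64_t now) {
    return expiry_timestamp < now;
}

// Row expiry after this change: same non-strict inequality as cells.
inline bool row_is_expired_after_fix(int64_t expiry_timestamp, int64_t now) {
    return expiry_timestamp <= now;
}
```

At now == expiry_timestamp the old row condition is still false while the
cell condition is already true, which is the one-second window in which
the cells read as null but the row was still visible.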

Fixes #4263,
Fixes #5290.

Tests:

    unit(dev)
    python test described in issue #5290

(cherry picked from commit 9b9609c65b)
(cherry picked from commit 95acf71680)
2019-11-20 21:40:40 +02:00
Asias He
ee2255a189 gossip: Fix max generation drift measure
Assume n1 and n2 in a cluster with generation number g1, g2. The
cluster runs for more than 1 year (MAX_GENERATION_DIFFERENCE). When n1
reboots with generation g1' which is time based, n2 will see
g1' > g2 + MAX_GENERATION_DIFFERENCE and reject n1's gossip update.

To fix, check the generation drift with generation value this node would
get if this node were restarted.
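
A rough sketch of the two checks, under stated assumptions: the function
names are hypothetical, generations are taken to be seconds since the
epoch, and the constant value is assumed from the "1 year" wording above.

```cpp
#include <cassert>
#include <cstdint>

// Assumed value: one year in seconds.
constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400LL * 365;

// Old check: compares the remote generation against the generation this
// node was started with, which grows stale on a long-running node.
inline bool rejects_before_fix(int64_t remote_generation, int64_t local_generation) {
    return remote_generation > local_generation + MAX_GENERATION_DIFFERENCE;
}

// New check: compares against the generation this node would get if it
// were restarted now (time based).
inline bool rejects_after_fix(int64_t remote_generation, int64_t now_seconds) {
    return remote_generation > now_seconds + MAX_GENERATION_DIFFERENCE;
}
```

With n2 up for two years and n1 rebooting now, the old check rejects n1's
fresh generation while the new check accepts it.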

This is a backport of CASSANDRA-10969.

Fixes #5164

(cherry picked from commit 0a52ecb6df)
2019-11-20 11:39:37 +02:00
Kamil Braun
3218e6cd4c view: fix bug in virtual columns.
When creating a virtual column of non-frozen map type,
the wrong type was used for the map's keys.

Fixes #5165.

(cherry picked from commit ef9d5750c8)
2019-11-19 11:17:54 +02:00
Rafael Ávila de Espíndola
1d94aac551 sstable: close file_writer if an exception is thrown
The previous code was not exception safe and would eventually cause a
file to be destroyed without being closed, causing an assert failure.

Unfortunately it doesn't seem to be possible to test this without
error injection, since using an invalid directory fails before this
code is executed.

Fixes #4948

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20190904002314.79591-1-espindola@scylladb.com>
(cherry picked from commit 000514e7cc)
2019-11-19 11:17:54 +02:00
Avi Kivity
2e5110d063 reconcilable_result: use chunked_vector to hold partitions
Usually, a reconcilable_result holds very few partitions (1 is common),
since the page size is limited by 1MB. But if we have paging disabled or
if we are reconciling a range full of tombstones, we may see many more.
This can cause large allocations.

Change to chunked_vector to prevent those large allocations, as they
can be quite expensive.
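
To illustrate why chunking avoids large allocations, here is a simplified
sketch. The chunked_store class is hypothetical and much less complete
than utils::chunked_vector; it only shows the idea of bounding the size of
each contiguous allocation.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical simplified container: elements live in fixed-size chunks,
// so no single element allocation exceeds ChunkSize * sizeof(T).
template <typename T, std::size_t ChunkSize = 128>
class chunked_store {
    std::vector<std::vector<T>> _chunks; // each chunk holds at most ChunkSize items
    std::size_t _size = 0;
public:
    void push_back(T value) {
        if (_chunks.empty() || _chunks.back().size() == ChunkSize) {
            _chunks.emplace_back();
            _chunks.back().reserve(ChunkSize); // bounded allocation per chunk
        }
        _chunks.back().push_back(std::move(value));
        ++_size;
    }
    T& operator[](std::size_t i) { return _chunks[i / ChunkSize][i % ChunkSize]; }
    const T& operator[](std::size_t i) const { return _chunks[i / ChunkSize][i % ChunkSize]; }
    std::size_t size() const { return _size; }
    std::size_t chunk_count() const { return _chunks.size(); }
};
```

The outer vector still grows contiguously, but it holds only one small
handle per chunk, so it stays far smaller than a flat vector of elements.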

Fixes #4780.

(cherry picked from commit 093d2cd7e5)
2019-11-19 11:17:54 +02:00
Avi Kivity
e4bb7ce73c utils::chunked_vector: add rbegin() and related iterators
Needed as an std::vector replacement.

(cherry picked from commit eaa9a5b0d7)

Prerequisite for #4780.
2019-11-19 11:17:54 +02:00
Avi Kivity
ecc54c1a68 utils: chunked_vector: make begin()/end() const correct
begin() of a const vector should return a const_iterator, to avoid
giving the caller the ability to mutate it.

This slipped through since iterator's constructor does a const_cast.
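
A minimal sketch of const-correct begin()/end() overloads. The
small_container class is hypothetical and simply wraps std::vector; it is
not the chunked_vector implementation.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical container used only to show the overload pattern.
template <typename T>
class small_container {
    std::vector<T> _items;
public:
    using iterator = typename std::vector<T>::iterator;
    using const_iterator = typename std::vector<T>::const_iterator;

    void push_back(T v) { _items.push_back(std::move(v)); }

    // Non-const access yields a mutable iterator.
    iterator begin() { return _items.begin(); }
    iterator end() { return _items.end(); }

    // Const access yields a const_iterator, so callers cannot mutate.
    const_iterator begin() const { return _items.begin(); }
    const_iterator end() const { return _items.end(); }
};

// Compile-time check that a const container hands out const_iterator.
static_assert(std::is_same_v<
                  decltype(std::declval<const small_container<int>&>().begin()),
                  small_container<int>::const_iterator>,
              "begin() on a const container must return const_iterator");
```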

Noticed by code inspection.

(cherry picked from commit df6faae980)

Prerequisite for #4780.
2019-11-19 11:17:54 +02:00
Glauber Costa
71cfd108c6 do not crash in user-defined operations if the controller is disabled
Scylla currently crashes if we run manual operations like nodetool
compact with the controller disabled. While we neither like nor
recommend running with the controller disabled, due to some corner cases
in the controller algorithm we are not yet at the point in which we can
deprecate this and are sometimes forced to disable it.

The reason for the crash is that manual operations will invoke
_backlog_of_shares, which returns the backlog needed to
create a certain number of shares. That scans the existing control
points, but when we run without the controller there are no control
points and we crash.

Backlog doesn't matter if the controller is disabled, and the return
value of this function will be immaterial in this case. So to avoid the
crash, we return something right away if the controller is disabled.
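
The early return can be sketched roughly as below. This is an illustration
under assumptions, not the patched code: the class name, the control_point
layout, the linear interpolation, and the 1.0f fallback value are all
hypothetical, and control point outputs are assumed to be strictly
increasing.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical control point: backlog (input) mapped to shares (output).
struct control_point {
    float input;
    float output;
};

class backlog_controller_sketch {
    std::vector<control_point> _control_points;
public:
    explicit backlog_controller_sketch(std::vector<control_point> points)
        : _control_points(std::move(points)) {}

    // Inverse lookup: which backlog would produce the given shares?
    float backlog_of_shares(float shares) const {
        // No usable control points means the controller is disabled. The
        // result is immaterial then, so return an arbitrary value early
        // instead of indexing into an empty vector.
        if (_control_points.size() < 2) {
            return 1.0f;
        }
        std::size_t idx = 1;
        while (idx < _control_points.size() - 1 && _control_points[idx].output < shares) {
            ++idx;
        }
        const control_point& upper = _control_points[idx];
        const control_point& lower = _control_points[idx - 1];
        // Linear interpolation within the selected segment.
        return lower.input
            + (shares - lower.output) / (upper.output - lower.output) * (upper.input - lower.input);
    }
};
```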

Fixes #5016

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit c9f2d1d105)
2019-11-19 11:17:54 +02:00
Avi Kivity
d40a7a5e9e Merge "Add proper aggregation for paged indexing" from Piotr
"
Fixes #4540

This series adds proper handling of aggregation for paged indexed queries.
Before this series returned results were presented to the user in per-page
partial manner, while they should have been returned as a single aggregated
value.

Tests: unit(dev)
"

* 'add_proper_aggregation_for_paged_indexing_for_3.0' of https://github.com/psarna/scylla:
  test: add 'eventually' block to index paging test
  tests: add indexing+paging test case for clustering keys
  tests: add indexing + paging + aggregation test case
  cql3: make DEFAULT_COUNT_PAGE_SIZE constant public
  cql3: add proper aggregation to paged indexing
  cql3: add a query options constructor with explicit page size
  cql3: enable explicit copying of query_options
  cql3: split execute_base_query implementation
2019-11-19 11:17:54 +02:00
Takuya ASADA
a163d245ec dist/common/scripts/scylla_setup: don't proceed with empty NIC name
Currently the NIC selection prompt in scylla_setup just proceeds with setup when
the user presses the Enter key without typing anything.
The prompt should ask for the NIC name again until the user enters a valid NIC name.

Fixes #4517
Message-Id: <20190617124925.11559-1-syuu@scylladb.com>

(cherry picked from commit 7320c966bc)
2019-11-19 11:17:54 +02:00
Piotr Sarna
045831b706 test: add 'eventually' block to index paging test
Without 'eventually', the test is flaky because the index can still
be not up to date while checking its conditions.

Fixes #4670

(cherry picked from commit ebbe038d19)
2019-11-15 09:15:29 +01:00
Piotr Sarna
148245ab6a tests: add indexing+paging test case for clustering keys
Indexing a non-prefix part of the clustering key has a separate
code path (see issue #3405), so it deserves a separate test case.
2019-11-14 12:32:08 +01:00
Piotr Sarna
bbe5de1403 tests: add indexing + paging + aggregation test case
Indexed queries used to erroneously return partial per-page results
for aggregation queries. This test case used to reproduce the problem
and now ensures that there would be no regressions.

Refs #4540
2019-11-14 12:32:07 +01:00
Piotr Sarna
ca0df416c0 cql3: make DEFAULT_COUNT_PAGE_SIZE constant public
The constant will be later used in test scenarios.
2019-11-14 12:25:37 +01:00
Piotr Sarna
37ed60374e cql3: add proper aggregation to paged indexing
Aggregated and paged filtering needs to aggregate the results
from all pages in order to avoid returning partial per-page
results. It's a little bit more complicated than regular aggregation,
because each paging state needs to be translated between the base
table and the underlying view. The routine keeps fetching pages
from the underlying view, which are then used to fetch base rows,
which go straight to the result set builder.

Fixes #4540
2019-11-14 12:25:37 +01:00
Piotr Sarna
7c991a276b cql3: add a query options constructor with explicit page size
For internal use, there already exists a query_options constructor
that copies data from another query_options with overwritten paging
state. This commit adds an option to overwrite page size as well.
2019-11-14 10:49:28 +01:00
Piotr Sarna
72e039be85 cql3: enable explicit copying of query_options 2019-11-14 10:49:28 +01:00
Piotr Sarna
a28ecc4714 cql3: split execute_base_query implementation
In order to handle aggregation queries correctly, the function that
returns base query results is split into two, so it's possible to
access raw query results, before they're converted into end-user
CQL message.
2019-11-14 10:49:28 +01:00
Avi Kivity
584c555698 Update seastar submodule
* seastar 3920dcb3f8...083dc0875e (2):
  > core: fix a race in execution stages
  > execution_stage: prevent unbounded growth

Fixes #4749.
Fixes #4856.
2019-11-13 13:15:54 +02:00
null
e772f11ee0 release: prepare for 3.0.11 by yaronkaikov 2019-10-30 11:01:40 +02:00
Botond Dénes
d79b6a7481 repair: repair_cf_range(): extract result of local checksum calculation only once
The loop that collects the results of the checksum calculations also logs
any errors. The error logging includes `checksums[0]`, which corresponds
to the checksum calculation on the local node. This violates the
assumption of the code following the loop, which assumes that the future
of `checksums[0]` is intact after the loop terminates. However this is
only true when the checksum calculation is successful and is false when
it fails, as in this case the loop extracts the error and logs it. When
the code after the loop checks again whether said calculation failed, it
will get a false negative and will go ahead and attempt to extract the
value, triggering an assert failure.
Fix by making sure that even in the case of failed checksum calculation,
the result of `checksums[0]` is extracted only once.

Fixes: #5238
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20191029151709.90986-1-bdenes@scylladb.com>
(cherry picked from commit e48f301e95)
2019-10-29 20:43:50 +02:00
Avi Kivity
85168c500c Merge "Fix handling of schema alters and eviction in cache" from Tomasz
"
Fixes #5134, Eviction concurrent with preempted partition entry update after
  memtable flush may allow stale data to be populated into cache.

Fixes #5135, Cache reads may miss some writes if schema alter followed by a
  read happened concurrently with preempted partition entry update.

Fixes #5127, Cache populating read concurrent with schema alter may use the
  wrong schema version to interpret sstable data.

Fixes #5128, Reads of multi-row partitions concurrent with memtable flush may
  fail or cause a node crash after schema alter.
"

* tag 'fix-cache-issues-with-schema-alter-and-eviction-v2' of github.com:tgrabiec/scylla:
  tests: row_cache: Introduce test_alter_then_preempted_update_then_memtable_read
  tests: row_cache_stress_test: Verify all entries are evictable at the end
  tests: row_cache_stress_test: Exercise single-partition reads
  tests: row_cache_stress_test: Add periodic schema alters
  tests: memtable_snapshot_source: Allow changing the schema
  tests: simple_schema: Prepare for schema altering
  row_cache: Record upgraded schema in memtable entries during update
  memtable: Extract memtable_entry::upgrade_schema()
  row_cache, mvcc: Prevent locked snapshots from being evicted
  row_cache: Make evict() not use invalidate_unwrapped()
  mvcc: Introduce partition_snapshot::touch()
  row_cache, mvcc: Do not upgrade schema of entries which are being updated
  row_cache: Use the correct schema version to populate the partition entry
  delegating_reader: Optimize fill_buffer()
  row_cache, memtable: Use upgrade_schema()
  flat_mutation_reader: Introduce upgrade_schema()

(cherry picked from commit 8ed6f94a16)
(cherry picked from commit 3f4d9f210f)
2019-10-22 19:47:02 +02:00
Botond Dénes
5b9e2cd6e6 querier_cache: correctly account entries evicted on insertion in the population
Currently, the population stat is not increased for entries that are
evicted immediately on insert, however the code that does the eviction
still decreases the population stat, leading to an imbalance and in some
cases the underflow of the population stat. To fix, unconditionally
increase the population stat upon inserting an entry, regardless of
whether it is immediately evicted or not.
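
The accounting fix can be illustrated with a toy cache. Everything here
(tiny_cache, cache_stats, the size-based eviction policy) is hypothetical;
only the rule "always increment on insert, decrement on evict" reflects
the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

struct cache_stats {
    uint64_t population = 0;
};

// Hypothetical cache that evicts its oldest entries once it exceeds
// max_entries, possibly evicting an entry right after inserting it.
class tiny_cache {
    std::deque<int> _entries;
    std::size_t _max_entries;
    cache_stats _stats;

    void evict_oldest() {
        _entries.pop_front();
        --_stats.population; // eviction always decrements
    }
public:
    explicit tiny_cache(std::size_t max_entries) : _max_entries(max_entries) {}

    void insert(int entry) {
        _entries.push_back(entry);
        ++_stats.population; // unconditional, even if evicted immediately
        while (_entries.size() > _max_entries) {
            evict_oldest();
        }
    }

    const cache_stats& stats() const { return _stats; }
};
```

If the increment were skipped for entries evicted on insertion, the
decrement in evict_oldest() would push the unsigned counter below zero.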

Fixes: #5123

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20191001153215.82997-1-bdenes@scylladb.com>
(cherry picked from commit 00b432b61d)
2019-10-05 12:36:34 +03:00
Avi Kivity
77f33ca106 Merge " hinted handoff: fix races during shutdown and draining" from Vlad
"
Fix races that may lead to use-after-free events and file system level exceptions
during shutdown and drain.

The root cause of use-after-free events in question is that space_watchdog blocks on
end_point_hints_manager::file_update_mutex() and we need to make sure this mutex is alive as long as
it's accessed even if the corresponding end_point_hints_manager instance
is destroyed in the context of manager::drain_for().

File system exceptions may occur when space_watchdog attempts to scan a
directory while it's being deleted from the drain_for() context.
In case of such an exception new hints generation is going to be blocked
- including for materialized views, till the next space_watchdog round (in 1s).

Issues that are fixed are #4685 and #4836.

Tested as follows:
 1) Patched the code in order to trigger the race with (a lot) higher
    probability and ran a slightly modified hinted handoff replace
    dtest with a debug binary 100 times. A side effect of this
    testing was the discovery of #4836.
 2) Using the same patch as above tested that there are no crashes and
    nodes survive stop/start sequences (they were not without this series)
    in the context of all hinted handoff dtests. Ran the whole set of
    tests with dev binary for 10 times.
"

Fixes #4685
Fixes #4836.

* 'hinted_handoff_race_between_drain_for_and_space_watchdog_no_global_lock-v2' of https://github.com/vladzcloudius/scylla:
  hinted handoff: fix a race on a directory removal between space_watchdog and drain_for()
  hinted handoff: make taking file_update_mutex safe
  db::hints::manager::drain_for(): fix alignment
  db::hints::manager: serialize calls to drain_for()
  db::hints: cosmetics: indentation and missing method qualifier

(cherry picked from commit 3cb081eb84)
2019-10-05 12:25:51 +03:00
Gleb Natapov
93760f13ee messaging_service: enable reuseaddr on messaging service rpc
Fixes #4943

Message-Id: <20190918152405.GV21540@scylladb.com>
(cherry picked from commit 73e3d0a283)
2019-10-03 15:24:53 +03:00
Avi Kivity
e597ae1176 Update seastar submodule
* seastar af3fc691b9...3920dcb3f8 (2):
  > net: socket::{set,get}_reuseaddr() should not be virtual
  > Merge "fix some tcp connection bugs and add reuseaddr option to a client socket" from Gleb

Prerequisite for #4943.
2019-10-03 15:23:35 +03:00
Tomasz Grabiec
79c7015cce Merge "hinted handoff: don't reuse_segments and discard corrupted segments" from Vlad
This series addresses two issues in the hinted handoff that should
complete fixing the infamous #4231.

In particular the second patch removes the requirement to manually
delete hints files after upgrading to 3.0.4.

Tested with manual unit testing.

* https://github.com/vladzcloudius/scylla.git hinted_handoff_drop_broken_segments-v3:
  hinted handoff: disable "reuse_segments"
  commitlog: introduce a segment_error
  hinted handoff: discard corrupted segments

(cherry picked from commit ac0d435c3e)
2019-09-28 19:52:57 +03:00
Asias He
00a14000cd storage_service: Replicate and advertise tokens early in the boot up process
When a node is restarted, there is a race between gossip starting (other
nodes will mark this node up again and send requests) and the tokens being
replicated to other shards. Here is an example:

- n1, n2
- n2 is down, n1 thinks n2 is down
- n2 starts again, n2 starts gossip service, n1 thinks n2 is up and sends
  reads/writes to n2, but n2 hasn't replicated the token_metadata to all
  the shards.
- n2 complains:
  token_metadata - sorted_tokens is empty in first_token_index!
  token_metadata - sorted_tokens is empty in first_token_index!
  token_metadata - sorted_tokens is empty in first_token_index!
  token_metadata - sorted_tokens is empty in first_token_index!
  token_metadata - sorted_tokens is empty in first_token_index!
  token_metadata - sorted_tokens is empty in first_token_index!
  storage_proxy - Failed to apply mutation from $ip#4: std::runtime_error
  (sorted_tokens is empty in first_token_index!)

The code path looks like below:

0 storage_service::init_server
1    prepare_to_join()
2          add gossip application state of NET_VERSION, SCHEMA and so on.
3         _gossiper.start_gossiping().get()
4    join_token_ring()
5           _token_metadata.update_normal_tokens(tokens, get_broadcast_address());
6           replicate_to_all_cores().get()
7           storage_service::set_gossip_tokens() which adds the gossip application state of TOKENS and STATUS

The race described above is between line 3 and line 6.

To fix, we can replicate the token_metadata early after it is filled
with the tokens read from system table before gossip starts. So that
when other nodes think this restarting node is up, the tokens are
already replicated to all the shards.

In addition, this patch also fixes the issue that other nodes might see
a node missing the TOKENS and STATUS application state in gossip if that
node failed in the middle of a restart, i.e., it was killed
after line 3 and before line 7. As a result we could not replace the
node.

Tests: update_cluster_layout_tests.py
Fixes: #4709
Fixes: #4723
(cherry picked from commit 3b39a59135)
2019-09-22 12:46:36 +03:00
Avi Kivity
1c40a0fcd2 Update seastar submodule
* seastar ea859b5840...af3fc691b9 (1):
  > iotune: fix exception handling in case test file creation fails

Fixes #5001.
2019-09-18 18:37:23 +03:00
Gleb Natapov
e10735852b messaging_service: configure different streaming domain for each rpc server
A streaming domain identifies a server across shards. Each server should
have different one.

Fixes: #4953

Message-Id: <20190908085327.GR21540@scylladb.com>
(cherry picked from commit 9e9f64d90e)
2019-09-09 20:37:40 +03:00
Avi Kivity
42433a25a8 Update seastar submodule
* seastar 445b5126c2...ea859b5840 (1):
  > perftune: fix missing import for logging

Fixes #4958.
2019-09-04 13:50:29 +03:00
Paweł Dziepak
d04d3fa653 mutation_partition: verify row::append_cell() precondition
row::append_cell() has a precondition that the new cell column id needs
to be larger than that of any other already existing cell. If this
precondition is violated the row will end up in an invalid state. This
patch adds an assertion to make sure we fail early in such cases.
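
A small sketch of such a precondition check. The row_sketch class and its
can_append() helper are hypothetical; the real row type stores cells
differently.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using column_id = uint32_t;

// Hypothetical row that keeps its cells ordered by column id.
class row_sketch {
    std::vector<std::pair<column_id, std::string>> _cells;
public:
    // Precondition of append_cell(): the new id must be larger than the
    // id of every cell already present.
    bool can_append(column_id id) const {
        return _cells.empty() || _cells.back().first < id;
    }

    void append_cell(column_id id, std::string value) {
        assert(can_append(id)); // fail early instead of corrupting the row
        _cells.emplace_back(id, std::move(value));
    }

    std::size_t size() const { return _cells.size(); }
};
```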

(cherry picked from commit 060e3f8ac2)
2019-08-23 15:06:18 +02:00
Avi Kivity
1bcc5a1b5c Merge "database: assign proper io priority for streaming view updates" from Piotr
"
Streamed view updates parasitized on writing io priority, which is
reserved for user writes - it's now properly bound to streaming
write priority.

Verified manually by checking appropriate io metrics: scylla_io_queue_total_bytes{class="streaming_write" ...} vs scylla_io_queue_total_bytes{class="query" ...}

Tests: unit(dev)
"

Fixes #4615.

* 'assign_proper_io_priority_to_streaming_view_updates' of https://github.com/psarna/scylla:
  db,view: wrap view update generation in stream scheduling group
  database: assign proper io priority for streaming view updates

(cherry picked from commit 2c7435418a)
2019-08-22 16:21:42 +03:00
Botond Dénes
450b9ac9bf multishard_combining_reader: shard reader: don't stop on non-full prefixes
This patch is a backport of the fix for #4733 (merged to master as
0cf4fab). As the shard reader code has been substantially refactored
since the 3.0 branch was cut, that fix cannot be backported at all;
instead, this is a separate fix developed specially for 3.0.

To quickly reiterate, the problem at hand is that when recreating a
previously evicted shard reader of a multishard reader, the position of
the last fragment seen by that reader is used as the position after
which the read resumes. For this we just created a clustering range
starting from *after* the key (open bound). This works well in most
cases, but when that last key is a non-full prefix this will also ignore
any still unread clustering rows that fall into that prefix.

This patch doesn't attempt to fix the problem in a systematic way like
the fix in master does, making sure reader recreation works properly
with prefixes as well, instead, for the sake of minimizing the impact,
we simply avoid ending the buffer on a prefix key. This fix is more
naive and can cause over-read when the stream contains lots of
successive range tombstones with prefix positions. On the other hand,
this leads to a *much* simpler fix, and anyway, as reader eviction is
much rarer in 3.0 this should have a lesser impact.

A unit test is also added to make sure the problem is fixed.

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20190819120748.28168-1-bdenes@scylladb.com>
2019-08-19 15:09:47 +03:00
71 changed files with 1131 additions and 233 deletions

View File

@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=3.0.10
+VERSION=3.0.11
 if test -f version
 then

View File

@@ -61,6 +61,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
// - _last_row points at a direct predecessor of the next row which is going to be read.
// Used for populating continuity.
// - _population_range_starts_before_all_rows is set accordingly
// - _underlying is engaged and fast-forwarded
reading_from_underlying,
end_of_stream
@@ -99,7 +100,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
// forward progress is not guaranteed in case iterators are getting constantly invalidated.
bool _lower_bound_changed = false;
// Points to the underlying reader conforming to _schema,
// either to *_underlying_holder or _read_context->underlying().underlying().
flat_mutation_reader* _underlying = nullptr;
std::optional<flat_mutation_reader> _underlying_holder;
future<> do_fill_buffer(db::timeout_clock::time_point);
future<> ensure_underlying(db::timeout_clock::time_point);
void copy_from_cache_to_buffer();
future<> process_static_row(db::timeout_clock::time_point);
void move_to_end();
@@ -186,23 +193,22 @@ future<> cache_flat_mutation_reader::process_static_row(db::timeout_clock::time_
return make_ready_future<>();
} else {
_read_context->cache().on_row_miss();
return _read_context->get_next_fragment(timeout).then([this] (mutation_fragment_opt&& sr) {
if (sr) {
assert(sr->is_static_row());
maybe_add_to_cache(sr->as_static_row());
push_mutation_fragment(std::move(*sr));
}
maybe_set_static_row_continuous();
return ensure_underlying(timeout).then([this, timeout] {
return (*_underlying)(timeout).then([this] (mutation_fragment_opt&& sr) {
if (sr) {
assert(sr->is_static_row());
maybe_add_to_cache(sr->as_static_row());
push_mutation_fragment(std::move(*sr));
}
maybe_set_static_row_continuous();
});
});
}
}
inline
void cache_flat_mutation_reader::touch_partition() {
if (_snp->at_latest_version()) {
rows_entry& last_dummy = *_snp->version()->partition().clustered_rows().rbegin();
_snp->tracker()->touch(last_dummy);
}
_snp->touch();
}
inline
@@ -232,14 +238,36 @@ future<> cache_flat_mutation_reader::fill_buffer(db::timeout_clock::time_point t
});
}
inline
future<> cache_flat_mutation_reader::ensure_underlying(db::timeout_clock::time_point timeout) {
if (_underlying) {
return make_ready_future<>();
}
return _read_context->ensure_underlying(timeout).then([this, timeout] {
flat_mutation_reader& ctx_underlying = _read_context->underlying().underlying();
if (ctx_underlying.schema() != _schema) {
_underlying_holder = make_delegating_reader(ctx_underlying);
_underlying_holder->upgrade_schema(_schema);
_underlying = &*_underlying_holder;
} else {
_underlying = &ctx_underlying;
}
});
}
inline
future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
if (_state == state::move_to_underlying) {
if (!_underlying) {
return ensure_underlying(timeout).then([this, timeout] {
return do_fill_buffer(timeout);
});
}
_state = state::reading_from_underlying;
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
: position_in_partition(_upper_bound);
return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
return read_from_underlying(timeout);
});
}
@@ -280,7 +308,7 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_poin
inline
future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::time_point timeout) {
return consume_mutation_fragments_until(_read_context->underlying().underlying(),
return consume_mutation_fragments_until(*_underlying,
[this] { return _state != state::reading_from_underlying || is_buffer_full(); },
[this] (mutation_fragment mf) {
_read_context->cache().on_row_miss();

@@ -130,6 +130,18 @@ query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<ser
}
query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
: query_options(qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names),
std::move(qo->_values),
std::move(qo->_value_views),
qo->_skip_metadata,
std::move(query_options::specific_options{page_size, paging_state, qo->_options.serial_consistency, qo->_options.timestamp}),
qo->_cql_serialization_format) {
}
query_options::query_options(std::vector<cql3::raw_value> values)
: query_options(
db::consistency_level::ONE, infinite_timeout_config, std::move(values))

@@ -102,7 +102,7 @@ private:
public:
query_options(query_options&&) = default;
query_options(const query_options&) = delete;
explicit query_options(const query_options&) = default;
explicit query_options(db::consistency_level consistency,
const timeout_config& timeouts,
@@ -155,6 +155,7 @@ public:
explicit query_options(db::consistency_level, const timeout_config& timeouts,
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
const timeout_config& get_timeout_config() const { return _timeout_config; }

@@ -142,7 +142,7 @@ shared_ptr<selector::factory>
selectable::with_field_selection::new_selector_factory(database& db, schema_ptr s, std::vector<const column_definition*>& defs) {
auto&& factory = _selected->new_selector_factory(db, s, defs);
auto&& type = factory->new_instance()->get_type();
auto&& ut = dynamic_pointer_cast<const user_type_impl>(std::move(type));
auto&& ut = dynamic_pointer_cast<const user_type_impl>(type->underlying_type());
if (!ut) {
throw exceptions::invalid_request_exception(
sprint("Invalid field selection: %s of type %s is not a user type",

@@ -165,7 +165,7 @@ alter_type_statement::add_or_alter::add_or_alter(const ut_name& name, bool is_ad
user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_update) const
{
if (get_idx_of_field(to_update, _field_name)) {
throw exceptions::invalid_request_exception(sprint("Cannot add new field %s to type %s: a field of the same name already exists", _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(sprint("Cannot add new field %s to type %s: a field of the same name already exists", _field_name->to_string(), _name.to_string()));
}
std::vector<bytes> new_names(to_update->field_names());
@@ -173,7 +173,7 @@ user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_
std::vector<data_type> new_types(to_update->field_types());
auto&& add_type = _field_type->prepare(db, keyspace())->get_type();
if (add_type->references_user_type(to_update->_keyspace, to_update->_name)) {
throw exceptions::invalid_request_exception(sprint("Cannot add new field %s of type %s to type %s as this would create a circular reference", _field_name->name(), _field_type->to_string(), _name.to_string()));
throw exceptions::invalid_request_exception(sprint("Cannot add new field %s of type %s to type %s as this would create a circular reference", _field_name->to_string(), _field_type->to_string(), _name.to_string()));
}
new_types.push_back(std::move(add_type));
return user_type_impl::get_instance(to_update->_keyspace, to_update->_name, std::move(new_names), std::move(new_types));
@@ -183,13 +183,13 @@ user_type alter_type_statement::add_or_alter::do_alter(database& db, user_type t
{
stdx::optional<uint32_t> idx = get_idx_of_field(to_update, _field_name);
if (!idx) {
throw exceptions::invalid_request_exception(sprint("Unknown field %s in type %s", _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(sprint("Unknown field %s in type %s", _field_name->to_string(), _name.to_string()));
}
auto previous = to_update->field_types()[*idx];
auto new_type = _field_type->prepare(db, keyspace())->get_type();
if (!new_type->is_compatible_with(*previous)) {
throw exceptions::invalid_request_exception(sprint("Type %s in incompatible with previous type %s of field %s in user type %s", _field_type->to_string(), previous->as_cql3_type()->to_string(), _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(sprint("Type %s in incompatible with previous type %s of field %s in user type %s", _field_type->to_string(), previous->as_cql3_type()->to_string(), _field_name->to_string(), _name.to_string()));
}
std::vector<data_type> new_types(to_update->field_types());

@@ -522,8 +522,8 @@ indexed_table_select_statement::prepare_command_for_base_query(const query_optio
return cmd;
}
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
@@ -582,22 +582,27 @@ indexed_table_select_statement::execute_base_query(
}).then([&merger]() {
return merger.get();
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}
// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(partition_ranges), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
@@ -652,9 +657,23 @@ indexed_table_select_statement::execute_base_query(
});
}).then([&merger] () {
return merger.get();
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
});
}
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(primary_keys), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}
@@ -929,6 +948,60 @@ indexed_table_select_statement::do_execute(service::storage_proxy& proxy,
}
}
// Aggregated and paged filtering needs to aggregate the results from all pages
// in order to avoid returning partial per-page results (issue #4540).
// It's a little bit more complicated than regular aggregation, because each paging state
// needs to be translated between the base table and the underlying view.
// The routine below keeps fetching pages from the underlying view, which are then
// used to fetch base rows, which go straight to the result set builder.
// A local, internal copy of query_options is kept in order to keep updating
// the paging state between requesting data from replicas.
const bool aggregate = _selection->is_aggregate();
if (aggregate) {
const bool restrictions_need_filtering = _restrictions->need_filtering();
return do_with(cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), std::make_unique<cql3::query_options>(cql3::query_options(options)),
[this, &options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
// page size is set to the internal count page size, regardless of the user-provided value
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), DEFAULT_COUNT_PAGE_SIZE));
return repeat([this, &builder, &options, &internal_options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] () {
auto consume_results = [this, &builder, &options, &internal_options, restrictions_need_filtering] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
if (restrictions_need_filtering) {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection,
cql3::selection::result_set_builder::restrictions_filter(_restrictions, options, cmd->row_limit)));
} else {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
}
};
if (whole_partitions || partition_slices) {
return find_index_partition_ranges(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return do_execute_base_query(proxy, std::move(partition_ranges), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
} else {
return find_index_clustering_rows(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return this->do_execute_base_query(proxy, std::move(primary_keys), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
}
}).then([this, &builder, restrictions_need_filtering] () {
auto rs = builder.build();
update_stats_rows_read(rs->size());
_stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0;
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
});
}
if (whole_partitions || partition_slices) {
// In this case, we can use our normal query machinery, which retrieves
// entire partitions or the same slice for many partitions.

@@ -67,8 +67,8 @@ class select_statement : public cql_statement {
public:
using parameters = raw::select_statement::parameters;
using ordering_comparator_type = raw::select_statement::ordering_comparator_type;
protected:
static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000;
protected:
static thread_local const ::shared_ptr<parameters> _default_parameters;
schema_ptr _schema;
uint32_t _bound_terms;
@@ -213,6 +213,14 @@ private:
lw_shared_ptr<query::read_command>
prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging);
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,
@@ -222,6 +230,23 @@ private:
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,

@@ -54,7 +54,7 @@ public:
column->ks_name,
column->cf_name,
::make_shared<column_identifier>(sprint("%s[%d]", column->name, component), true),
static_pointer_cast<const tuple_type_impl>(column->type)->type(component));
static_pointer_cast<const tuple_type_impl>(column->type->underlying_type())->type(component));
}
/**
@@ -112,7 +112,7 @@ public:
private:
void validate_assignable_to(database& db, const sstring& keyspace, shared_ptr<column_specification> receiver) {
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type);
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type->underlying_type());
if (!tt) {
throw exceptions::invalid_request_exception(sprint("Invalid tuple type literal for %s of type %s", receiver->name, receiver->type->as_cql3_type()));
}

@@ -2233,6 +2233,10 @@ void backlog_controller::adjust() {
float backlog_controller::backlog_of_shares(float shares) const {
size_t idx = 1;
// No control points means the controller is disabled.
if (_control_points.size() == 0) {
return 1.0f;
}
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
idx++;
}
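
The guard added in this hunk matters because `size()` returns an unsigned type: for an empty vector, `size() - 1` wraps around to the largest `size_t` value, so the loop condition alone would not prevent indexing past the end. The following is a minimal standalone sketch of that effect, not the controller's actual code; `control_point` and `find_segment` are hypothetical names introduced only for illustration, and the sketch assumes at least two control points whenever the vector is non-empty.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the controller's control point type.
struct control_point {
    float input;
    float output;
};

// Returns the index of the first control point (starting at 1) whose output
// is not below `shares`, capped at the last index. Returns 0 for an empty
// vector. Assumption: a non-empty vector holds at least two points.
std::size_t find_segment(const std::vector<control_point>& points, float shares) {
    // Without this check, points.size() - 1 would wrap to SIZE_MAX for an
    // empty vector and the loop below would read points[1] out of bounds.
    if (points.empty()) {
        return 0;
    }
    std::size_t idx = 1;
    while ((idx < points.size() - 1) && (points[idx].output < shares)) {
        idx++;
    }
    return idx;
}
```

An alternative would be to write the loop condition as `idx + 1 < points.size()`, which avoids the subtraction entirely; the early return has the advantage of making the "controller disabled" case explicit.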

@@ -892,7 +892,7 @@ public:
}
private:
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source) const;
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, const io_priority_class& io_priority) const;
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
future<> generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,

@@ -1189,6 +1189,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
}
}
/// \brief Helper for ensuring a file is closed if an exception is thrown.
///
/// The file provided by the file_fut future is passed to func.
/// * If func throws an exception E, the file is closed and we return
/// a failed future with E.
/// * If func returns a value V, the file is not closed and we return
/// a future with V.
/// Note that when an exception is not thrown, it is the
/// responsibility of func to make sure the file will be closed. It
/// can close the file itself, return it, or store it somewhere.
///
/// \tparam Func The type of function this wraps
/// \param file_fut A future that produces a file
/// \param func A function that uses a file
/// \return A future that passes the file produced by file_fut to func
/// and closes it if func fails
template <typename Func>
static auto close_on_failure(future<file> file_fut, Func func) {
return file_fut.then([func = std::move(func)](file f) {
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
using futurator = futurize<std::result_of_t<Func(file)>>;
return futurator::make_exception_future(e);
});
});
});
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment(bool active) {
static const auto flags = open_flags::wo | open_flags::create;
@@ -1219,7 +1247,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
return fut;
});
return fut.then([this, d, active, filename](file f) {
return close_on_failure(std::move(fut), [this, d, active, filename] (file f) {
f = make_checked_file(commit_error_handler, f);
// xfs doesn't like files extended beyond eof, so enlarge the file
return f.truncate(max_size).then([this, d, active, f, filename] () mutable {
@@ -1757,7 +1785,7 @@ db::commitlog::read_log_file(const sstring& filename, seastar::io_priority_class
}
if (magic != segment::segment_magic) {
throw std::invalid_argument("Not a scylla format commitlog file");
throw invalid_segment_format();
}
crc32_nbo crc;
crc.process(ver);
@@ -1766,7 +1794,7 @@ db::commitlog::read_log_file(const sstring& filename, seastar::io_priority_class
auto cs = crc.checksum();
if (cs != checksum) {
throw std::runtime_error("Checksum error in file header");
throw header_checksum_error();
}
this->id = id;

@@ -342,18 +342,40 @@ public:
typedef std::function<future<>(temporary_buffer<char>, replay_position)> commit_load_reader_func;
class segment_data_corruption_error: public std::runtime_error {
class segment_error : public std::exception {};
class segment_data_corruption_error: public segment_error {
std::string _msg;
public:
segment_data_corruption_error(std::string msg, uint64_t s)
: std::runtime_error(msg), _bytes(s) {
: _msg(std::move(msg)), _bytes(s) {
}
uint64_t bytes() const {
return _bytes;
}
virtual const char* what() const noexcept {
return _msg.c_str();
}
private:
uint64_t _bytes;
};
class invalid_segment_format : public segment_error {
static constexpr const char* _msg = "Not a scylla format commitlog file";
public:
virtual const char* what() const noexcept {
return _msg;
}
};
class header_checksum_error : public segment_error {
static constexpr const char* _msg = "Checksum error in file header";
public:
virtual const char* what() const noexcept {
return _msg;
}
};
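
Giving the three error types a common `segment_error` base lets a caller treat every "this segment is unusable" condition with one `catch` clause while still handling other failures separately, which is how the hints sender uses it elsewhere in this change. The sketch below illustrates only that dispatch pattern with simplified copies of the classes; `outcome` and `classify` are hypothetical names that do not appear in the patch.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <utility>

// Simplified copies of the hierarchy introduced in the hunk above.
class segment_error : public std::exception {};

class invalid_segment_format : public segment_error {
public:
    const char* what() const noexcept override {
        return "Not a scylla format commitlog file";
    }
};

class header_checksum_error : public segment_error {
public:
    const char* what() const noexcept override {
        return "Checksum error in file header";
    }
};

// Hypothetical result type for this sketch.
enum class outcome { ok, dropped_corrupted, retry_later };

// Runs `replay` and maps its failure mode to an outcome: any segment_error
// means the file itself is bad and can be dropped, anything else is treated
// as a transient failure worth retrying.
template <typename Func>
outcome classify(Func&& replay) {
    try {
        std::forward<Func>(replay)();
        return outcome::ok;
    } catch (const segment_error&) {
        return outcome::dropped_corrupted;
    } catch (...) {
        return outcome::retry_later;
    }
}
```

The order of the handlers matters: the `segment_error` clause has to precede the catch-all, otherwise corrupted files would be retried forever.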
static future<std::unique_ptr<subscription<temporary_buffer<char>, replay_position>>> read_log_file(
const sstring&, seastar::io_priority_class read_io_prio_class, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr);
private:

@@ -82,6 +82,9 @@ void manager::register_metrics(const sstring& group_name) {
sm::make_derive("discarded", _stats.discarded,
sm::description("Number of hints that were discarded during sending (too old, schema changed, etc.).")),
sm::make_derive("corrupted_files", _stats.corrupted_files,
sm::description("Number of hints files that were discarded during sending because the file was corrupted.")),
});
}
@@ -114,8 +117,8 @@ future<> manager::stop() {
return _draining_eps_gate.close().finally([this] {
return parallel_for_each(_ep_managers, [] (auto& pair) {
return pair.second.stop();
}).finally([this] {
return pair.second.stop();
}).finally([this] {
_ep_managers.clear();
manager_logger.info("Stopped");
}).discard_result();
@@ -236,6 +239,8 @@ future<> manager::end_point_hints_manager::stop(drain should_drain) noexcept {
manager::end_point_hints_manager::end_point_hints_manager(const key_type& key, manager& shard_manager)
: _key(key)
, _shard_manager(shard_manager)
, _file_update_mutex_ptr(make_lw_shared<seastar::shared_mutex>())
, _file_update_mutex(*_file_update_mutex_ptr)
, _state(state_set::of<state::stopped>())
, _hints_dir(_shard_manager.hints_dir() / format("{}", _key).c_str())
, _sender(*this, _shard_manager.local_storage_proxy(), _shard_manager.local_db(), _shard_manager.local_gossiper())
@@ -244,6 +249,8 @@ manager::end_point_hints_manager::end_point_hints_manager(const key_type& key, m
manager::end_point_hints_manager::end_point_hints_manager(end_point_hints_manager&& other)
: _key(other._key)
, _shard_manager(other._shard_manager)
, _file_update_mutex_ptr(std::move(other._file_update_mutex_ptr))
, _file_update_mutex(*_file_update_mutex_ptr)
, _state(other._state)
, _hints_dir(std::move(other._hints_dir))
, _sender(other._sender, *this)
@@ -513,28 +520,35 @@ void manager::drain_for(gms::inet_address endpoint) {
manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint);
with_gate(_draining_eps_gate, [this, endpoint] {
return futurize_apply([this, endpoint] () {
if (utils::fb_utilities::is_me(endpoint)) {
return parallel_for_each(_ep_managers, [] (auto& pair) {
return pair.second.stop(drain::yes).finally([&pair] {
return remove_file(pair.second.hints_dir().c_str());
return with_semaphore(drain_lock(), 1, [this, endpoint] {
return futurize_apply([this, endpoint] () {
if (utils::fb_utilities::is_me(endpoint)) {
return parallel_for_each(_ep_managers, [] (auto& pair) {
return pair.second.stop(drain::yes).finally([&pair] {
return with_file_update_mutex(pair.second, [&pair] {
return remove_file(pair.second.hints_dir().c_str());
});
});
}).finally([this] {
_ep_managers.clear();
});
}).finally([this] {
_ep_managers.clear();
});
} else {
ep_managers_map_type::iterator ep_manager_it = find_ep_manager(endpoint);
if (ep_manager_it != ep_managers_end()) {
return ep_manager_it->second.stop(drain::yes).finally([this, endpoint, hints_dir = ep_manager_it->second.hints_dir()] {
_ep_managers.erase(endpoint);
return remove_file(hints_dir.c_str());
});
}
} else {
ep_managers_map_type::iterator ep_manager_it = find_ep_manager(endpoint);
if (ep_manager_it != ep_managers_end()) {
return ep_manager_it->second.stop(drain::yes).finally([this, endpoint, &ep_man = ep_manager_it->second] {
return with_file_update_mutex(ep_man, [&ep_man] {
return remove_file(ep_man.hints_dir().c_str());
}).finally([this, endpoint] {
_ep_managers.erase(endpoint);
});
});
}
return make_ready_future<>();
}
}).handle_exception([endpoint] (auto eptr) {
manager_logger.error("Exception when draining {}: {}", endpoint, eptr);
return make_ready_future<>();
}
}).handle_exception([endpoint] (auto eptr) {
manager_logger.error("Exception when draining {}: {}", endpoint, eptr);
});
});
});
}
@@ -725,6 +739,10 @@ bool manager::end_point_hints_manager::sender::send_one_file(const sstring& fnam
}, _last_not_complete_rp.pos, &_db.get_config().extensions()).get0();
s->done().get();
} catch (db::commitlog::segment_error& ex) {
manager_logger.error("{}: {}. Dropping...", fname, ex.what());
ctx_ptr->state.remove(send_state::segment_replay_failed);
++this->shard_stats().corrupted_files;
} catch (...) {
manager_logger.trace("sending of {} failed: {}", fname, std::current_exception());
ctx_ptr->state.set(send_state::segment_replay_failed);

@@ -60,6 +60,7 @@ private:
uint64_t dropped = 0;
uint64_t sent = 0;
uint64_t discarded = 0;
uint64_t corrupted_files = 0;
};
// map: shard -> segments
@@ -274,7 +275,8 @@ public:
manager& _shard_manager;
hints_store_ptr _hints_store_anchor;
seastar::gate _store_gate;
seastar::shared_mutex _file_update_mutex;
lw_shared_ptr<seastar::shared_mutex> _file_update_mutex_ptr;
seastar::shared_mutex& _file_update_mutex;
enum class state {
can_hint, // hinting is currently allowed (used by the space_watchdog)
@@ -376,8 +378,20 @@ public:
return _state.contains(state::stopped);
}
seastar::shared_mutex& file_update_mutex() {
return _file_update_mutex;
/// \brief Safely runs a given functor under the file_update_mutex of \ref ep_man
///
/// Runs a given functor under the file_update_mutex of the given end_point_hints_manager instance.
/// This function is safe even if \ref ep_man gets destroyed before the future this function returns resolves
/// (as long as the \ref func call itself is safe).
///
/// \tparam Func Functor type.
/// \param ep_man end_point_hints_manager instance whose file_update_mutex we want to lock.
/// \param func Functor to run under the lock.
/// \return Whatever \ref func returns.
template <typename Func>
friend inline auto with_file_update_mutex(end_point_hints_manager& ep_man, Func&& func) {
lw_shared_ptr<seastar::shared_mutex> lock_ptr = ep_man._file_update_mutex_ptr;
return with_lock(*lock_ptr, std::forward<Func>(func)).finally([lock_ptr] {});
}
const boost::filesystem::path& hints_dir() const noexcept {
@@ -385,6 +399,10 @@ public:
}
private:
seastar::shared_mutex& file_update_mutex() noexcept {
return _file_update_mutex;
}
/// \brief Creates a new hints store object.
///
/// - Creates a hints store directory if it doesn't exist: <shard_hints_dir>/<ep_key>
@@ -451,6 +469,7 @@ private:
stats _stats;
seastar::metrics::metric_groups _metrics;
std::unordered_set<ep_key_type> _eps_with_pending_hints;
seastar::semaphore _drain_lock = {1};
public:
manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed<database>& db);
@@ -529,6 +548,10 @@ public:
return _hints_dir_device_id;
}
seastar::semaphore& drain_lock() noexcept {
return _drain_lock;
}
void allow_hints();
void forbid_hints();
void forbid_hints_for_eps_with_pending_hints();

@@ -90,16 +90,27 @@ future<> space_watchdog::stop() noexcept {
return std::move(_started);
}
// Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance.
future<> space_watchdog::scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key) {
return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key, &shard_manager] (lister::path dir, directory_entry de) {
// Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory
if (_files_count == 1) {
shard_manager.add_ep_with_pending_hints(ep_key);
}
++_files_count;
return do_with(std::move(path), [this, ep_key, &shard_manager] (boost::filesystem::path& path) {
// It may happen that we get here and the directory has already been deleted in the context of manager::drain_for().
// In this case simply bail out.
return engine().file_exists(path.native()).then([this, ep_key, &shard_manager, &path] (bool exists) {
if (!exists) {
return make_ready_future<>();
} else {
return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key, &shard_manager] (lister::path dir, directory_entry de) {
// Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory
if (_files_count == 1) {
shard_manager.add_ep_with_pending_hints(ep_key);
}
++_files_count;
return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) {
_total_size += fsize;
return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) {
_total_size += fsize;
});
});
}
});
});
}
@@ -137,7 +148,7 @@ void space_watchdog::on_timer() {
// continue to enumeration - there is no one to change them.
auto it = shard_manager.find_ep_manager(de.name);
if (it != shard_manager.ep_managers_end()) {
return with_lock(it->second.file_update_mutex(), [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)]() mutable {
return with_file_update_mutex(it->second, [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable {
return scan_one_ep_dir(dir / ep_name.c_str(), shard_manager, ep_key_type(ep_name));
});
} else {

@@ -598,7 +598,7 @@ public:
future<> flush_schemas() {
return _qp.proxy().get_db().invoke_on_all([this] (database& db) {
return parallel_for_each(db::schema_tables::ALL, [this, &db](const sstring& cf_name) {
return parallel_for_each(db::schema_tables::all_table_names(), [this, &db](const sstring& cf_name) {
auto& cf = db.find_column_family(db::schema_tables::NAME, cf_name);
return cf.flush();
});

@@ -194,8 +194,6 @@ static void prepare_builder_from_table_row(const schema_ctxt&, schema_builder&,
using namespace v3;
std::vector<const char*> ALL { KEYSPACES, TABLES, SCYLLA_TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES };
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
future<> save_system_schema(const sstring & ksname) {
@@ -203,7 +201,7 @@ future<> save_system_schema(const sstring & ksname) {
auto ksm = ks.metadata();
// delete old, possibly obsolete entries in schema tables
return parallel_for_each(ALL, [ksm] (sstring cf) {
return parallel_for_each(all_table_names(), [ksm] (sstring cf) {
auto deletion_timestamp = schema_creation_timestamp() - 1;
return db::execute_cql(sprint("DELETE FROM %s.%s USING TIMESTAMP %s WHERE keyspace_name = ?", NAME, cf,
deletion_timestamp), ksm->name()).discard_result();
@@ -598,7 +596,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
}
};
return do_with(md5_hasher(), [map, reduce] (auto& hash) {
return do_for_each(ALL.begin(), ALL.end(), [&hash, map, reduce] (auto& table) {
return do_for_each(all_table_names(), [&hash, map, reduce] (auto& table) {
return map(table).then([&hash, reduce] (auto&& mutations) {
reduce(hash, mutations);
});
@@ -629,7 +627,7 @@ future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<ser
std::move(mutations.begin(), mutations.end(), std::back_inserter(result));
return std::move(result);
};
return map_reduce(ALL.begin(), ALL.end(), map, std::vector<frozen_mutation>{}, reduce);
return map_reduce(all_table_names(), map, std::vector<frozen_mutation>{}, reduce);
}
future<schema_result>
@@ -2663,12 +2661,22 @@ data_type parse_type(sstring str)
}
std::vector<schema_ptr> all_tables() {
// Don't forget to update this list when new schema tables are added.
// The listed schema tables are the ones synchronized between nodes,
// and forgetting one of them in this list can cause bugs like #4339.
return {
keyspaces(), tables(), scylla_tables(), columns(), dropped_columns(), triggers(),
views(), indexes(), types(), functions(), aggregates(), view_virtual_columns()
};
}
const std::vector<sstring>& all_table_names() {
static thread_local std::vector<sstring> all =
boost::copy_range<std::vector<sstring>>(all_tables() |
boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); }));
return all;
}
namespace legacy {
table_schema_version schema_mutations::digest() const {
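
The hunk above derives the list of schema table names from `all_tables()` and caches it per thread, so the tables are enumerated in one place only. A minimal sketch of that pattern in plain C++, without Seastar or Boost, might look as follows. `table_schema` is a hypothetical stand-in for Scylla's schema object, and the table names are assumed to mirror the function names listed in the diff.

```cpp
#include <cassert>
#include <initializer_list>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the real schema class; only the name matters here.
struct table_schema {
    std::string name;
    const std::string& cf_name() const { return name; }
};
using schema_ptr = std::shared_ptr<const table_schema>;

// The single place where schema tables are enumerated.
// The names are an assumption modelled on the functions listed in the diff.
std::vector<schema_ptr> all_tables() {
    std::vector<schema_ptr> tables;
    for (const char* n : {"keyspaces", "tables", "scylla_tables", "columns",
                          "dropped_columns", "triggers", "views", "indexes",
                          "types", "functions", "aggregates", "view_virtual_columns"}) {
        tables.push_back(std::make_shared<const table_schema>(table_schema{n}));
    }
    return tables;
}

// Derived list of names, built once per thread on first use and then reused.
const std::vector<std::string>& all_table_names() {
    static thread_local std::vector<std::string> names = [] {
        std::vector<std::string> v;
        for (const auto& s : all_tables()) {
            v.push_back(s->cf_name());
        }
        return v;
    }();
    return names;
}
```

Because the name list is computed from `all_tables()`, adding a table there is enough for it to show up in every caller that iterates over the names.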

@@ -127,9 +127,8 @@ using namespace v3;
// Replication of schema between nodes with different version is inhibited.
extern const sstring version;
extern std::vector<const char*> ALL;
std::vector<schema_ptr> all_tables();
const std::vector<sstring>& all_table_names();
// saves/creates "ks" + all tables etc, while first deleting all old schema entries (will be rewritten)
future<> save_system_schema(const sstring & ks);

@@ -445,7 +445,7 @@ void create_virtual_column(schema_builder& builder, const bytes& name, const dat
// A map has keys and values. We don't need these values,
// and can use empty values instead.
auto mtype = dynamic_pointer_cast<const map_type_impl>(type);
builder.with_column(name, map_type_impl::get_instance(mtype->get_values_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
builder.with_column(name, map_type_impl::get_instance(mtype->get_keys_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
} else if (ctype->is_set()) {
// A set's cell has nothing beyond the keys, so the
// virtual version of a set is, unfortunately, a complete

@@ -24,7 +24,9 @@
namespace db::view {
future<> view_update_from_staging_generator::start() {
_started = seastar::async([this]() mutable {
thread_attributes attr;
attr.sched_group = _db.get_streaming_scheduling_group();
_started = seastar::async(std::move(attr), [this]() mutable {
while (!_as.abort_requested()) {
if (_sstables_with_tables.empty()) {
_pending_sstables.wait().get();

@@ -449,6 +449,8 @@ def create_perftune_conf(nic='eth0'):
def is_valid_nic(nic):
if len(nic) == 0:
return False
return os.path.exists('/sys/class/net/{}'.format(nic))
# Remove this when we do not support SET_NIC configuration value anymore

@@ -23,6 +23,7 @@
#include "mutation_reader.hh"
#include "seastar/util/reference_wrapper.hh"
#include "clustering_ranges_walker.hh"
#include "schema_upgrader.hh"
#include <algorithm>
#include <boost/range/adaptor/transformed.hpp>
@@ -871,3 +872,7 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque<mutation_
}
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(fragments), pr);
}
void flat_mutation_reader::do_upgrade_schema(const schema_ptr& s) {
*this = transform(std::move(*this), schema_upgrader(s));
}

@@ -328,6 +328,7 @@ private:
flat_mutation_reader() = default;
explicit operator bool() const noexcept { return bool(_impl); }
friend class optimized_optional<flat_mutation_reader>;
void do_upgrade_schema(const schema_ptr&);
public:
// Documented in mutation_reader::forwarding in mutation_reader.hh.
class partition_range_forwarding_tag;
@@ -466,6 +467,14 @@ public:
void move_buffer_content_to(impl& other) {
_impl->move_buffer_content_to(other);
}
// Causes this reader to conform to s.
// Multiple calls of upgrade_schema() compose, effects of prior calls on the stream are preserved.
void upgrade_schema(const schema_ptr& s) {
if (__builtin_expect(s != schema(), false)) {
do_upgrade_schema(s);
}
}
};
using flat_mutation_reader_opt = optimized_optional<flat_mutation_reader>;
@@ -568,8 +577,12 @@ class delegating_reader : public flat_mutation_reader::impl {
public:
delegating_reader(Underlying&& r) : impl(to_reference(r).schema()), _underlying(std::forward<Underlying>(r)) { }
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return fill_buffer_from(to_reference(_underlying), timeout).then([this] (bool underlying_finished) {
_end_of_stream = underlying_finished;
if (is_buffer_full()) {
return make_ready_future<>();
}
return to_reference(_underlying).fill_buffer(timeout).then([this] {
_end_of_stream = to_reference(_underlying).is_end_of_stream();
to_reference(_underlying).move_buffer_content_to(*this);
});
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
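
The `upgrade_schema()` method added above returns immediately when the reader already has the requested schema and otherwise layers an upgrade on top of the existing stream. The following is a rough sketch of that shape only. `table_schema`, `reader`, and the `upgrade_layers()` counter are hypothetical; the real method wraps the reader in a `schema_upgrader` transformation instead of counting.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for a schema; identity of the pointer is what gets compared.
struct table_schema {
    int version;
};
using schema_ptr = std::shared_ptr<const table_schema>;

class reader {
    schema_ptr _schema;
    int _upgrade_layers = 0;  // stands in for the stacked transformations

    // Slow path, kept out of line in the real code.
    void do_upgrade_schema(const schema_ptr& s) {
        _schema = s;
        ++_upgrade_layers;
    }
public:
    explicit reader(schema_ptr s) : _schema(std::move(s)) {}

    const schema_ptr& schema() const { return _schema; }
    int upgrade_layers() const { return _upgrade_layers; }

    // Cheap check first: nothing happens when the schema already matches.
    void upgrade_schema(const schema_ptr& s) {
        if (s != _schema) {
            do_upgrade_schema(s);
        }
    }
};
```

Callers can then invoke `upgrade_schema()` unconditionally, which is what lets the later hunks drop their own `version()` comparisons.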

@@ -483,8 +483,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
int remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
// A node was removed with nodetool removenode can have a generation of 2
if (local_generation > 2 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
if (remote_generation > service::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
ep, local_generation, remote_generation);

@@ -156,7 +156,9 @@ public:
static constexpr std::chrono::milliseconds INTERVAL{1000};
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
/** Maximimum difference in generation and version values we are willing to accept about a peer */
// Maximimum difference between remote generation value and generation
// value this node would get if this node were restarted that we are
// willing to accept about a peer.
static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;
std::chrono::milliseconds fat_client_timeout;
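
The new check compares the remote generation against the generation this node would get if it restarted now, rather than against the locally stored generation of the peer. A small sketch of the comparison, under the assumption that generations are wall-clock seconds (which the one-year value of `MAX_GENERATION_DIFFERENCE` suggests), could be written like this. `is_generation_plausible` and its second parameter are hypothetical names; the parameter stands in for `service::get_generation_number()`.

```cpp
#include <cassert>
#include <cstdint>

// Same value as in the diff: one year's worth of seconds.
constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;

// Hypothetical helper. `generation_if_restarted_now` stands in for
// service::get_generation_number(). A remote generation is rejected only
// when it lies more than MAX_GENERATION_DIFFERENCE ahead of that value.
bool is_generation_plausible(int64_t remote_generation, int64_t generation_if_restarted_now) {
    return remote_generation <= generation_if_restarted_now + MAX_GENERATION_DIFFERENCE;
}
```

With this form a small stored local generation, such as the value 2 mentioned in the removed comment, no longer needs a special case, because the local generation of the peer does not take part in the comparison.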

@@ -26,6 +26,6 @@ class partition {
class reconcilable_result {
uint32_t row_count();
std::vector<partition> partitions();
utils::chunked_vector<partition> partitions();
query::short_read is_short_read() [[version 1.6]] = query::short_read::no;
};

@@ -44,6 +44,7 @@ void init_storage_service(distributed<database>& db, sharded<auth::service>& aut
}
void init_ms_fd_gossiper(sharded<gms::feature_service>& features
, db::config& cfg
, sstring listen_address_in
, uint16_t storage_port
, uint16_t ssl_storage_port
@@ -156,6 +157,10 @@ void init_ms_fd_gossiper(sharded<gms::feature_service>& features
to_string(seeds), listen_address_in, broadcast_address);
throw std::runtime_error("Use broadcast_address for seeds list");
}
if ((!cfg.replace_address_first_boot().empty() || !cfg.replace_address().empty()) && seeds.count(broadcast_address)) {
startlog.error("Bad configuration: replace-address and replace-address-first-boot are not allowed for seed nodes");
throw bad_configuration_error();
}
gms::get_gossiper().start(std::ref(features)).get();
auto& gossiper = gms::get_local_gossiper();
gossiper.set_seeds(seeds);
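
The added startup check refuses to run a seed node with either replace-address option set. A self-contained sketch of the same validation is shown below. `node_config` and `validate_replace_config` are hypothetical, addresses are plain strings, and `std::invalid_argument` takes the place of the `bad_configuration_error` thrown in the diff.

```cpp
#include <cassert>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical, reduced view of the two configuration options involved.
struct node_config {
    std::string replace_address;
    std::string replace_address_first_boot;
};

// Throws when a node that is listed as a seed is asked to replace another node.
// std::invalid_argument is an assumption made for this sketch only.
void validate_replace_config(const node_config& cfg,
                             const std::set<std::string>& seeds,
                             const std::string& broadcast_address) {
    const bool replacing = !cfg.replace_address_first_boot.empty() || !cfg.replace_address.empty();
    if (replacing && seeds.count(broadcast_address) > 0) {
        throw std::invalid_argument(
            "replace-address and replace-address-first-boot are not allowed for seed nodes");
    }
}
```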

@@ -51,6 +51,7 @@ struct init_scheduling_config {
};
void init_ms_fd_gossiper(sharded<gms::feature_service>& features
, db::config& config
, sstring listen_address
, uint16_t storage_port
, uint16_t ssl_storage_port

@@ -609,6 +609,7 @@ int main(int ac, char** av) {
scfg.streaming = dbcfg.streaming_scheduling_group;
scfg.gossip = scheduling_group();
init_ms_fd_gossiper(feature_service
, *cfg
, listen_address
, storage_port
, ssl_storage_port

@@ -24,7 +24,6 @@
#include "frozen_mutation.hh"
#include "stdx.hh"
#include "partition_snapshot_reader.hh"
#include "schema_upgrader.hh"
#include "partition_builder.hh"
memtable::memtable(schema_ptr schema, dirty_memory_manager& dmm, memtable_list* memtable_list,
@@ -343,11 +342,8 @@ public:
bool digest_requested = _slice.options.contains<query::partition_slice::option::with_digest>();
auto mpsr = make_partition_snapshot_flat_reader(snp_schema, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no);
if (snp_schema->version() != schema()->version()) {
_delegate = transform(std::move(mpsr), schema_upgrader(schema()));
} else {
_delegate = std::move(mpsr);
}
mpsr.upgrade_schema(schema());
_delegate = std::move(mpsr);
} else {
_end_of_stream = true;
}
@@ -502,11 +498,8 @@ private:
auto snp_schema = key_and_snp->second->schema();
auto mpsr = make_partition_snapshot_flat_reader<partition_snapshot_accounter>(snp_schema, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
if (snp_schema->version() != schema()->version()) {
_partition_reader = transform(std::move(mpsr), schema_upgrader(schema()));
} else {
_partition_reader = std::move(mpsr);
}
mpsr.upgrade_schema(schema());
_partition_reader = std::move(mpsr);
}
}
public:
@@ -582,11 +575,8 @@ memtable::make_flat_reader(schema_ptr s,
bool digest_requested = slice.options.contains<query::partition_slice::option::with_digest>();
auto rd = make_partition_snapshot_flat_reader(snp_schema, std::move(dk), std::move(cr), std::move(snp), digest_requested,
*this, _read_section, shared_from_this(), fwd);
if (snp_schema->version() != s->version()) {
return transform(std::move(rd), schema_upgrader(s));
} else {
return rd;
}
rd.upgrade_schema(s);
return rd;
} else {
auto res = make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), range, slice, pc, fwd_mr);
if (fwd == streamed_mutation::forwarding::yes) {
@@ -701,13 +691,19 @@ bool memtable::is_flushed() const {
return bool(_underlying);
}
void memtable_entry::upgrade_schema(const schema_ptr& s, mutation_cleaner& cleaner) {
if (_schema != s) {
partition().upgrade(_schema, s, cleaner, no_cache_tracker);
_schema = s;
}
}
void memtable::upgrade_entry(memtable_entry& e) {
if (e._schema != _schema) {
assert(!reclaiming_enabled());
with_allocator(allocator(), [this, &e] {
with_linearized_managed_bytes([&] {
e.partition().upgrade(e._schema, _schema, cleaner(), no_cache_tracker);
e._schema = _schema;
e.upgrade_schema(_schema, cleaner());
});
});
}

@@ -68,6 +68,10 @@ public:
schema_ptr& schema() { return _schema; }
partition_snapshot_ptr snapshot(memtable& mtbl);
// Makes the entry conform to given schema.
// Must be called under allocating section of the region which owns the entry.
void upgrade_schema(const schema_ptr&, mutation_cleaner&);
size_t external_memory_usage_without_rows() const {
return _key.key().external_memory_usage();
}

@@ -282,25 +282,26 @@ void messaging_service::start_listen() {
if (_compress_what != compress_what::none) {
so.compressor_factory = &compressor_factory;
}
so.streaming_domain = rpc::streaming_domain_type(0x55AA);
// FIXME: we don't set so.tcp_nodelay, because we can't tell at this point whether the connection will come from a
// local or remote datacenter, and whether or not the connection will be used for gossip. We can fix
// the first by wrapping its server_socket, but not the second.
auto limits = rpc_resource_limits(_mcfg.rpc_memory_limit);
if (!_server[0]) {
auto listen = [&] (const gms::inet_address& a) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.streaming_domain = sdomain;
auto addr = ipv4_addr{a.raw_addr(), _port};
return std::unique_ptr<rpc_protocol_server_wrapper>(new rpc_protocol_server_wrapper(*_rpc,
so, addr, limits));
};
_server[0] = listen(_listen_address);
_server[0] = listen(_listen_address, rpc::streaming_domain_type(0x55AA));
if (listen_to_bc) {
_server[1] = listen(utils::fb_utilities::get_broadcast_address());
_server[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x66BB));
}
}
if (!_server_tls[0]) {
auto listen = [&] (const gms::inet_address& a) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.streaming_domain = sdomain;
return std::unique_ptr<rpc_protocol_server_wrapper>(
[this, &so, &a, limits] () -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_encrypt_what == encrypt_what::none) {
@@ -313,9 +314,9 @@ void messaging_service::start_listen() {
so, seastar::tls::listen(_credentials, addr, lo), limits);
}());
};
_server_tls[0] = listen(_listen_address);
_server_tls[0] = listen(_listen_address, rpc::streaming_domain_type(0x77CC));
if (listen_to_bc) {
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address());
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x88DD));
}
}
// Do this on just cpu 0, to avoid duplicate logs.
@@ -593,6 +594,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
opts.compressor_factory = &compressor_factory;
}
opts.tcp_nodelay = must_tcp_nodelay;
opts.reuseaddr = true;
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),

@@ -1162,6 +1162,7 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
assert(_storage.vector.v.size() <= id);
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);
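
The new assertion guards the `resize(id)` call that follows it: if the vector already held more than `id` cells, `resize` would shrink it and silently drop the cells at and after `id`. The sketch below shows the same append-only contract on a plain `std::vector`. The free function `append_cell` and the use of `std::optional<int>` as a cell are simplifications made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Appends `value` at index `id`, padding any gap with empty cells.
// Only valid when `id` is not below the current size, otherwise
// resize(id) would truncate existing cells.
void append_cell(std::vector<std::optional<int>>& cells, std::size_t id, int value) {
    assert(cells.size() <= id);
    cells.resize(id);           // pads with empty cells, never shrinks here
    cells.emplace_back(value);  // the new cell lands exactly at index `id`
}
```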

@@ -387,7 +387,7 @@ public:
if (is_missing() || _ttl == dead) {
return false;
}
if (_ttl != no_ttl && _expiry < now) {
if (_ttl != no_ttl && _expiry <= now) {
return false;
}
return _timestamp > t.timestamp;
@@ -397,7 +397,7 @@ public:
if (_ttl == dead) {
return true;
}
return _ttl != no_ttl && _expiry < now;
return _ttl != no_ttl && _expiry <= now;
}
// Can be called only when is_live().
bool is_expiring() const {
@@ -435,7 +435,7 @@ public:
_timestamp = api::missing_timestamp;
return false;
}
if (_ttl > no_ttl && _expiry < now) {
if (_ttl > no_ttl && _expiry <= now) {
_expiry -= _ttl;
_ttl = dead;
}
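
The three hunks above change the expiry comparison from `<` to `<=`, so a cell counts as expired at the instant `now` reaches its expiry time and not one tick later. A reduced sketch of the boundary behaviour follows. `expiring_cell`, `has_expired`, and the use of integer seconds are assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduced cell: a TTL of 0 means the cell never expires.
struct expiring_cell {
    int64_t ttl;
    int64_t expiry;  // absolute time, in the same unit as `now`
};

constexpr int64_t no_ttl = 0;

// Expired once `now` has reached the expiry point (inclusive comparison).
bool has_expired(const expiring_cell& c, int64_t now) {
    return c.ttl != no_ttl && c.expiry <= now;
}
```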

@@ -31,7 +31,7 @@ reconcilable_result::reconcilable_result()
: _row_count(0)
{ }
reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partition> p, query::short_read short_read,
reconcilable_result::reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> p, query::short_read short_read,
query::result_memory_tracker memory_tracker)
: _row_count(row_count)
, _short_read(short_read)
@@ -39,11 +39,11 @@ reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partiti
, _partitions(std::move(p))
{ }
const std::vector<partition>& reconcilable_result::partitions() const {
const utils::chunked_vector<partition>& reconcilable_result::partitions() const {
return _partitions;
}
std::vector<partition>& reconcilable_result::partitions() {
utils::chunked_vector<partition>& reconcilable_result::partitions() {
return _partitions;
}

@@ -27,6 +27,7 @@
#include "frozen_mutation.hh"
#include "db/timeout_clock.hh"
#include "querier.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/execution_stage.hh>
class reconcilable_result;
@@ -72,17 +73,17 @@ class reconcilable_result {
uint32_t _row_count;
query::short_read _short_read;
query::result_memory_tracker _memory_tracker;
std::vector<partition> _partitions;
utils::chunked_vector<partition> _partitions;
public:
~reconcilable_result();
reconcilable_result();
reconcilable_result(reconcilable_result&&) = default;
reconcilable_result& operator=(reconcilable_result&&) = default;
reconcilable_result(uint32_t row_count, std::vector<partition> partitions, query::short_read short_read,
reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> partitions, query::short_read short_read,
query::result_memory_tracker memory_tracker = { });
const std::vector<partition>& partitions() const;
std::vector<partition>& partitions();
const utils::chunked_vector<partition>& partitions() const;
utils::chunked_vector<partition>& partitions();
uint32_t row_count() const {
return _row_count;
@@ -112,7 +113,7 @@ class reconcilable_result_builder {
const schema& _schema;
const query::partition_slice& _slice;
std::vector<partition> _result;
utils::chunked_vector<partition> _result;
uint32_t _live_rows{};
bool _has_ck_selector{};

@@ -810,11 +810,19 @@ future<> foreign_reader::ensure_buffer_contains_all_fragments_for_last_pos(flat_
if (reader.is_buffer_empty()) {
return reader.is_end_of_stream();
}
const auto& next_pos = reader.peek_buffer().position();
if (next_pos.region() != partition_region::clustered) {
if (!buffer.back().is_range_tombstone()) {
return true;
}
return !next_pos.key().equal(*reader.schema(), buffer.back().position().key());
const auto next_pos = reader.peek_buffer().position();
const auto& last_key = buffer.back().key();
// Ending the buffer on a non-full prefix key position is
// problematic because when recreating the reader we continue
// from *after* the last key we saw. If this is a prefix this
// would exclude all clustering positions that fall into the
// prefix. Fixing this is non-trivial and has little gain over
// just making sure we don't end the buffer on a prefix.
return last_key.is_full(*reader.schema()) && !next_pos.key().equal(*reader.schema(), last_key);
};
return do_until(stop, [&reader, &buffer] {

@@ -172,6 +172,9 @@ tombstone partition_entry::partition_tombstone() const {
partition_snapshot::~partition_snapshot() {
with_allocator(region().allocator(), [this] {
if (_locked) {
touch();
}
if (_version && _version.is_unique_owner()) {
auto v = &*_version;
_version = {};
@@ -268,6 +271,7 @@ partition_entry::~partition_entry() {
return;
}
if (_snapshot) {
assert(!_snapshot->is_locked());
_snapshot->_version = std::move(_version);
_snapshot->_version.mark_as_unique_owner();
_snapshot->_entry = nullptr;
@@ -284,6 +288,7 @@ stop_iteration partition_entry::clear_gently(cache_tracker* tracker) noexcept {
}
if (_snapshot) {
assert(!_snapshot->is_locked());
_snapshot->_version = std::move(_version);
_snapshot->_version.mark_as_unique_owner();
_snapshot->_entry = nullptr;
@@ -311,6 +316,7 @@ stop_iteration partition_entry::clear_gently(cache_tracker* tracker) noexcept {
void partition_entry::set_version(partition_version* new_version)
{
if (_snapshot) {
assert(!_snapshot->is_locked());
_snapshot->_version = std::move(_version);
_snapshot->_entry = nullptr;
}
@@ -459,7 +465,6 @@ public:
coroutine partition_entry::apply_to_incomplete(const schema& s,
partition_entry&& pe,
const schema& pe_schema,
mutation_cleaner& pe_cleaner,
logalloc::allocating_section& alloc,
logalloc::region& reg,
@@ -479,10 +484,6 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
// partitions where I saw 40% slow down.
const bool preemptible = s.clustering_key_size() > 0;
if (s.version() != pe_schema.version()) {
pe.upgrade(pe_schema.shared_from_this(), s.shared_from_this(), pe_cleaner, no_cache_tracker);
}
// When preemptible, later memtable reads could start using the snapshot before
// snapshot's writes are made visible in cache, which would cause them to miss those writes.
// So we cannot allow erasing when preemptible.
@@ -496,6 +497,7 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
}
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
dst_snp->lock();
// Once we start updating the partition, we must keep all snapshots until the update completes,
// otherwise partial writes would be published. So the scope of snapshots must enclose the scope
@@ -570,6 +572,7 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
auto has_next = src_cur.erase_and_advance();
acc.unpin_memory(size);
if (!has_next) {
dst_snp->unlock();
return stop_iteration::yes;
}
} while (!preemptible || !need_preempt());
@@ -661,6 +664,18 @@ partition_snapshot::range_tombstones()
position_in_partition_view::after_all_clustered_rows());
}
void partition_snapshot::touch() noexcept {
// Eviction assumes that older versions are evicted before newer so only the latest snapshot
// can be touched.
if (_tracker && at_latest_version()) {
auto&& rows = version()->partition().clustered_rows();
assert(!rows.empty());
rows_entry& last_dummy = *rows.rbegin();
assert(last_dummy.is_last_dummy());
_tracker->touch(last_dummy);
}
}
std::ostream& operator<<(std::ostream& out, const partition_entry& e) {
out << "{";
bool first = true;
@@ -687,6 +702,7 @@ void partition_entry::evict(mutation_cleaner& cleaner) noexcept {
return;
}
if (_snapshot) {
assert(!_snapshot->is_locked());
_snapshot->_version = std::move(_version);
_snapshot->_version.mark_as_unique_owner();
_snapshot->_entry = nullptr;
@@ -706,3 +722,18 @@ partition_snapshot_ptr::~partition_snapshot_ptr() {
}
}
}
void partition_snapshot::lock() noexcept {
// partition_entry::is_locked() assumes that if there is a locked snapshot,
// it can be found attached directly to it.
assert(at_latest_version());
_locked = true;
}
void partition_snapshot::unlock() noexcept {
// Locked snapshots must always be latest, is_locked() assumes that.
// Also, touch() is only effective when this snapshot is latest.
assert(at_latest_version());
_locked = false;
touch(); // Make the entry evictable again in case it was fully unlinked by eviction attempt.
}
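
`lock()` and `unlock()` above toggle a flag on the snapshot, and the rest of this change consults that flag through the owning entry before detaching snapshots or evicting. A much reduced sketch of that interaction is given here. The `snapshot` and `entry` classes and `try_evict()` are hypothetical; the real code also asserts `at_latest_version()` and touches the LRU on unlock, which this sketch leaves out.

```cpp
#include <cassert>

// Hypothetical stand-in for partition_snapshot: just the lock flag.
class snapshot {
    bool _locked = false;
public:
    void lock() noexcept { _locked = true; }
    void unlock() noexcept { _locked = false; }
    bool is_locked() const noexcept { return _locked; }
};

// Hypothetical stand-in for partition_entry with its attached snapshot.
class entry {
    snapshot* _snapshot = nullptr;
    bool _evicted = false;
public:
    void attach(snapshot* s) noexcept { _snapshot = s; }

    // An entry is locked when its attached snapshot is locked.
    bool is_locked() const noexcept { return _snapshot && _snapshot->is_locked(); }
    bool evicted() const noexcept { return _evicted; }

    // Eviction is skipped while an update holds the lock.
    bool try_evict() noexcept {
        if (is_locked()) {
            return false;
        }
        _evicted = true;
        return true;
    }
};
```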

@@ -303,6 +303,7 @@ private:
mutation_cleaner* _cleaner;
cache_tracker* _tracker;
boost::intrusive::slist_member_hook<> _cleaner_hook;
bool _locked = false;
friend class partition_entry;
friend class mutation_cleaner_impl;
public:
@@ -318,6 +319,22 @@ public:
partition_snapshot& operator=(const partition_snapshot&) = delete;
partition_snapshot& operator=(partition_snapshot&&) = delete;
// Makes the snapshot locked.
// See is_locked() for meaning.
// Can be called only when at_lastest_version(). The snapshot must remain latest as long as it's locked.
void lock() noexcept;
// Makes the snapshot no longer locked.
// See is_locked() for meaning.
void unlock() noexcept;
// Tells whether the snapshot is locked.
// Locking the snapshot prevents it from getting detached from the partition entry.
// It also prevents the partition entry from being evicted.
bool is_locked() const {
return _locked;
}
static partition_snapshot& container_of(partition_version_ref* ref) {
return *boost::intrusive::get_parent_from_member(ref, &partition_snapshot::_version);
}
@@ -344,6 +361,9 @@ public:
// to the latest version.
stop_iteration slide_to_oldest() noexcept;
// Brings the snapshot to the front of the LRU.
void touch() noexcept;
// Must be called after snapshot's original region is merged into a different region
// before the original region is destroyed, unless the snapshot is destroyed earlier.
void migrate(logalloc::region* region, mutation_cleaner* cleaner) noexcept {
@@ -503,9 +523,18 @@ public:
return _version->all_elements_reversed();
}
// Tells whether this entry is locked.
// Locked entries are undergoing an update and should not have their snapshots
// detached from the entry.
// Certain methods can only be called when !is_locked().
bool is_locked() const {
return _snapshot && _snapshot->is_locked();
}
// Strong exception guarantees.
// Assumes this instance and mp are fully continuous.
// Use only on non-evictable entries.
// Must not be called when is_locked().
void apply(const schema& s, const mutation_partition& mp, const schema& mp_schema);
void apply(const schema& s, mutation_partition&& mp, const schema& mp_schema);
@@ -526,11 +555,14 @@ public:
// such that if the operation is retried (possibly many times) and eventually
// succeeds the result will be as if the first attempt didn't fail.
//
// The schema of pe must conform to s.
//
// Returns a coroutine object representing the operation.
// The coroutine must be resumed with the region being unlocked.
//
// The coroutine cannot run concurrently with other apply() calls.
coroutine apply_to_incomplete(const schema& s,
partition_entry&& pe,
const schema& pe_schema,
mutation_cleaner& pe_cleaner,
logalloc::allocating_section&,
logalloc::region&,
@@ -539,6 +571,7 @@ public:
real_dirty_memory_accounter&);
// If this entry is evictable, cache_tracker must be provided.
// Must not be called when is_locked().
partition_version& add_version(const schema& s, cache_tracker*);
// Returns a reference to existing version with an active snapshot of given phase
@@ -568,9 +601,11 @@ public:
tombstone partition_tombstone() const;
// needs to be called with reclaiming disabled
// Must not be called when is_locked().
void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*);
// Snapshots with different values of phase will point to different partition_version objects.
// When is_locked(), read() can only be called with a phase which is <= the phase of the current snapshot.
partition_snapshot_ptr read(logalloc::region& region,
mutation_cleaner&,
schema_ptr entry_schema,

@@ -288,11 +288,11 @@ static void insert_querier(
auto& e = entries.emplace_back(key, std::move(q), expires);
e.set_pos(--entries.end());
++stats.population;
if (auto irh = sem.register_inactive_read(std::make_unique<querier_inactive_read>(entries, e.pos(), stats))) {
e.set_inactive_handle(irh);
index.insert(e);
++stats.population;
}
}

@@ -191,7 +191,7 @@ public:
const dht::decorated_key& key() const { return *_key; }
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
private:
public:
future<> ensure_underlying(db::timeout_clock::time_point timeout) {
if (_underlying_snapshot) {
return create_underlying(true, timeout);
@@ -210,18 +210,6 @@ public:
_underlying_snapshot = {};
_key = dk;
}
// Fast forwards the underlying streamed_mutation to given range.
future<> fast_forward_to(position_range range, db::timeout_clock::time_point timeout) {
return ensure_underlying(timeout).then([this, range = std::move(range), timeout] {
return _underlying.underlying().fast_forward_to(std::move(range), timeout);
});
}
// Gets the next fragment from the underlying reader
future<mutation_fragment_opt> get_next_fragment(db::timeout_clock::time_point timeout) {
return ensure_underlying(timeout).then([this, timeout] {
return _underlying.underlying()(timeout);
});
}
};
}

@@ -814,8 +814,10 @@ static future<> repair_cf_range(repair_info& ri,
// still do our best to repair available replicas.
std::vector<gms::inet_address> live_neighbors;
std::vector<partition_checksum> live_neighbors_checksum;
bool local_checksum_failed = false;
for (unsigned i = 0; i < checksums.size(); i++) {
if (checksums[i].failed()) {
local_checksum_failed |= (i == 0);
rlogger.warn(
"Checksum of range {} on {} failed: {}",
range,
@@ -831,7 +833,7 @@ static future<> repair_cf_range(repair_info& ri,
live_neighbors_checksum.push_back(checksums[i].get0());
}
}
if (checksums[0].failed() || live_neighbors.empty()) {
if (local_checksum_failed || live_neighbors.empty()) {
return make_ready_future<>();
}
// If one of the available checksums is different, repair
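
The hunk above records whether the local checksum (index 0) failed while the loop is still inspecting each result, instead of asking `checksums[0]` again after the loop. A plausible reason, stated here as an assumption, is that the loop consumes each result as it goes, so querying it afterwards is unreliable. The sketch below shows the record-during-the-loop pattern with hypothetical types; `checksum_result`, `outcome`, and `collect` are not part of the original code.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical result type: an empty value means the checksum computation failed.
struct checksum_result {
    std::optional<std::string> value;
    bool failed() const { return !value.has_value(); }
};

struct outcome {
    bool local_failed;
    std::vector<std::string> live_neighbor_checksums;
};

// Index 0 is the local node, the remaining entries are neighbors.
outcome collect(std::vector<checksum_result> results) {
    outcome out{false, {}};
    for (std::size_t i = 0; i < results.size(); ++i) {
        if (results[i].failed()) {
            // Remember the local failure now; the entry is not consulted again later.
            out.local_failed |= (i == 0);
            continue;
        }
        if (i > 0) {
            // Moving the value out leaves the entry unusable for later queries.
            out.live_neighbor_checksums.push_back(std::move(*results[i].value));
        }
    }
    return out;
}
```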

@@ -32,7 +32,6 @@
#include <sys/sdt.h>
#include "stdx.hh"
#include "read_context.hh"
#include "schema_upgrader.hh"
#include "dirty_memory_manager.hh"
#include "cache_flat_mutation_reader.hh"
#include "real_dirty_memory_accounter.hh"
@@ -350,13 +349,11 @@ future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_c
static flat_mutation_reader read_directly_from_underlying(read_context& reader) {
flat_mutation_reader res = make_delegating_reader(reader.underlying().underlying());
if (reader.schema()->version() != reader.underlying().underlying().schema()->version()) {
res = transform(std::move(res), schema_upgrader(reader.schema()));
}
if (reader.fwd() == streamed_mutation::forwarding::no) {
res = make_nonforwardable(std::move(res), true);
}
return std::move(res);
res.upgrade_schema(reader.schema());
return res;
}
// Reader which populates the cache using data from the delegate.
@@ -947,7 +944,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
});
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
coroutine update;
size_t size_entry;
// In case updater fails, we must bring the cache to consistency without deferring.
auto cleanup = defer([&m, this] {
@@ -955,6 +951,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
_prev_snapshot_pos = {};
_prev_snapshot = {};
});
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
while (!m.partitions.empty()) {
with_allocator(_tracker.allocator(), [&] () {
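
Moving the `coroutine update;` declaration below the `cleanup` guard relies on C++ destroying local objects in reverse order of declaration: `update` is now destroyed first and releases its snapshots before the cleanup code runs. The ordering can be demonstrated in isolation as follows. `scope_guard` is a minimal stand-in for `seastar::defer`, and `snapshot_holder` is a hypothetical object whose destructor represents releasing snapshots.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal scope guard, standing in for seastar::defer in this sketch.
class scope_guard {
    std::function<void()> _f;
public:
    explicit scope_guard(std::function<void()> f) : _f(std::move(f)) {}
    ~scope_guard() { _f(); }
    scope_guard(const scope_guard&) = delete;
    scope_guard& operator=(const scope_guard&) = delete;
};

// Hypothetical object whose destruction represents releasing snapshots.
struct snapshot_holder {
    std::vector<std::string>& log;
    explicit snapshot_holder(std::vector<std::string>& l) : log(l) {}
    ~snapshot_holder() { log.push_back("release snapshots"); }
};

std::vector<std::string> run_update() {
    std::vector<std::string> log;
    {
        scope_guard cleanup([&log] { log.push_back("invalidate"); });
        snapshot_holder update(log);  // declared after cleanup, so destroyed before it
    }
    return log;
}
```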
@@ -1026,8 +1023,10 @@ future<> row_cache::update(external_updater eu, memtable& m) {
if (cache_i != partitions_end() && cache_i->key().equal(*_schema, mem_e.key())) {
cache_entry& entry = *cache_i;
upgrade_entry(entry);
assert(entry._schema == _schema);
_tracker.on_partition_merge();
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.memtable_cleaner(),
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
} else if (cache_i->continuous()
|| with_allocator(standard_allocator(), [&] { return is_present(mem_e.key()); })
@@ -1039,7 +1038,8 @@ future<> row_cache::update(external_updater eu, memtable& m) {
entry->set_continuous(cache_i->continuous());
_tracker.insert(*entry);
_partitions.insert_before(cache_i, *entry);
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), *mem_e.schema(), _tracker.memtable_cleaner(),
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
} else {
return make_empty_coroutine();
@@ -1136,8 +1136,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
});
}
void row_cache::evict(const dht::partition_range& range) {
invalidate_unwrapped(range);
void row_cache::evict() {
while (_tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) {}
}
void row_cache::invalidate_unwrapped(const dht::partition_range& range) {
@@ -1224,8 +1224,11 @@ void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
partition_version& pv = partition_version::container_of(mutation_partition::container_of(
mutation_partition::rows_type::container_of_only_member(*it)));
if (pv.is_referenced_from_entry()) {
cache_entry& ce = cache_entry::container_of(partition_entry::container_of(pv));
ce.on_evicted(tracker);
partition_entry& pe = partition_entry::container_of(pv);
if (!pe.is_locked()) {
cache_entry& ce = cache_entry::container_of(pe);
ce.on_evicted(tracker);
}
}
}
}
@@ -1246,13 +1249,12 @@ flat_mutation_reader cache_entry::do_read(row_cache& rc, read_context& reader) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_ranges(*_schema, reader.slice(), _key.key());
auto r = make_cache_flat_mutation_reader(_schema, _key, std::move(ckr), rc, reader.shared_from_this(), std::move(snp));
if (reader.schema()->version() != _schema->version()) {
r = transform(std::move(r), schema_upgrader(reader.schema()));
}
if (reader.fwd() == streamed_mutation::forwarding::yes) {
r = make_forwardable(std::move(r));
}
return std::move(r);
r.upgrade_schema(rc.schema());
r.upgrade_schema(reader.schema());
return r;
}
const schema_ptr& row_cache::schema() const {
@@ -1260,7 +1262,7 @@ const schema_ptr& row_cache::schema() const {
}
void row_cache::upgrade_entry(cache_entry& e) {
if (e._schema != _schema) {
if (e._schema != _schema && !e.partition().is_locked()) {
auto& r = _tracker.region();
assert(!r.reclaiming_enabled());
with_allocator(r.allocator(), [this, &e] {

View File

@@ -549,12 +549,12 @@ public:
future<> invalidate(external_updater, const dht::partition_range& = query::full_partition_range);
future<> invalidate(external_updater, dht::partition_range_vector&&);
// Evicts entries from given range in cache.
// Evicts entries from cache.
//
// Note that this does not synchronize with the underlying source,
// it is assumed that the underlying source didn't change.
// If it did, use invalidate() instead.
void evict(const dht::partition_range& = query::full_partition_range);
void evict();
size_t partitions() const {
return _partitions.size();

Submodule seastar updated: 445b5126c2...083dc0875e

View File

@@ -181,7 +181,7 @@ future<> service::client_state::has_access(const sstring& ks, auth::permission p
for (auto cf : { db::system_keyspace::LOCAL, db::system_keyspace::PEERS }) {
tmp.insert(auth::make_data_resource(db::system_keyspace::NAME, cf));
}
for (auto cf : db::schema_tables::ALL) {
for (auto cf : db::schema_tables::all_table_names()) {
tmp.insert(auth::make_data_resource(db::schema_tables::NAME, cf));
}
return tmp;

View File

@@ -161,13 +161,14 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
auto& g = gms::get_local_gossiper();
auto& ss = get_local_storage_service();
_slen = _gstate.size();
g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate));
// if max difference during this round is big schedule next recalculate earlier
if (_diff < 0.01) {
return std::chrono::milliseconds(2000);
} else {
return std::chrono::milliseconds(500);
}
return g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate)).then([this] {
// if max difference during this round is big schedule next recalculate earlier
if (_diff < 0.01) {
return std::chrono::milliseconds(2000);
} else {
return std::chrono::milliseconds(500);
}
});
}).finally([this] {
_gstate = std::string(); // free memory, do not trust clear() to do that for string
_rates.clear();

View File

@@ -2751,8 +2751,8 @@ public:
// build reconcilable_result from reconciled data
// traverse backwards since large keys are at the start
std::vector<partition> vec;
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (std::vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
utils::chunked_vector<partition> vec;
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (utils::chunked_vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
a.emplace_back(partition(m_a_rc.live_row_count, freeze(m_a_rc.mut)));
return std::ref(a);
});

View File

@@ -409,6 +409,14 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
}
}
// If this is a restarting node, we should update tokens before gossip starts
auto my_tokens = db::system_keyspace::get_saved_tokens().get0();
bool restarting_normal_node = db::system_keyspace::bootstrap_complete() && !db().local().is_replacing() && !my_tokens.empty();
if (restarting_normal_node) {
slogger.info("Restarting a node in NORMAL status");
_token_metadata.update_normal_tokens(my_tokens, get_broadcast_address());
}
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
@@ -419,6 +427,12 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
}).get();
auto features = get_config_supported_features();
_token_metadata.update_host_id(local_host_id, get_broadcast_address());
// Replicate the tokens early because once gossip runs other nodes
// might send reads/writes to this node. Replicate it early to make
// sure the tokens are valid on all the shards.
replicate_to_all_cores().get();
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
@@ -429,6 +443,10 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
app_states.emplace(gms::application_state::SCHEMA_TABLES_VERSION, versioned_value(db::schema_tables::version));
app_states.emplace(gms::application_state::RPC_READY, value_factory.cql_ready(false));
app_states.emplace(gms::application_state::VIEW_BACKLOG, versioned_value(""));
if (restarting_normal_node) {
app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(my_tokens));
app_states.emplace(gms::application_state::STATUS, value_factory.normal(my_tokens));
}
slogger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();
@@ -697,6 +715,7 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
} else {
// Don't set any state for the node which is bootstrapping the existing token...
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
replicate_to_all_cores().get();
auto replace_addr = db().local().get_replace_address();
if (replace_addr) {
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
@@ -1473,6 +1492,7 @@ future<> storage_service::init_server(int delay, bind_messaging_port do_bind) {
auto tokens = db::system_keyspace::get_saved_tokens().get0();
if (!tokens.empty()) {
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
replicate_to_all_cores().get();
// order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
gossiper.add_local_application_state({
{ gms::application_state::TOKENS, value_factory.tokens(tokens) },

View File

@@ -72,8 +72,11 @@ inline gc_clock::duration parse_ttl(int32_t value) {
inline gc_clock::duration parse_ttl(const serialization_header& header,
uint64_t delta) {
int32_t _delta = static_cast<int32_t>(delta);
return parse_ttl(header.get_min_ttl() + _delta);
// sign-extend min_ttl back to 64 bits and
// add the delta using unsigned arithmetic
// to prevent signed integer overflow
uint64_t min_ttl = static_cast<uint64_t>(static_cast<int64_t>(header.get_min_ttl()));
return parse_ttl(static_cast<int32_t>(min_ttl + delta));
}
inline gc_clock::time_point parse_expiry(int32_t value) {
@@ -85,8 +88,11 @@ inline gc_clock::time_point parse_expiry(int32_t value) {
inline gc_clock::time_point parse_expiry(const serialization_header& header,
uint64_t delta) {
int32_t _delta = static_cast<int32_t>(delta);
return parse_expiry(header.get_min_local_deletion_time() + _delta);
// sign-extend min_local_deletion_time back to 64 bits and
// add the delta using unsigned arithmetic
// to prevent signed integer overflow
uint64_t min_local_deletion_time = static_cast<uint64_t>(static_cast<int64_t>(header.get_min_local_deletion_time()));
return parse_expiry(static_cast<int32_t>(min_local_deletion_time + delta));
}
}; // namespace sstables
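
The sign-extend-then-add-unsigned decoding used by parse_ttl() and parse_expiry() above can be tried in isolation. The following is a minimal sketch, not code from the patch: decode_delta and example_decode_delta are hypothetical names, and the narrowing cast back to int32_t is assumed to wrap modulo 2^32 (guaranteed since C++20, and the behavior of mainstream compilers before that).

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper (not part of the patch): adds an unsigned 64-bit delta
// to a signed 32-bit base without ever performing a signed addition.
inline int32_t decode_delta(int32_t base, uint64_t delta) {
    // Sign-extend the base to 64 bits, then reinterpret it as unsigned.
    uint64_t wide_base = static_cast<uint64_t>(static_cast<int64_t>(base));
    // Unsigned addition wraps modulo 2^64, so it cannot trigger undefined behavior.
    uint64_t sum = wide_base + delta;
    // Truncate to 32 bits (assumed to wrap modulo 2^32, see the lead-in).
    return static_cast<int32_t>(sum);
}

// Small usage check covering positive, negative, and "negative delta" inputs.
inline void example_decode_delta() {
    assert(decode_delta(10, 5) == 15);
    assert(decode_delta(-1, 1) == 0);
    // A delta of -3 stored as an unsigned 64-bit value.
    assert(decode_delta(5, static_cast<uint64_t>(-3)) == 2);
    assert(decode_delta(std::numeric_limits<int32_t>::max(), 0)
           == std::numeric_limits<int32_t>::max());
}
```

The point of the design is that every intermediate value is unsigned, so the only conversion that can lose information is the final, explicit narrowing cast.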

View File

@@ -309,9 +309,11 @@ void write_missing_columns(W& out, const indexed_columns& columns, const row& ro
template <typename T, typename W>
GCC6_CONCEPT(requires Writer<W>())
void write_unsigned_delta_vint(W& out, T value, T base) {
using unsigned_type = std::make_unsigned_t<T>;
unsigned_type unsigned_delta = static_cast<unsigned_type>(value) - static_cast<unsigned_type>(base);
// sign-extend to 64-bits
using signed_type = std::make_signed_t<T>;
int64_t delta = static_cast<signed_type>(value) - static_cast<signed_type>(base);
int64_t delta = static_cast<int64_t>(static_cast<signed_type>(unsigned_delta));
// write as unsigned 64-bit varint
write_vint(out, static_cast<uint64_t>(delta));
}
@@ -381,9 +383,11 @@ sstable_schema make_sstable_schema(const schema& s, const encoding_stats& enc_st
sstable_schema sst_sch;
serialization_header& header = sst_sch.header;
// mc serialization header minimum values are delta-encoded based on the default timestamp epoch times
header.min_timestamp_base.value = static_cast<uint64_t>(enc_stats.min_timestamp - encoding_stats::timestamp_epoch);
header.min_local_deletion_time_base.value = static_cast<uint64_t>(enc_stats.min_local_deletion_time - encoding_stats::deletion_time_epoch);
header.min_ttl_base.value = static_cast<uint64_t>(enc_stats.min_ttl - encoding_stats::ttl_epoch);
// Note: We rely on implicit conversion to uint64_t when subtracting the signed epoch values below
// for preventing signed integer overflow.
header.min_timestamp_base.value = static_cast<uint64_t>(enc_stats.min_timestamp) - encoding_stats::timestamp_epoch;
header.min_local_deletion_time_base.value = static_cast<uint64_t>(enc_stats.min_local_deletion_time) - encoding_stats::deletion_time_epoch;
header.min_ttl_base.value = static_cast<uint64_t>(enc_stats.min_ttl) - encoding_stats::ttl_epoch;
header.pk_type_name = to_bytes_array_vint_size(pk_type_to_string(s));

View File

@@ -1023,9 +1023,26 @@ void sstable::write_simple(const T& component, const io_priority_class& pc) {
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
auto w = file_writer(std::move(f), std::move(options));
write(_version, w, component);
w.flush();
w.close();
std::exception_ptr eptr;
try {
write(_version, w, component);
w.flush();
} catch (...) {
eptr = std::current_exception();
}
try {
w.close();
} catch (...) {
std::exception_ptr close_eptr = std::current_exception();
sstlog.warn("failed to close file_writer: {}", close_eptr);
// If write succeeded but close failed, we rethrow close's exception.
if (!eptr) {
eptr = close_eptr;
}
}
if (eptr) {
std::rethrow_exception(eptr);
}
}
template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f, const io_priority_class& pc);
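
The error handling added to sstable::write_simple() above follows a general pattern: always attempt the close, and if both steps fail, report the write error rather than the close error. The sketch below shows that pattern on its own. It is an illustration under assumptions, not the project's code: write_then_close and example_write_then_close are hypothetical names, and the logging done by the real code is omitted.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>

// Hypothetical helper (not in the patch): runs write_fn, then always runs
// close_fn, and finally rethrows the first exception that was seen.
inline void write_then_close(const std::function<void()>& write_fn,
                             const std::function<void()>& close_fn) {
    std::exception_ptr eptr;
    try {
        write_fn();
    } catch (...) {
        eptr = std::current_exception();
    }
    try {
        close_fn();
    } catch (...) {
        // Keep the write error if there was one; otherwise report the close error.
        if (!eptr) {
            eptr = std::current_exception();
        }
    }
    if (eptr) {
        std::rethrow_exception(eptr);
    }
}

// Usage check: both steps throw, close still runs, and the write error wins.
inline void example_write_then_close() {
    bool closed = false;
    bool saw_write_error = false;
    try {
        write_then_close(
            [] { throw std::runtime_error("write failed"); },
            [&closed] { closed = true; throw std::logic_error("close failed"); });
    } catch (const std::runtime_error&) {
        saw_write_error = true;
    } catch (const std::logic_error&) {
        // Reaching this handler would mean the close error masked the write error.
    }
    assert(closed);
    assert(saw_write_error);
}
```

Capturing into std::exception_ptr rather than rethrowing immediately is what allows the close step to run unconditionally before any error propagates.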

View File

@@ -410,16 +410,17 @@ struct serialization_header : public metadata_base<serialization_header> {
}
// mc serialization header minimum values are delta-encoded based on the default timestamp epoch times
// Note: following conversions rely on min_*_base.value being unsigned to prevent signed integer overflow
api::timestamp_type get_min_timestamp() const {
return static_cast<api::timestamp_type>(min_timestamp_base.value + encoding_stats::timestamp_epoch);
}
int32_t get_min_ttl() const {
return static_cast<int32_t>(min_ttl_base.value) + encoding_stats::ttl_epoch;
return static_cast<int32_t>(min_ttl_base.value + encoding_stats::ttl_epoch);
}
int32_t get_min_local_deletion_time() const {
return static_cast<int32_t>(min_local_deletion_time_base.value) + encoding_stats::deletion_time_epoch;
return static_cast<int32_t>(min_local_deletion_time_base.value + encoding_stats::deletion_time_epoch);
}
};

View File

@@ -70,7 +70,7 @@ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_pt
return push_view_replica_updates(s, std::move(m), timeout);
}
future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source) const {
future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, const io_priority_class& io_priority) const {
if (!_config.view_update_concurrency_semaphore->current()) {
// We don't have resources to generate view updates for this write. If we reached this point, we failed to
// throttle the client. The memory queue is already full, waiting on the semaphore would cause this node to
@@ -110,13 +110,13 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
// We'll return this lock to the caller, which will release it after
// writing the base-table update.
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source)] (row_locker::lock_holder lock) {
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), &io_priority] (row_locker::lock_holder lock) {
return do_with(
dht::partition_range::make_singular(m.decorated_key()),
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source)] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, pk, slice, service::get_local_sstable_query_read_priority());
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
@@ -128,11 +128,11 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
}
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source());
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(), service::get_local_sstable_query_read_priority());
}
future<row_locker::lock_holder> table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(std::move(excluded_sstable)));
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(std::move(excluded_sstable)), service::get_local_streaming_write_priority());
}
mutation_source

View File

@@ -151,3 +151,14 @@ rows_assertions rows_assertions::with_serialized_columns_count(size_t columns_co
}
return {*this};
}
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env, const char* query, const std::experimental::source_location& loc) {
try {
return env.execute_cql(query).get0();
} catch (...) {
BOOST_FAIL(format("query '{}' failed: {}\n{}:{}: originally from here",
query, std::current_exception(), loc.file_name(), loc.line()));
}
return shared_ptr<cql_transport::messages::result_message>(nullptr);
}

View File

@@ -22,10 +22,12 @@
#pragma once
#include "tests/cql_test_env.hh"
#include "transport/messages/result_message_base.hh"
#include "bytes.hh"
#include "core/shared_ptr.hh"
#include "core/future.hh"
#include <experimental/source_location>
class rows_assertions {
shared_ptr<cql_transport::messages::result_message::rows> _rows;
@@ -73,3 +75,12 @@ void assert_that_failed(future<T...>&& f)
catch (...) {
}
}
/// Invokes env.execute_cql(query), awaits its result, and returns it. If an exception is thrown,
/// invokes BOOST_FAIL with useful diagnostics.
///
/// \note Should be called from a seastar::thread context, as it awaits the CQL result.
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env,
const char* query,
const std::experimental::source_location& loc = std::experimental::source_location::current());

View File

@@ -1182,6 +1182,18 @@ SEASTAR_TEST_CASE(test_tuples) {
});
}
SEASTAR_TEST_CASE(test_user_type_reversed) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create type my_type (a int);").get();
e.execute_cql("create table tbl (a int, b frozen<my_type>, primary key ((a), b)) with clustering order by (b desc);").get();
e.execute_cql("insert into tbl (a, b) values (1, (2));").get();
assert_that(e.execute_cql("select a,b.a from tbl;").get0())
.is_rows()
.with_size(1)
.with_row({int32_type->decompose(1), int32_type->decompose(2)});
});
}
SEASTAR_TEST_CASE(test_user_type) {
return do_with_cql_env([] (cql_test_env& e) {
return e.execute_cql("create type ut1 (my_int int, my_bigint bigint, my_text text);").discard_result().then([&e] {
@@ -2978,3 +2990,11 @@ SEASTAR_TEST_CASE(test_select_with_mixed_order_table) {
}
});
}
SEASTAR_TEST_CASE(test_alter_type_on_compact_storage_with_no_regular_columns_does_not_crash) {
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TYPE my_udf (first text);");
cquery_nofail(e, "create table z (pk int, ck frozen<my_udf>, primary key(pk, ck)) with compact storage;");
cquery_nofail(e, "alter type my_udf add test_int int;");
});
}

View File

@@ -62,7 +62,7 @@ private:
}
auto count = _memtables.size();
auto op = _apply.start();
auto new_mt = make_lw_shared<memtable>(_memtables.back()->schema());
auto new_mt = make_lw_shared<memtable>(_s);
std::vector<flat_mutation_reader> readers;
for (auto&& mt : _memtables) {
readers.push_back(mt->make_flat_reader(new_mt->schema(),
@@ -101,6 +101,13 @@ public:
_should_compact.broadcast();
_compactor.get();
}
// Will cause subsequent apply() calls to accept writes conforming to given schema (or older).
// Without this, the writes will be upgraded to the old schema and snapshots will not reflect
// parts of writes which depend on the new schema.
void set_schema(schema_ptr s) {
pending()->set_schema(s);
_s = s;
}
// Must run in a seastar thread
void clear() {
_memtables.erase(_memtables.begin(), _memtables.end());

View File

@@ -1874,6 +1874,168 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
}).get();
}
// Test the multishard reader correctly handling non-full prefix keys
//
// Check that the presence of non-full prefix keys in the mutation
// stream will not cause shard reader recreation skipping clustering rows
// that fall into the prefix.
//
// Theory of operation:
// 1) Prepare a bunch of partitions, each with a bunch of clustering
// rows with full clustering keys.
// 2) Use as the shard reader a special reader, which, if the last mutation
// fragment in the buffer is a clustering row, injects a range tombstone
// which starts with a non-full prefix covering the next clustering row.
// 3) Create range tombstones such that they don't shadow any of the rows
// and they are already expired, hence can be filtered out by compaction.
// 4) Read back all the mutations and check that no clustering row is missing.
//
// Note that the multishard reader recreates shard readers based on the last
// fragment seen by that shard reader. In this test we check that recreating
// the reader doesn't skip any rows if that last seen fragment is a range
// tombstone with a non-full prefix position.
//
// Has to be run with smp >= 3
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_full_prefix_keys) {
class reader : public flat_mutation_reader::impl {
flat_mutation_reader _reader;
public:
reader(schema_ptr schema, const dht::partition_range& range, const query::partition_slice& slice, std::vector<mutation> mutations)
: impl(std::move(schema)), _reader(flat_mutation_reader_from_mutations(std::move(mutations), range, slice)) {
}
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return _reader.fill_buffer(timeout).then([this] {
_reader.move_buffer_content_to(*this);
if (is_buffer_empty()) {
_end_of_stream = _reader.is_end_of_stream();
return;
}
const auto& mf = buffer().back();
if (!mf.is_clustering_row()) {
return;
}
const auto& ck = mf.key();
auto ck_val = value_cast<int32_t>(int32_type->deserialize_value(ck.explode().front()));
// The last fragment is a cr with pos {ck_val, ck_val}.
// The next (if any) will be {ck_val + 1, ck_val + 1}.
// We want to cover: [{ck_val + 1}, {ck_val + 2, 0}), so that
// the prefix covers the next row, but not the one after it.
auto start = clustering_key_prefix::from_exploded(*_schema, {int32_type->decompose(data_value(++ck_val))});
auto end = clustering_key_prefix::from_exploded(*_schema, {int32_type->decompose(data_value(++ck_val)), int32_type->decompose(data_value(0))});
// We want all the range tombstones to be gc-able.
const auto deletion_time = gc_clock::now() - _schema->gc_grace_seconds() - std::chrono::hours(8);
// Make expired tombstones so we can just compact them away
// when comparing the read data to the original ones. We are
// only interested in the rows anyway.
auto rt = range_tombstone(start, bound_kind::incl_start, end, bound_kind::excl_end, tombstone(-100, deletion_time));
push_mutation_fragment(std::move(rt));
});
}
virtual void next_partition() override { }
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { throw std::bad_function_call(); }
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override { throw std::bad_function_call(); }
};
struct mutation_less_comparator {
dht::decorated_key::less_comparator _cmp;
explicit mutation_less_comparator(schema_ptr s) : _cmp(s) { }
bool operator()(const mutation& a, const mutation& b) const {
return _cmp(a.decorated_key(), b.decorated_key());
}
};
if (smp::count < 2) {
std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
return;
}
do_with_cql_env([] (cql_test_env& env) -> future<> {
auto schema = schema_builder("ks", "cf")
.with_column(to_bytes("pk"), int32_type, column_kind::partition_key)
.with_column(to_bytes(format("ck{}", 0)), int32_type, column_kind::clustering_key)
.with_column(to_bytes(format("ck{}", 1)), int32_type, column_kind::clustering_key)
.with_column(to_bytes("v"), int32_type, column_kind::regular_column)
.build();
auto expected_mutations = std::set<mutation, mutation_less_comparator>{mutation_less_comparator{schema}};
std::unordered_map<shard_id, std::vector<frozen_mutation>> shard_mutations;
auto& partitioner = dht::global_partitioner();
auto val_cdef = schema->regular_column_at(0);
api::timestamp_type ts = 0;
for (auto pk = 0; pk < 10 * static_cast<int>(smp::count); ++pk) {
auto dkey = partitioner.decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(data_value(pk))));
auto mut = mutation(schema, dkey);
for (auto ck = 0; ck < 100; ++ck) {
auto ck_val = int32_type->decompose(data_value(ck));
auto ckey = clustering_key::from_exploded(*schema, {ck_val, ck_val});
mut.set_clustered_cell(ckey, val_cdef, atomic_cell::make_live(*val_cdef.type, ts++, int32_type->decompose(data_value(0))));
}
expected_mutations.emplace(std::move(mut));
}
for (const auto& mut : expected_mutations) {
shard_mutations[partitioner.shard_of(mut.token())].emplace_back(freeze(mut));
}
auto factory = [&shard_mutations] (
shard_id shard,
schema_ptr schema,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
auto& frozen_muts = shard_mutations[shard];
return smp::submit_to(shard, [gs = global_schema_ptr(schema), &range, &slice, &frozen_muts] () mutable {
auto schema = gs.get();
auto rd = make_flat_mutation_reader<reader>(schema, range, slice, boost::copy_range<std::vector<mutation>>(
frozen_muts | boost::adaptors::transformed([schema] (const frozen_mutation& fm) { return fm.unfreeze(schema); })));
using foreign_reader_ptr = foreign_ptr<std::unique_ptr<flat_mutation_reader>>;
return make_ready_future<foreign_reader_ptr>(make_foreign(std::make_unique<flat_mutation_reader>(std::move(rd))));
});
};
std::vector<mutation> actual_mutations;
{
auto reader = make_multishard_combining_reader(
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), test_reader_lifecycle_policy::no_delay, true),
partitioner,
schema,
query::full_partition_range,
schema->full_slice(),
service::get_local_sstable_query_read_priority());
const auto now = gc_clock::now();
while (auto mut_opt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0()) {
// We expect the range tombstones to be purged.
mut_opt->partition().compact_for_query(*schema, now, {query::clustering_range::make_open_ended_both_sides()}, false,
std::numeric_limits<uint32_t>::max());
actual_mutations.emplace_back(std::move(*mut_opt));
}
}
BOOST_REQUIRE_EQUAL(actual_mutations.size(), expected_mutations.size());
auto ita = actual_mutations.begin();
auto ite = expected_mutations.begin();
for (; ita != actual_mutations.end() && ite != expected_mutations.end(); ++ita, ++ite) {
assert_that(*ita).is_equal_to(*ite);
}
return make_ready_future<>();
}).get();
}
// A reader that can be controlled by its "creator" after it's created.
//
// It can execute one of a set of actions on its fill_buffer() call:

View File

@@ -1253,6 +1253,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
assert_that(m).is_equal_to(m2);
}
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
// to TTL.
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
can_gc_fn never_gc = [] (tombstone) { return false; };
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
BOOST_REQUIRE(mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
const auto timestamp = api::timestamp_type(1);
const auto t0 = gc_clock::now();
const auto t1 = t0 + 1s;
const auto t2 = t0 + 2s;
const auto t3 = t0 + 3s;
// Without timestamp the marker is missing (doesn't exist)
const row_marker m1;
must_be_dead(m1, t0);
must_be_dead(m1, t1);
must_be_dead(m1, t2);
must_be_dead(m1, t3);
// With timestamp and without ttl, a row_marker is always alive
const row_marker m2(timestamp);
must_be_alive(m2, t0);
must_be_alive(m2, t1);
must_be_alive(m2, t2);
must_be_alive(m2, t3);
// A row_marker becomes dead exactly at the moment of expiry
// Reproduces #4263, #5290
const auto ttl = 1s;
const row_marker m3(timestamp, ttl, t2);
must_be_alive(m3, t0);
must_be_alive(m3, t1);
must_be_dead(m3, t2);
must_be_dead(m3, t3);
}
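
The boundary that test_row_marker_expiry pins down (a marker with expiry t2 is alive at t1 and dead at t2 itself) amounts to an inclusive comparison against the expiry point. The following is a minimal sketch of that rule only. The names is_expired and example_expiry are hypothetical, and std::chrono::steady_clock is used as a stand-in for gc_clock; it is not the row_marker implementation.

```cpp
#include <cassert>
#include <chrono>

// Stand-in for gc_clock (assumption made for this sketch only).
using sketch_clock = std::chrono::steady_clock;

// Hypothetical predicate: an entry is expired from the expiry instant onward,
// so the comparison is >= rather than >.
inline bool is_expired(sketch_clock::time_point expiry, sketch_clock::time_point now) {
    return now >= expiry;
}

// Mirrors the t0..t3 checks of the test for a marker expiring at t2.
inline void example_expiry() {
    const sketch_clock::time_point t0{};
    const auto t1 = t0 + std::chrono::seconds(1);
    const auto t2 = t0 + std::chrono::seconds(2);
    const auto t3 = t0 + std::chrono::seconds(3);
    assert(!is_expired(t2, t0));
    assert(!is_expired(t2, t1));
    assert(is_expired(t2, t2));  // dead exactly at the moment of expiry
    assert(is_expired(t2, t3));
}
```

Using a strict > here instead would keep the entry alive for one extra clock tick at the expiry instant, which is the kind of off-by-one disagreement between is_live, is_dead and compact_and_expire that the test is meant to catch.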
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("ck", bytes_type, column_kind::clustering_key)
.build();
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
auto ttl = 1s;
auto t0 = gc_clock::now();
auto t1 = t0 + 1s;
auto t2 = t0 + 2s;
auto t3 = t0 + 3s;
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
auto slice = partition_slice_builder(*s)
.without_partition_key_columns()
.build();
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
};
mutation m(s, pk);
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
assert_that(results_at_time(m, t0))
.has_size(3)
.has(a_row().with_column("ck", data_value(bytes("A"))))
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t1))
.has_size(2)
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t2))
.has_size(1)
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t3)).is_empty();
}
SEASTAR_TEST_CASE(test_querying_expired_cells) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")

View File

@@ -269,7 +269,10 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc
logalloc::allocating_section as;
mutation_cleaner src_cleaner(region(), no_cache_tracker);
auto c = as(region(), [&] {
return _e.apply_to_incomplete(*schema(), std::move(src), *src_schema, src_cleaner, as, region(),
if (_s != src_schema) {
src.upgrade(src_schema, _s, src_cleaner, no_cache_tracker);
}
return _e.apply_to_incomplete(*schema(), std::move(src), src_cleaner, as, region(),
*_container.tracker(), _container.next_phase(), _container.accounter());
});
repeat([&] {

View File

@@ -65,6 +65,16 @@ struct table {
c_keys = s.make_ckeys(rows);
}
void set_schema(schema_ptr new_s) {
s.set_schema(new_s);
mt->set_schema(new_s);
if (prev_mt) {
prev_mt->set_schema(new_s);
}
cache.set_schema(new_s);
underlying.set_schema(new_s);
}
size_t index_of_key(const dht::decorated_key& dk) {
for (auto i : boost::irange<size_t>(0, p_keys.size())) {
if (p_keys[i].equal(*s.schema(), dk)) {
@@ -125,16 +135,28 @@ struct table {
flat_mutation_reader rd;
};
void alter_schema() {
static thread_local int col_id = 0;
auto new_s = schema_builder(s.schema())
.with_column(to_bytes(format("_a{}", col_id++)), byte_type)
.build();
test_log.trace("changing schema to {}", *new_s);
set_schema(new_s);
}
std::unique_ptr<reader> make_reader(dht::partition_range pr, query::partition_slice slice) {
test_log.trace("making reader, pk={} ck={}", pr, slice);
auto r = std::make_unique<reader>(reader{std::move(pr), std::move(slice), make_empty_flat_reader(s.schema())});
std::vector<flat_mutation_reader> rd;
if (prev_mt) {
rd.push_back(prev_mt->make_flat_reader(s.schema(), r->pr, r->slice));
rd.push_back(prev_mt->make_flat_reader(s.schema(), r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
}
rd.push_back(mt->make_flat_reader(s.schema(), r->pr, r->slice));
rd.push_back(cache.make_reader(s.schema(), r->pr, r->slice));
r->rd = make_combined_reader(s.schema(), std::move(rd), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
rd.push_back(mt->make_flat_reader(s.schema(), r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
rd.push_back(cache.make_reader(s.schema(), r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
r->rd = make_combined_reader(s.schema(), std::move(rd), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return r;
}
@@ -168,11 +190,13 @@ class validating_consumer {
size_t _row_count = 0;
size_t _key = 0;
std::vector<api::timestamp_type> _writetimes;
schema_ptr _s;
public:
validating_consumer(table& t, reader_id id)
validating_consumer(table& t, reader_id id, schema_ptr s)
: _t(t)
, _id(id)
, _writetimes(t.p_writetime)
, _s(s)
{ }
void consume_new_partition(const dht::decorated_key& key) {
@@ -190,7 +214,7 @@ public:
++_row_count;
sstring value;
api::timestamp_type t;
std::tie(value, t) = _t.s.get_value(row);
std::tie(value, t) = _t.s.get_value(*_s, row);
test_log.trace("reader {}: {} @{}, {}", _id, value, t, row);
if (_value && value != _value) {
throw std::runtime_error(sprint("Saw values from two different writes in partition %d: %s and %s", _key, _value, value));
@@ -305,7 +329,7 @@ int main(int argc, char** argv) {
while (!cancelled) {
test_log.trace("{}: starting read", id);
auto rd = t.make_single_key_reader(pk, ck_range);
auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0();
auto row_count = rd->rd.consume(validating_consumer(t, id, t.s.schema()), db::no_timeout).get0();
if (row_count != len) {
throw std::runtime_error(sprint("Expected %d fragments, got %d", len, row_count));
}
@@ -317,7 +341,7 @@ int main(int argc, char** argv) {
while (!cancelled) {
test_log.trace("{}: starting read", id);
auto rd = t.make_scanning_reader();
auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0();
auto row_count = rd->rd.consume(validating_consumer(t, id, t.s.schema()), db::no_timeout).get0();
if (row_count != expected_row_count) {
throw std::runtime_error(sprint("Expected %d fragments, got %d", expected_row_count, row_count));
}
@@ -352,6 +376,12 @@ int main(int argc, char** argv) {
});
evictor.arm_periodic(3s);
timer<> schema_changer;
schema_changer.set_callback([&] {
t.alter_schema();
});
schema_changer.arm_periodic(1s);
// Mutator
while (!cancelled) {
t.mutate_next_phase();
@@ -362,6 +392,13 @@ int main(int argc, char** argv) {
evictor.cancel();
readers.get();
scanning_readers.get();
t.cache.evict();
t.tracker.cleaner().drain().get();
t.tracker.memtable_cleaner().drain().get();
assert(t.tracker.get_stats().partitions == 0);
assert(t.tracker.get_stats().rows == 0);
});
});
}


@@ -22,6 +22,7 @@
#include <boost/test/unit_test.hpp>
#include <seastar/core/sleep.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/backtrace.hh>
#include <seastar/util/alloc_failure_injector.hh>
#include <boost/algorithm/cxx11/any_of.hpp>
@@ -3196,6 +3197,66 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
});
}
SEASTAR_TEST_CASE(test_alter_then_preempted_update_then_memtable_read) {
return seastar::async([] {
simple_schema ss;
memtable_snapshot_source underlying(ss.schema());
schema_ptr s = ss.schema();
auto pk = ss.make_pkey("pk");
mutation m(s, pk);
mutation m2(s, pk);
const int c_keys = 10000; // enough for update to be preempted
for (auto ck : ss.make_ckeys(c_keys)) {
ss.add_row(m, ck, "tag1");
ss.add_row(m2, ck, "tag2");
}
underlying.apply(m);
cache_tracker tracker;
row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker);
auto pr = dht::partition_range::make_singular(m.decorated_key());
// Populate the cache so that update has an entry to update.
assert_that(cache.make_reader(s, pr)).produces(m);
auto mt2 = make_lw_shared<memtable>(s);
mt2->apply(m2);
// Alter the schema
auto s2 = schema_builder(s)
.with_column(to_bytes("_a"), byte_type)
.build();
cache.set_schema(s2);
mt2->set_schema(s2);
auto update_f = cache.update([&] () noexcept {
underlying.apply(m2);
}, *mt2);
auto wait_for_update = defer([&] { update_f.get(); });
// Wait for cache update to enter the partition
while (tracker.get_stats().partition_merges == 0) {
later().get();
}
auto mt2_reader = mt2->make_flat_reader(s, pr, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
auto cache_reader = cache.make_reader(s, pr, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
assert_that(std::move(mt2_reader)).produces(m2);
assert_that(std::move(cache_reader)).produces(m);
wait_for_update.cancel();
update_f.get();
assert_that(cache.make_reader(s)).produces(m + m2);
});
}
SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memtable_readers) {
// Verifies that memtable readers created before memtable is moved to cache
// are not affected by eviction in cache after their partition entries were moved to cache.


@@ -24,6 +24,7 @@
#include "tests/cql_assertions.hh"
#include "service/pager/paging_state.hh"
#include "transport/messages/result_message.hh"
#include "cql3/statements/select_statement.hh"
SEASTAR_TEST_CASE(test_secondary_index_regular_column_query) {
@@ -654,3 +655,64 @@ SEASTAR_TEST_CASE(test_secondary_index_on_partition_key_with_filtering) {
});
});
}
SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
static constexpr int row_count = 2 * cql3::statements::select_statement::DEFAULT_COUNT_PAGE_SIZE + 120;
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("CREATE TABLE fpa (id int primary key, v int)").get();
e.execute_cql("CREATE INDEX ON fpa(v)").get();
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO fpa (id, v) VALUES ({}, {})", i + 1, i % 2).c_str()).get();
}
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo)).get0();
// Even though we set up paging, we still expect a single result from an aggregation function.
// Also, instead of the user-provided page size, internal DEFAULT_COUNT_PAGE_SIZE is expected to be used.
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});
// Even if paging is not explicitly used, the query will be internally paged to avoid OOM.
msg = e.execute_cql("SELECT sum(id) FROM fpa WHERE v = 1;").get0();
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});
});
// Similar, but this time a non-prefix clustering key part is indexed (wrt. issue 3405, after which we have
// a special code path for indexing composite non-prefix clustering keys).
e.execute_cql("CREATE TABLE fpa2 (id int, c1 int, c2 int, primary key (id, c1, c2))").get();
e.execute_cql("CREATE INDEX ON fpa2(c2)").get();
eventually([&] {
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str()).get();
}
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = e.execute_cql("SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo)).get0();
// Even though we set up paging, we still expect a single result from an aggregation function
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = e.execute_cql("SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo)).get0();
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});
});
});
}
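The expected values asserted in test_indexing_paging_and_aggregation follow from closed-form sums. Rows are inserted with id = i + 1 and v = i % 2, so v = 0 selects the odd ids 1, 3, ..., row_count - 1 (sum row_count^2 / 4) and v = 1 selects the even ids 2, 4, ..., row_count (sum row_count^2 / 4 + row_count / 2, average row_count / 2 + 1). A minimal standalone sketch that recomputes these by brute force is below. The names `assumed_page_size`, `aggregate` and `aggregate_ids` are hypothetical, and the page size is a placeholder, since the real DEFAULT_COUNT_PAGE_SIZE value is not shown in this diff.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for select_statement::DEFAULT_COUNT_PAGE_SIZE;
// the real value is not visible in this diff.
constexpr int assumed_page_size = 10000;
constexpr int row_count = 2 * assumed_page_size + 120;

// Running sum and row count for one value of the indexed column.
struct aggregate {
    std::int64_t sum = 0;
    std::int64_t count = 0;
};

// Mirrors the INSERT loop of the test: id = i + 1, v = i % 2.
// Aggregates the ids of all rows whose v equals wanted_v.
inline aggregate aggregate_ids(int wanted_v) {
    aggregate a;
    for (int i = 0; i < row_count; ++i) {
        if (i % 2 == wanted_v) {
            a.sum += i + 1;
            ++a.count;
        }
    }
    return a;
}
```

The formulas rely on row_count being even, which holds for any page size because row_count is 2 * page_size + 120.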


@@ -40,12 +40,21 @@ class simple_schema {
schema_ptr _s;
api::timestamp_type _timestamp = api::min_timestamp;
const column_definition& _v_def;
const column_definition* _v_def = nullptr;
table_schema_version _v_def_version;
simple_schema(schema_ptr s, api::timestamp_type timestamp)
: _s(s)
, _timestamp(timestamp)
, _v_def(*_s->get_column_definition(to_bytes("v"))) {
{}
private:
const column_definition& get_v_def(const schema& s) {
if (_v_def_version == s.version() && _v_def) {
return *_v_def;
}
_v_def = s.get_column_definition(to_bytes("v"));
_v_def_version = s.version();
return *_v_def;
}
public:
api::timestamp_type current_timestamp() {
@@ -66,7 +75,6 @@ public:
.with_column("s1", utf8_type, ws ? column_kind::static_column : column_kind::regular_column)
.with_column("v", utf8_type)
.build())
, _v_def(*_s->get_column_definition(to_bytes("v")))
{ }
sstring cql() const {
@@ -101,16 +109,18 @@ public:
if (t == api::missing_timestamp) {
t = new_timestamp();
}
m.set_clustered_cell(key, _v_def, atomic_cell::make_live(*_v_def.type, t, data_value(v).serialize()));
const column_definition& v_def = get_v_def(*_s);
m.set_clustered_cell(key, v_def, atomic_cell::make_live(*v_def.type, t, data_value(v).serialize()));
return t;
}
std::pair<sstring, api::timestamp_type> get_value(const clustering_row& row) {
auto cell = row.cells().find_cell(_v_def.id);
std::pair<sstring, api::timestamp_type> get_value(const schema& s, const clustering_row& row) {
const column_definition& v_def = get_v_def(s);
auto cell = row.cells().find_cell(v_def.id);
if (!cell) {
throw std::runtime_error("cell not found");
}
atomic_cell_view ac = cell->as_atomic_cell(_v_def);
atomic_cell_view ac = cell->as_atomic_cell(v_def);
if (!ac.is_live()) {
throw std::runtime_error("cell is dead");
}
@@ -119,17 +129,22 @@ public:
mutation_fragment make_row(const clustering_key& key, sstring v) {
auto row = clustering_row(key);
row.cells().apply(*_s->get_column_definition(to_bytes(sstring("v"))),
atomic_cell::make_live(*_v_def.type, new_timestamp(), data_value(v).serialize()));
const column_definition& v_def = get_v_def(*_s);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, new_timestamp(), data_value(v).serialize()));
return mutation_fragment(std::move(row));
}
mutation_fragment make_row_from_serialized_value(const clustering_key& key, bytes_view v) {
auto row = clustering_row(key);
row.cells().apply(_v_def, atomic_cell::make_live(*_v_def.type, new_timestamp(), v));
const column_definition& v_def = get_v_def(*_s);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, new_timestamp(), v));
return mutation_fragment(std::move(row));
}
void set_schema(schema_ptr s) {
_s = s;
}
api::timestamp_type add_static_row(mutation& m, sstring s1, api::timestamp_type t = api::missing_timestamp) {
if (t == api::missing_timestamp) {
t = new_timestamp();
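The get_v_def() helper introduced in this file caches the definition of column "v" and re-resolves it only when the schema version differs from the one it was cached for. The same pattern can be sketched in isolation as follows. `mini_schema` and `column_cache` are hypothetical types invented for illustration, using an int version and a std::map in place of the real schema classes.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical miniature schema: a version number plus a name -> column id map.
struct mini_schema {
    int version;
    std::map<std::string, int> columns;

    // Returns a pointer to the column id, or nullptr if the column is absent.
    const int* find(const std::string& name) const {
        auto it = columns.find(name);
        return it == columns.end() ? nullptr : &it->second;
    }
};

class column_cache {
    const int* _cached = nullptr;   // points into the schema it was resolved from
    int _cached_version = 0;
    int _lookups = 0;               // counts real lookups, for demonstration only
public:
    // Returns the id of column "v", looking it up again only when the
    // schema version changed since the last call.
    int get_v_id(const mini_schema& s) {
        if (_cached && _cached_version == s.version) {
            return *_cached;
        }
        _cached = s.find("v");
        _cached_version = s.version;
        ++_lookups;
        return *_cached;            // assumes "v" exists, as in the test schema
    }
    int lookups() const { return _lookups; }
};
```

As in the diff, the cached pointer is only meaningful while the schema it came from is alive, so the sketch assumes callers keep that schema around.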


@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_update_column_not_in_view_with_flush) {
}
void test_partial_update_with_unselected_collections(cql_test_env& e, std::function<void()>&& maybe_flush) {
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,int>, primary key (p, c))").get();
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,text>, primary key (p, c))").get();
e.execute_cql("create materialized view vcf as select a, b from cf "
"where p is not null and c is not null "
"primary key (c, p)").get();
@@ -563,7 +563,7 @@ e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<i
assert_that(msg).is_rows().is_empty();
});
e.execute_cql("update cf set m=m+{3:3}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
e.execute_cql("update cf set m=m+{3:'text'}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
maybe_flush();
eventually([&] {
auto msg = e.execute_cql("select * from vcf").get0();


@@ -205,8 +205,9 @@ void tracing::set_trace_probability(double p) {
}
one_session_records::one_session_records()
: backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state())
, budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {}
: _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this())
, backend_state_ptr(_local_tracing_ptr->allocate_backend_session_state())
, budget_ptr(_local_tracing_ptr->get_cached_records_ptr()) {}
std::ostream& operator<<(std::ostream& os, const span_id& id) {
return os << id.get_id();


@@ -239,6 +239,8 @@ public:
};
class one_session_records {
private:
shared_ptr<tracing> _local_tracing_ptr;
public:
utils::UUID session_id;
session_record session_rec;
@@ -663,7 +665,7 @@ private:
void one_session_records::set_pending_for_write() {
_is_pending_for_write = true;
budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr();
budget_ptr = _local_tracing_ptr->get_pending_records_ptr();
}
void one_session_records::data_consumed() {
@@ -672,7 +674,7 @@ void one_session_records::data_consumed() {
}
_is_pending_for_write = false;
budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr();
budget_ptr = _local_tracing_ptr->get_cached_records_ptr();
}
inline span_id span_id::make_span_id() {
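The one_session_records change stores a shared pointer to the local tracing instance and derives its other pointers from it, which appears intended to keep that instance alive for as long as a session record exists. A minimal sketch of the ownership pattern is shown below, using std::shared_ptr and std::enable_shared_from_this rather than the Seastar types in the diff. `tracing_service` and `session_records` are hypothetical stand-ins, not the real classes.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the per-shard tracing service.
class tracing_service : public std::enable_shared_from_this<tracing_service> {
    int _cached_budget = 0;
public:
    int* cached_budget_ptr() { return &_cached_budget; }
};

// Hypothetical stand-in for one_session_records.
class session_records {
    // Declared first so it is initialized before budget_ptr,
    // which is derived from it in the constructor.
    std::shared_ptr<tracing_service> _owner;
public:
    int* budget_ptr;

    explicit session_records(tracing_service& local)
        : _owner(local.shared_from_this())          // pins the service
        , budget_ptr(_owner->cached_budget_ptr()) {} // stays valid while pinned
};
```

Member declaration order matters here: C++ initializes members in declaration order, so the owning pointer has to be declared before any member computed from it.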


@@ -1988,8 +1988,7 @@ struct empty_type_impl : abstract_type {
return false;
}
virtual std::experimental::optional<data_type> update_user_type(const shared_ptr<const user_type_impl> updated) const {
// Can't happen
abort();
return std::experimental::nullopt;
}
};


@@ -246,10 +246,18 @@ public:
public:
const T& front() const { return *cbegin(); }
T& front() { return *begin(); }
iterator begin() const { return iterator(_chunks.data(), 0); }
iterator end() const { return iterator(_chunks.data(), _size); }
iterator begin() { return iterator(_chunks.data(), 0); }
iterator end() { return iterator(_chunks.data(), _size); }
const_iterator begin() const { return const_iterator(_chunks.data(), 0); }
const_iterator end() const { return const_iterator(_chunks.data(), _size); }
const_iterator cbegin() const { return const_iterator(_chunks.data(), 0); }
const_iterator cend() const { return const_iterator(_chunks.data(), _size); }
std::reverse_iterator<iterator> rbegin() { return std::reverse_iterator(end()); }
std::reverse_iterator<iterator> rend() { return std::reverse_iterator(begin()); }
std::reverse_iterator<const_iterator> rbegin() const { return std::reverse_iterator(end()); }
std::reverse_iterator<const_iterator> rend() const { return std::reverse_iterator(begin()); }
std::reverse_iterator<const_iterator> crbegin() const { return std::reverse_iterator(cend()); }
std::reverse_iterator<const_iterator> crend() const { return std::reverse_iterator(cbegin()); }
public:
bool operator==(const chunked_vector& x) const {
return boost::equal(*this, x);