Compare commits
36 Commits
master
...
scylla-4.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89e79023ae | ||
|
|
bc67da1a21 | ||
|
|
0c7643f1fe | ||
|
|
c563234f40 | ||
|
|
77b7a48a02 | ||
|
|
b2b1bfb159 | ||
|
|
d72cbe37aa | ||
|
|
9f7b560771 | ||
|
|
06af9c028c | ||
|
|
c74ab3ae80 | ||
|
|
32cd3a070a | ||
|
|
bb1554f09e | ||
|
|
2037d7550e | ||
|
|
c320c3f6da | ||
|
|
0ed70944aa | ||
|
|
89f860d409 | ||
|
|
0819d221f4 | ||
|
|
53f47d4e67 | ||
|
|
21ad12669a | ||
|
|
c812359383 | ||
|
|
1bd79705fb | ||
|
|
7e2ef386cc | ||
|
|
51bad7e72c | ||
|
|
0379d0c031 | ||
|
|
a8ef820f27 | ||
|
|
9908f009a4 | ||
|
|
48d8a075b4 | ||
|
|
e3ddd607bc | ||
|
|
511773d466 | ||
|
|
121cd383fa | ||
|
|
90639f48e5 | ||
|
|
8d029a04aa | ||
|
|
67995db899 | ||
|
|
282cd0df7c | ||
|
|
ce58994d30 | ||
|
|
78f5afec30 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.0.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -208,12 +208,11 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
throw api_error("ValidationException",
|
||||
format("Non-string IndexName '{}'", index_name->GetString()));
|
||||
}
|
||||
}
|
||||
|
||||
// If no tables for global indexes were found, the index may be local
|
||||
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
|
||||
type = table_or_view_type::lsi;
|
||||
table_name = lsi_name(orig_table_name, index_name->GetString());
|
||||
// If no tables for global indexes were found, the index may be local
|
||||
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
|
||||
type = table_or_view_type::lsi;
|
||||
table_name = lsi_name(orig_table_name, index_name->GetString());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -1019,13 +1018,22 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
mutation m(schema, _pk);
|
||||
auto& row = m.partition().clustered_row(*schema, _ck);
|
||||
// If there's no clustering key, a tombstone should be created directly
|
||||
// on a partition, not on a clustering row - otherwise it will look like
|
||||
// an open-ended range tombstone, which will crash on KA/LA sstable format.
|
||||
// Ref: #6035
|
||||
const bool use_partition_tombstone = schema->clustering_key_size() == 0;
|
||||
if (!_cells) {
|
||||
// a DeleteItem operation:
|
||||
row.apply(tombstone(ts, gc_clock::now()));
|
||||
if (use_partition_tombstone) {
|
||||
m.partition().apply(tombstone(ts, gc_clock::now()));
|
||||
} else {
|
||||
// a DeleteItem operation:
|
||||
m.partition().clustered_row(*schema, _ck).apply(tombstone(ts, gc_clock::now()));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
// else, a PutItem operation:
|
||||
auto& row = m.partition().clustered_row(*schema, _ck);
|
||||
attribute_collector attrs_collector;
|
||||
for (auto& c : *_cells) {
|
||||
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
||||
@@ -1048,7 +1056,11 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
// Scylla proper, to implement the operation to replace an entire
|
||||
// collection ("UPDATE .. SET x = ..") - see
|
||||
// cql3::update_parameters::make_tombstone_just_before().
|
||||
row.apply(tombstone(ts-1, gc_clock::now()));
|
||||
if (use_partition_tombstone) {
|
||||
m.partition().apply(tombstone(ts-1, gc_clock::now()));
|
||||
} else {
|
||||
row.apply(tombstone(ts-1, gc_clock::now()));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
|
||||
@@ -434,6 +434,12 @@ GCC6_CONCEPT(
|
||||
static KeyType
|
||||
generate_base_key_from_index_pk(const partition_key& index_pk, const std::optional<clustering_key>& index_ck, const schema& base_schema, const schema& view_schema) {
|
||||
const auto& base_columns = std::is_same_v<KeyType, partition_key> ? base_schema.partition_key_columns() : base_schema.clustering_key_columns();
|
||||
|
||||
// An empty key in the index paging state translates to an empty base key
|
||||
if (index_pk.is_empty() && !index_ck) {
|
||||
return KeyType::make_empty();
|
||||
}
|
||||
|
||||
std::vector<bytes_view> exploded_base_key;
|
||||
exploded_base_key.reserve(base_columns.size());
|
||||
|
||||
@@ -507,8 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
if (old_paging_state && concurrency == 1) {
|
||||
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
|
||||
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
||||
if (_schema->clustering_key_size() > 0) {
|
||||
assert(old_paging_state->get_clustering_key().has_value());
|
||||
if (old_paging_state->get_clustering_key() && _schema->clustering_key_size() > 0) {
|
||||
auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
|
||||
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
||||
command->slice.set_range(*_schema, base_pk,
|
||||
|
||||
@@ -614,11 +614,17 @@ public:
|
||||
future<sseg_ptr> terminate() {
|
||||
assert(_closed);
|
||||
if (!std::exchange(_terminated, true)) {
|
||||
clogger.trace("{} is closed but not terminated.", *this);
|
||||
if (_buffer.empty()) {
|
||||
new_buffer(0);
|
||||
// write a terminating zero block iff we are ending (a reused)
|
||||
// block before actual file end.
|
||||
// we should only get here when all actual data is
|
||||
// already flushed (see below, close()).
|
||||
if (size_on_disk() < _segment_manager->max_size) {
|
||||
clogger.trace("{} is closed but not terminated.", *this);
|
||||
if (_buffer.empty()) {
|
||||
new_buffer(0);
|
||||
}
|
||||
return cycle(true, true);
|
||||
}
|
||||
return cycle(true, true);
|
||||
}
|
||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||
}
|
||||
|
||||
@@ -689,6 +689,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
@@ -859,7 +860,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
|
||||
}
|
||||
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental()) {
|
||||
if (experimental() && f != experimental_features_t::UNUSED) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
@@ -911,11 +912,13 @@ const db::extensions& db::config::extensions() const {
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
// Lightweight transactions are no longer experimental. Map them
|
||||
// to UNUSED switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||
return {LWT, UDF, CDC};
|
||||
return {UDF, CDC};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -81,7 +81,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
enum feature { UNUSED, UDF, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
};
|
||||
@@ -278,6 +278,7 @@ public:
|
||||
named_value<uint32_t> shutdown_announce_in_ms;
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<int32_t> force_gossip_generation;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
|
||||
@@ -187,7 +187,7 @@ schema_ptr batchlog() {
|
||||
{{"cf_id", uuid_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"in_progress_ballot", timeuuid_type},
|
||||
{"promise", timeuuid_type},
|
||||
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||
{"most_recent_commit_at", timeuuid_type},
|
||||
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||
@@ -2201,8 +2201,8 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
||||
return service::paxos::paxos_state();
|
||||
}
|
||||
auto& row = results->one();
|
||||
auto promised = row.has("in_progress_ballot")
|
||||
? row.get_as<utils::UUID>("in_progress_ballot") : utils::UUID_gen::min_time_UUID(0);
|
||||
auto promised = row.has("promise")
|
||||
? row.get_as<utils::UUID>("promise") : utils::UUID_gen::min_time_UUID(0);
|
||||
|
||||
std::optional<service::paxos::proposal> accepted;
|
||||
if (row.has("proposal")) {
|
||||
@@ -2228,7 +2228,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
|
||||
}
|
||||
|
||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
return execute_cql_with_timeout(cql,
|
||||
timeout,
|
||||
utils::UUID_gen::micros_timestamp(ballot),
|
||||
@@ -2274,6 +2274,20 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||
// This should be called only if a learn stage succeeded on all replicas.
|
||||
// In this case we can remove the paxos row using ballot's timestamp which
|
||||
// guarantees that if there is more recent round it will not be affected.
|
||||
static auto cql = format("DELETE FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
|
||||
return execute_cql_with_timeout(cql,
|
||||
timeout,
|
||||
utils::UUID_gen::micros_timestamp(ballot),
|
||||
to_legacy(*key.get_compound_type(s), key.representation()),
|
||||
s.id()
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
|
||||
sstring system_keyspace_name() {
|
||||
|
||||
@@ -647,6 +647,7 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
|
||||
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
|
||||
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
|
||||
|
||||
@@ -118,7 +118,7 @@ token token::midpoint(const token& t1, const token& t2) {
|
||||
}
|
||||
|
||||
token token::get_random_token() {
|
||||
return {kind::key, dht::get_random_number<int64_t>()};
|
||||
return token(kind::key, dht::get_random_number<uint64_t>());
|
||||
}
|
||||
|
||||
token token::from_sstring(const sstring& t) {
|
||||
|
||||
24
dht/token.hh
24
dht/token.hh
@@ -58,19 +58,27 @@ public:
|
||||
, _data(normalize(d)) { }
|
||||
|
||||
token(kind k, const bytes& b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
if (_kind != kind::key) {
|
||||
_data = 0;
|
||||
} else {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
token(kind k, bytes_view b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
if (_kind != kind::key) {
|
||||
_data = 0;
|
||||
} else {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
bool is_minimum() const {
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.0/latest/scylla.repo
|
||||
ARG VERSION=4.0.*
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -21,10 +21,6 @@ DynamoDB API requests.
|
||||
For example, "`--alternator-port=8000`" on the command line will run
|
||||
Alternator on port 8000 - the traditional port used by DynamoDB.
|
||||
|
||||
Alternator uses Scylla's LWT feature, which is currently considered
|
||||
experimental and needs to be seperately enabled as well, e.g. with the
|
||||
"`--experimental=on`" option.
|
||||
|
||||
By default, Scylla listens on this port on all network interfaces.
|
||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||
option.
|
||||
@@ -55,9 +51,8 @@ Alternator's compatibility with DynamoDB, and will be updated as the work
|
||||
progresses and compatibility continues to improve.
|
||||
|
||||
### API Server
|
||||
* Transport: HTTP mostly supported, but small features like CRC header and
|
||||
compression are still missing. HTTPS supported on top of HTTP, so small
|
||||
features may still be missing.
|
||||
* Transport: HTTP and HTTPS are mostly supported, but small features like CRC
|
||||
header and compression are still missing.
|
||||
* Authorization (verifying the originator of the request): implemented
|
||||
on top of system\_auth.roles table. The secret key used for authorization
|
||||
is the salted\_hash column from the roles table, selected with:
|
||||
@@ -65,20 +60,19 @@ progresses and compatibility continues to improve.
|
||||
By default, authorization is not enforced at all. It can be turned on
|
||||
by providing an entry in Scylla configuration:
|
||||
alternator\_enforce\_authorization: true
|
||||
* DNS server for load balancing: Not yet supported. Client needs to pick
|
||||
one of the live Scylla nodes and send a request to it.
|
||||
* Load balancing: Not a part of Alternator. One should use an external load
|
||||
balancer or DNS server to balance the requests between the live Scylla
|
||||
nodes. We plan to publish a reference example soon.
|
||||
### Table Operations
|
||||
* CreateTable: Supported. Note our implementation is synchronous.
|
||||
* CreateTable and DeleteTable: Supported. Note our implementation is synchronous.
|
||||
* DescribeTable: Partial implementation. Missing creation date and size estimate.
|
||||
* UpdateTable: Not supported.
|
||||
* DescribeTable: Partial implementation. Missing creation date and size esitmate.
|
||||
* DeleteTable: Supported. Note our implementation is synchronous.
|
||||
* ListTables: Supported.
|
||||
### Item Operations
|
||||
* GetItem: Support almost complete except that projection expressions can
|
||||
only ask for top-level attributes.
|
||||
* PutItem: Support almost complete except that condition expressions can
|
||||
only refer to top-level attributes.
|
||||
pre-put content) not yet supported.
|
||||
* UpdateItem: Nested documents are supported but updates to nested attributes
|
||||
are not (e.g., `SET a.b[3].c=val`), and neither are nested attributes in
|
||||
condition expressions.
|
||||
@@ -90,15 +84,14 @@ progresses and compatibility continues to improve.
|
||||
* BatchWriteItem: Supported. Doesn't limit the number of items (DynamoDB
|
||||
limits to 25) or size of items (400 KB) or total request size (16 MB).
|
||||
### Scans
|
||||
* Scan: As usual, projection expressions only support top-level attributes.
|
||||
Filter expressions (to filter some of the items) partially supported:
|
||||
The ScanFilter syntax is supported but FilterExpression is not yet, and
|
||||
only equality operator is supported so far.
|
||||
The "Select" options which allows to count items instead of returning them
|
||||
is not yet supported. Parallel scan is not yet supported.
|
||||
* Query: Same issues as Scan above. Additionally, missing support for
|
||||
KeyConditionExpression (an alternative syntax replacing the older
|
||||
KeyConditions parameter which we do support).
|
||||
Scan and Query are mostly supported, with the following limitations:
|
||||
* As above, projection expressions only support top-level attributes.
|
||||
* Filter expressions (to filter some of the items) are only partially
|
||||
supported: The ScanFilter syntax currently only supports the equality
|
||||
operator, and the FilterExpression syntax is not yet supported at all.
|
||||
* The "Select" option, which allows counting items instead of returning them,
|
||||
is not yet supported.
|
||||
* Parallel scan is not yet supported.
|
||||
### Secondary Indexes
|
||||
Global Secondary Indexes (GSI) and Local Secondary Indexes (LSI) are
|
||||
implemented, with the following limitations:
|
||||
@@ -116,24 +109,28 @@ implemented, with the following limitations:
|
||||
Writes are done in LOCAL_QUORUM and reads in LOCAL_ONE (eventual consistency)
|
||||
or LOCAL_QUORUM (strong consistency).
|
||||
### Global Tables
|
||||
* Not yet supported: CreateGlobalTable, UpdateGlobalTable,
|
||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
||||
DescribeGlobalTableSettings. Implementation will use Scylla's multi-DC
|
||||
features.
|
||||
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||
be accessed from all of Scylla's DCs.
|
||||
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||
global and others local to a particular DC: CreateGlobalTable,
|
||||
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||
### Backup and Restore
|
||||
* On-demand backup: Not yet supported: CreateBackup, DescribeBackup,
|
||||
DeleteBackup, ListBackups, RestoreTableFromBackup. Implementation will
|
||||
use Scylla's snapshots
|
||||
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
Users can use Scylla's [snapshots](https://docs.scylladb.com/operating-scylla/procedures/backup-restore/)
|
||||
or [Scylla Manager](https://docs.scylladb.com/operating-scylla/manager/2.0/backup/).
|
||||
* Continuous backup: Not yet supported: UpdateContinuousBackups,
|
||||
DescribeContinuousBackups, RestoreTableToPointInTime.
|
||||
### Transations
|
||||
### Transactions
|
||||
* Not yet supported: TransactWriteItems, TransactGetItems.
|
||||
Note that this is a new DynamoDB feature - these are more powerful than
|
||||
the old conditional updates which were "lightweight transactions".
|
||||
### Streams (CDC)
|
||||
* Not yet supported
|
||||
### Streams
|
||||
* Scylla has experimental support for [CDC](https://docs.scylladb.com/using-scylla/cdc/)
|
||||
(change data capture), but the "DynamoDB Streams" API is not yet supported.
|
||||
### Encryption at rest
|
||||
* Supported natively by Scylla, but needs to be enabled by default.
|
||||
* Supported by Scylla Enterprise (not in open-source). Needs to be enabled.
|
||||
### ARNs and tags
|
||||
* ARN is generated for every alternator table
|
||||
* Tagging can be used with the help of the following requests:
|
||||
@@ -166,7 +163,9 @@ implemented, with the following limitations:
|
||||
* Not required. Scylla cache is rather advanced and there is no need to place
|
||||
a cache in front of the database: https://www.scylladb.com/2017/07/31/database-caches-not-good/
|
||||
### Metrics
|
||||
* Several metrics are available through the Grafana/Promethues stack: https://docs.scylladb.com/operating-scylla/monitoring/ It is different than the expectations of the current DynamoDB implementation. However, our
|
||||
* Several metrics are available through the Grafana/Prometheus stack:
|
||||
https://docs.scylladb.com/operating-scylla/monitoring/
|
||||
Those are different from the current DynamoDB metrics, but Scylla's
|
||||
monitoring is rather advanced and provides more insight into the internals.
|
||||
|
||||
## Alternator design and implementation
|
||||
@@ -229,8 +228,3 @@ one DynamoDB feature which we cannot support safely: we cannot modify
|
||||
a non-top-level attribute (e.g., a.b[3].c) directly without RMW. We plan
|
||||
to fix this in a future version by rethinking the data model we use for
|
||||
attributes, or rethinking our implementation of RMW (as explained above).
|
||||
|
||||
For reasons explained above, the data model used by Alternator to store
|
||||
data on disk is still in a state of flux, and may change in future versions.
|
||||
Therefore, in this early stage it is not recommended to store important
|
||||
production data using Alternator.
|
||||
|
||||
@@ -10,12 +10,10 @@ This section will guide you through the steps for setting up the cluster:
|
||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||
and `--alternator-port=8000 --experimental 1` at the end. The
|
||||
"alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and "--experimental 1" is required to
|
||||
enable the experimental LWT feature which Alternator uses.
|
||||
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||
For example,
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --experimental 1`
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000`
|
||||
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
|
||||
@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
|
||||
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
||||
Scylla with issue #4363 fixed)
|
||||
|
||||
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
|
||||
by Scylla with issue #6130 fixed)
|
||||
|
||||
## extension_attributes subcomponent
|
||||
|
||||
extension_attributes = extension_attribute_count extension_attribute*
|
||||
|
||||
@@ -110,10 +110,6 @@ feature_config feature_config_from_db_config(db::config& cfg) {
|
||||
fcfg.enable_cdc = true;
|
||||
}
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::LWT)) {
|
||||
fcfg.enable_lwt = true;
|
||||
}
|
||||
|
||||
return fcfg;
|
||||
}
|
||||
|
||||
@@ -178,9 +174,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
if (_config.enable_cdc) {
|
||||
features.insert(gms::features::CDC);
|
||||
}
|
||||
if (_config.enable_lwt) {
|
||||
features.insert(gms::features::LWT);
|
||||
}
|
||||
features.insert(gms::features::LWT);
|
||||
|
||||
for (const sstring& s : _config.disabled_features) {
|
||||
features.erase(s);
|
||||
|
||||
@@ -41,7 +41,6 @@ struct feature_config {
|
||||
bool enable_sstables_mc_format = false;
|
||||
bool enable_user_defined_functions = false;
|
||||
bool enable_cdc = false;
|
||||
bool enable_lwt = false;
|
||||
std::set<sstring> disabled_features;
|
||||
feature_config();
|
||||
};
|
||||
|
||||
@@ -1725,8 +1725,12 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forward them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
|
||||
@@ -76,6 +76,8 @@ fedora_packages=(
|
||||
python3-psutil
|
||||
python3-cassandra-driver
|
||||
python3-colorama
|
||||
python3-boto3
|
||||
python3-pytest
|
||||
dnf-utils
|
||||
pigz
|
||||
net-tools
|
||||
|
||||
17
main.cc
17
main.cc
@@ -662,9 +662,17 @@ int main(int ac, char** av) {
|
||||
|
||||
supervisor::notify("starting tokens manager");
|
||||
token_metadata.start().get();
|
||||
auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
||||
token_metadata.stop().get();
|
||||
});
|
||||
// storage_proxy holds a reference on it and is not yet stopped.
|
||||
// what's worse is that the calltrace
|
||||
// storage_proxy::do_query
|
||||
// ::query_partition_key_range
|
||||
// ::query_partition_key_range_concurrent
|
||||
// leaves unwaited futures on the reactor and once it gets there
|
||||
// the token_metadata instance is accessed and ...
|
||||
//
|
||||
//auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
||||
// token_metadata.stop().get();
|
||||
//});
|
||||
|
||||
supervisor::notify("starting migration manager notifier");
|
||||
mm_notifier.start().get();
|
||||
@@ -1071,9 +1079,6 @@ int main(int ac, char** av) {
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
if (!cfg->check_experimental(db::experimental_features_t::LWT)) {
|
||||
throw std::runtime_error("Alternator enabled, but needs experimental LWT feature which wasn't enabled");
|
||||
}
|
||||
net::inet_address addr;
|
||||
try {
|
||||
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
|
||||
|
||||
@@ -452,6 +452,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::PAXOS_PREPARE:
|
||||
case messaging_verb::PAXOS_ACCEPT:
|
||||
case messaging_verb::PAXOS_LEARN:
|
||||
case messaging_verb::PAXOS_PRUNE:
|
||||
return 0;
|
||||
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
|
||||
// sent on a different connection to avoid potential deadlocks
|
||||
@@ -1179,14 +1180,14 @@ future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repai
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func) {
|
||||
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_row_level_start() {
|
||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
|
||||
}
|
||||
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version));
|
||||
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
@@ -1281,6 +1282,19 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
|
||||
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_paxos_prune(std::function<future<rpc::no_wait_type>(
|
||||
const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info>)>&& func) {
|
||||
register_handler(this, messaging_verb::PAXOS_PRUNE, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_paxos_prune() {
|
||||
return unregister_handler(netw::messaging_verb::PAXOS_PRUNE);
|
||||
}
|
||||
future<>
|
||||
messaging_service::send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id,
|
||||
const partition_key& key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_PRUNE, netw::msg_addr(peer), schema_id, key, ballot, std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));
|
||||
|
||||
@@ -139,7 +139,8 @@ enum class messaging_verb : int32_t {
|
||||
PAXOS_ACCEPT = 40,
|
||||
PAXOS_LEARN = 41,
|
||||
HINT_MUTATION = 42,
|
||||
LAST = 43,
|
||||
PAXOS_PRUNE = 43,
|
||||
LAST = 44,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -341,9 +342,9 @@ public:
|
||||
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func);
|
||||
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func);
|
||||
future<> unregister_repair_row_level_start();
|
||||
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version);
|
||||
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
|
||||
@@ -493,6 +494,14 @@ public:
|
||||
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info = std::nullopt);
|
||||
|
||||
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
|
||||
utils::UUID ballot, std::optional<tracing::trace_info>)>&& func);
|
||||
|
||||
future<> unregister_paxos_prune();
|
||||
|
||||
future<> send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id, const partition_key& key,
|
||||
utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
|
||||
|
||||
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
|
||||
future<> unregister_hint_mutation();
|
||||
|
||||
@@ -672,7 +672,8 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
const std::vector<sstring>& cfs_,
|
||||
int id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_)
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_)
|
||||
: db(db_)
|
||||
, partitioner(get_partitioner_for_tables(db_, keyspace_, cfs_))
|
||||
, keyspace(keyspace_)
|
||||
@@ -682,6 +683,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
, shard(engine().cpu_id())
|
||||
, data_centers(data_centers_)
|
||||
, hosts(hosts_)
|
||||
, reason(reason_)
|
||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
|
||||
}
|
||||
|
||||
@@ -1462,7 +1464,7 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
|
||||
data_centers = options.data_centers, hosts = options.hosts] (database& localdb) mutable {
|
||||
auto ri = make_lw_shared<repair_info>(db,
|
||||
std::move(keyspace), std::move(ranges), std::move(cfs),
|
||||
id, std::move(data_centers), std::move(hosts));
|
||||
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1524,14 +1526,15 @@ future<> repair_abort_all(seastar::sharded<database>& db) {
|
||||
future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
sstring keyspace,
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors) {
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason) {
|
||||
if (ranges.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
||||
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
int id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
||||
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
||||
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
|
||||
@@ -1540,12 +1543,12 @@ future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable {
|
||||
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors, reason] (database& localdb) mutable {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ri = make_lw_shared<repair_info>(service::get_local_storage_service().db(),
|
||||
std::move(keyspace), std::move(ranges), std::move(cfs),
|
||||
id, std::move(data_centers), std::move(hosts));
|
||||
id, std::move(data_centers), std::move(hosts), reason);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
@@ -1584,6 +1587,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces);
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
if (!db.local().has_keyspace(keyspace_name)) {
|
||||
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
|
||||
@@ -1716,7 +1720,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
||||
@@ -1730,6 +1734,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
bool is_removenode = myip != leaving_node;
|
||||
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
|
||||
streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission;
|
||||
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
if (!db.local().has_keyspace(keyspace_name)) {
|
||||
@@ -1867,7 +1872,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
}
|
||||
@@ -1883,8 +1888,8 @@ future<> removenode_with_repair(seastar::sharded<database>& db, locator::token_m
|
||||
return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
|
||||
}
|
||||
|
||||
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc) {
|
||||
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable {
|
||||
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc, streaming::stream_reason reason) {
|
||||
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op), reason] () mutable {
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
@@ -1921,7 +1926,7 @@ future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator:
|
||||
}
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
@@ -1933,11 +1938,13 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
|
||||
if (source_dc.empty()) {
|
||||
source_dc = get_local_dc();
|
||||
}
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
||||
auto reason = streaming::stream_reason::rebuild;
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||
}
|
||||
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
|
||||
auto op = sstring("replace_with_repair");
|
||||
auto source_dc = get_local_dc();
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||
}
|
||||
|
||||
@@ -181,6 +181,7 @@ public:
|
||||
shard_id shard;
|
||||
std::vector<sstring> data_centers;
|
||||
std::vector<sstring> hosts;
|
||||
streaming::stream_reason reason;
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
|
||||
size_t nr_failed_ranges = 0;
|
||||
bool aborted = false;
|
||||
@@ -211,7 +212,8 @@ public:
|
||||
const std::vector<sstring>& cfs_,
|
||||
int id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_);
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_);
|
||||
future<> do_streaming();
|
||||
void check_failed_ranges();
|
||||
future<> request_transfer_ranges(const sstring& cf,
|
||||
|
||||
@@ -451,14 +451,17 @@ class repair_writer {
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
uint64_t estimated_partitions,
|
||||
size_t nr_peer_nodes)
|
||||
size_t nr_peer_nodes,
|
||||
streaming::stream_reason reason)
|
||||
: _schema(std::move(schema))
|
||||
, _estimated_partitions(estimated_partitions)
|
||||
, _nr_peer_nodes(nr_peer_nodes) {
|
||||
, _nr_peer_nodes(nr_peer_nodes)
|
||||
, _reason(reason) {
|
||||
init_writer();
|
||||
}
|
||||
|
||||
@@ -495,9 +498,9 @@ public:
|
||||
table& t = db.local().find_column_family(_schema->id());
|
||||
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema,
|
||||
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
|
||||
[&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||
auto& t = db.local().find_column_family(reader.schema());
|
||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
auto metadata = mutation_source_metadata{};
|
||||
auto& cs = t->get_compaction_strategy();
|
||||
@@ -590,6 +593,7 @@ private:
|
||||
repair_master _repair_master;
|
||||
gms::inet_address _myip;
|
||||
uint32_t _repair_meta_id;
|
||||
streaming::stream_reason _reason;
|
||||
// Repair master's sharding configuration
|
||||
shard_config _master_node_shard_config;
|
||||
// Partitioner of repair master
|
||||
@@ -653,6 +657,7 @@ public:
|
||||
uint64_t seed,
|
||||
repair_master master,
|
||||
uint32_t repair_meta_id,
|
||||
streaming::stream_reason reason,
|
||||
shard_config master_node_shard_config,
|
||||
size_t nr_peer_nodes = 1)
|
||||
: _db(db)
|
||||
@@ -666,6 +671,7 @@ public:
|
||||
, _repair_master(master)
|
||||
, _myip(utils::fb_utilities::get_broadcast_address())
|
||||
, _repair_meta_id(repair_meta_id)
|
||||
, _reason(reason)
|
||||
, _master_node_shard_config(std::move(master_node_shard_config))
|
||||
, _remote_partitioner(make_remote_partitioner())
|
||||
, _same_sharding_config(is_same_sharding_config())
|
||||
@@ -681,7 +687,7 @@ public:
|
||||
_seed,
|
||||
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
||||
)
|
||||
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes)
|
||||
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes, _reason)
|
||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||
[] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
||||
return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
||||
@@ -731,7 +737,8 @@ public:
|
||||
uint64_t max_row_buf_size,
|
||||
uint64_t seed,
|
||||
shard_config master_node_shard_config,
|
||||
table_schema_version schema_version) {
|
||||
table_schema_version schema_version,
|
||||
streaming::stream_reason reason) {
|
||||
return service::get_schema_for_write(schema_version, {from, src_cpu_id}).then([from,
|
||||
repair_meta_id,
|
||||
range,
|
||||
@@ -739,7 +746,8 @@ public:
|
||||
max_row_buf_size,
|
||||
seed,
|
||||
master_node_shard_config,
|
||||
schema_version] (schema_ptr s) {
|
||||
schema_version,
|
||||
reason] (schema_ptr s) {
|
||||
auto& db = service::get_local_storage_proxy().get_db();
|
||||
auto& cf = db.local().find_column_family(s->id());
|
||||
node_repair_meta_id id{from, repair_meta_id};
|
||||
@@ -752,6 +760,7 @@ public:
|
||||
seed,
|
||||
repair_meta::repair_master::no,
|
||||
repair_meta_id,
|
||||
reason,
|
||||
std::move(master_node_shard_config));
|
||||
bool insertion = repair_meta_map().emplace(id, rm).second;
|
||||
if (!insertion) {
|
||||
@@ -1412,28 +1421,28 @@ public:
|
||||
|
||||
// RPC API
|
||||
future<>
|
||||
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version) {
|
||||
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
return netw::get_local_messaging_service().send_repair_row_level_start(msg_addr(remote_node),
|
||||
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), _algo, _max_row_buf_size, _seed,
|
||||
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version));
|
||||
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version), reason);
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
static future<>
|
||||
repair_row_level_start_handler(gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
|
||||
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
|
||||
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version) {
|
||||
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||
if (!_sys_dist_ks->local_is_initialized() || !_view_update_generator->local_is_initialized()) {
|
||||
return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
|
||||
utils::fb_utilities::get_broadcast_address())));
|
||||
}
|
||||
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
|
||||
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
|
||||
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version));
|
||||
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason);
|
||||
}
|
||||
|
||||
// RPC API
|
||||
@@ -2104,15 +2113,16 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
});
|
||||
ms.register_repair_row_level_start([] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
|
||||
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
|
||||
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
|
||||
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version] () mutable {
|
||||
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version, reason] () mutable {
|
||||
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
|
||||
return repair_meta::repair_row_level_start_handler(from, src_cpu_id, repair_meta_id, std::move(ks_name),
|
||||
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
|
||||
shard_config{remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name)},
|
||||
schema_version);
|
||||
schema_version, r);
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_stop([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
@@ -2442,6 +2452,7 @@ public:
|
||||
_seed,
|
||||
repair_meta::repair_master::yes,
|
||||
repair_meta_id,
|
||||
_ri.reason,
|
||||
std::move(master_node_shard_config),
|
||||
_all_live_peer_nodes.size());
|
||||
|
||||
@@ -2456,7 +2467,7 @@ public:
|
||||
nodes_to_stop.reserve(_all_nodes.size());
|
||||
try {
|
||||
parallel_for_each(_all_nodes, [&, this] (const gms::inet_address& node) {
|
||||
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version).then([&] () {
|
||||
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version, _ri.reason).then([&] () {
|
||||
nodes_to_stop.push_back(node);
|
||||
return master.repair_get_estimated_partitions(node).then([this, node] (uint64_t partitions) {
|
||||
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
|
||||
|
||||
@@ -319,10 +319,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 92c488706c...76260705ef
@@ -190,4 +190,11 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
|
||||
});
|
||||
}
|
||||
|
||||
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state) {
|
||||
logger.debug("Delete paxos state for ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
|
||||
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
|
||||
}
|
||||
|
||||
} // end of namespace "service::paxos"
|
||||
|
||||
@@ -124,6 +124,9 @@ public:
|
||||
clock_type::time_point timeout);
|
||||
// Replica RPC endpoint for Paxos "learn".
|
||||
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
|
||||
// Replica RPC endpoint for pruning Paxos table
|
||||
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state);
|
||||
};
|
||||
|
||||
} // end of namespace "service::paxos"
|
||||
|
||||
@@ -171,6 +171,7 @@ public:
|
||||
const schema_ptr& schema() {
|
||||
return _schema;
|
||||
}
|
||||
// called only when all replicas replied
|
||||
virtual void release_mutation() = 0;
|
||||
};
|
||||
|
||||
@@ -300,9 +301,10 @@ public:
|
||||
|
||||
class cas_mutation : public mutation_holder {
|
||||
lw_shared_ptr<paxos::proposal> _proposal;
|
||||
shared_ptr<paxos_response_handler> _handler;
|
||||
public:
|
||||
explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
|
||||
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
|
||||
explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
|
||||
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
|
||||
_size = _proposal->update.representation().size();
|
||||
_schema = std::move(s);
|
||||
}
|
||||
@@ -327,7 +329,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
virtual void release_mutation() override {
|
||||
_proposal.release();
|
||||
// The handler will be set for "learn", but not for PAXOS repair
|
||||
// since repair may not include all replicas
|
||||
if (_handler) {
|
||||
_handler->prune(_proposal->ballot);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1184,6 +1190,12 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
|
||||
return f;
|
||||
}
|
||||
|
||||
// debug output in mutate_internal needs this
|
||||
std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
|
||||
os << "paxos_response_handler{" << h.id() << "}";
|
||||
return os;
|
||||
}
|
||||
|
||||
// This function implements learning stage of Paxos protocol
|
||||
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
|
||||
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
|
||||
@@ -1219,12 +1231,41 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
|
||||
}
|
||||
|
||||
// Path for the "base" mutations
|
||||
std::array<std::tuple<paxos::proposal, schema_ptr, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, _key.token())};
|
||||
std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
|
||||
|
||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
|
||||
}
|
||||
|
||||
void paxos_response_handler::prune(utils::UUID ballot) {
|
||||
if (_has_dead_endpoints) {
|
||||
return;
|
||||
}
|
||||
if ( _proxy->get_stats().cas_now_pruning >= pruning_limit) {
|
||||
_proxy->get_stats().cas_coordinator_dropped_prune++;
|
||||
return;
|
||||
}
|
||||
_proxy->get_stats().cas_now_pruning++;
|
||||
_proxy->get_stats().cas_prune++;
|
||||
// runs in the background, but the number of concurrent background prune jobs is limited by pruning_limit
|
||||
// it is waited for by holding a shared pointer to storage_proxy, which guarantees
|
||||
// that storage_proxy::stop() will wait for this to complete
|
||||
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
|
||||
return futurize_apply([&] {
|
||||
if (fbu::is_me(peer)) {
|
||||
tracing::trace(tr_state, "prune: prune {} locally", ballot);
|
||||
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
|
||||
} else {
|
||||
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
|
||||
netw::messaging_service& ms = netw::get_local_messaging_service();
|
||||
return ms.send_paxos_prune(peer, _timeout, _schema->version(), _key.key(), ballot, tracing::make_trace_info(tr_state));
|
||||
}
|
||||
});
|
||||
}).finally([h = shared_from_this()] {
|
||||
h->_proxy->get_stats().cas_now_pruning--;
|
||||
});
|
||||
}
|
||||
|
||||
static std::vector<gms::inet_address>
|
||||
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
@@ -1571,6 +1612,14 @@ void storage_proxy_stats::stats::register_stats() {
|
||||
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()},
|
||||
[this]{ return cas_write_contention.get_histogram(1, 8);}),
|
||||
|
||||
sm::make_total_operations("cas_prune", cas_prune,
|
||||
sm::description("how many times paxos prune was done after successful cas operation"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
|
||||
sm::make_total_operations("cas_dropped_prune", cas_coordinator_dropped_prune,
|
||||
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
});
|
||||
|
||||
_metrics.add_group(REPLICA_STATS_CATEGORY, {
|
||||
@@ -1606,6 +1655,9 @@ void storage_proxy_stats::stats::register_stats() {
|
||||
sm::description("number of operations that crossed a shard boundary"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
|
||||
sm::make_total_operations("cas_dropped_prune", cas_replica_dropped_prune,
|
||||
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1879,11 +1931,11 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
|
||||
}
|
||||
|
||||
storage_proxy::response_id_type
|
||||
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& meta,
|
||||
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
|
||||
auto& [commit, s, t] = meta;
|
||||
auto& [commit, s, h, t] = meta;
|
||||
|
||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s), cl,
|
||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
|
||||
db::write_type::CAS, tr_state, std::move(permit));
|
||||
}
|
||||
|
||||
@@ -1898,7 +1950,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
|
||||
auto keyspace_name = s->ks_name();
|
||||
keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s), std::move(endpoints),
|
||||
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
||||
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
|
||||
}
|
||||
|
||||
@@ -2146,6 +2198,8 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
cl_for_paxos, participants + 1, live_endpoints.size());
|
||||
}
|
||||
|
||||
bool dead = participants != live_endpoints.size();
|
||||
|
||||
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
|
||||
// If the values received from different replicas match, we skip a separate query stage thus saving
|
||||
// one network round trip. To generate less traffic, only closest replicas send data, others send
|
||||
@@ -2153,7 +2207,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
// list of participants by proximity to this instance.
|
||||
sort_endpoints_by_proximity(live_endpoints);
|
||||
|
||||
return paxos_participants{std::move(live_endpoints), required_participants};
|
||||
return paxos_participants{std::move(live_endpoints), required_participants, dead};
|
||||
}
|
||||
|
||||
|
||||
@@ -4942,6 +4996,42 @@ void storage_proxy::init_messaging_service() {
|
||||
|
||||
return f;
|
||||
});
|
||||
ms.register_paxos_prune([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||
static thread_local uint16_t pruning = 0;
|
||||
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
auto src_ip = src_addr.addr;
|
||||
tracing::trace_state_ptr tr_state;
|
||||
if (trace_info) {
|
||||
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
|
||||
tracing::begin(tr_state);
|
||||
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_ip, ballot);
|
||||
}
|
||||
|
||||
if (pruning >= pruning_limit) {
|
||||
get_stats().cas_replica_dropped_prune++;
|
||||
tracing::trace(tr_state, "paxos_prune: do not prune due to overload", src_ip);
|
||||
return make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
|
||||
}
|
||||
|
||||
pruning++;
|
||||
return get_schema_for_read(schema_id, src_addr).then([this, key = std::move(key), ballot,
|
||||
timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
unsigned shard = dht::shard_of(*schema, token);
|
||||
bool local = shard == engine().cpu_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
return smp::submit_to(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
|
||||
local, key = std::move(key), ballot, timeout, src_ip, d = defer([] { pruning--; })] () {
|
||||
tracing::trace_state_ptr tr_state = gt;
|
||||
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
|
||||
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
|
||||
return netw::messaging_service::no_wait();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_proxy::uninit_messaging_service() {
|
||||
@@ -4956,7 +5046,8 @@ future<> storage_proxy::uninit_messaging_service() {
|
||||
ms.unregister_truncate(),
|
||||
ms.unregister_paxos_prepare(),
|
||||
ms.unregister_paxos_accept(),
|
||||
ms.unregister_paxos_learn()
|
||||
ms.unregister_paxos_learn(),
|
||||
ms.unregister_paxos_prune()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -242,6 +242,7 @@ public:
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
|
||||
size_t required_participants;
|
||||
bool has_dead_endpoints;
|
||||
};
|
||||
|
||||
const gms::feature_service& features() const { return _features; }
|
||||
@@ -317,7 +318,7 @@ private:
|
||||
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& proposal,
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
@@ -634,6 +635,11 @@ private:
|
||||
db::consistency_level _cl_for_learn;
|
||||
// Live endpoints, as per get_paxos_participants()
|
||||
std::vector<gms::inet_address> _live_endpoints;
|
||||
// True if there are dead endpoints
|
||||
// We don't include endpoints known to be unavailable in pending
|
||||
// endpoints list, but need to be aware of them to avoid pruning
|
||||
// system.paxos data if some endpoint is missing a Paxos write.
|
||||
bool _has_dead_endpoints;
|
||||
// How many endpoints need to respond favourably for the protocol to progress to the next step.
|
||||
size_t _required_participants;
|
||||
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
|
||||
@@ -651,6 +657,9 @@ private:
|
||||
// Unique request id for logging purposes.
|
||||
const uint64_t _id = next_id++;
|
||||
|
||||
// max pruning operations to run in parallel
|
||||
static constexpr uint16_t pruning_limit = 1000;
|
||||
|
||||
public:
|
||||
tracing::trace_state_ptr tr_state;
|
||||
|
||||
@@ -674,6 +683,7 @@ public:
|
||||
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
|
||||
_live_endpoints = std::move(pp.endpoints);
|
||||
_required_participants = pp.required_participants;
|
||||
_has_dead_endpoints = pp.has_dead_endpoints;
|
||||
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
|
||||
_key.token(), _live_endpoints, _required_participants);
|
||||
}
|
||||
@@ -691,6 +701,7 @@ public:
|
||||
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
|
||||
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
|
||||
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
|
||||
void prune(utils::UUID ballot);
|
||||
uint64_t id() const {
|
||||
return _id;
|
||||
}
|
||||
|
||||
@@ -116,6 +116,11 @@ struct write_stats {
|
||||
uint64_t cas_write_condition_not_met = 0;
|
||||
uint64_t cas_write_timeout_due_to_uncertainty = 0;
|
||||
uint64_t cas_failed_read_round_optimization = 0;
|
||||
uint16_t cas_now_pruning = 0;
|
||||
uint64_t cas_prune = 0;
|
||||
uint64_t cas_coordinator_dropped_prune = 0;
|
||||
uint64_t cas_replica_dropped_prune = 0;
|
||||
|
||||
|
||||
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
|
||||
public:
|
||||
|
||||
@@ -3409,10 +3409,13 @@ void feature_enabled_listener::on_enabled() {
|
||||
|
||||
future<> read_sstables_format(distributed<storage_service>& ss) {
|
||||
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
|
||||
sstables::sstable_version_types format = sstables::from_string(format_opt.value_or("ka"));
|
||||
return ss.invoke_on_all([format] (storage_service& s) {
|
||||
s._sstables_format = format;
|
||||
});
|
||||
if (format_opt) {
|
||||
sstables::sstable_version_types format = sstables::from_string(*format_opt);
|
||||
return ss.invoke_on_all([format] (storage_service& s) {
|
||||
s._sstables_format = format;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -312,7 +312,13 @@ private:
|
||||
*/
|
||||
std::optional<db_clock::time_point> _cdc_streams_ts;
|
||||
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
|
||||
// _sstables_format is the format used for writing new sstables.
|
||||
// Here we set its default value, but if we discover that all the nodes
|
||||
// in the cluster support a newer format, _sstables_format will be set to
|
||||
// that format. read_sstables_format() also overwrites _sstables_format
|
||||
// if an sstable format was chosen earlier (and this choice was persisted
|
||||
// in the system table).
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::la;
|
||||
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
|
||||
feature_enabled_listener _la_feature_listener;
|
||||
feature_enabled_listener _mc_feature_listener;
|
||||
|
||||
@@ -72,47 +72,8 @@ private:
|
||||
static std::vector<column_info> build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static);
|
||||
|
||||
utils::UUID schema_uuid;
|
||||
std::vector<column_info> regular_schema_columns_from_sstable;
|
||||
@@ -125,10 +86,10 @@ private:
|
||||
state(state&&) = default;
|
||||
state& operator=(state&&) = default;
|
||||
|
||||
state(const schema& s, const serialization_header& header)
|
||||
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
|
||||
: schema_uuid(s.version())
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
|
||||
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
||||
{}
|
||||
};
|
||||
@@ -136,9 +97,10 @@ private:
|
||||
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
||||
|
||||
public:
|
||||
column_translation get_for_schema(const schema& s, const serialization_header& header) {
|
||||
column_translation get_for_schema(
|
||||
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
|
||||
if (s.version() != _state->schema_uuid) {
|
||||
_state = make_lw_shared(state(s, header));
|
||||
_state = make_lw_shared(state(s, header, features));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@
|
||||
*/
|
||||
|
||||
#include "mp_row_consumer.hh"
|
||||
#include "column_translation.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
|
||||
return ccb.build(timestamp);
|
||||
}
|
||||
|
||||
// See #6130.
|
||||
// Rebuilds collection type `t` (map, set or list) so that each of its
// element types has had freeze() called on it, applying the same treatment
// recursively to the element types. The collection's own multi-cell flag is
// carried over as-is. Any type that is not a map, set or list is returned
// unchanged.
static data_type freeze_types_in_collections(data_type t) {
    // Calls freeze() on one element type and then descends into the result.
    const auto frozen = [] (const data_type& nested) -> data_type {
        return freeze_types_in_collections(nested->freeze());
    };
    return ::visit(*t, make_visitor(
        [&frozen] (const map_type_impl& m) -> data_type {
            data_type keys = frozen(m.get_keys_type());
            data_type values = frozen(m.get_values_type());
            return map_type_impl::get_instance(std::move(keys), std::move(values), m.is_multi_cell());
        },
        [&frozen] (const set_type_impl& st) -> data_type {
            return set_type_impl::get_instance(frozen(st.get_elements_type()), st.is_multi_cell());
        },
        [&frozen] (const list_type_impl& l) -> data_type {
            return list_type_impl::get_instance(frozen(l.get_elements_type()), l.is_multi_cell());
        },
        // Fallback for every other type: hand the input back untouched.
        [&t] (const abstract_type&) -> data_type {
            return std::move(t);
        }
    ));
}
|
||||
|
||||
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
|
||||
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
|
||||
* but that should never happen. */
|
||||
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
|
||||
return features.enabled_features;
|
||||
}
|
||||
|
||||
std::vector<column_translation::column_info> column_translation::state::build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
|
||||
// See #6130.
|
||||
type = freeze_types_in_collections(std::move(type));
|
||||
}
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1348,7 +1348,7 @@ public:
|
||||
, _consumer(consumer)
|
||||
, _sst(sst)
|
||||
, _header(sst->get_serialization_header())
|
||||
, _column_translation(sst->get_column_translation(s, _header))
|
||||
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
|
||||
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
||||
{
|
||||
setup_columns(_regular_row, _column_translation.regular_columns());
|
||||
|
||||
@@ -792,8 +792,9 @@ public:
|
||||
const serialization_header& get_serialization_header() const {
|
||||
return get_mutable_serialization_header(*_components);
|
||||
}
|
||||
column_translation get_column_translation(const schema& s, const serialization_header& h) {
|
||||
return _column_translation.get_for_schema(s, h);
|
||||
column_translation get_column_translation(
|
||||
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
|
||||
return _column_translation.get_for_schema(s, h, f);
|
||||
}
|
||||
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
||||
return _shards;
|
||||
|
||||
@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
|
||||
ShadowableTombstones = 2, // See #3885
|
||||
CorrectStaticCompact = 3, // See #4139
|
||||
CorrectEmptyCounters = 4, // See #4363
|
||||
End = 5,
|
||||
CorrectUDTsInCollections = 5, // See #6130
|
||||
End = 6,
|
||||
};
|
||||
|
||||
// Scylla-specific features enabled for a particular sstable.
|
||||
|
||||
36
test.py
36
test.py
@@ -203,6 +203,17 @@ class CqlTestSuite(TestSuite):
|
||||
def pattern(self):
|
||||
return "*_test.cql"
|
||||
|
||||
class RunTestSuite(TestSuite):
    """Suite for a test directory that is driven by a 'run' script."""

    @property
    def pattern(self):
        # File name pattern of this suite's tests: the run script itself.
        return "run"

    def add_test(self, shortname, mode, options):
        """Create a RunTest for `shortname` and append it to this suite's tests."""
        new_test = RunTest(self.next_id, shortname, self, mode, options)
        self.tests.append(new_test)
|
||||
|
||||
|
||||
class Test:
|
||||
"""Base class for CQL, Unit and Boost tests"""
|
||||
@@ -332,6 +343,24 @@ class CqlTest(Test):
|
||||
if self.is_equal_result is False:
|
||||
print_unidiff(self.result, self.reject)
|
||||
|
||||
class RunTest(Test):
    """A test that is executed through the run script of its directory."""

    def __init__(self, test_no, shortname, suite, mode, options):
        super().__init__(test_no, shortname, suite, mode, options)
        # The program to execute is <suite path>/<shortname>.
        self.path = os.path.join(suite.path, shortname)
        # The xunit report goes to <tmpdir>/<mode>/xml/<uname>.xunit.xml.
        xml_dir = os.path.join(options.tmpdir, self.mode, "xml")
        self.xmlout = os.path.join(xml_dir, self.uname + ".xunit.xml")
        self.args = [f"--junit-xml={self.xmlout}"]

    def print_summary(self):
        """Print the command line of the test followed by its log."""
        command_args = " ".join(self.args)
        print("Output of {} {}:".format(self.path, command_args))
        print(read_log(self.log_filename))

    async def run(self, options):
        """Run the test, record the outcome in self.success and return self."""
        # This test can and should be killed gently, with SIGTERM, not with SIGKILL
        self.success = await run_test(self, options, gentle_kill=True)
        outcome = "succeeded" if self.success else "failed "
        logging.info("Test #%d %s", self.id, outcome)
        return self
|
||||
|
||||
class TabularConsoleOutput:
|
||||
"""Print test progress to the console"""
|
||||
@@ -375,7 +404,7 @@ class TabularConsoleOutput:
|
||||
print(msg)
|
||||
|
||||
|
||||
async def run_test(test, options):
|
||||
async def run_test(test, options, gentle_kill=False):
|
||||
"""Run test program, return True if success else False"""
|
||||
|
||||
with open(test.log_filename, "wb") as log:
|
||||
@@ -423,7 +452,10 @@ async def run_test(test, options):
|
||||
return True
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
||||
if process is not None:
|
||||
process.kill()
|
||||
if gentle_kill:
|
||||
process.terminate()
|
||||
else:
|
||||
process.kill()
|
||||
stdout, _ = await process.communicate()
|
||||
if isinstance(e, asyncio.TimeoutError):
|
||||
report_error("Test timed out")
|
||||
|
||||
@@ -54,6 +54,8 @@ def pytest_addoption(parser):
|
||||
parser.addoption("--https", action="store_true",
|
||||
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
|
||||
" running against a local Scylla installation")
|
||||
parser.addoption("--url", action="store",
|
||||
help="communicate with given URL instead of defaults")
|
||||
|
||||
# "dynamodb" fixture: set up client object for communicating with the DynamoDB
|
||||
# API. Currently this chooses either Amazon's DynamoDB in the default region
|
||||
@@ -70,7 +72,10 @@ def dynamodb(request):
|
||||
# requires us to specify dummy region and credential parameters,
|
||||
# otherwise the user is forced to properly configure ~/.aws even
|
||||
# for local runs.
|
||||
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
||||
if request.config.getoption('url') != None:
|
||||
local_url = request.config.getoption('url')
|
||||
else:
|
||||
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
||||
# Disable verifying in order to be able to use self-signed TLS certificates
|
||||
verify = not request.config.getoption('https')
|
||||
return boto3.resource('dynamodb', endpoint_url=local_url, verify=verify,
|
||||
@@ -4,24 +4,31 @@
|
||||
set -e
|
||||
|
||||
script_path=$(dirname $(readlink -e $0))
|
||||
source_path=$script_path/../..
|
||||
|
||||
# By default, we take the latest build/*/scylla as the executable:
|
||||
SCYLLA=${SCYLLA-$(ls -t "$script_path/../build/"*"/scylla" | head -1)}
|
||||
SCYLLA=${SCYLLA-$(ls -t "$source_path/build/"*"/scylla" | head -1)}
|
||||
SCYLLA=$(readlink -f "$SCYLLA")
|
||||
SCYLLA_IP=${IP-127.0.0.1}
|
||||
CPUSET=${CPUSET-0}
|
||||
CQLSH=${CQLSH-cqlsh}
|
||||
|
||||
# We need to use cqlsh to set up the authentication credentials expected by
|
||||
# some of the tests that check check authentication. If cqlsh is not installed
|
||||
# there isn't much point of even starting Scylla
|
||||
if ! type "$CQLSH" >/dev/null 2>&1
|
||||
# Below, we need to use python3 and the Cassandra driver to set up the
|
||||
# authentication credentials expected by some of the tests that check
|
||||
# authentication. If they are not installed there isn't much point of
|
||||
# even starting Scylla
|
||||
if ! python3 -c 'from cassandra.cluster import Cluster' >/dev/null 2>&1
|
||||
then
|
||||
echo "Error: cannot find '$CQLSH', needed for configuring Alternator authentication." >&2
|
||||
echo "Please install $CQLSH in your path, or set CQLSH to its location." >&2
|
||||
echo "Error: python3 and python3-cassandra-driver must be installed to configure Alternator authentication." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Pick a loopback IP address for Scylla to run, in an attempt not to collide with
|
||||
# other concurrent runs of Scylla. CCM uses 127.0.0.<nodenum>, so we use
|
||||
# 127.1.*.* which cannot collide with it. Moreover, we'll take the last two
|
||||
# bytes of the address from the current process - so as to allow multiple
|
||||
# concurrent runs of this code to use a different address.
|
||||
SCYLLA_IP=127.1.$(($$ >> 8 & 255)).$(($$ & 255))
|
||||
echo "Running Scylla on $SCYLLA_IP"
|
||||
|
||||
tmp_dir=/tmp/alternator-test-$$
|
||||
mkdir $tmp_dir
|
||||
|
||||
@@ -52,6 +59,7 @@ trap 'cleanup' EXIT
|
||||
# to work. We only need to do this if the "--https" option was explicitly
|
||||
# passed - otherwise the test would not use HTTPS anyway.
|
||||
alternator_port_option="--alternator-port=8000"
|
||||
alternator_url="http://$SCYLLA_IP:8000"
|
||||
for i
|
||||
do
|
||||
if [ "$i" = --https ]
|
||||
@@ -59,17 +67,20 @@ do
|
||||
openssl genrsa 2048 > "$tmp_dir/scylla.key"
|
||||
openssl req -new -x509 -nodes -sha256 -days 365 -subj "/C=IL/ST=None/L=None/O=None/OU=None/CN=example.com" -key "$tmp_dir/scylla.key" -out "$tmp_dir/scylla.crt"
|
||||
alternator_port_option="--alternator-https-port=8043"
|
||||
alternator_url="https://$SCYLLA_IP:8043"
|
||||
fi
|
||||
done
|
||||
"$SCYLLA" --options-file "$script_path/../conf/scylla.yaml" \
|
||||
--alternator-address $SCYLLA_IP \
|
||||
"$SCYLLA" --options-file "$source_path/conf/scylla.yaml" \
|
||||
--alternator-address $SCYLLA_IP \
|
||||
$alternator_port_option \
|
||||
--alternator-enforce-authorization=1 \
|
||||
--experimental=on --developer-mode=1 \
|
||||
--developer-mode=1 \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--cpuset "$CPUSET" -m 1G \
|
||||
--api-address $SCYLLA_IP --rpc-address $SCYLLA_IP \
|
||||
--api-address $SCYLLA_IP \
|
||||
--rpc-address $SCYLLA_IP \
|
||||
--listen-address $SCYLLA_IP \
|
||||
--prometheus-address $SCYLLA_IP \
|
||||
--seed-provider-parameters seeds=$SCYLLA_IP \
|
||||
--workdir "$tmp_dir" \
|
||||
--server-encryption-options keyfile="$tmp_dir/scylla.key" \
|
||||
@@ -79,8 +90,11 @@ done
|
||||
SCYLLA_PROCESS=$!
|
||||
|
||||
# Set up the proper authentication credentials needed by the Alternator
|
||||
# test. This requires connecting to Scylla with cqlsh - we'll wait up for
|
||||
# test. This requires connecting to Scylla with CQL - we'll wait up for
|
||||
# one minute for this to work:
|
||||
setup_authentication() {
    # Connect to Scylla at $SCYLLA_IP with the Python Cassandra driver and
    # insert the 'alternator' role with its salted_hash into system_auth.roles.
    # A non-zero exit status (e.g. Scylla not accepting connections yet) is
    # left for the caller to interpret.
    python3 -c "from cassandra.cluster import Cluster; Cluster([\"$SCYLLA_IP\"]).connect().execute(\"INSERT INTO system_auth.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')\")"
}
|
||||
echo "Scylla is: $SCYLLA."
|
||||
echo -n "Booting Scylla..."
|
||||
ok=
|
||||
@@ -94,18 +108,18 @@ do
|
||||
summary="Error: Scylla failed to boot after $SECONDS seconds."
|
||||
break
|
||||
fi
|
||||
err=`"$CQLSH" -e "INSERT INTO system_auth.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')" 2>&1` && ok=yes && break
|
||||
err=`setup_authentication 2>&1` && ok=yes && break
|
||||
case "$err" in
|
||||
"Connection error:"*)
|
||||
*NoHostAvailable:*)
|
||||
# This is what we expect while Scylla is still booting.
|
||||
;;
|
||||
*"command not found")
|
||||
summary="Error: need 'cqlsh' in your path, to configure Alternator authentication."
|
||||
*ImportError:*|*"command not found"*)
|
||||
summary="Error: need python3 and python3-cassandra-driver to configure Alternator authentication."
|
||||
echo
|
||||
echo $summary
|
||||
break;;
|
||||
*)
|
||||
summary="Unknown cqlsh error, can't set authentication credentials: '$err'"
|
||||
summary="Unknown error trying to set authentication credentials: '$err'"
|
||||
echo
|
||||
echo $summary
|
||||
break;;
|
||||
@@ -125,7 +139,8 @@ else
|
||||
fi
|
||||
|
||||
cd "$script_path"
|
||||
pytest "$@"
|
||||
set +e
|
||||
pytest --url $alternator_url "$@"
|
||||
code=$?
|
||||
case $code in
|
||||
0) summary="Alternator tests pass";;
|
||||
1
test/alternator/suite.yaml
Normal file
1
test/alternator/suite.yaml
Normal file
@@ -0,0 +1 @@
|
||||
type: Run
|
||||
@@ -100,6 +100,14 @@ def test_query_basic_restrictions(dynamodb, filled_test_table):
|
||||
print(got_items)
|
||||
assert multiset([item for item in items if item['p'] == 'long' and item['c'].startswith('11')]) == multiset(got_items)
|
||||
|
||||
def test_query_nonexistent_table(dynamodb):
    # A Query on a table that does not exist must fail with
    # ResourceNotFoundException.
    low_level_client = dynamodb.meta.client
    key_conditions = {
        'p': {'AttributeValueList': ['long'], 'ComparisonOperator': 'EQ'},
        'c': {'AttributeValueList': ['11'], 'ComparisonOperator': 'BEGINS_WITH'},
    }
    with pytest.raises(ClientError, match="ResourceNotFoundException"):
        low_level_client.query(TableName="i_do_not_exist", KeyConditions=key_conditions)
|
||||
|
||||
def test_begins_with(dynamodb, test_table):
|
||||
paginator = dynamodb.meta.client.get_paginator('query')
|
||||
items = [{'p': 'unorthodox_chars', 'c': sort_key, 'str': 'a'} for sort_key in [u'ÿÿÿ', u'cÿbÿ', u'cÿbÿÿabg'] ]
|
||||
@@ -42,6 +42,11 @@ def test_scan_basic(filled_test_table):
|
||||
assert len(items) == len(got_items)
|
||||
assert multiset(items) == multiset(got_items)
|
||||
|
||||
def test_scan_nonexistent_table(dynamodb):
    # A Scan on a table that does not exist must fail with
    # ResourceNotFoundException.
    low_level_client = dynamodb.meta.client
    missing_table = "i_do_not_exist"
    with pytest.raises(ClientError, match="ResourceNotFoundException"):
        low_level_client.scan(TableName=missing_table)
|
||||
|
||||
def test_scan_with_paginator(dynamodb, filled_test_table):
|
||||
test_table, items = filled_test_table
|
||||
paginator = dynamodb.meta.client.get_paginator('scan')
|
||||
@@ -244,11 +244,12 @@ def test_table_streams_off(dynamodb):
|
||||
table.delete();
|
||||
# DynamoDB doesn't allow StreamSpecification to be empty map - if it
|
||||
# exists, it must have a StreamEnabled
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
table = create_test_table(dynamodb, StreamSpecification={},
|
||||
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
|
||||
table.delete();
|
||||
# Unfortunately, new versions of boto3 don't let us pass this...
|
||||
#with pytest.raises(ClientError, match='ValidationException'):
|
||||
# table = create_test_table(dynamodb, StreamSpecification={},
|
||||
# KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
# AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
|
||||
# table.delete();
|
||||
# Unfortunately, boto3 doesn't allow us to pass StreamSpecification=None.
|
||||
# This is what we had in issue #5796.
|
||||
|
||||
@@ -296,7 +296,9 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||
commitlog::config cfg;
|
||||
cfg.commitlog_segment_size_in_mb = 2;
|
||||
|
||||
constexpr auto max_size_mb = 2;
|
||||
cfg.commitlog_segment_size_in_mb = max_size_mb;
|
||||
cfg.commitlog_total_space_in_mb = 1;
|
||||
cfg.commitlog_sync_period_in_ms = 1;
|
||||
return cl_test(cfg, [](commitlog& log) {
|
||||
@@ -306,8 +308,15 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||
// add a flush handler that simply says we're done with the range.
|
||||
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
|
||||
*segments = log.get_active_segment_names();
|
||||
log.discard_completed_segments(id);
|
||||
sem->signal();
|
||||
// Verify #5899 - file size should not exceed the config max.
|
||||
return parallel_for_each(*segments, [](sstring filename) {
|
||||
return file_size(filename).then([](uint64_t size) {
|
||||
BOOST_REQUIRE_LE(size, max_size_mb * 1024 * 1024);
|
||||
});
|
||||
}).then([&log, sem, id] {
|
||||
log.discard_completed_segments(id);
|
||||
sem->signal();
|
||||
});
|
||||
});
|
||||
|
||||
auto set = make_lw_shared<std::set<segment_id_type>>();
|
||||
|
||||
@@ -930,17 +930,17 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -950,7 +950,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -958,9 +958,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -973,7 +973,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -992,7 +992,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -531,6 +531,43 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
});
|
||||
|
||||
{
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
|
||||
assert_that(res).is_rows().with_rows({{
|
||||
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
// Override the actual paging state with one with empty keys,
|
||||
// which is a valid paging state as well, and should return
|
||||
// no rows.
|
||||
paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
|
||||
std::nullopt, paging_state->get_remaining(), paging_state->get_query_uuid(),
|
||||
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
|
||||
paging_state->get_rows_fetched_for_last_partition());
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
|
||||
assert_that(res).is_rows().with_size(0);
|
||||
}
|
||||
|
||||
{
|
||||
// An artificial paging state with an empty key pair is also valid and is expected
|
||||
// not to return rows (since no row matches an empty partition key)
|
||||
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
|
||||
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
|
||||
assert_that(res).is_rows().with_size(0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -5256,3 +5256,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
||||
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
||||
}
|
||||
|
||||
// The following test runs on test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||
// It was created with Scylla 3.0.x using the following CQL statements:
|
||||
//
|
||||
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
// CREATE TYPE ks.ut (a int, b int);
|
||||
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
|
||||
// m map<int, frozen<ut>>,
|
||||
// fm frozen<map<int, frozen<ut>>>,
|
||||
// mm map<int, frozen<map<int, frozen<ut>>>>,
|
||||
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
|
||||
// s set<frozen<ut>>,
|
||||
// fs frozen<set<frozen<ut>>>,
|
||||
// l list<frozen<ut>>,
|
||||
// fl frozen<list<frozen<ut>>>
|
||||
// ) WITH compression = {};
|
||||
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
|
||||
// m[0] = {a: 0, b: 0},
|
||||
// fm = {0: {a: 0, b: 0}},
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
// fs = {{a: 0, b: 0}},
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
// fl = [{a: 0, b: 0}]
|
||||
// WHERE pk = 0;
|
||||
//
|
||||
// It checks whether an SSTable containing UDTs nested in collections, which contains incorrect serialization headers
|
||||
// (doesn't wrap nested UDTs in the FrozenType<...> tag) can be loaded by new versions of Scylla.
|
||||
|
||||
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
|
||||
"test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection";
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
|
||||
auto abj = defer([] { await_background_jobs().get(); });
|
||||
|
||||
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
|
||||
{to_bytes("a"), to_bytes("b")},
|
||||
{int32_type, int32_type}, false);
|
||||
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
|
||||
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
|
||||
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
|
||||
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
|
||||
auto s_type = set_type_impl::get_instance(ut, true);
|
||||
auto fs_type = set_type_impl::get_instance(ut, false);
|
||||
auto l_type = list_type_impl::get_instance(ut, true);
|
||||
auto fl_type = list_type_impl::get_instance(ut, false);
|
||||
|
||||
auto s = schema_builder("ks", "t")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("m", m_type)
|
||||
.with_column("fm", fm_type)
|
||||
.with_column("mm", mm_type)
|
||||
.with_column("fmm", fmm_type)
|
||||
.with_column("s", s_type)
|
||||
.with_column("fs", fs_type)
|
||||
.with_column("l", l_type)
|
||||
.with_column("fl", fl_type)
|
||||
.set_compressor_params(compression_parameters::no_compression())
|
||||
.build();
|
||||
|
||||
auto m_cdef = s->get_column_definition(to_bytes("m"));
|
||||
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
|
||||
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
|
||||
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
|
||||
auto s_cdef = s->get_column_definition(to_bytes("s"));
|
||||
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
|
||||
auto l_cdef = s->get_column_definition(to_bytes("l"));
|
||||
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
|
||||
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
|
||||
|
||||
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
|
||||
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
|
||||
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
|
||||
auto fs_val = make_set_value(fs_type, {ut_val});
|
||||
auto fl_val = make_list_value(fl_type, {ut_val});
|
||||
|
||||
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
|
||||
auto ckey = clustering_key::make_empty();
|
||||
|
||||
// m[0] = {a: 0, b: 0}
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
|
||||
}
|
||||
|
||||
// fm = {0: {a: 0, b: 0}}
|
||||
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
|
||||
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
|
||||
}
|
||||
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
|
||||
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(ut->decompose(ut_val),
|
||||
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
|
||||
}
|
||||
|
||||
// fs = {{a: 0, b: 0}},
|
||||
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
|
||||
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
|
||||
}
|
||||
|
||||
// fl = [{a: 0, b: 0}]
|
||||
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
|
||||
|
||||
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
|
||||
sst.load();
|
||||
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
|
||||
}
|
||||
|
||||
@@ -397,9 +397,6 @@ public:
|
||||
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
||||
cfg->num_tokens.set(256);
|
||||
cfg->ring_delay_ms.set(500);
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::LWT);
|
||||
cfg->experimental_features(features);
|
||||
cfg->shutdown_announce_in_ms.set(0);
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
create_directories((data_dir_path + "/system").c_str());
|
||||
@@ -439,7 +436,6 @@ public:
|
||||
|
||||
gms::feature_config fcfg;
|
||||
fcfg.enable_cdc = true;
|
||||
fcfg.enable_lwt = true;
|
||||
fcfg.enable_sstables_mc_format = true;
|
||||
if (cfg->enable_user_defined_functions()) {
|
||||
fcfg.enable_user_defined_functions = true;
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3519784297
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CRC.db
|
||||
Filter.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Index.db
|
||||
Summary.db
|
||||
Data.db
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-20200128
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-20200402
|
||||
|
||||
Reference in New Issue
Block a user