Compare commits

...

21 Commits

Author SHA1 Message Date
Avi Kivity
2edc1ca039 tools: toolchain: update to Fedora 33 with clang 11
Update the toolchain to Fedora 33 with clang 11 (note the
build still uses gcc).

The image now creates a /root/.m2/repository directory; without
this the tools/jmx build fails on aarch64.

Add java-1.8.0-openjdk-devel since that is where javac lives now.
Add a JAVA8_HOME environment variable; without this ant is not
able to find javac.

The toolchain is enabled for x86_64 and aarch64.
2020-10-28 17:02:53 +02:00
Avi Kivity
5ff5d43c7a Update tools/java submodule
* tools/java e97c106047...ad48b44a26 (1):
  > build: Add generated Thrift sources to multi-Java build
2020-10-28 16:52:25 +02:00
Pavel Emelyanov
b2ce3b197e allocation_strategy: Fix standard_migrator initialization
This is the continuation of 30722b8c8e, so let me re-cite Rafael:

    The constructors of these global variables can allocate memory. Since
    the variables are thread_local, they are initialized at first use.

    There is nothing we can do if these allocations fail, so use
    disable_failure_guard.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20201028140553.21709-1-xemul@scylladb.com>
2020-10-28 16:22:23 +02:00
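The cited behaviour — thread-local globals being constructed on first use in each thread — can be sketched in Python with `threading.local` as a stand-in (all names here are hypothetical, not Scylla's):

```python
import threading

class Metrics:
    instances = 0  # counts how many per-thread copies were constructed

    def __init__(self):
        Metrics.instances += 1  # the "allocation" happens here, at first use

_tls = threading.local()

def get_metrics():
    # Construct lazily, on the first access from each thread.
    if not hasattr(_tls, "metrics"):
        _tls.metrics = Metrics()
    return _tls.metrics

# No instance exists until some thread actually asks for one.
assert Metrics.instances == 0
get_metrics()
get_metrics()           # same thread: reuses its copy
assert Metrics.instances == 1

t = threading.Thread(target=get_metrics)
t.start(); t.join()     # another thread constructs its own copy
assert Metrics.instances == 2
```

The C++ fix wraps these first-use constructions in `disable_failure_guard`, since an allocation failure at that point cannot be handled.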
Asias He
289a08072a repair: Make repair_writer a shared pointer
The future of the fiber that writes data into sstables inside
the repair_writer is stored in _writer_done like below:

class repair_writer {
   _writer_done[node_idx] =
      mutation_writer::distribute_reader_and_consume_on_shards().then([this] {
         ...
      }).handle_exception([this] {
         ...
      });
}

The fiber accesses the repair_writer object in the error handling path. We
wait for _writer_done to finish before we destroy the repair_meta
object (which contains the repair_writer object), so that the fiber
cannot access an already freed repair_writer object.

To be safer, we can make repair_writer a shared pointer and take a
reference in the distribute_reader_and_consume_on_shards code path.

Fixes #7406

Closes #7430
2020-10-28 16:22:23 +02:00
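The shared-pointer fix follows a general pattern: the asynchronous continuation captures a strong reference to the object it touches, so the object outlives its caller. A minimal Python sketch of that idea (hypothetical names; `weakref` only observes the object's lifetime):

```python
import weakref

class RepairWriter:           # hypothetical stand-in for the C++ repair_writer
    def __init__(self):
        self.aborted = False
    def abort(self):
        self.aborted = True   # the error path touches the writer object

def start_write_fiber(writer):
    # The continuation captures a strong reference to the writer, mirroring
    # `auto writer = shared_from_this()` in the patch: the fiber, not its
    # caller, now keeps the object alive.
    def on_error():
        writer.abort()
    return on_error

w = RepairWriter()
ref = weakref.ref(w)
fiber = start_write_fiber(w)
del w                      # the caller drops its reference...
assert ref() is not None   # ...but the pending fiber still keeps it alive
fiber()                    # the error-handling path can safely touch the writer
assert ref().aborted
del fiber
assert ref() is None       # only once the fiber is gone is the writer reclaimed
```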
Avi Kivity
4b9206a180 install: abort if LD_PRELOAD is set when executing a relocatable binary
LD_PRELOAD libraries usually have dependencies in the host system,
which they will not have access to in a relocatable environment
since we use a different libc. Detect that LD_PRELOAD is in use and if
so, abort with an error.

Fixes #7493.

Closes #7494
2020-10-28 16:22:23 +02:00
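The actual change adds a guard line to the generated bash wrapper scripts; the same check can be sketched in Python (the exit code 110 matches the patch, the function name is hypothetical):

```python
import os
import sys

def check_ld_preload(environ=os.environ):
    """Refuse to run a relocatable binary when LD_PRELOAD is set.

    Preloaded libraries were linked against the host libc, which the
    relocatable environment replaces, so they would fail to load.
    """
    if environ.get("LD_PRELOAD"):
        print("not compatible with LD_PRELOAD", file=sys.stderr)
        return 110
    return 0

assert check_ld_preload({}) == 0
assert check_ld_preload({"LD_PRELOAD": "/usr/lib/libhack.so"}) == 110
```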
Avi Kivity
2a42fc5cde build: supply linker flags only to the linker, not the compiler
Clang complains if it sees linker-only flags when called for compilation,
so move the compile-time flags from cxx_ld_flags to cxxflags, and remove
cxx_ld_flags from the compiler command line.

The linker flags are also passed to Seastar so that the build-id and
interpreter hacks still apply to iotune.

Closes #7466
2020-10-28 16:22:23 +02:00
Avi Kivity
fc15d0a4be build: relocatable package: exclude tools/python3
python3 has its own relocatable package, no need to include it
in scylla-package.tar.gz.

Closes #7467
2020-10-28 16:22:23 +02:00
Avi Kivity
6eb3ba74e4 Update tools/java submodule
* tools/java f2e8666d7e...e97c106047 (1):
  > Relocatable Package: create product prefixed relocatable archive
2020-10-28 08:47:49 +02:00
Juliusz Stasiewicz
e0176bccab create_table_statement: Disallow default TTL on counter tables
On such an attempt, `invalid_request_exception` is thrown.
Also, a simple CQL test is added.

Fixes #6879
2020-10-27 22:44:02 +02:00
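The new validation boils down to one condition at table-creation time; a rough sketch (hypothetical names, not the real Scylla types):

```python
class InvalidRequest(Exception):  # stand-in for invalid_request_exception
    pass

def validate_table(column_types, default_ttl):
    # Reject default_time_to_live > 0 when any column is a counter,
    # mirroring the check added in create_table_statement.
    if default_ttl > 0 and "counter" in column_types:
        raise InvalidRequest(
            "Cannot set default_time_to_live on a table with counters")

validate_table(["int", "text"], default_ttl=60)   # fine: no counters
validate_table(["counter"], default_ttl=0)        # fine: no default TTL
try:
    validate_table(["counter"], default_ttl=60)
    raised = False
except InvalidRequest:
    raised = True
assert raised
```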
Nadav Har'El
92b741b4ff alternator test: more tests for disabled streams and closed shards
We already have a test for the behavior of a closed shard and how
iterators previously created for it are still valid. In this patch
we add to this also checking that the shard id itself, not just the
iterator, is still valid.

Additionally, although the aforementioned test used a disabled stream
to create a closed shard, it was not a complete test for the behavior
of a disabled stream, and this patch adds such a test. We check that
although the stream is disabled, it is still fully usable (for 24 hours) -
its original ARN is still listed on ListStreams, the ARN is still usable,
its shards can be listed, all are marked as closed but still fully readable.

Both tests pass on DynamoDB, and xfail on Alternator because of
issue #7239 - CDC drops the CDC log table as soon as CDC is disabled,
so the stream data is lost immediately instead of being retained for
24 hours.

Refs #7239

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20201006183915.434055-1-nyh@scylladb.com>
2020-10-27 22:44:02 +02:00
Nadav Har'El
a57d4c0092 docs: clean up format of docs/alternator/getting-started.md
In https://github.com/scylladb/scylla-docs/pull/3105 it was noted that
the Sphinx document parser doesn't like a horizontal line ("---") in
the beginning of a section. Since there is no real reason why we must
have this horizontal line, let's just remove it.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20201001151312.261825-1-nyh@scylladb.com>
2020-10-27 22:44:02 +02:00
Avi Kivity
e2a02f15c2 Merge 'transport/system_ks: Add more info to system.clients' from Juliusz Stasiewicz
This patch fills the following columns in `system.clients` table:
* `connection_stage`
* `driver_name`
* `driver_version`
* `protocol_version`

It also improves:
* `client_type` - distinguishes cql from thrift just in case
* `username` - now it displays the correct username iff `PasswordAuthenticator` is configured.

What is still missing:
* SSL params (I'll happily get some advice here)
* `hostname` - I didn't find it in tested drivers

Refs #6946

Closes #7349

* github.com:scylladb/scylla:
  transport: Update `connection_stage` in `system.clients`
  transport: Retrieve driver's name and version from STARTUP message
  transport: Notify `system.clients` about "protocol_version"
  transport: On successful authentication add `username` to system.clients
2020-10-27 22:44:02 +02:00
Amnon Heiman
52db99f25f scyllatop/livedata.py: Safe iteration over metrics
This patch changes the code that iterates over the metrics to use a copy
of the metric names, making it safe to remove metrics from the
metrics object.

Fixes #7488

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2020-10-27 22:44:02 +02:00
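The underlying hazard and the fix can be demonstrated directly: mutating a dict while iterating it raises `RuntimeError`, whereas iterating over a copy of the names (as the patch does) is safe:

```python
# Deleting from a dict while iterating it raises RuntimeError.
metrics = {"reads": 1, "stale": 0, "writes": 2}
try:
    for name in metrics:
        if name == "stale":
            del metrics[name]
    mutated_in_place = True
except RuntimeError:
    mutated_in_place = False
assert not mutated_in_place

# Iterating over a snapshot of the names makes removal safe.
metrics = {"reads": 1, "stale": 0, "writes": 2}
for name in list(metrics):   # copy of the metric names
    if name == "stale":
        del metrics[name]
assert metrics == {"reads": 1, "writes": 2}
```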
Calle Wilund
1bc96a5785 alternator::streams: Make describe_stream use actual log ttl as window
Allows QA to bypass the normal hardcoded 24h ttl of data and still
get "proper" behaviour w.r.t. available stream set/generations.
I.e. can manually change cdc ttl option for alternator table after
streams enabled. Should not be exposed, but perhaps useful for
testing.

Closes #7483
2020-10-26 12:16:36 +02:00
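The window computation this enables is simply `max(table creation time, now - ttl)` instead of a hardcoded 24h bound; sketched with `datetime` (hypothetical names):

```python
from datetime import datetime, timedelta

def low_ts(table_created, now, ttl):
    # Generations older than both the table and now - ttl are filtered out;
    # before the patch, ttl was effectively the hardcoded 24h maximum window.
    return max(table_created, now - ttl)

now = datetime(2020, 10, 26, 12, 0)
created = datetime(2020, 10, 26, 0, 0)
# With the default 24h window, the table-creation bound dominates:
assert low_ts(created, now, timedelta(hours=24)) == created
# A manually shortened cdc ttl narrows the window past table creation:
assert low_ts(created, now, timedelta(hours=1)) == now - timedelta(hours=1)
```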
Calle Wilund
4b65d67a1a partition_version: Change range_tombstones() to return chunked_vector
Refs #7364

The number of tombstones can be large. As a stopgap measure, short of
just returning a source range (with keepalive), we can at least
alleviate the problem by using a chunked vector.

Closes #7433
2020-10-26 11:54:42 +02:00
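The point of a chunked vector is that growth never requires one large contiguous allocation; a minimal Python sketch of the data structure (not Scylla's `utils::chunked_vector`):

```python
class ChunkedVector:
    """Elements live in fixed-size chunks, so appending only ever
    allocates one small chunk at a time, never one huge buffer."""
    def __init__(self, chunk_size=128):
        self._chunk_size = chunk_size
        self._chunks = [[]]

    def append(self, value):
        if len(self._chunks[-1]) == self._chunk_size:
            self._chunks.append([])      # allocate one more small chunk
        self._chunks[-1].append(value)

    def __len__(self):
        return (len(self._chunks) - 1) * self._chunk_size + len(self._chunks[-1])

    def __getitem__(self, i):
        return self._chunks[i // self._chunk_size][i % self._chunk_size]

v = ChunkedVector(chunk_size=4)
for i in range(10):
    v.append(i)
assert len(v) == 10
assert v[0] == 0 and v[5] == 5 and v[9] == 9
assert len(v._chunks) == 3   # 4 + 4 + 2 elements
```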
Benny Halevy
82aabab054 table: get rid of reshuffle_sstables
It is unused since 7351db7cab

Refs #6950

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20201026074914.34721-1-bhalevy@scylladb.com>
2020-10-26 09:50:21 +02:00
Calle Wilund
46ea8c9b8b cdc: Add an "end-of-record" column to the cdc log
Fixes #7435

Adds an "eor" (end-of-record) column to cdc log. This is non-null only on
last-in-timestamp group rows, i.e. end of a singular source "event".

A client can use this as a shortcut to knowing whether or not it has a
full cdc "record" for a given source mutation (single row change).

Closes #7436
2020-10-26 09:39:27 +02:00
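A consumer can use the flag to group log rows back into per-event records; a sketch with rows modelled as `(value, eor)` pairs (a hypothetical representation, not the real log schema):

```python
def group_records(rows):
    """Group cdc log rows into per-event records using the "eor" flag.

    eor is set only on the last row of a timestamp batch, so a consumer
    knows when it has received a full record for one source mutation.
    """
    record = []
    for value, eor in rows:
        record.append(value)
        if eor:
            yield record
            record = []

log = [("a", False), ("b", True), ("c", True), ("d", False), ("e", True)]
assert list(group_records(log)) == [["a", "b"], ["c"], ["d", "e"]]
```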
Juliusz Stasiewicz
0251cb9b31 transport: Update connection_stage in system.clients 2020-10-12 18:44:00 +02:00
Juliusz Stasiewicz
6abe1352ba transport: Retrieve driver's name and version from STARTUP message 2020-10-12 18:37:19 +02:00
Juliusz Stasiewicz
d2d162ece3 transport: Notify system.clients about "protocol_version" 2020-10-12 18:32:00 +02:00
Juliusz Stasiewicz
acf0341e9b transport: On successful authentication add username to system.clients
The username becomes known in the course of resolving challenges
from `PasswordAuthenticator`. That's why username is being set on
successful authentication; until then all users are "anonymous".
Meanwhile, `AllowAllAuthenticator` (the default) does not request
username, so users logged with it will remain as "anonymous" in
`system.clients`.

Shuffling of code was necessary to unify existing infrastructure
for INSERTing entries into `system.clients` with later UPDATEs.
2020-10-06 18:52:46 +02:00
30 changed files with 403 additions and 111 deletions

View File

@@ -475,6 +475,8 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
status = "ENABLED";
}
}
auto ttl = std::chrono::seconds(opts.ttl());
rjson::set(stream_desc, "StreamStatus", rjson::from_string(status));
@@ -498,10 +500,10 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// cannot really "resume" query, must iterate all data. because we cannot query neither "time" (pk) > something,
// or on expired...
// TODO: maybe add secondary index to topology table to enable this?
return _sdks.cdc_get_versioned_streams({ tm.count_normal_token_owners() }).then([this, &db, schema, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)](std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
return _sdks.cdc_get_versioned_streams({ tm.count_normal_token_owners() }).then([this, &db, schema, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc), ttl](std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
// filter out cdc generations older than the table or now() - dynamodb_streams_max_window (24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - dynamodb_streams_max_window);
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
auto i = topologies.lower_bound(low_ts);
// need first gen _intersecting_ the timestamp.

View File

@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
@@ -880,14 +881,26 @@ public:
return _base_schema;
}
clustering_key create_ck(int batch) const {
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
}
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
// The numbering of batch sequence numbers starts from 0.
clustering_key allocate_new_log_row() {
auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) });
auto log_ck = create_ck(_batch_no++);
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
return log_ck;
}
bool has_rows() const {
return _batch_no != 0;
}
clustering_key last_row_key() const {
return create_ck(_batch_no - 1);
}
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
clustering_key allocate_new_log_row(operation op) {
auto log_ck = allocate_new_log_row();
@@ -944,6 +957,11 @@ public:
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
}
void end_record() {
if (has_rows()) {
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
}
}
private:
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
size_t pos = 0;
@@ -1519,6 +1537,11 @@ public:
cdc::inspect_mutation(m, v);
}
void end_record() override {
assert(_builder);
_builder->end_record();
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {

View File

@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
processor.produce_postimage(&ck);
}
}
processor.end_record();
}
}
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
processor.produce_postimage(&cr.key());
}
}
processor.end_record();
}
} // namespace cdc

View File

@@ -77,6 +77,10 @@ public:
// both columns have different timestamp or TTL set.
// m - the small mutation to be converted into CDC log rows.
virtual void process_change(const mutation& m) = 0;
// Tells processor we have reached end of record - last part
// of a given timestamp batch
virtual void end_record() = 0;
};
bool should_split(const mutation& base_mutation);

View File

@@ -257,18 +257,18 @@ modes = {
'stack-usage-threshold': 1024*40,
},
'release': {
'cxxflags': '',
'cxx_ld_flags': '-O3 -ffunction-sections -fdata-sections -Wl,--gc-sections',
'cxxflags': '-O3 -ffunction-sections -fdata-sections ',
'cxx_ld_flags': '-Wl,--gc-sections',
'stack-usage-threshold': 1024*13,
},
'dev': {
'cxxflags': '-DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSCYLLA_ENABLE_ERROR_INJECTION',
'cxx_ld_flags': '-O1',
'cxxflags': '-O1 -DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSCYLLA_ENABLE_ERROR_INJECTION',
'cxx_ld_flags': '',
'stack-usage-threshold': 1024*21,
},
'sanitize': {
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
'cxx_ld_flags': '-Os',
'cxxflags': '-Os -DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
'cxx_ld_flags': '',
'stack-usage-threshold': 1024*50,
}
}
@@ -1167,11 +1167,11 @@ optimization_flags = [
optimization_flags = [o
for o in optimization_flags
if flag_supported(flag=o, compiler=args.cxx)]
modes['release']['cxx_ld_flags'] += ' ' + ' '.join(optimization_flags)
modes['release']['cxxflags'] += ' ' + ' '.join(optimization_flags)
if flag_supported(flag='-Wstack-usage=4096', compiler=args.cxx):
for mode in modes:
modes[mode]['cxx_ld_flags'] += f' -Wstack-usage={modes[mode]["stack-usage-threshold"]} -Wno-error=stack-usage='
modes[mode]['cxxflags'] += f' -Wstack-usage={modes[mode]["stack-usage-threshold"]} -Wno-error=stack-usage='
linker_flags = linker_flags(compiler=args.cxx)
@@ -1338,6 +1338,13 @@ libdeflate_cflags = seastar_cflags
MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' }
# cmake likes to separate things with semicolons
def semicolon_separated(*flags):
# original flags may be space separated, so convert to string still
# using spaces
f = ' '.join(flags)
return re.sub(' +', ';', f)
def configure_seastar(build_dir, mode):
seastar_build_dir = os.path.join(build_dir, mode, 'seastar')
@@ -1346,8 +1353,8 @@ def configure_seastar(build_dir, mode):
'-DCMAKE_C_COMPILER={}'.format(args.cc),
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
'-DCMAKE_EXPORT_NO_PACKAGE_REGISTRY=ON',
'-DSeastar_CXX_FLAGS={}'.format((seastar_cflags + ' ' + modes[mode]['cxx_ld_flags']).replace(' ', ';')),
'-DSeastar_LD_FLAGS={}'.format(seastar_ldflags),
'-DSeastar_CXX_FLAGS={}'.format((seastar_cflags).replace(' ', ';')),
'-DSeastar_LD_FLAGS={}'.format(semicolon_separated(seastar_ldflags, modes[mode]['cxx_ld_flags'])),
'-DSeastar_CXX_DIALECT=gnu++20',
'-DSeastar_API_LEVEL=6',
'-DSeastar_UNUSED_RESULT_ERROR=ON',
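The `semicolon_separated` helper in the hunk above is pure string handling and can be exercised standalone:

```python
import re

def semicolon_separated(*flags):
    # Original flags may be space separated, so join with spaces first,
    # then collapse every run of spaces into the single ';' cmake expects.
    f = ' '.join(flags)
    return re.sub(' +', ';', f)

assert semicolon_separated('-Wl,--gc-sections') == '-Wl,--gc-sections'
assert semicolon_separated('-fuse-ld=lld  -Wl,--gc-sections', '-O3') == \
    '-fuse-ld=lld;-Wl,--gc-sections;-O3'
```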

View File

@@ -20,44 +20,47 @@
*/
#include "connection_notifier.hh"
#include "db/query_context.hh"
#include "cql3/constants.hh"
#include "database.hh"
#include "service/storage_proxy.hh"
#include <stdexcept>
namespace db::system_keyspace {
extern const char *const CLIENTS;
}
static sstring to_string(client_type ct) {
sstring to_string(client_type ct) {
switch (ct) {
case client_type::cql: return "cql";
case client_type::thrift: return "thrift";
case client_type::alternator: return "alternator";
default: throw std::runtime_error("Invalid client_type");
}
throw std::runtime_error("Invalid client_type");
}
static sstring to_string(client_connection_stage ccs) {
switch (ccs) {
case client_connection_stage::established: return connection_stage_literal<client_connection_stage::established>;
case client_connection_stage::authenticating: return connection_stage_literal<client_connection_stage::authenticating>;
case client_connection_stage::ready: return connection_stage_literal<client_connection_stage::ready>;
}
throw std::runtime_error("Invalid client_connection_stage");
}
future<> notify_new_client(client_data cd) {
// FIXME: consider prepared statement
const static sstring req
= format("INSERT INTO system.{} (address, port, client_type, shard_id, protocol_version, username) "
"VALUES (?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
= format("INSERT INTO system.{} (address, port, client_type, connection_stage, shard_id, protocol_version, username) "
"VALUES (?, ?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
return db::execute_cql(req,
std::move(cd.ip), cd.port, to_string(cd.ct), cd.shard_id,
std::move(cd.ip), cd.port, to_string(cd.ct), to_string(cd.connection_stage), cd.shard_id,
cd.protocol_version.has_value() ? data_value(*cd.protocol_version) : data_value::make_null(int32_type),
cd.username.value_or("anonymous")).discard_result();
}
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port) {
future<> notify_disconnected_client(net::inet_address addr, int port, client_type ct) {
// FIXME: consider prepared statement
const static sstring req
= format("DELETE FROM system.{} where address=? AND port=? AND client_type=?;",
db::system_keyspace::CLIENTS);
return db::execute_cql(req, addr.addr(), port, to_string(ct)).discard_result();
return db::execute_cql(req, std::move(addr), port, to_string(ct)).discard_result();
}
future<> clear_clientlist() {

View File

@@ -20,27 +20,65 @@
*/
#pragma once
#include "gms/inet_address.hh"
#include "db/query_context.hh"
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include <optional>
namespace db::system_keyspace {
extern const char *const CLIENTS;
}
enum class client_type {
cql = 0,
thrift,
alternator,
};
sstring to_string(client_type ct);
enum class changed_column {
username = 0,
connection_stage,
driver_name,
driver_version,
hostname,
protocol_version,
};
template <changed_column column> constexpr const char* column_literal = "";
template <> constexpr const char* column_literal<changed_column::username> = "username";
template <> constexpr const char* column_literal<changed_column::connection_stage> = "connection_stage";
template <> constexpr const char* column_literal<changed_column::driver_name> = "driver_name";
template <> constexpr const char* column_literal<changed_column::driver_version> = "driver_version";
template <> constexpr const char* column_literal<changed_column::hostname> = "hostname";
template <> constexpr const char* column_literal<changed_column::protocol_version> = "protocol_version";
enum class client_connection_stage {
established = 0,
authenticating,
ready,
};
template <client_connection_stage ccs> constexpr const char* connection_stage_literal = "";
template <> constexpr const char* connection_stage_literal<client_connection_stage::established> = "ESTABLISHED";
template <> constexpr const char* connection_stage_literal<client_connection_stage::authenticating> = "AUTHENTICATING";
template <> constexpr const char* connection_stage_literal<client_connection_stage::ready> = "READY";
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
struct client_data {
gms::inet_address ip;
net::inet_address ip;
int32_t port;
client_type ct;
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
// `optional' column means that it's nullable (possibly because it's
// unimplemented yet). If you want to fill ("implement") any of them,
// remember to update the query in `notify_new_client()'.
std::optional<sstring> connection_stage;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
@@ -52,6 +90,17 @@ struct client_data {
};
future<> notify_new_client(client_data cd);
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port);
future<> notify_disconnected_client(net::inet_address addr, int port, client_type ct);
future<> clear_clientlist();
template <changed_column column_enum_val>
struct notify_client_change {
template <typename T>
future<> operator()(net::inet_address addr, int port, client_type ct, T&& value) {
const static sstring req
= format("UPDATE system.{} SET {}=? WHERE address=? AND port=? AND client_type=?;",
db::system_keyspace::CLIENTS, column_literal<column_enum_val>);
return db::execute_cql(req, std::forward<T>(value), std::move(addr), port, to_string(ct)).discard_result();
}
};
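The `column_literal`/`notify_client_change` machinery above maps each mutable column to its own UPDATE statement; a rough Python equivalent of the query construction only (no CQL execution, names hypothetical):

```python
# One UPDATE statement per column that can change after the initial
# INSERT into system.clients (the Python dict stands in for the
# column_literal template specializations).
COLUMN_LITERALS = {
    "username": "username",
    "connection_stage": "connection_stage",
    "driver_name": "driver_name",
    "driver_version": "driver_version",
    "protocol_version": "protocol_version",
}

def update_query(column):
    literal = COLUMN_LITERALS[column]
    return (f"UPDATE system.clients SET {literal}=? "
            "WHERE address=? AND port=? AND client_type=?;")

assert update_query("driver_name") == (
    "UPDATE system.clients SET driver_name=? "
    "WHERE address=? AND port=? AND client_type=?;")
```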

View File

@@ -204,6 +204,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}
_properties.validate(db, _properties.properties()->make_schema_extensions(db.extensions()));
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
auto stmt = ::make_shared<create_table_statement>(_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
@@ -211,6 +212,11 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
for (auto&& entry : _definitions) {
::shared_ptr<column_identifier> id = entry.first;
cql3_type pt = entry.second->prepare(db, keyspace());
if (has_default_ttl && pt.is_counter()) {
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
}
if (pt.get_type()->is_multi_cell()) {
if (pt.get_type()->is_user_type()) {
// check for multi-cell types (non-frozen UDTs or collections) inside a non-frozen UDT

View File

@@ -765,21 +765,6 @@ public:
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Make sure the generation numbers are sequential, starting from "start".
// Generations before "start" are left untouched.
//
// Return the highest generation number seen so far
//
// Word of warning: although this function will reshuffle anything over "start", it is
// very dangerous to do that with live SSTables. This is meant to be used with SSTables
// that are not yet managed by the system.
//
// Parameter all_generations stores the generation of all SSTables in the system, so it
// will be easy to determine which SSTable is new.
// An example usage would query all shards asking what is the highest SSTable number known
// to them, and then pass that + 1 as "start".
future<std::vector<sstables::entry_descriptor>> reshuffle_sstables(std::set<int64_t> all_generations, int64_t start);
// FIXME: this is just an example, should be changed to something more
// general. compact_all_sstables() starts a compaction of all sstables.
// It doesn't flush the current memtable first. It's just a ad-hoc method,

View File

@@ -1,5 +1,5 @@
# Getting Started With ScyllaDB Alternator
---
## Installing Scylla
Before you can start using ScyllaDB Alternator, you will have to have an up
and running scylla cluster configured to expose the alternator port.

View File

@@ -28,6 +28,7 @@ else
fi
debian_base_packages=(
clang
liblua5.3-dev
python3-pyparsing
python3-colorama
@@ -44,6 +45,7 @@ debian_base_packages=(
)
fedora_packages=(
clang
lua-devel
yaml-cpp-devel
thrift-devel
@@ -57,6 +59,7 @@ fedora_packages=(
python
sudo
java-1.8.0-openjdk-headless
java-1.8.0-openjdk-devel
ant
ant-junit
maven

View File

@@ -106,6 +106,7 @@ adjust_bin() {
"$root/$prefix/libexec/$bin"
cat > "$root/$prefix/bin/$bin" <<EOF
#!/bin/bash -e
[[ -z "\$LD_PRELOAD" ]] || { echo "\$0: not compatible with LD_PRELOAD" >&2; exit 110; }
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
export LD_LIBRARY_PATH="$prefix/libreloc"
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
@@ -130,6 +131,7 @@ relocate_python3() {
cp "$script" "$relocateddir"
cat > "$install"<<EOF
#!/usr/bin/env bash
[[ -z "\$LD_PRELOAD" ]] || { echo "\$0: not compatible with LD_PRELOAD" >&2; exit 110; }
export LC_ALL=en_US.UTF-8
x="\$(readlink -f "\$0")"
b="\$(basename "\$x")"

View File

@@ -542,12 +542,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
return partition_snapshot_ptr(std::move(snp));
}
std::vector<range_tombstone>
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end)
{
partition_version* v = &*version();
if (!v->next()) {
return boost::copy_range<std::vector<range_tombstone>>(
return boost::copy_range<range_tombstone_result>(
v->partition().row_tombstones().slice(*_schema, start, end));
}
range_tombstone_list list(*_schema);
@@ -557,10 +557,10 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
}
v = v->next();
}
return boost::copy_range<std::vector<range_tombstone>>(list.slice(*_schema, start, end));
return boost::copy_range<range_tombstone_result>(list.slice(*_schema, start, end));
}
std::vector<range_tombstone>
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones()
{
return range_tombstones(

View File

@@ -26,6 +26,7 @@
#include "utils/anchorless_list.hh"
#include "utils/logalloc.hh"
#include "utils/coroutine.hh"
#include "utils/chunked_vector.hh"
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/intrusive/slist.hpp>
@@ -400,10 +401,13 @@ public:
::static_row static_row(bool digest_requested) const;
bool static_row_continuous() const;
mutation_partition squashed() const;
using range_tombstone_result = utils::chunked_vector<range_tombstone>;
// Returns range tombstones overlapping with [start, end)
std::vector<range_tombstone> range_tombstones(position_in_partition_view start, position_in_partition_view end);
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
// Returns all range tombstones
std::vector<range_tombstone> range_tombstones();
range_tombstone_result range_tombstones();
};
class partition_snapshot_ptr {

View File

@@ -509,7 +509,7 @@ public:
}
};
class repair_writer {
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
schema_ptr _schema;
reader_permit _permit;
uint64_t _estimated_partitions;
@@ -569,8 +569,9 @@ public:
table& t = db.local().find_column_family(_schema->id());
auto [queue_reader, queue_handle] = make_queue_reader(_schema, _permit);
_mq[node_idx] = std::move(queue_handle);
auto writer = shared_from_this();
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions, writer] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
@@ -598,13 +599,13 @@ public:
return consumer(std::move(reader));
});
},
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
t.stream_in_progress()).then([node_idx, writer] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
_schema->ks_name(), _schema->cf_name(), partitions);
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
}).handle_exception([node_idx, writer] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
_mq[node_idx]->abort(ep);
writer->_schema->ks_name(), writer->_schema->cf_name(), ep);
writer->_mq[node_idx]->abort(ep);
return make_exception_future<>(std::move(ep));
});
}
@@ -718,7 +719,7 @@ private:
size_t _nr_peer_nodes= 1;
repair_stats _stats;
repair_reader _repair_reader;
repair_writer _repair_writer;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
std::list<repair_row> _row_buf;
// Contains rows we are working on to sync between peers
@@ -822,7 +823,7 @@ public:
_seed,
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
)
, _repair_writer(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason)
, _repair_writer(make_lw_shared<repair_writer>(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&ms] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
return ms.local().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
@@ -855,7 +856,7 @@ public:
auto f2 = _sink_source_for_get_row_diff.close();
auto f3 = _sink_source_for_put_row_diff.close();
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
return _repair_writer.wait_for_writer_done();
return _repair_writer->wait_for_writer_done();
});
}
@@ -1340,8 +1341,8 @@ private:
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer.create_writer(_db, node_idx);
return with_semaphore(_repair_writer->sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer->create_writer(_db, node_idx);
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
if (row_diff.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -1355,7 +1356,7 @@ private:
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
return _repair_writer->do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
row_diff.pop_front();
return make_ready_future<stop_iteration>(stop_iteration::no);
});

View File

@@ -154,7 +154,9 @@ ar.reloc_add('licenses')
ar.reloc_add('swagger-ui')
ar.reloc_add('api')
def exclude_submodules(tarinfo):
if tarinfo.name in ('scylla/tools/jmx', 'scylla/tools/java'):
if tarinfo.name in ('scylla/tools/jmx',
'scylla/tools/java',
'scylla/tools/python3'):
return None
return tarinfo
ar.reloc_add('tools', filter=exclude_submodules)

View File

@@ -118,6 +118,7 @@ private:
private volatile String keyspace;
#endif
std::optional<auth::authenticated_user> _user;
std::optional<sstring> _driver_name, _driver_version;
auth_state _auth_state = auth_state::UNINITIALIZED;
@@ -147,6 +148,20 @@ public:
_auth_state = new_state;
}
std::optional<sstring> get_driver_name() const {
return _driver_name;
}
void set_driver_name(sstring driver_name) {
_driver_name = std::move(driver_name);
}
std::optional<sstring> get_driver_version() const {
return _driver_version;
}
void set_driver_version(sstring driver_version) {
_driver_version = std::move(driver_version);
}
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
: _is_internal(false)
, _is_thrift(thrift)

View File

@@ -852,33 +852,6 @@ table::stop() {
});
}
future<std::vector<sstables::entry_descriptor>>
table::reshuffle_sstables(std::set<int64_t> all_generations, int64_t start) {
struct work {
std::set<int64_t> all_generations; // Stores generation of all live sstables in the system.
work(int64_t start, std::set<int64_t> gens)
: all_generations(gens) {}
};
return do_with(work(start, std::move(all_generations)), [this] (work& work) {
tlogger.info("Reshuffling SSTables in {}...", _config.datadir);
return lister::scan_dir(_config.datadir, { directory_entry_type::regular }, [this, &work] (fs::path parent_dir, directory_entry de) {
auto comps = sstables::entry_descriptor::make_descriptor(parent_dir.native(), de.name);
if (comps.component != component_type::TOC) {
return make_ready_future<>();
}
// Skip generations that were already loaded by Scylla at a previous stage.
if (work.all_generations.contains(comps.generation)) {
return make_ready_future<>();
}
return make_exception_future<>(std::runtime_error("Loading SSTables from the main SSTable directory is unsafe and no longer supported."
" You will find a directory called upload/ inside the table directory that can be used to load new SSTables into the system"));
}, &sstables::manifest_json_filter).then([&work] {
return make_ready_future<std::vector<sstables::entry_descriptor>>();
});
});
}
void table::set_metrics() {
auto cf = column_family_label(_schema->cf_name());
auto ks = keyspace_label(_schema->ks_name());

View File

@@ -1270,7 +1270,7 @@ def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
iterators = latest_iterators(dynamodbstreams, arn)
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
table.update_item(Key={'p': random_string(), 'c': random_string()},
@@ -1286,7 +1286,7 @@ def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
# eventually *one* of the stream shards will return one event:
timeout = time.time() + 15
while time.time() < timeout:
for iter in iterators:
for (shard_id, iter) in shards_and_iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
# Found the shard with the data! Test that it only has
@@ -1301,10 +1301,105 @@ def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
assert len(response['Records']) == 0
assert not 'NextShardIterator' in response
# Until now we verified that we can read the closed shard
# using an old iterator. Let's test now that the closed
# shard id is also still valid, and a new iterator can be
# created for it, and the old data can be read from it:
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert len(response['Records']) == 1
return
time.sleep(0.5)
pytest.fail("timed out")
# In the above test (test_streams_closed_read) we used a disabled stream as
# a means to generate a closed shard, and tested the behavior of that closed
# shard. In the following test, we do more extensive testing on the
# behavior of a disabled stream and verify that it is still usable (for 24
# hours), reproducing issue #7239: The disabled stream's ARN should still be
# listed for the table, this ARN should continue to work, listing the
# stream's shards should give an indication that they are all closed - but
# all these shards should still be readable.
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_disabled_stream(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
iterators = latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
table.update_item(Key={'p': random_string(), 'c': random_string()},
UpdateExpression='SET x = :x', ExpressionAttributeValues={':x': 5})
# Wait for this one update to become available in the stream before we
# disable the stream. Otherwise, theoretically (although unlikely in
# practice) we may disable the stream before the update was saved to it.
timeout = time.time() + 15
found = False
while time.time() < timeout and not found:
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and len(response['Records']) > 0:
found = True
break
time.sleep(0.5)
assert found
# Disable streaming for this table. Note that the test_table_ss_keys_only
# fixture has "function" scope so it is fine to ruin table, it will not
# be used in other tests.
disable_stream(dynamodbstreams, table)
# Check that the stream ARN which we previously got for the disabled
# stream is still listed by ListStreams
arns = [stream['StreamArn'] for stream in dynamodbstreams.list_streams(TableName=table.name)['Streams']]
assert arn in arns
# DescribeStream on the disabled stream still works and lists its shards.
# All these shards are listed as being closed (i.e., should have
# EndingSequenceNumber). The basic details of the stream (e.g., the view
# type) are available and the status of the stream is DISABLED.
response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
assert response['StreamStatus'] == 'DISABLED'
assert response['StreamViewType'] == 'KEYS_ONLY'
assert response['TableName'] == table.name
shards_info = response['Shards']
while 'LastEvaluatedShardId' in response:
response = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
assert response['StreamStatus'] == 'DISABLED'
assert response['StreamViewType'] == 'KEYS_ONLY'
assert response['TableName'] == table.name
shards_info.extend(response['Shards'])
print('Number of shards in stream: {}'.format(len(shards_info)))
for shard in shards_info:
assert 'EndingSequenceNumber' in shard['SequenceNumberRange']
assert shard['SequenceNumberRange']['EndingSequenceNumber'].isdecimal()
# We can get TRIM_HORIZON iterators for all these shards, to read all
# the old data they still have (this data should be saved for 24 hours
# after the stream was disabled)
iterators = []
for shard in shards_info:
iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator'])
# We can read the one change we did in one of these iterators. The data
# should be available immediately - no need for retries with timeout.
nrecords = 0
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response:
nrecords += len(response['Records'])
# The shard is closed, so NextShardIterator should either be missing
# now, indicating that it is a closed shard (DynamoDB does this),
# or, it may (and currently does in Alternator) return an iterator
# and reading from *that* iterator should then tell us that
# we reached the end of the shard (i.e., zero results and
# missing NextShardIterator).
if 'NextShardIterator' in response:
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
assert len(response['Records']) == 0
assert not 'NextShardIterator' in response
assert nrecords == 1
# TODO: tests on multiple partitions
# TODO: write a test that disabling the stream and re-enabling it works, but
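The read loops above all drain a closed shard the same way: follow `NextShardIterator` until a response omits it. A sketch of that loop against a minimal fake client (`FakeStreamsClient` and the iterator tokens are invented for illustration; a real boto3 `dynamodbstreams` client has the same `get_records` shape):

```python
def drain_closed_shard(client, iterator):
    """Read a closed shard to exhaustion.

    A closed shard terminates when a get_records response carries no
    NextShardIterator; until then, keep following the chain.
    """
    records = []
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator)
        records.extend(response.get('Records', []))
        iterator = response.get('NextShardIterator')
    return records

class FakeStreamsClient:
    # Two pages: one record, then an empty page without NextShardIterator,
    # mimicking how a closed shard ends.
    def __init__(self):
        self._pages = {
            'it-0': {'Records': [{'eventName': 'MODIFY'}],
                     'NextShardIterator': 'it-1'},
            'it-1': {'Records': []},   # closed: no NextShardIterator
        }

    def get_records(self, ShardIterator):
        return self._pages[ShardIterator]

records = drain_closed_shard(FakeStreamsClient(), 'it-0')
```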

View File

@@ -326,6 +326,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
// cdc log clustering key
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
// pk
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
@@ -534,6 +535,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
auto val_type = int32_type;
auto val = *first[0][val_index];
@@ -583,10 +585,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
if (post_enabled) {
val = *post_image.back()[val_index];
val2 = *post_image.back()[val2_index];
auto eor = *post_image.back()[eor_index];
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
}
const auto& ttl_cell = second[second.size() - 2][ttl_index];
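The assertions above check the invariant behind the new `cdc$end_of_batch` column: within one batch of CDC log rows, only the final row carries `true`. A toy sketch of that marking step (row layout and function name are illustrative, not Scylla's implementation):

```python
def mark_end_of_batch(rows):
    """Set cdc$end_of_batch on each row of one batch: False everywhere
    except the last row, which closes the batch."""
    out = []
    for i, row in enumerate(rows):
        row = dict(row)
        row['cdc$end_of_batch'] = (i == len(rows) - 1)
        out.append(row)
    return out

rows = mark_end_of_batch([{'cdc$operation': 1}, {'cdc$operation': 2}])
```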

View File

@@ -49,7 +49,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
// Verifies that tombstones in "list" are monotonic, overlap with the requested range,
// and have information equivalent with "expected" in that range.
static
void check_tombstone_slice(const schema& s, std::vector<range_tombstone> list,
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
const query::clustering_range& range,
std::initializer_list<range_tombstone> expected)
{

View File

@@ -0,0 +1,9 @@
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;
create table tb2 (pk int primary key, c1 counter);
alter table tb2 with default_time_to_live = 100;
create table tb3 (pk int primary key) with default_time_to_live = 100;
alter table tb3 add (c1 counter);
create table tb4 (pk int, ck int, cs counter static, primary KEY (pk, ck)) with default_time_to_live = 100;

View File

@@ -0,0 +1,31 @@
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;
{
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
"status" : "error"
}
create table tb2 (pk int primary key, c1 counter);
{
"status" : "ok"
}
alter table tb2 with default_time_to_live = 100;
{
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
"status" : "error"
}
create table tb3 (pk int primary key) with default_time_to_live = 100;
{
"status" : "ok"
}
alter table tb3 add (c1 counter);
{
"message" : "exceptions::configuration_exception (Cannot add a counter column (c1) in a non counter column family)",
"status" : "error"
}
create table tb4 (pk int, ck int, cs counter static, primary KEY (pk, ck)) with default_time_to_live = 100;
{
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
"status" : "error"
}
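The expected output above exercises one rule from three directions: a table may not combine counter columns with a non-zero `default_time_to_live`, whether the conflict is introduced at CREATE, by ALTER WITH, or by ALTER ADD. A sketch of that validation rule (the function is illustrative; the real check lives in Scylla's CQL statement validation, whose error text is quoted below):

```python
def check_table_options(has_counter_column, default_time_to_live):
    """Reject the combination exercised by tb1, tb2 and tb4 above."""
    if has_counter_column and default_time_to_live:
        raise ValueError(
            'Cannot set default_time_to_live on a table with counters')

err = None
try:
    check_table_options(True, 100)     # tb1: counter + TTL at CREATE
except ValueError as e:
    err = str(e)
```

Either option alone is fine; only the combination is rejected.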

View File

@@ -39,7 +39,7 @@ class LiveData(object):
def _discoverMetrics(self):
results = metric.Metric.discover(self._metric_source)
logging.debug('_discoverMetrics: {} results discovered'.format(len(results)))
for symbol in results:
for symbol in list(results):
if not self._matches(symbol, self._metricPatterns):
results.pop(symbol)
logging.debug('_initializeMetrics: {} results matched'.format(len(results)))
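The one-line fix above wraps the dict in `list(...)` because `results.pop(symbol)` mutates the dict being iterated, which CPython rejects. A self-contained demonstration of both the failure and the fix (names are made up for illustration):

```python
# Mutating a dict while iterating it directly raises RuntimeError.
d = {'a': 1, 'b': 2}
raised = False
try:
    for k in d:
        d.pop(k)
except RuntimeError:
    raised = True

def filter_metrics(results, matches):
    # list(results) snapshots the keys up front, so popping entries from
    # the dict inside the loop is safe.
    for symbol in list(results):
        if not matches(symbol):
            results.pop(symbol)
    return results

metrics = {'cpu_busy': 1, 'io_queue': 2, 'cpu_idle': 3}
kept = filter_metrics(metrics, lambda name: name.startswith('cpu'))
```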

View File

@@ -1,4 +1,4 @@
FROM docker.io/fedora:32
FROM docker.io/fedora:33
ADD ./install-dependencies.sh ./
ADD ./seastar/install-dependencies.sh ./seastar/
ADD ./tools/toolchain/system-auth ./
@@ -10,4 +10,6 @@ RUN dnf -y install 'dnf-command(copr)' \
&& echo 'ALL ALL=(ALL:ALL) NOPASSWD: ALL' >> /etc/sudoers \
&& cp system-auth /etc/pam.d \
&& echo 'Defaults !requiretty' >> /etc/sudoers
RUN mkdir -p /root/.m2/repository
ENV JAVA8_HOME=/usr/lib/jvm/java-1.8.0-openjdk
CMD /bin/bash

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-32-20200910
docker.io/scylladb/scylla-toolchain:fedora-33-20201028

View File

@@ -309,7 +309,7 @@ cql_server::unadvertise_connection(shared_ptr<connection> conn) {
const auto ip = conn->get_client_state().get_client_address().addr();
const auto port = conn->get_client_state().get_client_port();
clogger.trace("Advertising disconnection of CQL client {}:{}", ip, port);
return notify_disconnected_client(ip, client_type::cql, port);
return notify_disconnected_client(ip, port, client_type::cql);
}
unsigned
@@ -371,6 +371,16 @@ cql_server::connection::read_frame() {
_version = current_version;
throw exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", client_version));
}
auto client_state_notification_f = std::apply(notify_client_change<changed_column::protocol_version>{},
std::tuple_cat(make_client_key(_client_state), std::make_tuple(_version)));
return client_state_notification_f.then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {
clogger.info("exception while setting protocol_version in `system.clients`: {}", std::current_exception());
}
return _read_buf.read_exactly(frame_size() - 1).then([this] (temporary_buffer<char> tail) {
temporary_buffer<char> full(frame_size());
full.get_write()[0] = _version;
@@ -385,6 +395,7 @@ cql_server::connection::read_frame() {
}
return make_ready_future<ret_type>(frame);
});
});
});
} else {
// Not the first frame, so we know the size.
@@ -579,13 +590,19 @@ future<> cql_server::connection::shutdown()
return make_ready_future<>();
}
std::tuple<net::inet_address, int, client_type> cql_server::connection::make_client_key(const service::client_state& cli_state) {
return std::make_tuple(cli_state.get_client_address().addr(),
cli_state.get_client_port(),
cli_state.is_thrift() ? client_type::thrift : client_type::cql);
}
client_data cql_server::connection::make_client_data() const {
client_data cd;
cd.ip = _client_state.get_client_address().addr();
cd.port = _client_state.get_client_port();
cd.ct = client_type::cql;
std::tie(cd.ip, cd.port, cd.ct) = make_client_key(_client_state);
cd.shard_id = this_shard_id();
cd.protocol_version = _version;
cd.driver_name = _client_state.get_driver_name();
cd.driver_version = _client_state.get_driver_version();
if (const auto user_ptr = _client_state.user(); user_ptr) {
cd.username = user_ptr->name;
}
@@ -751,6 +768,21 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
throw exceptions::protocol_exception(format("Unknown compression algorithm: {}", compression));
}
}
auto client_state_notification_f = make_ready_future<>();
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
_client_state.set_driver_version(driver_ver_opt->second);
client_state_notification_f = std::apply(notify_client_change<changed_column::driver_version>{},
std::tuple_cat(make_client_key(_client_state), std::make_tuple(driver_ver_opt->second)));
}
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
_client_state.set_driver_name(driver_name_opt->second);
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state), dn = driver_name_opt->second] {
return std::apply(notify_client_change<changed_column::driver_name>{},
std::tuple_cat(std::move(ck), std::forward_as_tuple(dn)));
});
}
cql_protocol_extension_enum_set cql_proto_exts;
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
if (options.contains(protocol_extension_name(ext))) {
@@ -758,11 +790,28 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
}
}
_client_state.set_protocol_extensions(std::move(cql_proto_exts));
auto& a = client_state.get_auth_service()->underlying_authenticator();
if (a.require_authentication()) {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
std::unique_ptr<cql_server::response> res;
if (auto& a = client_state.get_auth_service()->underlying_authenticator(); a.require_authentication()) {
res = make_autheticate(stream, a.qualified_java_name(), trace_state);
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state)] {
return std::apply(notify_client_change<changed_column::connection_stage>{},
std::tuple_cat(std::move(ck), std::make_tuple(connection_stage_literal<client_connection_stage::authenticating>)));
});
} else {
res = make_ready(stream, trace_state);
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state)] {
return std::apply(notify_client_change<changed_column::connection_stage>{},
std::tuple_cat(std::move(ck), std::make_tuple(connection_stage_literal<client_connection_stage::ready>)));
});
}
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
return client_state_notification_f.then_wrapped([res = std::move(res)] (future<> f) mutable {
try {
f.get();
} catch (...) {
clogger.info("exception while setting driver_name/version in `system.clients`: {}", std::current_exception());
}
return std::move(res);
});
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
@@ -774,6 +823,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
client_state.set_login(std::move(user));
auto f = client_state.check_user_can_login();
if (client_state.user()->name) {
f = f.then([cli_key = make_client_key(client_state), username = *client_state.user()->name] {
return std::apply(notify_client_change<changed_column::username>{},
std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username)));
});
}
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
});
@@ -1109,7 +1164,17 @@ cql_server::connection::process_register(uint16_t stream, request_reader in, ser
auto et = parse_event_type(event_type);
_server._notifier->register_event(et, this);
}
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
auto client_state_notification_f = std::apply(notify_client_change<changed_column::connection_stage>{},
std::tuple_cat(make_client_key(_client_state), std::make_tuple(connection_stage_literal<client_connection_stage::ready>)));
return client_state_notification_f.then_wrapped([res = make_ready(stream, std::move(trace_state))] (future<> f) mutable {
try {
f.get();
} catch (...) {
clogger.info("exception while setting connection_stage in `system.clients`: {}", std::current_exception());
}
return std::move(res);
});
}
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const
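Every `then_wrapped` block in the diff above follows the same best-effort pattern: the `system.clients` update is attempted, any failure is logged and swallowed, and the protocol response is returned regardless. An asyncio analogy of that pattern (the notification deliberately fails here; all names are illustrative, not Scylla's API):

```python
import asyncio

async def notify_client_change(**cols):
    # Stand-in for the system.clients update; it always fails here so we
    # can show that the response is still produced.
    raise RuntimeError('system.clients unavailable')

async def process_register(response):
    """Best-effort metadata update: a failed notification is logged,
    never propagated to the client."""
    try:
        await notify_client_change(connection_stage='ready')
    except Exception as e:
        print('exception while setting connection_stage:', e)
    return response

result = asyncio.run(process_register('READY'))
```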

View File

@@ -48,6 +48,7 @@ class registrations;
}
class database;
enum class client_type;
struct client_data;
namespace cql_transport {
@@ -189,6 +190,7 @@ private:
future<> process();
future<> process_request();
future<> shutdown();
static std::tuple<net::inet_address, int, client_type> make_client_key(const service::client_state& cli_state);
client_data make_client_data() const;
const service::client_state& get_client_state() const { return _client_state; }
private:

View File

@@ -83,6 +83,7 @@ public:
template <typename T>
standard_migrator<T>& get_standard_migrator()
{
seastar::memory::disable_failure_guard dfg;
static thread_local standard_migrator<T> instance;
return instance;
}