Compare commits
21 Commits
next-4.3
...
next-f33-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2edc1ca039 | ||
|
|
5ff5d43c7a | ||
|
|
b2ce3b197e | ||
|
|
289a08072a | ||
|
|
4b9206a180 | ||
|
|
2a42fc5cde | ||
|
|
fc15d0a4be | ||
|
|
6eb3ba74e4 | ||
|
|
e0176bccab | ||
|
|
92b741b4ff | ||
|
|
a57d4c0092 | ||
|
|
e2a02f15c2 | ||
|
|
52db99f25f | ||
|
|
1bc96a5785 | ||
|
|
4b65d67a1a | ||
|
|
82aabab054 | ||
|
|
46ea8c9b8b | ||
|
|
0251cb9b31 | ||
|
|
6abe1352ba | ||
|
|
d2d162ece3 | ||
|
|
acf0341e9b |
@@ -475,6 +475,8 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
status = "ENABLED";
|
||||
}
|
||||
}
|
||||
|
||||
auto ttl = std::chrono::seconds(opts.ttl());
|
||||
|
||||
rjson::set(stream_desc, "StreamStatus", rjson::from_string(status));
|
||||
|
||||
@@ -498,10 +500,10 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
// cannot really "resume" query, must iterate all data. because we cannot query neither "time" (pk) > something,
|
||||
// or on expired...
|
||||
// TODO: maybe add secondary index to topology table to enable this?
|
||||
return _sdks.cdc_get_versioned_streams({ tm.count_normal_token_owners() }).then([this, &db, schema, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)](std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
|
||||
return _sdks.cdc_get_versioned_streams({ tm.count_normal_token_owners() }).then([this, &db, schema, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc), ttl](std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
|
||||
|
||||
// filter out cdc generations older than the table or now() - dynamodb_streams_max_window (24h)
|
||||
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - dynamodb_streams_max_window);
|
||||
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
|
||||
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
|
||||
|
||||
auto i = topologies.lower_bound(low_ts);
|
||||
// need first gen _intersecting_ the timestamp.
|
||||
|
||||
25
cdc/log.cc
25
cdc/log.cc
@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
|
||||
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
|
||||
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
|
||||
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
|
||||
b.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
|
||||
for (const auto& column : columns) {
|
||||
@@ -880,14 +881,26 @@ public:
|
||||
return _base_schema;
|
||||
}
|
||||
|
||||
clustering_key create_ck(int batch) const {
|
||||
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
|
||||
}
|
||||
|
||||
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
|
||||
// The numbering of batch sequence numbers starts from 0.
|
||||
clustering_key allocate_new_log_row() {
|
||||
auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) });
|
||||
auto log_ck = create_ck(_batch_no++);
|
||||
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
bool has_rows() const {
|
||||
return _batch_no != 0;
|
||||
}
|
||||
|
||||
clustering_key last_row_key() const {
|
||||
return create_ck(_batch_no - 1);
|
||||
}
|
||||
|
||||
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
|
||||
clustering_key allocate_new_log_row(operation op) {
|
||||
auto log_ck = allocate_new_log_row();
|
||||
@@ -944,6 +957,11 @@ public:
|
||||
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
|
||||
}
|
||||
|
||||
void end_record() {
|
||||
if (has_rows()) {
|
||||
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
|
||||
}
|
||||
}
|
||||
private:
|
||||
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
|
||||
size_t pos = 0;
|
||||
@@ -1519,6 +1537,11 @@ public:
|
||||
cdc::inspect_mutation(m, v);
|
||||
}
|
||||
|
||||
void end_record() override {
|
||||
assert(_builder);
|
||||
_builder->end_record();
|
||||
}
|
||||
|
||||
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
|
||||
// The `transformer` object on which this method was called on should not be used anymore.
|
||||
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
|
||||
|
||||
@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
|
||||
processor.produce_postimage(&ck);
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
|
||||
processor.produce_postimage(&cr.key());
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -77,6 +77,10 @@ public:
|
||||
// both columns have different timestamp or TTL set.
|
||||
// m - the small mutation to be converted into CDC log rows.
|
||||
virtual void process_change(const mutation& m) = 0;
|
||||
|
||||
// Tells processor we have reached end of record - last part
|
||||
// of a given timestamp batch
|
||||
virtual void end_record() = 0;
|
||||
};
|
||||
|
||||
bool should_split(const mutation& base_mutation);
|
||||
|
||||
27
configure.py
27
configure.py
@@ -257,18 +257,18 @@ modes = {
|
||||
'stack-usage-threshold': 1024*40,
|
||||
},
|
||||
'release': {
|
||||
'cxxflags': '',
|
||||
'cxx_ld_flags': '-O3 -ffunction-sections -fdata-sections -Wl,--gc-sections',
|
||||
'cxxflags': '-O3 -ffunction-sections -fdata-sections ',
|
||||
'cxx_ld_flags': '-Wl,--gc-sections',
|
||||
'stack-usage-threshold': 1024*13,
|
||||
},
|
||||
'dev': {
|
||||
'cxxflags': '-DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
'cxx_ld_flags': '-O1',
|
||||
'cxxflags': '-O1 -DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
'cxx_ld_flags': '',
|
||||
'stack-usage-threshold': 1024*21,
|
||||
},
|
||||
'sanitize': {
|
||||
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
'cxx_ld_flags': '-Os',
|
||||
'cxxflags': '-Os -DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
|
||||
'cxx_ld_flags': '',
|
||||
'stack-usage-threshold': 1024*50,
|
||||
}
|
||||
}
|
||||
@@ -1167,11 +1167,11 @@ optimization_flags = [
|
||||
optimization_flags = [o
|
||||
for o in optimization_flags
|
||||
if flag_supported(flag=o, compiler=args.cxx)]
|
||||
modes['release']['cxx_ld_flags'] += ' ' + ' '.join(optimization_flags)
|
||||
modes['release']['cxxflags'] += ' ' + ' '.join(optimization_flags)
|
||||
|
||||
if flag_supported(flag='-Wstack-usage=4096', compiler=args.cxx):
|
||||
for mode in modes:
|
||||
modes[mode]['cxx_ld_flags'] += f' -Wstack-usage={modes[mode]["stack-usage-threshold"]} -Wno-error=stack-usage='
|
||||
modes[mode]['cxxflags'] += f' -Wstack-usage={modes[mode]["stack-usage-threshold"]} -Wno-error=stack-usage='
|
||||
|
||||
linker_flags = linker_flags(compiler=args.cxx)
|
||||
|
||||
@@ -1338,6 +1338,13 @@ libdeflate_cflags = seastar_cflags
|
||||
|
||||
MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' }
|
||||
|
||||
# cmake likes to separate things with semicolons
|
||||
def semicolon_separated(*flags):
|
||||
# original flags may be space separated, so convert to string still
|
||||
# using spaces
|
||||
f = ' '.join(flags)
|
||||
return re.sub(' +', ';', f)
|
||||
|
||||
def configure_seastar(build_dir, mode):
|
||||
seastar_build_dir = os.path.join(build_dir, mode, 'seastar')
|
||||
|
||||
@@ -1346,8 +1353,8 @@ def configure_seastar(build_dir, mode):
|
||||
'-DCMAKE_C_COMPILER={}'.format(args.cc),
|
||||
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
|
||||
'-DCMAKE_EXPORT_NO_PACKAGE_REGISTRY=ON',
|
||||
'-DSeastar_CXX_FLAGS={}'.format((seastar_cflags + ' ' + modes[mode]['cxx_ld_flags']).replace(' ', ';')),
|
||||
'-DSeastar_LD_FLAGS={}'.format(seastar_ldflags),
|
||||
'-DSeastar_CXX_FLAGS={}'.format((seastar_cflags).replace(' ', ';')),
|
||||
'-DSeastar_LD_FLAGS={}'.format(semicolon_separated(seastar_ldflags, modes[mode]['cxx_ld_flags'])),
|
||||
'-DSeastar_CXX_DIALECT=gnu++20',
|
||||
'-DSeastar_API_LEVEL=6',
|
||||
'-DSeastar_UNUSED_RESULT_ERROR=ON',
|
||||
|
||||
@@ -20,44 +20,47 @@
|
||||
*/
|
||||
|
||||
#include "connection_notifier.hh"
|
||||
#include "db/query_context.hh"
|
||||
#include "cql3/constants.hh"
|
||||
#include "database.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
namespace db::system_keyspace {
|
||||
extern const char *const CLIENTS;
|
||||
}
|
||||
|
||||
static sstring to_string(client_type ct) {
|
||||
sstring to_string(client_type ct) {
|
||||
switch (ct) {
|
||||
case client_type::cql: return "cql";
|
||||
case client_type::thrift: return "thrift";
|
||||
case client_type::alternator: return "alternator";
|
||||
default: throw std::runtime_error("Invalid client_type");
|
||||
}
|
||||
throw std::runtime_error("Invalid client_type");
|
||||
}
|
||||
|
||||
static sstring to_string(client_connection_stage ccs) {
|
||||
switch (ccs) {
|
||||
case client_connection_stage::established: return connection_stage_literal<client_connection_stage::established>;
|
||||
case client_connection_stage::authenticating: return connection_stage_literal<client_connection_stage::authenticating>;
|
||||
case client_connection_stage::ready: return connection_stage_literal<client_connection_stage::ready>;
|
||||
}
|
||||
throw std::runtime_error("Invalid client_connection_stage");
|
||||
}
|
||||
|
||||
future<> notify_new_client(client_data cd) {
|
||||
// FIXME: consider prepared statement
|
||||
const static sstring req
|
||||
= format("INSERT INTO system.{} (address, port, client_type, shard_id, protocol_version, username) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
|
||||
= format("INSERT INTO system.{} (address, port, client_type, connection_stage, shard_id, protocol_version, username) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?);", db::system_keyspace::CLIENTS);
|
||||
|
||||
return db::execute_cql(req,
|
||||
std::move(cd.ip), cd.port, to_string(cd.ct), cd.shard_id,
|
||||
std::move(cd.ip), cd.port, to_string(cd.ct), to_string(cd.connection_stage), cd.shard_id,
|
||||
cd.protocol_version.has_value() ? data_value(*cd.protocol_version) : data_value::make_null(int32_type),
|
||||
cd.username.value_or("anonymous")).discard_result();
|
||||
}
|
||||
|
||||
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port) {
|
||||
future<> notify_disconnected_client(net::inet_address addr, int port, client_type ct) {
|
||||
// FIXME: consider prepared statement
|
||||
const static sstring req
|
||||
= format("DELETE FROM system.{} where address=? AND port=? AND client_type=?;",
|
||||
db::system_keyspace::CLIENTS);
|
||||
return db::execute_cql(req, addr.addr(), port, to_string(ct)).discard_result();
|
||||
return db::execute_cql(req, std::move(addr), port, to_string(ct)).discard_result();
|
||||
}
|
||||
|
||||
future<> clear_clientlist() {
|
||||
|
||||
@@ -20,27 +20,65 @@
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
#include "db/query_context.hh"
|
||||
|
||||
#include <seastar/net/inet_address.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "seastarx.hh"
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace db::system_keyspace {
|
||||
extern const char *const CLIENTS;
|
||||
}
|
||||
|
||||
enum class client_type {
|
||||
cql = 0,
|
||||
thrift,
|
||||
alternator,
|
||||
};
|
||||
|
||||
sstring to_string(client_type ct);
|
||||
|
||||
enum class changed_column {
|
||||
username = 0,
|
||||
connection_stage,
|
||||
driver_name,
|
||||
driver_version,
|
||||
hostname,
|
||||
protocol_version,
|
||||
};
|
||||
|
||||
template <changed_column column> constexpr const char* column_literal = "";
|
||||
template <> constexpr const char* column_literal<changed_column::username> = "username";
|
||||
template <> constexpr const char* column_literal<changed_column::connection_stage> = "connection_stage";
|
||||
template <> constexpr const char* column_literal<changed_column::driver_name> = "driver_name";
|
||||
template <> constexpr const char* column_literal<changed_column::driver_version> = "driver_version";
|
||||
template <> constexpr const char* column_literal<changed_column::hostname> = "hostname";
|
||||
template <> constexpr const char* column_literal<changed_column::protocol_version> = "protocol_version";
|
||||
|
||||
enum class client_connection_stage {
|
||||
established = 0,
|
||||
authenticating,
|
||||
ready,
|
||||
};
|
||||
|
||||
template <client_connection_stage ccs> constexpr const char* connection_stage_literal = "";
|
||||
template <> constexpr const char* connection_stage_literal<client_connection_stage::established> = "ESTABLISHED";
|
||||
template <> constexpr const char* connection_stage_literal<client_connection_stage::authenticating> = "AUTHENTICATING";
|
||||
template <> constexpr const char* connection_stage_literal<client_connection_stage::ready> = "READY";
|
||||
|
||||
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
|
||||
struct client_data {
|
||||
gms::inet_address ip;
|
||||
net::inet_address ip;
|
||||
int32_t port;
|
||||
client_type ct;
|
||||
client_connection_stage connection_stage = client_connection_stage::established;
|
||||
int32_t shard_id; /// ID of server-side shard which is processing the connection.
|
||||
|
||||
// `optional' column means that it's nullable (possibly because it's
|
||||
// unimplemented yet). If you want to fill ("implement") any of them,
|
||||
// remember to update the query in `notify_new_client()'.
|
||||
std::optional<sstring> connection_stage;
|
||||
std::optional<sstring> driver_name;
|
||||
std::optional<sstring> driver_version;
|
||||
std::optional<sstring> hostname;
|
||||
@@ -52,6 +90,17 @@ struct client_data {
|
||||
};
|
||||
|
||||
future<> notify_new_client(client_data cd);
|
||||
future<> notify_disconnected_client(gms::inet_address addr, client_type ct, int port);
|
||||
|
||||
future<> notify_disconnected_client(net::inet_address addr, int port, client_type ct);
|
||||
future<> clear_clientlist();
|
||||
|
||||
template <changed_column column_enum_val>
|
||||
struct notify_client_change {
|
||||
template <typename T>
|
||||
future<> operator()(net::inet_address addr, int port, client_type ct, T&& value) {
|
||||
const static sstring req
|
||||
= format("UPDATE system.{} SET {}=? WHERE address=? AND port=? AND client_type=?;",
|
||||
db::system_keyspace::CLIENTS, column_literal<column_enum_val>);
|
||||
|
||||
return db::execute_cql(req, std::forward<T>(value), std::move(addr), port, to_string(ct)).discard_result();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -204,6 +204,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
}
|
||||
|
||||
_properties.validate(db, _properties.properties()->make_schema_extensions(db.extensions()));
|
||||
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
|
||||
|
||||
auto stmt = ::make_shared<create_table_statement>(_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
|
||||
|
||||
@@ -211,6 +212,11 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
for (auto&& entry : _definitions) {
|
||||
::shared_ptr<column_identifier> id = entry.first;
|
||||
cql3_type pt = entry.second->prepare(db, keyspace());
|
||||
|
||||
if (has_default_ttl && pt.is_counter()) {
|
||||
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
|
||||
}
|
||||
|
||||
if (pt.get_type()->is_multi_cell()) {
|
||||
if (pt.get_type()->is_user_type()) {
|
||||
// check for multi-cell types (non-frozen UDTs or collections) inside a non-frozen UDT
|
||||
|
||||
15
database.hh
15
database.hh
@@ -765,21 +765,6 @@ public:
|
||||
future<> clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<db::replay_position> discard_sstables(db_clock::time_point);
|
||||
|
||||
// Make sure the generation numbers are sequential, starting from "start".
|
||||
// Generations before "start" are left untouched.
|
||||
//
|
||||
// Return the highest generation number seen so far
|
||||
//
|
||||
// Word of warning: although this function will reshuffle anything over "start", it is
|
||||
// very dangerous to do that with live SSTables. This is meant to be used with SSTables
|
||||
// that are not yet managed by the system.
|
||||
//
|
||||
// Parameter all_generations stores the generation of all SSTables in the system, so it
|
||||
// will be easy to determine which SSTable is new.
|
||||
// An example usage would query all shards asking what is the highest SSTable number known
|
||||
// to them, and then pass that + 1 as "start".
|
||||
future<std::vector<sstables::entry_descriptor>> reshuffle_sstables(std::set<int64_t> all_generations, int64_t start);
|
||||
|
||||
// FIXME: this is just an example, should be changed to something more
|
||||
// general. compact_all_sstables() starts a compaction of all sstables.
|
||||
// It doesn't flush the current memtable first. It's just a ad-hoc method,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Getting Started With ScyllaDB Alternator
|
||||
---
|
||||
|
||||
## Installing Scylla
|
||||
Before you can start using ScyllaDB Alternator, you will have to have an up
|
||||
and running scylla cluster configured to expose the alternator port.
|
||||
|
||||
@@ -28,6 +28,7 @@ else
|
||||
fi
|
||||
|
||||
debian_base_packages=(
|
||||
clang
|
||||
liblua5.3-dev
|
||||
python3-pyparsing
|
||||
python3-colorama
|
||||
@@ -44,6 +45,7 @@ debian_base_packages=(
|
||||
)
|
||||
|
||||
fedora_packages=(
|
||||
clang
|
||||
lua-devel
|
||||
yaml-cpp-devel
|
||||
thrift-devel
|
||||
@@ -57,6 +59,7 @@ fedora_packages=(
|
||||
python
|
||||
sudo
|
||||
java-1.8.0-openjdk-headless
|
||||
java-1.8.0-openjdk-devel
|
||||
ant
|
||||
ant-junit
|
||||
maven
|
||||
|
||||
@@ -106,6 +106,7 @@ adjust_bin() {
|
||||
"$root/$prefix/libexec/$bin"
|
||||
cat > "$root/$prefix/bin/$bin" <<EOF
|
||||
#!/bin/bash -e
|
||||
[[ -z "\$LD_PRELOAD" ]] || { echo "\$0: not compatible with LD_PRELOAD" >&2; exit 110; }
|
||||
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
|
||||
export LD_LIBRARY_PATH="$prefix/libreloc"
|
||||
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
|
||||
@@ -130,6 +131,7 @@ relocate_python3() {
|
||||
cp "$script" "$relocateddir"
|
||||
cat > "$install"<<EOF
|
||||
#!/usr/bin/env bash
|
||||
[[ -z "\$LD_PRELOAD" ]] || { echo "\$0: not compatible with LD_PRELOAD" >&2; exit 110; }
|
||||
export LC_ALL=en_US.UTF-8
|
||||
x="\$(readlink -f "\$0")"
|
||||
b="\$(basename "\$x")"
|
||||
|
||||
@@ -542,12 +542,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
|
||||
return partition_snapshot_ptr(std::move(snp));
|
||||
}
|
||||
|
||||
std::vector<range_tombstone>
|
||||
partition_snapshot::range_tombstone_result
|
||||
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end)
|
||||
{
|
||||
partition_version* v = &*version();
|
||||
if (!v->next()) {
|
||||
return boost::copy_range<std::vector<range_tombstone>>(
|
||||
return boost::copy_range<range_tombstone_result>(
|
||||
v->partition().row_tombstones().slice(*_schema, start, end));
|
||||
}
|
||||
range_tombstone_list list(*_schema);
|
||||
@@ -557,10 +557,10 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
|
||||
}
|
||||
v = v->next();
|
||||
}
|
||||
return boost::copy_range<std::vector<range_tombstone>>(list.slice(*_schema, start, end));
|
||||
return boost::copy_range<range_tombstone_result>(list.slice(*_schema, start, end));
|
||||
}
|
||||
|
||||
std::vector<range_tombstone>
|
||||
partition_snapshot::range_tombstone_result
|
||||
partition_snapshot::range_tombstones()
|
||||
{
|
||||
return range_tombstones(
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "utils/anchorless_list.hh"
|
||||
#include "utils/logalloc.hh"
|
||||
#include "utils/coroutine.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
#include <boost/intrusive/parent_from_member.hpp>
|
||||
#include <boost/intrusive/slist.hpp>
|
||||
@@ -400,10 +401,13 @@ public:
|
||||
::static_row static_row(bool digest_requested) const;
|
||||
bool static_row_continuous() const;
|
||||
mutation_partition squashed() const;
|
||||
|
||||
using range_tombstone_result = utils::chunked_vector<range_tombstone>;
|
||||
|
||||
// Returns range tombstones overlapping with [start, end)
|
||||
std::vector<range_tombstone> range_tombstones(position_in_partition_view start, position_in_partition_view end);
|
||||
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
|
||||
// Returns all range tombstones
|
||||
std::vector<range_tombstone> range_tombstones();
|
||||
range_tombstone_result range_tombstones();
|
||||
};
|
||||
|
||||
class partition_snapshot_ptr {
|
||||
|
||||
@@ -509,7 +509,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer {
|
||||
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
uint64_t _estimated_partitions;
|
||||
@@ -569,8 +569,9 @@ public:
|
||||
table& t = db.local().find_column_family(_schema->id());
|
||||
auto [queue_reader, queue_handle] = make_queue_reader(_schema, _permit);
|
||||
_mq[node_idx] = std::move(queue_handle);
|
||||
auto writer = shared_from_this();
|
||||
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions, writer] (flat_mutation_reader reader) {
|
||||
auto& t = db.local().find_column_family(reader.schema());
|
||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
@@ -598,13 +599,13 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
t.stream_in_progress()).then([node_idx, writer] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
|
||||
}).handle_exception([node_idx, writer] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), ep);
|
||||
writer->_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
@@ -718,7 +719,7 @@ private:
|
||||
size_t _nr_peer_nodes= 1;
|
||||
repair_stats _stats;
|
||||
repair_reader _repair_reader;
|
||||
repair_writer _repair_writer;
|
||||
lw_shared_ptr<repair_writer> _repair_writer;
|
||||
// Contains rows read from disk
|
||||
std::list<repair_row> _row_buf;
|
||||
// Contains rows we are working on to sync between peers
|
||||
@@ -822,7 +823,7 @@ public:
|
||||
_seed,
|
||||
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
||||
)
|
||||
, _repair_writer(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason)
|
||||
, _repair_writer(make_lw_shared<repair_writer>(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason))
|
||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||
[&ms] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
||||
return ms.local().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
||||
@@ -855,7 +856,7 @@ public:
|
||||
auto f2 = _sink_source_for_get_row_diff.close();
|
||||
auto f3 = _sink_source_for_put_row_diff.close();
|
||||
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
|
||||
return _repair_writer.wait_for_writer_done();
|
||||
return _repair_writer->wait_for_writer_done();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1340,8 +1341,8 @@ private:
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return with_semaphore(_repair_writer->sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer->create_writer(_db, node_idx);
|
||||
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
@@ -1355,7 +1356,7 @@ private:
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
return _repair_writer->do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
row_diff.pop_front();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
|
||||
@@ -154,7 +154,9 @@ ar.reloc_add('licenses')
|
||||
ar.reloc_add('swagger-ui')
|
||||
ar.reloc_add('api')
|
||||
def exclude_submodules(tarinfo):
|
||||
if tarinfo.name in ('scylla/tools/jmx', 'scylla/tools/java'):
|
||||
if tarinfo.name in ('scylla/tools/jmx',
|
||||
'scylla/tools/java',
|
||||
'scylla/tools/python3'):
|
||||
return None
|
||||
return tarinfo
|
||||
ar.reloc_add('tools', filter=exclude_submodules)
|
||||
|
||||
@@ -118,6 +118,7 @@ private:
|
||||
private volatile String keyspace;
|
||||
#endif
|
||||
std::optional<auth::authenticated_user> _user;
|
||||
std::optional<sstring> _driver_name, _driver_version;
|
||||
|
||||
auth_state _auth_state = auth_state::UNINITIALIZED;
|
||||
|
||||
@@ -147,6 +148,20 @@ public:
|
||||
_auth_state = new_state;
|
||||
}
|
||||
|
||||
std::optional<sstring> get_driver_name() const {
|
||||
return _driver_name;
|
||||
}
|
||||
void set_driver_name(sstring driver_name) {
|
||||
_driver_name = std::move(driver_name);
|
||||
}
|
||||
|
||||
std::optional<sstring> get_driver_version() const {
|
||||
return _driver_version;
|
||||
}
|
||||
void set_driver_version(sstring driver_version) {
|
||||
_driver_version = std::move(driver_version);
|
||||
}
|
||||
|
||||
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
: _is_internal(false)
|
||||
, _is_thrift(thrift)
|
||||
|
||||
27
table.cc
27
table.cc
@@ -852,33 +852,6 @@ table::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<sstables::entry_descriptor>>
|
||||
table::reshuffle_sstables(std::set<int64_t> all_generations, int64_t start) {
|
||||
struct work {
|
||||
std::set<int64_t> all_generations; // Stores generation of all live sstables in the system.
|
||||
work(int64_t start, std::set<int64_t> gens)
|
||||
: all_generations(gens) {}
|
||||
};
|
||||
|
||||
return do_with(work(start, std::move(all_generations)), [this] (work& work) {
|
||||
tlogger.info("Reshuffling SSTables in {}...", _config.datadir);
|
||||
return lister::scan_dir(_config.datadir, { directory_entry_type::regular }, [this, &work] (fs::path parent_dir, directory_entry de) {
|
||||
auto comps = sstables::entry_descriptor::make_descriptor(parent_dir.native(), de.name);
|
||||
if (comps.component != component_type::TOC) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// Skip generations that were already loaded by Scylla at a previous stage.
|
||||
if (work.all_generations.contains(comps.generation)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return make_exception_future<>(std::runtime_error("Loading SSTables from the main SSTable directory is unsafe and no longer supported."
|
||||
" You will find a directory called upload/ inside the table directory that can be used to load new SSTables into the system"));
|
||||
}, &sstables::manifest_json_filter).then([&work] {
|
||||
return make_ready_future<std::vector<sstables::entry_descriptor>>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void table::set_metrics() {
|
||||
auto cf = column_family_label(_schema->cf_name());
|
||||
auto ks = keyspace_label(_schema->ks_name());
|
||||
|
||||
@@ -1270,7 +1270,7 @@ def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
|
||||
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
|
||||
def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
|
||||
table, arn = test_table_ss_keys_only
|
||||
iterators = latest_iterators(dynamodbstreams, arn)
|
||||
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
|
||||
# Do an UpdateItem operation that is expected to leave one event in the
|
||||
# stream.
|
||||
table.update_item(Key={'p': random_string(), 'c': random_string()},
|
||||
@@ -1286,7 +1286,7 @@ def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
|
||||
# eventually *one* of the stream shards will return one event:
|
||||
timeout = time.time() + 15
|
||||
while time.time() < timeout:
|
||||
for iter in iterators:
|
||||
for (shard_id, iter) in shards_and_iterators:
|
||||
response = dynamodbstreams.get_records(ShardIterator=iter)
|
||||
if 'Records' in response and response['Records'] != []:
|
||||
# Found the shard with the data! Test that it only has
|
||||
@@ -1301,10 +1301,105 @@ def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
|
||||
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
|
||||
assert len(response['Records']) == 0
|
||||
assert not 'NextShardIterator' in response
|
||||
# Until now we verified that we can read the closed shard
|
||||
# using an old iterator. Let's test now that the closed
|
||||
# shard id is also still valid, and a new iterator can be
|
||||
# created for it, and the old data can be read from it:
|
||||
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
|
||||
ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
|
||||
response = dynamodbstreams.get_records(ShardIterator=iter)
|
||||
assert len(response['Records']) == 1
|
||||
return
|
||||
time.sleep(0.5)
|
||||
pytest.fail("timed out")
|
||||
|
||||
# In the above test (test_streams_closed_read) we used a disabled stream as
|
||||
# a means to generate a closed shard, and tested the behavior of that closed
|
||||
# shard. In the following test, we do more extensive testing on the the
|
||||
# behavior of a disabled stream and verify that it is sill usable (for 24
|
||||
# hours), reproducing issue #7239: The disabled stream's ARN should still be
|
||||
# listed for the table, this ARN should continue to work, listing the
|
||||
# stream's shards should give an indication that they are all closed - but
|
||||
# all these shards should still be readable.
|
||||
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
|
||||
def test_streams_disabled_stream(test_table_ss_keys_only, dynamodbstreams):
|
||||
table, arn = test_table_ss_keys_only
|
||||
iterators = latest_iterators(dynamodbstreams, arn)
|
||||
# Do an UpdateItem operation that is expected to leave one event in the
|
||||
# stream.
|
||||
table.update_item(Key={'p': random_string(), 'c': random_string()},
|
||||
UpdateExpression='SET x = :x', ExpressionAttributeValues={':x': 5})
|
||||
|
||||
# Wait for this one update to become available in the stream before we
|
||||
# disable the stream. Otherwise, theoretically (although unlikely in
|
||||
# practice) we may disable the stream before the update was saved to it.
|
||||
timeout = time.time() + 15
|
||||
found = False
|
||||
while time.time() < timeout and not found:
|
||||
for iter in iterators:
|
||||
response = dynamodbstreams.get_records(ShardIterator=iter)
|
||||
if 'Records' in response and len(response['Records']) > 0:
|
||||
found = True
|
||||
break
|
||||
time.sleep(0.5)
|
||||
assert found
|
||||
|
||||
# Disable streaming for this table. Note that the test_table_ss_keys_only
|
||||
# fixture has "function" scope so it is fine to ruin table, it will not
|
||||
# be used in other tests.
|
||||
disable_stream(dynamodbstreams, table)
|
||||
|
||||
# Check that the stream ARN which we previously got for the disabled
|
||||
# stream is still listed by ListStreams
|
||||
arns = [stream['StreamArn'] for stream in dynamodbstreams.list_streams(TableName=table.name)['Streams']]
|
||||
assert arn in arns
|
||||
|
||||
# DescribeStream on the disabled stream still works and lists its shards.
|
||||
# All these shards are listed as being closed (i.e., should have
|
||||
# EndingSequenceNumber). The basic details of the stream (e.g., the view
|
||||
# type) are available and the status of the stream is DISABLED.
|
||||
response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
|
||||
assert response['StreamStatus'] == 'DISABLED'
|
||||
assert response['StreamViewType'] == 'KEYS_ONLY'
|
||||
assert response['TableName'] == table.name
|
||||
shards_info = response['Shards']
|
||||
while 'LastEvaluatedShardId' in response:
|
||||
response = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
|
||||
assert response['StreamStatus'] == 'DISABLED'
|
||||
assert response['StreamViewType'] == 'KEYS_ONLY'
|
||||
assert response['TableName'] == table.name
|
||||
shards_info.extend(response['Shards'])
|
||||
print('Number of shards in stream: {}'.format(len(shards_info)))
|
||||
for shard in shards_info:
|
||||
assert 'EndingSequenceNumber' in shard['SequenceNumberRange']
|
||||
assert shard['SequenceNumberRange']['EndingSequenceNumber'].isdecimal()
|
||||
|
||||
# We can get TRIM_HORIZON iterators for all these shards, to read all
|
||||
# the old data they still have (this data should be saved for 24 hours
|
||||
# after the stream was disabled)
|
||||
iterators = []
|
||||
for shard in shards_info:
|
||||
iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
|
||||
ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator'])
|
||||
|
||||
# We can read the one change we did in one of these iterators. The data
|
||||
# should be available immediately - no need for retries with timeout.
|
||||
nrecords = 0
|
||||
for iter in iterators:
|
||||
response = dynamodbstreams.get_records(ShardIterator=iter)
|
||||
if 'Records' in response:
|
||||
nrecords += len(response['Records'])
|
||||
# The shard is closed, so NextShardIterator should either be missing
|
||||
# now, indicating that it is a closed shard (DynamoDB does this),
|
||||
# or, it may (and currently does in Alternator) return an iterator
|
||||
# and reading from *that* iterator should then tell us that
|
||||
# we reached the end of the shard (i.e., zero results and
|
||||
# missing NextShardIterator).
|
||||
if 'NextShardIterator' in response:
|
||||
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
|
||||
assert len(response['Records']) == 0
|
||||
assert not 'NextShardIterator' in response
|
||||
assert nrecords == 1
|
||||
|
||||
# TODO: tests on multiple partitions
|
||||
# TODO: write a test that disabling the stream and re-enabling it works, but
|
||||
|
||||
@@ -326,6 +326,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
|
||||
// cdc log clustering key
|
||||
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
|
||||
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
|
||||
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
|
||||
|
||||
// pk
|
||||
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
|
||||
@@ -534,6 +535,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
|
||||
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
|
||||
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
|
||||
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
|
||||
|
||||
auto val_type = int32_type;
|
||||
auto val = *first[0][val_index];
|
||||
@@ -583,10 +585,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
if (post_enabled) {
|
||||
val = *post_image.back()[val_index];
|
||||
val2 = *post_image.back()[val2_index];
|
||||
auto eor = *post_image.back()[eor_index];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
|
||||
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
|
||||
}
|
||||
|
||||
const auto& ttl_cell = second[second.size() - 2][ttl_index];
|
||||
|
||||
@@ -49,7 +49,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
|
||||
// Verifies that tombstones in "list" are monotonic, overlap with the requested range,
|
||||
// and have information equivalent with "expected" in that range.
|
||||
static
|
||||
void check_tombstone_slice(const schema& s, std::vector<range_tombstone> list,
|
||||
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
|
||||
const query::clustering_range& range,
|
||||
std::initializer_list<range_tombstone> expected)
|
||||
{
|
||||
|
||||
9
test/cql/counters_disallow_ttl_test.cql
Normal file
9
test/cql/counters_disallow_ttl_test.cql
Normal file
@@ -0,0 +1,9 @@
|
||||
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;
|
||||
|
||||
create table tb2 (pk int primary key, c1 counter);
|
||||
alter table tb2 with default_time_to_live = 100;
|
||||
|
||||
create table tb3 (pk int primary key) with default_time_to_live = 100;
|
||||
alter table tb3 add (c1 counter);
|
||||
|
||||
create table tb4 (pk int, ck int, cs counter static, primary KEY (pk, ck)) with default_time_to_live = 100;
|
||||
31
test/cql/counters_disallow_ttl_test.result
Normal file
31
test/cql/counters_disallow_ttl_test.result
Normal file
@@ -0,0 +1,31 @@
|
||||
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;
|
||||
{
|
||||
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
|
||||
"status" : "error"
|
||||
}
|
||||
|
||||
create table tb2 (pk int primary key, c1 counter);
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
alter table tb2 with default_time_to_live = 100;
|
||||
{
|
||||
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
|
||||
"status" : "error"
|
||||
}
|
||||
|
||||
create table tb3 (pk int primary key) with default_time_to_live = 100;
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
alter table tb3 add (c1 counter);
|
||||
{
|
||||
"message" : "exceptions::configuration_exception (Cannot add a counter column (c1) in a non counter column family)",
|
||||
"status" : "error"
|
||||
}
|
||||
|
||||
create table tb4 (pk int, ck int, cs counter static, primary KEY (pk, ck)) with default_time_to_live = 100;
|
||||
{
|
||||
"message" : "exceptions::invalid_request_exception (Cannot set default_time_to_live on a table with counters)",
|
||||
"status" : "error"
|
||||
}
|
||||
Submodule tools/java updated: f2e8666d7e...ad48b44a26
@@ -39,7 +39,7 @@ class LiveData(object):
|
||||
def _discoverMetrics(self):
|
||||
results = metric.Metric.discover(self._metric_source)
|
||||
logging.debug('_discoverMetrics: {} results discovered'.format(len(results)))
|
||||
for symbol in results:
|
||||
for symbol in list(results):
|
||||
if not self._matches(symbol, self._metricPatterns):
|
||||
results.pop(symbol)
|
||||
logging.debug('_initializeMetrics: {} results matched'.format(len(results)))
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM docker.io/fedora:32
|
||||
FROM docker.io/fedora:33
|
||||
ADD ./install-dependencies.sh ./
|
||||
ADD ./seastar/install-dependencies.sh ./seastar/
|
||||
ADD ./tools/toolchain/system-auth ./
|
||||
@@ -10,4 +10,6 @@ RUN dnf -y install 'dnf-command(copr)' \
|
||||
&& echo 'ALL ALL=(ALL:ALL) NOPASSWD: ALL' >> /etc/sudoers \
|
||||
&& cp system-auth /etc/pam.d \
|
||||
&& echo 'Defaults !requiretty' >> /etc/sudoers
|
||||
RUN mkdir -p /root/.m2/repository
|
||||
ENV JAVA8_HOME=/usr/lib/jvm/java-1.8.0-openjdk
|
||||
CMD /bin/bash
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-32-20200910
|
||||
docker.io/scylladb/scylla-toolchain:fedora-33-20201028
|
||||
@@ -309,7 +309,7 @@ cql_server::unadvertise_connection(shared_ptr<connection> conn) {
|
||||
const auto ip = conn->get_client_state().get_client_address().addr();
|
||||
const auto port = conn->get_client_state().get_client_port();
|
||||
clogger.trace("Advertising disconnection of CQL client {}:{}", ip, port);
|
||||
return notify_disconnected_client(ip, client_type::cql, port);
|
||||
return notify_disconnected_client(ip, port, client_type::cql);
|
||||
}
|
||||
|
||||
unsigned
|
||||
@@ -371,6 +371,16 @@ cql_server::connection::read_frame() {
|
||||
_version = current_version;
|
||||
throw exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", client_version));
|
||||
}
|
||||
|
||||
auto client_state_notification_f = std::apply(notify_client_change<changed_column::protocol_version>{},
|
||||
std::tuple_cat(make_client_key(_client_state), std::make_tuple(_version)));
|
||||
|
||||
return client_state_notification_f.then_wrapped([this] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
clogger.info("exception while setting protocol_version in `system.clients`: {}", std::current_exception());
|
||||
}
|
||||
return _read_buf.read_exactly(frame_size() - 1).then([this] (temporary_buffer<char> tail) {
|
||||
temporary_buffer<char> full(frame_size());
|
||||
full.get_write()[0] = _version;
|
||||
@@ -385,6 +395,7 @@ cql_server::connection::read_frame() {
|
||||
}
|
||||
return make_ready_future<ret_type>(frame);
|
||||
});
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// Not the first frame, so we know the size.
|
||||
@@ -579,13 +590,19 @@ future<> cql_server::connection::shutdown()
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
std::tuple<net::inet_address, int, client_type> cql_server::connection::make_client_key(const service::client_state& cli_state) {
|
||||
return std::make_tuple(cli_state.get_client_address().addr(),
|
||||
cli_state.get_client_port(),
|
||||
cli_state.is_thrift() ? client_type::thrift : client_type::cql);
|
||||
}
|
||||
|
||||
client_data cql_server::connection::make_client_data() const {
|
||||
client_data cd;
|
||||
cd.ip = _client_state.get_client_address().addr();
|
||||
cd.port = _client_state.get_client_port();
|
||||
cd.ct = client_type::cql;
|
||||
std::tie(cd.ip, cd.port, cd.ct) = make_client_key(_client_state);
|
||||
cd.shard_id = this_shard_id();
|
||||
cd.protocol_version = _version;
|
||||
cd.driver_name = _client_state.get_driver_name();
|
||||
cd.driver_version = _client_state.get_driver_version();
|
||||
if (const auto user_ptr = _client_state.user(); user_ptr) {
|
||||
cd.username = user_ptr->name;
|
||||
}
|
||||
@@ -751,6 +768,21 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
throw exceptions::protocol_exception(format("Unknown compression algorithm: {}", compression));
|
||||
}
|
||||
}
|
||||
|
||||
auto client_state_notification_f = make_ready_future<>();
|
||||
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
|
||||
_client_state.set_driver_version(driver_ver_opt->second);
|
||||
client_state_notification_f = std::apply(notify_client_change<changed_column::driver_version>{},
|
||||
std::tuple_cat(make_client_key(_client_state), std::make_tuple(driver_ver_opt->second)));
|
||||
}
|
||||
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
|
||||
_client_state.set_driver_name(driver_name_opt->second);
|
||||
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state), dn = driver_name_opt->second] {
|
||||
return std::apply(notify_client_change<changed_column::driver_name>{},
|
||||
std::tuple_cat(std::move(ck), std::forward_as_tuple(dn)));
|
||||
});
|
||||
}
|
||||
|
||||
cql_protocol_extension_enum_set cql_proto_exts;
|
||||
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
|
||||
if (options.contains(protocol_extension_name(ext))) {
|
||||
@@ -758,11 +790,28 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
}
|
||||
_client_state.set_protocol_extensions(std::move(cql_proto_exts));
|
||||
auto& a = client_state.get_auth_service()->underlying_authenticator();
|
||||
if (a.require_authentication()) {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
|
||||
std::unique_ptr<cql_server::response> res;
|
||||
if (auto& a = client_state.get_auth_service()->underlying_authenticator(); a.require_authentication()) {
|
||||
res = make_autheticate(stream, a.qualified_java_name(), trace_state);
|
||||
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state)] {
|
||||
return std::apply(notify_client_change<changed_column::connection_stage>{},
|
||||
std::tuple_cat(std::move(ck), std::make_tuple(connection_stage_literal<client_connection_stage::authenticating>)));
|
||||
});
|
||||
} else {
|
||||
res = make_ready(stream, trace_state);
|
||||
client_state_notification_f = client_state_notification_f.then([ck = make_client_key(_client_state)] {
|
||||
return std::apply(notify_client_change<changed_column::connection_stage>{},
|
||||
std::tuple_cat(std::move(ck), std::make_tuple(connection_stage_literal<client_connection_stage::ready>)));
|
||||
});
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
|
||||
return client_state_notification_f.then_wrapped([res = std::move(res)] (future<> f) mutable {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
clogger.info("exception while setting driver_name/version in `system.clients`: {}", std::current_exception());
|
||||
}
|
||||
return std::move(res);
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
@@ -774,6 +823,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
|
||||
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
|
||||
client_state.set_login(std::move(user));
|
||||
auto f = client_state.check_user_can_login();
|
||||
if (client_state.user()->name) {
|
||||
f = f.then([cli_key = make_client_key(client_state), username = *client_state.user()->name] {
|
||||
return std::apply(notify_client_change<changed_column::username>{},
|
||||
std::tuple_cat(std::move(cli_key), std::forward_as_tuple(username)));
|
||||
});
|
||||
}
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
|
||||
});
|
||||
@@ -1109,7 +1164,17 @@ cql_server::connection::process_register(uint16_t stream, request_reader in, ser
|
||||
auto et = parse_event_type(event_type);
|
||||
_server._notifier->register_event(et, this);
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
|
||||
auto client_state_notification_f = std::apply(notify_client_change<changed_column::connection_stage>{},
|
||||
std::tuple_cat(make_client_key(_client_state), std::make_tuple(connection_stage_literal<client_connection_stage::ready>)));
|
||||
|
||||
return client_state_notification_f.then_wrapped([res = make_ready(stream, std::move(trace_state))] (future<> f) mutable {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
clogger.info("exception while setting connection_stage in `system.clients`: {}", std::current_exception());
|
||||
}
|
||||
return std::move(res);
|
||||
});
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const
|
||||
|
||||
@@ -48,6 +48,7 @@ class registrations;
|
||||
}
|
||||
|
||||
class database;
|
||||
enum class client_type;
|
||||
struct client_data;
|
||||
|
||||
namespace cql_transport {
|
||||
@@ -189,6 +190,7 @@ private:
|
||||
future<> process();
|
||||
future<> process_request();
|
||||
future<> shutdown();
|
||||
static std::tuple<net::inet_address, int, client_type> make_client_key(const service::client_state& cli_state);
|
||||
client_data make_client_data() const;
|
||||
const service::client_state& get_client_state() const { return _client_state; }
|
||||
private:
|
||||
|
||||
@@ -83,6 +83,7 @@ public:
|
||||
template <typename T>
|
||||
standard_migrator<T>& get_standard_migrator()
|
||||
{
|
||||
seastar::memory::disable_failure_guard dfg;
|
||||
static thread_local standard_migrator<T> instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user