Compare commits
37 Commits
copilot/ad
...
on-hold-br
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
660d68953d | ||
|
|
bff9b459ef | ||
|
|
4c77b86f26 | ||
|
|
fb0afb04ae | ||
|
|
bdb93af423 | ||
|
|
2fffaf36c4 | ||
|
|
dab150f3d8 | ||
|
|
7b6db3b69a | ||
|
|
b096c0d97d | ||
|
|
55e157be4d | ||
|
|
75593a6178 | ||
|
|
bbbc4aafef | ||
|
|
b74411f0ed | ||
|
|
995ffd6ee0 | ||
|
|
88e843c9db | ||
|
|
8cfeb6f509 | ||
|
|
7adc9aa50c | ||
|
|
15d4475870 | ||
|
|
bbe5c323a9 | ||
|
|
82f70a1c19 | ||
|
|
0668dc25df | ||
|
|
148655dc21 | ||
|
|
c89e5f06ba | ||
|
|
b38d169f55 | ||
|
|
5ac40ed1a8 | ||
|
|
37e6e65211 | ||
|
|
994645c03b | ||
|
|
51ed9a0ec0 | ||
|
|
fa689c811e | ||
|
|
18774b90a7 | ||
|
|
b20a85d651 | ||
|
|
1f0f3a4464 | ||
|
|
698ac3ac4e | ||
|
|
f975b7890e | ||
|
|
5da2489e0e | ||
|
|
d9961fc6a2 | ||
|
|
3c3621db07 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -72,7 +72,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.3.0-dev
|
||||
VERSION=5.3.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -88,17 +88,20 @@ json::json_return_type make_streamed(rjson::value&& value) {
|
||||
// move objects to coroutine frame.
|
||||
auto los = std::move(os);
|
||||
auto lrs = std::move(rs);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await rjson::print(*lrs, los);
|
||||
co_await los.flush();
|
||||
co_await los.close();
|
||||
} catch (...) {
|
||||
// at this point, we cannot really do anything. HTTP headers and return code are
|
||||
// already written, and quite potentially a portion of the content data.
|
||||
// just log + rethrow. It is probably better the HTTP server closes connection
|
||||
// abruptly or something...
|
||||
elogger.error("Unhandled exception in data streaming: {}", std::current_exception());
|
||||
throw;
|
||||
ex = std::current_exception();
|
||||
elogger.error("Exception during streaming HTTP response: {}", ex);
|
||||
}
|
||||
co_await los.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
co_return;
|
||||
};
|
||||
@@ -2368,21 +2371,22 @@ std::optional<rjson::value> executor::describe_single_item(schema_ptr schema,
|
||||
return item;
|
||||
}
|
||||
|
||||
std::vector<rjson::value> executor::describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice& slice,
|
||||
const cql3::selection::selection& selection,
|
||||
const query::result& query_result,
|
||||
const std::optional<attrs_to_get>& attrs_to_get) {
|
||||
cql3::selection::result_set_builder builder(selection, gc_clock::now());
|
||||
query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));
|
||||
future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice&& slice,
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get) {
|
||||
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
||||
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
||||
auto result_set = builder.build();
|
||||
std::vector<rjson::value> ret;
|
||||
for (auto& result_row : result_set->rows()) {
|
||||
rjson::value item = rjson::empty_object();
|
||||
describe_single_item(selection, result_row, attrs_to_get, item);
|
||||
describe_single_item(*selection, result_row, *attrs_to_get, item);
|
||||
ret.push_back(std::move(item));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return ret;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
static bool check_needs_read_before_write(const parsed::value& v) {
|
||||
@@ -3254,8 +3258,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
|
||||
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
|
||||
std::vector<rjson::value> jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get);
|
||||
return make_ready_future<std::vector<rjson::value>>(std::move(jsons));
|
||||
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get));
|
||||
});
|
||||
response_futures.push_back(std::move(f));
|
||||
}
|
||||
|
||||
@@ -234,11 +234,11 @@ public:
|
||||
const query::result&,
|
||||
const std::optional<attrs_to_get>&);
|
||||
|
||||
static std::vector<rjson::value> describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice& slice,
|
||||
const cql3::selection::selection& selection,
|
||||
const query::result& query_result,
|
||||
const std::optional<attrs_to_get>& attrs_to_get);
|
||||
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
|
||||
const query::partition_slice&& slice,
|
||||
shared_ptr<cql3::selection::selection> selection,
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result,
|
||||
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get);
|
||||
|
||||
static void describe_single_item(const cql3::selection::selection&,
|
||||
const std::vector<managed_bytes_opt>&,
|
||||
|
||||
@@ -55,6 +55,7 @@ future<bool> default_role_row_satisfies(
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_query_state(),
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
cql3::query_processor::cache_internal::yes).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
|
||||
@@ -1524,14 +1524,18 @@ protected:
|
||||
co_return std::nullopt;
|
||||
}
|
||||
private:
|
||||
// Releases reference to cleaned files such that respective used disk space can be freed.
|
||||
void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
|
||||
_compacting.release_compacting(exhausted_sstables);
|
||||
}
|
||||
|
||||
future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
|
||||
co_await coroutine::switch_to(_cm.compaction_sg().cpu);
|
||||
|
||||
// Releases reference to cleaned files such that respective used disk space can be freed.
|
||||
auto release_exhausted = [this, &descriptor] (std::vector<sstables::shared_sstable> exhausted_sstables) mutable {
|
||||
auto exhausted = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(exhausted_sstables);
|
||||
std::erase_if(descriptor.sstables, [&] (const sstables::shared_sstable& sst) {
|
||||
return exhausted.contains(sst);
|
||||
});
|
||||
_compacting.release_compacting(exhausted_sstables);
|
||||
};
|
||||
|
||||
for (;;) {
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm.available_memory()));
|
||||
_cm.register_backlog_tracker(user_initiated);
|
||||
@@ -1539,8 +1543,7 @@ private:
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
setup_new_compaction(descriptor.run_identifier);
|
||||
co_await compact_sstables_and_update_history(descriptor, _compaction_data,
|
||||
std::bind(&cleanup_sstables_compaction_task_executor::release_exhausted, this, std::placeholders::_1));
|
||||
co_await compact_sstables_and_update_history(descriptor, _compaction_data, release_exhausted);
|
||||
finish_compaction();
|
||||
_cm.reevaluate_postponed_compactions();
|
||||
co_return; // done with current job
|
||||
@@ -1587,6 +1590,10 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
|
||||
bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
|
||||
auto& cs = get_compaction_state(&t);
|
||||
if (sst->is_shared()) {
|
||||
throw std::runtime_error(format("Shared SSTable {} cannot be marked as requiring cleanup, as it can only be processed by resharding",
|
||||
sst->get_filename()));
|
||||
}
|
||||
if (needs_cleanup(sst, sorted_owned_ranges)) {
|
||||
cs.sstables_requiring_cleanup.insert(sst);
|
||||
return true;
|
||||
|
||||
@@ -307,6 +307,8 @@ public:
|
||||
private:
|
||||
future<> try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t);
|
||||
|
||||
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
public:
|
||||
// Submit a table to be upgraded and wait for its termination.
|
||||
future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version);
|
||||
@@ -407,9 +409,6 @@ public:
|
||||
return _tombstone_gc_state;
|
||||
};
|
||||
|
||||
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
|
||||
// Uncoditionally erase sst from `sstables_requiring_cleanup`
|
||||
// Returns true iff sst was found and erased.
|
||||
bool erase_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst);
|
||||
|
||||
@@ -120,20 +120,13 @@ cql3::statements::create_keyspace_statement::prepare(data_dictionary::database d
|
||||
return std::make_unique<prepared_statement>(make_shared<create_keyspace_statement>(*this));
|
||||
}
|
||||
|
||||
future<> cql3::statements::create_keyspace_statement::grant_permissions_to_creator(query_processor& qp, const service::client_state& cs) const {
|
||||
return do_with(auth::make_data_resource(keyspace()), auth::make_functions_resource(keyspace()), [&cs](const auth::resource& r, const auth::resource& fr) {
|
||||
future<> cql3::statements::create_keyspace_statement::grant_permissions_to_creator(const service::client_state& cs) const {
|
||||
return do_with(auth::make_data_resource(keyspace()), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
}).then([&cs, &fr] {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
fr).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual future<> grant_permissions_to_creator(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -39,7 +39,7 @@ public:
|
||||
|
||||
std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
future<> grant_permissions_to_creator(query_processor& qp, const service::client_state&) const;
|
||||
future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
void validate(query_processor&, const service::client_state&) const override;
|
||||
|
||||
|
||||
@@ -146,7 +146,7 @@ create_table_statement::prepare(data_dictionary::database db, cql_stats& stats)
|
||||
abort();
|
||||
}
|
||||
|
||||
future<> create_table_statement::grant_permissions_to_creator(query_processor& qp, const service::client_state& cs) const {
|
||||
future<> create_table_statement::grant_permissions_to_creator(const service::client_state& cs) const {
|
||||
return do_with(auth::make_data_resource(keyspace(), column_family()), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
|
||||
@@ -77,7 +77,7 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual future<> grant_permissions_to_creator(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -22,29 +22,13 @@ namespace statements {
|
||||
future<> function_statement::check_access(query_processor& qp, const service::client_state& state) const { return make_ready_future<>(); }
|
||||
|
||||
future<> create_function_statement_base::check_access(query_processor& qp, const service::client_state& state) const {
|
||||
create_arg_types(qp);
|
||||
if (!functions::functions::find(_name, _arg_types)) {
|
||||
co_await state.has_functions_access(qp.db(), _name.keyspace, auth::permission::CREATE);
|
||||
} else if (_or_replace) {
|
||||
_altering = true;
|
||||
co_await state.has_function_access(qp.db(), _name.keyspace, auth::encode_signature(_name.name, _arg_types), auth::permission::ALTER);
|
||||
}
|
||||
}
|
||||
co_await state.has_functions_access(qp.db(), _name.keyspace, auth::permission::CREATE);
|
||||
if (_or_replace) {
|
||||
create_arg_types(qp);
|
||||
sstring encoded_signature = auth::encode_signature(_name.name, _arg_types);
|
||||
|
||||
future<> create_function_statement_base::grant_permissions_to_creator(query_processor& qp, const service::client_state& cs) const {
|
||||
if (_altering) {
|
||||
return make_ready_future<>();
|
||||
co_await state.has_function_access(qp.db(), _name.keyspace, encoded_signature, auth::permission::ALTER);
|
||||
}
|
||||
std::string_view keyspace = _name.has_keyspace() ? _name.keyspace : cs.get_keyspace();
|
||||
|
||||
return do_with(auth::make_functions_resource(keyspace, auth::encode_signature(_name.name, _arg_types)), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
*cs.user(),
|
||||
r).handle_exception_type([](const auth::unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> drop_function_statement_base::check_access(query_processor& qp, const service::client_state& state) const
|
||||
|
||||
@@ -43,11 +43,9 @@ protected:
|
||||
virtual void validate(query_processor& qp, const service::client_state& state) const override;
|
||||
virtual seastar::future<shared_ptr<db::functions::function>> create(query_processor& qp, db::functions::function* old) const = 0;
|
||||
virtual seastar::future<shared_ptr<db::functions::function>> validate_while_executing(query_processor&) const override;
|
||||
virtual seastar::future<> grant_permissions_to_creator(query_processor& qp, const service::client_state& cs) const override;
|
||||
|
||||
bool _or_replace;
|
||||
bool _if_not_exists;
|
||||
mutable bool _altering = false;
|
||||
|
||||
create_function_statement_base(functions::function_name name, std::vector<shared_ptr<cql3_type::raw>> raw_arg_types,
|
||||
bool or_replace, bool if_not_exists);
|
||||
|
||||
@@ -64,7 +64,7 @@ std::unique_ptr<prepared_statement> create_role_statement::prepare(
|
||||
return std::make_unique<prepared_statement>(::make_shared<create_role_statement>(*this));
|
||||
}
|
||||
|
||||
future<> create_role_statement::grant_permissions_to_creator(query_processor& qp, const service::client_state& cs) const {
|
||||
future<> create_role_statement::grant_permissions_to_creator(const service::client_state& cs) const {
|
||||
return do_with(auth::make_role_resource(_role), [&cs](const auth::resource& r) {
|
||||
return auth::grant_applicable_permissions(
|
||||
*cs.get_auth_service(),
|
||||
@@ -94,7 +94,7 @@ future<> create_role_statement::check_access(query_processor& qp, const service:
|
||||
}
|
||||
|
||||
future<result_message_ptr>
|
||||
create_role_statement::execute(query_processor& qp,
|
||||
create_role_statement::execute(query_processor&,
|
||||
service::query_state& state,
|
||||
const query_options&) const {
|
||||
auth::role_config config;
|
||||
@@ -104,12 +104,12 @@ create_role_statement::execute(query_processor& qp,
|
||||
return do_with(
|
||||
std::move(config),
|
||||
extract_authentication_options(_options),
|
||||
[this, &qp, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
|
||||
[this, &state](const auth::role_config& config, const auth::authentication_options& authen_options) {
|
||||
const auto& cs = state.get_client_state();
|
||||
auto& as = *cs.get_auth_service();
|
||||
|
||||
return auth::create_role(as, _role, config, authen_options).then([this, &qp, &cs] {
|
||||
return grant_permissions_to_creator(qp, cs);
|
||||
return auth::create_role(as, _role, config, authen_options).then([this, &cs] {
|
||||
return grant_permissions_to_creator(cs);
|
||||
}).then([] {
|
||||
return void_result_message();
|
||||
}).handle_exception_type([this](const auth::role_already_exists& e) {
|
||||
|
||||
@@ -38,7 +38,7 @@ schema_altering_statement::schema_altering_statement(cf_name name, timeout_confi
|
||||
{
|
||||
}
|
||||
|
||||
future<> schema_altering_statement::grant_permissions_to_creator(query_processor& qp, const service::client_state&) const {
|
||||
future<> schema_altering_statement::grant_permissions_to_creator(const service::client_state&) const {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -119,10 +119,10 @@ schema_altering_statement::execute(query_processor& qp, service::query_state& st
|
||||
}
|
||||
}
|
||||
|
||||
return execute0(qp, state, options).then([this, &qp, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
return execute0(qp, state, options).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
auto permissions_granted_fut = internal
|
||||
? make_ready_future<>()
|
||||
: grant_permissions_to_creator(qp, state.get_client_state());
|
||||
: grant_permissions_to_creator(state.get_client_state());
|
||||
return permissions_granted_fut.then([result = std::move(result)] {
|
||||
return result;
|
||||
});
|
||||
|
||||
@@ -48,7 +48,7 @@ protected:
|
||||
*
|
||||
* By default, this function does nothing.
|
||||
*/
|
||||
virtual future<> grant_permissions_to_creator(query_processor& qp, const service::client_state&) const;
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
|
||||
@@ -216,7 +216,7 @@ keyspace_metadata::keyspace_metadata(std::string_view name,
|
||||
std::move(strategy_options),
|
||||
durable_writes,
|
||||
std::move(cf_defs),
|
||||
user_types_metadata{},
|
||||
std::move(user_types),
|
||||
storage_options{}) { }
|
||||
|
||||
keyspace_metadata::keyspace_metadata(std::string_view name,
|
||||
|
||||
@@ -2569,9 +2569,6 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
|
||||
});
|
||||
}
|
||||
|
||||
const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024};
|
||||
const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024};
|
||||
|
||||
void view_updating_consumer::do_flush_buffer() {
|
||||
_staging_reader_handle.pause();
|
||||
|
||||
@@ -2594,6 +2591,10 @@ void view_updating_consumer::do_flush_buffer() {
|
||||
}
|
||||
|
||||
void view_updating_consumer::flush_builder() {
|
||||
_buffer.emplace_back(_mut_builder->flush());
|
||||
}
|
||||
|
||||
void view_updating_consumer::end_builder() {
|
||||
_mut_builder->consume_end_of_partition();
|
||||
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
|
||||
_buffer.emplace_back(std::move(*mut_opt));
|
||||
@@ -2602,11 +2603,9 @@ void view_updating_consumer::flush_builder() {
|
||||
}
|
||||
|
||||
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
|
||||
if (_buffer_size >= buffer_size_hard_limit) {
|
||||
if (_buffer_size >= _buffer_size_hard_limit) {
|
||||
flush_builder();
|
||||
auto dk = _buffer.back().decorated_key();
|
||||
do_flush_buffer();
|
||||
consume_new_partition(dk);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,8 +35,17 @@ public:
|
||||
// We prefer flushing on partition boundaries, so at the end of a partition,
|
||||
// we flush on reaching the soft limit. Otherwise we continue accumulating
|
||||
// data. We flush mid-partition if we reach the hard limit.
|
||||
static const size_t buffer_size_soft_limit;
|
||||
static const size_t buffer_size_hard_limit;
|
||||
static constexpr size_t buffer_size_soft_limit_default = 1 * 1024 * 1024;
|
||||
static constexpr size_t buffer_size_hard_limit_default = 2 * 1024 * 1024;
|
||||
private:
|
||||
size_t _buffer_size_soft_limit = buffer_size_soft_limit_default;
|
||||
size_t _buffer_size_hard_limit = buffer_size_hard_limit_default;
|
||||
public:
|
||||
// Meant only for usage in tests.
|
||||
void set_buffer_size_limit_for_testing_purposes(size_t sz) {
|
||||
_buffer_size_soft_limit = sz;
|
||||
_buffer_size_hard_limit = sz;
|
||||
}
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
@@ -51,6 +60,7 @@ private:
|
||||
private:
|
||||
void do_flush_buffer();
|
||||
void flush_builder();
|
||||
void end_builder();
|
||||
void maybe_flush_buffer_mid_partition();
|
||||
|
||||
public:
|
||||
@@ -115,8 +125,8 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
flush_builder();
|
||||
if (_buffer_size >= buffer_size_soft_limit) {
|
||||
end_builder();
|
||||
if (_buffer_size >= _buffer_size_soft_limit) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
|
||||
2
dist/common/scripts/scylla_fstrim_setup
vendored
2
dist/common/scripts/scylla_fstrim_setup
vendored
@@ -16,7 +16,7 @@ if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
print('Requires root permission.')
|
||||
sys.exit(1)
|
||||
systemd_unit('scylla-fstrim.timer').unmask()
|
||||
systemd_unit('scylla-fstrim.timer').enable()
|
||||
systemd_unit('scylla-fstrim.timer').start()
|
||||
if is_redhat_variant() or is_arch() or is_suse_variant():
|
||||
systemd_unit('fstrim.timer').disable()
|
||||
|
||||
@@ -107,6 +107,11 @@ For example:
|
||||
CREATE KEYSPACE Excelsior
|
||||
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};
|
||||
|
||||
.. TODO Add a link to the description of minimum_keyspace_rf when the ScyllaDB options section is added to the docs.
|
||||
|
||||
You can configure the minimum acceptable replication factor using the ``minimum_keyspace_rf`` option.
|
||||
Attempting to create a keyspace with a replication factor lower than the value set with
|
||||
``minimum_keyspace_rf`` will return an error (the default value is 0).
|
||||
|
||||
The supported ``options`` are:
|
||||
|
||||
|
||||
@@ -592,7 +592,7 @@ of eventual consistency on an event of a timestamp collision:
|
||||
|
||||
``INSERT`` statements happening concurrently at different cluster
|
||||
nodes proceed without coordination. Eventually cell values
|
||||
supplied by a statement with the highest timestamp will prevail.
|
||||
supplied by a statement with the highest timestamp will prevail (see :ref:`update ordering <update-ordering>`).
|
||||
|
||||
Unless a timestamp is provided by the client, Scylla will automatically
|
||||
generate a timestamp with microsecond precision for each
|
||||
@@ -601,7 +601,7 @@ by the same node are unique. Timestamps assigned at different
|
||||
nodes are not guaranteed to be globally unique.
|
||||
With a steadily high write rate timestamp collision
|
||||
is not unlikely. If it happens, i.e. two ``INSERTS`` have the same
|
||||
timestamp, the lexicographically bigger value prevails:
|
||||
timestamp, a conflict resolution algorithm determines which of the inserted cells prevails (see :ref:`update ordering <update-ordering>`).
|
||||
|
||||
Please refer to the :ref:`UPDATE <update-parameters>` section for more information on the :token:`update_parameter`.
|
||||
|
||||
@@ -709,8 +709,8 @@ Similarly to ``INSERT``, ``UPDATE`` statement happening concurrently at differen
|
||||
cluster nodes proceed without coordination. Cell values
|
||||
supplied by a statement with the highest timestamp will prevail.
|
||||
If two ``UPDATE`` statements or ``UPDATE`` and ``INSERT``
|
||||
statements have the same timestamp,
|
||||
lexicographically bigger value prevails.
|
||||
statements have the same timestamp, a conflict resolution algorithm determines which cells prevails
|
||||
(see :ref:`update ordering <update-ordering>`).
|
||||
|
||||
Regarding the :token:`assignment`:
|
||||
|
||||
@@ -749,7 +749,7 @@ parameters:
|
||||
Scylla ensures that query timestamps created by the same coordinator node are unique (even across different shards
|
||||
on the same node). However, timestamps assigned at different nodes are not guaranteed to be globally unique.
|
||||
Note that with a steadily high write rate, timestamp collision is not unlikely. If it happens, e.g. two INSERTS
|
||||
have the same timestamp, conflicting cell values are compared and the cells with the lexicographically bigger value prevail.
|
||||
have the same timestamp, a conflict resolution algorithm determines which of the inserted cells prevails (see :ref:`update ordering <update-ordering>` for more information):
|
||||
- ``TTL``: specifies an optional Time To Live (in seconds) for the inserted values. If set, the inserted values are
|
||||
automatically removed from the database after the specified time. Note that the TTL concerns the inserted values, not
|
||||
the columns themselves. This means that any subsequent update of the column will also reset the TTL (to whatever TTL
|
||||
@@ -759,6 +759,55 @@ parameters:
|
||||
- ``TIMEOUT``: specifies a timeout duration for the specific request.
|
||||
Please refer to the :ref:`SELECT <using-timeout>` section for more information.
|
||||
|
||||
.. _update-ordering:
|
||||
|
||||
Update ordering
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
:ref:`INSERT <insert-statement>`, :ref:`UPDATE <update-statement>`, and :ref:`DELETE <delete_statement>`
|
||||
operations are ordered by their ``TIMESTAMP``.
|
||||
|
||||
Ordering of such changes is done at the cell level, where each cell carries a write ``TIMESTAMP``,
|
||||
other attributes related to its expiration when it has a non-zero time-to-live (``TTL``),
|
||||
and the cell value.
|
||||
|
||||
The fundamental rule for ordering cells that insert, update, or delete data in a given row and column
|
||||
is that the cell with the highest timestamp wins.
|
||||
|
||||
However, it is possible that multiple such cells will carry the same ``TIMESTAMP``.
|
||||
There could be several reasons for ``TIMESTAMP`` collision:
|
||||
|
||||
* Benign collision can be caused by "replay" of a mutation, e.g., due to client retry, or due to internal processes.
|
||||
In such cases, the cells are equivalent, and any of them can be selected arbitrarily.
|
||||
* ``TIMESTAMP`` collisions might be normally caused by parallel queries that are served
|
||||
by different coordinator nodes. The coordinators might calculate the same write ``TIMESTAMP``
|
||||
based on their local time in microseconds.
|
||||
* Collisions might also happen with user-provided timestamps if the application does not guarantee
|
||||
unique timestamps with the ``USING TIMESTAMP`` parameter (see :ref:`Update parameters <update-parameters>` for more information).
|
||||
|
||||
As said above, in the replay case, ordering of cells should not matter, as they carry the same value
|
||||
and same expiration attributes, so picking any of them will reach the same result.
|
||||
However, other ``TIMESTAMP`` conflicts must be resolved in a consistent way by all nodes.
|
||||
Otherwise, if nodes would have picked an arbitrary cell in case of a conflict and they would
|
||||
reach different results, reading from different replicas would detect the inconsistency and trigger
|
||||
read-repair that will generate yet another cell that would still conflict with the existing cells,
|
||||
with no guarantee for convergence.
|
||||
|
||||
Therefore, Scylla implements an internal, consistent conflict-resolution algorithm
|
||||
that orders cells with conflicting ``TIMESTAMP`` values based on other properties, like:
|
||||
|
||||
* whether the cell is a tombstone or a live cell,
|
||||
* whether the cell has an expiration time,
|
||||
* the cell ``TTL``,
|
||||
* and finally, what value the cell carries.
|
||||
|
||||
The conflict-resolution algorithm is documented in `Scylla's internal documentation <https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md>`_
|
||||
and it may be subject to change.
|
||||
|
||||
Reliable serialization can be achieved using unique write ``TIMESTAMP``
|
||||
and by using :doc:`Lightweight Transactions (LWT) </using-scylla/lwt>` to ensure atomicity of
|
||||
:ref:`INSERT <insert-statement>`, :ref:`UPDATE <update-statement>`, and :ref:`DELETE <delete_statement>`.
|
||||
|
||||
.. _delete_statement:
|
||||
|
||||
DELETE
|
||||
@@ -798,7 +847,7 @@ For more information on the :token:`update_parameter` refer to the :ref:`UPDATE
|
||||
In a ``DELETE`` statement, all deletions within the same partition key are applied atomically,
|
||||
meaning either all columns mentioned in the statement are deleted or none.
|
||||
If ``DELETE`` statement has the same timestamp as ``INSERT`` or
|
||||
``UPDATE`` of the same primary key, delete operation prevails.
|
||||
``UPDATE`` of the same primary key, delete operation prevails (see :ref:`update ordering <update-ordering>`).
|
||||
|
||||
A ``DELETE`` operation can be conditional through the use of an ``IF`` clause, similar to ``UPDATE`` and ``INSERT``
|
||||
statements. Each such ``DELETE`` gets a globally unique timestamp.
|
||||
|
||||
37
docs/dev/timestamp-conflict-resolution.md
Normal file
37
docs/dev/timestamp-conflict-resolution.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# Timestamp conflict resolution
|
||||
|
||||
The fundamental rule for ordering cells that insert, update, or delete data in a given row and column
|
||||
is that the cell with the highest timestamp wins.
|
||||
|
||||
However, it is possible that multiple such cells will carry the same `TIMESTAMP`.
|
||||
In this case, conflicts must be resolved in a consistent way by all nodes.
|
||||
Otherwise, if each node picked an arbitrary cell in case of a conflict, nodes could
|
||||
reach different results; reading from different replicas would then detect the inconsistency and trigger
|
||||
read-repair that will generate yet another cell that would still conflict with the existing cells,
|
||||
with no guarantee for convergence.
|
||||
|
||||
The first tie-breaking rule when two cells have the same write timestamp is that
|
||||
dead cells win over live cells; and if both cells are deleted, the one with the later deletion time prevails.
|
||||
|
||||
If both cells are alive, their expiration time is examined.
|
||||
Cells that are written with a non-zero TTL (either implicit, as determined by
|
||||
the table's default TTL, or explicit, `USING TTL`) are due to expire
|
||||
TTL seconds after the time they were written (as determined by the coordinator,
|
||||
and rounded to 1 second resolution). That time is the cell's expiration time.
|
||||
When cells expire, they become tombstones, shadowing any data written with a write timestamp
|
||||
less than or equal to the timestamp of the expiring cell.
|
||||
Therefore, cells that have an expiration time win over cells with no expiration time.
|
||||
|
||||
If both cells have an expiration time, the one with the latest expiration time wins;
|
||||
and if they have the same expiration time (in whole second resolution),
|
||||
their write time is derived from the expiration time less the original time-to-live value
|
||||
and the one that was written at a later time prevails.
|
||||
|
||||
Finally, if both cells are live and have no expiration, or have the same expiration time and time-to-live,
|
||||
the cell with the lexicographically bigger value prevails.
|
||||
|
||||
Note that when multiple columns are INSERTed or UPDATEd using the same timestamp,
|
||||
SELECTing those columns might return a result that mixes cells from either upsert.
|
||||
This may happen when both upserts have no expiration time, or both their expiration time and TTL are the
|
||||
same, respectively (in whole second resolution). In such a case, cell selection would be based on the cell values
|
||||
in each column, independently of each other.
|
||||
@@ -27,38 +27,34 @@ ScyllaDB Open Source
|
||||
|
||||
The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
+----------------------------+----------------------------------+-----------------------------+---------+-------+
|
||||
| Linux Distributions | Ubuntu | Debian | CentOS /| Rocky/|
|
||||
| | | | RHEL | RHEL |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| ScyllaDB Version / Version | 14.04| 16.04| 18.04|20.04 |22.04 | 8 | 9 | 10 | 11 | 7 | 8 |
|
||||
+============================+======+======+======+======+======+======+======+=======+=======+=========+=======+
|
||||
| 5.2 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 5.1 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 5.0 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.6 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.5 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.4 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.3 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.2 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.1 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 4.0 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |x| | |x| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 3.x | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |x| | |x| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 2.3 | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |x| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 2.2 | |v| | |v| | |x| | |x| | |x| | |v| | |x| | |x| | |x| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
+----------------------------+----------------------------------+-----------------------------+---------+---------------+
|
||||
| Linux Distributions | Ubuntu | Debian | CentOS /| Rocky / |
|
||||
| | | | RHEL | RHEL |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| ScyllaDB Version / Version | 14.04| 16.04| 18.04|20.04 |22.04 | 8 | 9 | 10 | 11 | 7 | 8 | 9 |
|
||||
+============================+======+======+======+======+======+======+======+=======+=======+=========+=======+=======+
|
||||
| 5.3 | |x| | |x| | |x| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 5.2 | |x| | |x| | |x| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 5.1 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 5.0 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.6 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.5 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.4 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.3 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.2 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.1 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |v| | |x| | |v| | |v| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
| 4.0 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |x| | |x| | |v| | |x| | |x| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+-------+
|
||||
|
||||
|
||||
All releases are available as a Docker container, EC2 AMI, and a GCP image (GCP image from version 4.3). Since
|
||||
@@ -80,13 +76,13 @@ ScyllaDB Enterprise
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
| ScyllaDB Version / Version | 14.04| 16.04| 18.04| 20.04| 22.04 | 8 | 9 | 10 | 11 | 7 | 8 |
|
||||
+============================+======+======+======+======+=======+======+======+======+======+========+=======+
|
||||
| 2023.1 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
| 2023.1 | |x| | |x| | |x| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
| 2022.2 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
| 2022.1 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
| 2021.1 | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
| 2021.1 | |x| | |v| | |v| | |v| | |v| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
| 2020.1 | |x| | |v| | |v| | |x| | |x| | |x| | |v| | |v| | |x| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
|
||||
174
docs/index.rst
174
docs/index.rst
@@ -1,152 +1,53 @@
|
||||
:full-width:
|
||||
:hide-version-warning:
|
||||
:hide-pre-content:
|
||||
:hide-post-content:
|
||||
:hide-sidebar:
|
||||
:hide-secondary-sidebar:
|
||||
:landing:
|
||||
:orphan:
|
||||
|
||||
.. title:: Welcome to ScyllaDB Documentation
|
||||
====================================
|
||||
ScyllaDB Open Source Documentation
|
||||
====================================
|
||||
|
||||
.. hero-box::
|
||||
:title: Welcome to ScyllaDB Documentation
|
||||
:image: /_static/img/mascots/scylla-docs.svg
|
||||
:search_box:
|
||||
.. meta::
|
||||
:title: ScyllaDB Open Source Documentation
|
||||
:description: ScyllaDB Open Source Documentation
|
||||
:keywords: ScyllaDB Open Source, Scylla Open Source, Scylla docs, ScyllaDB documentation, Scylla Documentation
|
||||
|
||||
New to ScyllaDB? Start `here <https://cloud.docs.scylladb.com/stable/scylladb-basics/>`_!
|
||||
About This User Guide
|
||||
-----------------------
|
||||
|
||||
.. raw:: html
|
||||
ScyllaDB is a distributed NoSQL wide-column database for data-intensive apps that require
|
||||
high performance and low latency.
|
||||
|
||||
<div class="landing__content landing__content">
|
||||
This user guide covers topics related to ScyllaDB Open Source - an open-source project that allows you to evaluate
|
||||
experimental features, review the `source code <https://github.com/scylladb/scylladb>`_, and add your contributions
|
||||
to the project.
|
||||
|
||||
.. raw:: html
|
||||
For topics related to other ScyllaDB flavors, see the documentation for `ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/>`_ and
|
||||
`ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
<div class="topics-grid topics-grid--scrollable grid-container full">
|
||||
Documentation Highlights
|
||||
--------------------------
|
||||
|
||||
<div class="grid-x grid-margin-x hs">
|
||||
* :doc:`Install ScyllaDB Open Source </getting-started/install-scylla/index>`
|
||||
* :doc:`Configure ScyllaDB Open Source </getting-started/system-configuration/>`
|
||||
* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
|
||||
* :doc:`Upgrade ScyllaDB Open Source </upgrade/index>`
|
||||
* :doc:`CQL Reference </cql/index>`
|
||||
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Cloud
|
||||
:link: https://cloud.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-cloud.svg
|
||||
:class: large-4 cloud-card
|
||||
:anchor: ScyllaDB Cloud Documentation
|
||||
ScyllaDB Community
|
||||
--------------------------
|
||||
|
||||
Simplify application development with ScyllaDB Cloud - a fully managed database-as-a-service.
|
||||
Join the ScyllaDB Open Source community:
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Enterprise
|
||||
:link: https://enterprise.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-enterprise.svg
|
||||
:class: large-4 enterprise-card
|
||||
:anchor: ScyllaDB Enterprise Documentation
|
||||
* Contribute to the ScyllaDB Open Source `project <https://github.com/scylladb/scylladb>`_.
|
||||
* Join the `ScyllaDB Community Forum <https://forum.scylladb.com/>`_.
|
||||
* Join our `Slack Channel <https://slack.scylladb.com/>`_.
|
||||
* Sign up for the `scylladb-users <https://groups.google.com/d/forum/scylladb-users>`_ Google group.
|
||||
|
||||
Deploy and manage ScyllaDB's most stable enterprise-grade database with premium features and 24/7 support.
|
||||
Learn How to Use ScyllaDB
|
||||
---------------------------
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Open Source
|
||||
:link: https://docs.scylladb.com/stable/getting-started/
|
||||
:image: /_static/img/mascots/scylla-opensource.svg
|
||||
:class: large-4 opensource-card
|
||||
:anchor: ScyllaDB Open Source Documentation
|
||||
|
||||
Deploy and manage your database in your environment.
|
||||
|
||||
|
||||
.. raw:: html
|
||||
|
||||
</div></div>
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<div class="topics-grid topics-grid--products">
|
||||
|
||||
<h2 class="topics-grid__title">Other Products</h2>
|
||||
<p class="topics-grid__text"></p>
|
||||
|
||||
<div class="grid-container full">
|
||||
<div class="grid-x grid-margin-x">
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Alternator
|
||||
:link: https://docs.scylladb.com/stable/alternator/alternator.html
|
||||
:image: /_static/img/mascots/scylla-alternator.svg
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Open source Amazon DynamoDB-compatible API.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Monitoring Stack
|
||||
:link: https://monitoring.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-monitor.svg
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Complete open source monitoring solution for your ScyllaDB clusters.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Manager
|
||||
:link: https://manager.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-manager.svg
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Hassle-free ScyllaDB NoSQL database management for scale-out clusters.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Drivers
|
||||
:link: https://docs.scylladb.com/stable/using-scylla/drivers/
|
||||
:image: /_static/img/mascots/scylla-drivers.svg
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Shard-aware drivers for superior performance.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Operator
|
||||
:link: https://operator.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-enterprise.svg
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Easily run and manage your ScyllaDB cluster on Kubernetes.
|
||||
|
||||
.. raw:: html
|
||||
|
||||
</div></div></div>
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<div class="topics-grid">
|
||||
|
||||
<h2 class="topics-grid__title">Learn More About ScyllaDB</h2>
|
||||
<p class="topics-grid__text"></p>
|
||||
<div class="grid-container full">
|
||||
<div class="grid-x grid-margin-x">
|
||||
|
||||
.. topic-box::
|
||||
:title: Attend ScyllaDB University
|
||||
:link: https://university.scylladb.com/
|
||||
:image: /_static/img/mascots/scylla-university.png
|
||||
:class: large-6,small-12
|
||||
:anchor: Find a Class
|
||||
|
||||
| Register to take a *free* class at ScyllaDB University.
|
||||
| There are several learning paths to choose from.
|
||||
|
||||
.. topic-box::
|
||||
:title: Register for a Webinar
|
||||
:link: https://www.scylladb.com/resources/webinars/
|
||||
:image: /_static/img/mascots/scylla-with-computer-2.png
|
||||
:class: large-6,small-12
|
||||
:anchor: Find a Webinar
|
||||
|
||||
| You can either participate in a live webinar or see a recording on demand.
|
||||
| There are several webinars to choose from.
|
||||
|
||||
.. raw:: html
|
||||
|
||||
</div></div></div>
|
||||
|
||||
.. raw:: html
|
||||
|
||||
</div>
|
||||
You can learn to use ScyllaDB by taking **free courses** at `ScyllaDB University <https://university.scylladb.com/>`_.
|
||||
In addition, you can read our `blog <https://www.scylladb.com/blog/>`_ and attend ScyllaDB's
|
||||
`webinars, workshops, and conferences <https://www.scylladb.com/company/events/>`_.
|
||||
|
||||
.. toctree::
|
||||
:hidden:
|
||||
@@ -159,7 +60,6 @@
|
||||
troubleshooting/index
|
||||
kb/index
|
||||
reference/index
|
||||
ScyllaDB University <https://university.scylladb.com/>
|
||||
faq
|
||||
Contribute to ScyllaDB <contribute>
|
||||
alternator/alternator
|
||||
|
||||
@@ -179,6 +179,19 @@ Restore the configuration files
|
||||
for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles | grep -v init ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf.backup-4.3 $conf; done
|
||||
sudo systemctl daemon-reload (Ubuntu 16.04)
|
||||
|
||||
Restore system tables
|
||||
---------------------
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because 2021.1 uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: console
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
|
||||
@@ -184,6 +184,19 @@ Restore the configuration files
|
||||
for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles | grep -v init ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf.backup-5.0 $conf; done
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Restore system tables
|
||||
---------------------
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because 2022.1 uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: console
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ Upgrade ScyllaDB Open Source
|
||||
.. toctree::
|
||||
:hidden:
|
||||
|
||||
ScyllaDB 5.2 to 5.3 <upgrade-guide-from-5.2-to-5.3/index>
|
||||
ScyllaDB 5.1 to 5.2 <upgrade-guide-from-5.1-to-5.2/index>
|
||||
ScyllaDB 5.0 to 5.1 <upgrade-guide-from-5.0-to-5.1/index>
|
||||
ScyllaDB 5.x maintenance release <upgrade-guide-from-5.x.y-to-5.x.z/index>
|
||||
@@ -37,6 +38,7 @@ Upgrade ScyllaDB Open Source
|
||||
|
||||
Procedures for upgrading to a newer version of ScyllaDB Open Source.
|
||||
|
||||
* :doc:`Upgrade Guide - ScyllaDB 5.2 to 5.3 <upgrade-guide-from-5.2-to-5.3/index>`
|
||||
* :doc:`Upgrade Guide - ScyllaDB 5.1 to 5.2 <upgrade-guide-from-5.1-to-5.2/index>`
|
||||
* :doc:`Upgrade Guide - ScyllaDB 5.0 to 5.1 <upgrade-guide-from-5.0-to-5.1/index>`
|
||||
* :doc:`Upgrade Guide - ScyllaDB 5.x maintenance releases <upgrade-guide-from-5.x.y-to-5.x.z/index>`
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
====================================
|
||||
Upgrade Guide - ScyllaDB 5.2 to 5.3
|
||||
====================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB <upgrade-guide-from-5.2-to-5.3-generic>
|
||||
Metrics <metric-update-5.2-to-5.3>
|
||||
|
||||
.. panel-box::
|
||||
:title: Upgrade ScyllaDB
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
|
||||
Upgrade guides are available for:
|
||||
|
||||
* :doc:`Upgrade ScyllaDB from 5.2.x to 5.3.y <upgrade-guide-from-5.2-to-5.3-generic>`
|
||||
* :doc:`ScyllaDB Metrics Update - Scylla 5.2 to 5.3 <metric-update-5.2-to-5.3>`
|
||||
@@ -0,0 +1,19 @@
|
||||
ScyllaDB Metrics Update - Scylla 5.2 to 5.3
|
||||
============================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Scylla 5.3 Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
The following metrics are new in ScyllaDB 5.3
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
||||
|
||||
The following metrics are renamed in ScyllaDB 5.3
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,432 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 5.2
|
||||
.. |NEW_VERSION| replace:: 5.3
|
||||
|
||||
.. |DEBIAN_SRC_REPO| replace:: Debian
|
||||
.. _DEBIAN_SRC_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-5.2
|
||||
|
||||
.. |UBUNTU_SRC_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_SRC_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-5.2
|
||||
|
||||
.. |SCYLLA_DEB_SRC_REPO| replace:: ScyllaDB deb repo (|DEBIAN_SRC_REPO|_, |UBUNTU_SRC_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_SRC_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_SRC_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-5.2
|
||||
|
||||
.. |DEBIAN_NEW_REPO| replace:: Debian
|
||||
.. _DEBIAN_NEW_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-5.3
|
||||
|
||||
.. |UBUNTU_NEW_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_NEW_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-5.3
|
||||
|
||||
.. |SCYLLA_DEB_NEW_REPO| replace:: ScyllaDB deb repo (|DEBIAN_NEW_REPO|_, |UBUNTU_NEW_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_NEW_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_NEW_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-5.3
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: Scylla Metrics Update - Scylla 5.2 to 5.3
|
||||
.. _SCYLLA_METRICS: ../metric-update-5.2-to-5.3
|
||||
|
||||
=============================================================================
|
||||
Upgrade Guide - |SCYLLA_NAME| |SRC_VERSION| to |NEW_VERSION|
|
||||
=============================================================================
|
||||
|
||||
This document is a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|, and rollback to
|
||||
version |SRC_VERSION| if required.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian, and Ubuntu. See
|
||||
:doc:`OS Support by Platform and Version </getting-started/os-support>` for information about supported versions.
|
||||
|
||||
It also applies when using ScyllaDB official image on EC2, GCP, or Azure. The image is based on Ubuntu 22.04.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes in the cluster, serially (i.e., one node at a time), you will:
|
||||
|
||||
* Check that the cluster's schema is synchronized
|
||||
* Drain the node and backup the data
|
||||
* Backup the configuration file
|
||||
* Stop ScyllaDB
|
||||
* Download and install new ScyllaDB packages
|
||||
* (Optional) Enable consistent cluster management in the configuration file
|
||||
* Start ScyllaDB
|
||||
* Validate that the upgrade was successful
|
||||
|
||||
Apply the following procedure **serially** on each node. Do not move to the next node before validating that the node you upgraded is up and running the new version.
|
||||
|
||||
**During** the rolling upgrade, it is highly recommended:
|
||||
|
||||
* Not to use the new |NEW_VERSION| features.
|
||||
* Not to run administration functions, like repairs, refresh, rebuild, or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending ScyllaDB Manager (only available for ScyllaDB Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
.. note::
|
||||
|
||||
If you use the `ScyllaDB Monitoring Stack <https://monitoring.docs.scylladb.com/>`_, we recommend upgrading the Monitoring Stack to the latest version **before** upgrading ScyllaDB.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Check the cluster schema
|
||||
-------------------------
|
||||
|
||||
Make sure that all nodes have the schema synchronized before the upgrade. The upgrade procedure will fail if there is a schema
|
||||
disagreement between nodes.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool describecluster
|
||||
|
||||
Drain the nodes and backup data
|
||||
-----------------------------------
|
||||
|
||||
Before any major procedure, like an upgrade, it is recommended to backup all the data to an external device. In ScyllaDB,
|
||||
backup is performed using the ``nodetool snapshot`` command. For **each** node in the cluster, run the following command:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all the directories having that name under ``/var/lib/scylla``
|
||||
to a backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the ``nodetool clearsnapshot -t <snapshot>`` command to prevent
|
||||
running out of space.
|
||||
|
||||
Backup the configuration file
|
||||
------------------------------
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup-src
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``. You should use the same version as this version in case you want to |ROLLBACK|_ the upgrade. If you are not running a |SRC_VERSION|.x version, stop right here! This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.
|
||||
|
||||
**To upgrade ScyllaDB:**
|
||||
|
||||
#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|.
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``. You should use the same version as this version in case you want to |ROLLBACK|_ the upgrade. If you are not running a |SRC_VERSION|.x version, stop right here! This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.
|
||||
|
||||
**To upgrade ScyllaDB:**
|
||||
|
||||
#. Update the |SCYLLA_RPM_NEW_REPO|_ to |NEW_VERSION|.
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``. You should use the same version as this version in case you want to |ROLLBACK|_ the upgrade. If you are not running a |SRC_VERSION|.x version, stop right here! This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.
|
||||
|
||||
There are two alternative upgrade procedures:
|
||||
|
||||
* :ref:`Upgrading ScyllaDB and simultaneously updating 3rd party and OS packages <upgrade-image-recommended-procedure>`. It is recommended if you are running a ScyllaDB official image (EC2 AMI, GCP, and Azure images), which is based on Ubuntu 22.04.
|
||||
|
||||
* :ref:`Upgrading ScyllaDB without updating any external packages <upgrade-image-upgrade-guide-regular-procedure>`.
|
||||
|
||||
.. _upgrade-image-recommended-procedure:
|
||||
|
||||
**To upgrade ScyllaDB and update 3rd party and OS packages (RECOMMENDED):**
|
||||
|
||||
Choosing this upgrade procedure allows you to upgrade your ScyllaDB version and update the 3rd party and OS packages using one command.
|
||||
|
||||
#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|.
|
||||
|
||||
#. Load the new repo:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo apt-get update
|
||||
|
||||
|
||||
#. Run the following command to update the manifest file:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
cat scylla-packages-<version>-<arch>.txt | sudo xargs -n1 apt-get install -y
|
||||
|
||||
Where:
|
||||
|
||||
* ``<version>`` - The ScyllaDB version to which you are upgrading ( |NEW_VERSION| ).
|
||||
* ``<arch>`` - Architecture type: ``x86_64`` or ``aarch64``.
|
||||
|
||||
The file is included in the ScyllaDB packages downloaded in the previous step. The file location is ``http://downloads.scylladb.com/downloads/scylla/aws/manifest/scylla-packages-<version>-<arch>.txt``
|
||||
|
||||
Example:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
cat scylla-packages-5.3.0-x86_64.txt | sudo xargs -n1 apt-get install -y
|
||||
|
||||
.. note::
|
||||
|
||||
Alternatively, you can update the manifest file with the following command:
|
||||
|
||||
``sudo apt-get install $(awk '{print $1}' scylla-packages-<version>-<arch>.txt) -y``
|
||||
|
||||
.. _upgrade-image-upgrade-guide-regular-procedure:
|
||||
|
||||
**To upgrade ScyllaDB:**
|
||||
|
||||
#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|.
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. _upgrade-5.3-enable-raft:
|
||||
|
||||
(Optional) Enable consistent cluster management in the node's configuration file
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
.. note::
|
||||
|
||||
Skip this section if you enabled Raft-based consistent cluster management in version 5.2.
|
||||
|
||||
Optionally, you can enable Raft-based consistent cluster management on your cluster with the ``consistent_cluster_management``
|
||||
option. Setting the option to ``true`` on every node enables the Raft consensus algorithm for your cluster. Raft will be used
|
||||
to consistently manage cluster-wide metadata as soon as you finish upgrading every node to the new version. See
|
||||
:doc:`Raft in ScyllaDB </architecture/raft/>` to learn more.
|
||||
|
||||
In ScyllaDB 5.2 and 5.3, Raft-based consistent cluster management is disabled by default. If you didn't enable the feature
|
||||
in version 5.2, you can enable it during the upgrade to version 5.3: modify the ``scylla.yaml`` configuration file
|
||||
in ``/etc/scylla/`` **on every node** and add the following:
|
||||
|
||||
.. code:: yaml
|
||||
|
||||
consistent_cluster_management: true
|
||||
|
||||
.. note:: Once you finish upgrading every node with ``consistent_cluster_management`` enabled, it won't be possible to disable the option.
|
||||
|
||||
The option can also be enabled after the cluster is upgraded to |NEW_VERSION| (see :ref:`Enabling Raft in existing cluster <enabling-raft-existing-cluster>`).
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check again after two minutes, to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
|
||||
See |Scylla_METRICS|_ for more information.
|
||||
|
||||
Validate Raft Setup After Upgrading Every Node
|
||||
-----------------------------------------------
|
||||
|
||||
.. _validate-raft-setup-5.3:
|
||||
|
||||
.. note::
|
||||
|
||||
Skip this section if you enabled Raft-based consistent cluster management in version 5.2.
|
||||
|
||||
The following section only applies :ref:`if you enabled <upgrade-5.3-enable-raft>` the ``consistent_cluster_management`` option
|
||||
on every node when upgrading the cluster to version 5.3.
|
||||
|
||||
When you enable ``consistent_cluster_management`` on every node during an upgrade, the ScyllaDB cluster will start
|
||||
an additional internal procedure as soon as every node is upgraded to the new version. The goal of the procedure is to
|
||||
initialize data structures used by the Raft algorithm to consistently manage cluster-wide metadata such as table schemas.
|
||||
|
||||
If you performed the rolling upgrade procedure correctly (in particular, ensuring that schema is synchronized on every step
|
||||
and there are no problems with cluster connectivity), that internal procedure should take no longer than a few seconds
|
||||
to finish. However, the procedure requires **full cluster availability**. If one of your nodes fails before this procedure
|
||||
finishes (for example, due to a hardware problem), the procedure may get stuck. This may cause the cluster to end up in
|
||||
a state where schema change operations are unavailable.
|
||||
|
||||
For this reason, **you must verify** that the internal procedure has finished successfully by checking the logs of every
|
||||
ScyllaDB node. If the procedure gets stuck, manual intervention is required.
|
||||
|
||||
Refer to :ref:`Verifying that the internal Raft upgrade procedure finished successfully <verify-raft-procedure>` for
|
||||
instructions on how to verify that the procedure was successful and how to proceed if it gets stuck.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
.. include:: /upgrade/_common/warning_rollback.rst
|
||||
|
||||
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION|.x to |SRC_VERSION|.y. Apply this procedure if an upgrade
|
||||
from |SRC_VERSION| to |NEW_VERSION| fails before completing on all nodes. Use this procedure only for nodes you upgraded to |NEW_VERSION|.
|
||||
|
||||
.. warning::
|
||||
|
||||
The rollback procedure can be applied **only** if some nodes have not been upgraded to |NEW_VERSION| yet.
|
||||
As soon as the last node in the rolling upgrade procedure is started with |NEW_VERSION|, rollback becomes impossible.
|
||||
At that point, the only way to restore a cluster to |SRC_VERSION| is by restoring it from backup.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes you rollback to |SRC_VERSION|, serially (i.e., one node at a time), you will:
|
||||
|
||||
* Drain the node and stop Scylla
|
||||
* Retrieve the old ScyllaDB packages
|
||||
* Restore the configuration file
|
||||
* Restore system tables
|
||||
* Reload systemd configuration
|
||||
* Restart ScyllaDB
|
||||
* Validate the rollback success
|
||||
|
||||
Apply the following procedure **serially** on each node. Do not move to the next node before validating that the rollback was successful and the node is up and running the old version.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
Drain and gracefully stop the node
|
||||
----------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
|
||||
..
|
||||
TODO: downgrade for 3rd party packages in EC2/GCP/Azure - like in the upgrade section?
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Update the |SCYLLA_DEB_SRC_REPO| to |SRC_VERSION|.
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Update the |SCYLLA_RPM_SRC_REPO|_ to |SRC_VERSION|.
|
||||
#. Install:
|
||||
|
||||
.. code:: console
|
||||
|
||||
sudo yum clean all
|
||||
sudo rm -rf /var/cache/yum
|
||||
sudo yum remove scylla\*cqlsh
|
||||
sudo yum downgrade scylla\* -y
|
||||
sudo yum install scylla
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Update the |SCYLLA_DEB_SRC_REPO| to |SRC_VERSION|.
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
sudo cp -a /etc/scylla/scylla.yaml.backup-src /etc/scylla/scylla.yaml
|
||||
|
||||
Restore system tables
|
||||
---------------------
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because |NEW_VERSION| uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
Reload systemd configuration
|
||||
----------------------------
|
||||
|
||||
You must reload the unit file if the systemd unit file is changed.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback is successful, move to the next node in the cluster.
|
||||
@@ -167,6 +167,19 @@ Restore the configuration file
|
||||
|
||||
for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf.backup-4.3 $conf; done
|
||||
|
||||
Restore system tables
|
||||
---------------------
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because 2021.1 uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: console
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
|
||||
@@ -172,6 +172,19 @@ Restore the configuration file
|
||||
|
||||
for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf.backup-5.0 $conf; done
|
||||
|
||||
Restore system tables
|
||||
---------------------
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because 2022.1 uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: console
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
|
||||
@@ -346,8 +346,10 @@ Restore system tables
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because |NEW_VERSION| uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: sh
|
||||
.. code:: console
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
@@ -346,8 +346,11 @@ Restore system tables
|
||||
|
||||
Restore all tables of **system** and **system_schema** from the previous snapshot because |NEW_VERSION| uses a different set of system tables. See :doc:`Restore from a Backup and Incremental Backup </operating-scylla/procedures/backup-restore/restore/>` for reference.
|
||||
|
||||
.. code:: sh
|
||||
.. code:: console
|
||||
|
||||
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo find . -maxdepth 1 -type f -exec sudo rm -f "{}" +
|
||||
cd /var/lib/scylla/data/keyspace_name/table_name-UUID/snapshots/<snapshot_name>/
|
||||
sudo cp -r * /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
sudo chown -R scylla:scylla /var/lib/scylla/data/keyspace_name/table_name-UUID/
|
||||
|
||||
@@ -632,9 +632,9 @@ messaging_service::initial_scheduling_info() const {
|
||||
sched_infos.reserve(sched_infos.size() +
|
||||
_scheduling_config.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT);
|
||||
for (const auto& tenant : _scheduling_config.statement_tenants) {
|
||||
sched_infos.push_back({ tenant.sched_group, "statement:" + tenant.name });
|
||||
sched_infos.push_back({ tenant.sched_group, "statement-ack:" + tenant.name });
|
||||
sched_infos.push_back({ tenant.sched_group, "forward:" + tenant.name });
|
||||
for (auto&& connection_prefix : _connection_types_prefix) {
|
||||
sched_infos.push_back({ tenant.sched_group, sstring(connection_prefix) + tenant.name });
|
||||
}
|
||||
}
|
||||
|
||||
assert(sched_infos.size() == PER_SHARD_CONNECTION_COUNT +
|
||||
@@ -660,6 +660,14 @@ messaging_service::scheduling_group_for_isolation_cookie(const sstring& isolatio
|
||||
return info.sched_group;
|
||||
}
|
||||
}
|
||||
// Check for the case of the client using a connection class we don't
|
||||
// recognize, but we know it's a tenant, not a system connection.
|
||||
// Fall-back to the default tenant in this case.
|
||||
for (auto&& connection_prefix : _connection_types_prefix) {
|
||||
if (isolation_cookie.find(connection_prefix.data()) == 0) {
|
||||
return _scheduling_config.statement_tenants.front().sched_group;
|
||||
}
|
||||
}
|
||||
// Client is using a new connection class that the server doesn't recognize yet.
|
||||
// Assume it's important, after server upgrade we'll recognize it.
|
||||
return default_scheduling_group();
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <list>
|
||||
#include <vector>
|
||||
#include <optional>
|
||||
#include <array>
|
||||
#include <absl/container/btree_set.h>
|
||||
#include <seastar/net/tls.hh>
|
||||
|
||||
@@ -526,6 +527,7 @@ public:
|
||||
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
|
||||
unsigned get_rpc_client_idx(messaging_verb verb) const;
|
||||
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"};
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -560,21 +560,23 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe
|
||||
}
|
||||
try {
|
||||
return _db.invoke_on_all([this, cmd = &_cmd, ranges = &_ranges, gs = global_schema_ptr(_schema),
|
||||
gts = tracing::global_trace_state_ptr(_trace_state), timeout] (replica::database& db) mutable {
|
||||
gts = tracing::global_trace_state_ptr(_trace_state), timeout] (replica::database& db) mutable -> future<> {
|
||||
auto schema = gs.get();
|
||||
auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, gts.get(), timeout);
|
||||
auto& table = db.find_column_family(schema);
|
||||
auto& semaphore = this->semaphore();
|
||||
auto shard = this_shard_id();
|
||||
|
||||
auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, gts.get(), timeout);
|
||||
|
||||
if (!querier_opt) {
|
||||
_readers[shard] = reader_meta(reader_state::inexistent);
|
||||
return;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto& q = *querier_opt;
|
||||
|
||||
if (&q.permit().semaphore() != &semaphore) {
|
||||
co_await q.close();
|
||||
on_internal_error(mmq_log, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: "
|
||||
"looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})",
|
||||
q.permit().semaphore().name(),
|
||||
@@ -583,6 +585,9 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe
|
||||
reinterpret_cast<uintptr_t>(&semaphore)));
|
||||
}
|
||||
|
||||
// At this point the readers is passed to the semaphore and we are
|
||||
// safe w.r.t. exceptions. The code between this point and obtaining
|
||||
// the querier must take care of closing it if an error happens.
|
||||
auto handle = semaphore.register_inactive_read(std::move(q).reader());
|
||||
_readers[shard] = reader_meta(
|
||||
reader_state::successful_lookup,
|
||||
|
||||
@@ -66,36 +66,48 @@ atomic_cell::atomic_cell(const abstract_type& type, atomic_cell_view other)
|
||||
set_view(_data);
|
||||
}
|
||||
|
||||
// Based on:
|
||||
// - org.apache.cassandra.db.AbstractCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
|
||||
// Based on Cassandra's resolveRegular function:
|
||||
// - https://github.com/apache/cassandra/blob/e4f31b73c21b04966269c5ac2d3bd2562e5f6c63/src/java/org/apache/cassandra/db/rows/Cells.java#L79-L119
|
||||
//
|
||||
// Note: the ordering algorithm for cell is the same as for rows,
|
||||
// except that the cell value is used to break a tie in case all other attributes are equal.
|
||||
// See compare_row_marker_for_merge.
|
||||
std::strong_ordering
|
||||
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
// Largest write timestamp wins.
|
||||
if (left.timestamp() != right.timestamp()) {
|
||||
return left.timestamp() <=> right.timestamp();
|
||||
}
|
||||
// Tombstones always win reconciliation with live cells of the same timestamp
|
||||
if (left.is_live() != right.is_live()) {
|
||||
return left.is_live() ? std::strong_ordering::less : std::strong_ordering::greater;
|
||||
}
|
||||
if (left.is_live()) {
|
||||
auto c = compare_unsigned(left.value(), right.value()) <=> 0;
|
||||
if (c != 0) {
|
||||
return c;
|
||||
}
|
||||
// Prefer expiring cells (which will become tombstones at some future date) over live cells.
|
||||
// See https://issues.apache.org/jira/browse/CASSANDRA-14592
|
||||
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? std::strong_ordering::greater : std::strong_ordering::less;
|
||||
}
|
||||
// If both are expiring, choose the cell with the latest expiry or derived write time.
|
||||
if (left.is_live_and_has_ttl()) {
|
||||
// Prefer cell with latest expiry
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
} else {
|
||||
// prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged.
|
||||
} else if (right.ttl() != left.ttl()) {
|
||||
// The cell write time is derived by (expiry - ttl).
|
||||
// Prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged,
|
||||
// as it become purgeable gc_grace_seconds after it was written.
|
||||
//
|
||||
// Note that this is an extension to Cassandra's algorithm
|
||||
// which stops at the expiration time, and if equal,
|
||||
// move forward to compare the cell values.
|
||||
return right.ttl() <=> left.ttl();
|
||||
}
|
||||
}
|
||||
// The cell with the largest value wins, if all other attributes of the cells are identical.
|
||||
// This is quite arbitrary, but still required to break the tie in a deterministic way.
|
||||
return compare_unsigned(left.value(), right.value());
|
||||
} else {
|
||||
// Both are deleted
|
||||
|
||||
|
||||
@@ -1124,20 +1124,34 @@ operator<<(std::ostream& os, const mutation_partition::printer& p) {
|
||||
constexpr gc_clock::duration row_marker::no_ttl;
|
||||
constexpr gc_clock::duration row_marker::dead;
|
||||
|
||||
// Note: the ordering algorithm for rows is the same as for cells,
|
||||
// except that there is no cell value to break a tie in case all other attributes are equal.
|
||||
// See compare_atomic_cell_for_merge.
|
||||
int compare_row_marker_for_merge(const row_marker& left, const row_marker& right) noexcept {
|
||||
// Largest write timestamp wins.
|
||||
if (left.timestamp() != right.timestamp()) {
|
||||
return left.timestamp() > right.timestamp() ? 1 : -1;
|
||||
}
|
||||
// Tombstones always win reconciliation with live rows of the same timestamp
|
||||
if (left.is_live() != right.is_live()) {
|
||||
return left.is_live() ? -1 : 1;
|
||||
}
|
||||
if (left.is_live()) {
|
||||
// Prefer expiring rows (which will become tombstones at some future date) over live rows.
|
||||
// See https://issues.apache.org/jira/browse/CASSANDRA-14592
|
||||
if (left.is_expiring() != right.is_expiring()) {
|
||||
// prefer expiring cells.
|
||||
return left.is_expiring() ? 1 : -1;
|
||||
}
|
||||
if (left.is_expiring() && left.expiry() != right.expiry()) {
|
||||
return left.expiry() < right.expiry() ? -1 : 1;
|
||||
// If both are expiring, choose the cell with the latest expiry or derived write time.
|
||||
if (left.is_expiring()) {
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() < right.expiry() ? -1 : 1;
|
||||
} else if (left.ttl() != right.ttl()) {
|
||||
// The cell write time is derived by (expiry - ttl).
|
||||
// Prefer row that was written later (and has a smaller ttl).
|
||||
return left.ttl() < right.ttl() ? 1 : -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Both are either deleted or missing
|
||||
|
||||
@@ -55,6 +55,17 @@ public:
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
// Might only be called between consume_new_partition()
|
||||
// and consume_end_of_partition().
|
||||
//
|
||||
// Returns (and forgets) the partition contents consumed so far.
|
||||
// Can be used to split the processing of a large mutation into
|
||||
// multiple smaller `mutation` objects (which add up to the full mutation).
|
||||
mutation flush() {
|
||||
assert(_m);
|
||||
return std::exchange(*_m, mutation(_s, _m->decorated_key()));
|
||||
}
|
||||
|
||||
mutation_opt consume_end_of_stream() {
|
||||
return std::move(_m);
|
||||
}
|
||||
@@ -67,6 +78,7 @@ class mutation_rebuilder_v2 {
|
||||
schema_ptr _s;
|
||||
mutation_rebuilder _builder;
|
||||
range_tombstone_assembler _rt_assembler;
|
||||
position_in_partition _pos = position_in_partition::before_all_clustered_rows();
|
||||
public:
|
||||
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
|
||||
public:
|
||||
@@ -91,6 +103,7 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume(range_tombstone_change&& rt) {
|
||||
_pos = rt.position();
|
||||
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
|
||||
_builder.consume(std::move(*rt_opt));
|
||||
}
|
||||
@@ -103,6 +116,7 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume(clustering_row&& cr) {
|
||||
_pos = position_in_partition::after_key(*_s, cr.position());
|
||||
_builder.consume(std::move(cr));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
@@ -116,4 +130,22 @@ public:
|
||||
_rt_assembler.on_end_of_stream();
|
||||
return _builder.consume_end_of_stream();
|
||||
}
|
||||
|
||||
// Might only be called between consume_new_partition()
|
||||
// and consume_end_of_partition().
|
||||
//
|
||||
// Returns (and forgets) the partition contents consumed so far.
|
||||
// Can be used to split the processing of a large mutation into
|
||||
// multiple smaller `mutation` objects (which add up to the full mutation).
|
||||
//
|
||||
// The active range tombstone (if present) is flushed with end bound
|
||||
// just after the last seen clustered position, but the range tombstone
|
||||
// remains active, and the next mutation will see it restarted at the
|
||||
// position it was flushed at.
|
||||
mutation flush() {
|
||||
if (auto rt_opt = _rt_assembler.flush(*_s, _pos)) {
|
||||
_builder.consume(std::move(*rt_opt));
|
||||
}
|
||||
return _builder.flush();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -103,6 +103,12 @@ partition_slice_builder::mutate_specific_ranges(std::function<void(query::specif
|
||||
return *this;
|
||||
}
|
||||
|
||||
partition_slice_builder&
|
||||
partition_slice_builder::set_specific_ranges(query::specific_ranges ranges) {
|
||||
_specific_ranges = std::make_unique<query::specific_ranges>(std::move(ranges));
|
||||
return *this;
|
||||
}
|
||||
|
||||
partition_slice_builder&
|
||||
partition_slice_builder::with_no_regular_columns() {
|
||||
_regular_columns = query::column_id_vector();
|
||||
|
||||
@@ -45,6 +45,7 @@ public:
|
||||
partition_slice_builder& mutate_ranges(std::function<void(std::vector<query::clustering_range>&)>);
|
||||
// noop if no specific ranges have been set yet
|
||||
partition_slice_builder& mutate_specific_ranges(std::function<void(query::specific_ranges&)>);
|
||||
partition_slice_builder& set_specific_ranges(query::specific_ranges);
|
||||
partition_slice_builder& without_partition_key_columns();
|
||||
partition_slice_builder& without_clustering_key_columns();
|
||||
partition_slice_builder& reversed();
|
||||
|
||||
@@ -32,6 +32,7 @@ struct mutation_fragment_and_stream_id {
|
||||
};
|
||||
|
||||
using mutation_fragment_batch = boost::iterator_range<merger_vector<mutation_fragment_and_stream_id>::iterator>;
|
||||
using mutation_fragment_batch_opt = std::optional<mutation_fragment_batch>;
|
||||
|
||||
template<typename Producer>
|
||||
concept FragmentProducer = requires(Producer p, dht::partition_range part_range, position_range pos_range) {
|
||||
@@ -226,6 +227,7 @@ private:
|
||||
streamed_mutation::forwarding _fwd_sm;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
private:
|
||||
future<mutation_fragment_batch_opt> maybe_produce_batch();
|
||||
void maybe_add_readers_at_partition_boundary();
|
||||
void maybe_add_readers(const std::optional<dht::ring_position_view>& pos);
|
||||
void add_readers(std::vector<flat_mutation_reader_v2> new_readers);
|
||||
@@ -469,15 +471,21 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema,
|
||||
}
|
||||
|
||||
future<mutation_fragment_batch> mutation_reader_merger::operator()() {
|
||||
return repeat_until_value([this] { return maybe_produce_batch(); });
|
||||
}
|
||||
|
||||
future<mutation_fragment_batch_opt> mutation_reader_merger::maybe_produce_batch() {
|
||||
// Avoid merging-related logic if we know that only a single reader owns
|
||||
// current partition.
|
||||
if (_single_reader.reader != reader_iterator{}) {
|
||||
if (_single_reader.reader->is_buffer_empty()) {
|
||||
if (_single_reader.reader->is_end_of_stream()) {
|
||||
_current.clear();
|
||||
return make_ready_future<mutation_fragment_batch>(_current, &_single_reader);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(mutation_fragment_batch(_current, &_single_reader));
|
||||
}
|
||||
return _single_reader.reader->fill_buffer().then([this] { return operator()(); });
|
||||
return _single_reader.reader->fill_buffer().then([] {
|
||||
return make_ready_future<mutation_fragment_batch_opt>();
|
||||
});
|
||||
}
|
||||
_current.clear();
|
||||
_current.emplace_back(_single_reader.reader->pop_mutation_fragment(), &*_single_reader.reader);
|
||||
@@ -485,22 +493,22 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
|
||||
if (_current.back().fragment.is_end_of_partition()) {
|
||||
_next.emplace_back(std::exchange(_single_reader.reader, {}), mutation_fragment_v2::kind::partition_end);
|
||||
}
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current);
|
||||
}
|
||||
|
||||
if (in_gallop_mode()) {
|
||||
return advance_galloping_reader().then([this] (needs_merge needs_merge) {
|
||||
if (!needs_merge) {
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current);
|
||||
}
|
||||
// Galloping reader may have lost to some other reader. In that case, we should proceed
|
||||
// with standard merging logic.
|
||||
return (*this)();
|
||||
return make_ready_future<mutation_fragment_batch_opt>();
|
||||
});
|
||||
}
|
||||
|
||||
if (!_next.empty()) {
|
||||
return prepare_next().then([this] { return (*this)(); });
|
||||
return prepare_next().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
|
||||
}
|
||||
|
||||
_current.clear();
|
||||
@@ -509,7 +517,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
|
||||
// readers for the next one.
|
||||
if (_fragment_heap.empty()) {
|
||||
if (!_halted_readers.empty() || _reader_heap.empty()) {
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current);
|
||||
}
|
||||
|
||||
auto key = [] (const merger_vector<reader_and_fragment>& heap) -> const dht::decorated_key& {
|
||||
@@ -529,7 +537,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
|
||||
_current.emplace_back(std::move(_fragment_heap.back().fragment), &*_single_reader.reader);
|
||||
_fragment_heap.clear();
|
||||
_gallop_mode_hits = 0;
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,7 +563,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
|
||||
_gallop_mode_hits = 1;
|
||||
}
|
||||
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current);
|
||||
}
|
||||
|
||||
future<> mutation_reader_merger::next_partition() {
|
||||
@@ -918,7 +926,7 @@ class clustering_order_reader_merger {
|
||||
//
|
||||
// If the galloping reader wins with other readers again, the fragment is returned as the next batch.
|
||||
// Otherwise, the reader is pushed onto _peeked_readers and we retry in non-galloping mode.
|
||||
future<mutation_fragment_batch> peek_galloping_reader() {
|
||||
future<mutation_fragment_batch_opt> peek_galloping_reader() {
|
||||
return _galloping_reader->reader.peek().then([this] (mutation_fragment_v2* mf) {
|
||||
bool erase = false;
|
||||
if (mf) {
|
||||
@@ -943,7 +951,7 @@ class clustering_order_reader_merger {
|
||||
|| _cmp(mf->position(), _peeked_readers.front()->reader.peek_buffer().position()) < 0)) {
|
||||
_current_batch.emplace_back(_galloping_reader->reader.pop_mutation_fragment(), &_galloping_reader->reader);
|
||||
|
||||
return make_ready_future<mutation_fragment_batch>(_current_batch);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
|
||||
}
|
||||
|
||||
// One of the existing readers won with the galloping reader,
|
||||
@@ -969,7 +977,7 @@ class clustering_order_reader_merger {
|
||||
return maybe_erase.then([this] {
|
||||
_galloping_reader = {};
|
||||
_gallop_mode_hits = 0;
|
||||
return (*this)();
|
||||
return make_ready_future<mutation_fragment_batch_opt>();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -994,6 +1002,10 @@ public:
|
||||
// returned by the previous operator() call after calling operator() again
|
||||
// (the data from the previous batch is destroyed).
|
||||
future<mutation_fragment_batch> operator()() {
|
||||
return repeat_until_value([this] { return maybe_produce_batch(); });
|
||||
}
|
||||
|
||||
future<mutation_fragment_batch_opt> maybe_produce_batch() {
|
||||
_current_batch.clear();
|
||||
|
||||
if (in_gallop_mode()) {
|
||||
@@ -1001,7 +1013,7 @@ public:
|
||||
}
|
||||
|
||||
if (!_unpeeked_readers.empty()) {
|
||||
return peek_readers().then([this] { return (*this)(); });
|
||||
return peek_readers().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
|
||||
}
|
||||
|
||||
// Before we return a batch of fragments using currently opened readers we must check the queue
|
||||
@@ -1026,7 +1038,7 @@ public:
|
||||
_all_readers.push_front(std::move(r));
|
||||
_unpeeked_readers.push_back(_all_readers.begin());
|
||||
}
|
||||
return peek_readers().then([this] { return (*this)(); });
|
||||
return peek_readers().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
|
||||
}
|
||||
|
||||
if (_peeked_readers.empty()) {
|
||||
@@ -1040,7 +1052,7 @@ public:
|
||||
}
|
||||
_should_emit_partition_end = false;
|
||||
}
|
||||
return make_ready_future<mutation_fragment_batch>(_current_batch);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
|
||||
}
|
||||
|
||||
// Take all fragments with the next smallest position (there may be multiple such fragments).
|
||||
@@ -1074,7 +1086,7 @@ public:
|
||||
_gallop_mode_hits = 1;
|
||||
}
|
||||
|
||||
return make_ready_future<mutation_fragment_batch>(_current_batch);
|
||||
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
|
||||
}
|
||||
|
||||
future<> next_partition() {
|
||||
|
||||
@@ -611,7 +611,7 @@ future<> evictable_reader_v2::fill_buffer() {
|
||||
// First make sure we've made progress w.r.t. _next_position_in_partition.
|
||||
// This loop becomes inifinite when next pos is a partition start.
|
||||
// In that case progress is guranteed anyway, so skip this loop entirely.
|
||||
while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) {
|
||||
while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) {
|
||||
push_mutation_fragment(_reader->pop_mutation_fragment());
|
||||
next_mf = co_await _reader->peek();
|
||||
}
|
||||
|
||||
@@ -368,6 +368,7 @@ public:
|
||||
|
||||
future<> on_end_of_stream() noexcept {
|
||||
return _reader.close().then([this] {
|
||||
_permit.release_base_resources();
|
||||
_reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
|
||||
_reader_handle.reset();
|
||||
});
|
||||
@@ -375,6 +376,7 @@ public:
|
||||
|
||||
future<> close() noexcept {
|
||||
return _reader.close().then([this] {
|
||||
_permit.release_base_resources();
|
||||
_reader_handle.reset();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1132,9 +1132,6 @@ public:
|
||||
// Safely iterate through table states, while performing async operations on them.
|
||||
future<> parallel_foreach_table_state(std::function<future<>(compaction::table_state&)> action);
|
||||
|
||||
// Add sst to or remove it from the sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
|
||||
// Uncoditionally erase sst from `sstables_requiring_cleanup`
|
||||
// Returns true iff sst was found and erased.
|
||||
bool erase_sstable_cleanup_state(const sstables::shared_sstable& sst);
|
||||
|
||||
@@ -155,9 +155,8 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<r
|
||||
auto shared_sstables = d.retrieve_shared_sstables();
|
||||
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
|
||||
if (sorted_owned_ranges_ptr) {
|
||||
auto& table = db.local().find_column_family(ks_name, table_name);
|
||||
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
|
||||
if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) {
|
||||
if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
|
||||
need_cleanup.push_back(co_await sst->get_open_info());
|
||||
co_return false;
|
||||
}
|
||||
@@ -242,9 +241,6 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
|
||||
buckets.emplace_back();
|
||||
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
|
||||
auto sst = co_await dir.load_foreign_sstable(info);
|
||||
if (owned_ranges_ptr) {
|
||||
table.update_sstable_cleanup_state(sst, *owned_ranges_ptr);
|
||||
}
|
||||
// Last bucket gets leftover SSTables
|
||||
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
|
||||
buckets.emplace_back();
|
||||
|
||||
@@ -2856,12 +2856,6 @@ table::as_data_dictionary() const {
|
||||
return _impl.wrap(*this);
|
||||
}
|
||||
|
||||
bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
|
||||
// FIXME: it's possible that the sstable belongs to multiple compaction_groups
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, sorted_owned_ranges);
|
||||
}
|
||||
|
||||
bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) {
|
||||
// FIXME: it's possible that the sstable belongs to multiple compaction_groups
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: f94b1bb9cb...e45cef9ce8
@@ -314,7 +314,7 @@ static shared_ptr<cql3::selection::selection> mock_selection(
|
||||
) {
|
||||
auto name_as_expression = [] (const sstring& name) -> cql3::expr::expression {
|
||||
return cql3::expr::unresolved_identifier {
|
||||
make_shared<cql3::column_identifier_raw>(name, false)
|
||||
make_shared<cql3::column_identifier_raw>(name, true)
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -469,6 +469,13 @@ struct group0_members {
|
||||
const raft::server& _group0_server;
|
||||
const raft_address_map& _address_map;
|
||||
|
||||
raft::config_member_set get_members() const {
|
||||
return _group0_server.get_configuration().current;
|
||||
}
|
||||
|
||||
std::optional<gms::inet_address> get_inet_addr(const raft::config_member& member) const {
|
||||
return _address_map.find(member.addr.id);
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> get_inet_addrs(std::source_location l =
|
||||
std::source_location::current()) const {
|
||||
@@ -1172,10 +1179,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
// We fetch the config again on every attempt to handle the possibility of removing failed nodes.
|
||||
auto current_config = members0.get_inet_addrs();
|
||||
if (current_config.empty()) {
|
||||
continue;
|
||||
}
|
||||
auto current_members_set = members0.get_members();
|
||||
|
||||
::tracker<bool> tracker;
|
||||
auto retry = make_lw_shared<bool>(false);
|
||||
@@ -1189,10 +1193,20 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
}
|
||||
|
||||
(void) [] (netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown,
|
||||
std::vector<gms::inet_address> current_config,
|
||||
raft::config_member_set current_members_set, group0_members members0,
|
||||
lw_shared_ptr<std::unordered_set<gms::inet_address>> entered_synchronize,
|
||||
lw_shared_ptr<bool> retry, ::tracker<bool> tracker) -> future<> {
|
||||
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> {
|
||||
co_await max_concurrent_for_each(current_members_set, max_concurrency, [&] (const raft::config_member& member) -> future<> {
|
||||
auto node_opt = members0.get_inet_addr(member);
|
||||
|
||||
if (!node_opt.has_value()) {
|
||||
upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: cannot resolve the IP of {}", member);
|
||||
*retry = true;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto node = *node_opt;
|
||||
|
||||
if (entered_synchronize->contains(node)) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1229,7 +1243,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
});
|
||||
|
||||
tracker.set_value(false);
|
||||
}(ms, as, pause_shutdown, std::move(current_config), entered_synchronize, retry, tracker);
|
||||
}(ms, as, pause_shutdown, std::move(current_members_set), members0, entered_synchronize, retry, tracker);
|
||||
|
||||
auto finish_early = co_await tracker.get();
|
||||
if (finish_early) {
|
||||
|
||||
@@ -115,8 +115,12 @@ static const seastar::metrics::label op_type_label("op_type");
|
||||
static const seastar::metrics::label scheduling_group_label("scheduling_group_name");
|
||||
static const seastar::metrics::label rejected_by_coordinator_label("rejected_by_coordinator");
|
||||
|
||||
seastar::metrics::label_instance make_scheduling_group_label(const scheduling_group& sg) {
|
||||
return scheduling_group_label(sg.name());
|
||||
}
|
||||
|
||||
seastar::metrics::label_instance current_scheduling_group_label() {
|
||||
return scheduling_group_label(current_scheduling_group().name());
|
||||
return make_scheduling_group_label(current_scheduling_group());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2328,7 +2332,8 @@ storage_proxy_stats::split_stats::split_stats(const sstring& category, const sst
|
||||
, _long_description_prefix(long_description_prefix)
|
||||
, _category(category)
|
||||
, _op_type(op_type)
|
||||
, _auto_register_metrics(auto_register_metrics) { }
|
||||
, _auto_register_metrics(auto_register_metrics)
|
||||
, _sg(current_scheduling_group()) { }
|
||||
|
||||
storage_proxy_stats::write_stats::write_stats()
|
||||
: writes_attempts(COORDINATOR_STATS_CATEGORY, "total_write_attempts", "total number of write requests", "mutation_data")
|
||||
@@ -2645,7 +2650,7 @@ void storage_proxy_stats::split_stats::register_metrics_local() {
|
||||
auto new_metrics = sm::metric_groups();
|
||||
new_metrics.add_group(_category, {
|
||||
sm::make_counter(_short_description_prefix + sstring("_local_node"), [this] { return _local.val; },
|
||||
sm::description(_long_description_prefix + "on a local Node"), {storage_proxy_stats::current_scheduling_group_label(), op_type_label(_op_type)})
|
||||
sm::description(_long_description_prefix + "on a local Node"), {storage_proxy_stats::make_scheduling_group_label(_sg), op_type_label(_op_type)})
|
||||
});
|
||||
_metrics = std::exchange(new_metrics, {});
|
||||
}
|
||||
@@ -2658,7 +2663,7 @@ void storage_proxy_stats::split_stats::register_metrics_for(sstring dc, gms::ine
|
||||
if (auto [ignored, added] = _dc_stats.try_emplace(dc); added) {
|
||||
_metrics.add_group(_category, {
|
||||
sm::make_counter(_short_description_prefix + sstring("_remote_node"), [this, dc] { return _dc_stats[dc].val; },
|
||||
sm::description(seastar::format("{} when communicating with external Nodes in another DC", _long_description_prefix)), {storage_proxy_stats::current_scheduling_group_label(), datacenter_label(dc), op_type_label(_op_type)})
|
||||
sm::description(seastar::format("{} when communicating with external Nodes in another DC", _long_description_prefix)), {storage_proxy_stats::make_scheduling_group_label(_sg), datacenter_label(dc), op_type_label(_op_type)})
|
||||
.set_skip_when_empty()
|
||||
});
|
||||
}
|
||||
@@ -5467,7 +5472,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, *erm, merged, current_merged_preferred_replicas, gossiper, pcf);
|
||||
|
||||
// Estimate whether merging will be a win or not
|
||||
if (!is_worth_merging_for_range_query(erm->get_topology(), filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
|
||||
if (filtered_merged.empty()
|
||||
|| !is_worth_merging_for_range_query(
|
||||
erm->get_topology(), filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
|
||||
break;
|
||||
} else if (pcf) {
|
||||
// check that merged set hit rate is not to low
|
||||
@@ -5479,6 +5486,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
return float(cf->get_hit_rate(g, ep).rate);
|
||||
}
|
||||
} ep_to_hr{g, pcf};
|
||||
|
||||
if (range.empty()) {
|
||||
on_internal_error(slogger, "empty range passed to `find_min`");
|
||||
}
|
||||
return *boost::range::min_element(range | boost::adaptors::transformed(ep_to_hr));
|
||||
};
|
||||
auto merged = find_min(filtered_merged) * 1.2; // give merged set 20% boost
|
||||
|
||||
@@ -44,6 +44,7 @@ private:
|
||||
// whether to register per-endpoint metrics automatically
|
||||
bool _auto_register_metrics;
|
||||
|
||||
scheduling_group _sg;
|
||||
public:
|
||||
/**
|
||||
* @param category a statistics category, e.g. "client" or "replica"
|
||||
|
||||
@@ -14,6 +14,7 @@ import boto3
|
||||
import requests
|
||||
import re
|
||||
from util import create_test_table, is_aws, scylla_log
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# When tests are run with HTTPS, the server often won't have its SSL
|
||||
# certificate signed by a known authority. So we will disable certificate
|
||||
@@ -89,6 +90,18 @@ def dynamodb(request):
|
||||
region_name='us-east-1', aws_access_key_id='alternator', aws_secret_access_key='secret_pass',
|
||||
config=boto_config.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300)))
|
||||
|
||||
def new_dynamodb_session(request, dynamodb):
|
||||
ses = boto3.Session()
|
||||
host = urlparse(dynamodb.meta.client._endpoint.host)
|
||||
conf = botocore.client.Config(parameter_validation=False)
|
||||
if request.config.getoption('aws'):
|
||||
return boto3.resource('dynamodb', config=conf)
|
||||
if host.hostname == 'localhost':
|
||||
conf = conf.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300))
|
||||
return ses.resource('dynamodb', endpoint_url=dynamodb.meta.client._endpoint.host, verify=host.scheme != 'http',
|
||||
region_name='us-east-1', aws_access_key_id='alternator', aws_secret_access_key='secret_pass',
|
||||
config=conf)
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def dynamodbstreams(request):
|
||||
# Disable boto3's client-side validation of parameters. This validation
|
||||
|
||||
@@ -8,8 +8,12 @@
|
||||
|
||||
import pytest
|
||||
import random
|
||||
from botocore.exceptions import ClientError
|
||||
from botocore.exceptions import ClientError, HTTPClientError
|
||||
from util import random_string, full_scan, full_query, multiset, scylla_inject_error
|
||||
import urllib3
|
||||
import traceback
|
||||
import sys
|
||||
from conftest import new_dynamodb_session
|
||||
|
||||
# Test ensuring that items inserted by a batched statement can be properly extracted
|
||||
# via GetItem. Schema has both hash and sort keys.
|
||||
@@ -358,6 +362,43 @@ def test_batch_write_item_large(test_table_sn):
|
||||
assert full_query(test_table_sn, KeyConditionExpression='p=:p', ExpressionAttributeValues={':p': p}
|
||||
) == [{'p': p, 'c': i, 'content': long_content} for i in range(25)]
|
||||
|
||||
# Test if client breaking connection during HTTP response
|
||||
# streaming doesn't break the server.
|
||||
def test_batch_write_item_large_broken_connection(test_table_sn, request, dynamodb):
|
||||
fn_name = sys._getframe().f_code.co_name
|
||||
ses = new_dynamodb_session(request, dynamodb)
|
||||
|
||||
p = random_string()
|
||||
long_content = random_string(100)*500
|
||||
write_reply = test_table_sn.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_sn.name: [{'PutRequest': {'Item': {'p': p, 'c': i, 'content': long_content}}} for i in range(25)],
|
||||
})
|
||||
assert 'UnprocessedItems' in write_reply and write_reply['UnprocessedItems'] == dict()
|
||||
|
||||
read_fun = urllib3.HTTPResponse.read_chunked
|
||||
triggered = False
|
||||
def broken_read_fun(self, amt=None, decode_content=None):
|
||||
ret = read_fun(self, amt, decode_content)
|
||||
st = traceback.extract_stack()
|
||||
# Try to not disturb other tests if executed in parallel
|
||||
if fn_name in str(st):
|
||||
self._fp.fp.raw.close() # close the socket
|
||||
nonlocal triggered
|
||||
triggered = True
|
||||
return ret
|
||||
urllib3.HTTPResponse.read_chunked = broken_read_fun
|
||||
|
||||
try:
|
||||
# This disruption doesn't always work so we repeat it.
|
||||
for _ in range(1, 20):
|
||||
with pytest.raises(HTTPClientError):
|
||||
# Our monkey patched read_chunked function will make client unusable
|
||||
# so we need to use separate session so that it doesn't affect other tests.
|
||||
ses.meta.client.query(TableName=test_table_sn.name, KeyConditionExpression='p=:p', ExpressionAttributeValues={':p': p})
|
||||
assert triggered
|
||||
finally:
|
||||
urllib3.HTTPResponse.read_chunked = read_fun
|
||||
|
||||
# DynamoDB limits the number of items written by a BatchWriteItem operation
|
||||
# to 25, even if they are small. Exceeding this limit results in a
|
||||
# ValidationException error - and none of the items in the batch are written.
|
||||
|
||||
@@ -1092,6 +1092,65 @@ SEASTAR_THREAD_TEST_CASE(max_result_size_for_unlimited_query_selection_test) {
|
||||
}, std::move(cfg)).get();
|
||||
}
|
||||
|
||||
// Check that during a multi-page range scan:
|
||||
// * semaphore mismatch is detected
|
||||
// * code is exception safe w.r.t. to the mismatch exception, e.g. readers are closed properly
|
||||
SEASTAR_TEST_CASE(multipage_range_scan_semaphore_mismatch) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
const auto do_abort = set_abort_on_internal_error(false);
|
||||
auto reset_abort = defer([do_abort] {
|
||||
set_abort_on_internal_error(do_abort);
|
||||
});
|
||||
e.execute_cql("CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();
|
||||
|
||||
auto insert_id = e.prepare("INSERT INTO ks.tbl(pk, ck, v) VALUES(?, ?, ?)").get();
|
||||
|
||||
auto& db = e.local_db();
|
||||
auto& tbl = db.find_column_family("ks", "tbl");
|
||||
auto s = tbl.schema();
|
||||
|
||||
auto dk = tests::generate_partition_key(tbl.schema());
|
||||
const auto pk = cql3::raw_value::make_value(managed_bytes(*dk.key().begin(*s)));
|
||||
const auto v = cql3::raw_value::make_value(int32_type->decompose(0));
|
||||
for (int32_t ck = 0; ck < 100; ++ck) {
|
||||
e.execute_prepared(insert_id, {pk, cql3::raw_value::make_value(int32_type->decompose(ck)), v}).get();
|
||||
}
|
||||
|
||||
auto sched_groups = get_scheduling_groups().get();
|
||||
|
||||
query::read_command cmd1(
|
||||
s->id(),
|
||||
s->version(),
|
||||
s->full_slice(),
|
||||
query::max_result_size(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()),
|
||||
query::tombstone_limit::max,
|
||||
query::row_limit(4),
|
||||
query::partition_limit::max,
|
||||
gc_clock::now(),
|
||||
std::nullopt,
|
||||
query_id::create_random_id(),
|
||||
query::is_first_page::yes);
|
||||
|
||||
auto cmd2 = cmd1;
|
||||
auto cr = query::clustering_range::make_starting_with({clustering_key::from_single_value(*s, int32_type->decompose(3)), false});
|
||||
cmd2.slice = partition_slice_builder(*s).set_specific_ranges(query::specific_ranges(dk.key(), {cr})).build();
|
||||
cmd2.is_first_page = query::is_first_page::no;
|
||||
|
||||
auto pr = dht::partition_range::make_starting_with({dk, true});
|
||||
auto prs = dht::partition_range_vector{pr};
|
||||
|
||||
auto read_page = [&] (scheduling_group sg, const query::read_command& cmd) {
|
||||
with_scheduling_group(sg, [&] {
|
||||
return query_data_on_all_shards(e.db(), s, cmd, prs, query::result_options::only_result(), {}, db::no_timeout);
|
||||
}).get();
|
||||
};
|
||||
|
||||
read_page(default_scheduling_group(), cmd1);
|
||||
BOOST_REQUIRE_EXCEPTION(read_page(sched_groups.statement_scheduling_group, cmd2), std::runtime_error,
|
||||
testing::exception_predicate::message_contains("looked-up reader belongs to different semaphore than the one appropriate for this query class:"));
|
||||
});
|
||||
}
|
||||
|
||||
// Test `upgrade_sstables` on all keyspaces (including the system keyspace).
|
||||
// Refs: #9494 (https://github.com/scylladb/scylla/issues/9494)
|
||||
SEASTAR_TEST_CASE(upgrade_sstables) {
|
||||
|
||||
@@ -670,6 +670,50 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
|
||||
assertions.produces_end_of_stream();
|
||||
}
|
||||
|
||||
class selector_of_empty_readers : public reader_selector {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
size_t _remaining;
|
||||
public:
|
||||
selector_of_empty_readers(schema_ptr s, reader_permit permit, size_t count)
|
||||
: reader_selector(s, dht::ring_position_view::min())
|
||||
, _schema(s)
|
||||
, _permit(std::move(permit))
|
||||
, _remaining(count) {
|
||||
}
|
||||
virtual std::vector<flat_mutation_reader_v2> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
|
||||
if (_remaining == 0) {
|
||||
return {};
|
||||
}
|
||||
--_remaining;
|
||||
std::vector<flat_mutation_reader_v2> ret;
|
||||
ret.push_back(make_empty_flat_reader_v2(_schema, _permit));
|
||||
return ret;
|
||||
}
|
||||
virtual std::vector<flat_mutation_reader_v2> fast_forward_to(const dht::partition_range& pr) override {
|
||||
assert(false); // Fast forward not supported by this reader
|
||||
return {};
|
||||
}
|
||||
};
|
||||
|
||||
// Reproduces scylladb/scylladb#14415
|
||||
SEASTAR_THREAD_TEST_CASE(test_combined_reader_with_incrementally_opened_empty_readers) {
|
||||
static constexpr size_t empty_reader_count = 10 * 1000;
|
||||
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto reader = make_combined_reader(s.schema(), permit,
|
||||
std::make_unique<selector_of_empty_readers>(s.schema(), permit, empty_reader_count),
|
||||
streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding::no);
|
||||
|
||||
// Expect that the reader won't produce a stack overflow
|
||||
assert_that(std::move(reader))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(combined_mutation_reader_test) {
|
||||
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
|
||||
simple_schema s;
|
||||
@@ -3666,9 +3710,13 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) {
|
||||
auto stop_rd = deferred_close(rd);
|
||||
rd.set_max_buffer_size(max_buf_size);
|
||||
|
||||
// #13491 - the reader must not consume the entire partition but a small batch of fragments based on the buffer size.
|
||||
rd.fill_buffer().get();
|
||||
rd.fill_buffer().get();
|
||||
auto buf1 = rd.detach_buffer();
|
||||
BOOST_REQUIRE_EQUAL(buf1.size(), 3);
|
||||
// There should be 6-7 fragments, but to avoid computing the exact number of fragments that should fit in `max_buf_size`,
|
||||
// just ensure that there are <= 10 (consuming the whole partition would give ~1000 fragments).
|
||||
BOOST_REQUIRE_LE(buf1.size(), 10);
|
||||
}
|
||||
|
||||
struct mutation_bounds {
|
||||
|
||||
@@ -687,14 +687,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
auto expiry_2 = now + ttl_2;
|
||||
|
||||
auto assert_order = [] (atomic_cell_view first, atomic_cell_view second) {
|
||||
if (compare_atomic_cell_for_merge(first, second) >= 0) {
|
||||
testlog.trace("Expected {} < {}", first, second);
|
||||
abort();
|
||||
}
|
||||
if (compare_atomic_cell_for_merge(second, first) <= 0) {
|
||||
testlog.trace("Expected {} < {}", second, first);
|
||||
abort();
|
||||
}
|
||||
testlog.trace("Expected {} < {}", first, second);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(first, second) < 0);
|
||||
|
||||
testlog.trace("Expected {} > {}", second, first);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(second, first) > 0);
|
||||
};
|
||||
|
||||
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
|
||||
@@ -703,18 +700,27 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
|
||||
};
|
||||
|
||||
testlog.debug("Live cells with same value are equal");
|
||||
assert_equal(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value")),
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value")));
|
||||
|
||||
testlog.debug("Non-expiring live cells are ordered before expiring cells");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value")),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
testlog.debug("Non-expiring live cells are ordered before expiring cells, regardless of their value");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value2")),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value1"), expiry_1, ttl_1));
|
||||
|
||||
testlog.debug("Dead cells with same expiry are equal");
|
||||
assert_equal(
|
||||
atomic_cell::make_dead(1, expiry_1),
|
||||
atomic_cell::make_dead(1, expiry_1));
|
||||
|
||||
testlog.debug("Non-expiring live cells are ordered before expiring cells, with empty value");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes()),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
|
||||
@@ -722,49 +728,57 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
// Origin doesn't compare ttl (is it wise?)
|
||||
// But we do. See https://github.com/scylladb/scylla/issues/10156
|
||||
// and https://github.com/scylladb/scylla/issues/10173
|
||||
testlog.debug("Expiring cells with higher ttl are ordered before expiring cells with smaller ttl and same expiry time");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
testlog.debug("Cells are ordered by value if all else is equal");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value2")));
|
||||
|
||||
testlog.debug("Cells are ordered by value in lexicographical order if all else is equal");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value12")),
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value2")));
|
||||
|
||||
// Live cells are ordered first by timestamp...
|
||||
testlog.debug("Live cells are ordered first by timestamp...");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value2")),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value1")));
|
||||
|
||||
// ..then by value
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value1"), expiry_2, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value2"), expiry_1, ttl_1));
|
||||
|
||||
// ..then by expiry
|
||||
testlog.debug("...then by expiry");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_1));
|
||||
|
||||
// Dead wins
|
||||
testlog.debug("...then by ttl (in reverse)");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_1));
|
||||
|
||||
testlog.debug("...then by value");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value1"), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value2"), expiry_1, ttl_1));
|
||||
|
||||
testlog.debug("Dead wins");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value")),
|
||||
atomic_cell::make_dead(1, expiry_1));
|
||||
|
||||
// Dead wins with expiring cell
|
||||
testlog.debug("Dead wins with expiring cell");
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_2, ttl_2),
|
||||
atomic_cell::make_dead(1, expiry_1));
|
||||
|
||||
// Deleted cells are ordered first by timestamp
|
||||
testlog.debug("Deleted cells are ordered first by timestamp...");
|
||||
assert_order(
|
||||
atomic_cell::make_dead(1, expiry_2),
|
||||
atomic_cell::make_dead(2, expiry_1));
|
||||
|
||||
// ...then by expiry
|
||||
testlog.debug("...then by expiry");
|
||||
assert_order(
|
||||
atomic_cell::make_dead(1, expiry_1),
|
||||
atomic_cell::make_dead(1, expiry_2));
|
||||
@@ -3013,6 +3027,75 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tests mutation_rebuilder_v2::flush().
|
||||
SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) {
|
||||
simple_schema ss;
|
||||
schema_ptr s = ss.schema();
|
||||
auto pk = ss.make_pkey();
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto p = semaphore.make_permit();
|
||||
|
||||
// Main idea of the test: we prepare a stream with all "interesting"
|
||||
// situations (with respect to positions), for example:
|
||||
// - RTC right before and after a key
|
||||
// - Overlapping RTCs
|
||||
// - Keys without a RTC in between, but with an active RTC from before
|
||||
// - Keys without a RTC in between, but without an active RTC from before
|
||||
// etc.
|
||||
//
|
||||
// Then we pass this stream through mutation_rebuilder_v2 with two flushes
|
||||
// in between (on all possible positions), and check that the result is
|
||||
// the same as without flushes.
|
||||
auto frags = std::vector<mutation_fragment_v2>();
|
||||
frags.emplace_back(*s, p, partition_start(pk, {}));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{}));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3)));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5)));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{}));
|
||||
frags.emplace_back(*s, p, partition_end());
|
||||
|
||||
mutation_rebuilder_v2 rebuilder_without_flush(s);
|
||||
for (int i = 0; i < frags.size(); ++i) {
|
||||
rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i]));
|
||||
}
|
||||
auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream());
|
||||
|
||||
// We do two flushes (we test all possible combinations of their positions,
|
||||
// including no flush).
|
||||
// This is to test that the first flush doesn't break the rebuilder in
|
||||
// a way that prevents another flush.
|
||||
for (int first_flush = 0; first_flush < frags.size(); ++first_flush) {
|
||||
for (int second_flush = first_flush; second_flush < frags.size(); ++second_flush) {
|
||||
mutation_rebuilder_v2 rebuilder(s);
|
||||
auto m1 = mutation(s, pk); // Contents of flush 1.
|
||||
auto m2 = mutation(s, pk); // Contents of flush 2.
|
||||
auto m3 = mutation(s, pk); // Contents of final flush.
|
||||
for (int i = 0; i < frags.size(); ++i) {
|
||||
rebuilder.consume(mutation_fragment_v2(*s, p, frags[i]));
|
||||
if (i == first_flush) {
|
||||
m1 = rebuilder.flush();
|
||||
}
|
||||
if (i == second_flush) {
|
||||
m2 = rebuilder.flush();
|
||||
}
|
||||
}
|
||||
m3 = std::move(*rebuilder.consume_end_of_stream());
|
||||
assert_that(m1 + m2 + m3).is_equal_to(m_expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
|
||||
@@ -5058,3 +5058,99 @@ SEASTAR_TEST_CASE(compaction_optimization_to_avoid_bloom_filter_checks) {
|
||||
BOOST_REQUIRE_EQUAL(1, result.stats.bloom_filter_checks);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_gc_grace_seconds(10000);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::leveled);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ "sstable_size_in_mb", "0" }, // makes sure that every mutation produces one fragment, to trigger incremental compaction
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
auto s = builder.build();
|
||||
auto sst_gen = env.make_sst_factory(s);
|
||||
|
||||
auto make_insert = [&] (partition_key key) {
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), api::new_timestamp());
|
||||
return m;
|
||||
};
|
||||
|
||||
std::vector<utils::observer<sstable&>> observers;
|
||||
std::vector<shared_sstable> ssts;
|
||||
size_t sstables_closed = 0;
|
||||
size_t sstables_closed_during_cleanup = 0;
|
||||
static constexpr size_t sstables_nr = 10;
|
||||
|
||||
dht::token_range_vector owned_token_ranges;
|
||||
|
||||
std::set<mutation, mutation_decorated_key_less_comparator> merged;
|
||||
for (auto i = 0; i < sstables_nr * 2; i++) {
|
||||
merged.insert(make_insert(partition_key::from_exploded(*s, {to_bytes(to_sstring(i))})));
|
||||
}
|
||||
|
||||
std::unordered_set<sstables::generation_type> gens; // input sstable generations
|
||||
run_id run_identifier = run_id::create_random_id();
|
||||
auto merged_it = merged.begin();
|
||||
for (auto i = 0; i < sstables_nr; i++) {
|
||||
auto mut1 = std::move(*merged_it);
|
||||
merged_it++;
|
||||
auto mut2 = std::move(*merged_it);
|
||||
merged_it++;
|
||||
auto sst = make_sstable_containing(sst_gen, {
|
||||
std::move(mut1),
|
||||
std::move(mut2)
|
||||
});
|
||||
sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
|
||||
sst->set_sstable_level(1);
|
||||
|
||||
// every sstable will be eligible for cleanup, by having both an owned and unowned token.
|
||||
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));
|
||||
|
||||
gens.insert(sst->generation());
|
||||
ssts.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
size_t last_input_sstable_count = sstables_nr;
|
||||
{
|
||||
auto t = env.make_table_for_tests(s);
|
||||
auto stop = deferred_stop(t);
|
||||
t->disable_auto_compaction().get();
|
||||
const dht::token_range_vector empty_owned_ranges;
|
||||
for (auto&& sst : ssts) {
|
||||
testlog.info("run id {}", sst->run_identifier());
|
||||
column_family_test(t).add_sstable(sst).get();
|
||||
column_family_test::update_sstables_known_generation(*t, sst->generation());
|
||||
observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
|
||||
auto sstables = t->get_sstables();
|
||||
auto input_sstable_count = std::count_if(sstables->begin(), sstables->end(), [&] (const shared_sstable& sst) {
|
||||
return gens.count(sst->generation());
|
||||
});
|
||||
|
||||
testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), input_sstable_count);
|
||||
sstables_closed++;
|
||||
if (input_sstable_count < last_input_sstable_count) {
|
||||
sstables_closed_during_cleanup++;
|
||||
last_input_sstable_count = input_sstable_count;
|
||||
}
|
||||
}));
|
||||
}
|
||||
ssts = {}; // releases references
|
||||
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
|
||||
t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
|
||||
testlog.info("Cleanup has finished");
|
||||
}
|
||||
|
||||
while (sstables_closed != sstables_nr) {
|
||||
yield().get();
|
||||
}
|
||||
|
||||
testlog.info("Closed sstables {}, Closed during cleanup {}", sstables_closed, sstables_closed_during_cleanup);
|
||||
|
||||
BOOST_REQUIRE(sstables_closed == sstables_nr);
|
||||
BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -269,7 +269,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_reversing_reader_random_schema) {
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
close_r1.cancel();
|
||||
|
||||
compare_readers(*query_schema, std::move(r1), std::move(r2));
|
||||
compare_readers(*query_schema, std::move(r1), std::move(r2), true);
|
||||
}
|
||||
|
||||
auto r1 = source.make_reader_v2(query_schema, semaphore.make_permit(), prange,
|
||||
|
||||
@@ -103,3 +103,33 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_split_stats) {
|
||||
auto ep1 = gms::inet_address("127.0.0.1");
|
||||
auto sg1 = create_scheduling_group("apa1", 100).get();
|
||||
auto sg2 = create_scheduling_group("apa2", 100).get();
|
||||
|
||||
std::optional<service::storage_proxy_stats::split_stats> stats1, stats2;
|
||||
|
||||
// pretending to be abstract_write_response_handler type.
|
||||
// created in various scheduling groups, in which they
|
||||
// instantiate group-local split_stats.
|
||||
with_scheduling_group(sg1, [&] {
|
||||
stats1.emplace("tuta", "nils", "en nils", "nilsa", true);
|
||||
}).get0();
|
||||
|
||||
with_scheduling_group(sg2, [&] {
|
||||
stats2.emplace("tuta", "nils", "en nils", "nilsa", true);
|
||||
}).get0();
|
||||
|
||||
// simulating the calling of storage_proxy::on_down, from gossip
|
||||
// on node dropping out. If inside a write operation, we'll pick up
|
||||
// write handlers and to "timeout_cb" on them, which in turn might
|
||||
// call get_ep_stat, which evenually calls register_metrics for
|
||||
// the DC written to.
|
||||
// Point being is that either the above should not happen, or
|
||||
// split_stats should be resilient to being called from different
|
||||
// scheduling group.
|
||||
stats1->register_metrics_for("DC1", ep1);
|
||||
stats2->register_metrics_for("DC1", ep1);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/key_utils.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
#include "test/lib/mutation_assertions.hh"
|
||||
#include "utils/ranges.hh"
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
@@ -743,23 +745,25 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
total_rows,
|
||||
_buffer_rows);
|
||||
|
||||
BOOST_REQUIRE(current_rows);
|
||||
BOOST_REQUIRE(!mut.partition().empty());
|
||||
BOOST_REQUIRE(current_rows <= _max_rows_hard);
|
||||
BOOST_REQUIRE(_buffer_rows <= _max_rows_hard);
|
||||
|
||||
// The current partition doesn't have all of its rows yet, verify
|
||||
// that the new mutation contains the next rows for the same
|
||||
// partition
|
||||
if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) {
|
||||
BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
|
||||
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
|
||||
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
|
||||
if (!_collected_muts.empty() && _collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())) {
|
||||
if (rows_in_mut(_collected_muts.back()) && rows_in_mut(mut)) {
|
||||
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
|
||||
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
|
||||
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
|
||||
}
|
||||
mutation_application_stats stats;
|
||||
_collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats);
|
||||
// The new mutation is a new partition.
|
||||
} else {
|
||||
if (!_collected_muts.empty()) {
|
||||
BOOST_REQUIRE(rows_in_mut(_collected_muts.back()) == _partition_rows.at(_collected_muts.back().decorated_key()));
|
||||
BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
}
|
||||
_collected_muts.push_back(std::move(mut));
|
||||
@@ -783,8 +787,8 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
, _rl(std::make_unique<row_locker>(_schema))
|
||||
, _rl_stats(std::make_unique<row_locker::stats>())
|
||||
, _less_cmp(*_schema)
|
||||
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit))
|
||||
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit))
|
||||
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit_default))
|
||||
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit_default))
|
||||
, _ok(ok)
|
||||
{ }
|
||||
|
||||
@@ -828,6 +832,8 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
for (auto ck = 0; ck < partition_size_100kb; ++ck) {
|
||||
mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb));
|
||||
}
|
||||
// Reproduces #14503
|
||||
mut_desc.add_range_tombstone(nonwrapping_range<tests::data_model::mutation_description::key>::make_open_ended_both_sides());
|
||||
muts.push_back(mut_desc.build(schema));
|
||||
partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb);
|
||||
}
|
||||
@@ -884,3 +890,96 @@ SEASTAR_TEST_CASE(test_load_view_build_progress_with_values_missing) {
|
||||
BOOST_REQUIRE(e.get_system_keyspace().local().load_view_build_progress().get0().empty());
|
||||
});
|
||||
}
|
||||
|
||||
// A random mutation test for view_updating_consumer's buffering logic.
|
||||
// Passes random mutations through a view_updating_consumer with a extremely
|
||||
// small buffer, which should cause a buffer flush after every mutation fragment.
|
||||
// Should check that flushing works correctly in every position, and regardless
|
||||
// of the last fragment and the last range tombstone change,
|
||||
//
|
||||
// Inspired by #14503.
|
||||
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutations) {
|
||||
// Collects the mutations produced by the tested view_updating_consumer into a vector.
|
||||
class consumer_verifier {
|
||||
schema_ptr _schema;
|
||||
std::vector<mutation>& _collected_muts;
|
||||
std::unique_ptr<row_locker> _rl;
|
||||
std::unique_ptr<row_locker::stats> _rl_stats;
|
||||
bool& _ok;
|
||||
|
||||
private:
|
||||
void check(mutation mut) {
|
||||
BOOST_REQUIRE(!mut.partition().empty());
|
||||
_collected_muts.push_back(std::move(mut));
|
||||
}
|
||||
|
||||
public:
|
||||
consumer_verifier(schema_ptr schema, std::vector<mutation>& collected_muts, bool& ok)
|
||||
: _schema(std::move(schema))
|
||||
, _collected_muts(collected_muts)
|
||||
, _rl(std::make_unique<row_locker>(_schema))
|
||||
, _rl_stats(std::make_unique<row_locker::stats>())
|
||||
, _ok(ok)
|
||||
{ }
|
||||
|
||||
future<row_locker::lock_holder> operator()(mutation mut) {
|
||||
try {
|
||||
check(std::move(mut));
|
||||
} catch (...) {
|
||||
_ok = false;
|
||||
BOOST_FAIL(fmt::format("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception()));
|
||||
}
|
||||
return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats);
|
||||
}
|
||||
};
|
||||
|
||||
// Create a random mutation.
|
||||
// We don't really want a random `mutation`, but a random valid mutation fragment
|
||||
// stream. But I don't know a better way to get that other than to create a random
|
||||
// `mutation` and shove it through readers.
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
mutation mut = gen();
|
||||
schema_ptr schema = gen.schema();
|
||||
|
||||
// Turn the random mutation into a mutation fragment stream,
|
||||
// so it can be fed to the view_updating_consumer.
|
||||
// Quite verbose. Perhaps there exists a simpler way to do this.
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
const abort_source as;
|
||||
auto mt = make_lw_shared<replica::memtable>(schema);
|
||||
mt->apply(mut);
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
auto p = make_manually_paused_evictable_reader_v2(
|
||||
mt->as_data_source(),
|
||||
schema,
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
schema->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::mutation_reader::forwarding::no);
|
||||
auto& staging_reader = std::get<0>(p);
|
||||
auto& staging_reader_handle = std::get<1>(p);
|
||||
auto close_staging_reader = deferred_close(staging_reader);
|
||||
|
||||
// Feed the random valid mutation fragment stream to the view_updating_consumer,
|
||||
// and collect its outputs.
|
||||
std::vector<mutation> collected_muts;
|
||||
bool ok = true;
|
||||
auto vuc = db::view::view_updating_consumer(schema, permit, as, staging_reader_handle,
|
||||
consumer_verifier(schema, collected_muts, ok));
|
||||
vuc.set_buffer_size_limit_for_testing_purposes(1);
|
||||
staging_reader.consume_in_thread(std::move(vuc));
|
||||
|
||||
// Check that the outputs sum up to the initial mutation.
|
||||
// We could also check that they are non-overlapping, which is
|
||||
// expected from the view_updating_consumer flushes, but it's
|
||||
// not necessary for correctness.
|
||||
BOOST_REQUIRE(ok);
|
||||
mutation total(schema, mut.decorated_key());
|
||||
for (const auto& x : collected_muts) {
|
||||
total += x;
|
||||
}
|
||||
assert_that(total).is_equal_to_compacted(mut);
|
||||
}
|
||||
|
||||
@@ -18,10 +18,20 @@ import time
|
||||
@pytest.fixture(scope="session")
|
||||
def table1(cql, test_keyspace):
|
||||
table = test_keyspace + "." + unique_name()
|
||||
cql.execute(f"CREATE TABLE {table} (k int PRIMARY KEY, v int)")
|
||||
cql.execute(f"CREATE TABLE {table} (k int PRIMARY KEY, v int, w int)")
|
||||
yield table
|
||||
cql.execute("DROP TABLE " + table)
|
||||
|
||||
# sync with wall-clock on exact second so that expiration won't cross the whole-second boundary
|
||||
# 100 milliseconds should be enough to execute 2 inserts at the same second in debug mode
|
||||
# sleep until the next whole second mark if there is not enough time left on the clock
|
||||
def ensure_sync_with_tick(millis = 100):
|
||||
t = time.time()
|
||||
while t - int(t) >= 1 - millis / 1000:
|
||||
time.sleep(1 - (t - int(t)))
|
||||
t = time.time()
|
||||
return t
|
||||
|
||||
# In Cassandra, timestamps can be any *signed* 64-bit integer, not including
|
||||
# the most negative 64-bit integer (-2^63) which for deletion times is
|
||||
# reserved for marking *not deleted* cells.
|
||||
@@ -78,3 +88,151 @@ def test_futuristic_timestamp(cql, table1):
|
||||
print('checking with restrict_future_timestamp=false')
|
||||
cql.execute(f'INSERT INTO {table1} (k, v) VALUES ({p}, 1) USING TIMESTAMP {futuristic_ts}')
|
||||
assert [(futuristic_ts,)] == cql.execute(f'SELECT writetime(v) FROM {table1} where k = {p}')
|
||||
|
||||
def test_rewrite_different_values_using_same_timestamp(cql, table1):
|
||||
"""
|
||||
Rewriting cells more than once with the same timestamp
|
||||
requires tie-breaking to decide which of the cells prevails.
|
||||
When the two inserts are non-expiring or when they have the same expiration time,
|
||||
cells are selected based on the higher value.
|
||||
Otherwise, expiring cells are preferred over non-expiring ones,
|
||||
and if both are expiring, the one with the later expiration time wins.
|
||||
"""
|
||||
table = table1
|
||||
ts = 1000
|
||||
values = [[1, 2], [2, 1]]
|
||||
for i in range(len(values)):
|
||||
v1, v2 = values[i]
|
||||
|
||||
def assert_value(k, expected):
|
||||
select = f"SELECT k, v FROM {table} WHERE k = {k}"
|
||||
res = list(cql.execute(select))
|
||||
assert len(res) == 1
|
||||
assert res[0].v == expected
|
||||
|
||||
# With no TTL, highest value wins
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts}")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts}")
|
||||
assert_value(k, max(v1, v2))
|
||||
|
||||
# Expiring cells are preferred over non-expiring
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts}")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts} and TTL 1")
|
||||
assert_value(k, v2)
|
||||
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts} and TTL 1")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts}")
|
||||
assert_value(k, v1)
|
||||
|
||||
# When both are expiring, the one with the later expiration time wins
|
||||
ensure_sync_with_tick()
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts} and TTL 1")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts} and TTL 2")
|
||||
assert_value(k, v2)
|
||||
|
||||
ensure_sync_with_tick()
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts} and TTL 2")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts} and TTL 1")
|
||||
assert_value(k, v1)
|
||||
|
||||
def test_rewrite_different_values_using_same_timestamp_and_expiration(scylla_only, cql, table1):
|
||||
"""
|
||||
Rewriting cells more than once with the same timestamp
|
||||
requires tie-breaking to decide which of the cells prevails.
|
||||
When the two inserts are expiring and have the same expiration time,
|
||||
scylla selects the cells with the lower ttl.
|
||||
"""
|
||||
table = table1
|
||||
ts = 1000
|
||||
values = [[1, 2], [2, 1]]
|
||||
for i in range(len(values)):
|
||||
v1, v2 = values[i]
|
||||
|
||||
def assert_value(k, expected):
|
||||
select = f"SELECT k, v FROM {table} WHERE k = {k}"
|
||||
res = list(cql.execute(select))
|
||||
assert len(res) == 1
|
||||
assert res[0].v == expected
|
||||
|
||||
# When both have the same expiration, the one with the lower TTL wins (as it has higher derived write time = expiration - ttl)
|
||||
ensure_sync_with_tick()
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts} and TTL 3")
|
||||
time.sleep(1)
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts} and TTL 2")
|
||||
assert_value(k, v2)
|
||||
|
||||
def test_rewrite_using_same_timestamp_select_after_expiration(cql, table1):
|
||||
"""
|
||||
Reproducer for https://github.com/scylladb/scylladb/issues/14182
|
||||
|
||||
Rewrite a cell using the same timestamp and ttl.
|
||||
Due to #14182, after the first insert expires,
|
||||
the first write would have been selected when it has a lexicographically larger
|
||||
value, and that results in a null value in the select query result.
|
||||
With the fix, we expect to get the cell with the higher expiration time.
|
||||
"""
|
||||
table = table1
|
||||
ts = 1000
|
||||
values = [[2, 1], [1, 2]]
|
||||
for i in range(len(values)):
|
||||
v1, v2 = values[i]
|
||||
|
||||
def assert_value(k, expected):
|
||||
select = f"SELECT k, v FROM {table} WHERE k = {k}"
|
||||
res = list(cql.execute(select))
|
||||
assert len(res) == 1
|
||||
assert res[0].v == expected
|
||||
|
||||
ensure_sync_with_tick()
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v1}) USING TIMESTAMP {ts} AND TTL 1")
|
||||
cql.execute(f"INSERT INTO {table} (k, v) VALUES ({k}, {v2}) USING TIMESTAMP {ts} AND TTL 2")
|
||||
|
||||
# wait until first insert expires, and expect 2nd value.
|
||||
# Null value was returned due to #14182 when v1 > v2
|
||||
time.sleep(1)
|
||||
assert_value(k, v2)
|
||||
|
||||
def test_rewrite_multiple_cells_using_same_timestamp(cql, table1):
|
||||
"""
|
||||
Reproducer for https://github.com/scylladb/scylladb/issues/14182:
|
||||
|
||||
Inserts multiple cells in two insert queries that use the same timestamp and different expiration.
|
||||
Due to #14182, the select query result contained a mixture
|
||||
of the inserts that is based on the value in each cell,
|
||||
rather than on the (different) expiration times on the
|
||||
two inserts.
|
||||
"""
|
||||
table = table1
|
||||
ts = 1000
|
||||
ttl1 = 10
|
||||
ttl2 = 20
|
||||
values = [{'v':1, 'w':2}, {'v':2, 'w':1}]
|
||||
|
||||
def assert_values(k, expected):
|
||||
select = f"SELECT * FROM {table} WHERE k = {k}"
|
||||
res = list(cql.execute(select))
|
||||
assert len(res) == 1
|
||||
assert res[0].k == k and res[0].v == expected['v'] and res[0].w == expected['w']
|
||||
|
||||
# rewrite values once with and once without TTL
|
||||
# if reconciliation is done by value, the result will be a mix of the two writes
|
||||
# while if reconciliation is based first on the expiration time, the second write should prevail.
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v, w) VALUES ({k}, {values[0]['v']}, {values[0]['w']}) USING TIMESTAMP {ts} AND TTL {ttl1}")
|
||||
cql.execute(f"INSERT INTO {table} (k, v, w) VALUES ({k}, {values[1]['v']}, {values[1]['w']}) USING TIMESTAMP {ts}")
|
||||
assert_values(k, values[0])
|
||||
|
||||
# rewrite values using the same write time and different ttls, so they get different expiration times
|
||||
# if reconciliation is done by value, the result will be a mix of the two writes
|
||||
# while if reconciliation is based first on the expiration time, the second write should prevail.
|
||||
k = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table} (k, v, w) VALUES ({k}, {values[0]['v']}, {values[0]['w']}) USING TIMESTAMP {ts} AND TTL {ttl1}")
|
||||
cql.execute(f"INSERT INTO {table} (k, v, w) VALUES ({k}, {values[1]['v']}, {values[1]['w']}) USING TIMESTAMP {ts} AND TTL {ttl2}")
|
||||
assert_values(k, values[1])
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
import pytest
|
||||
import time
|
||||
from cassandra.protocol import SyntaxException, InvalidRequest, Unauthorized, ConfigurationException
|
||||
from cassandra.protocol import SyntaxException, InvalidRequest, Unauthorized
|
||||
from util import new_test_table, new_function, new_user, new_session, new_test_keyspace, unique_name, new_type
|
||||
|
||||
# Test that granting permissions to various resources works for the default user.
|
||||
@@ -149,7 +149,7 @@ def test_udf_permissions_serialization(cql):
|
||||
for permission in permissions:
|
||||
cql.execute(f"GRANT {permission} ON {resource} TO {user}")
|
||||
|
||||
permissions = {row.resource: row.permissions for row in cql.execute(f"SELECT * FROM system_auth.role_permissions WHERE role = '{user}'")}
|
||||
permissions = {row.resource: row.permissions for row in cql.execute(f"SELECT * FROM system_auth.role_permissions")}
|
||||
assert permissions['functions'] == set(['ALTER', 'AUTHORIZE', 'CREATE', 'DROP', 'EXECUTE'])
|
||||
assert permissions[f'functions/{keyspace}'] == set(['ALTER', 'AUTHORIZE', 'CREATE', 'DROP', 'EXECUTE'])
|
||||
assert permissions[f'functions/{keyspace}/{div_fun}[org.apache.cassandra.db.marshal.LongType^org.apache.cassandra.db.marshal.Int32Type]'] == set(['ALTER', 'AUTHORIZE', 'DROP', 'EXECUTE'])
|
||||
@@ -199,9 +199,9 @@ def test_drop_udf_with_same_name(cql):
|
||||
schema = "a int primary key"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
|
||||
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return Long.valueOf(42);'"
|
||||
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42;'"
|
||||
body2_lua = "(i int, j int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
|
||||
body2_java = "(i int, j int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return Long.valueOf(42);'"
|
||||
body2_java = "(i int, j int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42;'"
|
||||
body1 = body1_lua
|
||||
body2 = body2_lua
|
||||
try:
|
||||
@@ -216,16 +216,17 @@ def test_drop_udf_with_same_name(cql):
|
||||
with new_user(cql) as username:
|
||||
with new_session(cql, username) as user_session:
|
||||
grant(cql, 'DROP', f'FUNCTION {keyspace}.{fun}(int)', username)
|
||||
with pytest.raises((InvalidRequest, Unauthorized)):
|
||||
with pytest.raises(InvalidRequest):
|
||||
user_session.execute(f"DROP FUNCTION {keyspace}.{fun}")
|
||||
eventually_unauthorized(lambda: user_session.execute(f"DROP FUNCTION {keyspace}.{fun}(int, int)"))
|
||||
grant(cql, 'DROP', f'FUNCTION {keyspace}.{fun}(int, int)', username)
|
||||
with pytest.raises((InvalidRequest, Unauthorized)):
|
||||
with pytest.raises(InvalidRequest):
|
||||
user_session.execute(f"DROP FUNCTION {keyspace}.{fun}")
|
||||
eventually_authorized(lambda: user_session.execute(f"DROP FUNCTION {keyspace}.{fun}(int)"))
|
||||
eventually_authorized(lambda: user_session.execute(f"DROP FUNCTION {keyspace}.{fun}"))
|
||||
|
||||
# Test that permissions set for user-defined functions are enforced
|
||||
# Tests for ALTER are separate, because they are qualified as cassandra_bug
|
||||
def test_grant_revoke_udf_permissions(cql):
|
||||
schema = "a int primary key, b list<int>"
|
||||
user = "cassandra"
|
||||
@@ -269,19 +270,40 @@ def test_grant_revoke_udf_permissions(cql):
|
||||
for resource in [f'function {keyspace}.{fun}(int, list<int>)', f'all functions in keyspace {keyspace}', 'all functions']:
|
||||
check_enforced(cql, username, permission='AUTHORIZE', resource=resource, function=grant_idempotent)
|
||||
|
||||
# This test case is artificially extracted from the one above,
|
||||
# because it's qualified as cassandra_bug - the documentation quotes that ALTER is needed on
|
||||
# functions if the definition is replaced (CREATE OR REPLACE FUNCTION (...)),
|
||||
# and yet it's not enforced
|
||||
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
|
||||
schema = "a int primary key"
|
||||
user = "cassandra"
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
fun_body_java = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE java AS 'return 42;'"
|
||||
fun_body = fun_body_lua
|
||||
try:
|
||||
with new_function(cql, keyspace, fun_body) as fun:
|
||||
pass
|
||||
except:
|
||||
fun_body = fun_body_java
|
||||
with new_user(cql) as username:
|
||||
with new_session(cql, username) as user_session:
|
||||
fun = "fun42"
|
||||
|
||||
grant(cql, 'ALTER', 'ALL FUNCTIONS', username)
|
||||
check_enforced(cql, username, permission='CREATE', resource=f'all functions in keyspace {keyspace}',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{unique_name()} {fun_body}"))
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{fun} {fun_body}"))
|
||||
check_enforced(cql, username, permission='CREATE', resource='all functions',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{unique_name()} {fun_body}"))
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{fun} {fun_body}"))
|
||||
revoke(cql, 'ALTER', 'ALL FUNCTIONS', username)
|
||||
|
||||
cql.execute(f"CREATE FUNCTION IF NOT EXISTS {keyspace}.{fun} {fun_body}")
|
||||
grant(cql, 'CREATE', 'ALL FUNCTIONS', username)
|
||||
check_enforced(cql, username, permission='ALTER', resource=f'all functions in keyspace {keyspace}',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{fun} {fun_body}"))
|
||||
check_enforced(cql, username, permission='ALTER', resource='all functions',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{fun} {fun_body}"))
|
||||
check_enforced(cql, username, permission='ALTER', resource=f'FUNCTION {keyspace}.{fun}(int, list<int>)',
|
||||
check_enforced(cql, username, permission='ALTER', resource=f'FUNCTION {keyspace}.{fun}(int)',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE FUNCTION {keyspace}.{fun} {fun_body}"))
|
||||
|
||||
# Test that granting permissions on non-existent UDFs fails
|
||||
@@ -293,12 +315,12 @@ def test_grant_perms_on_nonexistent_udf(cql):
|
||||
revoke(cql, 'EXECUTE', 'ALL FUNCTIONS', username)
|
||||
with pytest.raises(InvalidRequest):
|
||||
grant(cql, 'EXECUTE', f'ALL FUNCTIONS IN KEYSPACE {keyspace}', username)
|
||||
with pytest.raises((InvalidRequest, ConfigurationException)):
|
||||
with pytest.raises(InvalidRequest):
|
||||
grant(cql, 'EXECUTE', f'FUNCTION {keyspace}.{fun_name}(int)', username)
|
||||
cql.execute(f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}")
|
||||
grant(cql, 'EXECUTE', f'ALL FUNCTIONS IN KEYSPACE {keyspace}', username)
|
||||
revoke(cql, 'EXECUTE', f'ALL FUNCTIONS IN KEYSPACE {keyspace}', username)
|
||||
with pytest.raises((InvalidRequest, SyntaxException)):
|
||||
with pytest.raises(InvalidRequest):
|
||||
grant(cql, 'EXECUTE', f'FUNCTION {keyspace}.{fun_name}(int)', username)
|
||||
|
||||
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
|
||||
@@ -314,24 +336,15 @@ def test_grant_perms_on_nonexistent_udf(cql):
|
||||
cql.execute(f"DROP KEYSPACE IF EXISTS {keyspace}")
|
||||
|
||||
# Test that permissions for user-defined aggregates are also enforced.
|
||||
def test_grant_revoke_uda_permissions(cql):
|
||||
# scylla_only, because Lua is used as the target language
|
||||
def test_grant_revoke_uda_permissions(scylla_only, cql):
|
||||
schema = 'id bigint primary key'
|
||||
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
for i in range(8):
|
||||
cql.execute(f"INSERT INTO {table} (id) VALUES ({10**i})")
|
||||
avg_partial_body_lua = "(state tuple<bigint, bigint>, val bigint) CALLED ON NULL INPUT RETURNS tuple<bigint, bigint> LANGUAGE lua AS 'return {state[1] + val, state[2] + 1}'"
|
||||
avg_partial_body_java = "(state tuple<bigint, bigint>, val bigint) CALLED ON NULL INPUT RETURNS tuple<bigint, bigint> LANGUAGE java AS 'return state.setLong(0, state.getLong(0) + val).setLong(1, state.getLong(1) + Long.valueOf(1));'"
|
||||
div_body_lua = "(state tuple<bigint, bigint>) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return state[1]//state[2]'"
|
||||
div_body_java = "(state tuple<bigint, bigint>) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return state.getLong(0)/state.getLong(1);'"
|
||||
avg_partial_body = avg_partial_body_lua
|
||||
div_body = div_body_lua
|
||||
try:
|
||||
with new_function(cql, keyspace, avg_partial_body):
|
||||
pass
|
||||
except:
|
||||
avg_partial_body = avg_partial_body_java
|
||||
div_body = div_body_java
|
||||
avg_partial_body = "(state tuple<bigint, bigint>, val bigint) CALLED ON NULL INPUT RETURNS tuple<bigint, bigint> LANGUAGE lua AS 'return {state[1] + val, state[2] + 1}'"
|
||||
div_body = "(state tuple<bigint, bigint>) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return state[1]//state[2]'"
|
||||
with new_function(cql, keyspace, avg_partial_body) as avg_partial, new_function(cql, keyspace, div_body) as div_fun:
|
||||
custom_avg_body = f"(bigint) SFUNC {avg_partial} STYPE tuple<bigint, bigint> FINALFUNC {div_fun} INITCOND (0,0)"
|
||||
with new_user(cql) as username:
|
||||
@@ -347,7 +360,7 @@ def test_grant_revoke_uda_permissions(cql):
|
||||
check_enforced(cql, username, permission='CREATE', resource='all functions',
|
||||
function=create_aggr_idempotent)
|
||||
|
||||
cql.execute(f"CREATE AGGREGATE IF NOT EXISTS {keyspace}.{custom_avg} {custom_avg_body}")
|
||||
grant(cql, 'CREATE', 'ALL FUNCTIONS', username)
|
||||
check_enforced(cql, username, permission='ALTER', resource=f'all functions in keyspace {keyspace}',
|
||||
function=lambda: user_session.execute(f"CREATE OR REPLACE AGGREGATE {keyspace}.{custom_avg} {custom_avg_body}"))
|
||||
check_enforced(cql, username, permission='ALTER', resource='all functions',
|
||||
|
||||
@@ -59,10 +59,27 @@ private:
|
||||
continue;
|
||||
}
|
||||
// silently ignore rtcs that don't change anything
|
||||
if (next->is_range_tombstone_change() && next->as_range_tombstone_change().tombstone() == _rt) {
|
||||
testlog.trace("Received spurious closing rtc: {}", mutation_fragment_v2::printer(*_reader.schema(), *next));
|
||||
_reader().get();
|
||||
continue;
|
||||
if (next->is_range_tombstone_change()) {
|
||||
auto rtc_mf = std::move(*_reader().get());
|
||||
auto tomb = rtc_mf.as_range_tombstone_change().tombstone();
|
||||
auto cmp = position_in_partition::tri_compare(*_reader.schema());
|
||||
// squash rtcs with the same pos
|
||||
while (auto next_maybe_rtc = _reader.peek().get0()) {
|
||||
if (next_maybe_rtc->is_range_tombstone_change() && cmp(next_maybe_rtc->position(), rtc_mf.position()) == 0) {
|
||||
testlog.trace("Squashing {} with {}", next_maybe_rtc->as_range_tombstone_change().tombstone(), tomb);
|
||||
tomb = next_maybe_rtc->as_range_tombstone_change().tombstone();
|
||||
_reader().get0();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
rtc_mf.mutate_as_range_tombstone_change(*_reader.schema(), [tomb] (range_tombstone_change& rtc) { rtc.set_tombstone(tomb); });
|
||||
if (tomb == _rt) {
|
||||
testlog.trace("Received spurious rtcs, equivalent to: {}", mutation_fragment_v2::printer(*_reader.schema(), rtc_mf));
|
||||
continue;
|
||||
}
|
||||
_reader.unpop_mutation_fragment(std::move(rtc_mf));
|
||||
next = _reader.peek().get0();
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
@@ -2681,9 +2681,9 @@ static bool compare_readers(const schema& s, flat_mutation_reader_v2& authority,
|
||||
return !empty;
|
||||
}
|
||||
|
||||
void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested) {
|
||||
void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact) {
|
||||
auto close_authority = deferred_close(authority);
|
||||
auto assertions = assert_that(std::move(tested));
|
||||
auto assertions = assert_that(std::move(tested)).exact(exact);
|
||||
compare_readers(s, authority, assertions);
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ bytes make_blob(size_t blob_size);
|
||||
void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mutation>&,
|
||||
schema_ptr, const std::vector<mutation>&)>);
|
||||
|
||||
void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested);
|
||||
void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact = false);
|
||||
void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector<position_range>& fwd_ranges);
|
||||
|
||||
// Forward `r` to each range in `fwd_ranges` and consume all fragments produced by `r` in these ranges.
|
||||
|
||||
@@ -119,7 +119,7 @@ class dataset_acceptor {
|
||||
public:
|
||||
virtual ~dataset_acceptor() = default;;
|
||||
virtual bool can_run(dataset&) = 0;
|
||||
virtual void run(replica::column_family& cf, dataset& ds) = 0;
|
||||
virtual void run(app_template &app, replica::column_family& cf, dataset& ds) = 0;
|
||||
};
|
||||
|
||||
struct test_group {
|
||||
@@ -169,7 +169,7 @@ public:
|
||||
// type of its argument.
|
||||
template<typename DataSet>
|
||||
class dataset_acceptor_impl: public dataset_acceptor {
|
||||
using test_fn = void (*)(replica::column_family&, DataSet&);
|
||||
using test_fn = void (*)(app_template&, replica::column_family&, DataSet&);
|
||||
test_fn _fn;
|
||||
private:
|
||||
static DataSet* try_cast(dataset& ds) {
|
||||
@@ -182,13 +182,13 @@ public:
|
||||
return try_cast(ds) != nullptr;
|
||||
}
|
||||
|
||||
void run(replica::column_family& cf, dataset& ds) override {
|
||||
_fn(cf, *try_cast(ds));
|
||||
void run(app_template &app, replica::column_family& cf, dataset& ds) override {
|
||||
_fn(app, cf, *try_cast(ds));
|
||||
}
|
||||
};
|
||||
|
||||
template<typename DataSet>
|
||||
std::unique_ptr<dataset_acceptor> make_test_fn(void (*fn)(replica::column_family&, DataSet&)) {
|
||||
std::unique_ptr<dataset_acceptor> make_test_fn(void (*fn)(app_template &app, replica::column_family&, DataSet&)) {
|
||||
return std::make_unique<dataset_acceptor_impl<DataSet>>(fn);
|
||||
}
|
||||
|
||||
@@ -1210,7 +1210,6 @@ static int count_for_skip_pattern(int n, int n_read, int n_skip) {
|
||||
return n / (n_read + n_skip) * n_read + std::min(n % (n_read + n_skip), n_read);
|
||||
}
|
||||
|
||||
app_template app;
|
||||
bool cancel = false;
|
||||
bool cache_enabled;
|
||||
bool new_test_case = false;
|
||||
@@ -1224,14 +1223,14 @@ void clear_cache() {
|
||||
cql_env->local_db().row_cache_tracker().clear();
|
||||
}
|
||||
|
||||
void on_test_group() {
|
||||
void on_test_group(app_template &app) {
|
||||
if (!app.configuration().contains("keep-cache-across-test-groups")
|
||||
&& !app.configuration().contains("keep-cache-across-test-cases")) {
|
||||
clear_cache();
|
||||
}
|
||||
};
|
||||
|
||||
void on_test_case() {
|
||||
void on_test_case(app_template &app) {
|
||||
new_test_case = true;
|
||||
if (!app.configuration().contains("keep-cache-across-test-cases")) {
|
||||
clear_cache();
|
||||
@@ -1311,11 +1310,11 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void run_test_case(std::function<std::vector<test_result>()> fn) {
|
||||
void run_test_case(app_template &app, std::function<std::vector<test_result>()> fn) {
|
||||
result_collector rc;
|
||||
|
||||
auto do_run = [&] {
|
||||
on_test_case();
|
||||
on_test_case(app);
|
||||
return fn();
|
||||
};
|
||||
|
||||
@@ -1341,13 +1340,13 @@ void run_test_case(std::function<std::vector<test_result>()> fn) {
|
||||
rc.done();
|
||||
}
|
||||
|
||||
void run_test_case(std::function<test_result()> fn) {
|
||||
run_test_case([&] {
|
||||
void run_test_case(app_template &app, std::function<test_result()> fn) {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector { fn() };
|
||||
});
|
||||
}
|
||||
|
||||
void test_large_partition_single_key_slice(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_single_key_slice(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
int_range live_range = int_range({0}, {n_rows - 1});
|
||||
|
||||
@@ -1361,21 +1360,21 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
return r;
|
||||
};
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
test(int_range::make({0}, {1})),
|
||||
check_no_disk_reads(test(int_range::make({0}, {1}))),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
test(int_range::make({0}, {n_rows / 2})),
|
||||
check_no_disk_reads(test(int_range::make({0}, {n_rows / 2}))),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
test(int_range::make({0}, {n_rows})),
|
||||
check_no_disk_reads(test(int_range::make({0}, {n_rows}))),
|
||||
@@ -1384,35 +1383,35 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
|
||||
assert(n_rows > 200); // assumed below
|
||||
|
||||
run_test_case([&] { // adjacent, no overlap
|
||||
run_test_case(app, [&] { // adjacent, no overlap
|
||||
return test_result_vector {
|
||||
test(int_range::make({1}, {100, false})),
|
||||
test(int_range::make({100}, {109})),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // adjacent, contained
|
||||
run_test_case(app, [&] { // adjacent, contained
|
||||
return test_result_vector {
|
||||
test(int_range::make({1}, {100})),
|
||||
check_no_disk_reads(test(int_range::make_singular(100))),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // overlap
|
||||
run_test_case(app, [&] { // overlap
|
||||
return test_result_vector {
|
||||
test(int_range::make({1}, {100})),
|
||||
test(int_range::make({51}, {150})),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // enclosed
|
||||
run_test_case(app, [&] { // enclosed
|
||||
return test_result_vector {
|
||||
test(int_range::make({1}, {100})),
|
||||
check_no_disk_reads(test(int_range::make({51}, {70}))),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // enclosing
|
||||
run_test_case(app, [&] { // enclosing
|
||||
return test_result_vector {
|
||||
test(int_range::make({51}, {70})),
|
||||
test(int_range::make({41}, {80})),
|
||||
@@ -1420,21 +1419,21 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // adjacent, singular excluded
|
||||
run_test_case(app, [&] { // adjacent, singular excluded
|
||||
return test_result_vector {
|
||||
test(int_range::make({0}, {100, false})),
|
||||
test(int_range::make_singular(100)),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // adjacent, singular excluded
|
||||
run_test_case(app, [&] { // adjacent, singular excluded
|
||||
return test_result_vector {
|
||||
test(int_range::make({100, false}, {200})),
|
||||
test(int_range::make_singular(100)),
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
test(int_range::make_ending_with({100})),
|
||||
check_no_disk_reads(test(int_range::make({10}, {20}))),
|
||||
@@ -1442,7 +1441,7 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
test(int_range::make_starting_with({100})),
|
||||
check_no_disk_reads(test(int_range::make({150}, {159}))),
|
||||
@@ -1451,7 +1450,7 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // many gaps
|
||||
run_test_case(app, [&] { // many gaps
|
||||
return test_result_vector {
|
||||
test(int_range::make({10}, {20, false})),
|
||||
test(int_range::make({30}, {40, false})),
|
||||
@@ -1461,7 +1460,7 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
};
|
||||
});
|
||||
|
||||
run_test_case([&] { // many gaps
|
||||
run_test_case(app, [&] { // many gaps
|
||||
return test_result_vector {
|
||||
test(int_range::make({10}, {20, false})),
|
||||
test(int_range::make({30}, {40, false})),
|
||||
@@ -1472,7 +1471,7 @@ void test_large_partition_single_key_slice(replica::column_family& cf, clustered
|
||||
});
|
||||
}
|
||||
|
||||
void test_large_partition_skips(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_skips(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names());
|
||||
@@ -1483,7 +1482,7 @@ void test_large_partition_skips(replica::column_family& cf, clustered_ds& ds) {
|
||||
return r;
|
||||
};
|
||||
auto test = [&] (int n_read, int n_skip) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return do_test(n_read, n_skip);
|
||||
});
|
||||
};
|
||||
@@ -1512,7 +1511,7 @@ void test_large_partition_skips(replica::column_family& cf, clustered_ds& ds) {
|
||||
output_mgr->add_test_static_param("cache_enabled", "Testing cache scan of large partition with varying row continuity.");
|
||||
for (auto n_read : {1, 64}) {
|
||||
for (auto n_skip : {1, 64}) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
do_test(n_read, n_skip), // populate with gaps
|
||||
do_test(1, 0),
|
||||
@@ -1523,12 +1522,12 @@ void test_large_partition_skips(replica::column_family& cf, clustered_ds& ds) {
|
||||
}
|
||||
}
|
||||
|
||||
void test_large_partition_slicing(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_slicing(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
|
||||
auto test = [&] (int offset, int read) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = slice_rows(cf, ds, offset, read);
|
||||
r.set_params(to_sstrings(offset, read));
|
||||
check_fragment_count(r, std::min(n_rows - offset, read));
|
||||
@@ -1547,12 +1546,12 @@ void test_large_partition_slicing(replica::column_family& cf, clustered_ds& ds)
|
||||
test(n_rows / 2, 4096);
|
||||
}
|
||||
|
||||
void test_large_partition_slicing_clustering_keys(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_slicing_clustering_keys(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
|
||||
auto test = [&] (int offset, int read) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = slice_rows_by_ck(cf, ds, offset, read);
|
||||
r.set_params(to_sstrings(offset, read));
|
||||
check_fragment_count(r, std::min(n_rows - offset, read));
|
||||
@@ -1571,12 +1570,12 @@ void test_large_partition_slicing_clustering_keys(replica::column_family& cf, cl
|
||||
test(n_rows / 2, 4096);
|
||||
}
|
||||
|
||||
void test_large_partition_slicing_single_partition_reader(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_slicing_single_partition_reader(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
|
||||
auto test = [&](int offset, int read) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = slice_rows_single_key(cf, ds, offset, read);
|
||||
r.set_params(to_sstrings(offset, read));
|
||||
check_fragment_count(r, std::min(n_rows - offset, read));
|
||||
@@ -1595,12 +1594,12 @@ void test_large_partition_slicing_single_partition_reader(replica::column_family
|
||||
test(n_rows / 2, 4096);
|
||||
}
|
||||
|
||||
void test_large_partition_select_few_rows(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_select_few_rows(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
auto n_rows = ds.n_rows(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"stride", "{:<7}"}, {"rows", "{:<7}"}}, test_result::stats_names());
|
||||
auto test = [&](int stride, int read) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = select_spread_rows(cf, ds, stride, read);
|
||||
r.set_params(to_sstrings(stride, read));
|
||||
check_fragment_count(r, read);
|
||||
@@ -1616,17 +1615,17 @@ void test_large_partition_select_few_rows(replica::column_family& cf, clustered_
|
||||
test(2, n_rows / 2);
|
||||
}
|
||||
|
||||
void test_large_partition_forwarding(replica::column_family& cf, clustered_ds& ds) {
|
||||
void test_large_partition_forwarding(app_template &app, replica::column_family& cf, clustered_ds& ds) {
|
||||
output_mgr->set_test_param_names({{"pk-scan", "{:<7}"}}, test_result::stats_names());
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = test_forwarding_with_restriction(cf, ds, cfg, false);
|
||||
check_fragment_count(r, 2);
|
||||
r.set_params(to_sstrings("yes"));
|
||||
return r;
|
||||
});
|
||||
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = test_forwarding_with_restriction(cf, ds, cfg, true);
|
||||
check_fragment_count(r, 2);
|
||||
r.set_params(to_sstrings("no"));
|
||||
@@ -1634,7 +1633,7 @@ void test_large_partition_forwarding(replica::column_family& cf, clustered_ds& d
|
||||
});
|
||||
}
|
||||
|
||||
void test_small_partition_skips(replica::column_family& cf2, multipart_ds& ds) {
|
||||
void test_small_partition_skips(app_template &app, replica::column_family& cf2, multipart_ds& ds) {
|
||||
auto n_parts = ds.n_partitions(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"", "{:<2}"}, {"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names());
|
||||
@@ -1647,7 +1646,7 @@ void test_small_partition_skips(replica::column_family& cf2, multipart_ds& ds) {
|
||||
};
|
||||
auto test = [&] (int n_read, int n_skip) {
|
||||
test_result r;
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
r = do_test(n_read, n_skip);
|
||||
return r;
|
||||
});
|
||||
@@ -1679,7 +1678,7 @@ void test_small_partition_skips(replica::column_family& cf2, multipart_ds& ds) {
|
||||
output_mgr->add_test_static_param("cache_enabled", "Testing cache scan with small partitions with varying continuity.");
|
||||
for (auto n_read : {1, 64}) {
|
||||
for (auto n_skip : {1, 64}) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
return test_result_vector {
|
||||
do_test(n_read, n_skip), // populate with gaps
|
||||
do_test(1, 0),
|
||||
@@ -1690,13 +1689,13 @@ void test_small_partition_skips(replica::column_family& cf2, multipart_ds& ds) {
|
||||
}
|
||||
}
|
||||
|
||||
void test_small_partition_slicing(replica::column_family& cf2, multipart_ds& ds) {
|
||||
void test_small_partition_slicing(app_template &app, replica::column_family& cf2, multipart_ds& ds) {
|
||||
auto n_parts = ds.n_partitions(cfg);
|
||||
|
||||
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
|
||||
auto keys = make_pkeys(cf2.schema(), n_parts);
|
||||
auto test = [&] (int offset, int read) {
|
||||
run_test_case([&] {
|
||||
run_test_case(app, [&] {
|
||||
auto r = slice_partitions(cf2, keys, offset, read);
|
||||
r.set_params(to_sstrings(offset, read));
|
||||
check_fragment_count(r, std::min(n_parts - offset, read));
|
||||
@@ -1885,6 +1884,8 @@ namespace perf {
|
||||
|
||||
int scylla_fast_forward_main(int argc, char** argv) {
|
||||
namespace bpo = boost::program_options;
|
||||
app_template app;
|
||||
|
||||
app.add_options()
|
||||
("random-seed", boost::program_options::value<unsigned>(), "Random number generator seed")
|
||||
("run-tests", bpo::value<std::vector<std::string>>()->default_value(
|
||||
@@ -1920,7 +1921,7 @@ int scylla_fast_forward_main(int argc, char** argv) {
|
||||
("dump-all-results", "Write results of all iterations of all tests to text files in the output directory")
|
||||
;
|
||||
|
||||
return app.run(argc, argv, [] {
|
||||
return app.run(argc, argv, [&app] {
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
|
||||
@@ -1969,7 +1970,7 @@ int scylla_fast_forward_main(int argc, char** argv) {
|
||||
std::cout << "Data directory: " << db_cfg.data_file_directories() << "\n";
|
||||
std::cout << "Output directory: " << output_dir << "\n";
|
||||
|
||||
auto init = [] {
|
||||
auto init = [&app] {
|
||||
auto conf_seed = app.configuration()["random-seed"];
|
||||
auto seed = conf_seed.empty() ? std::random_device()() : conf_seed.as<unsigned>();
|
||||
std::cout << "random-seed=" << seed << '\n';
|
||||
@@ -1978,9 +1979,9 @@ int scylla_fast_forward_main(int argc, char** argv) {
|
||||
});
|
||||
};
|
||||
|
||||
return init().then([db_cfg_ptr] {
|
||||
return do_with_cql_env([] (cql_test_env& env) {
|
||||
return seastar::async([&env] {
|
||||
return init().then([&app, db_cfg_ptr] {
|
||||
return do_with_cql_env([&app] (cql_test_env& env) {
|
||||
return seastar::async([&app, &env] {
|
||||
cql_env = &env;
|
||||
sstring name = app.configuration()["name"].as<std::string>();
|
||||
|
||||
@@ -2047,8 +2048,8 @@ int scylla_fast_forward_main(int argc, char** argv) {
|
||||
output_mgr->add_test_group(tc, *ds, false);
|
||||
} else {
|
||||
output_mgr->add_test_group(tc, *ds, true);
|
||||
on_test_group();
|
||||
tc.test_fn->run(find_table(db, *ds), *ds);
|
||||
on_test_group(app);
|
||||
tc.test_fn->run(app, find_table(db, *ds), *ds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
114
utils/rjson.cc
114
utils/rjson.cc
@@ -227,68 +227,62 @@ std::string print(const rjson::value& value, size_t max_nested_level) {
|
||||
return std::string(buffer.GetString());
|
||||
}
|
||||
|
||||
// This class implements RapidJSON Handler and batches Put() calls into output_stream writes.
|
||||
class output_stream_buffer {
|
||||
static constexpr size_t _buf_size = 512;
|
||||
seastar::output_stream<char>& _os;
|
||||
temporary_buffer<char> _buf = temporary_buffer<char>(_buf_size);
|
||||
size_t _pos = 0;
|
||||
|
||||
future<> send(temporary_buffer<char> b) {
|
||||
co_return co_await _os.write(b.get(), b.size());
|
||||
}
|
||||
public:
|
||||
output_stream_buffer(seastar::output_stream<char>& os) : _os(os) {}
|
||||
using Ch = char; // Used by rjson internally
|
||||
future<> f = make_ready_future<>();
|
||||
|
||||
void Flush() {
|
||||
if (f.failed()) {
|
||||
f.get0();
|
||||
}
|
||||
if (_pos == 0) {
|
||||
return;
|
||||
}
|
||||
if (_pos < _buf_size) {
|
||||
_buf.trim(_pos); // Last flush may be shorter
|
||||
}
|
||||
// Either we call futures right away (if they are ready) or we start growing continuations
|
||||
// chain as we don't have the ability to wait here because Flush() signature is set by rjson.
|
||||
f = f.then([this, b = std::move(_buf)] () mutable {
|
||||
return send(std::move(b));
|
||||
});
|
||||
_pos = 0;
|
||||
_buf = temporary_buffer<char>(_buf_size);
|
||||
}
|
||||
|
||||
void Put(Ch c) {
|
||||
if (_pos == _buf_size) {
|
||||
Flush();
|
||||
}
|
||||
// Note: Should consider writing directly to the buffer in output_stream
|
||||
// instead of double buffering. But output_stream for a single char has higher
|
||||
// overhead than the above check + once we hit a non-completed future, we'd have
|
||||
// to revert to this method anyway...
|
||||
*(_buf.get_write() + _pos) = c;
|
||||
++_pos;
|
||||
}
|
||||
};
|
||||
|
||||
future<> print(const rjson::value& value, seastar::output_stream<char>& os, size_t max_nested_level) {
|
||||
struct os_buffer {
|
||||
seastar::output_stream<char>& _os;
|
||||
temporary_buffer<char> _buf;
|
||||
size_t _pos = 0;
|
||||
future<> _f = make_ready_future<>();
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
|
||||
using Ch = char;
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
void send(bool try_reuse = true) {
|
||||
if (_f.failed()) {
|
||||
_f.get0();
|
||||
}
|
||||
if (!_buf.empty() && _pos > 0) {
|
||||
_buf.trim(_pos);
|
||||
_pos = 0;
|
||||
// Note: we're assuming we're writing to a buffered output_stream (hello http server).
|
||||
// If we were not, or if (http) output_stream supported mixed buffered/packed content
|
||||
// it might be a good idea to instead send our buffer as a packet directly. If so, the
|
||||
// buffer size should probably increase (at least after first send()).
|
||||
_f = _f.then([this, buf = std::move(_buf), &os = _os, try_reuse]() mutable -> future<> {
|
||||
return os.write(buf.get(), buf.size()).then([this, buf = std::move(buf), try_reuse]() mutable {
|
||||
// Chances are high we just copied this to output_stream buffer, and got here
|
||||
// immediately. If so, reuse the buffer.
|
||||
if (try_reuse && _buf.empty() && _pos == 0) {
|
||||
_buf = std::move(buf);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
void Put(char c) {
|
||||
if (_pos == _buf.size()) {
|
||||
send();
|
||||
if (_buf.empty()) {
|
||||
_buf = temporary_buffer<char>(512);
|
||||
}
|
||||
}
|
||||
// Second note: Should consider writing directly to the buffer in output_stream
|
||||
// instead of double buffering. But output_stream for a single char has higher
|
||||
// overhead than the above check + once we hit a non-completed future, we'd have
|
||||
// to revert to this method anyway...
|
||||
*(_buf.get_write() + _pos) = c;
|
||||
++_pos;
|
||||
}
|
||||
void Flush() {
|
||||
send();
|
||||
}
|
||||
future<> finish()&& {
|
||||
send(false);
|
||||
return std::move(_f);
|
||||
}
|
||||
};
|
||||
|
||||
os_buffer osb{ os };
|
||||
using streamer = rapidjson::Writer<os_buffer, encoding, encoding, allocator>;
|
||||
guarded_yieldable_json_handler<streamer, false, os_buffer> writer(osb, max_nested_level);
|
||||
output_stream_buffer buf{ os };
|
||||
using streamer = rapidjson::Writer<output_stream_buffer, encoding, encoding, allocator>;
|
||||
guarded_yieldable_json_handler<streamer, false, output_stream_buffer> writer(buf, max_nested_level);
|
||||
value.Accept(writer);
|
||||
co_return co_await std::move(osb).finish();
|
||||
buf.Flush();
|
||||
// This function has to be a coroutine otherwise buf gets destroyed before all its
|
||||
// continuations from buf.f finish leading to use-after-free.
|
||||
co_return co_await std::move(buf.f);
|
||||
}
|
||||
|
||||
rjson::malformed_value::malformed_value(std::string_view name, const rjson::value& value)
|
||||
|
||||
Reference in New Issue
Block a user