Files
scylladb/test/lib/cql_test_env.cc
Benny Halevy 57cc5f6ae1 sstable_directory: use an external load_semaphore
Although each sstable_directory limits concurrency using
max_concurrent_for_each, there could be a large number
of calls to do_for_each_sstable running in parallel
(e.g per keyspace X per table in the distributed_loader).

To cap parallelism across sstable_directory instances and
concurrent calls to do_for_each_sstable, start a sharded<semaphore>
and pass a shared semaphore& to the sstable_directory instances.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2020-10-08 11:57:06 +03:00

669 lines
29 KiB
C++

/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/algorithm/transform.hpp>
#include <iterator>
#include <seastar/core/thread.hh>
#include <seastar/util/defer.hh>
#include "sstables/sstables.hh"
#include <seastar/core/do_with.hh>
#include "test/lib/cql_test_env.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/cql_config.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/scheduling.hh>
#include "utils/UUID_gen.hh"
#include "service/migration_manager.hh"
#include "sstables/compaction_manager.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "auth/service.hh"
#include "auth/common.hh"
#include "db/config.hh"
#include "db/batchlog_manager.hh"
#include "schema_builder.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/reader_permit.hh"
#include "db/query_context.hh"
#include "test/lib/test_services.hh"
#include "db/view/view_builder.hh"
#include "db/view/node_view_update_backlog.hh"
#include "distributed_loader.hh"
// TODO: remove (#293)
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/sstables-format-selector.hh"
#include "debug.hh"
using namespace std::chrono_literals;
// Default configuration: delegates to the db::config-taking constructor
// with a freshly allocated default db::config.
cql_test_config::cql_test_config()
: cql_test_config(make_shared<db::config>())
{}
// Wraps a caller-provided db::config and applies test-friendly overrides.
cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
: db_config(cfg)
{
// This causes huge amounts of commitlog writes to allocate space on disk,
// which all get thrown away when the test is done. This can cause timeouts
// if /tmp is not tmpfs.
db_config->commitlog_use_o_dsync.set(false);
}
// Copying is shallow: the copy shares the underlying db::config object.
cql_test_config::cql_test_config(const cql_test_config&) = default;
cql_test_config::~cql_test_config() = default;
// Name of the superuser role created for (and used by) every test session.
static const sstring testing_superuser = "tester";
// Starts the sharded gossiper with the seed list parsed from
// `seed_provider`, falling back to 127.0.0.1 when no seeds are configured.
// Test-local duplicate of main.cc's gossiper init (see TODO above, #293).
//
// BUGFIX: the previous comma-splitting loop never terminated once a
// non-empty seeds string was parsed: after consuming the last token,
// `next` was npos and `begin = next + 1` wrapped (unsigned overflow) back
// to 0, re-scanning the string forever. It was latent here only because
// callers pass a default-constructed seed_provider_type with no "seeds"
// parameter. The rewrite below stops explicitly at the last token while
// preserving the old behavior of stopping at the first empty entry.
static future<> tst_init_ms_fd_gossiper(sharded<gms::feature_service>& features, sharded<locator::token_metadata>& tm, sharded<netw::messaging_service>& ms, db::config& cfg, db::seed_provider_type seed_provider,
        sharded<abort_source>& abort_sources, sstring cluster_name = "Test Cluster") {
    // Init gossiper
    std::set<gms::inet_address> seeds;
    if (seed_provider.parameters.contains("seeds")) {
        sstring seeds_str = seed_provider.parameters.find("seeds")->second;
        size_t begin = 0;
        while (begin < seeds_str.length()) {
            size_t next = seeds_str.find(",", begin);
            if (next == begin) {
                // Empty token (leading comma or ",,"): stop, matching the
                // original loop's exit condition.
                break;
            }
            if (next == sstring::npos) {
                // Last token: consume the rest of the string and stop.
                seeds.emplace(gms::inet_address(seeds_str.substr(begin)));
                break;
            }
            seeds.emplace(gms::inet_address(seeds_str.substr(begin, next - begin)));
            begin = next + 1;
        }
    }
    if (seeds.empty()) {
        seeds.emplace(gms::inet_address("127.0.0.1"));
    }
    return gms::get_gossiper().start(std::ref(abort_sources), std::ref(features), std::ref(tm), std::ref(ms), std::ref(cfg)).then([seeds, cluster_name] {
        auto& gossiper = gms::get_local_gossiper();
        gossiper.set_seeds(seeds);
        gossiper.set_cluster_name(cluster_name);
    });
}
// END TODO
// Single-node, in-process CQL test environment. Holds references to the
// sharded services started by do_with() below and forwards CQL execution
// to the node-local query processor. Only one instance may run at a time
// (enforced via `active`) because several services are process-global
// singletons.
class single_node_cql_env : public cql_test_env {
public:
// Keyspace implicitly created for tests (see do_with()).
static constexpr std::string_view ks_name = "ks";
// True while an environment is running; do_with() uses it to reject a
// second concurrent environment.
static std::atomic<bool> active;
private:
sharded<gms::feature_service>& _feature_service;
sharded<database>& _db;
sharded<auth::service>& _auth_service;
sharded<db::view::view_builder>& _view_builder;
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<service::migration_notifier>& _mnotifier;
private:
// Per-shard state: a client_state pre-logged-in as the testing superuser.
struct core_local_state {
service::client_state client_state;
core_local_state(auth::service& auth_service)
: client_state(service::client_state::external_tag{}, auth_service)
{
client_state.set_login(auth::authenticated_user(testing_superuser));
}
future<> stop() {
return make_ready_future<>();
}
};
distributed<core_local_state> _core_local;
private:
// Builds a query_state around this shard's client state. Re-points the
// client state at the "ks" keyspace once that keyspace exists.
auto make_query_state() {
if (_db.local().has_keyspace(ks_name)) {
_core_local.local().client_state.set_keyspace(_db.local(), ks_name);
}
return ::make_shared<service::query_state>(_core_local.local().client_state, empty_service_permit());
}
public:
single_node_cql_env(
sharded<gms::feature_service>& feature_service,
sharded<database>& db,
sharded<auth::service>& auth_service,
sharded<db::view::view_builder>& view_builder,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<service::migration_notifier>& mnotifier)
: _feature_service(feature_service)
, _db(db)
, _auth_service(auth_service)
, _view_builder(view_builder)
, _view_update_generator(view_update_generator)
, _mnotifier(mnotifier)
{ }
// Executes a CQL string with default query options; the finally() keeps
// the query_state alive until the query completes.
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(sstring_view text) override {
auto qs = make_query_state();
return local_qp().execute_direct(text, *qs, cql3::query_options::DEFAULT).finally([qs] {});
}
// Same as above but with caller-supplied query options; `lqo` keeps a
// stable reference while ownership of `qo` moves into the continuation.
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(
sstring_view text,
std::unique_ptr<cql3::query_options> qo) override
{
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().execute_direct(text, *qs, lqo).finally([qs, qo = std::move(qo)] {});
}
// Prepares the statement on every shard, then returns the cache key
// computed on the local shard.
virtual future<cql3::prepared_cache_key_type> prepare(sstring query) override {
return qp().invoke_on_all([query, this] (auto& local_qp) {
auto qs = this->make_query_state();
return local_qp.prepare(query, *qs).finally([qs] {}).discard_result();
}).then([query, this] {
return local_qp().compute_id(query, ks_name);
});
}
// Executes a prepared statement with the given values. Note: the serial
// consistency level is forced to SERIAL while the regular consistency
// comes from `cl`; the remaining specific options are the defaults.
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared(
cql3::prepared_cache_key_type id,
std::vector<cql3::raw_value> values,
db::consistency_level cl = db::consistency_level::ONE) override {
const auto& so = cql3::query_options::specific_options::DEFAULT;
auto options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config,
std::move(values), cql3::query_options::specific_options{
so.page_size,
so.state,
db::consistency_level::SERIAL,
so.timestamp,
});
return execute_prepared_with_qo(id, std::move(options));
}
// Looks up the prepared statement by cache key and executes it; throws
// not_prepared_exception if the key is unknown on this shard.
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared_with_qo(
cql3::prepared_cache_key_type id,
std::unique_ptr<cql3::query_options> qo) override
{
auto prepared = local_qp().get_prepared(id);
if (!prepared) {
throw not_prepared_exception(id);
}
auto stmt = prepared->statement;
assert(stmt->get_bound_terms() == qo->get_values_count());
qo->prepare(prepared->bound_names);
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().execute_prepared(std::move(prepared), std::move(id), *qs, lqo, true)
.finally([qs, qo = std::move(qo)] {});
}
// Parses `text`, requires it to be a modification statement, and returns
// the mutations it would apply (without executing them).
virtual future<std::vector<mutation>> get_modification_mutations(const sstring& text) override {
auto qs = make_query_state();
auto cql_stmt = local_qp().get_statement(text, qs->get_client_state())->statement;
auto modif_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(std::move(cql_stmt));
if (!modif_stmt) {
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
}
auto& qo = cql3::query_options::DEFAULT;
auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout;
return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
.finally([qs, modif_stmt = std::move(modif_stmt)] {});
}
// Builds a schema via `schema_maker` (in keyspace "ks"), stamps it with a
// fresh time-UUID, and announces it through the migration manager.
virtual future<> create_table(std::function<schema(std::string_view)> schema_maker) override {
auto id = utils::UUID_gen::get_time_UUID();
schema_builder builder(make_lw_shared<schema>(schema_maker(ks_name)));
builder.set_uuid(id);
auto s = builder.build(schema_builder::compact_storage::no);
return service::get_local_migration_manager().announce_new_column_family(s, true);
}
// The require_* helpers below assert (abort on failure) rather than
// returning a failed future.
virtual future<> require_keyspace_exists(const sstring& ks_name) override {
auto& db = _db.local();
assert(db.has_keyspace(ks_name));
return make_ready_future<>();
}
virtual future<> require_table_exists(const sstring& ks_name, const sstring& table_name) override {
auto& db = _db.local();
assert(db.has_schema(ks_name, table_name));
return make_ready_future<>();
}
virtual future<> require_table_does_not_exist(const sstring& ks_name, const sstring& table_name) override {
auto& db = _db.local();
assert(!db.has_schema(ks_name, table_name));
return make_ready_future<>();
}
// Reads the given cell on the shard owning the partition and asserts it
// holds `expected`. Handles both atomic cells and multi-cell collections.
virtual future<> require_column_has_value(const sstring& table_name,
std::vector<data_value> pk,
std::vector<data_value> ck,
const sstring& column_name,
data_value expected) override {
auto& db = _db.local();
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
auto pkey = partition_key::from_deeply_exploded(*schema, pk);
auto ckey = clustering_key::from_deeply_exploded(*schema, ck);
auto exp = expected.type()->decompose(expected);
auto dk = dht::decorate_key(*schema, pkey);
auto shard = dht::shard_of(*schema, dk._token);
return _db.invoke_on(shard, [pkey = std::move(pkey),
ckey = std::move(ckey),
ks_name = std::move(ks_name),
column_name = std::move(column_name),
exp = std::move(exp),
table_name = std::move(table_name)] (database& db) mutable {
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
return cf.find_partition_slow(schema, tests::make_permit(), pkey)
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
auto row = p->find_row(*schema, ckey);
assert(row != nullptr);
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
assert(col_def != nullptr);
const atomic_cell_or_collection* cell = row->find_cell(col_def->id);
if (!cell) {
assert(((void)"column not set", 0));
}
bytes actual;
if (!col_def->type->is_multi_cell()) {
auto c = cell->as_atomic_cell(*col_def);
assert(c.is_live());
actual = c.value().linearize();
} else {
actual = serialize_for_cql(*col_def->type,
cell->as_collection_mutation(), cql_serialization_format::internal());
}
assert(col_def->type->equal(actual, exp));
});
});
}
// Shard-local accessors for the services owned by do_with().
virtual service::client_state& local_client_state() override {
return _core_local.local().client_state;
}
virtual database& local_db() override {
return _db.local();
}
cql3::query_processor& local_qp() override {
return cql3::get_local_query_processor();
}
sharded<database>& db() override {
return _db;
}
distributed<cql3::query_processor>& qp() override {
return cql3::get_query_processor();
}
auth::service& local_auth_service() override {
return _auth_service.local();
}
virtual db::view::view_builder& local_view_builder() override {
return _view_builder.local();
}
virtual db::view::view_update_generator& local_view_update_generator() override {
return _view_update_generator.local();
}
virtual service::migration_notifier& local_mnotifier() override {
return _mnotifier.local();
}
// Starts/stops the per-shard client state.
future<> start() {
return _core_local.start(std::ref(_auth_service));
}
future<> stop() {
return _core_local.stop();
}
// Creates a simple RF=1 keyspace with the given name.
future<> create_keyspace(std::string_view name) {
auto query = format("create keyspace {} with replication = {{ 'class' : 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor' : 1 }};", name);
return execute_cql(query).discard_result();
}
// Boots a complete single-node Scylla stack (gossiper, storage service,
// storage proxy, query processor, batchlog manager, view machinery, auth,
// CDC), runs `func` against it, and tears everything down again. Every
// cleanup action is registered with defer() immediately after the
// corresponding start, so teardown runs in reverse order of construction
// and also covers failures part-way through the setup.
static future<> do_with(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in) {
using namespace std::filesystem;
return seastar::async([cfg_in = std::move(cfg_in), func] {
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
// Enforce at most one running environment (process-global singletons).
bool old_active = false;
if (!active.compare_exchange_strong(old_active, true)) {
throw std::runtime_error("Starting more than one cql_test_env at a time not supported due to singletons.");
}
auto deactivate = defer([] {
bool old_active = true;
auto success = active.compare_exchange_strong(old_active, false);
assert(success);
});
// FIXME: make the function storage non static
auto clear_funcs = defer([] {
smp::invoke_on_all([] () {
cql3::functions::functions::clear_functions();
}).get();
});
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
auto stop_snitch = defer([] { locator::i_endpoint_snitch::stop_snitch().get(); });
sharded<abort_source> abort_sources;
abort_sources.start().get();
auto stop_abort_sources = defer([&] { abort_sources.stop().get(); });
sharded<database> db;
// Expose the database to debuggers while the environment is alive.
debug::db = &db;
auto reset_db_ptr = defer([] {
debug::db = nullptr;
});
// Use a temporary data directory unless the caller configured one.
auto cfg = cfg_in.db_config;
tmpdir data_dir;
auto data_dir_path = data_dir.path().string();
if (!cfg->data_file_directories.is_set()) {
cfg->data_file_directories.set({data_dir_path});
} else {
data_dir_path = cfg->data_file_directories()[0];
}
cfg->commitlog_directory.set(data_dir_path + "/commitlog.dir");
cfg->hints_directory.set(data_dir_path + "/hints.dir");
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
cfg->num_tokens.set(256);
cfg->ring_delay_ms.set(500);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
create_directories((data_dir_path + "/system").c_str());
create_directories(cfg->commitlog_directory().c_str());
create_directories(cfg->hints_directory().c_str());
create_directories(cfg->view_hints_directory().c_str());
for (unsigned i = 0; i < smp::count; ++i) {
create_directories((cfg->hints_directory() + "/" + std::to_string(i)).c_str());
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
}
if (!cfg->max_memory_for_unlimited_query_soft_limit.is_set()) {
cfg->max_memory_for_unlimited_query_soft_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
}
if (!cfg->max_memory_for_unlimited_query_hard_limit.is_set()) {
cfg->max_memory_for_unlimited_query_hard_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
}
sharded<locator::token_metadata> token_metadata;
token_metadata.start().get();
auto stop_token_metadata = defer([&token_metadata] { token_metadata.stop().get(); });
sharded<service::migration_notifier> mm_notif;
mm_notif.start().get();
auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
sharded<netw::messaging_service> ms;
// don't start listening so tests can be run in parallel
ms.start(listen, std::move(7000)).get();
auto stop_ms = defer([&ms] { ms.stop().get(); });
sharded<auth::service> auth_service;
// Normally the auth server is already stopped in here,
// but if there is an initialization failure we have to
// make sure to stop it now or ~sharded will assert.
auto stop_auth_server = defer([&auth_service] {
auth_service.stop().get();
});
auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features);
sharded<gms::feature_service> feature_service;
feature_service.start(fcfg).get();
auto stop_feature_service = defer([&] { feature_service.stop().get(); });
// FIXME: split
tst_init_ms_fd_gossiper(feature_service, token_metadata, ms, *cfg, db::config::seed_provider_type(), abort_sources).get();
distributed<service::storage_proxy>& proxy = service::get_storage_proxy();
distributed<service::migration_manager>& mm = service::get_migration_manager();
distributed<db::batchlog_manager>& bm = db::get_batchlog_manager();
sharded<cql3::cql_config> cql_config;
cql_config.start().get();
auto stop_cql_config = defer([&] { cql_config.stop().get(); });
sharded<db::view::view_update_generator> view_update_generator;
auto& ss = service::get_storage_service();
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
ss.start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), std::ref(ms), true).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
// Shared semaphore capping sstable loading concurrency across all
// sstable_directory instances; handed to database::start() below.
sharded<semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer([&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();
});
database_config dbcfg;
if (cfg_in.dbcfg) {
dbcfg = std::move(*cfg_in.dbcfg);
} else {
dbcfg.available_memory = memory::stats().total_memory();
}
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources), std::ref(sst_dir_semaphore)).get();
auto stop_db = defer([&db] {
db.stop().get();
});
db.invoke_on_all([] (database& db) {
db.set_format_by_config();
}).get();
auto stop_ms_fd_gossiper = defer([] {
gms::get_gossiper().stop().get();
});
ss.invoke_on_all([] (auto&& ss) {
ss.enable_all_features();
}).get();
service::storage_proxy::config spcfg;
spcfg.available_memory = memory::stats().total_memory();
db::view::node_update_backlog b(smp::count, 10ms);
scheduling_group_key_config sg_conf =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
proxy.start(std::ref(db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(ms)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });
auto& qp = cql3::get_query_processor();
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), qp_mcfg, std::ref(cql_config)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });
db::batchlog_manager_config bmcfg;
bmcfg.replay_rate = 100000000;
bmcfg.write_request_timeout = 2s;
bm.start(std::ref(qp), bmcfg).get();
auto stop_bm = defer([&bm] { bm.stop().get(); });
view_update_generator.start(std::ref(db)).get();
view_update_generator.invoke_on_all(&db::view::view_update_generator::start).get();
auto stop_view_update_generator = defer([&view_update_generator] {
view_update_generator.stop().get();
});
// Populate the system keyspace and create its table directories before
// loading any non-system keyspaces.
distributed_loader::init_system_keyspace(db).get();
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
distributed_loader::init_non_system_keyspaces(db, proxy, mm).get();
db.invoke_on_all([] (database& db) {
for (auto& x : db.get_column_families()) {
table& t = *(x.second);
t.enable_auto_compaction();
}
}).get();
sharded<cdc::cdc_service> cdc;
cdc.start(std::ref(proxy)).get();
auto stop_cdc_service = defer([&] {
cdc.stop().get();
});
// In main.cc we call db::system_keyspace::setup which calls
// minimal_setup and init_local_cache
db::system_keyspace::minimal_setup(db, qp);
auto stop_system_keyspace = defer([] { db::qctx = {}; });
start_large_data_handler(db).get();
db.invoke_on_all([] (database& db) {
db.get_compaction_manager().enable();
}).get();
auto stop_database_d = defer([&db] {
stop_database(db).get();
});
db::system_keyspace::init_local_cache().get();
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });
service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
service::get_local_storage_service().join_cluster().get();
auth::permissions_cache_config perm_cache_config;
perm_cache_config.max_entries = cfg->permissions_cache_max_entries();
perm_cache_config.validity_period = std::chrono::milliseconds(cfg->permissions_validity_in_ms());
perm_cache_config.update_period = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
const qualified_name qualified_authorizer_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authorizer());
const qualified_name qualified_authenticator_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authenticator());
const qualified_name qualified_role_manager_name(auth::meta::AUTH_PACKAGE_NAME, cfg->role_manager());
auth::service_config auth_config;
auth_config.authorizer_java_name = qualified_authorizer_name;
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
auth_service.start(perm_cache_config, std::ref(qp), std::ref(mm_notif), std::ref(mm), auth_config).get();
auth_service.invoke_on_all([&mm] (auth::service& auth) {
return auth.start(mm.local());
}).get();
auto deinit_storage_service_server = defer([&auth_service] {
gms::stop_gossiping().get();
auth_service.stop().get();
});
sharded<db::view::view_builder> view_builder;
view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm_notif)).get();
view_builder.invoke_on_all([&mm] (db::view::view_builder& vb) {
return vb.start(mm.local());
}).get();
auto stop_view_builder = defer([&view_builder] {
view_builder.stop().get();
});
// Create the testing user.
try {
auth::role_config config;
config.is_superuser = true;
config.can_login = true;
auth::create_role(
auth_service.local(),
testing_superuser,
config,
auth::authentication_options()).get0();
} catch (const auth::role_already_exists&) {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}
single_node_cql_env env(feature_service, db, auth_service, view_builder, view_update_generator, mm_notif);
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });
if (!env.local_db().has_keyspace(ks_name)) {
env.create_keyspace(ks_name).get();
}
// Finally run the caller's test body against the live environment.
func(env).get();
});
}
// Parses the given statements, requires each to be a modification
// (UPDATE/INSERT/DELETE), and executes them as one UNLOGGED batch.
future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
const std::vector<sstring_view>& queries, std::unique_ptr<cql3::query_options> qo) override {
using cql3::statements::batch_statement;
using cql3::statements::modification_statement;
std::vector<batch_statement::single_statement> modifications;
boost::transform(queries, back_inserter(modifications), [this](const auto& query) {
auto stmt = local_qp().get_statement(query, _core_local.local().client_state);
if (!dynamic_cast<modification_statement*>(stmt->statement.get())) {
throw exceptions::invalid_request_exception(
"Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
}
return batch_statement::single_statement(static_pointer_cast<modification_statement>(stmt->statement));
});
auto batch = ::make_shared<batch_statement>(
batch_statement::type::UNLOGGED,
std::move(modifications),
cql3::attributes::none(),
local_qp().get_cql_stats());
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().execute_batch(batch, *qs, lqo, {}).finally([qs, batch, qo = std::move(qo)] {});
}
};
// Guards against two environments running concurrently (see do_with()).
std::atomic<bool> single_node_cql_env::active = { false };
// Public entry point: runs `func` against a freshly started single-node
// test environment (see single_node_cql_env::do_with for setup details).
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in) {
return single_node_cql_env::do_with(func, std::move(cfg_in));
}
// Like do_with_cql_env(), but accepts a synchronous callback: the callback
// is executed inside a seastar thread, so it may freely .get() on futures.
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config cfg_in) {
    // Adapt the blocking callback into the future-returning signature that
    // single_node_cql_env::do_with() expects.
    auto thread_adapter = [func = std::move(func)] (cql_test_env& env) -> future<> {
        return seastar::async([func, &env] {
            func(env);
        });
    };
    return single_node_cql_env::do_with(std::move(thread_adapter), std::move(cfg_in));
}
namespace debug {
// Points at the running test environment's sharded database; set and
// cleared by single_node_cql_env::do_with() (nullptr otherwise).
seastar::sharded<database>* db;
}