Compare commits
23 Commits
next
...
scylla-4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67348cd6e8 | ||
|
|
44cc4843f1 | ||
|
|
f1f5586bf6 | ||
|
|
3a447cd755 | ||
|
|
176aa91be5 | ||
|
|
4a3eff17ff | ||
|
|
2e00f6d0a1 | ||
|
|
bf509c3b16 | ||
|
|
84ef30752f | ||
|
|
f1b71ec216 | ||
|
|
93ed536fba | ||
|
|
ab3da4510c | ||
|
|
bb8fcbff68 | ||
|
|
af43d0c62d | ||
|
|
8c8c266f67 | ||
|
|
6d1301d93c | ||
|
|
be545d6d5d | ||
|
|
a1c15f0690 | ||
|
|
4d68c53389 | ||
|
|
7d1f352be2 | ||
|
|
0fe5335447 | ||
|
|
8a026b8b14 | ||
|
|
0760107b9f |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.1.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -573,24 +573,61 @@ static bool validate_legal_tag_chars(std::string_view tag) {
|
||||
return std::all_of(tag.begin(), tag.end(), &is_legal_tag_char);
|
||||
}
|
||||
|
||||
static const std::unordered_set<std::string_view> allowed_write_isolation_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
|
||||
static void validate_tags(const std::map<sstring, sstring>& tags) {
|
||||
static const std::unordered_set<std::string_view> allowed_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
auto it = tags.find(rmw_operation::WRITE_ISOLATION_TAG_KEY);
|
||||
if (it != tags.end()) {
|
||||
std::string_view value = it->second;
|
||||
elogger.warn("Allowed values count {} {}", value, allowed_values.count(value));
|
||||
if (allowed_values.count(value) == 0) {
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw api_error("ValidationException",
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_values));
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_write_isolation_values));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static rmw_operation::write_isolation parse_write_isolation(std::string_view value) {
|
||||
if (!value.empty()) {
|
||||
switch (value[0]) {
|
||||
case 'f':
|
||||
return rmw_operation::write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return rmw_operation::write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return rmw_operation::write_isolation::UNSAFE_RMW;
|
||||
}
|
||||
}
|
||||
// Shouldn't happen as validate_tags() / set_default_write_isolation()
|
||||
// allow only a closed set of values.
|
||||
return rmw_operation::default_write_isolation;
|
||||
|
||||
}
|
||||
// This default_write_isolation is always overwritten in main.cc, which calls
|
||||
// set_default_write_isolation().
|
||||
rmw_operation::write_isolation rmw_operation::default_write_isolation =
|
||||
rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
if (value.empty()) {
|
||||
throw std::runtime_error("When Alternator is enabled, write "
|
||||
"isolation policy must be selected, using the "
|
||||
"'--alternator-write-isolation' option. "
|
||||
"See docs/alternator/alternator.md for instructions.");
|
||||
}
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw std::runtime_error(format("Invalid --alternator-write-isolation "
|
||||
"setting '{}'. Allowed values: {}.",
|
||||
value, allowed_write_isolation_values));
|
||||
}
|
||||
default_write_isolation = parse_write_isolation(value);
|
||||
}
|
||||
|
||||
// FIXME: Updating tags currently relies on updating schema, which may be subject
|
||||
// to races during concurrent updates of the same table. Once Scylla schema updates
|
||||
// are fixed, this issue will automatically get fixed as well.
|
||||
@@ -710,6 +747,17 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
|
||||
}
|
||||
|
||||
static future<> wait_for_schema_agreement(db::timeout_clock::time_point deadline) {
|
||||
return do_until([deadline] {
|
||||
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
return service::get_local_migration_manager().have_schema_agreement();
|
||||
}, [] {
|
||||
return seastar::sleep(500ms);
|
||||
});
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
@@ -905,7 +953,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
if (rjson::find(table_info, "Tags")) {
|
||||
f = add_tags(_proxy, schema, table_info);
|
||||
}
|
||||
return f.then([table_info = std::move(table_info), schema] () mutable {
|
||||
return f.then([] {
|
||||
return wait_for_schema_agreement(db::timeout_clock::now() + 10s);
|
||||
}).then([table_info = std::move(table_info), schema] () mutable {
|
||||
rjson::value status = rjson::empty_object();
|
||||
supplement_table_info(table_info, *schema);
|
||||
rjson::set(status, "TableDescription", std::move(table_info));
|
||||
@@ -1195,22 +1245,9 @@ rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(sch
|
||||
const auto& tags = get_tags_of_table(schema);
|
||||
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
|
||||
if (it == tags.end() || it->second.empty()) {
|
||||
// By default, fall back to always enforcing LWT
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
}
|
||||
switch (it->second[0]) {
|
||||
case 'f':
|
||||
return write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return write_isolation::UNSAFE_RMW;
|
||||
default:
|
||||
// In case of an incorrect tag, fall back to the safest option: LWT_ALWAYS
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
return default_write_isolation;
|
||||
}
|
||||
return parse_write_isolation(it->second);
|
||||
}
|
||||
|
||||
// shard_for_execute() checks whether execute() must be called on a specific
|
||||
@@ -1261,7 +1298,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
stats& stats) {
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error("ValidationException", "Read-modify-write operations not supported");
|
||||
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
}
|
||||
stats.reads_before_write++;
|
||||
if (_write_isolation == write_isolation::UNSAFE_RMW) {
|
||||
@@ -3080,7 +3117,7 @@ static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_d
|
||||
if (attrs.Size() != 1) {
|
||||
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
||||
}
|
||||
bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], pk_cdef);
|
||||
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
||||
auto decorated_key = dht::decorate_key(*schema, pk);
|
||||
if (op != comparison_operator_type::EQ) {
|
||||
@@ -3105,7 +3142,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (attrs.Size() != expected_attrs_size) {
|
||||
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
||||
}
|
||||
bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], ck_cdef);
|
||||
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
||||
switch (op) {
|
||||
case comparison_operator_type::EQ:
|
||||
@@ -3119,7 +3156,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
case comparison_operator_type::GT:
|
||||
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
||||
case comparison_operator_type::BETWEEN: {
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_upper_limit = get_key_from_typed_value(attrs[1], ck_cdef);
|
||||
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
||||
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
||||
}
|
||||
@@ -3132,9 +3169,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
||||
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
||||
}
|
||||
std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString();
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef.type);
|
||||
}
|
||||
default:
|
||||
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
||||
|
||||
@@ -63,6 +63,10 @@ public:
|
||||
|
||||
static write_isolation get_write_isolation_for_schema(schema_ptr schema);
|
||||
|
||||
static write_isolation default_write_isolation;
|
||||
public:
|
||||
static void set_default_write_isolation(std::string_view mode);
|
||||
|
||||
protected:
|
||||
// The full request JSON
|
||||
rjson::value _request;
|
||||
|
||||
10
database.cc
10
database.cc
@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
|
||||
|
||||
inline
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
|
||||
}
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _cl_stats(std::make_unique<cell_locker_stats>())
|
||||
, _cfg(cfg)
|
||||
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _mutation_query_stage()
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
|
||||
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
|
||||
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
||||
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
|
||||
database(database&&) = delete;
|
||||
~database();
|
||||
|
||||
|
||||
@@ -681,7 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
@@ -736,6 +736,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, redis_port(this, "redis_port", value_status::Used, 0, "Port on which the REDIS transport listens for clients.")
|
||||
, redis_ssl_port(this, "redis_ssl_port", value_status::Used, 0, "Port on which the REDIS TLS native transport listens for clients.")
|
||||
|
||||
@@ -314,6 +314,8 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
named_value<uint16_t> redis_port;
|
||||
|
||||
@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
13
dist/common/scripts/scylla_coredump_setup
vendored
13
dist/common/scripts/scylla_coredump_setup
vendored
@@ -65,8 +65,8 @@ Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
|
||||
[Mount]
|
||||
What=/var/lib/systemd/coredump
|
||||
Where=/var/lib/scylla/coredump
|
||||
What=/var/lib/scylla/coredump
|
||||
Where=/var/lib/systemd/coredump
|
||||
Type=none
|
||||
Options=bind
|
||||
|
||||
@@ -78,6 +78,7 @@ WantedBy=multi-user.target
|
||||
makedirs('/var/lib/scylla/coredump')
|
||||
systemd_unit.reload()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').enable()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').start()
|
||||
if os.path.exists('/usr/lib/sysctl.d/50-coredump.conf'):
|
||||
run('sysctl -p /usr/lib/sysctl.d/50-coredump.conf')
|
||||
else:
|
||||
@@ -99,6 +100,14 @@ WantedBy=multi-user.target
|
||||
try:
|
||||
run('coredumpctl --no-pager --no-legend info {}'.format(pid))
|
||||
print('\nsystemd-coredump is working finely.')
|
||||
|
||||
# get last coredump generated by bash and remove it, ignore inaccessible ones
|
||||
corefile = out(cmd=r'coredumpctl -1 --no-legend dump 2>&1 | grep "bash" | '
|
||||
r'grep -v "inaccessible" | grep "Storage:\|Coredump:"',
|
||||
shell=True, exception=False)
|
||||
if corefile:
|
||||
corefile = corefile.split()[-1]
|
||||
run('rm -f {}'.format(corefile))
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Does not able to detect coredump, failed to configure systemd-coredump.')
|
||||
sys.exit(1)
|
||||
|
||||
1
dist/docker/redhat/commandlineparser.py
vendored
1
dist/docker/redhat/commandlineparser.py
vendored
@@ -18,6 +18,7 @@ def parse():
|
||||
parser.add_argument('--api-address', default=None, dest='apiAddress')
|
||||
parser.add_argument('--alternator-address', default=None, dest='alternatorAddress', help="Alternator API address to listen to. Defaults to listen address.")
|
||||
parser.add_argument('--alternator-port', default=None, dest='alternatorPort', help="Alternator API port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-write-isolation', default=None, dest='alternatorWriteIsolation', help="Alternator default write isolation policy.")
|
||||
parser.add_argument('--disable-version-check', default=False, action='store_true', dest='disable_housekeeping', help="Disable version check")
|
||||
parser.add_argument('--authenticator', default=None, dest='authenticator', help="Set authenticator class")
|
||||
parser.add_argument('--authorizer', default=None, dest='authorizer', help="Set authorizer class")
|
||||
|
||||
4
dist/docker/redhat/scyllasetup.py
vendored
4
dist/docker/redhat/scyllasetup.py
vendored
@@ -16,6 +16,7 @@ class ScyllaSetup:
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._apiAddress = arguments.apiAddress
|
||||
self._alternatorPort = arguments.alternatorPort
|
||||
self._alternatorWriteIsolation = arguments.alternatorWriteIsolation
|
||||
self._smp = arguments.smp
|
||||
self._memory = arguments.memory
|
||||
self._overprovisioned = arguments.overprovisioned
|
||||
@@ -116,6 +117,9 @@ class ScyllaSetup:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
args += ["--alternator-write-isolation %s" % self._alternatorWriteIsolation]
|
||||
|
||||
if self._authenticator is not None:
|
||||
args += ["--authenticator %s" % self._authenticator]
|
||||
|
||||
|
||||
@@ -25,6 +25,14 @@ By default, Scylla listens on this port on all network interfaces.
|
||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||
option.
|
||||
|
||||
As we explain below in the "Write isolation policies" section, Alternator has
|
||||
four different choices for the implementation of writes, each with
|
||||
different advantages. You should consider which of the options makes
|
||||
more sense for your intended use case, and use the "`--alternator-write-isolation`"
|
||||
option to choose one. There is currently no default for this option: Trying
|
||||
to run Scylla with Alternator enabled without passing this option will
|
||||
result in an error asking you to set it.
|
||||
|
||||
DynamoDB clients usually specify a single "endpoint" address, e.g.,
|
||||
`dynamodb.us-east-1.amazonaws.com`, and a DNS server hosted on that address
|
||||
distributes the connections to many different backend nodes. Alternator
|
||||
@@ -108,12 +116,15 @@ implemented, with the following limitations:
|
||||
Writes are done in LOCAL_QUORUM and reads in LOCAL_ONE (eventual consistency)
|
||||
or LOCAL_QUORUM (strong consistency).
|
||||
### Global Tables
|
||||
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||
be accessed from all of Scylla's DCs.
|
||||
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||
global and others local to a particular DC: CreateGlobalTable,
|
||||
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||
* Currently, *all* Alternator tables are created as "global" tables and can
|
||||
be accessed from all the DCs existing at the time of the table's creation.
|
||||
If a DC is added after a table is created, the table won't be visible from
|
||||
the new DC and changing that requires a CQL "ALTER TABLE" statement to
|
||||
modify the table's replication strategy.
|
||||
* We do not yet support the DynamoDB API calls that control which table is
|
||||
visible from what DC: CreateGlobalTable, UpdateGlobalTable,
|
||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
||||
DescribeGlobalTableSettings, and UpdateTable.
|
||||
### Backup and Restore
|
||||
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
@@ -153,23 +164,28 @@ implemented, with the following limitations:
|
||||
|
||||
### Write isolation policies
|
||||
DynamoDB API update requests may involve a read before the write - e.g., a
|
||||
_conditional_ update, or an update based on the old value of an attribute.
|
||||
_conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
By default, Alternator does this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation. However, LWT significantly slows
|
||||
writes down, so Alternator supports three additional _write isolation
|
||||
policies_, which can be chosen on a per-table basis and may make sense for
|
||||
certain workloads as explained below.
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and is not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
|
||||
The write isolation policy of a table is configured by tagging the table (at
|
||||
CreateTable time, or any time later with TagResource) with the key
|
||||
So Alternator supports four _write isolation policies_, which can be chosen
|
||||
on a per-table basis and may make sense for certain workloads as explained
|
||||
below.
|
||||
|
||||
A default write isolation policy **must** be chosen using the
|
||||
`--alternator-write-isolation` configuration option. Additionally, the write
|
||||
isolation policy for a specific table can be overridden by tagging the table
|
||||
(at CreateTable time, or any time later with TagResource) with the key
|
||||
`system:write_isolation`, and one of the following values:
|
||||
|
||||
* `a`, `always`, or `always_use_lwt` - This is the default choice.
|
||||
It performs every write operation - even those that do not need a read
|
||||
before the write - as a lightweight transaction.
|
||||
* `a`, `always`, or `always_use_lwt` - This mode performs every write
|
||||
operation - even those that do not need a read before the write - as a
|
||||
lightweight transaction.
|
||||
|
||||
This is the slowest choice, but also the only choice guaranteed to work
|
||||
correctly for every workload.
|
||||
|
||||
@@ -10,10 +10,12 @@ This section will guide you through the steps for setting up the cluster:
|
||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||
and `--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --alternator-write-isolation=always
|
||||
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
|
||||
@@ -163,6 +163,12 @@ $ docker run --name some-scylla -d scylladb/scylla --alternator-port 8000
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
### `--alternator-write-isolation policy`
|
||||
|
||||
The `--alternator-write-isolation` command line option chooses between four allowed write isolation policies described in docs/alternator/alternator.md. This option must be specified if Alternator is enabled - it does not have a default.
|
||||
|
||||
**Since: 4.1**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
10
main.cc
10
main.cc
@@ -78,6 +78,7 @@
|
||||
#include "cdc/log.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "alternator/tags_extension.hh"
|
||||
#include "alternator/rmw_operation.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -736,7 +737,7 @@ int main(int ac, char** av) {
|
||||
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
|
||||
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
start_large_data_handler(db).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1081,6 +1082,7 @@ int main(int ac, char** av) {
|
||||
}
|
||||
|
||||
if (cfg->alternator_port() || cfg->alternator_https_port()) {
|
||||
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
@@ -1186,6 +1188,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
});
|
||||
|
||||
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
|
||||
db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
});
|
||||
|
||||
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
|
||||
if (cfg->redis_port() || cfg->redis_ssl_port()) {
|
||||
redis.stop().get();
|
||||
|
||||
@@ -450,6 +450,7 @@ class repair_writer {
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -561,11 +562,18 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
_mq[node_idx]->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +596,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1191,6 +1203,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1216,18 +1245,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1238,15 +1256,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1936,22 +1946,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1977,22 +1982,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2017,22 +2017,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: e708d1df3a...78f626af6c
@@ -1040,12 +1040,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
@@ -2602,11 +2606,8 @@ future<> storage_service::drain() {
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
// Interrupt on going compaction and shutdown to prevent further compaction
|
||||
// No new compactions will be started from this call site on, but we don't need
|
||||
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
|
||||
// will wait on the compaction_manager stop future.
|
||||
ss.db().invoke_on_all([] (auto& db) {
|
||||
db.get_compaction_manager().do_stop();
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "flushing column families", false);
|
||||
|
||||
@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
|
||||
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
|
||||
auto b = backlog() / available_memory;
|
||||
// This means we are using an unimplemented strategy
|
||||
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
, _available_memory(available_memory)
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(1)
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
do_stop();
|
||||
return std::move(*_stop_future);
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() {
|
||||
if (_stopped) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_stopped = true;
|
||||
cmlog.info("Asked to stop");
|
||||
_stopped = true;
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
// Stop all ongoing compaction.
|
||||
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
|
||||
// Wait for each task handler to stop. Copy list because task remove itself
|
||||
// from the list when done.
|
||||
auto tasks = _tasks;
|
||||
|
||||
// fine to ignore here, since it is used to set up the shared promise in
|
||||
// the finally block. Waiters will wait on the shared_future through stop().
|
||||
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return _compaction_controller.shutdown();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
|
||||
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
|
||||
} catch (storage_io_error& e) {
|
||||
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
|
||||
retry = false;
|
||||
do_stop();
|
||||
// FIXME discarded future.
|
||||
(void)stop();
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
|
||||
retry = true;
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/scheduling.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include <vector>
|
||||
@@ -70,9 +69,6 @@ private:
|
||||
|
||||
// Used to assert that compaction_manager was explicitly stopped, if started.
|
||||
bool _stopped = true;
|
||||
// We use a shared promise to indicate whether or not we are stopped because it is legal
|
||||
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
@@ -153,10 +149,9 @@ private:
|
||||
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
|
||||
|
||||
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
@@ -165,13 +160,9 @@ public:
|
||||
// Start compaction manager.
|
||||
void start();
|
||||
|
||||
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
||||
// once, from main teardown path.
|
||||
// Stop all fibers. Ongoing compactions will be waited.
|
||||
future<> stop();
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop();
|
||||
|
||||
bool stopped() const { return _stopped; }
|
||||
|
||||
// Submit a column family to be compacted.
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -73,6 +73,7 @@ done
|
||||
--alternator-address $SCYLLA_IP \
|
||||
$alternator_port_option \
|
||||
--alternator-enforce-authorization=1 \
|
||||
--alternator-write-isolation=always_use_lwt \
|
||||
--developer-mode=1 \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--smp 2 -m 1G \
|
||||
|
||||
@@ -84,7 +84,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
db.get_compaction_manager().start();
|
||||
}).get();
|
||||
|
||||
@@ -460,7 +460,7 @@ public:
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user