Compare commits
29 Commits
debug_form
...
scylla-4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b443b2574a | ||
|
|
2ee321d88e | ||
|
|
4563f4b992 | ||
|
|
81dc8eeec7 | ||
|
|
2d72f7d8e5 | ||
|
|
c6ee86b512 | ||
|
|
67348cd6e8 | ||
|
|
44cc4843f1 | ||
|
|
f1f5586bf6 | ||
|
|
3a447cd755 | ||
|
|
176aa91be5 | ||
|
|
4a3eff17ff | ||
|
|
2e00f6d0a1 | ||
|
|
bf509c3b16 | ||
|
|
84ef30752f | ||
|
|
f1b71ec216 | ||
|
|
93ed536fba | ||
|
|
ab3da4510c | ||
|
|
bb8fcbff68 | ||
|
|
af43d0c62d | ||
|
|
8c8c266f67 | ||
|
|
6d1301d93c | ||
|
|
be545d6d5d | ||
|
|
a1c15f0690 | ||
|
|
4d68c53389 | ||
|
|
7d1f352be2 | ||
|
|
0fe5335447 | ||
|
|
8a026b8b14 | ||
|
|
0760107b9f |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.1.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -573,24 +573,61 @@ static bool validate_legal_tag_chars(std::string_view tag) {
|
||||
return std::all_of(tag.begin(), tag.end(), &is_legal_tag_char);
|
||||
}
|
||||
|
||||
static const std::unordered_set<std::string_view> allowed_write_isolation_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
|
||||
static void validate_tags(const std::map<sstring, sstring>& tags) {
|
||||
static const std::unordered_set<std::string_view> allowed_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
auto it = tags.find(rmw_operation::WRITE_ISOLATION_TAG_KEY);
|
||||
if (it != tags.end()) {
|
||||
std::string_view value = it->second;
|
||||
elogger.warn("Allowed values count {} {}", value, allowed_values.count(value));
|
||||
if (allowed_values.count(value) == 0) {
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw api_error("ValidationException",
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_values));
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_write_isolation_values));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Translates a textual write isolation setting into the corresponding
// rmw_operation::write_isolation enumerator.
//
// Only the first character of `value` is inspected, so any string starting
// with 'f', 'a', 'o' or 'u' selects FORBID_RMW, LWT_ALWAYS, LWT_RMW_ONLY or
// UNSAFE_RMW respectively. An empty string, or one starting with any other
// character, yields rmw_operation::default_write_isolation.
static rmw_operation::write_isolation parse_write_isolation(std::string_view value) {
|
||||
if (!value.empty()) {
|
||||
switch (value[0]) {
|
||||
case 'f':
|
||||
return rmw_operation::write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return rmw_operation::write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return rmw_operation::write_isolation::UNSAFE_RMW;
|
||||
}
|
||||
}
|
||||
// Shouldn't happen, as validate_tags() / set_default_write_isolation()
|
||||
// allow only a closed set of values.
|
||||
return rmw_operation::default_write_isolation;
|
||||
|
||||
}
|
||||
// This default_write_isolation is always overwritten in main.cc, which calls
|
||||
// set_default_write_isolation().
|
||||
rmw_operation::write_isolation rmw_operation::default_write_isolation =
|
||||
rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
// Sets rmw_operation::default_write_isolation from a textual setting.
//
// Throws std::runtime_error if `value` is empty (no policy was selected) or
// if it is not a member of allowed_write_isolation_values. Otherwise the
// value is converted with parse_write_isolation() and stored as the default.
void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
// An empty setting is rejected outright rather than mapped to a default.
if (value.empty()) {
|
||||
throw std::runtime_error("When Alternator is enabled, write "
|
||||
"isolation policy must be selected, using the "
|
||||
"'--alternator-write-isolation' option. "
|
||||
"See docs/alternator/alternator.md for instructions.");
|
||||
}
|
||||
// The full string must match one of the allowed spellings, even though
// parse_write_isolation() below only looks at its first character.
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw std::runtime_error(format("Invalid --alternator-write-isolation "
|
||||
"setting '{}'. Allowed values: {}.",
|
||||
value, allowed_write_isolation_values));
|
||||
}
|
||||
default_write_isolation = parse_write_isolation(value);
|
||||
}
|
||||
|
||||
// FIXME: Updating tags currently relies on updating schema, which may be subject
|
||||
// to races during concurrent updates of the same table. Once Scylla schema updates
|
||||
// are fixed, this issue will automatically get fixed as well.
|
||||
@@ -710,6 +747,17 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
|
||||
}
|
||||
|
||||
// Repeatedly checks the local migration manager's have_schema_agreement()
// and sleeps 500ms between checks, until it reports agreement.
//
// The stop condition throws std::runtime_error("Unable to reach schema
// agreement") once db::timeout_clock::now() is past `deadline`.
// NOTE(review): whether that exception reaches the caller as a failed future
// or as a direct throw depends on how do_until() treats a throwing stop
// condition - confirm against the Seastar version in use.
static future<> wait_for_schema_agreement(db::timeout_clock::time_point deadline) {
|
||||
return do_until([deadline] {
|
||||
// The deadline is checked before agreement, so a late check fails even if
// agreement has been reached in the meantime.
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
return service::get_local_migration_manager().have_schema_agreement();
|
||||
}, [] {
|
||||
return seastar::sleep(500ms);
|
||||
});
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
@@ -905,7 +953,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
if (rjson::find(table_info, "Tags")) {
|
||||
f = add_tags(_proxy, schema, table_info);
|
||||
}
|
||||
return f.then([table_info = std::move(table_info), schema] () mutable {
|
||||
return f.then([] {
|
||||
return wait_for_schema_agreement(db::timeout_clock::now() + 10s);
|
||||
}).then([table_info = std::move(table_info), schema] () mutable {
|
||||
rjson::value status = rjson::empty_object();
|
||||
supplement_table_info(table_info, *schema);
|
||||
rjson::set(status, "TableDescription", std::move(table_info));
|
||||
@@ -1195,22 +1245,9 @@ rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(sch
|
||||
const auto& tags = get_tags_of_table(schema);
|
||||
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
|
||||
if (it == tags.end() || it->second.empty()) {
|
||||
// By default, fall back to always enforcing LWT
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
}
|
||||
switch (it->second[0]) {
|
||||
case 'f':
|
||||
return write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return write_isolation::UNSAFE_RMW;
|
||||
default:
|
||||
// In case of an incorrect tag, fall back to the safest option: LWT_ALWAYS
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
return default_write_isolation;
|
||||
}
|
||||
return parse_write_isolation(it->second);
|
||||
}
|
||||
|
||||
// shard_for_execute() checks whether execute() must be called on a specific
|
||||
@@ -1261,7 +1298,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
stats& stats) {
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error("ValidationException", "Read-modify-write operations not supported");
|
||||
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
}
|
||||
stats.reads_before_write++;
|
||||
if (_write_isolation == write_isolation::UNSAFE_RMW) {
|
||||
@@ -3080,7 +3117,7 @@ static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_d
|
||||
if (attrs.Size() != 1) {
|
||||
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
||||
}
|
||||
bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], pk_cdef);
|
||||
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
||||
auto decorated_key = dht::decorate_key(*schema, pk);
|
||||
if (op != comparison_operator_type::EQ) {
|
||||
@@ -3105,7 +3142,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (attrs.Size() != expected_attrs_size) {
|
||||
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
||||
}
|
||||
bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], ck_cdef);
|
||||
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
||||
switch (op) {
|
||||
case comparison_operator_type::EQ:
|
||||
@@ -3119,7 +3156,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
case comparison_operator_type::GT:
|
||||
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
||||
case comparison_operator_type::BETWEEN: {
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_upper_limit = get_key_from_typed_value(attrs[1], ck_cdef);
|
||||
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
||||
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
||||
}
|
||||
@@ -3132,9 +3169,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
||||
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
||||
}
|
||||
std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString();
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef.type);
|
||||
}
|
||||
default:
|
||||
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
||||
|
||||
@@ -63,6 +63,10 @@ public:
|
||||
|
||||
static write_isolation get_write_isolation_for_schema(schema_ptr schema);
|
||||
|
||||
static write_isolation default_write_isolation;
|
||||
public:
|
||||
static void set_default_write_isolation(std::string_view mode);
|
||||
|
||||
protected:
|
||||
// The full request JSON
|
||||
rjson::value _request;
|
||||
|
||||
@@ -130,7 +130,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
*/
|
||||
future<db_clock::time_point> get_local_streams_timestamp();
|
||||
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_topology_description table.
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_generations table.
|
||||
* Returns the timestamp of this new generation.
|
||||
*
|
||||
* Should be called when starting the node for the first time (i.e., joining the ring).
|
||||
@@ -159,9 +159,9 @@ db_clock::time_point make_new_cdc_generation(
|
||||
std::optional<db_clock::time_point> get_streams_timestamp_for(const gms::inet_address& endpoint, const gms::gossiper&);
|
||||
|
||||
/* Inform CDC users about a generation of streams (identified by the given timestamp)
|
||||
* by inserting it into the cdc_description table.
|
||||
* by inserting it into the cdc_streams table.
|
||||
*
|
||||
* Assumes that the cdc_topology_description table contains this generation.
|
||||
* Assumes that the cdc_generations table contains this generation.
|
||||
*
|
||||
* Returning from this function does not mean that the table update was successful: the function
|
||||
* might run an asynchronous task in the background.
|
||||
|
||||
@@ -140,6 +140,9 @@ public:
|
||||
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
// Returns whether or not interposer consumer is used by a given strategy.
|
||||
bool use_interposer_consumer() const;
|
||||
};
|
||||
|
||||
// Creates a compaction_strategy object from one of the strategies available.
|
||||
|
||||
10
database.cc
10
database.cc
@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
|
||||
|
||||
inline
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
|
||||
}
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _cl_stats(std::make_unique<cell_locker_stats>())
|
||||
, _cfg(cfg)
|
||||
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _mutation_query_stage()
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
|
||||
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
|
||||
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
||||
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
|
||||
database(database&&) = delete;
|
||||
~database();
|
||||
|
||||
|
||||
@@ -681,7 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
@@ -736,6 +736,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, redis_port(this, "redis_port", value_status::Used, 0, "Port on which the REDIS transport listens for clients.")
|
||||
, redis_ssl_port(this, "redis_ssl_port", value_status::Used, 0, "Port on which the REDIS TLS native transport listens for clients.")
|
||||
|
||||
@@ -314,6 +314,8 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
named_value<uint16_t> redis_port;
|
||||
|
||||
@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
@@ -72,7 +72,7 @@ schema_ptr view_build_status() {
|
||||
}
|
||||
|
||||
/* An internal table used by nodes to exchange CDC generation data. */
|
||||
schema_ptr cdc_topology_description() {
|
||||
schema_ptr cdc_generations() {
|
||||
thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION);
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
@@ -108,7 +108,7 @@ schema_ptr cdc_desc() {
|
||||
static std::vector<schema_ptr> all_tables() {
|
||||
return {
|
||||
view_build_status(),
|
||||
cdc_topology_description(),
|
||||
cdc_generations(),
|
||||
cdc_desc(),
|
||||
};
|
||||
}
|
||||
@@ -204,7 +204,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
false).discard_result();
|
||||
}
|
||||
|
||||
/* We want to make sure that writes/reads to/from cdc_topology_description and cdc_description
|
||||
/* We want to make sure that writes/reads to/from cdc_generations and cdc_streams
|
||||
* are consistent: a read following an acknowledged write to the same partition should contact
|
||||
* at least one of the replicas that the write contacted.
|
||||
* Normally we would achieve that by always using CL = QUORUM,
|
||||
|
||||
@@ -48,10 +48,10 @@ public:
|
||||
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
|
||||
|
||||
/* Nodes use this table to communicate new CDC stream generations to other nodes. */
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_topology_description";
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generations";
|
||||
|
||||
/* This table is used by CDC clients to learn about available CDC streams. */
|
||||
static constexpr auto CDC_DESC = "cdc_description";
|
||||
static constexpr auto CDC_DESC = "cdc_streams";
|
||||
|
||||
/* Information required to modify/query some system_distributed tables, passed from the caller. */
|
||||
struct context {
|
||||
|
||||
13
dist/common/scripts/scylla_coredump_setup
vendored
13
dist/common/scripts/scylla_coredump_setup
vendored
@@ -65,8 +65,8 @@ Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
|
||||
[Mount]
|
||||
What=/var/lib/systemd/coredump
|
||||
Where=/var/lib/scylla/coredump
|
||||
What=/var/lib/scylla/coredump
|
||||
Where=/var/lib/systemd/coredump
|
||||
Type=none
|
||||
Options=bind
|
||||
|
||||
@@ -78,6 +78,7 @@ WantedBy=multi-user.target
|
||||
makedirs('/var/lib/scylla/coredump')
|
||||
systemd_unit.reload()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').enable()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').start()
|
||||
if os.path.exists('/usr/lib/sysctl.d/50-coredump.conf'):
|
||||
run('sysctl -p /usr/lib/sysctl.d/50-coredump.conf')
|
||||
else:
|
||||
@@ -99,6 +100,14 @@ WantedBy=multi-user.target
|
||||
try:
|
||||
run('coredumpctl --no-pager --no-legend info {}'.format(pid))
|
||||
print('\nsystemd-coredump is working finely.')
|
||||
|
||||
# get last coredump generated by bash and remove it, ignore inaccessible ones
|
||||
corefile = out(cmd=r'coredumpctl -1 --no-legend dump 2>&1 | grep "bash" | '
|
||||
r'grep -v "inaccessible" | grep "Storage:\|Coredump:"',
|
||||
shell=True, exception=False)
|
||||
if corefile:
|
||||
corefile = corefile.split()[-1]
|
||||
run('rm -f {}'.format(corefile))
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Does not able to detect coredump, failed to configure systemd-coredump.')
|
||||
sys.exit(1)
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
||||
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
|
||||
1
dist/docker/redhat/commandlineparser.py
vendored
1
dist/docker/redhat/commandlineparser.py
vendored
@@ -18,6 +18,7 @@ def parse():
|
||||
parser.add_argument('--api-address', default=None, dest='apiAddress')
|
||||
parser.add_argument('--alternator-address', default=None, dest='alternatorAddress', help="Alternator API address to listen to. Defaults to listen address.")
|
||||
parser.add_argument('--alternator-port', default=None, dest='alternatorPort', help="Alternator API port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-write-isolation', default=None, dest='alternatorWriteIsolation', help="Alternator default write isolation policy.")
|
||||
parser.add_argument('--disable-version-check', default=False, action='store_true', dest='disable_housekeeping', help="Disable version check")
|
||||
parser.add_argument('--authenticator', default=None, dest='authenticator', help="Set authenticator class")
|
||||
parser.add_argument('--authorizer', default=None, dest='authorizer', help="Set authorizer class")
|
||||
|
||||
4
dist/docker/redhat/scyllasetup.py
vendored
4
dist/docker/redhat/scyllasetup.py
vendored
@@ -16,6 +16,7 @@ class ScyllaSetup:
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._apiAddress = arguments.apiAddress
|
||||
self._alternatorPort = arguments.alternatorPort
|
||||
self._alternatorWriteIsolation = arguments.alternatorWriteIsolation
|
||||
self._smp = arguments.smp
|
||||
self._memory = arguments.memory
|
||||
self._overprovisioned = arguments.overprovisioned
|
||||
@@ -116,6 +117,9 @@ class ScyllaSetup:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
args += ["--alternator-write-isolation %s" % self._alternatorWriteIsolation]
|
||||
|
||||
if self._authenticator is not None:
|
||||
args += ["--authenticator %s" % self._authenticator]
|
||||
|
||||
|
||||
@@ -25,6 +25,14 @@ By default, Scylla listens on this port on all network interfaces.
|
||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||
option.
|
||||
|
||||
As we explain below in the "Write isolation policies" section, Alternator has
|
||||
four different choices for the implementation of writes, each with
|
||||
different advantages. You should consider which of the options makes
|
||||
more sense for your intended use case, and use the "`--alternator-write-isolation`"
|
||||
option to choose one. There is currently no default for this option: Trying
|
||||
to run Scylla with Alternator enabled without passing this option will
|
||||
result in an error asking you to set it.
|
||||
|
||||
DynamoDB clients usually specify a single "endpoint" address, e.g.,
|
||||
`dynamodb.us-east-1.amazonaws.com`, and a DNS server hosted on that address
|
||||
distributes the connections to many different backend nodes. Alternator
|
||||
@@ -108,12 +116,15 @@ implemented, with the following limitations:
|
||||
Writes are done in LOCAL_QUORUM and reads in LOCAL_ONE (eventual consistency)
|
||||
or LOCAL_QUORUM (strong consistency).
|
||||
### Global Tables
|
||||
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||
be accessed from all of Scylla's DCs.
|
||||
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||
global and others local to a particular DC: CreateGlobalTable,
|
||||
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||
* Currently, *all* Alternator tables are created as "global" tables and can
|
||||
be accessed from all the DCs existing at the time of the table's creation.
|
||||
If a DC is added after a table is created, the table won't be visible from
|
||||
the new DC and changing that requires a CQL "ALTER TABLE" statement to
|
||||
modify the table's replication strategy.
|
||||
* We do not yet support the DynamoDB API calls that control which table is
|
||||
visible from what DC: CreateGlobalTable, UpdateGlobalTable,
|
||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
||||
DescribeGlobalTableSettings, and UpdateTable.
|
||||
### Backup and Restore
|
||||
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
@@ -153,23 +164,28 @@ implemented, with the following limitations:
|
||||
|
||||
### Write isolation policies
|
||||
DynamoDB API update requests may involve a read before the write - e.g., a
|
||||
_conditional_ update, or an update based on the old value of an attribute.
|
||||
_conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
By default, Alternator does this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation. However, LWT significantly slows
|
||||
writes down, so Alternator supports three additional _write isolation
|
||||
policies_, which can be chosen on a per-table basis and may make sense for
|
||||
certain workloads as explained below.
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and is not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
|
||||
The write isolation policy of a table is configured by tagging the table (at
|
||||
CreateTable time, or any time later with TagResource) with the key
|
||||
So Alternator supports four _write isolation policies_, which can be chosen
|
||||
on a per-table basis and may make sense for certain workloads as explained
|
||||
below.
|
||||
|
||||
A default write isolation policy **must** be chosen using the
|
||||
`--alternator-write-isolation` configuration option. Additionally, the write
|
||||
isolation policy for a specific table can be overridden by tagging the table
|
||||
(at CreateTable time, or any time later with TagResource) with the key
|
||||
`system:write_isolation`, and one of the following values:
|
||||
|
||||
* `a`, `always`, or `always_use_lwt` - This is the default choice.
|
||||
It performs every write operation - even those that do not need a read
|
||||
before the write - as a lightweight transaction.
|
||||
* `a`, `always`, or `always_use_lwt` - This mode performs every write
|
||||
operation - even those that do not need a read before the write - as a
|
||||
lightweight transaction.
|
||||
|
||||
This is the slowest choice, but also the only choice guaranteed to work
|
||||
correctly for every workload.
|
||||
|
||||
@@ -10,10 +10,12 @@ This section will guide you through the steps for setting up the cluster:
|
||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||
and `--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000`
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --alternator-write-isolation=always`
|
||||
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
|
||||
12
docs/cdc.md
12
docs/cdc.md
@@ -92,7 +92,7 @@ Shard-colocation is an optimization.
|
||||
|
||||
Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster it modifies the token ring by refining existing vnodes into smaller vnodes. But before it does it, it will introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas).
|
||||
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_topology_description` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_generations` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -131,11 +131,11 @@ Next, the node starts gossiping the timestamp of the new generation together wit
|
||||
}).get();
|
||||
```
|
||||
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_topology_description` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_generations` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for a minute or two. Thus colocation will be lost for a while. This problem will be fixed when the two-phase-commit approach is implemented.
|
||||
|
||||
We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation.
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_topology_description`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generations`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch.
|
||||
|
||||
Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps.
|
||||
@@ -144,7 +144,7 @@ Reason: we cannot allow writes before `T`, because they belong to the old genera
|
||||
|
||||
### Streams description table
|
||||
|
||||
The `cdc_description` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. Its definition is as follows (db/system_distributed_keyspace.cc):
|
||||
The `cdc_streams` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. Its definition is as follows (db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_DESC, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -161,9 +161,9 @@ where
|
||||
thread_local data_type cdc_stream_tuple_type = tuple_type_impl::get_instance({long_type, long_type});
|
||||
thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(cdc_stream_tuple_type, false);
|
||||
```
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_topology_description` which is used internally.
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_generations` which is used internally.
|
||||
|
||||
When nodes learn about a CDC generation through gossip, they race to update the description table by inserting a proper row (see `cdc::update_streams_description`). This operation is idempotent so it doesn't matter if multiple nodes do it at the same time.
|
||||
|
||||
#### TODO: expired generations
|
||||
The `expired` column in `cdc_description` and `cdc_topology_description` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
The `expired` column in `cdc_streams` and `cdc_generations` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
|
||||
@@ -163,6 +163,12 @@ $ docker run --name some-scylla -d scylladb/scylla --alternator-port 8000
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
### `--alternator-write-isolation policy`
|
||||
|
||||
The `--alternator-write-isolation` command line option chooses between four allowed write isolation policies described in docs/alternator/alternator.md. This option must be specified if Alternator is enabled - it does not have a default.
|
||||
|
||||
**Since: 4.1**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
@@ -175,6 +175,7 @@ public:
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::HIBERNATE,
|
||||
versioned_value::STATUS_BOOTSTRAPPING,
|
||||
versioned_value::STATUS_UNKNOWN,
|
||||
};
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
10
main.cc
10
main.cc
@@ -78,6 +78,7 @@
|
||||
#include "cdc/log.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "alternator/tags_extension.hh"
|
||||
#include "alternator/rmw_operation.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -736,7 +737,7 @@ int main(int ac, char** av) {
|
||||
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
|
||||
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
start_large_data_handler(db).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1081,6 +1082,7 @@ int main(int ac, char** av) {
|
||||
}
|
||||
|
||||
if (cfg->alternator_port() || cfg->alternator_https_port()) {
|
||||
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
@@ -1186,6 +1188,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
});
|
||||
|
||||
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
|
||||
db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
});
|
||||
|
||||
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
|
||||
if (cfg->redis_port() || cfg->redis_ssl_port()) {
|
||||
redis.stop().get();
|
||||
|
||||
@@ -450,6 +450,7 @@ class repair_writer {
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -561,11 +562,18 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
_mq[node_idx]->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +596,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
// Exposes the writer's semaphore (_sem) by reference so that code outside
// this class can take units from the same semaphore the writer itself uses.
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1191,6 +1203,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
// Writes every row of row_diff to the repair writer for node_idx.
//
// The whole operation runs while holding one unit of the repair writer's
// semaphore (_repair_writer.sem()). Inside that section it first calls
// create_writer() for node_idx and then feeds the rows one by one to
// do_write().
//
// row_diff   - rows to write; captured by reference, so the caller must keep
//              the list alive until the returned future resolves. The
//              mutation fragment is moved out of each row.
// node_idx   - index of the node whose writer receives the rows.
// update_buf - when set, each row's hash is also added to
//              _working_row_buf_combined_hash.
//
// Returns the future produced by with_semaphore() for the whole operation.
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1216,18 +1245,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1238,15 +1256,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1936,22 +1946,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1977,22 +1982,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2017,22 +2017,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: e708d1df3a...78f626af6c
@@ -1040,12 +1040,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
@@ -2602,11 +2606,8 @@ future<> storage_service::drain() {
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
// Interrupt on going compaction and shutdown to prevent further compaction
|
||||
// No new compactions will be started from this call site on, but we don't need
|
||||
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
|
||||
// will wait on the compaction_manager stop future.
|
||||
ss.db().invoke_on_all([] (auto& db) {
|
||||
db.get_compaction_manager().do_stop();
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "flushing column families", false);
|
||||
|
||||
@@ -548,6 +548,7 @@ private:
|
||||
}
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
|
||||
virtual bool use_interposer_consumer() const = 0;
|
||||
|
||||
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
|
||||
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
|
||||
@@ -629,8 +630,10 @@ public:
|
||||
return garbage_collected_sstable_writer(_gc_sstable_writer_data);
|
||||
}
|
||||
|
||||
bool contains_multi_fragment_runs() const {
|
||||
return _contains_multi_fragment_runs;
|
||||
bool enable_garbage_collected_sstable_writer() const {
|
||||
// FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously.
|
||||
// More details can be found at https://github.com/scylladb/scylla/issues/6472
|
||||
return _contains_multi_fragment_runs && !use_interposer_consumer();
|
||||
}
|
||||
|
||||
template <typename GCConsumer = noop_compacted_fragments_consumer>
|
||||
@@ -740,6 +743,10 @@ public:
|
||||
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
|
||||
}
|
||||
|
||||
// Reports whether an interposer consumer is in use by forwarding the
// question to the column family's compaction strategy.
bool use_interposer_consumer() const override {
|
||||
return _cf.get_compaction_strategy().use_interposer_consumer();
|
||||
}
|
||||
|
||||
// Logs, at info level, that compaction is starting; formatted_msg is the
// already-formatted description appended after "Compacting".
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Compacting {}", formatted_msg);
|
||||
}
|
||||
@@ -820,7 +827,7 @@ private:
|
||||
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
|
||||
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
|
||||
// meaning incremental compaction is disabled for this compaction.
|
||||
if (!_contains_multi_fragment_runs) {
|
||||
if (!enable_garbage_collected_sstable_writer()) {
|
||||
return;
|
||||
}
|
||||
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
|
||||
@@ -1238,6 +1245,10 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
// This compaction type unconditionally reports that it uses an interposer
// consumer.
// NOTE(review): presumably the resharding compaction class, judging by the
// neighbouring "Resharding" log message - confirm against the full file.
bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Logs, at info level, that resharding is starting; formatted_msg is the
// already-formatted description appended after "Resharding".
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Resharding {}", formatted_msg);
|
||||
}
|
||||
@@ -1330,7 +1341,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
|
||||
cf.schema()->ks_name(), cf.schema()->cf_name()));
|
||||
}
|
||||
auto c = make_compaction(cf, std::move(descriptor));
|
||||
if (c->contains_multi_fragment_runs()) {
|
||||
if (c->enable_garbage_collected_sstable_writer()) {
|
||||
auto gc_writer = c->make_garbage_collected_sstable_writer();
|
||||
return compaction::run(std::move(c), std::move(gc_writer));
|
||||
}
|
||||
|
||||
@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
|
||||
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
|
||||
auto b = backlog() / available_memory;
|
||||
// This means we are using an unimplemented strategy
|
||||
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
, _available_memory(available_memory)
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(1)
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
do_stop();
|
||||
return std::move(*_stop_future);
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() {
|
||||
if (_stopped) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_stopped = true;
|
||||
cmlog.info("Asked to stop");
|
||||
_stopped = true;
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
// Stop all ongoing compaction.
|
||||
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
|
||||
// Wait for each task handler to stop. Copy list because task remove itself
|
||||
// from the list when done.
|
||||
auto tasks = _tasks;
|
||||
|
||||
// fine to ignore here, since it is used to set up the shared promise in
|
||||
// the finally block. Waiters will wait on the shared_future through stop().
|
||||
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return _compaction_controller.shutdown();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
|
||||
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
|
||||
} catch (storage_io_error& e) {
|
||||
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
|
||||
retry = false;
|
||||
do_stop();
|
||||
// FIXME discarded future.
|
||||
(void)stop();
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
|
||||
retry = true;
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/scheduling.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include <vector>
|
||||
@@ -70,9 +69,6 @@ private:
|
||||
|
||||
// Used to assert that compaction_manager was explicitly stopped, if started.
|
||||
bool _stopped = true;
|
||||
// We use a shared promise to indicate whether or not we are stopped because it is legal
|
||||
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
@@ -153,10 +149,9 @@ private:
|
||||
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
|
||||
|
||||
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
@@ -165,13 +160,9 @@ public:
|
||||
// Start compaction manager.
|
||||
void start();
|
||||
|
||||
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
||||
// once, from main teardown path.
|
||||
// Stop all fibers. Ongoing compactions will be waited.
|
||||
future<> stop();
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop();
|
||||
|
||||
bool stopped() const { return _stopped; }
|
||||
|
||||
// Submit a column family to be compacted.
|
||||
|
||||
@@ -1080,6 +1080,10 @@ reader_consumer compaction_strategy::make_interposer_consumer(const mutation_sou
|
||||
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
|
||||
}
|
||||
|
||||
// Thin forwarding wrapper: returns whatever the underlying strategy
// implementation (_compaction_strategy_impl) reports.
bool compaction_strategy::use_interposer_consumer() const {
|
||||
return _compaction_strategy_impl->use_interposer_consumer();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
|
||||
@@ -99,5 +99,9 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
// Default answer: no interposer consumer is used. Being virtual, derived
// classes can override this to return true.
virtual bool use_interposer_consumer() const {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -346,6 +346,10 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override;
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override;
|
||||
|
||||
// Overrides the base-class answer: this class always reports that it uses an
// interposer consumer.
virtual bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -73,6 +73,7 @@ done
|
||||
--alternator-address $SCYLLA_IP \
|
||||
$alternator_port_option \
|
||||
--alternator-enforce-authorization=1 \
|
||||
--alternator-write-isolation=always_use_lwt \
|
||||
--developer-mode=1 \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--smp 2 -m 1G \
|
||||
|
||||
@@ -283,8 +283,8 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
|
||||
assert_unauthorized(format("DROP TABLE {}", full_name));
|
||||
};
|
||||
|
||||
test_table("cdc_description");
|
||||
test_table("cdc_topology_description");
|
||||
test_table("cdc_streams");
|
||||
test_table("cdc_generations");
|
||||
}, mk_cdc_test_config()).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
db.get_compaction_manager().start();
|
||||
}).get();
|
||||
|
||||
@@ -670,11 +670,11 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("8182496e-4baf-3a07-91e6-caa140388846"),
|
||||
utils::UUID("a65ea746-4d8a-3e5c-8fbf-5f70c14dbcbc"),
|
||||
utils::UUID("a65ea746-4d8a-3e5c-8fbf-5f70c14dbcbc"),
|
||||
utils::UUID("4c138336-4677-3520-8556-4aab007cfedb"),
|
||||
utils::UUID("4c138336-4677-3520-8556-4aab007cfedb"),
|
||||
utils::UUID("2fb5d448-c537-39d1-9384-5166bcdcaa9a"),
|
||||
utils::UUID("7786dd34-2256-38f8-881e-79b062397069"),
|
||||
utils::UUID("7786dd34-2256-38f8-881e-79b062397069"),
|
||||
utils::UUID("5ca0cc9b-3651-3651-96ab-2324cdc07300"),
|
||||
utils::UUID("5ca0cc9b-3651-3651-96ab-2324cdc07300"),
|
||||
utils::UUID("62e1e586-6eec-3ff5-882a-89386664694b"),
|
||||
utils::UUID("daf6ded5-c294-3b07-b6a0-1b318a3c2e17"),
|
||||
utils::UUID("370c7d8e-0a4a-394d-b627-318805c64584"),
|
||||
@@ -685,11 +685,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("a33bc2a7-33b7-335d-8644-ecfdd23d1ca6"),
|
||||
utils::UUID("8ec3169e-33f9-356e-9a20-172ddf4261dc"),
|
||||
utils::UUID("8ec3169e-33f9-356e-9a20-172ddf4261dc"),
|
||||
utils::UUID("6d3a2294-0e82-33b8-943a-459cc9f3bf76"),
|
||||
utils::UUID("6d3a2294-0e82-33b8-943a-459cc9f3bf76"),
|
||||
utils::UUID("72d2ee27-a675-397d-85e1-1c49d3dcba13"),
|
||||
utils::UUID("e5a2ec93-1f1a-33b2-ad2e-9795f4b6b229"),
|
||||
utils::UUID("e5a2ec93-1f1a-33b2-ad2e-9795f4b6b229"),
|
||||
utils::UUID("6f1f5e2a-834a-37f8-ae05-ef4a1f406996"),
|
||||
utils::UUID("6f1f5e2a-834a-37f8-ae05-ef4a1f406996"),
|
||||
utils::UUID("e4c2bd0d-5f02-3d6f-9a43-de38b152b1fd"),
|
||||
utils::UUID("3b2c4957-4434-3078-ae42-fedcd81ac8cd"),
|
||||
utils::UUID("90518efe-88e6-39bd-a0a6-d32efc80777a"),
|
||||
@@ -700,11 +700,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_functions) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("e8879c0e-a731-3ac5-9b43-d2ed33b331f2"),
|
||||
utils::UUID("4a20c241-583c-334e-9fe9-b906280f724f"),
|
||||
utils::UUID("4a20c241-583c-334e-9fe9-b906280f724f"),
|
||||
utils::UUID("9711e6c4-dfcd-3c09-bf8b-f02811f73730"),
|
||||
utils::UUID("9711e6c4-dfcd-3c09-bf8b-f02811f73730"),
|
||||
utils::UUID("f169e77d-8ee1-3994-9379-065bcb9d1646"),
|
||||
utils::UUID("7185d744-0038-37ff-9770-04764feedbb7"),
|
||||
utils::UUID("7185d744-0038-37ff-9770-04764feedbb7"),
|
||||
utils::UUID("6d285eda-8963-3687-9ba6-a00764324b67"),
|
||||
utils::UUID("6d285eda-8963-3687-9ba6-a00764324b67"),
|
||||
utils::UUID("e96eb4ca-4f90-3b47-bfed-81e4a441734c"),
|
||||
utils::UUID("14f6c60f-8ba3-3141-8958-dd74366ee1ca"),
|
||||
utils::UUID("987a3386-83d1-3436-b3fc-1d2a3cfdd659"),
|
||||
@@ -724,11 +724,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("07d3ffb8-b7f5-367d-b128-d34b2033b788"),
|
||||
utils::UUID("9500fd95-abeb-32ea-b7af-568021eee217"),
|
||||
utils::UUID("9500fd95-abeb-32ea-b7af-568021eee217"),
|
||||
utils::UUID("9bd2ee49-f6db-37c7-a81f-1c2524dec3bf"),
|
||||
utils::UUID("9bd2ee49-f6db-37c7-a81f-1c2524dec3bf"),
|
||||
utils::UUID("fd939d2a-41fc-33e8-aa65-7d7b1678b307"),
|
||||
utils::UUID("87f0a70e-9dcd-34ae-8b72-bb23addab551"),
|
||||
utils::UUID("87f0a70e-9dcd-34ae-8b72-bb23addab551"),
|
||||
utils::UUID("4c8bf5c8-4823-3f35-9e34-275978f130c9"),
|
||||
utils::UUID("4c8bf5c8-4823-3f35-9e34-275978f130c9"),
|
||||
utils::UUID("549d0735-3087-3cf5-b4b6-23518a803246"),
|
||||
utils::UUID("612eaafb-27a4-3c01-b292-5d4424585ff7"),
|
||||
utils::UUID("01ea7d67-6f30-3215-aaf0-b7e2266daec5"),
|
||||
|
||||
@@ -5771,3 +5771,89 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
|
||||
cm.stop().wait();
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// Test that https://github.com/scylladb/scylla/issues/6472 is gone
|
||||
//
|
||||
SEASTAR_TEST_CASE(test_bug_6472) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
auto builder = schema_builder("tests", "test_bug_6472")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" },
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" },
|
||||
};
|
||||
builder.set_compaction_strategy_options(opts);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
auto s = builder.build();
|
||||
|
||||
auto sst_gen = [&env, s, tmpdir_path, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, tmpdir_path, (*gen)++, la, big);
|
||||
};
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_expiring_cell = [&] (std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = tmpdir_path;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
reader_concurrency_semaphore sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{});
|
||||
cfg.read_concurrency_semaphore = &sem;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
// Make 100 expiring cells which belong to different time windows
|
||||
std::vector<mutation> muts;
|
||||
muts.reserve(101);
|
||||
for (auto i = 1; i < 101; i++) {
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(i)));
|
||||
}
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(110)));
|
||||
|
||||
//
|
||||
// Reproduce issue 6472 by making an input set which causes both interposer and GC writer to be enabled
|
||||
//
|
||||
std::vector<shared_sstable> sstables_spanning_many_windows = {
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
};
|
||||
utils::UUID run_id = utils::make_random_uuid();
|
||||
for (auto& sst : sstables_spanning_many_windows) {
|
||||
sstables::test(sst).set_run_identifier(run_id);
|
||||
}
|
||||
|
||||
// Make sure everything we wanted expired is expired by now.
|
||||
forward_jump_clocks(std::chrono::hours(101));
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows),
|
||||
*cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -460,7 +460,7 @@ public:
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
});
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3079304936
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2013820447
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
708745264
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3997503000
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
4279379389
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
153527140
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
4090444561
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user