Merge 'alternator: move uses of replica module to data_dictionary' from Avi Kivity
Alternator is a coordinator-side service and so should not access the replica module. In this series all but one of uses of the replica module are replaced with data_dictionary. One case remains - accessing the replication map which is not available (and should not be available) via the data dictionary. The data_dictionary module is expanded with missing accessors. Closes #9945 * github.com:scylladb/scylla: alternator: switch to data_dictionary for table listing purposes data_dictionary: add get_tables() data_dictionary: introduce keyspace::is_internal()
This commit is contained in:
@@ -21,7 +21,6 @@
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "alternator/executor.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "query-result-set.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
@@ -124,7 +123,7 @@ std::string get_signature(std::string_view access_key_id, std::string_view secre
|
||||
}
|
||||
|
||||
future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username) {
|
||||
schema_ptr schema = proxy.get_db().local().find_schema("system_auth", "roles");
|
||||
schema_ptr schema = proxy.data_dictionary().find_schema("system_auth", "roles");
|
||||
partition_key pk = partition_key::from_single_value(*schema, utf8_type->decompose(username));
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
|
||||
@@ -14,9 +14,9 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "log.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "data_dictionary/keyspace_metadata.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "timestamp.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "types/map.hh"
|
||||
#include "schema.hh"
|
||||
#include "query-request.hh"
|
||||
@@ -209,8 +209,8 @@ schema_ptr executor::find_table(service::storage_proxy& proxy, const rjson::valu
|
||||
return nullptr;
|
||||
}
|
||||
try {
|
||||
return proxy.get_db().local().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + sstring(*table_name), *table_name);
|
||||
} catch(replica::no_such_column_family&) {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + sstring(*table_name), *table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
throw api_error::resource_not_found(
|
||||
format("Requested resource not found: Table: {} not found", *table_name));
|
||||
}
|
||||
@@ -226,7 +226,7 @@ schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request)
|
||||
return schema;
|
||||
}
|
||||
|
||||
static std::tuple<bool, std::string_view, std::string_view> try_get_internal_table(std::string_view table_name) {
|
||||
static std::tuple<bool, std::string_view, std::string_view> try_get_internal_table(data_dictionary::database db, std::string_view table_name) {
|
||||
size_t it = table_name.find(executor::INTERNAL_TABLE_PREFIX);
|
||||
if (it != 0) {
|
||||
return {false, "", ""};
|
||||
@@ -239,7 +239,8 @@ static std::tuple<bool, std::string_view, std::string_view> try_get_internal_tab
|
||||
std::string_view ks_name = table_name.substr(0, delim);
|
||||
table_name.remove_prefix(ks_name.size() + 1);
|
||||
// Only internal keyspaces can be accessed to avoid leakage
|
||||
if (!is_internal_keyspace(ks_name)) {
|
||||
auto ks = db.try_find_keyspace(ks_name);
|
||||
if (!ks || !ks->is_internal()) {
|
||||
return {false, "", ""};
|
||||
}
|
||||
return {true, ks_name, table_name};
|
||||
@@ -255,11 +256,11 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
table_or_view_type type = table_or_view_type::base;
|
||||
std::string table_name = get_table_name(request);
|
||||
|
||||
auto [is_internal_table, internal_ks_name, internal_table_name] = try_get_internal_table(table_name);
|
||||
auto [is_internal_table, internal_ks_name, internal_table_name] = try_get_internal_table(proxy.data_dictionary(), table_name);
|
||||
if (is_internal_table) {
|
||||
try {
|
||||
return { proxy.get_db().local().find_schema(sstring(internal_ks_name), sstring(internal_table_name)), type };
|
||||
} catch (replica::no_such_column_family&) {
|
||||
return { proxy.data_dictionary().find_schema(sstring(internal_ks_name), sstring(internal_table_name)), type };
|
||||
} catch (data_dictionary::no_such_column_family&) {
|
||||
throw api_error::resource_not_found(
|
||||
format("Requested resource not found: Internal table: {}.{} not found", internal_ks_name, internal_table_name));
|
||||
}
|
||||
@@ -278,20 +279,20 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
format("Non-string IndexName '{}'", index_name->GetString()));
|
||||
}
|
||||
// If no tables for global indexes were found, the index may be local
|
||||
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
|
||||
if (!proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
type = table_or_view_type::lsi;
|
||||
table_name = lsi_name(orig_table_name, index_name->GetString());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return { proxy.get_db().local().find_schema(keyspace_name, table_name), type };
|
||||
} catch(replica::no_such_column_family&) {
|
||||
return { proxy.data_dictionary().find_schema(keyspace_name, table_name), type };
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
if (index_name) {
|
||||
// DynamoDB returns a different error depending on whether the
|
||||
// base table doesn't exist (ResourceNotFoundException) or it
|
||||
// does exist but the index does not (ValidationException).
|
||||
if (proxy.get_db().local().has_schema(keyspace_name, orig_table_name)) {
|
||||
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
|
||||
throw api_error::validation(
|
||||
format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
|
||||
} else {
|
||||
@@ -435,7 +436,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
// Add base table's KeySchema and collect types for AttributeDefinitions:
|
||||
describe_key_schema(table_description, *schema, key_attribute_types);
|
||||
|
||||
replica::table& t = _proxy.get_db().local().find_column_family(schema);
|
||||
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
|
||||
if (!t.views().empty()) {
|
||||
rjson::value gsi_array = rjson::empty_array();
|
||||
rjson::value lsi_array = rjson::empty_array();
|
||||
@@ -495,7 +496,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
co_await _mm.container().invoke_on(0, [&] (service::migration_manager& mm) -> future<> {
|
||||
co_await mm.schema_read_barrier();
|
||||
|
||||
if (!p.local().get_db().local().has_schema(keyspace_name, table_name)) {
|
||||
if (!p.local().data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
|
||||
}
|
||||
|
||||
@@ -614,8 +615,8 @@ static schema_ptr get_table_from_arn(service::storage_proxy& proxy, std::string_
|
||||
size_t table_start = arn.find_last_of('/');
|
||||
std::string_view table_name = arn.substr(table_start + 1);
|
||||
// FIXME: remove sstring creation once find_schema gains a view-based interface
|
||||
return proxy.get_db().local().find_schema(sstring(keyspace_name), sstring(table_name));
|
||||
} catch (const replica::no_such_column_family& e) {
|
||||
return proxy.data_dictionary().find_schema(sstring(keyspace_name), sstring(table_name));
|
||||
} catch (const data_dictionary::no_such_column_family& e) {
|
||||
throw api_error::access_denied("Incorrect resource identifier");
|
||||
} catch (const std::out_of_range& e) {
|
||||
throw api_error::access_denied("Incorrect resource identifier");
|
||||
@@ -1757,8 +1758,8 @@ static schema_ptr get_table_from_batch_request(const service::storage_proxy& pro
|
||||
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
|
||||
validate_table_name(table_name);
|
||||
try {
|
||||
return proxy.get_db().local().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(replica::no_such_column_family&) {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
|
||||
}
|
||||
}
|
||||
@@ -1896,7 +1897,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.get_db().local().find_schema(ks, cf);
|
||||
auto schema = proxy.data_dictionary().find_schema(ks, cf);
|
||||
//FIXME: A corresponding FIXME can be found in transport/server.cc when a message must be bounced
|
||||
// to another shard - once it is solved, this place can use a similar solution. Instead of passing
|
||||
// empty_service_permit() to the background operation, the current permit's lifetime should be prolonged,
|
||||
@@ -4096,13 +4097,13 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
return make_ready_future<request_return_type>(api_error::validation("Limit must be greater than 0 and no greater than 100"));
|
||||
}
|
||||
|
||||
auto table_names = _proxy.get_db().local().get_column_families()
|
||||
| boost::adaptors::map_values
|
||||
| boost::adaptors::filtered([] (const lw_shared_ptr<replica::table>& t) {
|
||||
return t->schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 && !t->schema()->is_view();
|
||||
auto tables = _proxy.data_dictionary().get_tables(); // hold on to temporary, table_names isn't a container, it's a view
|
||||
auto table_names = tables
|
||||
| boost::adaptors::filtered([] (data_dictionary::table t) {
|
||||
return t.schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 && !t.schema()->is_view();
|
||||
})
|
||||
| boost::adaptors::transformed([] (const lw_shared_ptr<replica::table>& t) {
|
||||
return t->schema()->cf_name();
|
||||
| boost::adaptors::transformed([] (data_dictionary::table t) {
|
||||
return t.schema()->cf_name();
|
||||
});
|
||||
|
||||
rjson::value response = rjson::empty_object();
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
#include "utils/base64.hh"
|
||||
#include "log.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include "cdc/log.hh"
|
||||
@@ -142,8 +141,8 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
auto limit = rjson::get_opt<int>(request, "Limit").value_or(std::numeric_limits<int>::max());
|
||||
auto streams_start = rjson::get_opt<stream_arn>(request, "ExclusiveStartStreamArn");
|
||||
auto table = find_table(_proxy, request);
|
||||
auto& db = _proxy.get_db().local();
|
||||
auto& cfs = db.get_column_families();
|
||||
auto db = _proxy.data_dictionary();
|
||||
auto cfs = db.get_tables();
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
@@ -156,10 +155,10 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
// between queries may or may not miss info. But that should be rare,
|
||||
// and we can probably expect this to be a single call.
|
||||
if (streams_start) {
|
||||
i = std::find_if(i, e, [&](const std::pair<utils::UUID, lw_shared_ptr<replica::column_family>>& p) {
|
||||
return p.first == streams_start
|
||||
&& cdc::get_base_table(db, *p.second->schema())
|
||||
&& is_alternator_keyspace(p.second->schema()->ks_name())
|
||||
i = std::find_if(i, e, [&](data_dictionary::table t) {
|
||||
return t.schema()->id() == streams_start
|
||||
&& cdc::get_base_table(db.real_database(), *t.schema())
|
||||
&& is_alternator_keyspace(t.schema()->ks_name())
|
||||
;
|
||||
});
|
||||
if (i != e) {
|
||||
@@ -173,7 +172,7 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
std::optional<stream_arn> last;
|
||||
|
||||
for (;limit > 0 && i != e; ++i) {
|
||||
auto s = i->second->schema();
|
||||
auto s = i->schema();
|
||||
auto& ks_name = s->ks_name();
|
||||
auto& cf_name = s->cf_name();
|
||||
|
||||
@@ -183,14 +182,14 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
if (table && ks_name != table->ks_name()) {
|
||||
continue;
|
||||
}
|
||||
if (cdc::is_log_for_some_table(db, ks_name, cf_name)) {
|
||||
if (table && table != cdc::get_base_table(db, *s)) {
|
||||
if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) {
|
||||
if (table && table != cdc::get_base_table(db.real_database(), *s)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
rjson::value new_entry = rjson::empty_object();
|
||||
|
||||
last = i->first;
|
||||
last = i->schema()->id();
|
||||
rjson::add(new_entry, "StreamArn", *last);
|
||||
rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s)));
|
||||
rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(table_name(*s))));
|
||||
@@ -411,7 +410,7 @@ using namespace std::string_literals;
|
||||
* This will be a partial overlap, but it is the best we can do.
|
||||
*/
|
||||
|
||||
static std::chrono::seconds confidence_interval(const replica::database& db) {
|
||||
static std::chrono::seconds confidence_interval(data_dictionary::database db) {
|
||||
return std::chrono::seconds(db.get_config().alternator_streams_time_window_s());
|
||||
}
|
||||
|
||||
@@ -429,12 +428,12 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
auto stream_arn = rjson::get<alternator::stream_arn>(request, "StreamArn");
|
||||
|
||||
schema_ptr schema, bs;
|
||||
auto& db = _proxy.get_db().local();
|
||||
auto db = _proxy.data_dictionary();
|
||||
|
||||
try {
|
||||
auto& cf = db.find_column_family(stream_arn);
|
||||
auto cf = db.find_column_family(stream_arn);
|
||||
schema = cf.schema();
|
||||
bs = cdc::get_base_table(_proxy.get_db().local(), *schema);
|
||||
bs = cdc::get_base_table(db.real_database(), *schema);
|
||||
} catch (...) {
|
||||
}
|
||||
|
||||
@@ -713,18 +712,18 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
|
||||
}
|
||||
|
||||
auto stream_arn = rjson::get<alternator::stream_arn>(request, "StreamArn");
|
||||
auto& db = _proxy.get_db().local();
|
||||
auto db = _proxy.data_dictionary();
|
||||
|
||||
schema_ptr schema = nullptr;
|
||||
std::optional<shard_id> sid;
|
||||
|
||||
try {
|
||||
auto& cf = db.find_column_family(stream_arn);
|
||||
auto cf = db.find_column_family(stream_arn);
|
||||
schema = cf.schema();
|
||||
sid = rjson::get<shard_id>(request, "ShardId");
|
||||
} catch (...) {
|
||||
}
|
||||
if (!schema || !cdc::get_base_table(db, *schema) || !is_alternator_keyspace(schema->ks_name())) {
|
||||
if (!schema || !cdc::get_base_table(db.real_database(), *schema) || !is_alternator_keyspace(schema->ks_name())) {
|
||||
throw api_error::resource_not_found("Invalid StreamArn");
|
||||
}
|
||||
if (!sid) {
|
||||
@@ -801,12 +800,12 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
throw api_error::validation("Limit must be 1 or more");
|
||||
}
|
||||
|
||||
auto& db = _proxy.get_db().local();
|
||||
auto db = _proxy.data_dictionary();
|
||||
schema_ptr schema, base;
|
||||
try {
|
||||
auto& log_table = db.find_column_family(iter.table);
|
||||
auto log_table = db.find_column_family(iter.table);
|
||||
schema = log_table.schema();
|
||||
base = cdc::get_base_table(db, *schema);
|
||||
base = cdc::get_base_table(db.real_database(), *schema);
|
||||
} catch (...) {
|
||||
}
|
||||
|
||||
@@ -1044,7 +1043,7 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
|
||||
}
|
||||
|
||||
if (stream_enabled->GetBool()) {
|
||||
auto& db = sp.get_db().local();
|
||||
auto db = sp.data_dictionary();
|
||||
|
||||
if (!db.features().cluster_supports_cdc()) {
|
||||
throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster.");
|
||||
@@ -1084,8 +1083,8 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
|
||||
void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp) {
|
||||
auto& opts = schema.cdc_options();
|
||||
if (opts.enabled()) {
|
||||
auto& db = sp.get_db().local();
|
||||
auto& cf = db.find_column_family(schema.ks_name(), cdc::log_name(schema.cf_name()));
|
||||
auto db = sp.data_dictionary();
|
||||
auto cf = db.find_table(schema.ks_name(), cdc::log_name(schema.cf_name()));
|
||||
stream_arn arn(cf.schema()->id());
|
||||
rjson::add(descr, "LatestStreamArn", arn);
|
||||
rjson::add(descr, "LatestStreamLabel", rjson::from_string(stream_label(*cf.schema())));
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "service/pager/query_pagers.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "sstables/types.hh"
|
||||
#include "mutation.hh"
|
||||
#include "types.hh"
|
||||
#include "types/map.hh"
|
||||
#include "utils/rjson.hh"
|
||||
@@ -61,7 +62,7 @@ static const sstring TTL_TAG_KEY("system:ttl_attribute");
|
||||
|
||||
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.update_time_to_live++;
|
||||
if (!_proxy.get_db().local().features().cluster_supports_alternator_ttl()) {
|
||||
if (!_proxy.data_dictionary().features().cluster_supports_alternator_ttl()) {
|
||||
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
|
||||
}
|
||||
|
||||
@@ -115,7 +116,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
|
||||
future<executor::request_return_type> executor::describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.describe_time_to_live++;
|
||||
if (!_proxy.get_db().local().features().cluster_supports_alternator_ttl()) {
|
||||
if (!_proxy.data_dictionary().features().cluster_supports_alternator_ttl()) {
|
||||
co_return api_error::unknown_operation("DescribeTimeToLive not yet supported. Experimental support is available if the 'alternator_ttl' experimental feature is enabled on all nodes.");
|
||||
}
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
@@ -165,7 +166,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
|
||||
// like user deletions, will also appear on the CDC log and therefore
|
||||
// Alternator Streams if enabled (FIXME: explain how we mark the
|
||||
// deletion different from user deletes. We don't do it yet.).
|
||||
expiration_service::expiration_service(replica::database& db, service::storage_proxy& proxy)
|
||||
expiration_service::expiration_service(data_dictionary::database db, service::storage_proxy& proxy)
|
||||
: _db(db)
|
||||
, _proxy(proxy)
|
||||
{
|
||||
@@ -629,7 +630,7 @@ static future<> scan_table_ranges(
|
||||
// reboot.
|
||||
static future<bool> scan_table(
|
||||
service::storage_proxy& proxy,
|
||||
replica::database& db,
|
||||
data_dictionary::database db,
|
||||
schema_ptr s,
|
||||
abort_source& abort_source,
|
||||
named_semaphore& page_sem)
|
||||
@@ -681,7 +682,7 @@ static future<bool> scan_table(
|
||||
// FIXME: consider if we should ask the scan without caching?
|
||||
// can we use cache but not fill it?
|
||||
scan_ranges_context scan_ctx{s, proxy, std::move(column_name), std::move(member)};
|
||||
token_ranges_owned_by_this_shard<primary> my_ranges(db, s);
|
||||
token_ranges_owned_by_this_shard<primary> my_ranges(db.real_database(), s);
|
||||
while (std::optional<dht::partition_range> range = my_ranges.next_partition_range()) {
|
||||
// Note that because of issue #9167 we need to run a separate
|
||||
// query on each partition range, and can't pass several of
|
||||
@@ -701,7 +702,7 @@ static future<bool> scan_table(
|
||||
// by tasking another node to take over scanning of the dead node's primary
|
||||
// ranges. What we do here is that this node will also check expiration
|
||||
// on its *secondary* ranges - but only those whose primary owner is down.
|
||||
token_ranges_owned_by_this_shard<secondary> my_secondary_ranges(db, s);
|
||||
token_ranges_owned_by_this_shard<secondary> my_secondary_ranges(db.real_database(), s);
|
||||
while (std::optional<dht::partition_range> range = my_secondary_ranges.next_partition_range()) {
|
||||
dht::partition_range_vector partition_ranges;
|
||||
partition_ranges.push_back(std::move(*range));
|
||||
@@ -718,12 +719,12 @@ future<> expiration_service::run() {
|
||||
// also need to notice when a new table is added, a table is
|
||||
// deleted or when ttl is enabled or disabled for a table!
|
||||
for (;;) {
|
||||
// _db.get_column_families() may change under our feet during a
|
||||
// _db.tables() may change under our feet during a
|
||||
// long-living loop, so we must keep our own copy of the list of
|
||||
// schemas.
|
||||
std::vector<schema_ptr> schemas;
|
||||
for (const auto& cf : _db.get_column_families()) {
|
||||
schemas.push_back(cf.second->schema());
|
||||
for (auto cf : _db.get_tables()) {
|
||||
schemas.push_back(cf.schema());
|
||||
}
|
||||
for (schema_ptr s : schemas) {
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
@@ -27,7 +27,7 @@ namespace alternator {
|
||||
// items in all tables with per-item expiration enabled. Currently, this means
|
||||
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
|
||||
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
|
||||
replica::database& _db;
|
||||
data_dictionary::database _db;
|
||||
service::storage_proxy& _proxy;
|
||||
// _end is set by start(), and resolves when the the background service
|
||||
// started by it ends. To ask the background service to end, _abort_source
|
||||
@@ -41,7 +41,7 @@ public:
|
||||
// sharded_service<expiration_service>::start() creates this object on
|
||||
// all shards, so calls this constructor on each shard. Later, the
|
||||
// additional start() function should be invoked on all shards.
|
||||
expiration_service(replica::database&, service::storage_proxy&);
|
||||
expiration_service(data_dictionary::database, service::storage_proxy&);
|
||||
future<> start();
|
||||
future<> run();
|
||||
// sharded_service<expiration_service>::stop() calls the following stop()
|
||||
|
||||
@@ -43,6 +43,11 @@ keyspace::user_types() const {
|
||||
return metadata()->user_types();
|
||||
}
|
||||
|
||||
bool
|
||||
keyspace::is_internal() const {
|
||||
return _ops->is_internal(*this);
|
||||
}
|
||||
|
||||
const locator::abstract_replication_strategy&
|
||||
keyspace::get_replication_strategy() const {
|
||||
return _ops->get_replication_strategy(*this);
|
||||
@@ -72,6 +77,11 @@ database::get_keyspaces() const {
|
||||
return _ops->get_keyspaces(*this);
|
||||
}
|
||||
|
||||
std::vector<table>
|
||||
database::get_tables() const {
|
||||
return _ops->get_tables(*this);
|
||||
}
|
||||
|
||||
std::optional<table>
|
||||
database::try_find_table(std::string_view ks, std::string_view table) const {
|
||||
return _ops->try_find_table(*this, ks, table);
|
||||
|
||||
@@ -86,6 +86,7 @@ private:
|
||||
friend class impl;
|
||||
keyspace(const impl* ops, const void* keyspace);
|
||||
public:
|
||||
bool is_internal() const;
|
||||
lw_shared_ptr<keyspace_metadata> metadata() const;
|
||||
const user_types_metadata& user_types() const;
|
||||
const locator::abstract_replication_strategy& get_replication_strategy() const;
|
||||
@@ -102,6 +103,7 @@ public:
|
||||
std::optional<keyspace> try_find_keyspace(std::string_view name) const;
|
||||
bool has_keyspace(std::string_view name) const; // throws no_keyspace
|
||||
std::vector<keyspace> get_keyspaces() const;
|
||||
std::vector<table> get_tables() const;
|
||||
table find_table(std::string_view ks, std::string_view table) const; // throws no_such_column_family
|
||||
table find_column_family(utils::UUID uuid) const; // throws no_such_column_family
|
||||
schema_ptr find_schema(std::string_view ks, std::string_view table) const; // throws no_such_column_family
|
||||
|
||||
@@ -19,11 +19,13 @@ public:
|
||||
virtual ~impl();
|
||||
virtual std::optional<keyspace> try_find_keyspace(database db, std::string_view name) const = 0;
|
||||
virtual std::vector<keyspace> get_keyspaces(database db) const = 0;
|
||||
virtual std::vector<table> get_tables(database db) const = 0;
|
||||
virtual std::optional<table> try_find_table(database db, std::string_view ks, std::string_view tab) const = 0;
|
||||
virtual std::optional<table> try_find_table(database db, utils::UUID id) const = 0;
|
||||
virtual const secondary_index::secondary_index_manager& get_index_manager(table t) const = 0;
|
||||
virtual schema_ptr get_table_schema(table t) const = 0;
|
||||
virtual lw_shared_ptr<keyspace_metadata> get_keyspace_metadata(keyspace ks) const = 0;
|
||||
virtual bool is_internal(keyspace ks) const = 0;
|
||||
virtual const locator::abstract_replication_strategy& get_replication_strategy(keyspace ks) const = 0;
|
||||
virtual const std::vector<view_ptr>& get_table_views(table t) const = 0;
|
||||
virtual sstring get_available_index_name(database db, std::string_view ks_name, std::string_view table_name,
|
||||
|
||||
3
main.cc
3
main.cc
@@ -1391,7 +1391,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// start this outside the Alternator if().
|
||||
if (cfg->check_experimental(db::experimental_features_t::ALTERNATOR_TTL)) {
|
||||
supervisor::notify("starting the expiration service");
|
||||
es.start(std::ref(db), std::ref(proxy)).get();
|
||||
es.start(seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db)),
|
||||
std::ref(proxy)).get();
|
||||
stop_expiration_service = defer_verbose_shutdown("expiration service", [&es] {
|
||||
es.stop().get();
|
||||
});
|
||||
|
||||
@@ -2294,6 +2294,15 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
virtual std::vector<data_dictionary::table> get_tables(data_dictionary::database db) const override {
|
||||
std::vector<data_dictionary::table> ret;
|
||||
auto&& tables = unwrap(db).get_column_families();
|
||||
ret.reserve(tables.size());
|
||||
for (auto&& [uuid, cf] : tables) {
|
||||
ret.push_back(wrap(*cf));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
virtual std::optional<data_dictionary::table> try_find_table(data_dictionary::database db, std::string_view ks, std::string_view table) const override {
|
||||
try {
|
||||
return wrap(unwrap(db).find_column_family(ks, table));
|
||||
@@ -2323,6 +2332,9 @@ public:
|
||||
virtual const locator::abstract_replication_strategy& get_replication_strategy(data_dictionary::keyspace ks) const override {
|
||||
return unwrap(ks).get_replication_strategy();
|
||||
}
|
||||
virtual bool is_internal(data_dictionary::keyspace ks) const override {
|
||||
return is_internal_keyspace(unwrap(ks).metadata()->name());
|
||||
}
|
||||
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view table_name,
|
||||
std::optional<sstring> index_name_root) const override {
|
||||
return unwrap(db).get_available_index_name(sstring(ks_name), sstring(table_name), index_name_root);
|
||||
|
||||
Reference in New Issue
Block a user