Compare commits
4 Commits
copilot/fi
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0afb6b8fb6 | ||
|
|
03733ffa1b | ||
|
|
780243790e | ||
|
|
1125175b51 |
@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
|
||||
|
||||
// The put_or_delete_item class builds the mutations needed by the PutItem and
|
||||
// DeleteItem operations - either as stand-alone commands or part of a list
|
||||
// of commands in BatchWriteItem.
|
||||
// of commands in BatchWriteItems.
|
||||
// put_or_delete_item splits each operation into two stages: Constructing the
|
||||
// object parses and validates the user input (throwing exceptions if there
|
||||
// are input errors). Later, build() generates the actual mutation, with a
|
||||
// specified timestamp. This split is needed because of the peculiar needs of
|
||||
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
|
||||
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
|
||||
// any writing happens (if one of the commands has an error, none of the
|
||||
// writes should be done). LWT makes it impossible for the parse step to
|
||||
// generate "mutation" objects, because the timestamp still isn't known.
|
||||
@@ -3026,7 +3026,7 @@ struct primary_key_equal {
|
||||
};
|
||||
|
||||
// This is a cas_request subclass for applying given put_or_delete_items to
|
||||
// one partition using LWT as part as BatchWriteItem. This is a write-only
|
||||
// one partition using LWT as part of BatchWriteItems. This is a write-only
|
||||
// operation, not needing the previous value of the item (the mutation to be
|
||||
// done is known prior to starting the operation). Nevertheless, we want to
|
||||
// do this mutation via LWT to ensure that it is serialized with other LWT
|
||||
@@ -3065,7 +3065,7 @@ static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, serv
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout, true, std::move(cdc_opts)).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
|
||||
// does not need to support conditional updates.
|
||||
}
|
||||
|
||||
|
||||
@@ -1085,6 +1085,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'db/view/view_building_task_mutation_builder.cc',
|
||||
'db/virtual_table.cc',
|
||||
'db/virtual_tables.cc',
|
||||
'db/system0_virtual_tables.cc',
|
||||
'db/tablet_options.cc',
|
||||
'db/object_storage_endpoint_param.cc',
|
||||
'index/secondary_index_manager.cc',
|
||||
|
||||
@@ -248,7 +248,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
|
||||
// which is larger than the segment ID of the RP of the last written hint.
|
||||
cfg.base_segment_id = _last_written_rp.base_id();
|
||||
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (this auto, commitlog l) -> future<commitlog> {
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) -> future<commitlog> {
|
||||
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
|
||||
// When this happens we want to refill _sender's segments only if it has finished with the segments he had before.
|
||||
if (_sender.have_segments()) {
|
||||
|
||||
201
db/system0_virtual_tables.cc
Normal file
201
db/system0_virtual_tables.cc
Normal file
@@ -0,0 +1,201 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "db/system0_virtual_tables.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/virtual_table.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/log.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace {
|
||||
|
||||
static constexpr auto SYSTEM0_KEYSPACE_NAME = "system0";
|
||||
|
||||
logging::logger sys0log("system0_virtual_tables");
|
||||
|
||||
// Virtual table that mirrors system.topology but allows writes via group0
|
||||
class system0_topology_table : public memtable_filling_virtual_table {
|
||||
private:
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
public:
|
||||
explicit system0_topology_table(cql3::query_processor& qp)
|
||||
: memtable_filling_virtual_table(build_schema())
|
||||
, _qp(qp)
|
||||
{}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
// Use the same schema as system.topology but in system0 keyspace
|
||||
auto id = generate_legacy_id(SYSTEM0_KEYSPACE_NAME, system_keyspace::TOPOLOGY);
|
||||
return schema_builder(SYSTEM0_KEYSPACE_NAME, system_keyspace::TOPOLOGY, std::optional(id))
|
||||
.with_column("key", utf8_type, column_kind::partition_key)
|
||||
.with_column("host_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("datacenter", utf8_type)
|
||||
.with_column("rack", utf8_type)
|
||||
.with_column("tokens", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("node_state", utf8_type)
|
||||
.with_column("release_version", utf8_type)
|
||||
.with_column("topology_request", utf8_type)
|
||||
.with_column("replaced_id", uuid_type)
|
||||
.with_column("rebuild_option", utf8_type)
|
||||
.with_column("num_tokens", int32_type)
|
||||
.with_column("tokens_string", utf8_type)
|
||||
.with_column("shard_count", int32_type)
|
||||
.with_column("ignore_msb", int32_type)
|
||||
.with_column("cleanup_status", utf8_type)
|
||||
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("request_id", timeuuid_type)
|
||||
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
|
||||
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
|
||||
.with_column("version", long_type, column_kind::static_column)
|
||||
.with_column("fence_version", long_type, column_kind::static_column)
|
||||
.with_column("transition_state", utf8_type, column_kind::static_column)
|
||||
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("global_topology_request", utf8_type, column_kind::static_column)
|
||||
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
|
||||
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
|
||||
.with_column("session", uuid_type, column_kind::static_column)
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Virtual table for updating system.topology via group0")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
future<> execute(std::function<void(mutation)> mutation_sink) override {
|
||||
// For reads, we mirror the actual system.topology table
|
||||
// This is a simplified placeholder implementation
|
||||
sys0log.debug("system0.topology: read operation");
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> apply(const frozen_mutation& fm) override {
|
||||
sys0log.info("system0.topology: received write operation");
|
||||
|
||||
// Convert mutation from system0.topology schema to system.topology schema
|
||||
const mutation m = fm.unfreeze(_s);
|
||||
|
||||
// Re-freeze the mutation with the system.topology schema
|
||||
auto system_topology_schema = system_keyspace::topology();
|
||||
mutation target_m(system_topology_schema, m.key());
|
||||
target_m.partition() = m.partition();
|
||||
|
||||
// TODO: Submit mutation to group0 via raft_group0_client
|
||||
// For now, just log a warning
|
||||
sys0log.warn("system0.topology: write operations require group0 integration (not yet implemented)");
|
||||
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
// Virtual table that mirrors system.tablets but allows writes via group0
|
||||
class system0_tablets_table : public memtable_filling_virtual_table {
|
||||
private:
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
public:
|
||||
explicit system0_tablets_table(cql3::query_processor& qp)
|
||||
: memtable_filling_virtual_table(build_schema())
|
||||
, _qp(qp)
|
||||
{}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
// Create a simple schema for tablets in system0 keyspace
|
||||
// This mirrors system.tablets structure
|
||||
auto id = generate_legacy_id(SYSTEM0_KEYSPACE_NAME, system_keyspace::TABLETS);
|
||||
auto replica_set_type = replica::get_replica_set_type();
|
||||
|
||||
return schema_builder(SYSTEM0_KEYSPACE_NAME, system_keyspace::TABLETS, id)
|
||||
.with_column("table_id", uuid_type, column_kind::partition_key)
|
||||
.with_column("tablet_count", int32_type, column_kind::static_column)
|
||||
.with_column("keyspace_name", utf8_type, column_kind::static_column)
|
||||
.with_column("table_name", utf8_type, column_kind::static_column)
|
||||
.with_column("last_token", long_type, column_kind::clustering_key)
|
||||
.with_column("replicas", replica_set_type)
|
||||
.with_column("new_replicas", replica_set_type)
|
||||
.with_column("stage", utf8_type)
|
||||
.with_column("transition", utf8_type)
|
||||
.with_column("session", uuid_type)
|
||||
.set_comment("Virtual table for updating system.tablets via group0")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
future<> execute(std::function<void(mutation)> mutation_sink) override {
|
||||
// For reads, we mirror the actual system.tablets table
|
||||
// This is a simplified placeholder implementation
|
||||
sys0log.debug("system0.tablets: read operation");
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> apply(const frozen_mutation& fm) override {
|
||||
sys0log.info("system0.tablets: received write operation");
|
||||
|
||||
// Convert mutation from system0.tablets schema to system.tablets schema
|
||||
const mutation m = fm.unfreeze(_s);
|
||||
|
||||
// Re-freeze the mutation with the system.tablets schema
|
||||
auto system_tablets_schema = system_keyspace::tablets();
|
||||
mutation target_m(system_tablets_schema, m.key());
|
||||
target_m.partition() = m.partition();
|
||||
|
||||
// TODO: Submit mutation to group0 via raft_group0_client
|
||||
// For now, just log a warning
|
||||
sys0log.warn("system0.tablets: write operations require group0 integration (not yet implemented)");
|
||||
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
// Creates the system0 virtual tables and wires them into the local database.
//
// Works on the calling shard's instances only (sys_ks.local(), qp.local()).
// For every table it, in this order:
//   1. stores the table object in the virtual tables registry under its
//      schema id,
//   2. adds the column family to the local database,
//   3. marks the column family ready for writes,
//   4. installs the table as the column family's virtual reader and writer.
//
// dist_raft_gr is accepted but not used by the current implementation.
//
// NOTE(review): nothing in this function creates the "system0" keyspace;
// confirm that add_column_family_and_make_directory() does not require the
// keyspace to exist beforehand.
future<> initialize_system0_virtual_tables(
        sharded<service::raft_group_registry>& dist_raft_gr,
        sharded<db::system_keyspace>& sys_ks,
        sharded<cql3::query_processor>& qp) {
    auto& local_sys_ks = sys_ks.local();
    auto& registry = *local_sys_ks.get_virtual_tables_registry();
    auto& local_db = local_sys_ks.local_db();

    // The closure lives in this coroutine's frame and each call is awaited
    // right away, so capturing the locals by reference is safe here.
    auto register_table = [&] (std::unique_ptr<virtual_table>&& owned) -> future<> {
        auto table_schema = owned->schema();
        const auto table_id = table_schema->id();

        // The registry takes ownership of the table object.
        registry[table_id] = std::move(owned);

        co_await local_db.add_column_family_and_make_directory(table_schema, replica::database::is_new_cf::yes);

        auto& column_family = local_db.find_column_family(table_schema);
        column_family.mark_ready_for_writes(nullptr);

        // Reads and writes are routed to the registered virtual table.
        auto& registered = registry[table_id];
        column_family.set_virtual_reader(registered->as_mutation_source());
        column_family.set_virtual_writer([&vt = *registered] (const frozen_mutation& m) { return vt.apply(m); });
    };

    co_await register_table(std::make_unique<system0_topology_table>(qp.local()));
    co_await register_table(std::make_unique<system0_tablets_table>(qp.local()));

    sys0log.info("system0 virtual tables initialized");
}
|
||||
|
||||
} // namespace db
|
||||
33
db/system0_virtual_tables.hh
Normal file
33
db/system0_virtual_tables.hh
Normal file
@@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "schema/schema_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
class raft_group_registry;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
class system_keyspace;
|
||||
|
||||
// Initialize virtual tables in the system0 keyspace which mirror group0 tables
|
||||
// from the system keyspace but allow writes via group0.
|
||||
future<> initialize_system0_virtual_tables(
|
||||
sharded<service::raft_group_registry>& dist_raft_gr,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<cql3::query_processor>& qp);
|
||||
|
||||
} // namespace db
|
||||
6
main.cc
6
main.cc
@@ -108,6 +108,7 @@
|
||||
#include "lang/manager.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "db/virtual_tables.hh"
|
||||
#include "db/system0_virtual_tables.hh"
|
||||
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
@@ -1836,6 +1837,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg);
|
||||
}).get();
|
||||
|
||||
checkpoint(stop_signal, "initializing system0 virtual tables");
|
||||
smp::invoke_on_all([&] {
|
||||
return db::initialize_system0_virtual_tables(raft_gr, sys_ks, qp);
|
||||
}).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
sstables::init_metrics().get();
|
||||
|
||||
@@ -50,7 +50,8 @@ static const std::unordered_set<std::string_view> internal_keyspaces = {
|
||||
db::system_keyspace::NAME,
|
||||
db::schema_tables::NAME,
|
||||
auth::meta::legacy::AUTH_KS,
|
||||
tracing::trace_keyspace_helper::KEYSPACE_NAME
|
||||
tracing::trace_keyspace_helper::KEYSPACE_NAME,
|
||||
"system0" // Virtual keyspace for group0 table updates
|
||||
};
|
||||
|
||||
bool is_internal_keyspace(std::string_view name) {
|
||||
|
||||
@@ -205,7 +205,7 @@ def test_batch_write_invalid_operation(test_table_s):
|
||||
|
||||
# In test_item.py we have a bunch of test_empty_* tests on different ways to
|
||||
# create an empty item (which in Scylla requires the special CQL row marker
|
||||
# to be supported correctly). BatchWriteItem provides yet another way of
|
||||
# to be supported correctly). BatchWriteItems provides yet another way of
|
||||
# creating items, so check the empty case here too:
|
||||
def test_empty_batch_write(test_table):
|
||||
p = random_string()
|
||||
@@ -214,7 +214,7 @@ def test_empty_batch_write(test_table):
|
||||
batch.put_item({'p': p, 'c': c})
|
||||
assert test_table.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item'] == {'p': p, 'c': c}
|
||||
|
||||
# Test that BatchWriteItem allows writing to multiple tables in one operation
|
||||
# Test that BatchWriteItems allows writing to multiple tables in one operation
|
||||
def test_batch_write_multiple_tables(test_table_s, test_table):
|
||||
p1 = random_string()
|
||||
c1 = random_string()
|
||||
|
||||
Reference in New Issue
Block a user