mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-30 11:36:54 +00:00
Currently when a node wants to create and broadcast a new CDC generation it performs the following steps:
1. choose the generation's stream IDs and mapping (how this is done is irrelevant for the current discussion)
2. choose the generation's timestamp by taking the current time (according to its local clock) and adding 2 * ring_delay
3. insert the generation's data (mapping and stream IDs) into system_distributed.cdc_generation_descriptions, using the generation's timestamp as the partition key (we call this table the "old internal table" below)
4. insert the generation's timestamp into the "CDC_STREAMS_TIMESTAMP" application state.

The timestamp spreads epidemically through the gossip protocol. When nodes see the timestamp, they retrieve the generation data from the old internal table.

Unfortunately, due to the schema of the old internal table, where the entire generation data is stored in a single cell, step 3 may fail for sufficiently large generations (there is a size threshold for which step 3 will always fail - retrying the operation won't help). Also the old internal table lies in the system_distributed keyspace that uses SimpleStrategy with replication factor 3, which is also problematic; for example, when nodes restart, they must reach at least 2 out of these 3 specific replicas in order to retrieve the current generation (we write and read the generation data with QUORUM, unless we're a single-node cluster, where we use ONE). Until this happens, a restarting node can't coordinate writes to CDC-enabled tables. It would be better if the node could access the last known generation locally.
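The ordering of steps 2-4 in the current procedure can be sketched as below. This is only an illustration under stated assumptions: `old_table`, `choose_timestamp` and `broadcast_old` are hypothetical names, the table is modelled as an in-memory map, and the generation data is reduced to a single string standing in for the single cell mentioned above.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Millisecond-resolution time point, used here instead of Scylla's db_clock.
using time_point = std::chrono::time_point<std::chrono::system_clock,
                                           std::chrono::milliseconds>;

// Hypothetical stand-in for the old internal table: the partition key is the
// generation timestamp and the whole generation lives in one value.
using old_table = std::map<time_point, std::string>;

// Step 2: local clock plus 2 * ring_delay.
time_point choose_timestamp(time_point now, std::chrono::milliseconds ring_delay) {
    assert(ring_delay.count() >= 0);
    return now + 2 * ring_delay;
}

// Steps 2-4 in the old order: the timestamp has to be chosen *before* the
// insert, because the insert needs it as the partition key.
time_point broadcast_old(old_table& table,
                         std::optional<time_point>& gossiped_timestamp,
                         time_point now,
                         std::chrono::milliseconds ring_delay,
                         std::string generation_data) {
    time_point ts = choose_timestamp(now, ring_delay);   // step 2
    table[ts] = std::move(generation_data);              // step 3
    gossiped_timestamp = ts;                             // step 4
    return ts;
}
```

In this model any time spent in step 3 is taken out of the 2 * ring_delay margin that other nodes have to learn about the generation.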
The commit introduces a new table for broadcasting generation data with the following properties:
- it uses a better schema that stores the data in multiple rows, each of manageable size
- it resides in the `system_distributed_everywhere` keyspace so the data will be written to every node in the cluster that has a token in the token ring
- the data will be written using CL=ALL and read using CL=ONE; thanks to this, a restarting node won't have to communicate with other nodes to retrieve the data of the last known generation. Note that writing with CL=ALL does not reduce availability: creating a new generation *requires* all nodes to be available anyway, because they must learn about the generation before their clocks go past the generation's timestamp; if they don't, partitions won't be mapped to stream IDs consistently across the cluster
- the partition key is no longer the generation's timestamp. Because it was that way in the old internal table, it forced the algorithm to choose the timestamp *before* the generation data was inserted into the table. What if the inserting took a long time? It increased the chance that nodes would learn about the generation too late (after their clocks moved past its timestamp). With the new schema we will first insert the generation data using a randomly generated UUID as the partition key, *then* choose the timestamp, then gossip both the timestamp and the UUID. The timestamp and the UUID form the "generation identifier" of this new generation; this should explain why we introduced the generation_id_v2 type in previous commits.

Observe that after a node learns through gossip about a generation broadcast using this new method, it will retrieve its data very quickly since it's one of the replicas and it can use CL=ONE as it was written using CL=ALL.

Note that the node is still using the old method - the actual switch will be done in a later commit.
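The new ordering (insert under a random UUID, then choose the timestamp, then gossip both) might look roughly like the sketch below. Everything here is an assumption made for illustration: the names `new_table`, `generation_id`, `split_into_rows` and `broadcast_new` are hypothetical, the UUID is modelled as two random 64-bit values, the generation data as a string cut into fixed-size rows, and the timestamp is assumed to still be "now + 2 * ring_delay" as in the old step 2.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <random>
#include <string>
#include <utility>
#include <vector>

using time_point = std::chrono::time_point<std::chrono::system_clock,
                                           std::chrono::milliseconds>;

// Hypothetical stand-in for a UUID.
using uuid = std::pair<std::uint64_t, std::uint64_t>;

// Hypothetical stand-in for the new table: the partition key is a UUID and
// each partition holds several rows of bounded size.
using new_table = std::map<uuid, std::vector<std::string>>;

// Timestamp plus UUID together identify a generation.
struct generation_id {
    time_point ts;
    uuid id;
};

// Cut the data into rows of at most max_row_size bytes.
std::vector<std::string> split_into_rows(const std::string& data, std::size_t max_row_size) {
    assert(max_row_size > 0);
    std::vector<std::string> rows;
    for (std::size_t off = 0; off < data.size(); off += max_row_size) {
        rows.push_back(data.substr(off, max_row_size));
    }
    return rows;
}

// New order: write the data first, read the clock only afterwards, then
// publish (timestamp, UUID). clock_now is a callable returning time_point.
template <typename Clock>
generation_id broadcast_new(new_table& table,
                            std::optional<generation_id>& gossiped,
                            std::mt19937_64& rng,
                            Clock&& clock_now,
                            std::chrono::milliseconds ring_delay,
                            const std::string& data,
                            std::size_t max_row_size) {
    uuid id{rng(), rng()};                              // random partition key
    table[id] = split_into_rows(data, max_row_size);    // insert the data
    generation_id gen{clock_now() + 2 * ring_delay, id}; // timestamp chosen after insert
    gossiped = gen;                                     // gossip both parts
    return gen;
}
```

Because the clock is read after the insert completes, a slow insert no longer shortens the window in which other nodes must learn about the generation.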
132 lines
4.9 KiB
C++
/*
 * Copyright (C) 2018 ScyllaDB
 */

/*
 * This file is part of Scylla.
 *
 * Scylla is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Scylla is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "bytes.hh"
#include "schema_fwd.hh"
#include "service/migration_manager.hh"
#include "service/qos/qos_common.hh"
#include "utils/UUID.hh"
#include "cdc/generation_id.hh"

#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>

#include <unordered_map>

namespace cql3 {
class query_processor;
}

namespace cdc {
class stream_id;
class topology_description;
class streams_version;
} // namespace cdc

namespace service {
class storage_proxy;
}

namespace db {

class system_distributed_keyspace {
public:
    static constexpr auto NAME = "system_distributed";
    static constexpr auto NAME_EVERYWHERE = "system_distributed_everywhere";

    static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
    static constexpr auto SERVICE_LEVELS = "service_levels";

    /* Nodes use this table to communicate new CDC stream generations to other nodes. */
    static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generation_descriptions";

    /* Nodes use this table to communicate new CDC stream generations to other nodes.
     * Resides in system_distributed_everywhere. */
    static constexpr auto CDC_GENERATIONS_V2 = "cdc_generation_descriptions_v2";

    /* This table is used by CDC clients to learn about available CDC streams. */
    static constexpr auto CDC_DESC_V2 = "cdc_streams_descriptions_v2";

    /* Used by CDC clients to learn CDC generation timestamps. */
    static constexpr auto CDC_TIMESTAMPS = "cdc_generation_timestamps";

    /* Previous version of the "cdc_streams_descriptions_v2" table.
     * We use it in the upgrade procedure to ensure that CDC generations appearing
     * in the old table also appear in the new table, if necessary. */
    static constexpr auto CDC_DESC_V1 = "cdc_streams_descriptions";

    /* Information required to modify/query some system_distributed tables, passed from the caller. */
    struct context {
        /* How many different token owners (endpoints) are there in the token ring? */
        size_t num_token_owners;
    };
private:
    cql3::query_processor& _qp;
    service::migration_manager& _mm;
    service::storage_proxy& _sp;

    bool _started = false;
    bool _forced_cdc_timestamps_schema_sync = false;

public:
    /* Should writes to the given table always be synchronized by commitlog (flushed to disk)
     * before being acknowledged? */
    static bool is_extra_durable(const sstring& cf_name);

    system_distributed_keyspace(cql3::query_processor&, service::migration_manager&, service::storage_proxy&);

    future<> start();
    future<> stop();

    bool started() const { return _started; }

    future<std::unordered_map<utils::UUID, sstring>> view_status(sstring ks_name, sstring view_name) const;
    future<> start_view_build(sstring ks_name, sstring view_name) const;
    future<> finish_view_build(sstring ks_name, sstring view_name) const;
    future<> remove_view(sstring ks_name, sstring view_name) const;

    future<> insert_cdc_topology_description(cdc::generation_id_v1, const cdc::topology_description&, context);
    future<std::optional<cdc::topology_description>> read_cdc_topology_description(cdc::generation_id_v1, context);

    future<> insert_cdc_generation(utils::UUID, const cdc::topology_description&, context);
    future<std::optional<cdc::topology_description>> read_cdc_generation(utils::UUID);

    future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context);
    future<bool> cdc_desc_exists(db_clock::time_point, context);

    /* Get all generation timestamps appearing in the "cdc_streams_descriptions" table
     * (the old CDC stream description table). */
    future<std::vector<db_clock::time_point>> get_cdc_desc_v1_timestamps(context);

    future<std::map<db_clock::time_point, cdc::streams_version>> cdc_get_versioned_streams(db_clock::time_point not_older_than, context);

    future<db_clock::time_point> cdc_current_generation_timestamp(context);

    future<qos::service_levels_info> get_service_levels() const;
    future<qos::service_levels_info> get_service_level(sstring service_level_name) const;
    future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
    future<> drop_service_level(sstring service_level_name) const;
};

}
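The `context` struct in the header carries the number of token owners, and the commit message says the old internal table is accessed with QUORUM unless the cluster has a single node, in which case ONE is used. A minimal sketch of that choice follows; the `consistency_level` enum and the functions `old_table_cl` and `quorum_size` are hypothetical names for this illustration, not taken from the Scylla sources, and `context` is redeclared locally so that the example stands alone.

```cpp
#include <cassert>
#include <cstddef>

// Simplified stand-in for the database's consistency levels.
enum class consistency_level { ONE, QUORUM };

// Local copy of the shape of system_distributed_keyspace::context.
struct context {
    std::size_t num_token_owners;
};

// QUORUM in general, ONE in a single-node cluster.
consistency_level old_table_cl(context ctx) {
    return ctx.num_token_owners == 1 ? consistency_level::ONE
                                     : consistency_level::QUORUM;
}

// Number of replicas that must respond for a quorum at replication factor rf.
std::size_t quorum_size(std::size_t rf) {
    assert(rf > 0);
    return rf / 2 + 1;
}
```

With replication factor 3 the quorum size is 2, which matches the "2 out of these 3 specific replicas" requirement described in the commit message.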