The batchlog table contains an entry for each logged batch that is processed by the local node as coordinator. These entries are typically very short-lived: they are inserted when the batch is processed and deleted immediately after the batch is successfully applied.
When a table has `tombstone_gc = {'mode': 'repair'}` enabled, every repair has to flush all hints and batchlogs, so that we can be certain that none of them holds live data older than the last repair. Since a batch can contain mutations for any number of tables, the whole batchlog has to be flushed, even if repair-mode tombstone GC is enabled for just a single table.
Flushing the batchlog table happens by doing a batchlog replay: the entire content of the table is read, and any live entries that are old enough are replayed and then deleted. Under normal operating circumstances, 99%+ of the content of the batchlog table is partition tombstones, so scanning this table has to process thousands to millions of tombstones. This was observed to take up to 20 minutes, causing repairs to slow to a crawl, as the batchlog flush has to be repeated at the end of the repair of each token-range.
When trying to address this problem, the first idea was to expedite the garbage collection of these accumulated tombstones. This experiment failed, see https://github.com/scylladb/scylladb/pull/23752. The commitlog proved to be an impossible-to-bypass barrier to quick garbage collection of tombstones: as long as a single commitlog segment holding content from the batchlog table is alive, all tombstones written after it are blocked from GC.
The second approach, represented by this PR, is to not rely on tombstone GC to reduce the tombstone count. Instead, the table is restructured so that a single higher-order tombstone can shadow, and thereby allow the eviction of, the myriad individual batchlog entry tombstones. This is realized by reorganizing the batchlog table so that individual batches are rows, not partitions.
The new schema is realized by the new `system.batchlog_v2` table, introduced by this PR:
CREATE TABLE system.batchlog_v2 (
    version int,
    stage int,
    shard int,
    written_at timestamp,
    id uuid,
    data blob,
    PRIMARY KEY ((version, stage, shard), written_at, id));
The new schema organization has the following goals:
1) Make post-replay batchlog cleanup possible with a single range tombstone. The individual dead batchlog entries are shadowed by this higher-level tombstone and can be dropped without relying on tombstone GC.
2) To make the above safe, introduce the stage key component: batchlog entries that fail the first replay attempt are moved to the failed_replay stage, so the initial stage can be cleaned up safely.
3) Spread out the data among Scylla shards, via the batchlog shard column.
4) Order batchlog entries by their creation time (the written_at clustering column). This allows selecting batchlogs to replay without post-filtering out entries that are too young to be replayed.
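To make goals 1 and 2 concrete, here is a hedged sketch in CQL of what the post-replay cleanup could look like against the new schema. The concrete stage values (0 for the initial stage, 1 for failed_replay), the timestamp bound, and the literal statements are illustrative assumptions, not the PR's actual code:

```
-- Hypothetical: an entry that failed replay is first re-inserted under the
-- failed_replay stage, so the cleanup below cannot lose it.
INSERT INTO system.batchlog_v2 (version, stage, shard, written_at, id, data)
VALUES (2, 1, 0, '2025-01-01 00:00:00+0000', 11111111-2222-3333-4444-555555555555, 0xcafe);

-- Hypothetical cleanup: one range tombstone per (version, stage, shard)
-- partition shadows every replayed entry up to the chosen written_at bound,
-- instead of leaving one partition tombstone per deleted batch.
DELETE FROM system.batchlog_v2
WHERE version = 2 AND stage = 0 AND shard = 0
  AND written_at <= '2025-01-01 00:00:00+0000';
```

Because written_at is the leading clustering column, the DELETE is a single clustering-range deletion, which is exactly the "simple range-tombstone" that goal 1 relies on.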
Fixes: https://github.com/scylladb/scylladb/issues/23358
This is an improvement and normally not a backport candidate. We might override this and backport it to allow wider use of `tombstone_gc = {'mode': 'repair'}`.
Closes scylladb/scylladb#26671
* github.com:scylladb/scylladb:
db/config: change batchlog_replay_cleanup_after_replays default to 1
test/boost/batchlog_manager_test: add test for batchlog cleanup
replica/mutation_dump: always set position weight for clustering positions
service/storage_proxy: s/batch_replay_throw/storage_proxy_fail_replay_batch/
test/lib: introduce error_injection.hh
utils/error_injection: add debug log to disable() and disable_all()
test/lib/cql_test_env: forward config to batchlog
test/lib/cql_test_env: add batch type to execute_batch()
test/lib/cql_assertions: add with_size(predicate) overload
test/lib/cql_assertions: add source location to fail messages
test/lib/cql_assertions: columns_assertions: add assert_for_columns_of_each_row()
test/lib/cql_assertions: rows_assertions::assert_for_columns_of_row(): add index bound check
test/lib/cql_assertions: columns_assertions: add T* with_typed_column() overload
db/batchlog_manager: config: s/write_timeout/replay_timeout/
db,service: switch to system.batchlog_v2
db/system_keyspace: introduce system.batchlog_v2
service,db: extract generation of batchlog delete mutation
service,db: extract get_batchlog_mutation_for() from storage-proxy
db/batchlog_manager: only consider propagation delay with tombstone-gc=repair
db/batchlog_manager: don't drop entire batch if one mutations' table was dropped
data_dictionary: table: add get_truncation_time()
db/batchlog_manager: batch(): replace map_reduce() with simple loop
db/batchlog_manager: finish coroutinizing replay_all_failed_batches
db/batchlog_manager: improve replayAllFailedBatches logs
231 lines
7.6 KiB
C++
/*
 * Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include <functional>
#include <vector>

#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

#include "db/view/view_building_worker.hh"
#include "db/view/view_update_generator.hh"
#include "service/qos/service_level_controller.hh"
#include "replica/database.hh"
#include "transport/messages/result_message_base.hh"
#include "cql3/query_options_fwd.hh"
#include "cql3/values.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/query_processor.hh"
#include "cql3/statements/batch_statement.hh"
#include "bytes.hh"
#include "schema/schema.hh"
#include "service/tablet_allocator.hh"
#include "vector_search/vector_store_client.hh"

namespace replica {
class database;
}

namespace db {
class batchlog_manager;
}

namespace db::view {
class view_builder;
class view_update_generator;
}

namespace auth {
class service;
}

namespace cql3 {
class query_processor;
}

namespace utils {
class disk_space_monitor;
}

namespace service {

class client_state;
class migration_manager;
class raft_group0_client;
class raft_group_registry;

}

class not_prepared_exception : public std::runtime_error {
public:
    not_prepared_exception(const cql3::prepared_cache_key_type& id) : std::runtime_error(format("Not prepared: {}", id)) {}
};

namespace db {
class config;
}

struct scheduling_groups {
    scheduling_group compaction_scheduling_group;
    scheduling_group memory_compaction_scheduling_group;
    scheduling_group streaming_scheduling_group;
    scheduling_group statement_scheduling_group;
    scheduling_group memtable_scheduling_group;
    scheduling_group memtable_to_cache_scheduling_group;
    scheduling_group gossip_scheduling_group;
};

// Creating and destroying scheduling groups on each env setup and teardown
// doesn't work because it messes up execution stages due to scheduling groups
// having the same name but not having the same id on each run. So they are
// created once and used across all envs. This method allows retrieving them to
// be used in tests.
// Not thread safe!
future<scheduling_groups> get_scheduling_groups();

class cql_test_config {
public:
    seastar::shared_ptr<db::config> db_config;
    // Scheduling groups are overwritten unconditionally, see get_scheduling_groups().
    std::optional<replica::database_config> dbcfg;
    std::set<sstring> disabled_features;
    std::optional<cql3::query_processor::memory_config> qp_mcfg;
    bool need_remote_proxy = false;
    std::optional<uint64_t> initial_tablets; // When engaged, the default keyspace will use tablets.
    locator::host_id host_id;
    gms::inet_address broadcast_address = gms::inet_address("localhost");
    bool ms_listen = false;
    bool run_with_raft_recovery = false;
    bool clean_data_dir_before_test = true;

    std::optional<db_clock::duration> batchlog_replay_timeout;
    std::chrono::milliseconds batchlog_delay = std::chrono::milliseconds(0);

    std::optional<timeout_config> query_timeout;

    cql_test_config();
    cql_test_config(const cql_test_config&);
    cql_test_config(shared_ptr<db::config>);
    ~cql_test_config();
};

struct cql_test_init_configurables {
    db::extensions& extensions;
};

class cql_test_env {
public:
    virtual ~cql_test_env() {};

    virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(std::string_view text) = 0;

    virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(
            std::string_view text, std::unique_ptr<cql3::query_options> qo) = 0;

    /// Processes queries (which must be modifying queries) as a batch.
    virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
            const std::vector<std::string_view>& queries, cql3::statements::batch_statement::type batch_type, std::unique_ptr<cql3::query_options> qo) = 0;

    virtual future<cql3::prepared_cache_key_type> prepare(sstring query) = 0;

    virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared(
            cql3::prepared_cache_key_type id,
            cql3::raw_value_vector_with_unset values,
            db::consistency_level cl = db::consistency_level::ONE) = 0;

    virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared_with_qo(
            cql3::prepared_cache_key_type id,
            std::unique_ptr<cql3::query_options> qo) = 0;

    virtual future<utils::chunked_vector<mutation>> get_modification_mutations(const sstring& text) = 0;

    virtual future<> create_table(std::function<schema(std::string_view)> schema_maker) = 0;

    virtual service::client_state& local_client_state() = 0;

    virtual replica::database& local_db() = 0;

    virtual sharded<locator::shared_token_metadata>& shared_token_metadata() = 0;

    virtual cql3::query_processor& local_qp() = 0;

    virtual sharded<replica::database>& db() = 0;

    virtual sharded<cql3::query_processor>& qp() = 0;

    virtual auth::service& local_auth_service() = 0;

    virtual sharded<db::view::view_builder>& view_builder() = 0;
    virtual db::view::view_builder& local_view_builder() = 0;

    virtual sharded<db::view::view_building_worker>& view_building_worker() = 0;

    virtual db::view::view_update_generator& local_view_update_generator() = 0;

    virtual service::migration_notifier& local_mnotifier() = 0;

    virtual sharded<service::migration_manager>& migration_manager() = 0;

    virtual sharded<db::batchlog_manager>& batchlog_manager() = 0;

    virtual sharded<netw::messaging_service>& get_messaging_service() = 0;

    virtual sharded<gms::gossiper>& gossiper() = 0;

    virtual future<> refresh_client_state() = 0;

    virtual service::raft_group0_client& get_raft_group0_client() = 0;

    virtual sharded<service::raft_group_registry>& get_raft_group_registry() = 0;

    virtual sharded<db::system_keyspace>& get_system_keyspace() = 0;

    virtual sharded<service::tablet_allocator>& get_tablet_allocator() = 0;

    virtual sharded<service::storage_proxy>& get_storage_proxy() = 0;

    virtual sharded<gms::feature_service>& get_feature_service() = 0;

    virtual sharded<sstables::storage_manager>& get_sstorage_manager() = 0;

    virtual sharded<service::storage_service>& get_storage_service() = 0;

    virtual sharded<tasks::task_manager>& get_task_manager() = 0;

    virtual sharded<locator::shared_token_metadata>& get_shared_token_metadata() = 0;

    virtual sharded<service::topology_state_machine>& get_topology_state_machine() = 0;

    data_dictionary::database data_dictionary();

    virtual sharded<qos::service_level_controller>& service_level_controller_service() = 0;

    // Call only on shard0.
    virtual utils::disk_space_monitor& disk_space_monitor() = 0;

    virtual db::config& db_config() = 0;

    virtual sharded<vector_search::vector_store_client>& vector_store_client() = 0;

    virtual sharded<auth::cache>& auth_cache() = 0;
};

future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config = {}, std::optional<cql_test_init_configurables> = {});
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config = {}, thread_attributes thread_attr = {}, std::optional<cql_test_init_configurables> = {});

void do_with_cql_env_noreentrant_in_thread(std::function<future<>(cql_test_env&)> func, cql_test_config = {}, std::optional<cql_test_init_configurables> = {});

// this function should be called in seastar thread
void do_with_mc(cql_test_env& env, std::function<void(service::group0_batch&)> func);

reader_permit make_reader_permit(cql_test_env&);