Compare commits
43 Commits
copilot/up
...
scylla-6.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93700ff5d1 | ||
|
|
5e2b4a0e80 | ||
|
|
bb5dc0771c | ||
|
|
9ed8519362 | ||
|
|
077d7c06a0 | ||
|
|
5a1575678b | ||
|
|
2401f7f9ca | ||
|
|
906d085289 | ||
|
|
34dd3a6daa | ||
|
|
3afa8ee2ca | ||
|
|
3347152ff9 | ||
|
|
ff7bd937e2 | ||
|
|
50ea1dbe32 | ||
|
|
45125c4d7d | ||
|
|
9207f7823d | ||
|
|
711864687f | ||
|
|
faf11e5bc3 | ||
|
|
f9215b4d7e | ||
|
|
469ac9976a | ||
|
|
d341f1ef1e | ||
|
|
07dfcd1f64 | ||
|
|
f8d63b5572 | ||
|
|
ca83da91d1 | ||
|
|
f55081fb1a | ||
|
|
aa8cdec5bd | ||
|
|
75a2484dba | ||
|
|
37387135b4 | ||
|
|
ac24ab5141 | ||
|
|
729dc03e0c | ||
|
|
9d64ced982 | ||
|
|
ea6349a6f5 | ||
|
|
ed9122a84e | ||
|
|
c7d6b4a194 | ||
|
|
a35e138b22 | ||
|
|
3db67faa8a | ||
|
|
6a12174e2d | ||
|
|
ca0096ccb8 | ||
|
|
a71d4bc49c | ||
|
|
749399e4b8 | ||
|
|
bdd97b2950 | ||
|
|
1a056f0cab | ||
|
|
cf78a2caca | ||
|
|
cbc53f0e81 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.2.0-dev
|
||||
VERSION=6.2.0-rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2195,7 +2195,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
mutation_builders.reserve(request_items.MemberCount());
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
batch_size++;
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
@@ -2216,6 +2215,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else if (r_name == "DeleteRequest") {
|
||||
const rjson::value& key = (r->value)["Key"];
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
@@ -2226,6 +2226,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Unknown BatchWriteItem request type: {}", r_name));
|
||||
}
|
||||
@@ -3483,7 +3484,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
|
||||
@@ -3497,6 +3498,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rs.add(key);
|
||||
check_key(key, rs.schema);
|
||||
}
|
||||
batch_size += rs.requests.size();
|
||||
requests.emplace_back(std::move(rs));
|
||||
}
|
||||
|
||||
@@ -3504,7 +3506,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
co_await verify_permission(client_state, tr.schema, auth::permission::SELECT);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += requests.size();
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
|
||||
@@ -29,8 +29,6 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("Latency summary of an operation via Alternator API"), [this]{return to_metrics_summary(api_operations.name.summary());})(op(CamelCaseName)).set_skip_when_empty(),
|
||||
OPERATION(batch_get_item, "BatchGetItem")
|
||||
OPERATION(batch_write_item, "BatchWriteItem")
|
||||
OPERATION(batch_get_item_batch_total, "BatchGetItemSize")
|
||||
OPERATION(batch_write_item_batch_total, "BatchWriteItemSize")
|
||||
OPERATION(create_backup, "CreateBackup")
|
||||
OPERATION(create_global_table, "CreateGlobalTable")
|
||||
OPERATION(create_table, "CreateTable")
|
||||
@@ -98,6 +96,10 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("number of rows read and matched during filtering operations")),
|
||||
seastar::metrics::make_total_operations("filtered_rows_dropped_total", [this] { return cql_stats.filtered_rows_read_total - cql_stats.filtered_rows_matched_total; },
|
||||
seastar::metrics::description("number of rows read and dropped during filtering operations")),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchWriteItem")},
|
||||
api_operations.batch_write_item_batch_total).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchGetItem")},
|
||||
api_operations.batch_get_item_batch_total).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -296,7 +296,8 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
// When trimming, let's keep sstables with overlapping time window, so as to reduce write amplification.
|
||||
// For example, if there are N sstables spanning window W, where N <= 32, then we can produce all data for W
|
||||
// in a single compaction round, removing the need to later compact W to reduce its number of files.
|
||||
boost::partial_sort(multi_window, multi_window.begin() + max_sstables, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
auto sort_size = std::min(max_sstables, multi_window.size());
|
||||
boost::partial_sort(multi_window, multi_window.begin() + sort_size, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
return a->get_stats_metadata().max_timestamp < b->get_stats_metadata().max_timestamp;
|
||||
});
|
||||
maybe_trim_job(multi_window, job_size, disjoint);
|
||||
|
||||
@@ -1132,7 +1132,12 @@ public:
|
||||
write(out, uint64_t(0));
|
||||
}
|
||||
|
||||
buf.remove_suffix(buf.size_bytes() - size);
|
||||
auto to_remove = buf.size_bytes() - size;
|
||||
// #20862 - we decrement usage counter based on buf.size() below.
|
||||
// Since we are shrinking buffer here, we need to also decrement
|
||||
// counter already
|
||||
buf.remove_suffix(to_remove);
|
||||
_segment_manager->totals.buffer_list_bytes -= to_remove;
|
||||
|
||||
// Build sector checksums.
|
||||
auto id = net::hton(_desc.id);
|
||||
@@ -3826,6 +3831,10 @@ uint64_t db::commitlog::get_total_size() const {
|
||||
;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_buffer_size() const {
|
||||
return _segment_manager->totals.buffer_list_bytes;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_completed_tasks() const {
|
||||
return _segment_manager->totals.allocation_count;
|
||||
}
|
||||
|
||||
@@ -306,6 +306,7 @@ public:
|
||||
future<> delete_segments(std::vector<sstring>) const;
|
||||
|
||||
uint64_t get_total_size() const;
|
||||
uint64_t get_buffer_size() const;
|
||||
uint64_t get_completed_tasks() const;
|
||||
uint64_t get_flush_count() const;
|
||||
uint64_t get_pending_tasks() const;
|
||||
|
||||
@@ -35,8 +35,6 @@
|
||||
#include <span>
|
||||
#include <unordered_map>
|
||||
|
||||
class fragmented_temporary_buffer;
|
||||
|
||||
namespace utils {
|
||||
class directories;
|
||||
} // namespace utils
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
Features
|
||||
========================
|
||||
|
||||
This document highlights ScyllaDB's key data modeling features.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
:hidden:
|
||||
|
||||
Lightweight Transactions </features/lwt/>
|
||||
Global Secondary Indexes </features/secondary-indexes/>
|
||||
@@ -12,6 +15,23 @@ Features
|
||||
Change Data Capture </features/cdc/index>
|
||||
Workload Attributes </features/workload-attributes>
|
||||
|
||||
`ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/stable/overview.html#enterprise-only-features>`_
|
||||
provides additional features, including Encryption at Rest,
|
||||
workload prioritization, auditing, and more.
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* Secondary Indexes and Materialized Views provide efficient search mechanisms
|
||||
on non-partition keys by creating an index.
|
||||
|
||||
* :doc:`Global Secondary Indexes </features/secondary-indexes/>`
|
||||
* :doc:`Local Secondary Indexes </features/local-secondary-indexes/>`
|
||||
* :doc:`Materialized Views </features/materialized-views/>`
|
||||
|
||||
* :doc:`Lightweight Transactions </features/lwt/>` provide conditional updates
|
||||
through linearizability.
|
||||
* :doc:`Counters </features/counters/>` are columns that only allow their values
|
||||
to be incremented, decremented, read, or deleted.
|
||||
* :doc:`Change Data Capture </features/cdc/index>` allows you to query the current
|
||||
state and the history of all changes made to tables in the database.
|
||||
* :doc:`Workload Attributes </features/workload-attributes>` assigned to your workloads
|
||||
specify how ScyllaDB will handle requests depending on the workload.
|
||||
|
||||
@@ -6,9 +6,9 @@ You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.2 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
@@ -1,8 +1,3 @@
|
||||
.. |SCYLLADB_VERSION| replace:: 5.2
|
||||
|
||||
.. update the version folder URL below (variables won't work):
|
||||
https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/
|
||||
|
||||
====================================================
|
||||
Install ScyllaDB Without root Privileges
|
||||
====================================================
|
||||
@@ -24,14 +19,17 @@ Note that if you're on CentOS 7, only root offline installation is supported.
|
||||
Download and Install
|
||||
-----------------------
|
||||
|
||||
#. Download the latest tar.gz file for ScyllaDB |SCYLLADB_VERSION| (x86 or ARM) from https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/.
|
||||
#. Download the latest tar.gz file for ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
|
||||
|
||||
Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
|
||||
|
||||
#. Uncompress the downloaded package.
|
||||
|
||||
The following example shows the package for ScyllaDB 5.2.4 (x86):
|
||||
The following example shows the package for ScyllaDB 6.1.1 (x86):
|
||||
|
||||
.. code:: console
|
||||
|
||||
tar xvfz scylla-unified-5.2.4-0.20230623.cebbf6c5df2b.x86_64.tar.gz
|
||||
tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
|
||||
|
||||
#. Install OpenJDK 8 or 11.
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ With the recent addition of the `ScyllaDB Advisor <http://monitoring.docs.scylla
|
||||
Install ScyllaDB Manager
|
||||
------------------------
|
||||
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>` together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>`_ together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
ScyllaDB Manager provides automated backups and repairs of your database.
|
||||
ScyllaDB Manager can manage multiple ScyllaDB clusters and run cluster-wide tasks in a controlled and predictable way.
|
||||
For example, with ScyllaDB Manager you can control the intensity of a repair, increasing it to speed up the process, or lower the intensity to ensure it minimizes impact on ongoing operations.
|
||||
|
||||
@@ -3,7 +3,7 @@ Encryption: Data in Transit Client to Node
|
||||
|
||||
Follow the procedures below to enable a client to node encryption.
|
||||
Once enabled, all communication between the client and the node is transmitted over TLS/SSL.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 certified.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 enabled.
|
||||
|
||||
Workflow
|
||||
^^^^^^^^
|
||||
|
||||
@@ -21,8 +21,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* -
|
||||
-
|
||||
* - scylla_alternator_batch_item_count
|
||||
- The total number of items processed across all batches
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -143,6 +143,7 @@ public:
|
||||
// whereas without it, it will fail the insert - i.e. for things like raft etc _all_ nodes should
|
||||
// have it or none, otherwise we can get partial failures on writes.
|
||||
gms::feature fragmented_commitlog_entries { *this, "FRAGMENTED_COMMITLOG_ENTRIES"sv };
|
||||
gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
|
||||
|
||||
// A feature just for use in tests. It must not be advertised unless
|
||||
// the "features_enable_test_feature" injection is enabled.
|
||||
|
||||
4
main.cc
4
main.cc
@@ -1389,7 +1389,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
scfg.statement_tenants = {
|
||||
{dbcfg.statement_scheduling_group, "$user"},
|
||||
{default_scheduling_group(), "$system"},
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance"}
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance", false}
|
||||
};
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = dbcfg.gossip_scheduling_group;
|
||||
@@ -1404,7 +1404,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
messaging.start(mscfg, scfg, creds).get();
|
||||
messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
|
||||
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
|
||||
messaging.invoke_on_all(&netw::messaging_service::stop).get();
|
||||
});
|
||||
|
||||
@@ -119,6 +119,7 @@
|
||||
#include "idl/mapreduce_request.dist.impl.hh"
|
||||
#include "idl/storage_service.dist.impl.hh"
|
||||
#include "idl/join_node.dist.impl.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
@@ -232,9 +233,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
return _rpc->unregister_handler(verb);
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
|
||||
: messaging_service(config{std::move(id), ip, ip, port},
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -419,13 +420,14 @@ void messaging_service::do_start_listen() {
|
||||
}
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
|
||||
: _cfg(std::move(cfg))
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
|
||||
, _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
|
||||
, _scheduling_config(scfg)
|
||||
, _scheduling_info_for_connection_index(initial_scheduling_info())
|
||||
, _feature_service(feature_service)
|
||||
{
|
||||
_rpc->set_logger(&rpc_logger);
|
||||
|
||||
@@ -434,7 +436,8 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
// which in turn relies on _connection_index_for_tenant to be initialized.
|
||||
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
|
||||
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
|
||||
auto& tenant_cfg = _scheduling_config.statement_tenants[i];
|
||||
_connection_index_for_tenant.push_back({tenant_cfg.sched_group, i, tenant_cfg.enabled});
|
||||
}
|
||||
|
||||
register_handler(this, messaging_verb::CLIENT_ID, [this] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id, rpc::optional<uint64_t> max_result_size, rpc::optional<utils::UUID> host_id) {
|
||||
@@ -457,6 +460,7 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
});
|
||||
|
||||
init_local_preferred_ip_cache(_cfg.preferred_ips);
|
||||
init_feature_listeners();
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
@@ -679,16 +683,22 @@ messaging_service::get_rpc_client_idx(messaging_verb verb) const {
|
||||
return idx;
|
||||
}
|
||||
|
||||
// A statement or statement-ack verb
|
||||
const auto curr_sched_group = current_scheduling_group();
|
||||
for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
|
||||
if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
if (_connection_index_for_tenant[i].enabled) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
} else {
|
||||
// If the tenant is disable, immediately return current index to
|
||||
// use $system tenant.
|
||||
return idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return idx;
|
||||
}
|
||||
|
||||
@@ -793,6 +803,22 @@ void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_addre
|
||||
remove_rpc_client(msg_addr(ep));
|
||||
}
|
||||
|
||||
void messaging_service::init_feature_listeners() {
|
||||
_maintenance_tenant_enabled_listener = _feature_service.maintenance_tenant.when_enabled([this] {
|
||||
enable_scheduling_tenant("$maintenance");
|
||||
});
|
||||
}
|
||||
|
||||
void messaging_service::enable_scheduling_tenant(std::string_view name) {
|
||||
for (size_t i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
if (_scheduling_config.statement_tenants[i].name == name) {
|
||||
_scheduling_config.statement_tenants[i].enabled = true;
|
||||
_connection_index_for_tenant[i].enabled = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gms::inet_address messaging_service::get_public_endpoint_for(const gms::inet_address& ip) const {
|
||||
auto i = _preferred_to_endpoint.find(ip);
|
||||
return i != _preferred_to_endpoint.end() ? i->second : ip;
|
||||
|
||||
@@ -45,6 +45,7 @@ namespace gms {
|
||||
class gossip_digest_ack2;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
class feature_service;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -299,6 +300,7 @@ public:
|
||||
struct tenant {
|
||||
scheduling_group sched_group;
|
||||
sstring name;
|
||||
bool enabled = true;
|
||||
};
|
||||
// Must have at least one element. No two tenants should have the same
|
||||
// scheduling group. [0] is the default tenant, that all unknown
|
||||
@@ -319,6 +321,7 @@ private:
|
||||
struct tenant_connection_index {
|
||||
scheduling_group sched_group;
|
||||
unsigned cliend_idx;
|
||||
bool enabled;
|
||||
};
|
||||
private:
|
||||
config _cfg;
|
||||
@@ -337,6 +340,7 @@ private:
|
||||
scheduling_config _scheduling_config;
|
||||
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
|
||||
std::vector<tenant_connection_index> _connection_index_for_tenant;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
struct connection_ref;
|
||||
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
|
||||
@@ -351,8 +355,8 @@ private:
|
||||
public:
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
|
||||
~messaging_service();
|
||||
|
||||
future<> start();
|
||||
@@ -544,6 +548,12 @@ public:
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
|
||||
unsigned get_rpc_client_idx(messaging_verb verb) const;
|
||||
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"}; // "forward" is the old name for "mapreduce"
|
||||
|
||||
void init_feature_listeners();
|
||||
private:
|
||||
std::any _maintenance_tenant_enabled_listener;
|
||||
|
||||
void enable_scheduling_tenant(std::string_view name);
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -186,6 +186,8 @@ std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
|
||||
_ss._topology_state_machine._topology.transition_nodes
|
||||
) | boost::adaptors::transformed([&ss = _ss] (auto& node) {
|
||||
return ss.host2ip(locator::host_id{node.first.uuid()});
|
||||
}) | boost::adaptors::filtered([&ss = _ss] (auto& ip) {
|
||||
return ss._gossiper.is_alive(ip);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -102,16 +102,27 @@ static const auto raft_manual_recovery_doc = "https://docs.scylladb.com/master/a
|
||||
|
||||
class group0_rpc: public service::raft_rpc {
|
||||
direct_failure_detector::failure_detector& _direct_fd;
|
||||
gms::gossiper& _gossiper;
|
||||
public:
|
||||
explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd,
|
||||
raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id)
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id, gms::gossiper& gossiper)
|
||||
: raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id)
|
||||
, _direct_fd(direct_fd)
|
||||
, _direct_fd(direct_fd), _gossiper(gossiper)
|
||||
{}
|
||||
|
||||
virtual void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
|
||||
for (const auto& addr: add) {
|
||||
auto ip_for_id = _address_map.find(addr.id);
|
||||
if (!ip_for_id) {
|
||||
// Make sure that the addresses of new nodes in the configuration are in the address map
|
||||
auto ips = _gossiper.get_nodes_with_host_id(locator::host_id(addr.id.uuid()));
|
||||
for (auto ip : ips) {
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
_address_map.add_or_update_entry(addr.id, ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Entries explicitly managed via `rpc::on_configuration_change() should NOT be
|
||||
// expirable.
|
||||
_address_map.set_nonexpiring(addr.id);
|
||||
@@ -204,7 +215,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) {
|
||||
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), _feat, topology_change_enabled);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id, _gossiper);
|
||||
// Keep a reference to a specific RPC class.
|
||||
auto& rpc_ref = *rpc;
|
||||
auto storage = std::make_unique<raft_sys_table_storage>(qp, gid, my_id);
|
||||
|
||||
@@ -77,7 +77,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
|
||||
}
|
||||
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...)
|
||||
.handle_exception_type([loc= std::move(loc), id] (const seastar::rpc::closed_error& e) {;
|
||||
const auto msg = fmt::format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e);
|
||||
const auto msg = fmt::format("Failed to execute {}, destination {}: {}", loc.function_name(), id, e);
|
||||
rlogger.trace("{}", msg);
|
||||
return make_exception_future<Ret>(raft::transport_error(msg));
|
||||
});
|
||||
|
||||
@@ -572,11 +572,11 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
|
||||
}
|
||||
SCYLLA_ASSERT(existing_ip);
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
if (rs.ring.has_value()) {
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
update_topology(host_id, ip, rs);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
} else {
|
||||
// After adding replacing endpoint above the node will no longer be reported for reads and writes,
|
||||
@@ -4062,6 +4062,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
|
||||
|
||||
request_id = guard.new_group0_state_id();
|
||||
|
||||
if (auto itr = _topology_state_machine._topology.requests.find(id);
|
||||
itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
|
||||
throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
|
||||
}
|
||||
try {
|
||||
// Make non voter during request submission for better HA
|
||||
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
|
||||
@@ -6673,6 +6678,10 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
co_return join_node_response_result{};
|
||||
}
|
||||
|
||||
if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
|
||||
_group0->modifiable_address_map().force_drop_expiring_entries();
|
||||
}
|
||||
|
||||
try {
|
||||
co_return co_await std::visit(overloaded_functor {
|
||||
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
|
||||
|
||||
@@ -1577,7 +1577,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.info("entered `{}` transition state", *tstate);
|
||||
switch (*tstate) {
|
||||
case topology::transition_state::join_group0: {
|
||||
auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard)));
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
if (node.rs->state == node_state::replacing) {
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important
|
||||
// if the new node have the same IP, so that old write will not go to the new node by mistake after this point.
|
||||
// It is important to do so before the call to finish_accepting_node() below since after this call the new node becomes
|
||||
// a full member of the cluster and it starts loading an initial snapshot. Since snapshot loading is not atomic any queries
|
||||
// that are done in parallel may see a partial state.
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool accepted;
|
||||
std::tie(node, accepted) = co_await finish_accepting_node(std::move(node));
|
||||
|
||||
// If responding to the joining node failed, move the node to the left state and
|
||||
// stop the topology transition.
|
||||
@@ -1649,22 +1672,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case node_state::replacing: {
|
||||
SCYLLA_ASSERT(!node.rs->ring);
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important if the new node have the same IP, so that old write will not
|
||||
// go to the new node by mistake
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
|
||||
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
|
||||
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
|
||||
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
|
||||
#include <variant>
|
||||
|
||||
template<typename T>
|
||||
static inline T consume_be(temporary_buffer<char>& p) {
|
||||
template<typename T, ContiguousSharedBuffer Buffer>
|
||||
static inline T consume_be(Buffer& p) {
|
||||
T i = read_be<T>(p.get());
|
||||
p.trim_front(sizeof(T));
|
||||
return i;
|
||||
@@ -60,7 +60,9 @@ enum class read_status { ready, waiting };
|
||||
// }
|
||||
// return pc._u32;
|
||||
//
|
||||
class primitive_consumer {
|
||||
template<ContiguousSharedBuffer Buffer>
|
||||
class primitive_consumer_impl {
|
||||
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
|
||||
private:
|
||||
// state machine progress:
|
||||
enum class prestate {
|
||||
@@ -103,20 +105,26 @@ private:
|
||||
|
||||
// state for READING_BYTES prestate
|
||||
size_t _read_bytes_len = 0;
|
||||
utils::small_vector<temporary_buffer<char>, 1> _read_bytes;
|
||||
temporary_buffer<char> _read_bytes_buf; // for contiguous reading.
|
||||
utils::small_vector<Buffer, 1> _read_bytes;
|
||||
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
|
||||
fragmented_temporary_buffer* _read_bytes_where;
|
||||
FragmentedBuffer* _read_bytes_where;
|
||||
|
||||
// Alloc-free
|
||||
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) noexcept {
|
||||
inline read_status read_partial_int(Buffer& data, prestate next_state) noexcept {
|
||||
std::copy(data.begin(), data.end(), _read_int.bytes);
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_prestate = next_state;
|
||||
return read_status::waiting;
|
||||
}
|
||||
inline read_status read_partial_int(prestate next_state) noexcept {
|
||||
_pos = 0;
|
||||
_prestate = next_state;
|
||||
return read_status::waiting;
|
||||
}
|
||||
template <typename VintType, prestate ReadingVint, prestate ReadingVintWithLen>
|
||||
inline read_status read_vint(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint(Buffer& data, typename VintType::value_type& dest) {
|
||||
if (data.empty()) {
|
||||
_prestate = ReadingVint;
|
||||
return read_status::waiting;
|
||||
@@ -128,9 +136,8 @@ private:
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -140,23 +147,23 @@ private:
|
||||
}
|
||||
}
|
||||
template <typename VintType>
|
||||
inline read_status read_vint_with_len(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint_with_len(Buffer& data, typename VintType::value_type& dest) {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
dest = VintType::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
return read_status::waiting;
|
||||
};
|
||||
public:
|
||||
primitive_consumer(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
primitive_consumer_impl(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
|
||||
inline read_status read_8(temporary_buffer<char>& data) {
|
||||
inline read_status read_8(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint8_t)) {
|
||||
_u8 = consume_be<uint8_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -170,7 +177,7 @@ public:
|
||||
// (this is the common case), do this immediately. Otherwise, remember
|
||||
// what we have in the buffer, and remember to continue later by using
|
||||
// a "prestate":
|
||||
inline read_status read_16(temporary_buffer<char>& data) {
|
||||
inline read_status read_16(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -179,7 +186,7 @@ public:
|
||||
}
|
||||
}
|
||||
// Alloc-free
|
||||
inline read_status read_32(temporary_buffer<char>& data) noexcept {
|
||||
inline read_status read_32(Buffer& data) noexcept {
|
||||
if (data.size() >= sizeof(uint32_t)) {
|
||||
_u32 = consume_be<uint32_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -187,7 +194,10 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U32);
|
||||
}
|
||||
}
|
||||
inline read_status read_64(temporary_buffer<char>& data) {
|
||||
inline read_status read_32() noexcept {
|
||||
return read_partial_int(prestate::READING_U32);
|
||||
}
|
||||
inline read_status read_64(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint64_t)) {
|
||||
_u64 = consume_be<uint64_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -195,16 +205,24 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U64);
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
temporary_buffer<char> share(Buffer& data, uint32_t offset, uint32_t len) {
|
||||
if constexpr(std::is_same_v<Buffer, temporary_buffer<char>>) {
|
||||
return data.share(offset, len);
|
||||
} else {
|
||||
auto ret = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin() + offset, data.begin() + offset + len, ret.get_write());
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes_contiguous(Buffer& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
if (data.size() >= len) {
|
||||
where = data.share(0, len);
|
||||
where = share(data, 0, len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
_pos = data.size();
|
||||
@@ -213,12 +231,12 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_bytes(Buffer& data, uint32_t len, FragmentedBuffer& where) {
|
||||
if (data.size() >= len) {
|
||||
auto fragments = std::move(where).release();
|
||||
fragments.clear();
|
||||
fragments.push_back(data.share(0, len));
|
||||
where = fragmented_temporary_buffer(std::move(fragments), len);
|
||||
where = FragmentedBuffer(std::move(fragments), len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
@@ -233,7 +251,7 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_short_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_short_length_bytes(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
} else {
|
||||
@@ -242,19 +260,19 @@ public:
|
||||
}
|
||||
return read_bytes_contiguous(data, uint32_t{_u16}, where);
|
||||
}
|
||||
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_unsigned_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
unsigned_vint,
|
||||
prestate::READING_UNSIGNED_VINT,
|
||||
prestate::READING_UNSIGNED_VINT_WITH_LEN>(data, _u64);
|
||||
}
|
||||
inline read_status read_signed_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_signed_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
signed_vint,
|
||||
prestate::READING_SIGNED_VINT,
|
||||
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
@@ -267,9 +285,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -279,7 +296,7 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes(Buffer& data, FragmentedBuffer& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
|
||||
_read_bytes_where = &where;
|
||||
@@ -292,9 +309,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -307,7 +323,7 @@ public:
|
||||
private:
|
||||
// Reads bytes belonging to an integer of size len. Returns true
|
||||
// if a full integer is now available.
|
||||
bool process_int(temporary_buffer<char>& data, unsigned len) {
|
||||
bool process_int(Buffer& data, unsigned len) {
|
||||
SCYLLA_ASSERT(_pos < len);
|
||||
auto n = std::min((size_t)(len - _pos), data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
|
||||
@@ -316,9 +332,18 @@ private:
|
||||
return _pos == len;
|
||||
}
|
||||
public:
|
||||
read_status consume_u32(Buffer& data) {
|
||||
if (process_int(data, sizeof(uint32_t))) {
|
||||
_u32 = net::ntoh(_read_int.uint32);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
return read_status::waiting;
|
||||
}
|
||||
|
||||
// Feeds data into the state machine.
|
||||
// After the call, when data is not empty then active() can be assumed to be false.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
if (__builtin_expect(_prestate == prestate::NONE, true)) {
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -360,12 +385,12 @@ public:
|
||||
return read_vint_with_len<signed_vint>(data, _i64);
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -375,12 +400,12 @@ public:
|
||||
}
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -390,11 +415,11 @@ public:
|
||||
}
|
||||
case prestate::READING_BYTES_CONTIGUOUS: {
|
||||
auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes_buf);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -406,8 +431,8 @@ public:
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
std::vector<temporary_buffer<char>> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = fragmented_temporary_buffer(std::move(fragments), _read_bytes_len);
|
||||
std::vector<Buffer> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = FragmentedBuffer(std::move(fragments), _read_bytes_len);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -435,12 +460,7 @@ public:
|
||||
}
|
||||
break;
|
||||
case prestate::READING_U32:
|
||||
if (process_int(data, sizeof(uint32_t))) {
|
||||
_u32 = net::ntoh(_read_int.uint32);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
return consume_u32(data);
|
||||
case prestate::READING_U64:
|
||||
if (process_int(data, sizeof(uint64_t))) {
|
||||
_u64 = net::ntoh(_read_int.uint64);
|
||||
@@ -461,6 +481,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using primitive_consumer = primitive_consumer_impl<temporary_buffer<char>>;
|
||||
|
||||
template <typename StateProcessor>
|
||||
class continuous_data_consumer : protected primitive_consumer {
|
||||
using proceed = data_consumer::proceed;
|
||||
|
||||
@@ -1110,7 +1110,7 @@ public:
|
||||
_consumer.consume_row_end();
|
||||
return;
|
||||
}
|
||||
if (_state != state::ROW_START || primitive_consumer::active()) {
|
||||
if (_state != state::ROW_START || data_consumer::primitive_consumer::active()) {
|
||||
throw malformed_sstable_exception("end of input, but not end of row");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,24 @@ private:
|
||||
//
|
||||
using block_set_type = std::set<promoted_index_block, block_comparator>;
|
||||
block_set_type _blocks;
|
||||
private:
|
||||
using Buffer = cached_file::page_view;
|
||||
|
||||
struct u32_parser {
|
||||
data_consumer::primitive_consumer_impl<Buffer>& parser;
|
||||
|
||||
void reset() {
|
||||
parser.read_32();
|
||||
}
|
||||
|
||||
data_consumer::read_status consume(Buffer& buf) {
|
||||
return parser.consume_u32(buf);
|
||||
}
|
||||
|
||||
uint32_t value() const {
|
||||
return parser._u32;
|
||||
}
|
||||
};
|
||||
public:
|
||||
const schema& _s;
|
||||
uint64_t _promoted_index_start;
|
||||
@@ -152,26 +170,50 @@ public:
|
||||
metrics& _metrics;
|
||||
const pi_index_type _blocks_count;
|
||||
cached_file& _cached_file;
|
||||
data_consumer::primitive_consumer _primitive_parser;
|
||||
clustering_parser _clustering_parser;
|
||||
promoted_index_block_parser _block_parser;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive_parser;
|
||||
u32_parser _u32_parser;
|
||||
clustering_parser<Buffer> _clustering_parser;
|
||||
promoted_index_block_parser<Buffer> _block_parser;
|
||||
reader_permit _permit;
|
||||
cached_file::stream _stream;
|
||||
logalloc::allocating_section _as;
|
||||
private:
|
||||
// Feeds the stream into the consumer until the consumer is satisfied.
|
||||
// Does not give unconsumed data back to the stream.
|
||||
template <typename Consumer>
|
||||
future<> consume_stream(cached_file::stream& s, Consumer& c) {
|
||||
return repeat([&] {
|
||||
return s.next_page_view().then([&] (cached_file::page_view&& page) {
|
||||
future<> read(cached_file::offset_type pos, tracing::trace_state_ptr trace_state, Consumer& c) {
|
||||
struct retry_exception : std::exception {};
|
||||
_stream = _cached_file.read(pos, _permit, trace_state);
|
||||
c.reset();
|
||||
return repeat([this, pos, trace_state, &c] {
|
||||
return _stream.next_page_view().then([this, &c] (cached_file::page_view&& page) {
|
||||
if (!page) {
|
||||
on_internal_error(sstlog, "End of stream while parsing");
|
||||
}
|
||||
bool retry = false;
|
||||
return _as(_cached_file.region(), [&] {
|
||||
auto buf = page.get_buf();
|
||||
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
|
||||
if (retry) {
|
||||
throw retry_exception();
|
||||
}
|
||||
retry = true;
|
||||
|
||||
auto status = c.consume(page);
|
||||
|
||||
utils::get_local_injector().inject("cached_promoted_index_parsing_invalidate_buf_across_page", [&page] {
|
||||
page.release_and_scramble();
|
||||
});
|
||||
|
||||
utils::get_local_injector().inject("cached_promoted_index_bad_alloc_parsing_across_page", [this] {
|
||||
// Prevent reserve explosion in testing.
|
||||
_as.set_lsa_reserve(1);
|
||||
_as.set_std_reserve(1);
|
||||
throw std::bad_alloc();
|
||||
});
|
||||
|
||||
return stop_iteration(status == data_consumer::read_status::ready);
|
||||
});
|
||||
}).handle_exception_type([this, pos, trace_state, &c] (const retry_exception& e) {
|
||||
_stream = _cached_file.read(pos, _permit, trace_state);
|
||||
c.reset();
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -183,48 +225,17 @@ private:
|
||||
return _promoted_index_size - (_blocks_count - idx) * sizeof(pi_offset_type);
|
||||
}
|
||||
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _permit, trace_state);
|
||||
return _stream.next_page_view().then([this] (cached_file::page_view page) {
|
||||
temporary_buffer<char> buf = page.get_buf();
|
||||
static_assert(noexcept(std::declval<data_consumer::primitive_consumer>().read_32(buf)));
|
||||
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
|
||||
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
|
||||
}
|
||||
return consume_stream(_stream, _primitive_parser).then([this] {
|
||||
return _primitive_parser._u32;
|
||||
});
|
||||
});
|
||||
}
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state);
|
||||
|
||||
// Postconditions:
|
||||
// - block.start is engaged and valid.
|
||||
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _permit, trace_state);
|
||||
_clustering_parser.reset();
|
||||
return consume_stream(_stream, _clustering_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(_clustering_parser.get_and_reset());
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state);
|
||||
|
||||
// Postconditions:
|
||||
// - block.end is engaged, all fields in the block are valid
|
||||
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _permit, trace_state);
|
||||
_block_parser.reset();
|
||||
return consume_stream(_stream, _block_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(std::move(_block_parser.start()));
|
||||
block.end.emplace(std::move(_block_parser.end()));
|
||||
block.end_open_marker = _block_parser.end_open_marker();
|
||||
block.data_file_offset = _block_parser.offset();
|
||||
block.width = _block_parser.width();
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state);
|
||||
|
||||
public:
|
||||
/// \brief Returns a pointer to promoted_index_block entry which has at least offset and index fields valid.
|
||||
future<promoted_index_block*> get_block_only_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
auto i = _blocks.lower_bound(idx);
|
||||
@@ -242,6 +253,7 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
void erase_range(block_set_type::iterator begin, block_set_type::iterator end) {
|
||||
while (begin != end) {
|
||||
--_metrics.block_count;
|
||||
@@ -267,6 +279,7 @@ public:
|
||||
, _blocks_count(blocks_count)
|
||||
, _cached_file(f)
|
||||
, _primitive_parser(permit)
|
||||
, _u32_parser(_primitive_parser)
|
||||
, _clustering_parser(s, permit, cvfl, true)
|
||||
, _block_parser(s, permit, std::move(cvfl))
|
||||
, _permit(std::move(permit))
|
||||
@@ -333,6 +346,10 @@ public:
|
||||
erase_range(_blocks.begin(), _blocks.lower_bound(block->index));
|
||||
}
|
||||
|
||||
void clear() {
|
||||
erase_range(_blocks.begin(), _blocks.end());
|
||||
}
|
||||
|
||||
cached_file& file() { return _cached_file; }
|
||||
};
|
||||
} // namespace sstables::mc
|
||||
@@ -350,6 +367,40 @@ struct fmt::formatter<sstables::mc::cached_promoted_index::promoted_index_block>
|
||||
};
|
||||
|
||||
namespace sstables::mc {
|
||||
|
||||
inline
|
||||
future<cached_promoted_index::pi_offset_type>
|
||||
cached_promoted_index::read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + get_offset_entry_pos(idx), trace_state, _u32_parser).then([idx, this] {
|
||||
sstlog.trace("cached_promoted_index {}: read_block_offset: idx: {}, offset: {}", fmt::ptr(this), idx, _u32_parser.value());
|
||||
return _u32_parser.value();
|
||||
});
|
||||
}
|
||||
|
||||
inline
|
||||
future<> cached_promoted_index::read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + block.offset, trace_state, _clustering_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(_clustering_parser.get_and_reset());
|
||||
sstlog.trace("cached_promoted_index {}: read_block_start: {}", fmt::ptr(this), block);
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
|
||||
inline
|
||||
future<> cached_promoted_index::read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + block.offset, trace_state, _block_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(std::move(_block_parser.start()));
|
||||
block.end.emplace(std::move(_block_parser.end()));
|
||||
block.end_open_marker = _block_parser.end_open_marker();
|
||||
block.data_file_offset = _block_parser.offset();
|
||||
block.width = _block_parser.width();
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
sstlog.trace("cached_promoted_index {}: read_block: {}", fmt::ptr(this), block);
|
||||
});
|
||||
}
|
||||
|
||||
/// Cursor implementation which does binary search over index entries.
|
||||
///
|
||||
/// Memory consumption: O(log(N))
|
||||
@@ -460,6 +511,8 @@ public:
|
||||
, _trace_state(std::move(trace_state))
|
||||
{ }
|
||||
|
||||
cached_promoted_index& promoted_index() { return _promoted_index; }
|
||||
|
||||
future<std::optional<skip_info>> advance_to(position_in_partition_view pos) override {
|
||||
position_in_partition::less_compare less(_s);
|
||||
|
||||
|
||||
@@ -26,20 +26,22 @@ namespace mc {
|
||||
// while (cp.consume(next_buf()) == read_status::waiting) {}
|
||||
// position_in_partition pos = cp.get();
|
||||
//
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class clustering_parser {
|
||||
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
|
||||
const schema& _s;
|
||||
column_values_fixed_lengths _clustering_values_fixed_lengths;
|
||||
bool _parsing_start_key;
|
||||
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
|
||||
|
||||
std::vector<fragmented_temporary_buffer> clustering_key_values;
|
||||
std::vector<FragmentedBuffer> clustering_key_values;
|
||||
bound_kind_m kind{};
|
||||
|
||||
fragmented_temporary_buffer column_value;
|
||||
FragmentedBuffer column_value;
|
||||
uint64_t ck_blocks_header = 0;
|
||||
uint32_t ck_blocks_header_offset = 0;
|
||||
std::optional<position_in_partition> _pos;
|
||||
data_consumer::primitive_consumer _primitive;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive;
|
||||
|
||||
enum class state {
|
||||
CLUSTERING_START,
|
||||
@@ -79,7 +81,7 @@ class clustering_parser {
|
||||
|
||||
position_in_partition make_position() {
|
||||
auto key = clustering_key_prefix::from_range(clustering_key_values | boost::adaptors::transformed(
|
||||
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
|
||||
[] (const FragmentedBuffer & b) { return typename FragmentedBuffer::view(b); }));
|
||||
|
||||
if (kind == bound_kind_m::clustering) {
|
||||
return position_in_partition::for_key(std::move(key));
|
||||
@@ -108,7 +110,7 @@ public:
|
||||
|
||||
// Feeds the data into the state machine.
|
||||
// Returns read_status::ready when !active() after the call.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
if (_primitive.consume(data) == read_status::waiting) {
|
||||
return read_status::waiting;
|
||||
}
|
||||
@@ -202,12 +204,15 @@ public:
|
||||
}
|
||||
|
||||
void reset() {
|
||||
_parsing_start_key = true;
|
||||
_state = state::CLUSTERING_START;
|
||||
_primitive.reset();
|
||||
}
|
||||
};
|
||||
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class promoted_index_block_parser {
|
||||
clustering_parser _clustering;
|
||||
clustering_parser<Buffer> _clustering;
|
||||
|
||||
std::optional<position_in_partition> _start_pos;
|
||||
std::optional<position_in_partition> _end_pos;
|
||||
@@ -228,7 +233,7 @@ class promoted_index_block_parser {
|
||||
DONE,
|
||||
} _state = state::START;
|
||||
|
||||
data_consumer::primitive_consumer _primitive;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive;
|
||||
public:
|
||||
using read_status = data_consumer::read_status;
|
||||
|
||||
@@ -246,7 +251,7 @@ public:
|
||||
// Feeds the data into the state machine.
|
||||
// Returns read_status::ready when whole block was parsed.
|
||||
// If returns read_status::waiting then data.empty() after the call.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
static constexpr size_t width_base = 65536;
|
||||
if (_primitive.consume(data) == read_status::waiting) {
|
||||
return read_status::waiting;
|
||||
@@ -318,7 +323,7 @@ public:
|
||||
|
||||
void reset() {
|
||||
_end_open_marker.reset();
|
||||
_clustering.set_parsing_start_key(true);
|
||||
_clustering.reset();
|
||||
_state = state::START;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -71,7 +71,7 @@ private:
|
||||
};
|
||||
|
||||
struct m_parser_context {
|
||||
mc::promoted_index_block_parser block_parser;
|
||||
mc::promoted_index_block_parser<temporary_buffer<char>> block_parser;
|
||||
|
||||
m_parser_context(const schema& s, reader_permit permit, column_values_fixed_lengths cvfl)
|
||||
: block_parser(s, std::move(permit), std::move(cvfl))
|
||||
|
||||
@@ -104,13 +104,16 @@ def check_increases_metric(metrics, metric_names):
|
||||
assert saved_metrics[n] < get_metric(metrics, n, None, the_metrics), f'metric {n} did not increase'
|
||||
|
||||
@contextmanager
|
||||
def check_increases_operation(metrics, operation_names):
|
||||
def check_increases_operation(metrics, operation_names, metric_name = 'scylla_alternator_operation', expected_value=None):
|
||||
the_metrics = get_metrics(metrics)
|
||||
saved_metrics = { x: get_metric(metrics, 'scylla_alternator_operation', {'op': x}, the_metrics) for x in operation_names }
|
||||
saved_metrics = { x: get_metric(metrics, metric_name, {'op': x}, the_metrics) for x in operation_names }
|
||||
yield
|
||||
the_metrics = get_metrics(metrics)
|
||||
for op in operation_names:
|
||||
assert saved_metrics[op] < get_metric(metrics, 'scylla_alternator_operation', {'op': op}, the_metrics)
|
||||
if expected_value:
|
||||
assert expected_value == get_metric(metrics, metric_name, {'op': op}, the_metrics) - saved_metrics[op]
|
||||
else:
|
||||
assert saved_metrics[op] < get_metric(metrics, metric_name, {'op': op}, the_metrics)
|
||||
|
||||
###### Test for metrics that count DynamoDB API operations:
|
||||
|
||||
@@ -125,6 +128,16 @@ def test_batch_get_item(test_table_s, metrics):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
def test_batch_write_item_count(test_table_s, metrics):
|
||||
with check_increases_operation(metrics, ['BatchWriteItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
|
||||
test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}, {'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
|
||||
|
||||
def test_batch_get_item_count(test_table_s, metrics):
|
||||
with check_increases_operation(metrics, ['BatchGetItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}, {'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
# Test counters for CreateTable, DescribeTable, UpdateTable and DeleteTable
|
||||
def test_table_operations(dynamodb, metrics):
|
||||
with check_increases_operation(metrics, ['CreateTable', 'DescribeTable', 'UpdateTable', 'DeleteTable']):
|
||||
|
||||
@@ -453,3 +453,52 @@ SEASTAR_THREAD_TEST_CASE(test_invalidation) {
|
||||
BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_page_view_as_contiguous_shared_buffer) {
|
||||
auto page_size = cached_file::page_size;
|
||||
test_file tf = make_test_file(page_size);
|
||||
|
||||
cached_file_stats metrics;
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, cf_lru, region, page_size);
|
||||
|
||||
auto s = cf.read(1, std::nullopt);
|
||||
cached_file::page_view p = s.next_page_view().get();
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, page_size - 1), sstring(p.begin(), p.end()));
|
||||
BOOST_REQUIRE_EQUAL(p.size(), page_size - 1);
|
||||
BOOST_REQUIRE(!p.empty());
|
||||
|
||||
p.trim(10);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.begin(), p.end()));
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.get_write(), p.end()));
|
||||
|
||||
p.trim_front(1);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p.begin(), p.end()));
|
||||
|
||||
// Check movability
|
||||
{
|
||||
auto p_cpy = p.share();
|
||||
auto p1 = std::move(p_cpy);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p1.begin(), p1.end()));
|
||||
BOOST_REQUIRE(p_cpy.empty());
|
||||
BOOST_REQUIRE(p_cpy.size() == 0);
|
||||
BOOST_REQUIRE(!p_cpy);
|
||||
}
|
||||
|
||||
auto p2 = p.share(2, 3);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(4, 3), sstring(p2.begin(), p2.end()));
|
||||
p2.trim_front(1); // should not affect p
|
||||
|
||||
p.trim_front(9);
|
||||
BOOST_REQUIRE_EQUAL(p.size(), 0);
|
||||
BOOST_REQUIRE(p.begin() == p.end());
|
||||
|
||||
p = {};
|
||||
BOOST_REQUIRE_EQUAL(p.size(), 0);
|
||||
BOOST_REQUIRE(p.begin() == p.end());
|
||||
BOOST_REQUIRE(!p);
|
||||
BOOST_REQUIRE_EQUAL(sstring(p.begin(), p.end()), sstring());
|
||||
|
||||
// p should not affect p2
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(5, 2), sstring(p2.begin(), p2.end()));
|
||||
}
|
||||
|
||||
@@ -2024,3 +2024,35 @@ SEASTAR_TEST_CASE(test_oversized_several_medium) {
|
||||
SEASTAR_TEST_CASE(test_oversized_several_large) {
|
||||
co_await test_oversized(8, 32);
|
||||
}
|
||||
|
||||
// tests #20862 - buffer usage counter not being updated correctly
|
||||
SEASTAR_TEST_CASE(test_commitlog_buffer_size_counter) {
|
||||
commitlog::config cfg;
|
||||
tmpdir tmp;
|
||||
cfg.commit_log_location = tmp.path().string();
|
||||
auto log = co_await commitlog::create_commitlog(cfg);
|
||||
|
||||
rp_set rps;
|
||||
// uncomment for verbosity
|
||||
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
|
||||
|
||||
auto uuid = make_table_id();
|
||||
auto size = 1024;
|
||||
|
||||
auto size_before_alloc = log.get_buffer_size();
|
||||
|
||||
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
|
||||
dst.fill('1', size);
|
||||
});
|
||||
h.release();
|
||||
|
||||
auto size_after_alloc = log.get_buffer_size();
|
||||
co_await log.sync_all_segments();
|
||||
auto size_after_sync = log.get_buffer_size();
|
||||
|
||||
BOOST_CHECK_LE(size_before_alloc, size_after_alloc);
|
||||
BOOST_CHECK_LE(size_after_sync, size_before_alloc);
|
||||
|
||||
co_await log.shutdown();
|
||||
co_await log.clear();
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/sstable_test_env.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/make_random_string.hh"
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
|
||||
@@ -46,3 +47,96 @@ SEASTAR_TEST_CASE(test_abort_during_index_read) {
|
||||
consumer_ctx.close().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_promoted_index_parsing_page_crossing_and_retries) {
|
||||
return test_env::do_with_async([](test_env& env) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
testlog.info("Skipped because error injection is not enabled");
|
||||
#else
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto pk = ss.make_pkey();
|
||||
auto mut = mutation(s, pk);
|
||||
|
||||
// enough to have same index block whose clustering key is split across pages
|
||||
std::vector<clustering_key> keys;
|
||||
const auto n_keys = 100;
|
||||
auto key_size = cached_file::page_size / 3; // guarantees that index blocks are not congruent with page size.
|
||||
keys.reserve(n_keys);
|
||||
for (int i = 0; i < n_keys; ++i) {
|
||||
keys.push_back(ss.make_ckey(make_random_string(key_size)));
|
||||
ss.add_row(mut, keys[i], "v");
|
||||
}
|
||||
|
||||
clustering_key::less_compare less(*s);
|
||||
std::sort(keys.begin(), keys.end(), less);
|
||||
|
||||
env.manager().set_promoted_index_block_size(1); // force entry for each row
|
||||
auto mut_reader = make_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), std::move(mut));
|
||||
auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer());
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
tracing::trace_state_ptr trace = nullptr;
|
||||
|
||||
auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
|
||||
auto close_index = deferred_close(*index);
|
||||
|
||||
index->advance_to(dht::ring_position_view(pk)).get();
|
||||
index->read_partition_data().get();
|
||||
|
||||
auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
|
||||
BOOST_REQUIRE(cur);
|
||||
|
||||
std::optional<cached_file::offset_type> prev_offset;
|
||||
int crossed_page = 0;
|
||||
|
||||
utils::get_local_injector().enable("cached_promoted_index_parsing_invalidate_buf_across_page", false);
|
||||
|
||||
for (int i = 0; i < n_keys - 1; ++i) {
|
||||
auto block_offset = cur->promoted_index().get_block_only_offset(i, trace).get()->offset;
|
||||
auto next_block_offset = cur->promoted_index().get_block_only_offset(i + 1, trace).get()->offset;
|
||||
|
||||
auto start_page = block_offset / cached_file::page_size;
|
||||
auto end_page = (next_block_offset - 1) / cached_file::page_size;
|
||||
if (start_page != end_page) {
|
||||
auto pos = position_in_partition::for_key(keys[i]);
|
||||
position_in_partition::equal_compare eq(*s);
|
||||
|
||||
testlog.info("Crossed page at block {}, offset [{}, {})", i, block_offset, next_block_offset);
|
||||
crossed_page++;
|
||||
|
||||
auto* block = cur->promoted_index().get_block(i, trace).get();
|
||||
|
||||
testlog.debug("key : {}", pos);
|
||||
testlog.debug("start : {}", *block->start);
|
||||
testlog.debug("end : {}", *block->end);
|
||||
|
||||
BOOST_REQUIRE(eq(*block->start, pos));
|
||||
BOOST_REQUIRE(eq(*block->end, pos));
|
||||
if (prev_offset) {
|
||||
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
|
||||
}
|
||||
|
||||
cur->promoted_index().clear();
|
||||
|
||||
utils::get_local_injector().enable("cached_promoted_index_bad_alloc_parsing_across_page", true);
|
||||
block = cur->promoted_index().get_block(i, trace).get();
|
||||
|
||||
testlog.debug("start : {}", *block->start);
|
||||
testlog.debug("end : {}", *block->end);
|
||||
BOOST_REQUIRE(eq(*block->start, pos));
|
||||
BOOST_REQUIRE(eq(*block->end, pos));
|
||||
if (prev_offset) {
|
||||
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
|
||||
}
|
||||
|
||||
prev_offset = block->data_file_offset;
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_GE(crossed_page, 6); // If not, increase n_keys
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
@@ -709,7 +709,7 @@ private:
|
||||
port = tmp.local_address().port();
|
||||
}
|
||||
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
|
||||
_ms.start(host_id, listen, std::move(port)).get();
|
||||
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service)).get();
|
||||
stop_ms = defer(stop_type(stop_ms_func));
|
||||
|
||||
if (cfg_in.ms_listen) {
|
||||
|
||||
@@ -12,7 +12,9 @@
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
@@ -56,6 +58,7 @@ int main(int ac, char ** av) {
|
||||
|
||||
sharded<abort_source> abort_sources;
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
sharded<gms::feature_service> feature_service;
|
||||
sharded<netw::messaging_service> messaging;
|
||||
|
||||
abort_sources.start().get();
|
||||
@@ -68,7 +71,10 @@ int main(int ac, char ** av) {
|
||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_token_mgr = defer([&] { token_metadata.stop().get(); });
|
||||
|
||||
messaging.start(locator::host_id{}, listen, 7000).get();
|
||||
auto cfg = gms::feature_config_from_db_config(db::config(), {});
|
||||
feature_service.start(cfg).get();
|
||||
|
||||
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
|
||||
auto stop_messaging = deferred_stop(messaging);
|
||||
|
||||
gms::gossip_config gcfg;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossip_digest_syn.hh"
|
||||
#include "gms/gossip_digest_ack.hh"
|
||||
@@ -192,8 +193,11 @@ int main(int ac, char ** av) {
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_tm = deferred_stop(token_metadata);
|
||||
seastar::sharded<gms::feature_service> feature_service;
|
||||
auto cfg = gms::feature_config_from_db_config(db::config(), {});
|
||||
feature_service.start(cfg).get();
|
||||
seastar::sharded<netw::messaging_service> messaging;
|
||||
messaging.start(locator::host_id{}, listen, 7000).get();
|
||||
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
|
||||
auto stop_messaging = deferred_stop(messaging);
|
||||
seastar::sharded<tester> testers;
|
||||
testers.start(std::ref(messaging)).get();
|
||||
|
||||
@@ -135,3 +135,19 @@ async def test_rebuild_node(manager: ManagerClient, random_tables: RandomTables)
|
||||
servers = await manager.running_servers()
|
||||
await manager.rebuild_node(servers[0].server_id)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_removenode(manager: ManagerClient):
|
||||
servers = await manager.running_servers()
|
||||
assert len(servers) >= 3
|
||||
|
||||
await manager.server_stop_gracefully(servers[2].server_id)
|
||||
|
||||
try:
|
||||
await asyncio.gather(*[manager.remove_node(servers[0].server_id, servers[2].server_id),
|
||||
manager.remove_node(servers[1].server_id, servers[2].server_id)])
|
||||
except Exception as e:
|
||||
logger.info(f"exception raised due to concurrent remove node requests: {e}")
|
||||
else:
|
||||
raise Exception("concurrent removenode request should result in a failure, but unexpectedly succeeded")
|
||||
|
||||
|
||||
@@ -27,16 +27,20 @@ async def test_long_join(manager: ManagerClient) -> None:
|
||||
await asyncio.gather(task)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_long_join_drop_wntries_on_bootstrapping(manager: ManagerClient) -> None:
|
||||
async def test_long_join_drop_entries_on_bootstrapping(manager: ManagerClient) -> None:
|
||||
"""The test checks that join works even if expiring entries are dropped
|
||||
on the joining node between placement of the join request and its processing"""
|
||||
s1 = await manager.server_add()
|
||||
servers = await manager.servers_add(2)
|
||||
inj = 'topology_coordinator_pause_before_processing_backlog'
|
||||
await manager.api.enable_injection(s1.ip_addr, inj, one_shot=True)
|
||||
s2 = await manager.server_add(start=False, config={
|
||||
[await manager.api.enable_injection(s.ip_addr, inj, one_shot=True) for s in servers]
|
||||
s = await manager.server_add(start=False, config={
|
||||
'error_injections_at_startup': ['pre_server_start_drop_expiring']
|
||||
})
|
||||
task = asyncio.create_task(manager.server_start(s2.server_id))
|
||||
await manager.server_sees_other_server(s1.ip_addr, s2.ip_addr, interval=300)
|
||||
await manager.api.message_injection(s1.ip_addr, inj)
|
||||
task = asyncio.create_task(manager.server_start(s.server_id))
|
||||
log = await manager.server_open_log(s.server_id)
|
||||
await log.wait_for("init - starting gossiper")
|
||||
servers.append(s)
|
||||
await manager.servers_see_each_other(servers, interval=300)
|
||||
await manager.api.enable_injection(s.ip_addr, 'join_node_response_drop_expiring', one_shot=True)
|
||||
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:-1]]
|
||||
await asyncio.gather(task)
|
||||
|
||||
@@ -79,6 +79,10 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
# Prevent scylladb/scylladb#20791
|
||||
logging.info(f"Wait until {srv1} sees {others} as alive")
|
||||
await manager.server_sees_others(srv1.server_id, len(others))
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
|
||||
|
||||
@@ -106,9 +106,15 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
||||
parameters=[k, v],
|
||||
host=host2)
|
||||
finish_time = time.time()
|
||||
await replace_future
|
||||
s = await replace_future
|
||||
logger.info(f"done, writes count {next_id}, took {finish_time - start_time} seconds")
|
||||
|
||||
# make sure that after we start snapshot transfer we no longer have stale writes
|
||||
log = await manager.server_open_log(s.server_id)
|
||||
m = await log.wait_for("group0_raft_sm - transfer snapshot from ")
|
||||
errs = await log.grep("storage_proxy - Failed to apply mutation from", from_mark=m)
|
||||
assert len(errs) == 0
|
||||
|
||||
result_set = await manager.get_cql().run_async(SimpleStatement("select * from ks.test_table",
|
||||
consistency_level=ConsistencyLevel.QUORUM),
|
||||
host=host2, all_pages=True)
|
||||
|
||||
@@ -79,6 +79,10 @@ private:
|
||||
}
|
||||
return std::unique_ptr<cached_page, cached_page_del>(this);
|
||||
}
|
||||
|
||||
bool only_ref() const {
|
||||
return _use_count <= 1;
|
||||
}
|
||||
public:
|
||||
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
|
||||
: parent(parent)
|
||||
@@ -115,11 +119,10 @@ private:
|
||||
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
|
||||
}
|
||||
|
||||
// Returns a buffer which reflects contents of this page.
|
||||
// The buffer will not prevent eviction.
|
||||
// Returns a pointer to the contents of the page.
|
||||
// The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
|
||||
temporary_buffer<char> get_buf_weak() {
|
||||
return temporary_buffer<char>(_lsa_buf.get(), _lsa_buf.size(), deleter());
|
||||
char* begin() {
|
||||
return _lsa_buf.get();
|
||||
}
|
||||
|
||||
size_t size_in_allocator() {
|
||||
@@ -208,10 +211,11 @@ public:
|
||||
class page_view {
|
||||
cached_page::ptr_type _page;
|
||||
size_t _offset;
|
||||
size_t _size;
|
||||
size_t _size = 0;
|
||||
std::optional<reader_permit::resource_units> _units;
|
||||
public:
|
||||
page_view() = default;
|
||||
|
||||
page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
|
||||
: _page(std::move(page))
|
||||
, _offset(offset)
|
||||
@@ -219,15 +223,64 @@ public:
|
||||
, _units(std::move(units))
|
||||
{}
|
||||
|
||||
// The returned buffer is valid only until the LSA region associated with cached_file invalidates references.
|
||||
temporary_buffer<char> get_buf() {
|
||||
auto buf = _page->get_buf_weak();
|
||||
buf.trim(_size);
|
||||
buf.trim_front(_offset);
|
||||
return buf;
|
||||
page_view(page_view&& o) noexcept
|
||||
: _page(std::move(o._page))
|
||||
, _offset(std::exchange(o._offset, 0))
|
||||
, _size(std::exchange(o._size, 0))
|
||||
, _units(std::move(o._units))
|
||||
{}
|
||||
|
||||
page_view& operator=(page_view&& o) noexcept {
|
||||
_page = std::move(o._page);
|
||||
_offset = std::exchange(o._offset, 0);
|
||||
_size = std::exchange(o._size, 0);
|
||||
_units = std::move(o._units);
|
||||
return *this;
|
||||
}
|
||||
|
||||
operator bool() const { return bool(_page); }
|
||||
// Fills the page with garbage, releases the pointer and evicts the page so that it's no longer in cache.
|
||||
// For testing use-after-free on the buffer space.
|
||||
// After the call, the object is the same state as after being moved-from.
|
||||
void release_and_scramble() noexcept {
|
||||
if (_page->only_ref()) {
|
||||
std::memset(_page->_lsa_buf.get(), 0xfe, _page->_lsa_buf.size());
|
||||
cached_page& cp = *_page;
|
||||
_page = nullptr;
|
||||
cp.parent->_lru.remove(cp);
|
||||
cp.on_evicted();
|
||||
} else {
|
||||
_page = nullptr;
|
||||
}
|
||||
_size = 0;
|
||||
_offset = 0;
|
||||
_units = std::nullopt;
|
||||
}
|
||||
|
||||
operator bool() const { return bool(_page) && _size; }
|
||||
public: // ContiguousSharedBuffer concept
|
||||
const char* begin() const { return _page ? _page->begin() + _offset : nullptr; }
|
||||
const char* get() const { return begin(); }
|
||||
const char* end() const { return begin() + _size; }
|
||||
size_t size() const { return _size; }
|
||||
bool empty() const { return !_size; }
|
||||
char* get_write() { return const_cast<char*>(begin()); }
|
||||
|
||||
void trim(size_t pos) {
|
||||
_size = pos;
|
||||
}
|
||||
|
||||
void trim_front(size_t n) {
|
||||
_offset += n;
|
||||
_size -= n;
|
||||
}
|
||||
|
||||
page_view share() {
|
||||
return share(0, _size);
|
||||
}
|
||||
|
||||
page_view share(size_t pos, size_t len) {
|
||||
return page_view(_offset + pos, len, _page->share(), {});
|
||||
}
|
||||
};
|
||||
|
||||
// Generator of subsequent pages of data reflecting the contents of the file.
|
||||
@@ -306,7 +359,7 @@ public:
|
||||
? _cached_file->_last_page_size
|
||||
: page_size;
|
||||
units = get_page_units(page_size);
|
||||
page_view buf(_offset_in_page, size, std::move(page), std::move(units));
|
||||
page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), std::move(units));
|
||||
_offset_in_page = 0;
|
||||
++_page_idx;
|
||||
return buf;
|
||||
|
||||
40
utils/contiguous_shared_buffer.hh
Normal file
40
utils/contiguous_shared_buffer.hh
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <concepts>
|
||||
#include <memory>
|
||||
|
||||
// A contiguous buffer of char objects which can be trimmed and
|
||||
// supports zero-copy sharing of its underlying memory.
|
||||
template<typename T>
|
||||
concept ContiguousSharedBuffer = std::movable<T>
|
||||
&& std::default_initializable<T>
|
||||
&& requires(T& obj, size_t pos, size_t len) {
|
||||
|
||||
// Creates a new buffer that shares the memory of the original buffer.
|
||||
// The lifetime of the new buffer is independent of the original buffer.
|
||||
{ obj.share() } -> std::same_as<T>;
|
||||
|
||||
// Like share() but the new buffer represents a sub-range of the original buffer.
|
||||
{ obj.share(pos, len) } -> std::same_as<T>;
|
||||
|
||||
// Trims the suffix of a buffer so that 'len' is the index of the first removed byte.
|
||||
{ obj.trim(len) } -> std::same_as<void>;
|
||||
|
||||
// Trims the prefix of the buffer so that `pos` is the index of the first byte after the trim.
|
||||
{ obj.trim_front(pos) } -> std::same_as<void>;
|
||||
|
||||
{ obj.begin() } -> std::same_as<const char*>;
|
||||
{ obj.get() } -> std::same_as<const char*>;
|
||||
{ obj.get_write() } -> std::same_as<char*>;
|
||||
{ obj.end() } -> std::same_as<const char*>;
|
||||
{ obj.size() } -> std::same_as<size_t>;
|
||||
{ obj.empty() } -> std::same_as<bool>;
|
||||
};
|
||||
@@ -17,11 +17,13 @@
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "bytes_ostream.hh"
|
||||
#include "contiguous_shared_buffer.hh"
|
||||
#include "fragment_range.hh"
|
||||
|
||||
/// Fragmented buffer consisting of multiple temporary_buffer<char>
|
||||
class fragmented_temporary_buffer {
|
||||
using vector_type = std::vector<seastar::temporary_buffer<char>>;
|
||||
/// Fragmented buffer consisting of multiple Buffer objects.
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer {
|
||||
using vector_type = std::vector<Buffer>;
|
||||
vector_type _fragments;
|
||||
size_t _size_bytes = 0;
|
||||
public:
|
||||
@@ -30,15 +32,15 @@ public:
|
||||
class view;
|
||||
class istream;
|
||||
class reader;
|
||||
using ostream = seastar::memory_output_stream<vector_type::iterator>;
|
||||
using ostream = seastar::memory_output_stream<typename vector_type::iterator>;
|
||||
|
||||
fragmented_temporary_buffer() = default;
|
||||
basic_fragmented_buffer() = default;
|
||||
|
||||
fragmented_temporary_buffer(std::vector<seastar::temporary_buffer<char>> fragments, size_t size_bytes) noexcept
|
||||
basic_fragmented_buffer(std::vector<Buffer> fragments, size_t size_bytes) noexcept
|
||||
: _fragments(std::move(fragments)), _size_bytes(size_bytes)
|
||||
{ }
|
||||
|
||||
fragmented_temporary_buffer(const char* str, size_t size)
|
||||
basic_fragmented_buffer(const char* str, size_t size)
|
||||
{
|
||||
*this = allocate_to_fit(size);
|
||||
size_t pos = 0;
|
||||
@@ -54,10 +56,10 @@ public:
|
||||
|
||||
ostream get_ostream() noexcept {
|
||||
if (_fragments.size() != 1) {
|
||||
return ostream::fragmented(_fragments.begin(), _size_bytes);
|
||||
return typename ostream::fragmented(_fragments.begin(), _size_bytes);
|
||||
}
|
||||
auto& current = *_fragments.begin();
|
||||
return ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
|
||||
return typename ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
|
||||
}
|
||||
|
||||
using const_fragment_iterator = typename vector_type::const_iterator;
|
||||
@@ -100,23 +102,23 @@ public:
|
||||
_fragments.erase(it.base(), _fragments.end());
|
||||
}
|
||||
|
||||
// Creates a fragmented temporary buffer of a specified size, supplied as a parameter.
|
||||
// Creates a fragmented buffer of a specified size, supplied as a parameter.
|
||||
// Max chunk size is limited to 128kb (the same limit as `bytes_stream` has).
|
||||
static fragmented_temporary_buffer allocate_to_fit(size_t data_size) {
|
||||
static basic_fragmented_buffer allocate_to_fit(size_t data_size) {
|
||||
constexpr size_t max_fragment_size = default_fragment_size; // 128KB
|
||||
|
||||
const size_t full_fragment_count = data_size / max_fragment_size; // number of max-sized fragments
|
||||
const size_t last_fragment_size = data_size % max_fragment_size;
|
||||
|
||||
std::vector<seastar::temporary_buffer<char>> fragments;
|
||||
std::vector<Buffer> fragments;
|
||||
fragments.reserve(full_fragment_count + !!last_fragment_size);
|
||||
for (size_t i = 0; i < full_fragment_count; ++i) {
|
||||
fragments.emplace_back(seastar::temporary_buffer<char>(max_fragment_size));
|
||||
fragments.emplace_back(Buffer(max_fragment_size));
|
||||
}
|
||||
if (last_fragment_size) {
|
||||
fragments.emplace_back(seastar::temporary_buffer<char>(last_fragment_size));
|
||||
fragments.emplace_back(Buffer(last_fragment_size));
|
||||
}
|
||||
return fragmented_temporary_buffer(std::move(fragments), data_size);
|
||||
return basic_fragmented_buffer(std::move(fragments), data_size);
|
||||
}
|
||||
|
||||
vector_type release() && noexcept {
|
||||
@@ -124,7 +126,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class fragmented_temporary_buffer::view {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::view {
|
||||
vector_type::const_iterator _current;
|
||||
const char* _current_position = nullptr;
|
||||
size_t _current_size = 0;
|
||||
@@ -252,7 +255,7 @@ public:
|
||||
_current_size = std::min(_current_size, _total_size);
|
||||
}
|
||||
|
||||
bool operator==(const fragmented_temporary_buffer::view& other) const noexcept {
|
||||
bool operator==(const basic_fragmented_buffer::view& other) const noexcept {
|
||||
auto this_it = begin();
|
||||
auto other_it = other.begin();
|
||||
|
||||
@@ -285,10 +288,14 @@ public:
|
||||
return this_it == end() && other_it == other.end();
|
||||
}
|
||||
};
|
||||
|
||||
using fragmented_temporary_buffer = basic_fragmented_buffer<temporary_buffer<char>>;
|
||||
|
||||
static_assert(FragmentRange<fragmented_temporary_buffer::view>);
|
||||
static_assert(FragmentedView<fragmented_temporary_buffer::view>);
|
||||
|
||||
inline fragmented_temporary_buffer::operator view() const noexcept
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline basic_fragmented_buffer<Buffer>::operator view() const noexcept
|
||||
{
|
||||
if (!_size_bytes) {
|
||||
return view();
|
||||
@@ -305,7 +312,8 @@ concept ExceptionThrower = requires(T obj, size_t n) {
|
||||
|
||||
}
|
||||
|
||||
class fragmented_temporary_buffer::istream {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::istream {
|
||||
vector_type::const_iterator _current;
|
||||
const char* _current_position;
|
||||
const char* _current_end;
|
||||
@@ -465,29 +473,32 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline fragmented_temporary_buffer::istream fragmented_temporary_buffer::get_istream() const noexcept // allow empty (ut for that)
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline basic_fragmented_buffer<Buffer>::istream basic_fragmented_buffer<Buffer>::get_istream() const noexcept // allow empty (ut for that)
|
||||
{
|
||||
return istream(_fragments, _size_bytes);
|
||||
}
|
||||
|
||||
class fragmented_temporary_buffer::reader {
|
||||
std::vector<temporary_buffer<char>> _fragments;
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::reader {
|
||||
using FragBuffer = basic_fragmented_buffer<Buffer>;
|
||||
FragBuffer::vector_type _fragments;
|
||||
size_t _left = 0;
|
||||
public:
|
||||
future<fragmented_temporary_buffer> read_exactly(input_stream<char>& in, size_t length) {
|
||||
_fragments = std::vector<temporary_buffer<char>>();
|
||||
future<FragBuffer> read_exactly(input_stream<char>& in, size_t length) {
|
||||
_fragments = FragBuffer::vector_type();
|
||||
_left = length;
|
||||
return repeat_until_value([this, length, &in] {
|
||||
if (!_left) {
|
||||
return make_ready_future<std::optional<fragmented_temporary_buffer>>(fragmented_temporary_buffer(std::move(_fragments), length));
|
||||
return make_ready_future<std::optional<FragBuffer>>(FragBuffer(std::move(_fragments), length));
|
||||
}
|
||||
return in.read_up_to(_left).then([this] (temporary_buffer<char> buf) {
|
||||
if (buf.empty()) {
|
||||
return std::make_optional(fragmented_temporary_buffer());
|
||||
return std::make_optional(FragBuffer());
|
||||
}
|
||||
_left -= buf.size();
|
||||
_fragments.emplace_back(std::move(buf));
|
||||
return std::optional<fragmented_temporary_buffer>();
|
||||
_fragments.emplace_back(Buffer(std::move(buf)));
|
||||
return std::optional<FragBuffer>();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -495,7 +506,8 @@ public:
|
||||
|
||||
// The operator below is used only for logging
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& out, const fragmented_temporary_buffer::view& v) {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline std::ostream& operator<<(std::ostream& out, const typename basic_fragmented_buffer<Buffer>::view& v) {
|
||||
for (bytes_view frag : fragment_range(v)) {
|
||||
out << to_hex(frag);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user