This series fixes one cause of oversized allocations - and therefore potentially stalls and increased tail latencies - in Alternator.
The first patch in the series is the main fix. The later patches are cleanups requested by reviewers; because they also touch other pre-existing code, I did those cleanups as separate patches.
Alternator's Scan and Query operations return a page of results. When the number of items is not limited by a "Limit" parameter, the default is to return a 1 MB page. If items are short, a large number of them can fit in that 1 MB. The test test_query.py::test_query_large_page_small_rows returns 30,000 items in a single page.
In the response JSON, all these items are returned in a single array "Items". Before this patch, we built the full response as a RapidJSON object before sending it. Unfortunately, RapidJSON stores each array in a single contiguous allocation, so workloads that scan many small items cause large contiguous allocations, which in turn can cause stalls and high tail latencies. For example, before this patch, running
    test/alternator/run --runveryslow \
        test_query.py::test_query_large_page_small_rows
reports in the log:
    oversized allocation: 573440 bytes.
After this patch, this warning no longer appears.
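To see why one array holding every item turns into one large allocation, here is a small sketch that uses std::vector as an analogy for a contiguously stored JSON array. It is not RapidJSON code: RapidJSON's growth policy differs, and the 16-byte element size is an assumption chosen only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for one JSON value; 16 bytes is an assumed size for illustration.
struct fake_json_value {
    char bytes[16];
};

// Appends n items to a single contiguous array (std::vector, as an analogy
// for a JSON array stored contiguously) and returns the size in bytes of the
// one buffer that ends up holding all of them.
std::size_t contiguous_buffer_bytes(std::size_t n) {
    std::vector<fake_json_value> items;
    for (std::size_t i = 0; i < n; ++i) {
        items.push_back(fake_json_value{});
    }
    assert(items.capacity() >= n);
    return items.capacity() * sizeof(fake_json_value);
}
```

For 30,000 items, contiguous_buffer_bytes(30000) is at least 30,000 × 16 = 480,000 bytes in a single allocation, the same order of magnitude as the warning quoted above.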
The patch solves the problem by collecting the scanned items not in a RapidJSON array, but rather in a chunked_vector<rjson::value>, i.e., a chunked (non-contiguous) array of items (each a JSON value). Because this array is collected separately from the response object, we need to print its content without actually inserting it into the object, so we add a new function, print_with_extra_array(), to do that.
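The chunked-storage idea can be illustrated with a small container. This is a hypothetical, simplified sketch, not ScyllaDB's chunked_vector (which is more elaborate); the 128 KiB chunk limit is an assumed parameter.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical, simplified chunked container (not ScyllaDB's chunked_vector):
// elements live in separately allocated chunks of bounded size, so appending
// more items adds chunks instead of growing one contiguous buffer.
template <typename T, std::size_t MaxChunkBytes = 128 * 1024>
class simple_chunked_vector {
    // Number of elements that fit in one chunk (at least one).
    static constexpr std::size_t items_per_chunk =
        std::max<std::size_t>(1, MaxChunkBytes / sizeof(T));

    std::vector<std::vector<T>> _chunks;  // only one small header per chunk
    std::size_t _size = 0;

public:
    void push_back(T value) {
        if (_chunks.empty() || _chunks.back().size() == items_per_chunk) {
            _chunks.emplace_back();
            _chunks.back().reserve(items_per_chunk);  // one bounded allocation
        }
        _chunks.back().push_back(std::move(value));  // never reallocates
        ++_size;
    }

    std::size_t size() const { return _size; }

    const T& operator[](std::size_t i) const {
        assert(i < _size);
        return _chunks[i / items_per_chunk][i % items_per_chunk];
    }

    // Size of the largest element buffer: bounded by the chunk size,
    // independent of how many elements were appended.
    std::size_t largest_chunk_bytes() const {
        std::size_t largest = 0;
        for (const auto& chunk : _chunks) {
            largest = std::max(largest, chunk.capacity() * sizeof(T));
        }
        return largest;
    }
};
```

Appending 100,000 ints creates four chunks of at most 128 KiB each instead of one buffer of 400 KB or more. The outer vector of chunk headers still grows contiguously, but it holds only one small entry per chunk.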
The new separate-chunked-vector technique is used only when a large number of items (currently, more than 256) were scanned. When a page has fewer items (typical when each item is longer), we just insert those items into the object and print it as before.
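A minimal sketch of the two output paths, under simplifying assumptions: the object is modeled as a list of already-serialized members, the items as a std::vector of already-serialized strings (the patch uses chunked_vector<rjson::value>), and print_with_extra_array_sketch(), print_object() and render_page() are hypothetical names, not the real print_with_extra_array() signature.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: an object is a list of (key, already-serialized JSON
// value) members, and each scanned item is an already-serialized JSON value.
// Keys are assumed not to need escaping.
using json_member = std::pair<std::string, std::string>;
using json_object = std::vector<json_member>;

// Mirrors the "more than 256 items" rule described in the text.
constexpr std::size_t separate_array_threshold = 256;

// Prints `obj` followed by an extra member `array_name` holding `items`,
// without inserting the items into `obj`.
void print_with_extra_array_sketch(const json_object& obj, const std::string& array_name,
                                   const std::vector<std::string>& items, std::ostream& out) {
    out << '{';
    for (const auto& [key, value] : obj) {
        out << '"' << key << "\":" << value << ',';
    }
    out << '"' << array_name << "\":[";
    for (std::size_t i = 0; i < items.size(); ++i) {
        out << (i == 0 ? "" : ",") << items[i];  // each item written directly
    }
    out << "]}";
}

// Prints a plain object (the pre-existing path).
void print_object(const json_object& obj, std::ostream& out) {
    out << '{';
    for (std::size_t i = 0; i < obj.size(); ++i) {
        out << (i == 0 ? "" : ",") << '"' << obj[i].first << "\":" << obj[i].second;
    }
    out << '}';
}

std::string render_page(json_object obj, const std::vector<std::string>& items) {
    std::ostringstream out;
    if (items.size() > separate_array_threshold) {
        // Many items: keep them outside the object and print them alongside it.
        print_with_extra_array_sketch(obj, "Items", items, out);
    } else {
        // Few items: insert them into the object and print it as before.
        std::string array = "[";
        for (std::size_t i = 0; i < items.size(); ++i) {
            array += (i == 0 ? "" : ",") + items[i];
        }
        array += ']';
        obj.emplace_back("Items", std::move(array));
        print_object(obj, out);
    }
    return out.str();
}
```

Both paths produce the same text for the same data; the threshold only decides whether the items are ever copied into the object.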
In addition to the original slow test that demonstrated the oversized allocation (which no longer occurs), this patch adds a new test that exercises the new code with a scan returning 700 (>256) items in a page. Unlike the original test, the new one is fast enough to stay permanently in our test suite rather than being a manual "veryslow" test.
Fixes #23535
The stalls caused by large allocations were seen by actual users, so it makes sense to backport this patch. On the other hand, the patch, while not big, is fairly intrusive (it modifies the normal Scan and Query path, and the later patches also clean up additional code), so there is some small risk involved in the backport.
Closes scylladb/scylladb#24480
* github.com:scylladb/scylladb:
alternator: clean up by co-routinizing
alternator: avoid spamming the log when failing to write response
alternator: clean up and simplify request_return_type
alternator: avoid oversized allocation in Query/Scan
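One of these patches simplifies request_return_type, which the header in this commit declares as a std::variant of a response-body string, a body_writer, or an api_error. A minimal sketch of how a caller might dispatch on such a variant, using hypothetical stand-ins: std::function writing to a std::ostream instead of Seastar's noncopyable_function returning future<>, and a simple struct instead of api_error.

```cpp
#include <cassert>
#include <functional>
#include <ostream>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

// Hypothetical, simplified stand-ins for the header's types.
using body_writer_sketch = std::function<void(std::ostream&)>;
struct api_error_sketch {
    int http_status;
    std::string message;
};
using request_return_sketch = std::variant<std::string, body_writer_sketch, api_error_sketch>;

// Writes the response for whichever alternative is held and returns the
// HTTP status code to send.
int send_response(request_return_sketch&& result, std::ostream& out) {
    return std::visit([&out](auto&& r) -> int {
        using T = std::decay_t<decltype(r)>;
        if constexpr (std::is_same_v<T, std::string>) {
            out << r;          // whole body already in memory
            return 200;
        } else if constexpr (std::is_same_v<T, body_writer_sketch>) {
            r(out);            // body produced piece by piece into the stream
            return 200;
        } else {
            out << r.message;  // error response
            return r.http_status;
        }
    }, std::move(result));
}
```

The string alternative needs the entire body in one buffer, while the writer alternative can produce it incrementally, which is the case the header's comment describes for large responses.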
/*
 * Copyright 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include <seastar/core/future.hh>
#include "seastarx.hh"
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>

#include "service/migration_manager.hh"
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"

#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"

#include "tracing/trace_state.hh"

namespace db {
class system_distributed_keyspace;
}

namespace query {
class partition_slice;
class result;
}

namespace cql3::selection {
class selection;
}

namespace service {
class storage_proxy;
}

namespace cdc {
class metadata;
}

namespace gms {

class gossiper;

}

class schema_builder;

namespace alternator {

class rmw_operation;

namespace parsed {
class path;
};

schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps db::get_tags_of_table() and throws if the table is missing the tags extension.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema);

// An attribute_path_map object is used to hold data for various attribute
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
// has a root attribute, which is then modified by member and index operators -
// for example in "a.b[2].c" we have "a" as the root, then ".b" member, then
// "[2]" index, and finally ".c" member.
// Data can be added to an attribute_path_map using the add() function, but
// it requires that attributes with data not be *overlapping* or *conflicting*:
//
// 1. Two attribute paths which are identical or an ancestor of one another
//    are considered *overlapping* and not allowed. If a.b.c has data,
//    we can't add more data in a.b.c or any of its descendants like a.b.c.d.
//
// 2. Two attribute paths which need the same parent to have both a member and
//    an index are considered *conflicting* and not allowed. E.g., if a.b has
//    data, you can't add a[1]. The meaning of adding both would be that the
//    attribute a is both a map and an array, which isn't sensible.
//
// These two requirements are common to the two places where Alternator uses
// this abstraction to describe how a hierarchical item is to be transformed:
//
// 1. In ProjectionExpression: for filtering from a full top-level attribute
//    only the parts the user asked for in ProjectionExpression.
//
// 2. In UpdateExpression: for taking the previous value of a top-level
//    attribute, and modifying it based on the instructions the user
//    wrote in UpdateExpression.

template<typename T>
class attribute_path_map_node {
public:
    using data_t = T;
    // We need the extra unique_ptr<> here because libstdc++ unordered_map
    // doesn't work with incomplete types :-(
    using members_t = std::unordered_map<std::string, std::unique_ptr<attribute_path_map_node<T>>>;
    // The indexes list is sorted because DynamoDB requires handling writes
    // beyond the end of a list in index order.
    using indexes_t = std::map<unsigned, std::unique_ptr<attribute_path_map_node<T>>>;
    // The prohibition on "overlap" and "conflict" explained above means
    // that only one of data, members or indexes is non-empty.
    std::optional<std::variant<data_t, members_t, indexes_t>> _content;

    bool is_empty() const { return !_content; }
    bool has_value() const { return _content && std::holds_alternative<data_t>(*_content); }
    bool has_members() const { return _content && std::holds_alternative<members_t>(*_content); }
    bool has_indexes() const { return _content && std::holds_alternative<indexes_t>(*_content); }
    // get_members() assumes that has_members() is true
    members_t& get_members() { return std::get<members_t>(*_content); }
    const members_t& get_members() const { return std::get<members_t>(*_content); }
    indexes_t& get_indexes() { return std::get<indexes_t>(*_content); }
    const indexes_t& get_indexes() const { return std::get<indexes_t>(*_content); }
    T& get_value() { return std::get<T>(*_content); }
    const T& get_value() const { return std::get<T>(*_content); }
};

template<typename T>
using attribute_path_map = std::unordered_map<std::string, attribute_path_map_node<T>>;

using attrs_to_get_node = attribute_path_map_node<std::monostate>;
// attrs_to_get lists which top-level attributes are needed, and possibly also
// which part of the top-level attribute is really needed (when nested
// attribute paths appeared in the query).
// Most code actually uses optional<attrs_to_get>. There, a disengaged
// optional means we should get all attributes, not specific ones.
using attrs_to_get = attribute_path_map<std::monostate>;

class executor : public peering_sharded_service<executor> {
    gms::gossiper& _gossiper;
    service::storage_proxy& _proxy;
    service::migration_manager& _mm;
    db::system_distributed_keyspace& _sdks;
    cdc::metadata& _cdc_metadata;
    utils::updateable_value<bool> _enforce_authorization;
    // An smp_service_group to be used for limiting the concurrency when
    // forwarding Alternator requests between shards - if necessary for LWT.
    smp_service_group _ssg;

public:
    using client_state = service::client_state;
    // request_return_type is the return type of the executor methods, which
    // can be one of:
    // 1. A string, which is the response body for the request.
    // 2. A body_writer, an asynchronous function (returning future<>) that
    //    takes an output_stream and writes the response body into it.
    // 3. An api_error, which is an error response that should be returned to
    //    the client.
    // The body_writer is used for streaming responses, where the response body
    // is written in chunks to the output_stream. This allows for efficient
    // handling of large responses without needing to allocate a large buffer
    // in memory.
    using body_writer = noncopyable_function<future<>(output_stream<char>&&)>;
    using request_return_type = std::variant<std::string, body_writer, api_error>;
    stats _stats;
    // The metric_groups object holds this stat object's metrics registered
    // as long as the stats object is alive.
    seastar::metrics::metric_groups _metrics;
    static constexpr auto ATTRS_COLUMN_NAME = ":attrs";
    static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_";
    static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator.";

    executor(gms::gossiper& gossiper,
             service::storage_proxy& proxy,
             service::migration_manager& mm,
             db::system_distributed_keyspace& sdks,
             cdc::metadata& cdc_metadata,
             smp_service_group ssg,
             utils::updateable_value<uint32_t> default_timeout_in_ms);

    future<request_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> update_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> put_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> update_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> list_tables(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> scan(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> describe_endpoints(client_state& client_state, service_permit permit, rjson::value request, std::string host_header);
    future<request_return_type> batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> query(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future<request_return_type> tag_resource(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> untag_resource(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> list_tags_of_resource(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> update_time_to_live(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> list_streams(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> describe_stream(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> get_shard_iterator(client_state& client_state, service_permit permit, rjson::value request);
    future<request_return_type> get_records(client_state& client_state, tracing::trace_state_ptr, service_permit permit, rjson::value request);
    future<request_return_type> describe_continuous_backups(client_state& client_state, service_permit permit, rjson::value request);

    future<> start();
    future<> stop() {
        // disconnect from the value source, but keep the value unchanged.
        s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
        return make_ready_future<>();
    }

    static sstring table_name(const schema&);
    static db::timeout_clock::time_point default_timeout();
private:
    static thread_local utils::updateable_value<uint32_t> s_default_timeout_in_ms;
public:
    static schema_ptr find_table(service::storage_proxy&, const rjson::value& request);

private:
    friend class rmw_operation;

    static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr);

public:
    static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&);

    static std::optional<rjson::value> describe_single_item(schema_ptr,
            const query::partition_slice&,
            const cql3::selection::selection&,
            const query::result&,
            const std::optional<attrs_to_get>&,
            uint64_t* = nullptr);

    static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
            const query::partition_slice&& slice,
            shared_ptr<cql3::selection::selection> selection,
            foreign_ptr<lw_shared_ptr<query::result>> query_result,
            shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
            uint64_t& rcu_half_units);

    static void describe_single_item(const cql3::selection::selection&,
            const std::vector<managed_bytes_opt>&,
            const std::optional<attrs_to_get>&,
            rjson::value&,
            uint64_t* item_length_in_bytes = nullptr,
            bool = false);

    static bool add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
    static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
    static void supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp);
};

// is_big() checks approximately if the given JSON value is "bigger" than
// the given big_size number of bytes. The goal is to *quickly* detect
// oversized JSON that, for example, is too large to be serialized to a
// contiguous string - we don't need an accurate size for that. Moreover,
// as soon as we detect that the JSON is indeed "big", we can return true
// and don't need to continue calculating its exact size.
// For simplicity, we use a recursive implementation. This is fine because
// Alternator limits the depth of JSONs it reads from inputs, and doesn't
// add more than a couple of levels in its own output construction.
bool is_big(const rjson::value& val, int big_size = 100'000);

// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);

/**
 * Make a return type that serializes the object "streamed", i.e., directly
 * to the HTTP output stream. Note: this is only useful for (very) large
 * objects, since streaming has overhead of its own, but for massive lists
 * of returned objects it can help avoid large allocations and many
 * re-allocations.
 */
executor::body_writer make_streamed(rjson::value&&);

}
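The is_big() comment in the header describes an approximate size check that stops as soon as the limit is exceeded. A minimal sketch of that early-exit idea, on a hypothetical simplified JSON tree rather than rjson::value; the per-node byte estimates (quotes, brackets, keys, commas ignored) are assumptions, not Alternator's actual accounting.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified JSON tree standing in for rjson::value.
struct json {
    enum class kind { null, number, string, array, object };
    kind k = kind::null;
    std::string text;               // digits for numbers, contents for strings
    std::vector<json> elements;     // array elements, or object member values
    std::vector<std::string> keys;  // object member names, parallel to elements
};

// Subtracts a rough serialized size of `val` from `budget` and returns true
// as soon as the budget goes negative, so the rest of the tree is never visited.
bool exceeds_budget(const json& val, long& budget) {
    switch (val.k) {
    case json::kind::null:
        budget -= 4;  // null
        break;
    case json::kind::number:
        budget -= static_cast<long>(val.text.size());
        break;
    case json::kind::string:
        budget -= static_cast<long>(val.text.size()) + 2;  // plus quotes
        break;
    case json::kind::array:
    case json::kind::object:
        budget -= 2;  // brackets or braces
        for (std::size_t i = 0; i < val.elements.size(); ++i) {
            if (val.k == json::kind::object) {
                budget -= static_cast<long>(val.keys[i].size()) + 3;  // "key":
            }
            if (budget < 0 || exceeds_budget(val.elements[i], budget)) {
                return true;  // early exit: exact size is never needed
            }
        }
        break;
    }
    return budget < 0;
}

// Sketch of an is_big()-style check; not Alternator's implementation.
bool is_big_sketch(const json& val, long big_size = 100'000) {
    long budget = big_size;
    return exceeds_budget(val, budget);
}
```

Like the recursion described in the comment, this sketch relies on the input depth being bounded; an adversarially deep tree would need an iterative traversal instead.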