Compare commits
25 Commits
scylla-5.2
...
next-5.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ccd51844c | ||
|
|
705ec24977 | ||
|
|
e89eb41e70 | ||
|
|
45814c7f14 | ||
|
|
331e0c4ca7 | ||
|
|
32be38dae5 | ||
|
|
3dacf6a4b1 | ||
|
|
3d360c7caf | ||
|
|
f7a3091734 | ||
|
|
6f0d32a42f | ||
|
|
d947f1e275 | ||
|
|
d727382cc1 | ||
|
|
15a090f711 | ||
|
|
d4c523e9ef | ||
|
|
b505ce4897 | ||
|
|
80861e3bce | ||
|
|
9004c9ee38 | ||
|
|
72494af137 | ||
|
|
c15e72695d | ||
|
|
83dd78fb9d | ||
|
|
1bd6584478 | ||
|
|
19f3e42583 | ||
|
|
bb9ceae2c3 | ||
|
|
fa154a8d00 | ||
|
|
a9101f14f6 |
@@ -117,6 +117,8 @@ struct date_type_impl final : public concrete_type<db_clock::time_point> {
|
||||
|
||||
using timestamp_date_base_class = concrete_type<db_clock::time_point>;
|
||||
|
||||
sstring timestamp_to_json_string(const timestamp_date_base_class& t, const bytes_view& bv);
|
||||
|
||||
struct timeuuid_type_impl final : public concrete_type<utils::UUID> {
|
||||
timeuuid_type_impl();
|
||||
static utils::UUID from_sstring(sstring_view s);
|
||||
|
||||
@@ -485,7 +485,7 @@ struct to_json_string_visitor {
|
||||
sstring operator()(const string_type_impl& t) { return quote_json_string(t.to_string(bv)); }
|
||||
sstring operator()(const bytes_type_impl& t) { return quote_json_string("0x" + t.to_string(bv)); }
|
||||
sstring operator()(const boolean_type_impl& t) { return t.to_string(bv); }
|
||||
sstring operator()(const timestamp_date_base_class& t) { return quote_json_string(t.to_string(bv)); }
|
||||
sstring operator()(const timestamp_date_base_class& t) { return quote_json_string(timestamp_to_json_string(t, bv)); }
|
||||
sstring operator()(const timeuuid_type_impl& t) { return quote_json_string(t.to_string(bv)); }
|
||||
sstring operator()(const map_type_impl& t) { return to_json_string_aux(t, bv); }
|
||||
sstring operator()(const set_type_impl& t) { return to_json_string_aux(t, bv); }
|
||||
|
||||
@@ -819,7 +819,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit")
|
||||
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default) "
|
||||
"bytes written to data file. Value must be between 0 and 1.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .1, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable")
|
||||
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
|
||||
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting")
|
||||
|
||||
@@ -31,7 +31,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/index.html>`_ for suspending ScyllaDB Manager (only available for ScyllaDB Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
@@ -180,4 +180,4 @@ Start the node
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback is successful, move to the next node in the cluster.
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback is successful, move to the next node in the cluster.
|
||||
|
||||
@@ -34,7 +34,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/index.html>`_ for suspending Scylla Manager (only available Scylla Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `Scylla Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `Scylla Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade steps
|
||||
=============
|
||||
|
||||
@@ -32,7 +32,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending ScyllaDB Manager (only available for ScyllaDB Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
@@ -167,54 +167,27 @@ Download and install the new release
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``. You should use the same version as this version in case you want to |ROLLBACK|_ the upgrade. If you are not running a |SRC_VERSION|.x version, stop right here! This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.
|
||||
Before upgrading, check what version you are running now using ``scylla --version``. You should use the same version as this version in case you want to |ROLLBACK|_ the upgrade. If you are not running a |SRC_VERSION|.x version, stop right here! This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.
|
||||
|
||||
There are two alternative upgrade procedures: upgrading ScyllaDB and simultaneously updating 3rd party and OS packages - recommended if you
|
||||
are running a ScyllaDB official image (EC2 AMI, GCP, and Azure images), which is based on Ubuntu 20.04, and upgrading ScyllaDB without updating
|
||||
any external packages.
|
||||
If you’re using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions. If you’re using your
|
||||
own image and have installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Update the ScyllaDB deb repo (see above).
|
||||
#. Configure Java 1.8 (see above).
|
||||
#. Install the new ScyllaDB version with the additional
|
||||
``scylla-enterprise-machine-image`` package:
|
||||
|
||||
**To upgrade ScyllaDB and update 3rd party and OS packages (RECOMMENDED):**
|
||||
|
||||
Choosing this upgrade procedure allows you to upgrade your ScyllaDB version and update the 3rd party and OS packages using one command.
|
||||
|
||||
#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|.
|
||||
|
||||
#. Load the new repo:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo apt-get update
|
||||
|
||||
#. Run the following command to update the manifest file:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
cat scylla-enterprise-packages-<version>-<arch>.txt | sudo xargs -n1 apt-get install -y
|
||||
|
||||
Where:
|
||||
|
||||
* ``<version>`` - The ScyllaDB Enterprise version to which you are upgrading ( |NEW_VERSION| ).
|
||||
* ``<arch>`` - Architecture type: ``x86_64`` or ``aarch64``.
|
||||
|
||||
The file is included in the ScyllaDB Enterprise packages downloaded in the previous step. The file location is ``http://downloads.scylladb.com/downloads/scylla/aws/manifest/scylla-packages-<version>-<arch>.txt``
|
||||
|
||||
Example:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
cat scylla-enterprise-packages-2022.2.0-x86_64.txt | sudo xargs -n1 apt-get install -y
|
||||
|
||||
|
||||
.. note::
|
||||
|
||||
Alternatively, you can update the manifest file with the following command:
|
||||
|
||||
``sudo apt-get install $(awk '{print $1'} scylla-enterprise-packages-<version>-<arch>.txt) -y``
|
||||
|
||||
|
||||
|
||||
To upgrade ScyllaDB without updating any external packages, follow the :ref:`download and installation instructions for Debian/Ubuntu <upgrade-debian-ubuntu-5.2-to-enterprise-2023.1>`.
|
||||
.. code::
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla-enterprise
|
||||
sudo apt-get dist-upgrade scylla-enterprise-machine-image
|
||||
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
@@ -169,11 +169,11 @@ class mutation_fragment_stream_validating_filter {
|
||||
sstring _name_storage;
|
||||
std::string_view _name_view; // always valid
|
||||
mutation_fragment_stream_validation_level _validation_level;
|
||||
bool _raise_errors;
|
||||
|
||||
private:
|
||||
sstring full_name() const;
|
||||
|
||||
mutation_fragment_stream_validating_filter(const char* name_literal, sstring name_value, const schema& s, mutation_fragment_stream_validation_level level);
|
||||
mutation_fragment_stream_validating_filter(const char* name_literal, sstring name_value, const schema& s,
|
||||
mutation_fragment_stream_validation_level level, bool raise_errors);
|
||||
|
||||
public:
|
||||
/// Constructor.
|
||||
@@ -181,12 +181,18 @@ public:
|
||||
/// \arg name is used in log messages to identify the validator, the
|
||||
/// schema identity is added automatically
|
||||
/// \arg compare_keys enable validating clustering key monotonicity
|
||||
mutation_fragment_stream_validating_filter(sstring name, const schema& s, mutation_fragment_stream_validation_level level);
|
||||
mutation_fragment_stream_validating_filter(const char* name, const schema& s, mutation_fragment_stream_validation_level level);
|
||||
mutation_fragment_stream_validating_filter(sstring name, const schema& s, mutation_fragment_stream_validation_level level, bool raise_errors = true);
|
||||
mutation_fragment_stream_validating_filter(const char* name, const schema& s, mutation_fragment_stream_validation_level level, bool raise_errors = true);
|
||||
|
||||
mutation_fragment_stream_validating_filter(mutation_fragment_stream_validating_filter&&) = delete;
|
||||
mutation_fragment_stream_validating_filter(const mutation_fragment_stream_validating_filter&) = delete;
|
||||
|
||||
sstring full_name() const;
|
||||
|
||||
bool raise_errors() const { return _raise_errors; }
|
||||
|
||||
const mutation_fragment_stream_validator& validator() const { return _validator; }
|
||||
|
||||
bool operator()(const dht::decorated_key& dk);
|
||||
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
|
||||
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
|
||||
@@ -197,5 +203,5 @@ public:
|
||||
void reset(const mutation_fragment_v2& mf);
|
||||
/// Equivalent to `operator()(partition_end{})`
|
||||
bool on_end_of_partition();
|
||||
void on_end_of_stream();
|
||||
bool on_end_of_stream();
|
||||
};
|
||||
|
||||
@@ -191,7 +191,11 @@ void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) {
|
||||
|
||||
namespace {
|
||||
|
||||
[[noreturn]] void on_validation_error(seastar::logger& l, const seastar::sstring& reason) {
|
||||
bool on_validation_error(seastar::logger& l, const mutation_fragment_stream_validating_filter& zis, const seastar::sstring& reason) {
|
||||
if (!zis.raise_errors()) {
|
||||
l.error("{}", reason);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
on_internal_error(l, reason);
|
||||
} catch (std::runtime_error& e) {
|
||||
@@ -209,13 +213,13 @@ bool mutation_fragment_stream_validating_filter::operator()(const dht::decorated
|
||||
if (_validator(dk.token())) {
|
||||
return true;
|
||||
}
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected token: previous {}, current {}",
|
||||
return on_validation_error(mrlog, *this, format("[validator {} for {}] Unexpected token: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_token(), dk.token()));
|
||||
} else {
|
||||
if (_validator(dk)) {
|
||||
return true;
|
||||
}
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected partition key: previous {}, current {}",
|
||||
return on_validation_error(mrlog, *this, format("[validator {} for {}] Unexpected partition key: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), dk));
|
||||
}
|
||||
}
|
||||
@@ -226,10 +230,11 @@ sstring mutation_fragment_stream_validating_filter::full_name() const {
|
||||
}
|
||||
|
||||
mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_filter(const char* name_literal, sstring name_value, const schema& s,
|
||||
mutation_fragment_stream_validation_level level)
|
||||
mutation_fragment_stream_validation_level level, bool raise_errors)
|
||||
: _validator(s)
|
||||
, _name_storage(std::move(name_value))
|
||||
, _validation_level(level)
|
||||
, _raise_errors(raise_errors)
|
||||
{
|
||||
if (name_literal) {
|
||||
_name_view = name_literal;
|
||||
@@ -260,13 +265,13 @@ mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_
|
||||
}
|
||||
|
||||
mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_filter(sstring name, const schema& s,
|
||||
mutation_fragment_stream_validation_level level)
|
||||
: mutation_fragment_stream_validating_filter(nullptr, std::move(name), s, level)
|
||||
mutation_fragment_stream_validation_level level, bool raise_errors)
|
||||
: mutation_fragment_stream_validating_filter(nullptr, std::move(name), s, level, raise_errors)
|
||||
{ }
|
||||
|
||||
mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_filter(const char* name, const schema& s,
|
||||
mutation_fragment_stream_validation_level level)
|
||||
: mutation_fragment_stream_validating_filter(name, {}, s, level)
|
||||
mutation_fragment_stream_validation_level level, bool raise_errors)
|
||||
: mutation_fragment_stream_validating_filter(name, {}, s, level, raise_errors)
|
||||
{ }
|
||||
|
||||
bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
|
||||
@@ -279,7 +284,9 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
|
||||
|
||||
mrlog.debug("[validator {}] {}:{} new_current_tombstone: {}", static_cast<void*>(this), kind, pos, new_current_tombstone);
|
||||
|
||||
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
if (_validation_level == mutation_fragment_stream_validation_level::none) {
|
||||
return true;
|
||||
} else if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
valid = _validator(kind, pos, new_current_tombstone);
|
||||
} else {
|
||||
valid = _validator(kind, new_current_tombstone);
|
||||
@@ -287,18 +294,19 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
|
||||
|
||||
if (__builtin_expect(!valid, false)) {
|
||||
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}:{}, current {}:{}",
|
||||
on_validation_error(mrlog, *this, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}:{}, current {}:{}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), _validator.previous_position(), kind, pos));
|
||||
} else if (_validation_level >= mutation_fragment_stream_validation_level::partition_key) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}, current {}",
|
||||
on_validation_error(mrlog, *this, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), kind));
|
||||
} else if (kind == mutation_fragment_v2::kind::partition_end && _validator.current_tombstone()) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Partition ended with active tombstone: {}",
|
||||
on_validation_error(mrlog, *this, format("[validator {} for {}] Partition ended with active tombstone: {}",
|
||||
static_cast<void*>(this), full_name(), _validator.current_tombstone()));
|
||||
} else {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: previous {}, current {}",
|
||||
on_validation_error(mrlog, *this, format("[validator {} for {}] Unexpected mutation fragment: previous {}, current {}",
|
||||
static_cast<void*>(this), full_name(), _validator.previous_mutation_fragment_kind(), kind));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -340,15 +348,16 @@ bool mutation_fragment_stream_validating_filter::on_end_of_partition() {
|
||||
return (*this)(mutation_fragment::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t()));
|
||||
}
|
||||
|
||||
void mutation_fragment_stream_validating_filter::on_end_of_stream() {
|
||||
bool mutation_fragment_stream_validating_filter::on_end_of_stream() {
|
||||
if (_validation_level < mutation_fragment_stream_validation_level::partition_region) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
mrlog.debug("[validator {}] EOS", static_cast<const void*>(this));
|
||||
if (!_validator.on_end_of_stream()) {
|
||||
on_validation_error(mrlog, format("[validator {} for {}] Stream ended with unclosed partition: {}", static_cast<const void*>(this), full_name(),
|
||||
return on_validation_error(mrlog, *this, format("[validator {} for {}] Stream ended with unclosed partition: {}", static_cast<const void*>(this), full_name(),
|
||||
_validator.previous_mutation_fragment_kind()));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static size_t compute_buffer_size(const schema& s, const flat_mutation_reader_v2::tracked_buffer& buffer)
|
||||
|
||||
@@ -4118,7 +4118,7 @@ def find_sstables():
|
||||
system_sstables_manager = std_unique_ptr(db["_system_sstables_manager"]).get()
|
||||
for manager in (user_sstables_manager, system_sstables_manager):
|
||||
for sst_list_name in ("_active", "_undergoing_close"):
|
||||
for sst in intrusive_list(manager[sst_list_name], link="_manager_link"):
|
||||
for sst in intrusive_list(manager[sst_list_name], link="_manager_list_link"):
|
||||
yield sst.address
|
||||
except gdb.error:
|
||||
# Scylla Enterprise 2020.1 compatibility
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
|
||||
#include "utils/error_injection.hh"
|
||||
#include "dht/sharder.hh"
|
||||
#include "types.hh"
|
||||
#include "writer.hh"
|
||||
@@ -1510,7 +1511,7 @@ size_t sstable::total_reclaimable_memory_size() const {
|
||||
}
|
||||
|
||||
size_t sstable::reclaim_memory_from_components() {
|
||||
size_t total_memory_reclaimed = 0;
|
||||
size_t memory_reclaimed_this_iteration = 0;
|
||||
|
||||
if (_components->filter) {
|
||||
auto filter_memory_size = _components->filter->memory_size();
|
||||
@@ -1518,12 +1519,31 @@ size_t sstable::reclaim_memory_from_components() {
|
||||
// Discard it from memory by replacing it with an always present variant.
|
||||
// No need to remove it from _recognized_components as the filter is still in disk.
|
||||
_components->filter = std::make_unique<utils::filter::always_present_filter>();
|
||||
total_memory_reclaimed += filter_memory_size;
|
||||
memory_reclaimed_this_iteration += filter_memory_size;
|
||||
}
|
||||
}
|
||||
|
||||
_total_reclaimable_memory.reset();
|
||||
return total_memory_reclaimed;
|
||||
_total_memory_reclaimed += memory_reclaimed_this_iteration;
|
||||
return memory_reclaimed_this_iteration;
|
||||
}
|
||||
|
||||
size_t sstable::total_memory_reclaimed() const {
|
||||
return _total_memory_reclaimed;
|
||||
}
|
||||
|
||||
future<> sstable::reload_reclaimed_components(const io_priority_class& pc) {
|
||||
if (_total_memory_reclaimed == 0) {
|
||||
// nothing to reload
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", std::chrono::seconds{3});
|
||||
|
||||
co_await read_filter(pc);
|
||||
_total_reclaimable_memory.reset();
|
||||
_total_memory_reclaimed -= _components->filter->memory_size();
|
||||
sstlog.info("Reloaded bloom filter of {}", get_filename());
|
||||
}
|
||||
|
||||
// This interface is only used during tests, snapshot loading and early initialization.
|
||||
|
||||
@@ -146,7 +146,8 @@ class sstable : public enable_lw_shared_from_this<sstable> {
|
||||
public:
|
||||
using version_types = sstable_version_types;
|
||||
using format_types = sstable_format_types;
|
||||
using manager_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
using manager_list_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
using manager_set_link_type = bi::set_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
public:
|
||||
sstable(schema_ptr schema,
|
||||
sstring dir,
|
||||
@@ -576,7 +577,11 @@ private:
|
||||
sstables_manager& _manager;
|
||||
|
||||
sstables_stats _stats;
|
||||
manager_link_type _manager_link;
|
||||
// link used by the _active list of sstables manager
|
||||
manager_list_link_type _manager_list_link;
|
||||
// link used by the _reclaimed set of sstables manager
|
||||
manager_set_link_type _manager_set_link;
|
||||
|
||||
|
||||
// The _large_data_stats map stores e.g. largest partitions, rows, cells sizes,
|
||||
// and max number of rows in a partition.
|
||||
@@ -590,6 +595,8 @@ private:
|
||||
// It is initialized to 0 to prevent the sstables manager from reclaiming memory
|
||||
// from the components before the SSTable has been fully loaded.
|
||||
mutable std::optional<size_t> _total_reclaimable_memory{0};
|
||||
// Total memory reclaimed so far from this sstable
|
||||
size_t _total_memory_reclaimed{0};
|
||||
public:
|
||||
const bool has_component(component_type f) const;
|
||||
sstables_manager& manager() { return _manager; }
|
||||
@@ -668,11 +675,15 @@ private:
|
||||
|
||||
future<> create_data() noexcept;
|
||||
|
||||
// Note that only bloom filters are reclaimable by the following methods.
|
||||
// Return the total reclaimable memory in this SSTable
|
||||
size_t total_reclaimable_memory_size() const;
|
||||
// Reclaim memory from the components back to the system.
|
||||
// Note that only bloom filters are reclaimable.
|
||||
size_t reclaim_memory_from_components();
|
||||
// Return memory reclaimed so far from this sstable
|
||||
size_t total_memory_reclaimed() const;
|
||||
// Reload components from which memory was previously reclaimed
|
||||
future<> reload_reclaimed_components(const io_priority_class& pc);
|
||||
|
||||
public:
|
||||
// Finds first position_in_partition in a given partition.
|
||||
@@ -917,6 +928,13 @@ public:
|
||||
// Drops all evictable in-memory caches of on-disk content.
|
||||
future<> drop_caches();
|
||||
|
||||
struct lesser_reclaimed_memory {
|
||||
// comparator class to be used by the _reclaimed set in sstables manager
|
||||
bool operator()(const sstable& sst1, const sstable& sst2) const {
|
||||
return sst1.total_memory_reclaimed() < sst2.total_memory_reclaimed();
|
||||
}
|
||||
};
|
||||
|
||||
// Allow the test cases from sstable_test.cc to test private methods. We use
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
// will then re-export as public every method it needs.
|
||||
|
||||
@@ -28,6 +28,7 @@ sstables_manager::sstables_manager(
|
||||
std::numeric_limits<size_t>::max())
|
||||
, _dir_semaphore(dir_sem)
|
||||
{
|
||||
_components_reloader_status = components_reloader_fiber();
|
||||
}
|
||||
|
||||
sstables_manager::~sstables_manager() {
|
||||
@@ -87,9 +88,59 @@ void sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(ssta
|
||||
auto memory_reclaimed = sst_with_max_memory->reclaim_memory_from_components();
|
||||
_total_memory_reclaimed += memory_reclaimed;
|
||||
_total_reclaimable_memory -= memory_reclaimed;
|
||||
_reclaimed.insert(*sst_with_max_memory);
|
||||
smlogger.info("Reclaimed {} bytes of memory from SSTable components. Total memory reclaimed so far is {} bytes", memory_reclaimed, _total_memory_reclaimed);
|
||||
}
|
||||
|
||||
size_t sstables_manager::get_memory_available_for_reclaimable_components() {
|
||||
size_t memory_reclaim_threshold = _available_memory * _db_config.components_memory_reclaim_threshold();
|
||||
return memory_reclaim_threshold - _total_reclaimable_memory;
|
||||
}
|
||||
|
||||
future<> sstables_manager::components_reloader_fiber() {
|
||||
sstlog.trace("components_reloader_fiber start");
|
||||
while (true) {
|
||||
co_await _sstable_deleted_event.when();
|
||||
|
||||
if (_closing) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Reload bloom filters from the smallest to largest so as to maximize
|
||||
// the number of bloom filters being reloaded.
|
||||
auto memory_available = get_memory_available_for_reclaimable_components();
|
||||
while (!_reclaimed.empty() && memory_available > 0) {
|
||||
auto sstable_to_reload = _reclaimed.begin();
|
||||
const size_t reclaimed_memory = sstable_to_reload->total_memory_reclaimed();
|
||||
if (reclaimed_memory > memory_available) {
|
||||
// cannot reload anymore sstables
|
||||
break;
|
||||
}
|
||||
|
||||
// Increment the total memory before reloading to prevent any parallel
|
||||
// fibers from loading new bloom filters into memory.
|
||||
_total_reclaimable_memory += reclaimed_memory;
|
||||
_reclaimed.erase(sstable_to_reload);
|
||||
// Use a lw_shared_ptr to prevent the sstable from getting deleted when
|
||||
// the components are being reloaded.
|
||||
auto sstable_ptr = sstable_to_reload->shared_from_this();
|
||||
try {
|
||||
co_await sstable_ptr->reload_reclaimed_components(default_priority_class());
|
||||
} catch (...) {
|
||||
// reload failed due to some reason
|
||||
sstlog.warn("Failed to reload reclaimed SSTable components : {}", std::current_exception());
|
||||
// revert back changes made before the reload
|
||||
_total_reclaimable_memory -= reclaimed_memory;
|
||||
_reclaimed.insert(*sstable_to_reload);
|
||||
break;
|
||||
}
|
||||
|
||||
_total_memory_reclaimed -= reclaimed_memory;
|
||||
memory_available = get_memory_available_for_reclaimable_components();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void sstables_manager::add(sstable* sst) {
|
||||
_active.push_back(*sst);
|
||||
}
|
||||
@@ -97,6 +148,8 @@ void sstables_manager::add(sstable* sst) {
|
||||
void sstables_manager::deactivate(sstable* sst) {
|
||||
// Remove SSTable from the reclaimable memory tracking
|
||||
_total_reclaimable_memory -= sst->total_reclaimable_memory_size();
|
||||
_total_memory_reclaimed -= sst->total_memory_reclaimed();
|
||||
_reclaimed.erase(*sst);
|
||||
|
||||
// At this point, sst has a reference count of zero, since we got here from
|
||||
// lw_shared_ptr_deleter<sstables::sstable>::dispose().
|
||||
@@ -113,6 +166,7 @@ void sstables_manager::deactivate(sstable* sst) {
|
||||
void sstables_manager::remove(sstable* sst) {
|
||||
_undergoing_close.erase(_undergoing_close.iterator_to(*sst));
|
||||
delete sst;
|
||||
_sstable_deleted_event.signal();
|
||||
maybe_done();
|
||||
}
|
||||
|
||||
@@ -127,6 +181,9 @@ future<> sstables_manager::close() {
|
||||
maybe_done();
|
||||
co_await _done.get_future();
|
||||
co_await _sstable_metadata_concurrency_sem.stop();
|
||||
// stop the components reload fiber
|
||||
_sstable_deleted_event.signal();
|
||||
co_await std::move(_components_reloader_status);
|
||||
}
|
||||
|
||||
sstable_directory::components_lister sstables_manager::get_components_lister(std::filesystem::path dir) {
|
||||
|
||||
@@ -45,8 +45,12 @@ static constexpr size_t default_sstable_buffer_size = 128 * 1024;
|
||||
|
||||
class sstables_manager {
|
||||
using list_type = boost::intrusive::list<sstable,
|
||||
boost::intrusive::member_hook<sstable, sstable::manager_link_type, &sstable::_manager_link>,
|
||||
boost::intrusive::member_hook<sstable, sstable::manager_list_link_type, &sstable::_manager_list_link>,
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
using set_type = boost::intrusive::set<sstable,
|
||||
boost::intrusive::member_hook<sstable, sstable::manager_set_link_type, &sstable::_manager_set_link>,
|
||||
boost::intrusive::constant_time_size<false>,
|
||||
boost::intrusive::compare<sstable::lesser_reclaimed_memory>>;
|
||||
private:
|
||||
size_t _available_memory;
|
||||
db::large_data_handler& _large_data_handler;
|
||||
@@ -70,6 +74,11 @@ private:
|
||||
size_t _total_reclaimable_memory{0};
|
||||
// Total memory reclaimed so far across all sstables
|
||||
size_t _total_memory_reclaimed{0};
|
||||
// Set of sstables from which memory has been reclaimed
|
||||
set_type _reclaimed;
|
||||
// Condition variable that gets notified when an sstable is deleted
|
||||
seastar::condition_variable _sstable_deleted_event;
|
||||
future<> _components_reloader_status = make_ready_future<>();
|
||||
|
||||
bool _closing = false;
|
||||
promise<> _done;
|
||||
@@ -131,6 +140,9 @@ private:
|
||||
// memory and if the total memory usage exceeds the pre-defined threshold,
|
||||
// reclaim it from the SSTable that has the most reclaimable memory.
|
||||
void increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst);
|
||||
// Fiber to reload reclaimed components back into memory when memory becomes available.
|
||||
future<> components_reloader_fiber();
|
||||
size_t get_memory_available_for_reclaimable_components();
|
||||
private:
|
||||
db::large_data_handler& get_large_data_handler() const {
|
||||
return _large_data_handler;
|
||||
|
||||
@@ -494,29 +494,29 @@ SEASTAR_TEST_CASE(test_time_casts_in_selection_clause) {
|
||||
}
|
||||
{
|
||||
auto msg = e.execute_cql("SELECT CAST(CAST(a AS timestamp) AS text), CAST(CAST(a AS date) AS text), CAST(CAST(b as date) AS text), CAST(CAST(c AS timestamp) AS text) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{utf8_type->from_string("2009-12-17T00:26:29.805000")},
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{utf8_type->from_string("2009-12-17T00:26:29.805Z")},
|
||||
{utf8_type->from_string("2009-12-17")},
|
||||
{utf8_type->from_string("2015-05-21")},
|
||||
{utf8_type->from_string("2015-05-21T00:00:00")}});
|
||||
{utf8_type->from_string("2015-05-21T00:00:00.000Z")}});
|
||||
}
|
||||
{
|
||||
auto msg = e.execute_cql("SELECT CAST(a AS text), CAST(b as text), CAST(c AS text), CAST(d AS text) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{utf8_type->from_string("d2177dd0-eaa2-11de-a572-001b779c76e3")},
|
||||
{utf8_type->from_string("2015-05-21T11:03:02")},
|
||||
{utf8_type->from_string("2015-05-21T11:03:02.000Z")},
|
||||
{utf8_type->from_string("2015-05-21")},
|
||||
{utf8_type->from_string("11:03:02.000000000")}});
|
||||
}
|
||||
{
|
||||
auto msg = e.execute_cql("SELECT CAST(CAST(a AS timestamp) AS ascii), CAST(CAST(a AS date) AS ascii), CAST(CAST(b as date) AS ascii), CAST(CAST(c AS timestamp) AS ascii) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{ascii_type->from_string("2009-12-17T00:26:29.805000")},
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{ascii_type->from_string("2009-12-17T00:26:29.805Z")},
|
||||
{ascii_type->from_string("2009-12-17")},
|
||||
{ascii_type->from_string("2015-05-21")},
|
||||
{ascii_type->from_string("2015-05-21T00:00:00")}});
|
||||
{ascii_type->from_string("2015-05-21T00:00:00.000Z")}});
|
||||
}
|
||||
{
|
||||
auto msg = e.execute_cql("SELECT CAST(a AS ascii), CAST(b as ascii), CAST(c AS ascii), CAST(d AS ascii) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{ascii_type->from_string("d2177dd0-eaa2-11de-a572-001b779c76e3")},
|
||||
{ascii_type->from_string("2015-05-21T11:03:02")},
|
||||
{ascii_type->from_string("2015-05-21T11:03:02.000Z")},
|
||||
{ascii_type->from_string("2015-05-21")},
|
||||
{ascii_type->from_string("11:03:02.000000000")}});
|
||||
}
|
||||
|
||||
@@ -8,6 +8,9 @@
|
||||
|
||||
#define BOOST_TEST_MODULE core
|
||||
|
||||
#include <stdexcept>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
#include <deque>
|
||||
#include <random>
|
||||
@@ -108,12 +111,15 @@ BOOST_AUTO_TEST_CASE(test_random_walk) {
|
||||
}
|
||||
|
||||
class exception_safety_checker {
|
||||
uint64_t _live_objects = 0;
|
||||
uint64_t _countdown = std::numeric_limits<uint64_t>::max();
|
||||
int64_t _live_objects = 0;
|
||||
int64_t _countdown = std::numeric_limits<int64_t>::max();
|
||||
public:
|
||||
bool ok() const {
|
||||
return !_live_objects;
|
||||
}
|
||||
int64_t live_objects() const {
|
||||
return _live_objects;
|
||||
}
|
||||
void set_countdown(unsigned x) {
|
||||
_countdown = x;
|
||||
}
|
||||
@@ -123,6 +129,9 @@ public:
|
||||
}
|
||||
++_live_objects;
|
||||
}
|
||||
void add_live_object_noexcept() noexcept {
|
||||
++_live_objects;
|
||||
}
|
||||
void del_live_object() {
|
||||
--_live_objects;
|
||||
}
|
||||
@@ -137,7 +146,9 @@ public:
|
||||
exception_safe_class(const exception_safe_class& x) : _esc(x._esc) {
|
||||
_esc.add_live_object();
|
||||
}
|
||||
exception_safe_class(exception_safe_class&&) = default;
|
||||
exception_safe_class(exception_safe_class&& x) noexcept : _esc(x._esc) {
|
||||
_esc.add_live_object_noexcept();
|
||||
}
|
||||
~exception_safe_class() {
|
||||
_esc.del_live_object();
|
||||
}
|
||||
@@ -242,3 +253,115 @@ BOOST_AUTO_TEST_CASE(test_amoritzed_reserve) {
|
||||
amortized_reserve(v, 1);
|
||||
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(tests_insertion_exception_safety) {
|
||||
constexpr size_t chunk_size = 512;
|
||||
using chunked_vector = utils::chunked_vector<exception_safe_class, chunk_size>;
|
||||
size_t max_chunk_capacity = chunked_vector::max_chunk_capacity();
|
||||
|
||||
// FIXME: convert to seastar test infstrature and use test::random
|
||||
// for reproducibility
|
||||
std::random_device r;
|
||||
auto seed = r();
|
||||
BOOST_TEST_MESSAGE(fmt::format("random-seed={}", seed));
|
||||
auto rand = std::default_random_engine(seed);
|
||||
auto size_dist = std::uniform_int_distribution<size_t>(1, 4 * max_chunk_capacity);
|
||||
|
||||
auto checker = exception_safety_checker();
|
||||
auto count = size_dist(rand);
|
||||
BOOST_TEST_MESSAGE(fmt::format("count={}", count));
|
||||
checker.set_countdown(count - 1);
|
||||
try {
|
||||
chunked_vector v;
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
v.emplace_back(checker);
|
||||
}
|
||||
BOOST_REQUIRE(false);
|
||||
} catch (...) {
|
||||
BOOST_REQUIRE_EQUAL(checker.live_objects(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(tests_insertion_exception_safety_with_reserve) {
|
||||
constexpr size_t chunk_size = 512;
|
||||
using chunked_vector = utils::chunked_vector<exception_safe_class, chunk_size>;
|
||||
size_t max_chunk_capacity = chunked_vector::max_chunk_capacity();
|
||||
|
||||
// FIXME: convert to seastar test infstrature and use test::random
|
||||
// for reproducibility
|
||||
std::random_device r;
|
||||
auto seed = r();
|
||||
BOOST_TEST_MESSAGE(fmt::format("random-seed={}", seed));
|
||||
auto rand = std::default_random_engine(seed);
|
||||
auto size_dist = std::uniform_int_distribution<size_t>(1, 4 * max_chunk_capacity);
|
||||
auto count = size_dist(rand);
|
||||
BOOST_TEST_MESSAGE(fmt::format("count={}", count));
|
||||
auto checker = exception_safety_checker();
|
||||
checker.set_countdown(count - 1);
|
||||
try {
|
||||
chunked_vector v;
|
||||
auto reserve_count = size_dist(rand);
|
||||
BOOST_TEST_MESSAGE(fmt::format("reserve_count={}", reserve_count));
|
||||
v.reserve(reserve_count);
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
v.emplace_back(checker);
|
||||
}
|
||||
BOOST_REQUIRE(false);
|
||||
} catch (...) {
|
||||
BOOST_REQUIRE_EQUAL(checker.live_objects(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Reproduces https://github.com/scylladb/scylladb/issues/18635
|
||||
BOOST_AUTO_TEST_CASE(tests_fill_constructor_exception_safety) {
|
||||
constexpr size_t chunk_size = 512;
|
||||
using chunked_vector = utils::chunked_vector<exception_safe_class, chunk_size>;
|
||||
size_t max_chunk_capacity = chunked_vector::max_chunk_capacity();
|
||||
|
||||
// FIXME: convert to seastar test infstrature and use test::random
|
||||
// for reproducibility
|
||||
std::random_device r;
|
||||
auto seed = r();
|
||||
BOOST_TEST_MESSAGE(fmt::format("random-seed={}", seed));
|
||||
auto rand = std::default_random_engine(seed);
|
||||
auto size_dist = std::uniform_int_distribution<size_t>(1, 4 * max_chunk_capacity);
|
||||
auto count = size_dist(rand);
|
||||
BOOST_TEST_MESSAGE(fmt::format("count={}", count));
|
||||
auto checker = exception_safety_checker();
|
||||
auto filler = std::optional<exception_safe_class>(checker);
|
||||
checker.set_countdown(count - 1);
|
||||
try {
|
||||
chunked_vector v(count, *filler);
|
||||
BOOST_REQUIRE(false);
|
||||
} catch (...) {
|
||||
filler.reset();
|
||||
BOOST_REQUIRE_EQUAL(checker.live_objects(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(tests_copy_constructor_exception_safety) {
|
||||
constexpr size_t chunk_size = 512;
|
||||
using chunked_vector = utils::chunked_vector<exception_safe_class, chunk_size>;
|
||||
size_t max_chunk_capacity = chunked_vector::max_chunk_capacity();
|
||||
|
||||
// FIXME: convert to seastar test infstrature and use test::random
|
||||
// for reproducibility
|
||||
std::random_device r;
|
||||
auto seed = r();
|
||||
BOOST_TEST_MESSAGE(fmt::format("random-seed={}", seed));
|
||||
auto rand = std::default_random_engine(seed);
|
||||
auto size_dist = std::uniform_int_distribution<size_t>(1, 4 * max_chunk_capacity);
|
||||
auto count = size_dist(rand);
|
||||
BOOST_TEST_MESSAGE(fmt::format("count={}", count));
|
||||
auto checker = exception_safety_checker();
|
||||
chunked_vector src(count, exception_safe_class(checker));
|
||||
|
||||
checker.set_countdown(count - 1);
|
||||
try {
|
||||
chunked_vector v(src);
|
||||
BOOST_REQUIRE(false);
|
||||
} catch (...) {
|
||||
src.clear();
|
||||
BOOST_REQUIRE_EQUAL(checker.live_objects(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ BOOST_AUTO_TEST_CASE(expr_printer_timestamp_test) {
|
||||
raw_value::make_value(timestamp_type->from_string("2011-03-02T03:05:00+0000")),
|
||||
timestamp_type
|
||||
);
|
||||
BOOST_REQUIRE_EQUAL(expr_print(timestamp_const), "'2011-03-02T03:05:00+0000'");
|
||||
BOOST_REQUIRE_EQUAL(expr_print(timestamp_const), "'2011-03-02T03:05:00.000Z'");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(expr_printer_time_test) {
|
||||
@@ -170,7 +170,7 @@ BOOST_AUTO_TEST_CASE(expr_printer_date_test) {
|
||||
raw_value::make_value(date_type->from_string("2011-02-03+0000")),
|
||||
date_type
|
||||
};
|
||||
BOOST_REQUIRE_EQUAL(expr_print(date_const), "'2011-02-03T00:00:00+0000'");
|
||||
BOOST_REQUIRE_EQUAL(expr_print(date_const), "'2011-02-03T00:00:00.000Z'");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(expr_printer_duration_test) {
|
||||
|
||||
@@ -95,7 +95,7 @@ SEASTAR_TEST_CASE(test_select_json_types) {
|
||||
"\"\\\"G\\\"\": \"127.0.0.1\", " // note the double quoting on case-sensitive column names
|
||||
"\"\\\"H\\\"\": 3, "
|
||||
"\"\\\"I\\\"\": \"zażółć gęślą jaźń\", "
|
||||
"\"j\": \"2001-10-18T14:15:55.134000\", "
|
||||
"\"j\": \"2001-10-18 14:15:55.134Z\", "
|
||||
"\"k\": \"d2177dd0-eaa2-11de-a572-001b779c76e3\", "
|
||||
"\"l\": \"d2177dd0-eaa2-11de-a572-001b779c76e3\", "
|
||||
"\"m\": \"varchar\", "
|
||||
@@ -127,7 +127,7 @@ SEASTAR_TEST_CASE(test_select_json_types) {
|
||||
utf8_type->decompose("\"127.0.0.1\""),
|
||||
utf8_type->decompose("3"),
|
||||
utf8_type->decompose("\"zażółć gęślą jaźń\""),
|
||||
utf8_type->decompose("\"2001-10-18T14:15:55.134000\""),
|
||||
utf8_type->decompose("\"2001-10-18 14:15:55.134Z\""),
|
||||
utf8_type->decompose("\"d2177dd0-eaa2-11de-a572-001b779c76e3\""),
|
||||
utf8_type->decompose("\"d2177dd0-eaa2-11de-a572-001b779c76e3\""),
|
||||
utf8_type->decompose("\"varchar\""),
|
||||
|
||||
@@ -643,3 +643,64 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage
|
||||
BOOST_REQUIRE(validator(dk0));
|
||||
BOOST_REQUIRE(!validator(dk0));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_validation_level) {
|
||||
simple_schema ss;
|
||||
|
||||
const auto dkeys = ss.make_pkeys(5);
|
||||
const auto& dk_ = dkeys[0];
|
||||
const auto& dk0 = dkeys[1];
|
||||
const auto& dk1 = dkeys[2];
|
||||
|
||||
const auto ck0 = ss.make_ckey(0);
|
||||
const auto ck1 = ss.make_ckey(1);
|
||||
const auto ck2 = ss.make_ckey(2);
|
||||
const auto ck3 = ss.make_ckey(3);
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout);
|
||||
|
||||
using vl = mutation_fragment_stream_validation_level;
|
||||
using mf_kind = mutation_fragment_v2::kind;
|
||||
|
||||
const auto ps_pos = position_in_partition_view(position_in_partition_view::partition_start_tag_t{});
|
||||
const auto sr_pos = position_in_partition_view(position_in_partition_view::static_row_tag_t{});
|
||||
const auto pe_pos = position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{});
|
||||
|
||||
for (const auto validation_level : {vl::none, vl::partition_region, vl::token, vl::partition_key, vl::clustering_key}) {
|
||||
testlog.info("valiation_level={}", static_cast<int>(validation_level));
|
||||
|
||||
mutation_fragment_stream_validating_filter validator("test", *ss.schema(), validation_level, false);
|
||||
|
||||
BOOST_REQUIRE(validator(mf_kind::partition_start, ps_pos, {}));
|
||||
BOOST_REQUIRE(validator(dk_));
|
||||
BOOST_REQUIRE(validator(mf_kind::static_row, sr_pos, {}));
|
||||
|
||||
// OOO fragment kind
|
||||
BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition::for_key(ck0), {}));
|
||||
BOOST_REQUIRE(validation_level < vl::partition_region || !validator(mf_kind::static_row, sr_pos, {}));
|
||||
|
||||
// OOO clustering row
|
||||
BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition::for_key(ck1), {}));
|
||||
BOOST_REQUIRE(validation_level < vl::clustering_key || !validator(mf_kind::clustering_row, position_in_partition::for_key(ck0), {}));
|
||||
|
||||
// Active range tombstone at partition-end
|
||||
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck2), ss.new_tombstone()));
|
||||
if (validation_level == vl::none) {
|
||||
BOOST_REQUIRE(validator(mf_kind::partition_end, pe_pos, {}));
|
||||
} else {
|
||||
BOOST_REQUIRE(!validator(mf_kind::partition_end, pe_pos, {}));
|
||||
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck3), tombstone()));
|
||||
BOOST_REQUIRE(validator(mf_kind::partition_end, pe_pos, {}));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(validator(dk1));
|
||||
|
||||
// OOO partition-key
|
||||
BOOST_REQUIRE(validation_level < vl::partition_key || !validator(dk1));
|
||||
|
||||
// OOO token
|
||||
BOOST_REQUIRE(validation_level < vl::token || !validator(dk0));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -3494,25 +3495,35 @@ SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components) {
|
||||
// create a bloom filter
|
||||
auto sst_test = sstables::test(sst);
|
||||
sst_test.create_bloom_filter(100);
|
||||
sst_test.write_filter();
|
||||
auto total_reclaimable_memory = sst_test.total_reclaimable_memory_size();
|
||||
|
||||
// Test sstable::reclaim_memory_from_components() :
|
||||
BOOST_REQUIRE_EQUAL(sst_test.reclaim_memory_from_components(), total_reclaimable_memory);
|
||||
// No more memory to reclaim in the sstable
|
||||
BOOST_REQUIRE_EQUAL(sst_test.total_reclaimable_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
|
||||
|
||||
// Test sstable::reload_reclaimed_components() :
|
||||
// Reloading should load the bloom filter back into memory
|
||||
sst_test.reload_reclaimed_components();
|
||||
// SSTable should have reclaimable memory from the bloom filter
|
||||
BOOST_REQUIRE_EQUAL(sst_test.total_reclaimable_memory_size(), total_reclaimable_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), total_reclaimable_memory);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env, test_env_sstables_manager& sst_mgr, schema_ptr sptr, uint64_t estimated_partitions, sstring tmpdir_path) {
|
||||
auto sst = env.make_sstable(sptr, tmpdir_path, 0);
|
||||
std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env, test_env_sstables_manager& sst_mgr, schema_ptr sptr, uint64_t estimated_partitions, sstring tmpdir_path, int64_t generation) {
|
||||
auto sst = env.make_sstable(sptr, tmpdir_path, generation);
|
||||
sstables::test(sst).create_bloom_filter(estimated_partitions);
|
||||
sstables::test(sst).write_filter();
|
||||
auto sst_bf_memory = sst->filter_memory_size();
|
||||
sst_mgr.increment_total_reclaimable_memory_and_maybe_reclaim(sst.get());
|
||||
return {sst, sst_bf_memory};
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_under_pressure) {
|
||||
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
return async([&env, tmpdir_path] {
|
||||
simple_schema ss;
|
||||
@@ -3521,18 +3532,18 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_under_pressure) {
|
||||
auto& sst_mgr = env.manager();
|
||||
|
||||
// Verify nothing it reclaimed when under threshold
|
||||
auto [sst1, sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 70, tmpdir_path);
|
||||
auto [sst1, sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 70, tmpdir_path, 0);
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
|
||||
|
||||
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 20, tmpdir_path);
|
||||
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 20, tmpdir_path, 1);
|
||||
// Confirm reclaim was still not triggered
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
|
||||
|
||||
// Verify manager reclaims from the largest sst when the total usage crosses thresold.
|
||||
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50, tmpdir_path);
|
||||
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50, tmpdir_path, 2);
|
||||
// sst1 has the most reclaimable memory
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
|
||||
@@ -3540,18 +3551,29 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_under_pressure) {
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
|
||||
|
||||
// Reclaim should also work on the latest sst being added
|
||||
auto [sst4, sst4_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100, tmpdir_path);
|
||||
auto [sst4, sst4_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100, tmpdir_path, 3);
|
||||
// sst4 should have been reclaimed
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst4->filter_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst4_bf_memory);
|
||||
|
||||
// Test auto reload - disposing sst3 should trigger reload of the
|
||||
// smallest filter in the reclaimed list, which is sst1's bloom filter.
|
||||
shared_sstable::dispose(sst3.release().release());
|
||||
BOOST_REQUIRE(eventually_true([&sst1 = sst1, sst1_bf_memory = sst1_bf_memory] { return sst1->filter_memory_size() == sst1_bf_memory; }));
|
||||
|
||||
// only sst4's bloom filter memory should be reported as reclaimed
|
||||
BOOST_REQUIRE(eventually_true([&sst_mgr, sst4_bf_memory = sst4_bf_memory] { return sst_mgr.get_total_memory_reclaimed() == sst4_bf_memory; }));
|
||||
// sst2 and sst4 remain the same
|
||||
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst4->filter_memory_size(), 0);
|
||||
});
|
||||
}, {
|
||||
// limit available memory to the sstables_manager to test reclaiming.
|
||||
// this will set the reclaim threshold to 100 bytes.
|
||||
.available_memory = 1000
|
||||
.available_memory = 500
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3581,3 +3603,57 @@ SEASTAR_TEST_CASE(test_reclaimed_bloom_filter_deletion_from_disk) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
return async([&env, tmpdir_path] {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return;
|
||||
#endif
|
||||
simple_schema ss;
|
||||
auto schema_ptr = ss.schema();
|
||||
|
||||
auto& sst_mgr = env.manager();
|
||||
|
||||
auto [sst1, sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100, tmpdir_path, 0);
|
||||
// there is sufficient memory for sst1's filter
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
|
||||
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 60, tmpdir_path, 1);
|
||||
// total memory used by the bloom filters has crossed the threshold, so sst1's
|
||||
// filter, which occupies the most memory, will be discarded from memory.
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
|
||||
|
||||
// enable injector that delays reloading a filter
|
||||
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
|
||||
|
||||
// dispose sst2 to trigger reload of sst1's bloom filter
|
||||
shared_sstable::dispose(sst2.release().release());
|
||||
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
|
||||
BOOST_REQUIRE(eventually_true([&sst_mgr, sst1_bf_memory = sst1_bf_memory] { return sst_mgr.get_total_reclaimable_memory() == sst1_bf_memory; }));
|
||||
|
||||
// now that the reload is midway and paused, create new sst to verify that its
|
||||
// filter gets evicted immediately as the memory that became available is reserved
|
||||
// for sst1's filter reload.
|
||||
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 80, tmpdir_path, 2);
|
||||
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), 0);
|
||||
// confirm sst1 is not reloaded yet
|
||||
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst3_bf_memory);
|
||||
|
||||
// resume reloading sst1 filter
|
||||
BOOST_REQUIRE(eventually_true([&sst1 = sst1, sst1_bf_memory = sst1_bf_memory] { return sst1->filter_memory_size() == sst1_bf_memory; }));
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst3_bf_memory);
|
||||
|
||||
utils::get_local_injector().disable("reload_reclaimed_components/pause");
|
||||
});
|
||||
}, {
|
||||
// limit available memory to the sstables_manager to test reclaiming.
|
||||
// this will set the reclaim threshold to 100 bytes.
|
||||
.available_memory = 500
|
||||
});
|
||||
}
|
||||
|
||||
@@ -179,7 +179,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
|
||||
SEASTAR_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) {
|
||||
return sstables::test_env::do_with_async([] (test_env& env) {
|
||||
auto dir = tmpdir();
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
|
||||
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), generation_type(this_shard_id()).value()));
|
||||
|
||||
// Now there is one sstable to the upload directory, but it is incomplete and one component is missing.
|
||||
// We should fail validation and leave the directory untouched
|
||||
|
||||
@@ -276,27 +276,27 @@ void test_timestamp_like_string_conversions(data_type timestamp_type) {
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-03T12:30:00+1230"), timestamp_type->decompose(tp)));
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-02T23:00-0100"), timestamp_type->decompose(tp)));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00.000Z");
|
||||
|
||||
// test fractional milliseconds
|
||||
tp = db_clock::time_point(db_clock::duration(1435881600123));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00.123000");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00.123Z");
|
||||
|
||||
// test time_stamps around the unix epoch time
|
||||
tp = db_clock::time_point(db_clock::duration(0));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1970-01-01T00:00:00");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1970-01-01T00:00:00.000Z");
|
||||
tp = db_clock::time_point(db_clock::duration(456));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1970-01-01T00:00:00.456000");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1970-01-01T00:00:00.456Z");
|
||||
tp = db_clock::time_point(db_clock::duration(-456));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1969-12-31T23:59:59.544000");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "1969-12-31T23:59:59.544Z");
|
||||
|
||||
// test time_stamps around year 0
|
||||
tp = db_clock::time_point(db_clock::duration(-62167219200000));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "0000-01-01T00:00:00");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "0000-01-01T00:00:00.000Z");
|
||||
tp = db_clock::time_point(db_clock::duration(-62167219199211));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "0000-01-01T00:00:00.789000");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "0000-01-01T00:00:00.789Z");
|
||||
tp = db_clock::time_point(db_clock::duration(-62167219200789));
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "-0001-12-31T23:59:59.211000");
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "-0001-12-31T23:59:59.211Z");
|
||||
|
||||
auto now = time(nullptr);
|
||||
::tm local_now;
|
||||
|
||||
@@ -169,7 +169,6 @@ def testNoLossOfPrecisionForCastToDecimal(cql, test_keyspace):
|
||||
assertRows(execute(cql, table, "SELECT CAST(bigint_clmn AS decimal), CAST(varint_clmn AS decimal) FROM %s"),
|
||||
row(Decimal("9223372036854775807"), Decimal("1234567890123456789")))
|
||||
|
||||
@pytest.mark.xfail(reason="issue #14518")
|
||||
def testTimeCastsInSelectionClause(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a timeuuid primary key, b timestamp, c date, d time)") as table:
|
||||
yearMonthDay = "2015-05-21"
|
||||
|
||||
@@ -18,6 +18,7 @@ from cassandra.protocol import FunctionFailure, InvalidRequest
|
||||
import pytest
|
||||
import json
|
||||
from decimal import Decimal
|
||||
from datetime import datetime
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def type1(cql, test_keyspace):
|
||||
@@ -29,7 +30,7 @@ def type1(cql, test_keyspace):
|
||||
@pytest.fixture(scope="module")
|
||||
def table1(cql, test_keyspace, type1):
|
||||
table = test_keyspace + "." + unique_name()
|
||||
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, bigv bigint, a ascii, b boolean, vi varint, mai map<ascii, int>, tup frozen<tuple<text, int>>, l list<text>, d double, t time, dec decimal, tupmap map<frozen<tuple<text, int>>, int>, t1 frozen<{type1}>, \"CaseSensitive\" int)")
|
||||
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, bigv bigint, a ascii, b boolean, vi varint, mai map<ascii, int>, tup frozen<tuple<text, int>>, l list<text>, d double, t time, dec decimal, tupmap map<frozen<tuple<text, int>>, int>, t1 frozen<{type1}>, \"CaseSensitive\" int, ts timestamp)")
|
||||
yield table
|
||||
cql.execute("DROP TABLE " + table)
|
||||
|
||||
@@ -335,6 +336,33 @@ def test_select_json_time(cql, table1):
|
||||
cql.execute(stmt, [p, 123])
|
||||
assert list(cql.execute(f"SELECT JSON t from {table1} where p = {p}")) == [(EquivalentJson('{"t": "00:00:00.000000123"}'),)]
|
||||
|
||||
# Check that toJson() returns timestamp string in correct cassandra compatible format (issue #7997)
|
||||
# with milliseconds and timezone specification
|
||||
def test_tojson_timestamp(cql, table1):
|
||||
p = unique_key_int()
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, ts) VALUES (?, ?)")
|
||||
cql.execute(stmt, [p, datetime(2014, 1, 1, 12, 15, 45)])
|
||||
assert list(cql.execute(f"SELECT toJson(ts) from {table1} where p = {p}")) == [('"2014-01-01 12:15:45.000Z"',)]
|
||||
|
||||
# The EquivalentJson class wraps a JSON string, and compare equal to other
|
||||
# strings if both are valid JSON strings which decode to the same object.
|
||||
# EquivalentJson("....") can be used in assert_rows() checks below, to check
|
||||
# whether functionally-equivalent JSON is returned instead of checking for
|
||||
# identical strings.
|
||||
class EquivalentJson:
|
||||
def __init__(self, s):
|
||||
self.obj = json.loads(s)
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, EquivalentJson):
|
||||
return self.obj == other.obj
|
||||
elif isinstance(other, str):
|
||||
return self.obj == json.loads(other)
|
||||
return NotImplemented
|
||||
# Implementing __repr__ is useful because when a comparison fails, pytest
|
||||
# helpfully prints what it tried to compare, and uses __repr__ for that.
|
||||
def __repr__(self):
|
||||
return f'EquivalentJson("{self.obj}")'
|
||||
|
||||
# Test that toJson() can prints a decimal type with a very high mantissa.
|
||||
# Reproduces issue #8002, where it was written as 1 and a billion zeroes,
|
||||
# running out of memory.
|
||||
|
||||
@@ -47,6 +47,10 @@ public:
|
||||
size_t get_total_memory_reclaimed() {
|
||||
return _total_memory_reclaimed;
|
||||
}
|
||||
|
||||
size_t get_total_reclaimable_memory() {
|
||||
return _total_reclaimable_memory;
|
||||
}
|
||||
};
|
||||
|
||||
struct test_env_config {
|
||||
|
||||
@@ -230,6 +230,11 @@ public:
|
||||
_sst->_total_reclaimable_memory.reset();
|
||||
}
|
||||
|
||||
void write_filter() {
|
||||
_sst->_recognized_components.insert(component_type::Filter);
|
||||
_sst->write_filter(default_priority_class());
|
||||
}
|
||||
|
||||
size_t total_reclaimable_memory_size() const {
|
||||
return _sst->total_reclaimable_memory_size();
|
||||
}
|
||||
@@ -237,6 +242,10 @@ public:
|
||||
size_t reclaim_memory_from_components() {
|
||||
return _sst->reclaim_memory_from_components();
|
||||
}
|
||||
|
||||
void reload_reclaimed_components() {
|
||||
_sst->reload_reclaimed_components(default_priority_class()).get();
|
||||
}
|
||||
};
|
||||
|
||||
inline auto replacer_fn_no_op() {
|
||||
|
||||
35
types.cc
35
types.cc
@@ -68,7 +68,7 @@ requires requires {
|
||||
requires std::same_as<typename T::duration, std::chrono::milliseconds>;
|
||||
}
|
||||
sstring
|
||||
time_point_to_string(const T& tp)
|
||||
time_point_to_string(const T& tp, bool use_time_separator = true)
|
||||
{
|
||||
auto count = tp.time_since_epoch().count();
|
||||
auto d = std::div(int64_t(count), int64_t(1000));
|
||||
@@ -78,17 +78,7 @@ time_point_to_string(const T& tp)
|
||||
return fmt::format("{} milliseconds (out of range)", count);
|
||||
}
|
||||
|
||||
auto to_string = [] (const std::tm& tm) {
|
||||
auto year_digits = tm.tm_year >= -1900 ? 4 : 5;
|
||||
return fmt::format("{:-0{}d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}",
|
||||
tm.tm_year + 1900, year_digits, tm.tm_mon + 1, tm.tm_mday,
|
||||
tm.tm_hour, tm.tm_min, tm.tm_sec);
|
||||
};
|
||||
|
||||
auto millis = d.rem;
|
||||
if (!millis) {
|
||||
return fmt::format("{}", to_string(tm));
|
||||
}
|
||||
// adjust seconds for time points earlier than posix epoch
|
||||
// to keep the fractional millis positive
|
||||
if (millis < 0) {
|
||||
@@ -96,8 +86,13 @@ time_point_to_string(const T& tp)
|
||||
seconds--;
|
||||
gmtime_r(&seconds, &tm);
|
||||
}
|
||||
auto micros = millis * 1000;
|
||||
return fmt::format("{}.{:06d}", to_string(tm), micros);
|
||||
|
||||
const auto time_separator = (use_time_separator) ? "T" : " ";
|
||||
auto year_digits = tm.tm_year >= -1900 ? 4 : 5;
|
||||
|
||||
return fmt::format("{:-0{}d}-{:02d}-{:02d}{}{:02d}:{:02d}:{:02d}.{:03d}Z",
|
||||
tm.tm_year + 1900, year_digits, tm.tm_mon + 1, tm.tm_mday, time_separator,
|
||||
tm.tm_hour, tm.tm_min, tm.tm_sec, millis);
|
||||
}
|
||||
|
||||
sstring simple_date_to_string(const uint32_t days_count) {
|
||||
@@ -2725,6 +2720,12 @@ static sstring format_if_not_empty(
|
||||
return f(static_cast<const N&>(*b));
|
||||
}
|
||||
|
||||
sstring timestamp_to_json_string(const timestamp_date_base_class& t, const bytes_view& bv)
|
||||
{
|
||||
auto tp = value_cast<const timestamp_date_base_class::native_type>(t.deserialize(bv));
|
||||
return format_if_not_empty(t, &tp, [](const db_clock::time_point& v) { return time_point_to_string(v, false); });
|
||||
}
|
||||
|
||||
static sstring to_string_impl(const abstract_type& t, const void* v);
|
||||
|
||||
namespace {
|
||||
@@ -3506,16 +3507,12 @@ sstring data_value::to_parsable_string() const {
|
||||
|
||||
abstract_type::kind type_kind = _type->without_reversed().get_kind();
|
||||
|
||||
if (type_kind == abstract_type::kind::date || type_kind == abstract_type::kind::timestamp) {
|
||||
// Put timezone information after a date or timestamp to specify that it's in UTC
|
||||
// Otherwise it will be parsed as a date in the local timezone.
|
||||
return fmt::format("'{}+0000'", *this);
|
||||
}
|
||||
|
||||
if (type_kind == abstract_type::kind::utf8
|
||||
|| type_kind == abstract_type::kind::ascii
|
||||
|| type_kind == abstract_type::kind::inet
|
||||
|| type_kind == abstract_type::kind::time
|
||||
|| type_kind == abstract_type::kind::date
|
||||
|| type_kind == abstract_type::kind::timestamp
|
||||
) {
|
||||
// Put quotes on types that require it
|
||||
return fmt::format("'{}'", *this);
|
||||
|
||||
@@ -337,7 +337,8 @@ chunked_vector<T, max_contiguous_allocation>::chunked_vector(Iterator begin, Ite
|
||||
}
|
||||
|
||||
template <typename T, size_t max_contiguous_allocation>
|
||||
chunked_vector<T, max_contiguous_allocation>::chunked_vector(size_t n, const T& value) {
|
||||
chunked_vector<T, max_contiguous_allocation>::chunked_vector(size_t n, const T& value)
|
||||
: chunked_vector() {
|
||||
reserve(n);
|
||||
std::fill_n(std::back_inserter(*this), n, value);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user