Merge "Record large cells to system.large_cells" from Rafael

Issue #4234 asks for a large collection detector. Discussing the issue
Benny pointed out that it is probably better to have a generic large
cell detector as it makes a natural progression on what we already
warn on (large partitions and large rows).

This patch series implements that. It is on top of
shutdown-order-patches-v7 which is currently on next.

With the charges to use a semaphore this patch series might be getting
a bit big. Let me know if I should split it.

* https://github.com/espindola/scylla espindola/large-cells-on-top-of-shutdown-v5:
  db: refactor large data deletion code
  db: Rename (maybe_)?update_large_partitions
  db: refactor a try_record helper
  large_data_handler: assert it is not used after stop()
  db: don't use _stopped directly
  sstables: delete dead error handling code.
  large_data_handler: Remove const from a few functions
  large_data_handler: propagate a future out of stop()
  large_data_handler: Run large data recording in parallel
  Create a system.large_cells table
  db: Record large cells
  Add a test for large cells
This commit is contained in:
Tomasz Grabiec
2019-03-13 09:44:45 +01:00
12 changed files with 216 additions and 124 deletions

View File

@@ -221,7 +221,8 @@ database::database(const db::config& cfg, database_config dbcfg)
, _enable_incremental_backups(cfg.incremental_backups())
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg->compaction_large_partition_warning_threshold_mb()*1024*1024,
_cfg->compaction_large_row_warning_threshold_mb()*1024*1024))
_cfg->compaction_large_row_warning_threshold_mb()*1024*1024,
_cfg->compaction_large_cell_warning_threshold_mb()*1024*1024))
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>(*this))
@@ -1665,12 +1666,14 @@ future<> stop_database(sharded<database>& sdb) {
});
}).then([&sdb] {
return sdb.invoke_on_all([](database& db) {
db.stop_large_data_handler();
return db.stop_large_data_handler();
});
});
}
void database::stop_large_data_handler() { _large_data_handler->stop(); }
future<> database::stop_large_data_handler() {
return _large_data_handler->stop();
}
future<>
database::stop() {

View File

@@ -1373,7 +1373,7 @@ public:
future<> stop();
future<> close_tables(table_kind kind_to_close);
void stop_large_data_handler();
future<> stop_large_data_handler();
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
unsigned shard_of(const frozen_mutation& m);

View File

@@ -231,6 +231,9 @@ public:
val(compaction_large_row_warning_threshold_mb, uint32_t, 10, Used, \
"Log a warning when writing rows larger than this value" \
) \
val(compaction_large_cell_warning_threshold_mb, uint32_t, 1, Used, \
"Log a warning when writing cells larger than this value" \
) \
/* Common memtable settings */ \
val(memtable_total_space_in_mb, uint32_t, 0, Invalid, \
"Specifies the total memory used for all memtables on a node. This replaces the per-table storage settings memtable_operations_in_millions and memtable_throughput_in_mb." \

View File

@@ -27,18 +27,18 @@
namespace db {
future<> large_data_handler::maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const {
assert(!_stopped);
future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) {
assert(!stopped());
if (partition_size > _partition_threshold_bytes) {
++_stats.partitions_bigger_than_threshold;
const schema& s = *sst.get_schema();
return update_large_partitions(s, sst.get_filename(), key, partition_size);
return with_sem([&sst, &key, partition_size, this] {
return record_large_partitions(sst, key, partition_size);
});
}
return make_ready_future<>();
}
logging::logger cql_table_large_data_handler::large_data_logger("large_data");
static logging::logger large_data_logger("large_data");
template <typename T> static std::string key_to_str(const T& key, const schema& s) {
std::ostringstream oss;
@@ -46,69 +46,71 @@ template <typename T> static std::string key_to_str(const T& key, const schema&
return oss.str();
}
future<> cql_table_large_data_handler::update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& key, uint64_t partition_size) const {
static const sstring req = format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, partition_size, partition_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?) USING TTL 2592000",
db::system_keyspace::LARGE_PARTITIONS);
template <typename... Args>
static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) {
sstring extra_fields_str;
sstring extra_values;
for (std::string_view field : extra_fields) {
extra_fields_str += format(", {}", field);
extra_values += ", ?";
}
const sstring req = format("INSERT INTO system.large_{}s (keyspace_name, table_name, sstable_name, {}_size, partition_key, compaction_time{}) VALUES (?, ?, ?, ?, ?, ?{}) USING TTL 2592000",
large_table, large_table, extra_fields_str, extra_values);
const schema &s = *sst.get_schema();
auto ks_name = s.ks_name();
auto cf_name = s.cf_name();
const auto sstable_name = sst.get_filename();
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(db_clock::now().time_since_epoch()).count();
auto key_str = key_to_str(key.to_partition_key(s), s);
return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(partition_size), key_str, timestamp)
.then_wrapped([ks_name, cf_name, key_str, partition_size](auto&& f) {
try {
f.get();
large_data_logger.warn("Writing large partition {}/{}:{} ({} bytes)", ks_name, cf_name, key_str, partition_size);
} catch (...) {
large_data_logger.warn("Failed to update {}: {}", db::system_keyspace::LARGE_PARTITIONS, std::current_exception());
}
});
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes)", desc, ks_name, cf_name, pk_str, extra_path, size);
return db::execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
.discard_result()
.handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}",
large_table, ks_name, cf_name, sstable_name, ep);
});
}
future<> cql_table_large_data_handler::delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const {
static const sstring req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?", db::system_keyspace::LARGE_PARTITIONS);
return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name).discard_result().handle_exception([](std::exception_ptr ep) {
large_data_logger.warn("Failed to drop entries from {}: {}", db::system_keyspace::LARGE_PARTITIONS, ep);
});
future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const {
return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {});
}
future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const {
auto column_name = cdef.name_as_text();
std::string_view cell_type = cdef.is_atomic() ? "cell" : "collection";
static const std::vector<sstring> extra_fields{"clustering_key", "column_name"};
if (clustering_key) {
const schema &s = *sst.get_schema();
auto ck_str = key_to_str(*clustering_key, s);
return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, format("{} {}", ck_str, column_name), extra_fields, ck_str, column_name);
} else {
return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, column_name, extra_fields, nullptr, column_name);
}
}
future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) const {
static const sstring req =
format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, row_size, partition_key, "
"clustering_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL 2592000",
db::system_keyspace::LARGE_ROWS);
auto f = [clustering_key, &partition_key, &sst, row_size] {
static const std::vector<sstring> extra_fields{"clustering_key"};
if (clustering_key) {
const schema &s = *sst.get_schema();
auto sstable_name = sst.get_filename();
auto ks_name = s.ks_name();
auto cf_name = s.cf_name();
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(db_clock::now().time_since_epoch()).count();
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
if (clustering_key) {
std::string ck_str = key_to_str(*clustering_key, s);
large_data_logger.warn("Writing large row {}/{}: {} {} ({} bytes)", ks_name, cf_name, pk_str, ck_str, row_size);
return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, ck_str, timestamp);
} else {
large_data_logger.warn("Writing large static row {}/{}: {} ({} bytes)", ks_name, cf_name, pk_str, row_size);
return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, nullptr, timestamp);
}
};
return f().discard_result().handle_exception([&sst] (std::exception_ptr ep) {
const schema &s = *sst.get_schema();
large_data_logger.warn("Failed to add a record to {}: ks = {}, table = {}, sst = {} exception = {}",
db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sst.get_filename(), ep);
});
std::string ck_str = key_to_str(*clustering_key, s);
return try_record("row", sst, partition_key, int64_t(row_size), "row", ck_str, extra_fields, ck_str);
} else {
return try_record("row", sst, partition_key, int64_t(row_size), "static row", "", extra_fields, nullptr);
}
}
future<> cql_table_large_data_handler::delete_large_rows_entries(const schema& s, const sstring& sstable_name) const {
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const {
static const sstring req =
format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
db::system_keyspace::LARGE_ROWS);
large_table_name);
return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
.discard_result()
.handle_exception([&s, sstable_name] (std::exception_ptr ep) {
.handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}",
db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sstable_name, ep);
large_table_name, s.ks_name(), s.cf_name(), sstable_name, ep);
});
}
}

View File

@@ -23,6 +23,7 @@
#include <cstdint>
#include "schema.hh"
#include "system_keyspace.hh"
namespace sstables {
class sstable;
@@ -38,81 +39,132 @@ public:
};
private:
// Assuming:
// * there is at most one log entry every 1MB
// * the average latency of the log is 4ms (depends on the load)
// * we aim to sustain 1GB/s of write bandwidth
// We need a concurrency of:
// C = (1GB/s / 1MB) * 4ms = 1k/s * 4ms = 4
// 16 should be enough for everybody.
static const size_t max_concurrency = 16;
semaphore _sem{max_concurrency};
// A convenience function for using the above semaphore. Unlike the global with_semaphore, this will not wait on the
// future returned by func. The objective is for the future returned by func to run in parallel with whatever the
// caller is doing, but limit how far behind we can get.
template<typename Func>
future<> with_sem(Func&& func) {
return get_units(_sem, 1).then([func = std::forward<Func>(func)] (auto units) mutable {
func().finally([units = std::move(units)] {});
});
}
bool _stopped = false;
uint64_t _partition_threshold_bytes;
uint64_t _row_threshold_bytes;
uint64_t _cell_threshold_bytes;
mutable large_data_handler::stats _stats;
public:
explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes)
explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes)
: _partition_threshold_bytes(partition_threshold_bytes)
, _row_threshold_bytes(row_threshold_bytes) {}
, _row_threshold_bytes(row_threshold_bytes)
, _cell_threshold_bytes(cell_threshold_bytes) {}
virtual ~large_data_handler() {}
// Once large_data_handler is stopped it will ignore requests to update system.large_partitions. Any futures already
// returned must be waited for by the caller.
// Once large_data_handler is stopped no further updates will be accepted.
bool stopped() const { return _stopped; }
void stop() {
future<> stop() {
assert(!stopped());
_stopped = true;
return _sem.wait(max_concurrency);
}
future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) const {
if (__builtin_expect(!_stopped && row_size > _row_threshold_bytes, false)) {
return record_large_rows(sst, partition_key, clustering_key, row_size);
const clustering_key_prefix* clustering_key, uint64_t row_size) {
assert(!stopped());
if (__builtin_expect(row_size > _row_threshold_bytes, false)) {
return with_sem([&sst, &partition_key, clustering_key, row_size, this] {
return record_large_rows(sst, partition_key, clustering_key, row_size);
});
}
return make_ready_future<>();
}
future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const;
future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size);
future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const {
assert(!_stopped);
future<> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) {
assert(!stopped());
if (__builtin_expect(cell_size > _cell_threshold_bytes, false)) {
return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, this] {
return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size);
});
}
return make_ready_future<>();
}
future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) {
assert(!stopped());
future<> large_partitions = make_ready_future<>();
if (__builtin_expect(data_size > _partition_threshold_bytes, false)) {
large_partitions = delete_large_partitions_entry(s, filename);
large_partitions = with_sem([&s, &filename, this] {
return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS);
});
}
future<> large_rows = make_ready_future<>();
if (__builtin_expect(data_size > _row_threshold_bytes, false)) {
large_rows = delete_large_rows_entries(s, filename);
large_rows = with_sem([&s, &filename, this] {
return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS);
});
}
return when_all(std::move(large_partitions), std::move(large_rows)).discard_result();
future<> large_cells = make_ready_future<>();
if (__builtin_expect(data_size > _cell_threshold_bytes, false)) {
large_cells = with_sem([&s, &filename, this] {
return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_CELLS);
});
}
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
}
const large_data_handler::stats& stats() const { return _stats; }
protected:
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const = 0;
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0;
virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const = 0;
virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const = 0;
virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const = 0;
virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const = 0;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const = 0;
};
class cql_table_large_data_handler : public large_data_handler {
protected:
static logging::logger large_data_logger;
public:
explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes)
: large_data_handler(partition_threshold_bytes, row_threshold_bytes) {}
explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes)
: large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes) {}
protected:
virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override;
virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override;
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override;
virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override;
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override;
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override;
virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override;
};
class nop_large_data_handler : public large_data_handler {
public:
nop_large_data_handler()
: large_data_handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()) {}
virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override {
: large_data_handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max()) {}
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override {
return make_ready_future<>();
}
virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override {
virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override {
return make_ready_future<>();
}
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override {
return make_ready_future<>();
}
@@ -120,9 +172,6 @@ public:
const clustering_key_prefix* clustering_key, uint64_t row_size) const override {
return make_ready_future<>();
}
virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override {
return make_ready_future<>();
}
};
}

View File

@@ -498,6 +498,26 @@ static schema_ptr large_rows() {
return large_rows;
}
static schema_ptr large_cells() {
static thread_local auto large_cells = [] {
auto id = generate_legacy_id(NAME, LARGE_CELLS);
return schema_builder(NAME, LARGE_CELLS, id)
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::partition_key)
.with_column("sstable_name", utf8_type, column_kind::clustering_key)
// We want the larger cells first, so use reversed_type_impl
.with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
.with_column("partition_key", utf8_type, column_kind::clustering_key)
.with_column("clustering_key", utf8_type, column_kind::clustering_key)
.with_column("column_name", utf8_type, column_kind::clustering_key)
.with_column("compaction_time", timestamp_type)
.set_comment("cells larger than specified threshold")
.with_version(generate_schema_version(id))
.build();
}();
return large_cells;
}
namespace v3 {
schema_ptr batches() {
@@ -1691,7 +1711,7 @@ std::vector<schema_ptr> all_tables() {
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
peers(), peer_events(), range_xfers(),
compactions_in_progress(), compaction_history(),
sstable_activity(), size_estimates(), large_partitions(), large_rows(),
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),

View File

@@ -89,6 +89,7 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
static constexpr auto SIZE_ESTIMATES = "size_estimates";
static constexpr auto LARGE_PARTITIONS = "large_partitions";
static constexpr auto LARGE_ROWS = "large_rows";
static constexpr auto LARGE_CELLS = "large_cells";
namespace v3 {
static constexpr auto BATCHES = "batches";

View File

@@ -668,17 +668,18 @@ private:
};
// Writes single atomic cell
void write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef,
void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, bytes_view cell_path = {});
// Writes information about row liveness (formerly 'row marker')
void write_liveness_info(bytes_ostream& writer, const row_marker& marker);
// Writes a CQL collection (list, set or map)
void write_collection(bytes_ostream& writer, const column_definition& cdef, collection_mutation_view collection,
void write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, const column_definition& cdef, collection_mutation_view collection,
const row_time_properties& properties, bool has_complex_deletion);
void write_cells(bytes_ostream& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion);
void write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind,
const row& row_body, const row_time_properties& properties, bool has_complex_deletion);
void write_row_body(bytes_ostream& writer, const clustering_row& row, bool has_complex_deletion);
void write_static_row(const row& static_row);
void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key) {
@@ -961,9 +962,10 @@ void writer::consume(tombstone t) {
_tombstone_written = true;
}
void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, bytes_view cell_path) {
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
const column_definition& cdef, const row_time_properties& properties, bytes_view cell_path) {
uint64_t current_pos = writer.size();
bool is_deleted = !cell.is_live();
bool has_value = !is_deleted && !cell.value().empty();
bool use_row_timestamp = (properties.timestamp == cell.timestamp());
@@ -1022,6 +1024,12 @@ void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const colu
}
// Collect cell statistics
// We record collections in write_collection, so ignore them here
if (cdef.is_atomic()) {
uint64_t size = writer.size() - current_pos;
_cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
}
_c_stats.update_timestamp(cell.timestamp());
if (is_deleted) {
_c_stats.update_local_deletion_time_and_tombstone_histogram(cell.deletion_time());
@@ -1066,8 +1074,10 @@ void writer::write_liveness_info(bytes_ostream& writer, const row_marker& marker
}
}
void writer::write_collection(bytes_ostream& writer, const column_definition& cdef,
collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) {
void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key,
const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties,
bool has_complex_deletion) {
uint64_t current_pos = writer.size();
auto& ctype = *static_pointer_cast<const collection_type_impl>(cdef.type);
collection.data.with_linearized([&] (bytes_view collection_bv) {
auto mview = ctype.deserialize_mutation_form(collection_bv);
@@ -1082,19 +1092,21 @@ void writer::write_collection(bytes_ostream& writer, const column_definition& cd
}
for (const auto& [cell_path, cell]: mview.cells) {
++_c_stats.cells_count;
write_cell(writer, cell, cdef, properties, cell_path);
write_cell(writer, clustering_key, cell, cdef, properties, cell_path);
}
});
uint64_t size = writer.size() - current_pos;
_cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
}
void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row_body,
void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body,
const row_time_properties& properties, bool has_complex_deletion) {
// Note that missing columns are written based on the whole set of regular columns as defined by schema.
// This differs from Origin where all updated columns are tracked and the set of filled columns of a row
// is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases
// but still valid.
write_missing_columns(writer, kind == column_kind::static_column ? _static_columns : _regular_columns, row_body);
row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) {
row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion, clustering_key] (column_id id, const atomic_cell_or_collection& c) {
auto&& column_definition = _schema.column_at(kind, id);
if (!column_definition.is_atomic()) {
_collections.push_back({&column_definition, c});
@@ -1103,11 +1115,11 @@ void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row
atomic_cell_view cell = c.as_atomic_cell(column_definition);
++_c_stats.cells_count;
++_c_stats.column_count;
write_cell(writer, cell, column_definition, properties);
write_cell(writer, clustering_key, cell, column_definition, properties);
});
for (const auto& col: _collections) {
write_collection(writer, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion);
write_collection(writer, clustering_key, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion);
}
_collections.clear();
}
@@ -1133,7 +1145,7 @@ void writer::write_row_body(bytes_ostream& writer, const clustering_row& row, bo
}
}
return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion);
return write_cells(writer, &row.key(), column_kind::regular_column, row.cells(), properties, has_complex_deletion);
}
// Find if any collection in the row contains a collection-wide tombstone
@@ -1174,7 +1186,7 @@ void writer::write_static_row(const row& static_row) {
write(_sst.get_version(), *_data_writer, row_extended_flags::is_static);
write_vint(_tmp_bufs, 0); // as the static row always comes first, the previous row size is always zero
write_cells(_tmp_bufs, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion);
write_cells(_tmp_bufs, nullptr, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion);
write_vint(*_data_writer, _tmp_bufs.size());
flush_tmp_bufs();
@@ -1338,7 +1350,7 @@ stop_iteration writer::consume_end_of_partition() {
// compute size of the current row.
_c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset;
_cfg.large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get();
_cfg.large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get();
// update is about merging column_stats with the data being stored by collector.
_sst.get_metadata_collector().update(std::move(_c_stats));

View File

@@ -2118,7 +2118,7 @@ stop_iteration components_writer::consume_end_of_partition() {
// compute size of the current row.
_sst._c_stats.partition_size = _out.offset() - _sst._c_stats.start_offset;
_large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get();
_large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get();
// update is about merging column_stats with the data being stored by collector.
_sst._collector.update(std::move(_sst._c_stats));
@@ -3144,28 +3144,20 @@ sstable::unlink()
}
static future<>
maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler)
maybe_delete_large_data_entry(shared_sstable sst, db::large_data_handler& large_data_handler)
{
auto name = sst->get_filename();
return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size())
.then_wrapped([name = std::move(name)] (future<> f) {
if (f.failed()) {
// Just log and ignore failures to delete large data entries.
// They are not critical to the operation of the database.
sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, f.get_exception());
}
return make_ready_future<>();
});
return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size());
}
static future<>
delete_sstable_and_maybe_large_data_entries(shared_sstable sst, const db::large_data_handler& large_data_handler)
delete_sstable_and_maybe_large_data_entries(shared_sstable sst, db::large_data_handler& large_data_handler)
{
return when_all(sst->unlink(), maybe_delete_large_data_entry(sst, large_data_handler)).discard_result();
}
future<>
delete_atomically(std::vector<shared_sstable> ssts, const db::large_data_handler& large_data_handler) {
delete_atomically(std::vector<shared_sstable> ssts, db::large_data_handler& large_data_handler) {
return seastar::async([ssts = std::move(ssts), &large_data_handler] {
sstring sstdir;
min_max_tracker<int64_t> gen_tracker;

View File

@@ -873,7 +873,7 @@ future<> await_background_jobs_on_all_shards();
// until all shards agree it can be deleted.
//
// This function only solves the second problem for now.
future<> delete_atomically(std::vector<shared_sstable> ssts, const db::large_data_handler& large_data_handler);
future<> delete_atomically(std::vector<shared_sstable> ssts, db::large_data_handler& large_data_handler);
future<> replay_pending_delete_log(sstring log_file);
struct index_sampling_state {

View File

@@ -52,9 +52,10 @@ SEASTAR_TEST_CASE(test_large_partitions) {
return do_with_cql_env([](cql_test_env& e) { return make_ready_future<>(); }, cfg);
}
SEASTAR_THREAD_TEST_CASE(test_large_rows) {
SEASTAR_THREAD_TEST_CASE(test_large_data) {
db::config cfg{};
cfg.compaction_large_row_warning_threshold_mb(1);
cfg.compaction_large_cell_warning_threshold_mb(1);
do_with_cql_env([](cql_test_env& e) {
e.execute_cql("create table tbl (a int, b text, primary key (a))").get();
sstring blob(1024*1024, 'x');
@@ -82,6 +83,13 @@ SEASTAR_THREAD_TEST_CASE(test_large_rows) {
BOOST_REQUIRE_EQUAL(row_size_bytes.size(), 8);
long row_size = read_be<long>(reinterpret_cast<const char*>(&row_size_bytes[0]));
BOOST_REQUIRE(row_size > 1024*1024 && row_size < 1025*1024);
// Check that it was added to system.large_cells too
assert_that(e.execute_cql("select partition_key, column_name from system.large_cells where table_name = 'tbl' allow filtering;").get0())
.is_rows()
.with_size(1)
.with_row({"44", "b", "tbl"});
return make_ready_future<>();
}, cfg).get();
}

View File

@@ -4977,7 +4977,7 @@ struct large_row_handler : public db::large_data_handler {
callback_t callback;
large_row_handler(uint64_t threshold, callback_t callback)
: large_data_handler(std::numeric_limits<uint64_t>::max(), threshold)
: large_data_handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max())
, callback(std::move(callback)) {}
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
@@ -4987,15 +4987,17 @@ struct large_row_handler : public db::large_data_handler {
return make_ready_future<>();
}
virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name,
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override {
return make_ready_future<>();
}
virtual future<> record_large_partitions(const sstables::sstable& sst,
const sstables::key& partition_key, uint64_t partition_size) const override {
return make_ready_future<>();
}
virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override {
return make_ready_future<>();
}
virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override {
virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view) const override {
return make_ready_future<>();
}
};