diff --git a/database.cc b/database.cc index 0ff7757161..ad95d14558 100644 --- a/database.cc +++ b/database.cc @@ -221,7 +221,8 @@ database::database(const db::config& cfg, database_config dbcfg) , _enable_incremental_backups(cfg.incremental_backups()) , _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04) , _large_data_handler(std::make_unique(_cfg->compaction_large_partition_warning_threshold_mb()*1024*1024, - _cfg->compaction_large_row_warning_threshold_mb()*1024*1024)) + _cfg->compaction_large_row_warning_threshold_mb()*1024*1024, + _cfg->compaction_large_cell_warning_threshold_mb()*1024*1024)) , _nop_large_data_handler(std::make_unique()) , _result_memory_limiter(dbcfg.available_memory / 10) , _data_listeners(std::make_unique(*this)) @@ -1665,12 +1666,14 @@ future<> stop_database(sharded& sdb) { }); }).then([&sdb] { return sdb.invoke_on_all([](database& db) { - db.stop_large_data_handler(); + return db.stop_large_data_handler(); }); }); } -void database::stop_large_data_handler() { _large_data_handler->stop(); } +future<> database::stop_large_data_handler() { + return _large_data_handler->stop(); +} future<> database::stop() { diff --git a/database.hh b/database.hh index dd872e25ab..29f39844b7 100644 --- a/database.hh +++ b/database.hh @@ -1373,7 +1373,7 @@ public: future<> stop(); future<> close_tables(table_kind kind_to_close); - void stop_large_data_handler(); + future<> stop_large_data_handler(); unsigned shard_of(const dht::token& t); unsigned shard_of(const mutation& m); unsigned shard_of(const frozen_mutation& m); diff --git a/db/config.hh b/db/config.hh index 1aa056e745..1f5fdb6350 100644 --- a/db/config.hh +++ b/db/config.hh @@ -231,6 +231,9 @@ public: val(compaction_large_row_warning_threshold_mb, uint32_t, 10, Used, \ "Log a warning when writing rows larger than this value" \ ) \ + val(compaction_large_cell_warning_threshold_mb, uint32_t, 1, Used, \ + "Log a warning when writing cells larger than this value" \ + ) \ /* Common memtable settings */ \ val(memtable_total_space_in_mb, uint32_t, 0, Invalid, \ "Specifies the total memory used for all memtables on a node. This replaces the per-table storage settings memtable_operations_in_millions and memtable_throughput_in_mb." \ diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index be2f152600..c7ccebec98 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -27,18 +27,18 @@ namespace db { -future<> large_data_handler::maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { - assert(!_stopped); +future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) { + assert(!stopped()); if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; - - const schema& s = *sst.get_schema(); - return update_large_partitions(s, sst.get_filename(), key, partition_size); + return with_sem([&sst, &key, partition_size, this] { + return record_large_partitions(sst, key, partition_size); + }); } return make_ready_future<>(); } -logging::logger cql_table_large_data_handler::large_data_logger("large_data"); +static logging::logger large_data_logger("large_data"); template static std::string key_to_str(const T& key, const schema& s) { std::ostringstream oss; @@ -46,69 +46,71 @@ template static std::string key_to_str(const T& key, const schema& return oss.str(); } -future<> cql_table_large_data_handler::update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& key, uint64_t partition_size) const { - static const sstring req = format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, partition_size, partition_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?) USING TTL 2592000", - db::system_keyspace::LARGE_PARTITIONS); +template +static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size, + std::string_view desc, std::string_view extra_path, const std::vector &extra_fields, Args&&... args) { + sstring extra_fields_str; + sstring extra_values; + for (std::string_view field : extra_fields) { + extra_fields_str += format(", {}", field); + extra_values += ", ?"; + } + const sstring req = format("INSERT INTO system.large_{}s (keyspace_name, table_name, sstable_name, {}_size, partition_key, compaction_time{}) VALUES (?, ?, ?, ?, ?, ?{}) USING TTL 2592000", + large_table, large_table, extra_fields_str, extra_values); + const schema &s = *sst.get_schema(); auto ks_name = s.ks_name(); auto cf_name = s.cf_name(); + const auto sstable_name = sst.get_filename(); + std::string pk_str = key_to_str(partition_key.to_partition_key(s), s); auto timestamp = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); - auto key_str = key_to_str(key.to_partition_key(s), s); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(partition_size), key_str, timestamp) - .then_wrapped([ks_name, cf_name, key_str, partition_size](auto&& f) { - try { - f.get(); - large_data_logger.warn("Writing large partition {}/{}:{} ({} bytes)", ks_name, cf_name, key_str, partition_size); - } catch (...) { - large_data_logger.warn("Failed to update {}: {}", db::system_keyspace::LARGE_PARTITIONS, std::current_exception()); - } - }); + large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes)", desc, ks_name, cf_name, pk_str, extra_path, size); + return db::execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...) + .discard_result() + .handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) { + large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}", + large_table, ks_name, cf_name, sstable_name, ep); + }); } -future<> cql_table_large_data_handler::delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const { - static const sstring req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?", db::system_keyspace::LARGE_PARTITIONS); - return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name).discard_result().handle_exception([](std::exception_ptr ep) { - large_data_logger.warn("Failed to drop entries from {}: {}", db::system_keyspace::LARGE_PARTITIONS, ep); - }); +future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { + return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {}); +} + +future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const { + auto column_name = cdef.name_as_text(); + std::string_view cell_type = cdef.is_atomic() ? "cell" : "collection"; + static const std::vector extra_fields{"clustering_key", "column_name"}; + if (clustering_key) { + const schema &s = *sst.get_schema(); + auto ck_str = key_to_str(*clustering_key, s); + return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, format("{} {}", ck_str, column_name), extra_fields, ck_str, column_name); + } else { + return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, column_name, extra_fields, nullptr, column_name); + } } future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { - static const sstring req = - format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, row_size, partition_key, " - "clustering_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL 2592000", - db::system_keyspace::LARGE_ROWS); - auto f = [clustering_key, &partition_key, &sst, row_size] { + static const std::vector extra_fields{"clustering_key"}; + if (clustering_key) { const schema &s = *sst.get_schema(); - auto sstable_name = sst.get_filename(); - auto ks_name = s.ks_name(); - auto cf_name = s.cf_name(); - auto timestamp = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); - std::string pk_str = key_to_str(partition_key.to_partition_key(s), s); - if (clustering_key) { - std::string ck_str = key_to_str(*clustering_key, s); - large_data_logger.warn("Writing large row {}/{}: {} {} ({} bytes)", ks_name, cf_name, pk_str, ck_str, row_size); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, ck_str, timestamp); - } else { - large_data_logger.warn("Writing large static row {}/{}: {} ({} bytes)", ks_name, cf_name, pk_str, row_size); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, nullptr, timestamp); - } - }; - return f().discard_result().handle_exception([&sst] (std::exception_ptr ep) { - const schema &s = *sst.get_schema(); - large_data_logger.warn("Failed to add a record to {}: ks = {}, table = {}, sst = {} exception = {}", - db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sst.get_filename(), ep); - }); + std::string ck_str = key_to_str(*clustering_key, s); + return try_record("row", sst, partition_key, int64_t(row_size), "row", ck_str, extra_fields, ck_str); + } else { + return try_record("row", sst, partition_key, int64_t(row_size), "static row", "", extra_fields, nullptr); + } } -future<> cql_table_large_data_handler::delete_large_rows_entries(const schema& s, const sstring& sstable_name) const { +future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const { static const sstring req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?", - db::system_keyspace::LARGE_ROWS); + large_table_name); return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name) .discard_result() - .handle_exception([&s, sstable_name] (std::exception_ptr ep) { + .handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) { large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}", - db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sstable_name, ep); + large_table_name, s.ks_name(), s.cf_name(), sstable_name, ep); }); } } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index cd992347e5..d216fcec22 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -23,6 +23,7 @@ #include #include "schema.hh" +#include "system_keyspace.hh" namespace sstables { class sstable; @@ -38,81 +39,132 @@ public: }; private: + // Assuming: + // * there is at most one log entry every 1MB + // * the average latency of the log is 4ms (depends on the load) + // * we aim to sustain 1GB/s of write bandwidth + // We need a concurrency of: + // C = (1GB/s / 1MB) * 4ms = 1k/s * 4ms = 4 + // 16 should be enough for everybody. + static const size_t max_concurrency = 16; + semaphore _sem{max_concurrency}; + + // A convenience function for using the above semaphore. Unlike the global with_semaphore, this will not wait on the + // future returned by func. The objective is for the future returned by func to run in parallel with whatever the + // caller is doing, but limit how far behind we can get. + template + future<> with_sem(Func&& func) { + return get_units(_sem, 1).then([func = std::forward(func)] (auto units) mutable { + func().finally([units = std::move(units)] {}); + }); + } + bool _stopped = false; uint64_t _partition_threshold_bytes; uint64_t _row_threshold_bytes; + uint64_t _cell_threshold_bytes; mutable large_data_handler::stats _stats; public: - explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes) + explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes) : _partition_threshold_bytes(partition_threshold_bytes) - , _row_threshold_bytes(row_threshold_bytes) {} + , _row_threshold_bytes(row_threshold_bytes) + , _cell_threshold_bytes(cell_threshold_bytes) {} virtual ~large_data_handler() {} - // Once large_data_handler is stopped it will ignore requests to update system.large_partitions. Any futures already - // returned must be waited for by the caller. + // Once large_data_handler is stopped no further updates will be accepted. bool stopped() const { return _stopped; } - void stop() { + future<> stop() { assert(!stopped()); _stopped = true; + return _sem.wait(max_concurrency); } future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size) const { - if (__builtin_expect(!_stopped && row_size > _row_threshold_bytes, false)) { - return record_large_rows(sst, partition_key, clustering_key, row_size); + const clustering_key_prefix* clustering_key, uint64_t row_size) { + assert(!stopped()); + if (__builtin_expect(row_size > _row_threshold_bytes, false)) { + return with_sem([&sst, &partition_key, clustering_key, row_size, this] { + return record_large_rows(sst, partition_key, clustering_key, row_size); + }); } return make_ready_future<>(); } - future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; + future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size); - future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const { - assert(!_stopped); + future<> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) { + assert(!stopped()); + if (__builtin_expect(cell_size > _cell_threshold_bytes, false)) { + return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, this] { + return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size); + }); + } + return make_ready_future<>(); + } + + future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) { + assert(!stopped()); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { - large_partitions = delete_large_partitions_entry(s, filename); + large_partitions = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS); + }); } future<> large_rows = make_ready_future<>(); if (__builtin_expect(data_size > _row_threshold_bytes, false)) { - large_rows = delete_large_rows_entries(s, filename); + large_rows = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS); + }); } - return when_all(std::move(large_partitions), std::move(large_rows)).discard_result(); + future<> large_cells = make_ready_future<>(); + if (__builtin_expect(data_size > _cell_threshold_bytes, false)) { + large_cells = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_CELLS); + }); + } + return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result(); } const large_data_handler::stats& stats() const { return _stats; } protected: + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const = 0; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0; - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const = 0; - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const = 0; - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const = 0; + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const = 0; + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const = 0; }; class cql_table_large_data_handler : public large_data_handler { -protected: - static logging::logger large_data_logger; - public: - explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes) - : large_data_handler(partition_threshold_bytes, row_threshold_bytes) {} + explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes) + : large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes) {} protected: - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override; - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override; + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override; + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override; + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override; }; class nop_large_data_handler : public large_data_handler { public: nop_large_data_handler() - : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max()) {} - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override { + : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max(), + std::numeric_limits::max()) {} + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); } - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override { + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override { + return make_ready_future<>(); + } + + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override { return make_ready_future<>(); } @@ -120,9 +172,6 @@ public: const clustering_key_prefix* clustering_key, uint64_t row_size) const override { return make_ready_future<>(); } - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override { - return make_ready_future<>(); - } }; } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index f7260fda52..034811f383 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -498,6 +498,26 @@ static schema_ptr large_rows() { return large_rows; } +static schema_ptr large_cells() { + static thread_local auto large_cells = [] { + auto id = generate_legacy_id(NAME, LARGE_CELLS); + return schema_builder(NAME, LARGE_CELLS, id) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + // We want the larger cells first, so use reversed_type_impl + .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("column_name", utf8_type, column_kind::clustering_key) + .with_column("compaction_time", timestamp_type) + .set_comment("cells larger than specified threshold") + .with_version(generate_schema_version(id)) + .build(); + }(); + return large_cells; +} + namespace v3 { schema_ptr batches() { @@ -1691,7 +1711,7 @@ std::vector all_tables() { r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(), peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), - sstable_activity(), size_estimates(), large_partitions(), large_rows(), + sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(), v3::views_builds_in_progress(), v3::built_views(), v3::scylla_views_builds_in_progress(), v3::truncated(), diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index e264519a16..6680a9d1b2 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -89,6 +89,7 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity"; static constexpr auto SIZE_ESTIMATES = "size_estimates"; static constexpr auto LARGE_PARTITIONS = "large_partitions"; static constexpr auto LARGE_ROWS = "large_rows"; +static constexpr auto LARGE_CELLS = "large_cells"; namespace v3 { static constexpr auto BATCHES = "batches"; diff --git a/sstables/mc/writer.cc b/sstables/mc/writer.cc index 746f274b99..200850be8b 100644 --- a/sstables/mc/writer.cc +++ b/sstables/mc/writer.cc @@ -668,17 +668,18 @@ private: }; // Writes single atomic cell - void write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef, + void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef, const row_time_properties& properties, bytes_view cell_path = {}); // Writes information about row liveness (formerly 'row marker') void write_liveness_info(bytes_ostream& writer, const row_marker& marker); // Writes a CQL collection (list, set or map) - void write_collection(bytes_ostream& writer, const column_definition& cdef, collection_mutation_view collection, + void write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion); - void write_cells(bytes_ostream& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion); + void write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, + const row& row_body, const row_time_properties& properties, bool has_complex_deletion); void write_row_body(bytes_ostream& writer, const clustering_row& row, bool has_complex_deletion); void write_static_row(const row& static_row); void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key) { @@ -961,9 +962,10 @@ void writer::consume(tombstone t) { _tombstone_written = true; } -void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef, - const row_time_properties& properties, bytes_view cell_path) { +void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, + const column_definition& cdef, const row_time_properties& properties, bytes_view cell_path) { + uint64_t current_pos = writer.size(); bool is_deleted = !cell.is_live(); bool has_value = !is_deleted && !cell.value().empty(); bool use_row_timestamp = (properties.timestamp == cell.timestamp()); @@ -1022,6 +1024,12 @@ void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const colu } // Collect cell statistics + // We record collections in write_collection, so ignore them here + if (cdef.is_atomic()) { + uint64_t size = writer.size() - current_pos; + _cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size); + } + _c_stats.update_timestamp(cell.timestamp()); if (is_deleted) { _c_stats.update_local_deletion_time_and_tombstone_histogram(cell.deletion_time()); @@ -1066,8 +1074,10 @@ void writer::write_liveness_info(bytes_ostream& writer, const row_marker& marker } } -void writer::write_collection(bytes_ostream& writer, const column_definition& cdef, - collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) { +void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, + const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, + bool has_complex_deletion) { + uint64_t current_pos = writer.size(); auto& ctype = *static_pointer_cast(cdef.type); collection.data.with_linearized([&] (bytes_view collection_bv) { auto mview = ctype.deserialize_mutation_form(collection_bv); @@ -1082,19 +1092,21 @@ void writer::write_collection(bytes_ostream& writer, const column_definition& cd } for (const auto& [cell_path, cell]: mview.cells) { ++_c_stats.cells_count; - write_cell(writer, cell, cdef, properties, cell_path); + write_cell(writer, clustering_key, cell, cdef, properties, cell_path); } }); + uint64_t size = writer.size() - current_pos; + _cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size); } -void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row_body, +void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion) { // Note that missing columns are written based on the whole set of regular columns as defined by schema. // This differs from Origin where all updated columns are tracked and the set of filled columns of a row // is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases // but still valid. write_missing_columns(writer, kind == column_kind::static_column ? _static_columns : _regular_columns, row_body); - row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) { + row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion, clustering_key] (column_id id, const atomic_cell_or_collection& c) { auto&& column_definition = _schema.column_at(kind, id); if (!column_definition.is_atomic()) { _collections.push_back({&column_definition, c}); @@ -1103,11 +1115,11 @@ void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row atomic_cell_view cell = c.as_atomic_cell(column_definition); ++_c_stats.cells_count; ++_c_stats.column_count; - write_cell(writer, cell, column_definition, properties); + write_cell(writer, clustering_key, cell, column_definition, properties); }); for (const auto& col: _collections) { - write_collection(writer, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion); + write_collection(writer, clustering_key, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion); } _collections.clear(); } @@ -1133,7 +1145,7 @@ void writer::write_row_body(bytes_ostream& writer, const clustering_row& row, bo } } - return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion); + return write_cells(writer, &row.key(), column_kind::regular_column, row.cells(), properties, has_complex_deletion); } // Find if any collection in the row contains a collection-wide tombstone @@ -1174,7 +1186,7 @@ void writer::write_static_row(const row& static_row) { write(_sst.get_version(), *_data_writer, row_extended_flags::is_static); write_vint(_tmp_bufs, 0); // as the static row always comes first, the previous row size is always zero - write_cells(_tmp_bufs, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion); + write_cells(_tmp_bufs, nullptr, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion); write_vint(*_data_writer, _tmp_bufs.size()); flush_tmp_bufs(); @@ -1338,7 +1350,7 @@ stop_iteration writer::consume_end_of_partition() { // compute size of the current row. _c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset; - _cfg.large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get(); + _cfg.large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get(); // update is about merging column_stats with the data being stored by collector. _sst.get_metadata_collector().update(std::move(_c_stats)); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 9c5f06e527..c70717d939 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2118,7 +2118,7 @@ stop_iteration components_writer::consume_end_of_partition() { // compute size of the current row. _sst._c_stats.partition_size = _out.offset() - _sst._c_stats.start_offset; - _large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get(); + _large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get(); // update is about merging column_stats with the data being stored by collector. _sst._collector.update(std::move(_sst._c_stats)); @@ -3144,28 +3144,20 @@ sstable::unlink() } static future<> -maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler) +maybe_delete_large_data_entry(shared_sstable sst, db::large_data_handler& large_data_handler) { auto name = sst->get_filename(); - return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size()) - .then_wrapped([name = std::move(name)] (future<> f) { - if (f.failed()) { - // Just log and ignore failures to delete large data entries. - // They are not critical to the operation of the database. - sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, f.get_exception()); - } - return make_ready_future<>(); - }); + return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size()); } static future<> -delete_sstable_and_maybe_large_data_entries(shared_sstable sst, const db::large_data_handler& large_data_handler) +delete_sstable_and_maybe_large_data_entries(shared_sstable sst, db::large_data_handler& large_data_handler) { return when_all(sst->unlink(), maybe_delete_large_data_entry(sst, large_data_handler)).discard_result(); } future<> -delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler) { +delete_atomically(std::vector ssts, db::large_data_handler& large_data_handler) { return seastar::async([ssts = std::move(ssts), &large_data_handler] { sstring sstdir; min_max_tracker gen_tracker; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7aff9026f7..f9de6333e5 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -873,7 +873,7 @@ future<> await_background_jobs_on_all_shards(); // until all shards agree it can be deleted. // // This function only solves the second problem for now. -future<> delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler); +future<> delete_atomically(std::vector ssts, db::large_data_handler& large_data_handler); future<> replay_pending_delete_log(sstring log_file); struct index_sampling_state { diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index ff76f38082..93945bec9a 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -52,9 +52,10 @@ SEASTAR_TEST_CASE(test_large_partitions) { return do_with_cql_env([](cql_test_env& e) { return make_ready_future<>(); }, cfg); } -SEASTAR_THREAD_TEST_CASE(test_large_rows) { +SEASTAR_THREAD_TEST_CASE(test_large_data) { db::config cfg{}; cfg.compaction_large_row_warning_threshold_mb(1); + cfg.compaction_large_cell_warning_threshold_mb(1); do_with_cql_env([](cql_test_env& e) { e.execute_cql("create table tbl (a int, b text, primary key (a))").get(); sstring blob(1024*1024, 'x'); @@ -82,6 +83,13 @@ SEASTAR_THREAD_TEST_CASE(test_large_rows) { BOOST_REQUIRE_EQUAL(row_size_bytes.size(), 8); long row_size = read_be(reinterpret_cast(&row_size_bytes[0])); BOOST_REQUIRE(row_size > 1024*1024 && row_size < 1025*1024); + + // Check that it was added to system.large_cells too + assert_that(e.execute_cql("select partition_key, column_name from system.large_cells where table_name = 'tbl' allow filtering;").get0()) + .is_rows() + .with_size(1) + .with_row({"44", "b", "tbl"}); + return make_ready_future<>(); }, cfg).get(); } diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index 98d427c909..d651f09c95 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -4977,7 +4977,7 @@ struct large_row_handler : public db::large_data_handler { callback_t callback; large_row_handler(uint64_t threshold, callback_t callback) - : large_data_handler(std::numeric_limits::max(), threshold) + : large_data_handler(std::numeric_limits::max(), threshold, std::numeric_limits::max()) , callback(std::move(callback)) {} virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, @@ -4987,15 +4987,17 @@ struct large_row_handler : public db::large_data_handler { return make_ready_future<>(); } - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override { + return make_ready_future<>(); + } + + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); } - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override { - return make_ready_future<>(); - } - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override { + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view) const override { return make_ready_future<>(); } };