From f254664fe652f0119592ef63103a8ec41c532180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 28 Feb 2019 19:49:46 -0800 Subject: [PATCH 01/12] db: refactor large data deletion code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The code for deleting entries from system.large_partitions was almost a duplicate from the code for deleting entries from system.large_rows. This patch unifies the two, which also improves the error message when we fail to delete entries from system.large_partitions. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 15 ++++----------- db/large_data_handler.hh | 16 ++++++---------- tests/sstable_3_x_test.cc | 5 +---- 3 files changed, 11 insertions(+), 25 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index be2f152600..ab760ba2a4 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -64,13 +64,6 @@ future<> cql_table_large_data_handler::update_large_partitions(const schema& s, }); } -future<> cql_table_large_data_handler::delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const { - static const sstring req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?", db::system_keyspace::LARGE_PARTITIONS); - return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name).discard_result().handle_exception([](std::exception_ptr ep) { - large_data_logger.warn("Failed to drop entries from {}: {}", db::system_keyspace::LARGE_PARTITIONS, ep); - }); -} - future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { static const sstring req = @@ -100,15 +93,15 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable }); } -future<> cql_table_large_data_handler::delete_large_rows_entries(const schema& s, const sstring& sstable_name) const { +future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const { static const sstring req = format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?", - db::system_keyspace::LARGE_ROWS); + large_table_name); return db::execute_cql(req, s.ks_name(), s.cf_name(), sstable_name) .discard_result() - .handle_exception([&s, sstable_name] (std::exception_ptr ep) { + .handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) { large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}", - db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sstable_name, ep); + large_table_name, s.ks_name(), s.cf_name(), sstable_name, ep); }); } } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index cd992347e5..1e47e9d5f9 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -23,6 +23,7 @@ #include #include "schema.hh" +#include "system_keyspace.hh" namespace sstables { class sstable; @@ -71,11 +72,11 @@ public: assert(!_stopped); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { - large_partitions = delete_large_partitions_entry(s, filename); + large_partitions = delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS); } future<> large_rows = make_ready_future<>(); if (__builtin_expect(data_size > _row_threshold_bytes, false)) { - large_rows = delete_large_rows_entries(s, filename); + large_rows = delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS); } return when_all(std::move(large_partitions), std::move(large_rows)).discard_result(); } @@ -84,9 +85,8 @@ public: protected: virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0; - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const = 0; + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const = 0; virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const = 0; - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const = 0; }; class cql_table_large_data_handler : public large_data_handler { @@ -99,9 +99,8 @@ public: protected: virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override; - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override; + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override; }; class nop_large_data_handler : public large_data_handler { @@ -112,7 +111,7 @@ public: return make_ready_future<>(); } - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override { + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override { return make_ready_future<>(); } @@ -120,9 +119,6 @@ public: const clustering_key_prefix* clustering_key, uint64_t row_size) const override { return make_ready_future<>(); } - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override { - return make_ready_future<>(); - } }; } diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index 98d427c909..23fc4aa56b 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -4992,10 +4992,7 @@ struct large_row_handler : public db::large_data_handler { return make_ready_future<>(); } - virtual future<> delete_large_rows_entries(const schema& s, const sstring& sstable_name) const override { - return make_ready_future<>(); - } - virtual future<> delete_large_partitions_entry(const schema& s, const sstring& sstable_name) const override { + virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view) const override { return make_ready_future<>(); } }; From d7f263d33448a182397e28c4f2c50b981876c32a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Fri, 1 Mar 2019 14:37:43 -0800 Subject: [PATCH 02/12] db: Rename (maybe_)?update_large_partitions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This renames it to record_large_partitions, which matches record_large_rows. It also changes the signature to be closer to record_large_rows. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 10 +++++----- db/large_data_handler.hh | 8 ++++---- sstables/mc/writer.cc | 2 +- sstables/sstables.cc | 2 +- tests/sstable_3_x_test.cc | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index ab760ba2a4..b7a2df1df5 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -27,13 +27,11 @@ namespace db { -future<> large_data_handler::maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { +future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { assert(!_stopped); if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; - - const schema& s = *sst.get_schema(); - return update_large_partitions(s, sst.get_filename(), key, partition_size); + return record_large_partitions(sst, key, partition_size); } return make_ready_future<>(); } @@ -46,9 +44,11 @@ template static std::string key_to_str(const T& key, const schema& return oss.str(); } -future<> cql_table_large_data_handler::update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& key, uint64_t partition_size) const { +future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { static const sstring req = format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, partition_size, partition_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?) USING TTL 2592000", db::system_keyspace::LARGE_PARTITIONS); + const schema& s = *sst.get_schema(); + const auto sstable_name = sst.get_filename(); auto ks_name = s.ks_name(); auto cf_name = s.cf_name(); auto timestamp = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 1e47e9d5f9..3dfe49d868 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -66,7 +66,7 @@ public: return make_ready_future<>(); } - future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; + future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const { assert(!_stopped); @@ -86,7 +86,7 @@ public: protected: virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0; virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const = 0; - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const = 0; + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const = 0; }; class cql_table_large_data_handler : public large_data_handler { @@ -98,7 +98,7 @@ public: : large_data_handler(partition_threshold_bytes, row_threshold_bytes) {} protected: - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override; + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override; virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; }; @@ -107,7 +107,7 @@ class nop_large_data_handler : public large_data_handler { public: nop_large_data_handler() : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max()) {} - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& partition_key, uint64_t partition_size) const override { + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); } diff --git a/sstables/mc/writer.cc b/sstables/mc/writer.cc index 746f274b99..ee84d3b528 100644 --- a/sstables/mc/writer.cc +++ b/sstables/mc/writer.cc @@ -1338,7 +1338,7 @@ stop_iteration writer::consume_end_of_partition() { // compute size of the current row. _c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset; - _cfg.large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get(); + _cfg.large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get(); // update is about merging column_stats with the data being stored by collector. _sst.get_metadata_collector().update(std::move(_c_stats)); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 9c5f06e527..25fd4767e0 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2118,7 +2118,7 @@ stop_iteration components_writer::consume_end_of_partition() { // compute size of the current row. _sst._c_stats.partition_size = _out.offset() - _sst._c_stats.start_offset; - _large_data_handler->maybe_update_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get(); + _large_data_handler->maybe_record_large_partitions(_sst, *_partition_key, _sst._c_stats.partition_size).get(); // update is about merging column_stats with the data being stored by collector. _sst._collector.update(std::move(_sst._c_stats)); diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index 23fc4aa56b..57f4a42fbf 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -4987,7 +4987,7 @@ struct large_row_handler : public db::large_data_handler { return make_ready_future<>(); } - virtual future<> update_large_partitions(const schema& s, const sstring& sstable_name, + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); } From f3089bf3d1eda2be01e3af07fe1fde6fb08b62d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Fri, 1 Mar 2019 15:09:46 -0800 Subject: [PATCH 03/12] db: refactor a try_record helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We had almost identical error handling for large_partitions and large_rows. Refactor in preparation for large_cells. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 73 ++++++++++++++++++---------------------- db/large_data_handler.hh | 3 -- 2 files changed, 33 insertions(+), 43 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index b7a2df1df5..c62359c8a3 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -36,7 +36,7 @@ future<> large_data_handler::maybe_record_large_partitions(const sstables::sstab return make_ready_future<>(); } -logging::logger cql_table_large_data_handler::large_data_logger("large_data"); +static logging::logger large_data_logger("large_data"); template static std::string key_to_str(const T& key, const schema& s) { std::ostringstream oss; @@ -44,53 +44,46 @@ template static std::string key_to_str(const T& key, const schema& return oss.str(); } -future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { - static const sstring req = format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, partition_size, partition_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?) USING TTL 2592000", - db::system_keyspace::LARGE_PARTITIONS); - const schema& s = *sst.get_schema(); - const auto sstable_name = sst.get_filename(); +template +static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size, + std::string_view desc, std::string_view extra_path, const std::vector &extra_fields, Args&&... args) { + sstring extra_fields_str; + sstring extra_values; + for (std::string_view field : extra_fields) { + extra_fields_str += format(", {}", field); + extra_values += ", ?"; + } + const sstring req = format("INSERT INTO system.large_{}s (keyspace_name, table_name, sstable_name, {}_size, partition_key, compaction_time{}) VALUES (?, ?, ?, ?, ?, ?{}) USING TTL 2592000", + large_table, large_table, extra_fields_str, extra_values); + const schema &s = *sst.get_schema(); auto ks_name = s.ks_name(); auto cf_name = s.cf_name(); + const auto sstable_name = sst.get_filename(); + std::string pk_str = key_to_str(partition_key.to_partition_key(s), s); auto timestamp = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); - auto key_str = key_to_str(key.to_partition_key(s), s); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(partition_size), key_str, timestamp) - .then_wrapped([ks_name, cf_name, key_str, partition_size](auto&& f) { - try { - f.get(); - large_data_logger.warn("Writing large partition {}/{}:{} ({} bytes)", ks_name, cf_name, key_str, partition_size); - } catch (...) { - large_data_logger.warn("Failed to update {}: {}", db::system_keyspace::LARGE_PARTITIONS, std::current_exception()); - } - }); + large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes)", desc, ks_name, cf_name, pk_str, extra_path, size); + return db::execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...) + .discard_result() + .handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) { + large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}", + large_table, ks_name, cf_name, sstable_name, ep); + }); +} + +future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { + return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {}); } future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { - static const sstring req = - format("INSERT INTO system.{} (keyspace_name, table_name, sstable_name, row_size, partition_key, " - "clustering_key, compaction_time) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL 2592000", - db::system_keyspace::LARGE_ROWS); - auto f = [clustering_key, &partition_key, &sst, row_size] { + static const std::vector extra_fields{"clustering_key"}; + if (clustering_key) { const schema &s = *sst.get_schema(); - auto sstable_name = sst.get_filename(); - auto ks_name = s.ks_name(); - auto cf_name = s.cf_name(); - auto timestamp = std::chrono::duration_cast(db_clock::now().time_since_epoch()).count(); - std::string pk_str = key_to_str(partition_key.to_partition_key(s), s); - if (clustering_key) { - std::string ck_str = key_to_str(*clustering_key, s); - large_data_logger.warn("Writing large row {}/{}: {} {} ({} bytes)", ks_name, cf_name, pk_str, ck_str, row_size); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, ck_str, timestamp); - } else { - large_data_logger.warn("Writing large static row {}/{}: {} ({} bytes)", ks_name, cf_name, pk_str, row_size); - return db::execute_cql(req, ks_name, cf_name, sstable_name, int64_t(row_size), pk_str, nullptr, timestamp); - } - }; - return f().discard_result().handle_exception([&sst] (std::exception_ptr ep) { - const schema &s = *sst.get_schema(); - large_data_logger.warn("Failed to add a record to {}: ks = {}, table = {}, sst = {} exception = {}", - db::system_keyspace::LARGE_ROWS, s.ks_name(), s.cf_name(), sst.get_filename(), ep); - }); + std::string ck_str = key_to_str(*clustering_key, s); + return try_record("row", sst, partition_key, int64_t(row_size), "row", ck_str, extra_fields, ck_str); + } else { + return try_record("row", sst, partition_key, int64_t(row_size), "static row", "", extra_fields, nullptr); + } } future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const { diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 3dfe49d868..18416138ad 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -90,9 +90,6 @@ protected: }; class cql_table_large_data_handler : public large_data_handler { -protected: - static logging::logger large_data_logger; - public: explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes) : large_data_handler(partition_threshold_bytes, row_threshold_bytes) {} From a17a936882d14c3c53ab93bc5ea574dac0056564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:04:50 -0800 Subject: [PATCH 04/12] large_data_handler: assert it is not used after stop() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should have been changed in the patch db: stop the commit log after the tables during shutdown But unfortunately I missed it then. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.hh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 18416138ad..c7ab53ded0 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -60,7 +60,8 @@ public: future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { - if (__builtin_expect(!_stopped && row_size > _row_threshold_bytes, false)) { + assert(!stopped()); + if (__builtin_expect(row_size > _row_threshold_bytes, false)) { return record_large_rows(sst, partition_key, clustering_key, row_size); } return make_ready_future<>(); From 5fcb3ff2d7be6de82fb0e1d4b890682f7439b21f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:22:07 -0800 Subject: [PATCH 05/12] db: don't use _stopped directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This gives flexibility in how it is implemented. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 2 +- db/large_data_handler.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index c62359c8a3..ab1d53f36a 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -28,7 +28,7 @@ namespace db { future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { - assert(!_stopped); + assert(!stopped()); if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; return record_large_partitions(sst, key, partition_size); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index c7ab53ded0..3d4ea4816e 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -70,7 +70,7 @@ public: future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const { - assert(!_stopped); + assert(!stopped()); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { large_partitions = delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS); From 0b763ec19b22264e08ea18c27796e98fed57e5ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:29:38 -0800 Subject: [PATCH 06/12] sstables: delete dead error handling code. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit maybe_delete_large_data_entries handles exceptions internally, so the code this patch deletes would never run. Signed-off-by: Rafael Ávila de Espíndola --- sstables/sstables.cc | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 25fd4767e0..fd79250ae8 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3147,15 +3147,7 @@ static future<> maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler) { auto name = sst->get_filename(); - return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size()) - .then_wrapped([name = std::move(name)] (future<> f) { - if (f.failed()) { - // Just log and ignore failures to delete large data entries. - // They are not critical to the operation of the database. - sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, f.get_exception()); - } - return make_ready_future<>(); - }); + return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size()); } static future<> From 989ab33507a0d806f05238fd3526cca9f6472bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:45:49 -0800 Subject: [PATCH 07/12] large_data_handler: Remove const from a few functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These will use a member semaphore variable in a followup patch, so they cannot be const. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 2 +- db/large_data_handler.hh | 6 +++--- sstables/sstables.cc | 6 +++--- sstables/sstables.hh | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index ab1d53f36a..d97e6b97b6 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -27,7 +27,7 @@ namespace db { -future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { +future<> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) { assert(!stopped()); if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 3d4ea4816e..ac1baafc12 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -59,7 +59,7 @@ public: } future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size) const { + const clustering_key_prefix* clustering_key, uint64_t row_size) { assert(!stopped()); if (__builtin_expect(row_size > _row_threshold_bytes, false)) { return record_large_rows(sst, partition_key, clustering_key, row_size); @@ -67,9 +67,9 @@ public: return make_ready_future<>(); } - future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; + future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size); - future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const { + future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) { assert(!stopped()); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index fd79250ae8..c70717d939 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3144,20 +3144,20 @@ sstable::unlink() } static future<> -maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler) +maybe_delete_large_data_entry(shared_sstable sst, db::large_data_handler& large_data_handler) { auto name = sst->get_filename(); return large_data_handler.maybe_delete_large_data_entries(*sst->get_schema(), name, sst->data_size()); } static future<> -delete_sstable_and_maybe_large_data_entries(shared_sstable sst, const db::large_data_handler& large_data_handler) +delete_sstable_and_maybe_large_data_entries(shared_sstable sst, db::large_data_handler& large_data_handler) { return when_all(sst->unlink(), maybe_delete_large_data_entry(sst, large_data_handler)).discard_result(); } future<> -delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler) { +delete_atomically(std::vector ssts, db::large_data_handler& large_data_handler) { return seastar::async([ssts = std::move(ssts), &large_data_handler] { sstring sstdir; min_max_tracker gen_tracker; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7aff9026f7..f9de6333e5 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -873,7 +873,7 @@ future<> await_background_jobs_on_all_shards(); // until all shards agree it can be deleted. // // This function only solves the second problem for now. -future<> delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler); +future<> delete_atomically(std::vector ssts, db::large_data_handler& large_data_handler); future<> replay_pending_delete_log(sstring log_file); struct index_sampling_state { From 54b856e5e4a39bfc4c44a964bfeee75c9c1090bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:49:44 -0800 Subject: [PATCH 08/12] large_data_handler: propagate a future out of stop() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stop() will close a semaphore in a followup patch, so it needs to return a future. Signed-off-by: Rafael Ávila de Espíndola --- database.cc | 6 ++++-- database.hh | 2 +- db/large_data_handler.hh | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/database.cc b/database.cc index 0ff7757161..d71eca3531 100644 --- a/database.cc +++ b/database.cc @@ -1665,12 +1665,14 @@ future<> stop_database(sharded& sdb) { }); }).then([&sdb] { return sdb.invoke_on_all([](database& db) { - db.stop_large_data_handler(); + return db.stop_large_data_handler(); }); }); } -void database::stop_large_data_handler() { _large_data_handler->stop(); } +future<> database::stop_large_data_handler() { + return _large_data_handler->stop(); +} future<> database::stop() { diff --git a/database.hh b/database.hh index dd872e25ab..29f39844b7 100644 --- a/database.hh +++ b/database.hh @@ -1373,7 +1373,7 @@ public: future<> stop(); future<> close_tables(table_kind kind_to_close); - void stop_large_data_handler(); + future<> stop_large_data_handler(); unsigned shard_of(const dht::token& t); unsigned shard_of(const mutation& m); unsigned shard_of(const frozen_mutation& m); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index ac1baafc12..b48d41a3f8 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -53,9 +53,10 @@ public: // Once large_data_handler is stopped it will ignore requests to update system.large_partitions. Any futures already // returned must be waited for by the caller. bool stopped() const { return _stopped; } - void stop() { + future<> stop() { assert(!stopped()); _stopped = true; + return make_ready_future<>(); } future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, From 8b4ae9516850965200604d4151e74e1bc1595659 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 7 Mar 2019 15:56:56 -0800 Subject: [PATCH 09/12] large_data_handler: Run large data recording in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this changes the futures returned by large_data_handler will not normally wait for entries to be written to system.large_rows or system.large_partitions. We use a semaphore to bound how behind system.large_* table updates can get. This should avoid delaying sstables writes in the common case, which is more relevant once we warn of large cells since the the default threshold will be just 1MB. Note that there is no ordering between the various maybe_record_* and maybe_delete_large_data_entries requests. This means that we can end up with a stale entry that is only removed once the TTL expires. Signed-off-by: Rafael Ávila de Espíndola --- db/large_data_handler.cc | 4 +++- db/large_data_handler.hh | 37 +++++++++++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index d97e6b97b6..fa5ae5a6aa 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -31,7 +31,9 @@ future<> large_data_handler::maybe_record_large_partitions(const sstables::sstab assert(!stopped()); if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; - return record_large_partitions(sst, key, partition_size); + return with_sem([&sst, &key, partition_size, this] { + return record_large_partitions(sst, key, partition_size); + }); } return make_ready_future<>(); } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index b48d41a3f8..a932679c01 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -39,6 +39,26 @@ public: }; private: + // Assuming: + // * there is at most one log entry every 1MB + // * the average latency of the log is 4ms (depends on the load) + // * we aim to sustain 1GB/s of write bandwidth + // We need a concurrency of: + // C = (1GB/s / 1MB) * 4ms = 1k/s * 4ms = 4 + // 16 should be enough for everybody. + static const size_t max_concurrency = 16; + semaphore _sem{max_concurrency}; + + // A convenience function for using the above semaphore. Unlike the global with_semaphore, this will not wait on the + // future returned by func. The objective is for the future returned by func to run in parallel with whatever the + // caller is doing, but limit how far behind we can get. + template + future<> with_sem(Func&& func) { + return get_units(_sem, 1).then([func = std::forward(func)] (auto units) mutable { + func().finally([units = std::move(units)] {}); + }); + } + bool _stopped = false; uint64_t _partition_threshold_bytes; uint64_t _row_threshold_bytes; @@ -50,20 +70,21 @@ public: , _row_threshold_bytes(row_threshold_bytes) {} virtual ~large_data_handler() {} - // Once large_data_handler is stopped it will ignore requests to update system.large_partitions. Any futures already - // returned must be waited for by the caller. + // Once large_data_handler is stopped no further updates will be accepted. bool stopped() const { return _stopped; } future<> stop() { assert(!stopped()); _stopped = true; - return make_ready_future<>(); + return _sem.wait(max_concurrency); } future<> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) { assert(!stopped()); if (__builtin_expect(row_size > _row_threshold_bytes, false)) { - return record_large_rows(sst, partition_key, clustering_key, row_size); + return with_sem([&sst, &partition_key, clustering_key, row_size, this] { + return record_large_rows(sst, partition_key, clustering_key, row_size); + }); } return make_ready_future<>(); } @@ -74,11 +95,15 @@ public: assert(!stopped()); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { - large_partitions = delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS); + large_partitions = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_PARTITIONS); + }); } future<> large_rows = make_ready_future<>(); if (__builtin_expect(data_size > _row_threshold_bytes, false)) { - large_rows = delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS); + large_rows = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS); + }); } return when_all(std::move(large_partitions), std::move(large_rows)).discard_result(); } From d17083b4838e6f220d0eeeea2c21344db2b50828 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 28 Feb 2019 18:52:20 -0800 Subject: [PATCH 10/12] Create a system.large_cells table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is analogous to the system.large_rows table, but holds individual cells, so it also needs the column name. Signed-off-by: Rafael Ávila de Espíndola --- db/system_keyspace.cc | 22 +++++++++++++++++++++- db/system_keyspace.hh | 1 + 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index f7260fda52..034811f383 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -498,6 +498,26 @@ static schema_ptr large_rows() { return large_rows; } +static schema_ptr large_cells() { + static thread_local auto large_cells = [] { + auto id = generate_legacy_id(NAME, LARGE_CELLS); + return schema_builder(NAME, LARGE_CELLS, id) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + // We want the larger cells first, so use reversed_type_impl + .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("column_name", utf8_type, column_kind::clustering_key) + .with_column("compaction_time", timestamp_type) + .set_comment("cells larger than specified threshold") + .with_version(generate_schema_version(id)) + .build(); + }(); + return large_cells; +} + namespace v3 { schema_ptr batches() { @@ -1691,7 +1711,7 @@ std::vector all_tables() { r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(), peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), - sstable_activity(), size_estimates(), large_partitions(), large_rows(), + sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(), v3::views_builds_in_progress(), v3::built_views(), v3::scylla_views_builds_in_progress(), v3::truncated(), diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index e264519a16..6680a9d1b2 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -89,6 +89,7 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity"; static constexpr auto SIZE_ESTIMATES = "size_estimates"; static constexpr auto LARGE_PARTITIONS = "large_partitions"; static constexpr auto LARGE_ROWS = "large_rows"; +static constexpr auto LARGE_CELLS = "large_cells"; namespace v3 { static constexpr auto BATCHES = "batches"; From 63251b66c1647715d8b889c2cbfacb1a03d89dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 28 Feb 2019 15:47:07 -0800 Subject: [PATCH 11/12] db: Record large cells MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #4234. Large cells are now recorded in system.large_cells. Signed-off-by: Rafael Ávila de Espíndola --- database.cc | 3 ++- db/config.hh | 3 +++ db/large_data_handler.cc | 14 +++++++++++++ db/large_data_handler.hh | 41 +++++++++++++++++++++++++++++++++------ sstables/mc/writer.cc | 40 +++++++++++++++++++++++++------------- tests/sstable_3_x_test.cc | 7 ++++++- 6 files changed, 86 insertions(+), 22 deletions(-) diff --git a/database.cc b/database.cc index d71eca3531..ad95d14558 100644 --- a/database.cc +++ b/database.cc @@ -221,7 +221,8 @@ database::database(const db::config& cfg, database_config dbcfg) , _enable_incremental_backups(cfg.incremental_backups()) , _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04) , _large_data_handler(std::make_unique(_cfg->compaction_large_partition_warning_threshold_mb()*1024*1024, - _cfg->compaction_large_row_warning_threshold_mb()*1024*1024)) + _cfg->compaction_large_row_warning_threshold_mb()*1024*1024, + _cfg->compaction_large_cell_warning_threshold_mb()*1024*1024)) , _nop_large_data_handler(std::make_unique()) , _result_memory_limiter(dbcfg.available_memory / 10) , _data_listeners(std::make_unique(*this)) diff --git a/db/config.hh b/db/config.hh index 1aa056e745..1f5fdb6350 100644 --- a/db/config.hh +++ b/db/config.hh @@ -231,6 +231,9 @@ public: val(compaction_large_row_warning_threshold_mb, uint32_t, 10, Used, \ "Log a warning when writing rows larger than this value" \ ) \ + val(compaction_large_cell_warning_threshold_mb, uint32_t, 1, Used, \ + "Log a warning when writing cells larger than this value" \ + ) \ /* Common memtable settings */ \ val(memtable_total_space_in_mb, uint32_t, 0, Invalid, \ "Specifies the total memory used for all memtables on a node. This replaces the per-table storage settings memtable_operations_in_millions and memtable_throughput_in_mb." \ diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index fa5ae5a6aa..c7ccebec98 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -76,6 +76,20 @@ future<> cql_table_large_data_handler::record_large_partitions(const sstables::s return try_record("partition", sst, key, int64_t(partition_size), "partition", "", {}); } +future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const { + auto column_name = cdef.name_as_text(); + std::string_view cell_type = cdef.is_atomic() ? "cell" : "collection"; + static const std::vector extra_fields{"clustering_key", "column_name"}; + if (clustering_key) { + const schema &s = *sst.get_schema(); + auto ck_str = key_to_str(*clustering_key, s); + return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, format("{} {}", ck_str, column_name), extra_fields, ck_str, column_name); + } else { + return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, column_name, extra_fields, nullptr, column_name); + } +} + future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { static const std::vector extra_fields{"clustering_key"}; diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index a932679c01..d216fcec22 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -62,12 +62,14 @@ private: bool _stopped = false; uint64_t _partition_threshold_bytes; uint64_t _row_threshold_bytes; + uint64_t _cell_threshold_bytes; mutable large_data_handler::stats _stats; public: - explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes) + explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes) : _partition_threshold_bytes(partition_threshold_bytes) - , _row_threshold_bytes(row_threshold_bytes) {} + , _row_threshold_bytes(row_threshold_bytes) + , _cell_threshold_bytes(cell_threshold_bytes) {} virtual ~large_data_handler() {} // Once large_data_handler is stopped no further updates will be accepted. @@ -91,6 +93,17 @@ public: future<> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size); + future<> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) { + assert(!stopped()); + if (__builtin_expect(cell_size > _cell_threshold_bytes, false)) { + return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, this] { + return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size); + }); + } + return make_ready_future<>(); + } + future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) { assert(!stopped()); future<> large_partitions = make_ready_future<>(); @@ -105,12 +118,20 @@ public: return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_ROWS); }); } - return when_all(std::move(large_partitions), std::move(large_rows)).discard_result(); + future<> large_cells = make_ready_future<>(); + if (__builtin_expect(data_size > _cell_threshold_bytes, false)) { + large_cells = with_sem([&s, &filename, this] { + return delete_large_data_entries(s, filename, db::system_keyspace::LARGE_CELLS); + }); + } + return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result(); } const large_data_handler::stats& stats() const { return _stats; } protected: + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const = 0; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const = 0; virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const = 0; virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const = 0; @@ -118,19 +139,22 @@ protected: class cql_table_large_data_handler : public large_data_handler { public: - explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes) - : large_data_handler(partition_threshold_bytes, row_threshold_bytes) {} + explicit cql_table_large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes) + : large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes) {} protected: virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override; virtual future<> delete_large_data_entries(const schema& s, const sstring& sstable_name, std::string_view large_table_name) const override; + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; }; class nop_large_data_handler : public large_data_handler { public: nop_large_data_handler() - : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max()) {} + : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max(), + std::numeric_limits::max()) {} virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); } @@ -139,6 +163,11 @@ public: return make_ready_future<>(); } + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override { + return make_ready_future<>(); + } + virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override { return make_ready_future<>(); diff --git a/sstables/mc/writer.cc b/sstables/mc/writer.cc index ee84d3b528..200850be8b 100644 --- a/sstables/mc/writer.cc +++ b/sstables/mc/writer.cc @@ -668,17 +668,18 @@ private: }; // Writes single atomic cell - void write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef, + void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef, const row_time_properties& properties, bytes_view cell_path = {}); // Writes information about row liveness (formerly 'row marker') void write_liveness_info(bytes_ostream& writer, const row_marker& marker); // Writes a CQL collection (list, set or map) - void write_collection(bytes_ostream& writer, const column_definition& cdef, collection_mutation_view collection, + void write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion); - void write_cells(bytes_ostream& writer, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion); + void write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, + const row& row_body, const row_time_properties& properties, bool has_complex_deletion); void write_row_body(bytes_ostream& writer, const clustering_row& row, bool has_complex_deletion); void write_static_row(const row& static_row); void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key) { @@ -961,9 +962,10 @@ void writer::consume(tombstone t) { _tombstone_written = true; } -void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const column_definition& cdef, - const row_time_properties& properties, bytes_view cell_path) { +void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, + const column_definition& cdef, const row_time_properties& properties, bytes_view cell_path) { + uint64_t current_pos = writer.size(); bool is_deleted = !cell.is_live(); bool has_value = !is_deleted && !cell.value().empty(); bool use_row_timestamp = (properties.timestamp == cell.timestamp()); @@ -1022,6 +1024,12 @@ void writer::write_cell(bytes_ostream& writer, atomic_cell_view cell, const colu } // Collect cell statistics + // We record collections in write_collection, so ignore them here + if (cdef.is_atomic()) { + uint64_t size = writer.size() - current_pos; + _cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size); + } + _c_stats.update_timestamp(cell.timestamp()); if (is_deleted) { _c_stats.update_local_deletion_time_and_tombstone_histogram(cell.deletion_time()); @@ -1066,8 +1074,10 @@ void writer::write_liveness_info(bytes_ostream& writer, const row_marker& marker } } -void writer::write_collection(bytes_ostream& writer, const column_definition& cdef, - collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) { +void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix* clustering_key, + const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, + bool has_complex_deletion) { + uint64_t current_pos = writer.size(); auto& ctype = *static_pointer_cast(cdef.type); collection.data.with_linearized([&] (bytes_view collection_bv) { auto mview = ctype.deserialize_mutation_form(collection_bv); @@ -1082,19 +1092,21 @@ void writer::write_collection(bytes_ostream& writer, const column_definition& cd } for (const auto& [cell_path, cell]: mview.cells) { ++_c_stats.cells_count; - write_cell(writer, cell, cdef, properties, cell_path); + write_cell(writer, clustering_key, cell, cdef, properties, cell_path); } }); + uint64_t size = writer.size() - current_pos; + _cfg.large_data_handler->maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size); } -void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row_body, +void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body, const row_time_properties& properties, bool has_complex_deletion) { // Note that missing columns are written based on the whole set of regular columns as defined by schema. // This differs from Origin where all updated columns are tracked and the set of filled columns of a row // is compared with the set of all columns filled in the memtable. So our encoding may be less optimal in some cases // but still valid. write_missing_columns(writer, kind == column_kind::static_column ? _static_columns : _regular_columns, row_body); - row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion] (column_id id, const atomic_cell_or_collection& c) { + row_body.for_each_cell([this, &writer, kind, &properties, has_complex_deletion, clustering_key] (column_id id, const atomic_cell_or_collection& c) { auto&& column_definition = _schema.column_at(kind, id); if (!column_definition.is_atomic()) { _collections.push_back({&column_definition, c}); @@ -1103,11 +1115,11 @@ void writer::write_cells(bytes_ostream& writer, column_kind kind, const row& row atomic_cell_view cell = c.as_atomic_cell(column_definition); ++_c_stats.cells_count; ++_c_stats.column_count; - write_cell(writer, cell, column_definition, properties); + write_cell(writer, clustering_key, cell, column_definition, properties); }); for (const auto& col: _collections) { - write_collection(writer, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion); + write_collection(writer, clustering_key, *col.cdef, col.collection.get().as_collection_mutation(), properties, has_complex_deletion); } _collections.clear(); } @@ -1133,7 +1145,7 @@ void writer::write_row_body(bytes_ostream& writer, const clustering_row& row, bo } } - return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion); + return write_cells(writer, &row.key(), column_kind::regular_column, row.cells(), properties, has_complex_deletion); } // Find if any collection in the row contains a collection-wide tombstone @@ -1174,7 +1186,7 @@ void writer::write_static_row(const row& static_row) { write(_sst.get_version(), *_data_writer, row_extended_flags::is_static); write_vint(_tmp_bufs, 0); // as the static row always comes first, the previous row size is always zero - write_cells(_tmp_bufs, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion); + write_cells(_tmp_bufs, nullptr, column_kind::static_column, static_row, row_time_properties{}, has_complex_deletion); write_vint(*_data_writer, _tmp_bufs.size()); flush_tmp_bufs(); diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index 57f4a42fbf..d651f09c95 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -4977,7 +4977,7 @@ struct large_row_handler : public db::large_data_handler { callback_t callback; large_row_handler(uint64_t threshold, callback_t callback) - : large_data_handler(std::numeric_limits::max(), threshold) + : large_data_handler(std::numeric_limits::max(), threshold, std::numeric_limits::max()) , callback(std::move(callback)) {} virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, @@ -4987,6 +4987,11 @@ struct large_row_handler : public db::large_data_handler { return make_ready_future<>(); } + virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) const override { + return make_ready_future<>(); + } + virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const override { return make_ready_future<>(); From f983570ac8e77e7c561c64f68d90ebbcc1207c77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Fri, 1 Mar 2019 16:14:44 -0800 Subject: [PATCH 12/12] Add a test for large cells MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Rafael Ávila de Espíndola --- tests/cql_query_test.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index ff76f38082..93945bec9a 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -52,9 +52,10 @@ SEASTAR_TEST_CASE(test_large_partitions) { return do_with_cql_env([](cql_test_env& e) { return make_ready_future<>(); }, cfg); } -SEASTAR_THREAD_TEST_CASE(test_large_rows) { +SEASTAR_THREAD_TEST_CASE(test_large_data) { db::config cfg{}; cfg.compaction_large_row_warning_threshold_mb(1); + cfg.compaction_large_cell_warning_threshold_mb(1); do_with_cql_env([](cql_test_env& e) { e.execute_cql("create table tbl (a int, b text, primary key (a))").get(); sstring blob(1024*1024, 'x'); @@ -82,6 +83,13 @@ SEASTAR_THREAD_TEST_CASE(test_large_rows) { BOOST_REQUIRE_EQUAL(row_size_bytes.size(), 8); long row_size = read_be(reinterpret_cast(&row_size_bytes[0])); BOOST_REQUIRE(row_size > 1024*1024 && row_size < 1025*1024); + + // Check that it was added to system.large_cells too + assert_that(e.execute_cql("select partition_key, column_name from system.large_cells where table_name = 'tbl' allow filtering;").get0()) + .is_rows() + .with_size(1) + .with_row({"44", "b", "tbl"}); + return make_ready_future<>(); }, cfg).get(); }