diff --git a/sstables/metadata_collector.hh b/sstables/metadata_collector.hh index 5ad5017b24..cc666c7774 100644 --- a/sstables/metadata_collector.hh +++ b/sstables/metadata_collector.hh @@ -65,6 +65,10 @@ struct column_stats { has_legacy_counter_shards = false; } + void reset() { + *this = column_stats(); + } + void update_min_timestamp(uint64_t potential_min) { min_timestamp = std::min(min_timestamp, potential_min); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index f2feed9485..a2d74355bc 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -17,7 +17,6 @@ #include "types.hh" #include "sstables.hh" #include "compress.hh" -#include "metadata_collector.hh" #include namespace sstables { @@ -938,20 +937,14 @@ future<> sstable::store() { }); } -// FIXME: It's terrible to have column_stats as a global variable because of -// bad design, but that was needed so as not to pass column_stats for every -// function that ends up updating any of its fields. -// column_stats is used to keep track of statistics for each row. -static __thread column_stats* c_stats; - // @clustering_key: it's expected that clustering key is already in its composite form. // NOTE: empty clustering key means that there is no clustering key. -static future<> write_column_name(file_writer& out, const composite& clustering_key, const std::vector& column_names) { +future<> sstable::write_column_name(file_writer& out, const composite& clustering_key, const std::vector& column_names) { // FIXME: min_components and max_components also keep track of clustering // prefix, so we must merge clustering_key and column_names somehow and // pass the result to the functions below. - column_name_helper::min_components(c_stats->min_column_names, column_names); - column_name_helper::max_components(c_stats->max_column_names, column_names); + column_name_helper::min_components(_c_stats.min_column_names, column_names); + column_name_helper::max_components(_c_stats.max_column_names, column_names); // FIXME: This code assumes name is always composite, but it wouldn't if "WITH COMPACT STORAGE" // was defined in the schema, for example. @@ -961,9 +954,9 @@ static future<> write_column_name(file_writer& out, const composite& clustering_ }); } -static future<> write_static_column_name(file_writer& out, const schema& schema, const std::vector& column_names) { - column_name_helper::min_components(c_stats->min_column_names, column_names); - column_name_helper::max_components(c_stats->max_column_names, column_names); +future<> sstable::write_static_column_name(file_writer& out, const schema& schema, const std::vector& column_names) { + column_name_helper::min_components(_c_stats.min_column_names, column_names); + column_name_helper::max_components(_c_stats.max_column_names, column_names); return do_with(composite::from_exploded(column_names), [&out, &schema] (composite& c) { return do_with(composite::static_prefix(schema), [&out, &c] (composite& sp) { @@ -973,19 +966,19 @@ static future<> write_static_column_name(file_writer& out, const schema& schema, }); } -static inline void update_cell_stats(uint64_t timestamp) { - c_stats->update_min_timestamp(timestamp); - c_stats->update_max_timestamp(timestamp); - c_stats->column_count++; +static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) { + c_stats.update_min_timestamp(timestamp); + c_stats.update_max_timestamp(timestamp); + c_stats.column_count++; } // Intended to write all cell components that follow column name. -static future<> write_cell(file_writer& out, atomic_cell_view cell) { +future<> sstable::write_cell(file_writer& out, atomic_cell_view cell) { // FIXME: range tombstone and counter cells aren't supported yet. uint64_t timestamp = cell.timestamp(); - update_cell_stats(timestamp); + update_cell_stats(_c_stats, timestamp); if (cell.is_live_and_has_ttl()) { // expiring cell @@ -1005,7 +998,7 @@ static future<> write_cell(file_writer& out, atomic_cell_view cell) { uint32_t deletion_time_size = sizeof(uint32_t); uint32_t deletion_time = cell.deletion_time().time_since_epoch().count(); - c_stats->tombstone_histogram.update(deletion_time); + _c_stats.tombstone_histogram.update(deletion_time); return write(out, mask, timestamp, deletion_time_size, deletion_time); } else { @@ -1020,19 +1013,19 @@ static future<> write_cell(file_writer& out, atomic_cell_view cell) { } } -static future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key) { +future<> sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key) { // Missing created_at (api::missing_timestamp) means no row marker. if (clustered_row.row().created_at() == api::missing_timestamp) { return make_ready_future<>(); } // Write row mark cell to the beginning of clustered row. - return write_column_name(out, clustering_key, { bytes_view() }).then([&out, &clustered_row] { + return this->write_column_name(out, clustering_key, { bytes_view() }).then([&out, &clustered_row, this] { column_mask mask = column_mask::none; uint64_t timestamp = clustered_row.row().created_at(); uint32_t value_length = 0; - update_cell_stats(timestamp); + update_cell_stats(_c_stats, timestamp); return write(out, mask, timestamp, value_length); }); @@ -1040,17 +1033,17 @@ static future<> write_row_marker(file_writer& out, const rows_entry& clustered_r // write_datafile_clustered_row() is about writing a clustered_row to data file according to SSTables format. // clustered_row contains a set of cells sharing the same clustering key. -static future<> write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row) { +future<> sstable::write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row) { auto clustering_key = composite::from_clustering_key(*schema, clustered_row.key()); - return do_with(std::move(clustering_key), [&out, schema, &clustered_row] (auto& clustering_key) { - return write_row_marker(out, clustered_row, clustering_key).then( - [&out, &clustered_row, schema, &clustering_key] { + return do_with(std::move(clustering_key), [&out, schema, &clustered_row, this] (auto& clustering_key) { + return this->write_row_marker(out, clustered_row, clustering_key).then( + [&out, &clustered_row, schema, &clustering_key, this] { // FIXME: Before writing cells, range tombstone must be written if the row has any (deletable_row::t). assert(!clustered_row.row().deleted_at()); // Write all cells of a partition's row. - return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key] (auto& value) { + return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key, this] (auto& value) { auto column_id = value.first; auto&& column_definition = schema->regular_column_at(column_id); // non atomic cell isn't supported yet. atomic cell maps to a single trift cell. @@ -1062,16 +1055,16 @@ static future<> write_clustered_row(file_writer& out, schema_ptr schema, const r atomic_cell_view cell = value.second.as_atomic_cell(); const bytes& column_name = column_definition.name(); - return write_column_name(out, clustering_key, { bytes_view(column_name) }).then([&out, cell] { - return write_cell(out, cell); + return this->write_column_name(out, clustering_key, { bytes_view(column_name) }).then([&out, cell, this] { + return this->write_cell(out, cell); }); }); }); }); } -static future<> write_static_row(file_writer& out, schema_ptr schema, const row& static_row) { - return do_for_each(static_row, [&out, schema] (auto& value) { +future<> sstable::write_static_row(file_writer& out, schema_ptr schema, const row& static_row) { + return do_for_each(static_row, [&out, schema, this] (auto& value) { auto column_id = value.first; auto&& column_definition = schema->static_column_at(column_id); if (!column_definition.is_atomic()) { @@ -1079,8 +1072,8 @@ static future<> write_static_row(file_writer& out, schema_ptr schema, const row& } assert(column_definition.is_static()); atomic_cell_view cell = value.second.as_atomic_cell(); - return write_static_column_name(out, *schema, { bytes_view(column_definition.name()) }).then([&out, cell] { - return write_cell(out, cell); + return this->write_static_column_name(out, *schema, { bytes_view(column_definition.name()) }).then([&out, cell, this] { + return this->write_cell(out, cell); }); }); } @@ -1206,10 +1199,9 @@ future<> sstable::write_components(const memtable& mt) { return do_for_each(mt.all_partitions(), [w, index, &mt, collector, this] (const std::pair& partition_entry) { - // Initialize column stats and set current index of data to later compute row size. - c_stats = new column_stats(); // FIXME: it's likely that we need to set both sstable_level and repaired_at at this point. - c_stats->start_offset = w->offset(); + // Set current index of data to later compute row size. + _c_stats.start_offset = w->offset(); return do_with(key::from_partition_key(*mt.schema(), partition_entry.first._key), [w, index, &partition_entry, this] (auto& partition_key) { @@ -1226,7 +1218,7 @@ future<> sstable::write_components(const memtable& mt) { // Write partition key into data file. return write(*w, p_key); }); - }).then([w, &partition_entry] { + }).then([w, &partition_entry, this] { auto tombstone = partition_entry.second.partition_tombstone(); deletion_time d; @@ -1234,10 +1226,10 @@ future<> sstable::write_components(const memtable& mt) { d.local_deletion_time = tombstone.deletion_time.time_since_epoch().count(); d.marked_for_delete_at = tombstone.timestamp; - c_stats->tombstone_histogram.update(d.local_deletion_time); - c_stats->update_max_local_deletion_time(d.local_deletion_time); - c_stats->update_min_timestamp(d.marked_for_delete_at); - c_stats->update_max_timestamp(d.marked_for_delete_at); + _c_stats.tombstone_histogram.update(d.local_deletion_time); + _c_stats.update_max_local_deletion_time(d.local_deletion_time); + _c_stats.update_min_timestamp(d.marked_for_delete_at); + _c_stats.update_max_timestamp(d.marked_for_delete_at); } else { // Default values for live, undeleted rows. d.local_deletion_time = std::numeric_limits::max(); @@ -1248,14 +1240,14 @@ future<> sstable::write_components(const memtable& mt) { return write(*w, d); }); }); - }).then([w, &mt, &partition_entry] { + }).then([w, &mt, &partition_entry, this] { auto& partition = partition_entry.second; auto& static_row = partition.static_row(); - return write_static_row(*w, mt.schema(), static_row).then([w, &mt, &partition] { + return write_static_row(*w, mt.schema(), static_row).then([w, &mt, &partition, this] { // Write all CQL rows from a given mutation partition. - return do_for_each(partition.clustered_rows(), [w, &mt] (const rows_entry& clustered_row) { + return do_for_each(partition.clustered_rows(), [w, &mt, this] (const rows_entry& clustered_row) { return write_clustered_row(*w, mt.schema(), clustered_row); }).then([w] { // end_of_row is appended to the end of each partition. @@ -1263,12 +1255,12 @@ future<> sstable::write_components(const memtable& mt) { return write(*w, end_of_row); }); }); - }).then([w, collector] { + }).then([w, collector, this] { // compute size of the current row. - c_stats->row_size = w->offset() - c_stats->start_offset; + _c_stats.row_size = w->offset() - _c_stats.start_offset; // update is about merging column_stats with the data being stored by collector. - collector->update(*c_stats); - delete c_stats; + collector->update(_c_stats); + _c_stats.reset(); return make_ready_future<>(); }); }).then([this, w] { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index b53983d51d..73fbf5bb30 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -24,6 +24,7 @@ #include "utils/i_filter.hh" #include "core/stream.hh" #include "writer.hh" +#include "metadata_collector.hh" namespace sstables { @@ -125,6 +126,7 @@ private: utils::filter_ptr _filter; summary _summary; statistics _statistics; + column_stats _c_stats; lw_shared_ptr _index_file; lw_shared_ptr _data_file; size_t _data_file_size; @@ -200,6 +202,14 @@ private: // FIXME: pending on Bloom filter implementation bool filter_has_key(const key& key) { return _filter->is_present(bytes_view(key)); } bool filter_has_key(const schema& s, const dht::decorated_key& dk) { return filter_has_key(key::from_partition_key(s, dk._key)); } + + // NOTE: functions used to generate sstable components. + future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key); + future<> write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row); + future<> write_static_row(file_writer& out, schema_ptr schema, const row& static_row); + future<> write_cell(file_writer& out, atomic_cell_view cell); + future<> write_column_name(file_writer& out, const composite& clustering_key, const std::vector& column_names); + future<> write_static_column_name(file_writer& out, const schema& schema, const std::vector& column_names); public: // Allow the test cases from sstable_test.cc to test private methods. We use // a placeholder to avoid cluttering this class too much. The sstable_test class