sstables: fix sigsegv in write code

c_stats is a thread local variable, which was being re-allocated
for every thrift row. It's intended to keep track of its stats.
Shlomi reported the sigsegv while running cassandra-stress, and
bisect pointed to the commit that introduce write of statistics.

After some investigation, I realized that the problem was about
c_stats, which was being explicitly deallocated by our code, and
that the implicit deallocator would find it after freed.

In addition to the problem above, Avi realized that our code to
write statistics was broken when concurrently writing sstables,
given that the same c_stats would be used for both.

To fix both problems, c_stats and functions to write sstables
were made members of the class sstable.

Reported-by: Shlomi Livne <shlomi@cloudius-systems.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
This commit is contained in:
Raphael S. Carvalho
2015-06-02 18:41:43 -03:00
parent 83d4b962ff
commit 3eec6d7135
3 changed files with 55 additions and 49 deletions

View File

@@ -65,6 +65,10 @@ struct column_stats {
has_legacy_counter_shards = false;
}
void reset() {
*this = column_stats();
}
void update_min_timestamp(uint64_t potential_min) {
min_timestamp = std::min(min_timestamp, potential_min);
}

View File

@@ -17,7 +17,6 @@
#include "types.hh"
#include "sstables.hh"
#include "compress.hh"
#include "metadata_collector.hh"
#include <boost/algorithm/string.hpp>
namespace sstables {
@@ -938,20 +937,14 @@ future<> sstable::store() {
});
}
// FIXME: It's terrible to have column_stats as a global variable because of
// bad design, but that was needed so as not to pass column_stats for every
// function that ends up updating any of its fields.
// column_stats is used to keep track of statistics for each row.
static __thread column_stats* c_stats;
// @clustering_key: it's expected that clustering key is already in its composite form.
// NOTE: empty clustering key means that there is no clustering key.
static future<> write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names) {
future<> sstable::write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names) {
// FIXME: min_components and max_components also keep track of clustering
// prefix, so we must merge clustering_key and column_names somehow and
// pass the result to the functions below.
column_name_helper::min_components(c_stats->min_column_names, column_names);
column_name_helper::max_components(c_stats->max_column_names, column_names);
column_name_helper::min_components(_c_stats.min_column_names, column_names);
column_name_helper::max_components(_c_stats.max_column_names, column_names);
// FIXME: This code assumes name is always composite, but it wouldn't if "WITH COMPACT STORAGE"
// was defined in the schema, for example.
@@ -961,9 +954,9 @@ static future<> write_column_name(file_writer& out, const composite& clustering_
});
}
static future<> write_static_column_name(file_writer& out, const schema& schema, const std::vector<bytes_view>& column_names) {
column_name_helper::min_components(c_stats->min_column_names, column_names);
column_name_helper::max_components(c_stats->max_column_names, column_names);
future<> sstable::write_static_column_name(file_writer& out, const schema& schema, const std::vector<bytes_view>& column_names) {
column_name_helper::min_components(_c_stats.min_column_names, column_names);
column_name_helper::max_components(_c_stats.max_column_names, column_names);
return do_with(composite::from_exploded(column_names), [&out, &schema] (composite& c) {
return do_with(composite::static_prefix(schema), [&out, &c] (composite& sp) {
@@ -973,19 +966,19 @@ static future<> write_static_column_name(file_writer& out, const schema& schema,
});
}
static inline void update_cell_stats(uint64_t timestamp) {
c_stats->update_min_timestamp(timestamp);
c_stats->update_max_timestamp(timestamp);
c_stats->column_count++;
static inline void update_cell_stats(column_stats& c_stats, uint64_t timestamp) {
c_stats.update_min_timestamp(timestamp);
c_stats.update_max_timestamp(timestamp);
c_stats.column_count++;
}
// Intended to write all cell components that follow column name.
static future<> write_cell(file_writer& out, atomic_cell_view cell) {
future<> sstable::write_cell(file_writer& out, atomic_cell_view cell) {
// FIXME: range tombstone and counter cells aren't supported yet.
uint64_t timestamp = cell.timestamp();
update_cell_stats(timestamp);
update_cell_stats(_c_stats, timestamp);
if (cell.is_live_and_has_ttl()) {
// expiring cell
@@ -1005,7 +998,7 @@ static future<> write_cell(file_writer& out, atomic_cell_view cell) {
uint32_t deletion_time_size = sizeof(uint32_t);
uint32_t deletion_time = cell.deletion_time().time_since_epoch().count();
c_stats->tombstone_histogram.update(deletion_time);
_c_stats.tombstone_histogram.update(deletion_time);
return write(out, mask, timestamp, deletion_time_size, deletion_time);
} else {
@@ -1020,19 +1013,19 @@ static future<> write_cell(file_writer& out, atomic_cell_view cell) {
}
}
static future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key) {
future<> sstable::write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key) {
// Missing created_at (api::missing_timestamp) means no row marker.
if (clustered_row.row().created_at() == api::missing_timestamp) {
return make_ready_future<>();
}
// Write row mark cell to the beginning of clustered row.
return write_column_name(out, clustering_key, { bytes_view() }).then([&out, &clustered_row] {
return this->write_column_name(out, clustering_key, { bytes_view() }).then([&out, &clustered_row, this] {
column_mask mask = column_mask::none;
uint64_t timestamp = clustered_row.row().created_at();
uint32_t value_length = 0;
update_cell_stats(timestamp);
update_cell_stats(_c_stats, timestamp);
return write(out, mask, timestamp, value_length);
});
@@ -1040,17 +1033,17 @@ static future<> write_row_marker(file_writer& out, const rows_entry& clustered_r
// write_datafile_clustered_row() is about writing a clustered_row to data file according to SSTables format.
// clustered_row contains a set of cells sharing the same clustering key.
static future<> write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row) {
future<> sstable::write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row) {
auto clustering_key = composite::from_clustering_key(*schema, clustered_row.key());
return do_with(std::move(clustering_key), [&out, schema, &clustered_row] (auto& clustering_key) {
return write_row_marker(out, clustered_row, clustering_key).then(
[&out, &clustered_row, schema, &clustering_key] {
return do_with(std::move(clustering_key), [&out, schema, &clustered_row, this] (auto& clustering_key) {
return this->write_row_marker(out, clustered_row, clustering_key).then(
[&out, &clustered_row, schema, &clustering_key, this] {
// FIXME: Before writing cells, range tombstone must be written if the row has any (deletable_row::t).
assert(!clustered_row.row().deleted_at());
// Write all cells of a partition's row.
return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key] (auto& value) {
return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key, this] (auto& value) {
auto column_id = value.first;
auto&& column_definition = schema->regular_column_at(column_id);
// non atomic cell isn't supported yet. atomic cell maps to a single trift cell.
@@ -1062,16 +1055,16 @@ static future<> write_clustered_row(file_writer& out, schema_ptr schema, const r
atomic_cell_view cell = value.second.as_atomic_cell();
const bytes& column_name = column_definition.name();
return write_column_name(out, clustering_key, { bytes_view(column_name) }).then([&out, cell] {
return write_cell(out, cell);
return this->write_column_name(out, clustering_key, { bytes_view(column_name) }).then([&out, cell, this] {
return this->write_cell(out, cell);
});
});
});
});
}
static future<> write_static_row(file_writer& out, schema_ptr schema, const row& static_row) {
return do_for_each(static_row, [&out, schema] (auto& value) {
future<> sstable::write_static_row(file_writer& out, schema_ptr schema, const row& static_row) {
return do_for_each(static_row, [&out, schema, this] (auto& value) {
auto column_id = value.first;
auto&& column_definition = schema->static_column_at(column_id);
if (!column_definition.is_atomic()) {
@@ -1079,8 +1072,8 @@ static future<> write_static_row(file_writer& out, schema_ptr schema, const row&
}
assert(column_definition.is_static());
atomic_cell_view cell = value.second.as_atomic_cell();
return write_static_column_name(out, *schema, { bytes_view(column_definition.name()) }).then([&out, cell] {
return write_cell(out, cell);
return this->write_static_column_name(out, *schema, { bytes_view(column_definition.name()) }).then([&out, cell, this] {
return this->write_cell(out, cell);
});
});
}
@@ -1206,10 +1199,9 @@ future<> sstable::write_components(const memtable& mt) {
return do_for_each(mt.all_partitions(),
[w, index, &mt, collector, this] (const std::pair<const dht::decorated_key, mutation_partition>& partition_entry) {
// Initialize column stats and set current index of data to later compute row size.
c_stats = new column_stats();
// FIXME: it's likely that we need to set both sstable_level and repaired_at at this point.
c_stats->start_offset = w->offset();
// Set current index of data to later compute row size.
_c_stats.start_offset = w->offset();
return do_with(key::from_partition_key(*mt.schema(), partition_entry.first._key),
[w, index, &partition_entry, this] (auto& partition_key) {
@@ -1226,7 +1218,7 @@ future<> sstable::write_components(const memtable& mt) {
// Write partition key into data file.
return write(*w, p_key);
});
}).then([w, &partition_entry] {
}).then([w, &partition_entry, this] {
auto tombstone = partition_entry.second.partition_tombstone();
deletion_time d;
@@ -1234,10 +1226,10 @@ future<> sstable::write_components(const memtable& mt) {
d.local_deletion_time = tombstone.deletion_time.time_since_epoch().count();
d.marked_for_delete_at = tombstone.timestamp;
c_stats->tombstone_histogram.update(d.local_deletion_time);
c_stats->update_max_local_deletion_time(d.local_deletion_time);
c_stats->update_min_timestamp(d.marked_for_delete_at);
c_stats->update_max_timestamp(d.marked_for_delete_at);
_c_stats.tombstone_histogram.update(d.local_deletion_time);
_c_stats.update_max_local_deletion_time(d.local_deletion_time);
_c_stats.update_min_timestamp(d.marked_for_delete_at);
_c_stats.update_max_timestamp(d.marked_for_delete_at);
} else {
// Default values for live, undeleted rows.
d.local_deletion_time = std::numeric_limits<int32_t>::max();
@@ -1248,14 +1240,14 @@ future<> sstable::write_components(const memtable& mt) {
return write(*w, d);
});
});
}).then([w, &mt, &partition_entry] {
}).then([w, &mt, &partition_entry, this] {
auto& partition = partition_entry.second;
auto& static_row = partition.static_row();
return write_static_row(*w, mt.schema(), static_row).then([w, &mt, &partition] {
return write_static_row(*w, mt.schema(), static_row).then([w, &mt, &partition, this] {
// Write all CQL rows from a given mutation partition.
return do_for_each(partition.clustered_rows(), [w, &mt] (const rows_entry& clustered_row) {
return do_for_each(partition.clustered_rows(), [w, &mt, this] (const rows_entry& clustered_row) {
return write_clustered_row(*w, mt.schema(), clustered_row);
}).then([w] {
// end_of_row is appended to the end of each partition.
@@ -1263,12 +1255,12 @@ future<> sstable::write_components(const memtable& mt) {
return write(*w, end_of_row);
});
});
}).then([w, collector] {
}).then([w, collector, this] {
// compute size of the current row.
c_stats->row_size = w->offset() - c_stats->start_offset;
_c_stats.row_size = w->offset() - _c_stats.start_offset;
// update is about merging column_stats with the data being stored by collector.
collector->update(*c_stats);
delete c_stats;
collector->update(_c_stats);
_c_stats.reset();
return make_ready_future<>();
});
}).then([this, w] {

View File

@@ -24,6 +24,7 @@
#include "utils/i_filter.hh"
#include "core/stream.hh"
#include "writer.hh"
#include "metadata_collector.hh"
namespace sstables {
@@ -125,6 +126,7 @@ private:
utils::filter_ptr _filter;
summary _summary;
statistics _statistics;
column_stats _c_stats;
lw_shared_ptr<file> _index_file;
lw_shared_ptr<file> _data_file;
size_t _data_file_size;
@@ -200,6 +202,14 @@ private:
// FIXME: pending on Bloom filter implementation
bool filter_has_key(const key& key) { return _filter->is_present(bytes_view(key)); }
bool filter_has_key(const schema& s, const dht::decorated_key& dk) { return filter_has_key(key::from_partition_key(s, dk._key)); }
// NOTE: functions used to generate sstable components.
future<> write_row_marker(file_writer& out, const rows_entry& clustered_row, const composite& clustering_key);
future<> write_clustered_row(file_writer& out, schema_ptr schema, const rows_entry& clustered_row);
future<> write_static_row(file_writer& out, schema_ptr schema, const row& static_row);
future<> write_cell(file_writer& out, atomic_cell_view cell);
future<> write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names);
future<> write_static_column_name(file_writer& out, const schema& schema, const std::vector<bytes_view>& column_names);
public:
// Allow the test cases from sstable_test.cc to test private methods. We use
// a placeholder to avoid cluttering this class too much. The sstable_test class