diff --git a/sstables/sstables.cc b/sstables/sstables.cc index f18e9c54f8..15ccb288a6 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1298,6 +1298,41 @@ future<> sstable::write_components(const memtable& mt) { } void sstable::do_write_components(const memtable& mt) { + bool checksum_file = true; + // FIXME: CRC component must only be present when compression isn't enabled. + if (checksum_file) { + _components.insert(component_type::CRC); + } + + // TODO: Add compression support by having a specialized output stream. + auto w = make_shared(_data_file, 4096, checksum_file); + auto index = make_shared(_index_file, 4096); + + prepare_summary(_summary, mt); + auto filter_fp_chance = mt.schema()->bloom_filter_fp_chance(); + if (filter_fp_chance != 1.0) { + _components.insert(component_type::Filter); + } + _filter = utils::i_filter::get_filter(mt.all_partitions().size(), filter_fp_chance); + + prepare_statistics(_statistics); + + // NOTE: Cassandra gets partition name by calling getClass().getCanonicalName() on + // partition class. + add_validation_metadata(_statistics, dht::global_partitioner().name(), filter_fp_chance); + auto collector = make_lw_shared(); + + // Iterate through CQL partitions, then CQL rows, then CQL columns. + // Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key. + for (auto& partition_entry: mt.all_partitions()) { + // FIXME: it's likely that we need to set both sstable_level and repaired_at at this point. + // Set current index of data to later compute row size. + _c_stats.start_offset = w->offset(); + auto partition_key = key::from_partition_key(*mt.schema(), partition_entry.first._key); + // Maybe add summary entry into in-memory representation of summary file. + maybe_add_summary_entry(_summary, bytes_view(partition_key), index->offset()); + _filter->add(bytes_view(partition_key)); + } } future<> sstable::write_components_t(const memtable& mt) {