From ffd8886da9f22ceeac4ad15a6728cc06cbb0ed46 Mon Sep 17 00:00:00 2001 From: Vladimir Krivopalov Date: Tue, 15 May 2018 11:13:45 -0700 Subject: [PATCH] sstables: Support writing counters for SSTables 3.x. Signed-off-by: Vladimir Krivopalov --- sstables/sstables.cc | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5bb74dcb34..332f11b63a 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1702,13 +1702,15 @@ static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type c_stats.cells_count++; } -static void write_counter_value(counter_cell_view ccv, file_writer& out, sstable_version_types v) { +template +static void write_counter_value(counter_cell_view ccv, file_writer& out, sstable_version_types v, WriteLengthFunc&& write_len_func) { auto shard_count = ccv.shard_count(); static constexpr auto header_entry_size = sizeof(int16_t); static constexpr auto counter_shard_size = 32u; // counter_id: 16 + clock: 8 + value: 8 auto total_size = sizeof(int16_t) + shard_count * (header_entry_size + counter_shard_size); - write(v, out, int32_t(total_size), int16_t(shard_count)); + write_len_func(out, uint32_t(total_size)); + write(v, out, int16_t(shard_count)); for (auto i = 0u; i < shard_count; i++) { write(v, out, std::numeric_limits::min() + i); } @@ -1754,7 +1756,9 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d write(_version, out, mask, int64_t(0), timestamp); counter_cell_view ccv(cell); - write_counter_value(ccv, out, _version); + write_counter_value(ccv, out, _version, [v = _version] (file_writer& out, uint32_t value) { + return write(v, out, value); + }); _c_stats.update_local_deletion_time(std::numeric_limits::max()); } else if (cell.is_live_and_has_ttl()) { @@ -2852,7 +2856,15 @@ void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, co } if (has_value) { - write_cell_value(writer, *cdef.type, cell.value()); + if (cdef.is_counter()) { + assert(!cell.is_counter_update()); + counter_cell_view ccv(cell); + write_counter_value(ccv, writer, sstable_version_types::mc, [] (file_writer& out, uint32_t value) { + return write_vint(out, value); + }); + } else { + write_cell_value(writer, *cdef.type, cell.value()); + } } // Collect cell statistics