From 233a84eb6bbcafa2755e413ee42ef775d9ee80c2 Mon Sep 17 00:00:00 2001
From: Glauber Costa
Date: Wed, 29 Apr 2015 20:47:29 -0400
Subject: [PATCH] sstables: convert row to internal representation

Signed-off-by: Glauber Costa
---
 sstables/partition.cc | 237 +++++++++++++++++++++++++++++++++++++++++-
 sstables/sstables.hh  |   4 +
 2 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/sstables/partition.cc b/sstables/partition.cc
index a14a684bea..411b7d0046 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -1,11 +1,13 @@
 /*
  * Copyright 2015 Cloudius Systems
  */
 
-#include "database.hh"
+#include "mutation.hh"
 #include "sstables.hh"
 #include "types.hh"
 #include "core/future-util.hh"
 #include "key.hh"
+#include "keys.hh"
+#include "core/do_with.hh"
 
 #include "dht/i_partitioner.hh"
 
@@ -71,4 +73,237 @@ int sstable::binary_search(const T& entries, const key& sk, const dht::token& to
 // much code to .hh
 template int sstable::binary_search<>(const std::vector<summary_entry>& entries, const key& sk);
 template int sstable::binary_search<>(const std::vector<index_entry>& entries, const key& sk);
+
+// Row consumer that assembles the cells of one sstable partition into a
+// mutation. It throws malformed_sstable_exception when the data read does not
+// match the key it was created for or the schema.
+class mp_row_consumer : public row_consumer {
+    schema_ptr _schema;
+    key _key;
+
+    // An on-disk column name split into its parts: the optional static marker,
+    // the clustering components and the name of the cell itself.
+    struct column {
+        bool is_static;
+        bytes_view col_name;
+        std::vector<bytes> clustering;
+        bytes cell;
+        const column_definition *cdef;
+
+        // Length of the run of 0xff bytes that prefixes a static column name.
+        static constexpr size_t static_size = 2;
+
+        bool check_static(bytes_view col) const {
+            static bytes static_row(static_size, 0xff);
+            return col.compare(0, static_size, static_row) == 0;
+        }
+
+        // Strips the static prefix, if any. Reads is_static, so it may only be
+        // called after that member has been initialized.
+        bytes_view fix_static_name(bytes_view col) {
+            if (is_static) {
+                col.remove_prefix(static_size);
+            }
+            return col;
+        }
+
+        // Detaches the last component (the cell name) from the exploded column
+        // name, leaving only the clustering components behind. Throws rather
+        // than calling back() on an empty vector.
+        static bytes extract_cell(std::vector<bytes>& components) {
+            if (components.empty()) {
+                throw malformed_sstable_exception("column name has no components");
+            }
+            bytes name = std::move(components.back());
+            components.pop_back();
+            return name;
+        }
+
+        // Each initializer reads only members declared before it, so the member
+        // declaration order above must be preserved.
+        column(schema_ptr schema, bytes_view col)
+            : is_static(check_static(col))
+            , col_name(fix_static_name(col))
+            , clustering(composite_view(col_name).explode())
+            , cell(extract_cell(clustering))
+            , cdef(schema->get_column_definition(cell))
+        {
+            if (is_static) {
+                for (auto& e: clustering) {
+                    if (e.size() != 0) {
+                        throw malformed_sstable_exception("Static row has clustering key information. I didn't expect that!");
+                    }
+                }
+            }
+
+            // An empty cell name has no column definition; that case is left to
+            // the callers.
+            if (cell.size() && !cdef) {
+                throw malformed_sstable_exception(sprint("schema does not contain column: %s", cell.c_str()));
+            }
+        }
+    };
+public:
+    lw_shared_ptr<mutation> mut;
+
+    mp_row_consumer(const key& key, const schema_ptr _schema)
+        : _schema(_schema)
+        , _key(key)
+        , mut(make_lw_shared<mutation>(partition_key::from_exploded(*_schema, key.explode(*_schema)), _schema))
+    { }
+
+    void validate_row_marker() {
+        if (_schema->is_dense()) {
+            throw malformed_sstable_exception("row marker found in dense table");
+        }
+    }
+
+    virtual void consume_row_start(bytes_view key, sstables::deletion_time deltime) override {
+        // FIXME: We should be doing more than that: We need to check the deletion time and propagate the tombstone information
+        auto k = bytes_view(_key);
+        if (key != k) {
+            throw malformed_sstable_exception(sprint("Key mismatch. Got %s while processing %s", to_hex(key).c_str(), to_hex(k).c_str()));
+        }
+    }
+
+    virtual void consume_cell(bytes_view col_name, bytes_view value, uint64_t timestamp, uint32_t ttl, uint32_t expiration) override {
+        static bytes cql_row_marker(3, 0x0);
+
+        // The row marker exists mainly so that one can create empty rows. It should not be present
+        // in dense tables. It serializes to \x0\x0\x0 and should yield an empty vector.
+        // FIXME: What to do with its timestamp ? We are not setting any row-wide timestamp in the mutation partition
+        if (col_name == cql_row_marker) {
+            validate_row_marker();
+            return;
+        }
+
+        struct column col(_schema, col_name);
+
+        // FIXME: collections are different, but not yet handled.
+        if (col.clustering.size() > (_schema->clustering_key_type()->types().size() + 1)) {
+            throw malformed_sstable_exception("wrong number of clustering columns");
+        }
+
+        ttl_opt opt;
+        if (ttl) {
+            gc_clock::duration secs(expiration);
+            auto tp = gc_clock::time_point(secs);
+            // A cell whose expiration time has already passed is stored as dead.
+            if (tp < gc_clock::now()) {
+                consume_deleted_cell(col, timestamp, tp);
+                return;
+            }
+
+            opt = ttl_opt(tp);
+        }
+
+        auto ac = atomic_cell::make_live(timestamp, opt, value);
+
+        if (col.is_static) {
+            // cdef is null for an empty cell name; do not dereference it.
+            if (!col.cdef) {
+                throw malformed_sstable_exception("static cell has no column name");
+            }
+            mut->set_static_cell(*(col.cdef), ac);
+            return;
+        }
+
+        auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));
+
+        // An empty cell name carries no value: only the row's creation time is recorded.
+        if (col.cell.size() == 0) {
+            auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix);
+            auto& dr = mut->partition().clustered_row(clustering_key);
+            dr.created_at = timestamp;
+            return;
+        }
+
+        mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
+    }
+
+    virtual void consume_deleted_cell(bytes_view col_name, sstables::deletion_time deltime) override {
+        struct column col(_schema, col_name);
+        gc_clock::duration secs(deltime.local_deletion_time);
+
+        consume_deleted_cell(col, deltime.marked_for_delete_at, gc_clock::time_point(secs));
+    }
+
+    // Records a dead cell for col; the clustering components are moved out of col.
+    void consume_deleted_cell(column &col, uint64_t timestamp, gc_clock::time_point ttl) {
+        // cdef is null when the cell name is empty. Fail with an exception
+        // instead of dereferencing a null pointer below.
+        if (!col.cdef) {
+            throw runtime_exception("Deleted cell without a column name is not implemented");
+        }
+
+        auto ac = atomic_cell::make_dead(timestamp, ttl);
+
+        if (col.is_static) {
+            mut->set_static_cell(*(col.cdef), atomic_cell_or_collection(ac));
+        } else {
+            auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));
+            mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
+        }
+    }
+    virtual void consume_row_end() override {
+    }
+
+    virtual void consume_range_tombstone(
+            bytes_view start_col, bytes_view end_col,
+            sstables::deletion_time deltime) override {
+        throw runtime_exception("Not implemented");
+    }
+};
+
+
+// Reads the partition identified by key from this sstable and converts it
+// into a mutation. The future resolves to an empty pointer when the filter,
+// the summary or the index shows that the key is not in this sstable.
+future<lw_shared_ptr<mutation>>
+sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {
+
+    assert(schema);
+
+    auto& partitioner = dht::global_partitioner();
+    auto token = partitioner.get_token(key_view(key));
+
+    if (!filter_has_key(token)) {
+        return make_ready_future<lw_shared_ptr<mutation>>();
+    }
+
+    auto summary_idx = binary_search(_summary.entries, key, token);
+    if (summary_idx < 0) {
+        // binary search gives us the first index _greater_ than the key searched for,
+        // i.e., its insertion position
+        auto gt = (summary_idx + 1) * -1;
+        summary_idx = gt - 1;
+    }
+    if (summary_idx < 0) {
+        return make_ready_future<lw_shared_ptr<mutation>>();
+    }
+
+    auto position = _summary.entries[summary_idx].position;
+    // The continuation may run after this function has returned, so it keeps
+    // its own copy of the key rather than a reference to the caller's object.
+    return read_indexes(position).then([this, schema, key = key, token] (auto index_list) {
+        auto index_idx = this->binary_search(index_list, key, token);
+        if (index_idx < 0) {
+            return make_ready_future<lw_shared_ptr<mutation>>();
+        }
+
+        auto position = index_list[index_idx].position;
+        size_t end;
+        if (size_t(index_idx + 1) < index_list.size()) {
+            end = index_list[index_idx + 1].position;
+        } else {
+            end = this->data_size();
+        }
+
+        return do_with(mp_row_consumer(key, schema), [this, position, end] (auto& c) {
+            return this->data_consume_rows_at_once(c, position, end).then([&c] {
+                return make_ready_future<lw_shared_ptr<mutation>>(c.mut);
+            });
+        });
+    });
+}
 }
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index 131afb9acd..8b326c3e96 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -18,6 +18,8 @@
 #include "compress.hh"
 #include "row.hh"
 #include "dht/i_partitioner.hh"
+#include "schema.hh"
+#include "database_fwd.hh"
 
 namespace sstables {
 class key;
@@ -169,6 +171,8 @@ public:
     void set_generation(unsigned long generation) {
         _generation = generation;
     }
+    future<lw_shared_ptr<mutation>> convert_row(schema_ptr schema, const key& k);
+
     // Allow the test cases from sstable_test.cc to test private methods. We use
     // a placeholder to avoid cluttering this class too much. The sstable_test class
     // will then re-export as public every method it needs.