mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 02:20:37 +00:00
sstables: convert row to internal representation
Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
#include "database.hh"
|
||||
#include "mutation.hh"
|
||||
#include "sstables.hh"
|
||||
#include "types.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "key.hh"
|
||||
#include "keys.hh"
|
||||
#include "core/do_with.hh"
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
|
||||
@@ -71,4 +73,203 @@ int sstable::binary_search(const T& entries, const key& sk, const dht::token& to
|
||||
// much code to .hh
|
||||
// Explicit instantiation of sstable::binary_search for a vector of summary_entry.
template int sstable::binary_search<>(const std::vector<summary_entry>& entries, const key& sk);
|
||||
// Explicit instantiation of sstable::binary_search for a vector of index_entry.
template int sstable::binary_search<>(const std::vector<index_entry>& entries, const key& sk);
|
||||
|
||||
class mp_row_consumer : public row_consumer {
|
||||
schema_ptr _schema;
|
||||
key _key;
|
||||
|
||||
struct column {
|
||||
bool is_static;
|
||||
bytes_view col_name;
|
||||
std::vector<bytes> clustering;
|
||||
bytes cell;
|
||||
const column_definition *cdef;
|
||||
|
||||
static constexpr size_t static_size = 2;
|
||||
|
||||
bool check_static(bytes_view col) const {
|
||||
static bytes static_row(static_size, 0xff);
|
||||
return col.compare(0, static_size, static_row) == 0;
|
||||
}
|
||||
|
||||
bytes_view fix_static_name(bytes_view col) {
|
||||
if (is_static) {
|
||||
col.remove_prefix(static_size);
|
||||
}
|
||||
return col;
|
||||
}
|
||||
|
||||
column(schema_ptr schema, bytes_view col)
|
||||
: is_static(check_static(col))
|
||||
, col_name(fix_static_name(col))
|
||||
, clustering(composite_view(col_name).explode())
|
||||
, cell(std::move(clustering.back()))
|
||||
, cdef(schema->get_column_definition(cell))
|
||||
{
|
||||
|
||||
if (is_static) {
|
||||
for (auto& e: clustering) {
|
||||
if (e.size() != 0) {
|
||||
throw malformed_sstable_exception("Static row has clustering key information. I didn't expect that!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (cell.size() && !cdef) {
|
||||
throw malformed_sstable_exception(sprint("schema does not contain colum: %s", cell.c_str()));
|
||||
}
|
||||
|
||||
clustering.pop_back();
|
||||
}
|
||||
};
|
||||
public:
|
||||
lw_shared_ptr<mutation> mut;
|
||||
|
||||
mp_row_consumer(const key& key, const schema_ptr _schema)
|
||||
: _schema(_schema)
|
||||
, _key(key)
|
||||
, mut(make_lw_shared<mutation>(partition_key::from_exploded(*_schema, key.explode(*_schema)), _schema))
|
||||
{ }
|
||||
|
||||
void validate_row_marker() {
|
||||
if (_schema->is_dense()) {
|
||||
throw malformed_sstable_exception("row marker found in dense table");
|
||||
}
|
||||
}
|
||||
|
||||
virtual void consume_row_start(bytes_view key, sstables::deletion_time deltime) override {
|
||||
// FIXME: We should be doing more than that: We need to check the deletion time and propagate the tombstone information
|
||||
auto k = bytes_view(_key);
|
||||
if (key != k) {
|
||||
throw malformed_sstable_exception(sprint("Key mismatch. Got %s while processing %s", to_hex(key).c_str(), to_hex(k).c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
virtual void consume_cell(bytes_view col_name, bytes_view value, uint64_t timestamp, uint32_t ttl, uint32_t expiration) override {
|
||||
static bytes cql_row_marker(3, 0x0);
|
||||
|
||||
// The row marker exists mainly so that one can create empty rows. It should not be present
|
||||
// in dense tables. It serializes to \x0\x0\x0 and should yield an empty vector.
|
||||
// FIXME: What to do with its timestamp ? We are not setting any row-wide timestamp in the mutation partition
|
||||
if (col_name == cql_row_marker) {
|
||||
validate_row_marker();
|
||||
return;
|
||||
}
|
||||
|
||||
struct column col(_schema, col_name);
|
||||
|
||||
// FIXME: collections are different, but not yet handled.
|
||||
if (col.clustering.size() > (_schema->clustering_key_type()->types().size() + 1)) {
|
||||
throw malformed_sstable_exception("wrong number of clustering columns");
|
||||
}
|
||||
|
||||
ttl_opt opt;
|
||||
if (ttl) {
|
||||
gc_clock::duration secs(expiration);
|
||||
auto tp = gc_clock::time_point(secs);
|
||||
if (tp < gc_clock::now()) {
|
||||
consume_deleted_cell(col, timestamp, tp);
|
||||
return;
|
||||
}
|
||||
|
||||
opt = ttl_opt(tp);
|
||||
} else {
|
||||
opt = {};
|
||||
}
|
||||
|
||||
auto ac = atomic_cell::make_live(timestamp, opt, value);
|
||||
|
||||
if (col.is_static) {
|
||||
mut->set_static_cell(*(col.cdef), ac);
|
||||
return;
|
||||
}
|
||||
|
||||
auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));
|
||||
|
||||
if (col.cell.size() == 0) {
|
||||
auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix);
|
||||
auto& dr = mut->partition().clustered_row(clustering_key);
|
||||
dr.created_at = timestamp;
|
||||
return;
|
||||
}
|
||||
|
||||
mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
|
||||
}
|
||||
|
||||
virtual void consume_deleted_cell(bytes_view col_name, sstables::deletion_time deltime) override {
|
||||
struct column col(_schema, col_name);
|
||||
gc_clock::duration secs(deltime.local_deletion_time);
|
||||
|
||||
consume_deleted_cell(col, deltime.marked_for_delete_at, gc_clock::time_point(secs));
|
||||
}
|
||||
|
||||
void consume_deleted_cell(column &col, uint64_t timestamp, gc_clock::time_point ttl) {
|
||||
auto ac = atomic_cell::make_dead(timestamp, ttl);
|
||||
|
||||
if (col.is_static) {
|
||||
printf("Deleted static cell!\n");
|
||||
mut->set_static_cell(*(col.cdef), atomic_cell_or_collection(ac));
|
||||
} else {
|
||||
printf("Deleted non static cell!\n");
|
||||
auto clustering_prefix = exploded_clustering_prefix(std::move(col.clustering));
|
||||
mut->set_cell(clustering_prefix, *(col.cdef), atomic_cell_or_collection(ac));
|
||||
}
|
||||
}
|
||||
virtual void consume_row_end() override {
|
||||
}
|
||||
|
||||
virtual void consume_range_tombstone(
|
||||
bytes_view start_col, bytes_view end_col,
|
||||
sstables::deletion_time deltime) override {
|
||||
throw runtime_exception("Not implemented");
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Looks up `key` in this sstable and converts its row into a mutation.
//
// The returned future resolves to a null pointer when the filter rejects the
// key's token, when the key sorts before every summary entry, or when the key
// is not present in the index page that was read. Otherwise it resolves to the
// mutation built by mp_row_consumer from the row's data.
//
// `key` only needs to stay valid for the duration of this call: a copy is
// stored in the continuation, which may run after the caller has returned.
future<lw_shared_ptr<mutation>>
sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {

    assert(schema);

    auto& partitioner = dht::global_partitioner();
    auto token = partitioner.get_token(key_view(key));

    if (!filter_has_key(token)) {
        return make_ready_future<lw_shared_ptr<mutation>>();
    }

    auto& summary = _summary;
    auto summary_idx = binary_search(summary.entries, key, token);
    if (summary_idx < 0) {
        // binary search gives us the first index _greater_ than the key searched for,
        // i.e., its insertion position
        auto gt = (summary_idx + 1) * -1;
        // The entry covering the key is the one just before the insertion position.
        summary_idx = gt - 1;
    }
    if (summary_idx < 0) {
        // The key sorts before the first summary entry.
        return make_ready_future<lw_shared_ptr<mutation>>();
    }

    auto position = summary.entries[summary_idx].position;
    // Capture the key by value: capturing the reference parameter would leave
    // the continuation with a dangling reference if it runs after the caller's
    // key has been destroyed.
    return read_indexes(position).then([this, schema, key_copy = key, token] (auto index_list) {
        auto index_idx = this->binary_search(index_list, key_copy, token);
        if (index_idx < 0) {
            return make_ready_future<lw_shared_ptr<mutation>>();
        }

        // The row spans from its own index position up to the next entry's
        // position, or up to the end of the data when it is the last entry.
        auto position = index_list[index_idx].position;
        size_t end;
        if (size_t(index_idx + 1) < index_list.size()) {
            end = index_list[index_idx + 1].position;
        } else {
            end = this->data_size();
        }

        // do_with keeps the consumer alive until the read has completed.
        return do_with(mp_row_consumer(key_copy, schema), [this, position, end] (auto& c) {
            return this->data_consume_rows_at_once(c, position, end).then([&c] {
                return make_ready_future<lw_shared_ptr<mutation>>(c.mut);
            });
        });
    });
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
#include "compress.hh"
|
||||
#include "row.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "schema.hh"
|
||||
#include "database_fwd.hh"
|
||||
|
||||
namespace sstables {
|
||||
class key;
|
||||
@@ -169,6 +171,8 @@ public:
|
||||
|
||||
// Stores `generation` in the _generation member.
void set_generation(unsigned long generation) { _generation = generation; }
|
||||
|
||||
// Reads the row stored for key `k` and converts it into a mutation built
// against `schema` (which must not be null). The future resolves to a null
// pointer when the key is not found.
future<lw_shared_ptr<mutation>> convert_row(schema_ptr schema, const key& k);
|
||||
|
||||
// Allow the test cases from sstable_test.cc to test private methods. We use
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
// will then re-export as public every method it needs.
|
||||
|
||||
Reference in New Issue
Block a user