mutation_partition: Allow caching cell hashes

We add storage to a row to hold the cached hashes of each individual
cell. We don't store the hash in each cell because that would a)
change the cell equality function, and b) require us to change a cell
in a potentially fragmented buffer.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2018-01-13 18:21:11 +00:00
parent 71ba99d53e
commit 99a3e3aa76
3 changed files with 145 additions and 57 deletions

View File

@@ -66,4 +66,6 @@ public:
}
};
using default_hasher = xx_hasher;
}

View File

@@ -640,6 +640,28 @@ static api::timestamp_type hash_row_slice(query::digester& hasher,
return max;
}
// Returns the cached hash of the cell for column `id`, or a disengaged
// cell_hash_opt when the column is not set or its hash has not been
// computed yet (see row::prepare_hash()).
cell_hash_opt row::cell_hash_for(column_id id) const {
if (_type == storage_type::vector) {
// Vector storage: the hash lives next to the cell in cell_and_hash;
// the presence bitmap tells us whether the column is set at all.
return id < max_vector_size && _storage.vector.present.test(id) ? _storage.vector.v[id].hash : cell_hash_opt();
}
// Set storage: look the column up by id in the intrusive set.
auto it = _storage.set.find(id, cell_entry::compare());
if (it != _storage.set.end()) {
return it->hash();
}
return cell_hash_opt();
}
// Computes and caches the per-cell hash of every cell in this row that
// does not have one yet. Writing the hash through a const row is possible
// because cell_and_hash::hash is declared mutable.
void row::prepare_hash(const schema& s, column_kind kind) const {
// const to avoid removing const qualifiers on the read path
for_each_cell([&s, kind] (column_id id, const cell_and_hash& c_a_h) {
if (!c_a_h.hash) {
query::default_hasher cellh;
feed_hash(cellh, c_a_h.cell, s.column_at(kind, id));
c_a_h.hash = cell_hash{cellh.finalize_uint64()};
}
});
}
template<typename RowWriter>
static void get_compacted_row_slice(const schema& s,
const query::partition_slice& slice,
@@ -955,31 +977,36 @@ mutation_partition mutation_partition::sliced(const schema& s, const query::clus
return p;
}
static
void
apply_monotonically(const column_definition& def, atomic_cell_or_collection& dst, atomic_cell_or_collection& src) {
apply_monotonically(const column_definition& def, cell_and_hash& dst,
atomic_cell_or_collection& src, cell_hash_opt src_hash) {
// Must be run via with_linearized_managed_bytes() context, but assume it is
// provided via an upper layer
if (def.is_atomic()) {
if (def.is_counter()) {
counter_cell_view::apply_reversibly(dst, src); // FIXME: Optimize
} else if (compare_atomic_cell_for_merge(dst.as_atomic_cell(), src.as_atomic_cell()) < 0) {
std::swap(dst, src);
counter_cell_view::apply_reversibly(dst.cell, src); // FIXME: Optimize
dst.hash = { };
} else if (compare_atomic_cell_for_merge(dst.cell.as_atomic_cell(), src.as_atomic_cell()) < 0) {
std::swap(dst.cell, src);
dst.hash = std::move(src_hash);
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
dst = ct->merge(dst.as_collection_mutation(), src.as_collection_mutation());
dst.cell = ct->merge(dst.cell.as_collection_mutation(), src.as_collection_mutation());
dst.hash = { };
}
}
void
row::apply(const column_definition& column, const atomic_cell_or_collection& value) {
row::apply(const column_definition& column, const atomic_cell_or_collection& value, cell_hash_opt hash) {
auto tmp = value;
apply_monotonically(column, std::move(tmp));
apply_monotonically(column, std::move(tmp), std::move(hash));
}
void
row::apply(const column_definition& column, atomic_cell_or_collection&& value) {
apply_monotonically(column, std::move(value));
row::apply(const column_definition& column, atomic_cell_or_collection&& value, cell_hash_opt hash) {
apply_monotonically(column, std::move(value), std::move(hash));
}
template<typename Func>
@@ -997,7 +1024,7 @@ void row::consume_with(Func&& func) {
auto del = current_deleter<cell_entry>();
auto i = _storage.set.begin();
while (i != _storage.set.end()) {
func(i->id(), i->cell());
func(i->id(), i->get_cell_and_hash());
i = _storage.set.erase_and_dispose(i, del);
--_size;
}
@@ -1005,7 +1032,7 @@ void row::consume_with(Func&& func) {
}
void
row::apply_monotonically(const column_definition& column, atomic_cell_or_collection&& value) {
row::apply_monotonically(const column_definition& column, atomic_cell_or_collection&& value, cell_hash_opt hash) {
static_assert(std::is_nothrow_move_constructible<atomic_cell_or_collection>::value
&& std::is_nothrow_move_assignable<atomic_cell_or_collection>::value,
"noexcept required for atomicity");
@@ -1015,15 +1042,15 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
if (_type == storage_type::vector && id < max_vector_size) {
if (id >= _storage.vector.v.size()) {
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(std::move(value));
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), std::move(hash)});
_storage.vector.present.set(id);
_size++;
} else if (!bool(_storage.vector.v[id])) {
_storage.vector.v[id] = std::move(value);
} else if (auto& cell_and_hash = _storage.vector.v[id]; !bool(cell_and_hash.cell)) {
cell_and_hash = { std::move(value), std::move(hash) };
_storage.vector.present.set(id);
_size++;
} else {
::apply_monotonically(column, _storage.vector.v[id], value);
::apply_monotonically(column, cell_and_hash, value, std::move(hash));
}
} else {
if (_type == storage_type::vector) {
@@ -1034,9 +1061,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
cell_entry* e = current_allocator().construct<cell_entry>(id);
_storage.set.insert(i, *e);
_size++;
e->_cell = std::move(value);
e->_cell_and_hash = { std::move(value), std::move(hash) };
} else {
::apply_monotonically(column, i->cell(), value);
::apply_monotonically(column, i->_cell_and_hash, value, std::move(hash));
}
}
}
@@ -1045,7 +1072,7 @@ void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(std::move(value));
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);
} else {
if (_type == storage_type::vector) {
@@ -1057,8 +1084,8 @@ row::append_cell(column_id id, atomic_cell_or_collection value) {
_size++;
}
const atomic_cell_or_collection*
row::find_cell(column_id id) const {
const cell_and_hash*
row::find_cell_and_hash(column_id id) const {
if (_type == storage_type::vector) {
if (id >= _storage.vector.v.size() || !_storage.vector.present.test(id)) {
return nullptr;
@@ -1069,16 +1096,21 @@ row::find_cell(column_id id) const {
if (i == _storage.set.end()) {
return nullptr;
}
return &i->cell();
return &i->get_cell_and_hash();
}
}
// Returns a pointer to the cell's value, or nullptr if the column is not set.
const atomic_cell_or_collection*
row::find_cell(column_id id) const {
// find_cell_and_hash() returns nullptr for an absent column; the previous
// `&find_cell_and_hash(id)->cell` dereferenced that null pointer, which is
// undefined behavior. Check before forming the member pointer.
auto c_a_h = find_cell_and_hash(id);
return c_a_h ? &c_a_h->cell : nullptr;
}
size_t row::external_memory_usage() const {
size_t mem = 0;
if (_type == storage_type::vector) {
mem += _storage.vector.v.external_memory_usage();
for (auto&& ac_o_c : _storage.vector.v) {
mem += ac_o_c.external_memory_usage();
for (auto&& c_a_h : _storage.vector.v) {
mem += c_a_h.cell.external_memory_usage();
}
} else {
for (auto&& ce : _storage.set) {
@@ -1321,13 +1353,13 @@ row::~row() {
row::cell_entry::cell_entry(const cell_entry& o)
: _id(o._id)
, _cell(o._cell)
, _cell_and_hash(o._cell_and_hash)
{ }
row::cell_entry::cell_entry(cell_entry&& o) noexcept
: _link()
, _id(o._id)
, _cell(std::move(o._cell))
, _cell_and_hash(std::move(o._cell_and_hash))
{
using container_type = row::map_type;
container_type::node_algorithms::replace_node(o._link.this_ptr(), _link.this_ptr());
@@ -1348,13 +1380,13 @@ void row::vector_to_set()
map_type set;
try {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
auto& c = _storage.vector.v[i];
auto e = current_allocator().construct<cell_entry>(i, std::move(c));
auto& c_a_h = _storage.vector.v[i];
auto e = current_allocator().construct<cell_entry>(i, std::move(c_a_h));
set.insert(set.end(), *e);
}
} catch (...) {
set.clear_and_dispose([this, del = current_deleter<cell_entry>()] (cell_entry* ce) noexcept {
_storage.vector.v[ce->id()] = std::move(ce->cell());
_storage.vector.v[ce->id()] = std::move(ce->get_cell_and_hash());
del(ce);
});
throw;
@@ -1439,8 +1471,8 @@ void row::apply(const schema& s, column_kind kind, const row& other) {
} else {
reserve(other._storage.set.rbegin()->id());
}
other.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
apply(s.column_at(kind, id), cell);
other.for_each_cell([&] (column_id id, const cell_and_hash& c_a_h) {
apply(s.column_at(kind, id), c_a_h.cell, c_a_h.hash);
});
}
@@ -1457,8 +1489,8 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
} else {
reserve(other._storage.set.rbegin()->id());
}
other.consume_with([&] (column_id id, atomic_cell_or_collection& cell) {
apply_monotonically(s.column_at(kind, id), std::move(cell));
other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
});
}

View File

@@ -29,6 +29,7 @@
#include <boost/range/adaptor/filtered.hpp>
#include <seastar/core/bitset-iter.hh>
#include <seastar/util/optimized_optional.hh>
#include "schema.hh"
#include "tombstone.hh"
@@ -48,6 +49,32 @@
class mutation_fragment;
class clustering_row;
// Cached hash of a single cell's value. The value zero (no_hash) doubles as
// the "no hash computed" sentinel, which gives the type the disengaged state
// required by seastar::optimized_optional below.
struct cell_hash {
using size_type = uint64_t;
static constexpr size_type no_hash = 0;
size_type hash = no_hash;
// True when a hash has actually been computed.
explicit operator bool() const noexcept {
return hash != no_hash;
}
};
// Lets a cell_hash be folded into an outer hasher via feed_hash().
template<>
struct appending_hash<cell_hash> {
template<typename Hasher>
void operator()(Hasher& h, const cell_hash& ch) const {
feed_hash(h, ch.hash);
}
};
using cell_hash_opt = seastar::optimized_optional<cell_hash>;
// A cell together with its lazily computed cached hash. The hash is mutable
// so it can be filled in on the const read path (see row::prepare_hash()).
struct cell_and_hash {
atomic_cell_or_collection cell;
mutable cell_hash_opt hash;
};
//
// Container for cells of a row. Cells are identified by column_id.
//
@@ -58,15 +85,19 @@ class clustering_row;
// Can be used as a range of row::cell_entry.
//
class row {
class cell_entry {
boost::intrusive::set_member_hook<> _link;
column_id _id;
atomic_cell_or_collection _cell;
cell_and_hash _cell_and_hash;
friend class row;
public:
cell_entry(column_id id, atomic_cell_or_collection cell)
cell_entry(column_id id, cell_and_hash c_a_h)
: _id(id)
, _cell(std::move(cell))
, _cell_and_hash(std::move(c_a_h))
{ }
cell_entry(column_id id, atomic_cell_or_collection cell)
: cell_entry(id, cell_and_hash{std::move(cell), cell_hash_opt()})
{ }
cell_entry(column_id id)
: _id(id)
@@ -75,8 +106,11 @@ class row {
cell_entry(const cell_entry&);
column_id id() const { return _id; }
const atomic_cell_or_collection& cell() const { return _cell; }
atomic_cell_or_collection& cell() { return _cell; }
const atomic_cell_or_collection& cell() const { return _cell_and_hash.cell; }
atomic_cell_or_collection& cell() { return _cell_and_hash.cell; }
const cell_hash_opt& hash() const { return _cell_and_hash.hash; }
const cell_and_hash& get_cell_and_hash() const { return _cell_and_hash; }
cell_and_hash& get_cell_and_hash() { return _cell_and_hash; }
struct compare {
bool operator()(const cell_entry& e1, const cell_entry& e2) const {
@@ -107,7 +141,7 @@ public:
static constexpr size_t max_vector_size = 32;
static constexpr size_t internal_count = 5;
private:
using vector_type = managed_vector<atomic_cell_or_collection, internal_count, size_type>;
using vector_type = managed_vector<cell_and_hash, internal_count, size_type>;
struct vector_storage {
std::bitset<max_vector_size> present;
@@ -143,6 +177,8 @@ public:
// Returns a pointer to cell's value or nullptr if column is not set.
const atomic_cell_or_collection* find_cell(column_id id) const;
// Returns a pointer to cell's value and hash or nullptr if column is not set.
const cell_and_hash* find_cell_and_hash(column_id id) const;
private:
template<typename Func>
void remove_if(Func&& func) {
@@ -151,7 +187,7 @@ private:
if (!_storage.vector.present.test(i)) {
continue;
}
auto& c = _storage.vector.v[i];
auto& c = _storage.vector.v[i].cell;
if (func(i, c)) {
c = atomic_cell_or_collection();
_storage.vector.present.reset(i);
@@ -176,10 +212,10 @@ private:
auto get_range_vector() const {
auto id_range = boost::irange<column_id>(0, _storage.vector.v.size());
return boost::combine(id_range, _storage.vector.v)
| boost::adaptors::filtered([this] (const boost::tuple<const column_id&, const atomic_cell_or_collection&>& t) {
| boost::adaptors::filtered([this] (const boost::tuple<const column_id&, const cell_and_hash&>& t) {
return _storage.vector.present.test(t.get<0>());
}) | boost::adaptors::transformed([] (const boost::tuple<const column_id&, const atomic_cell_or_collection&>& t) {
return std::pair<column_id, const atomic_cell_or_collection&>(t.get<0>(), t.get<1>());
}) | boost::adaptors::transformed([] (const boost::tuple<const column_id&, const cell_and_hash&>& t) {
return std::pair<column_id, const atomic_cell_or_collection&>(t.get<0>(), t.get<1>().cell);
});
}
auto get_range_set() const {
@@ -195,43 +231,58 @@ private:
template<typename Func>
void consume_with(Func&&);
// Func obeys the same requirements as for for_each_cell below.
// Adapter used by the for_each_cell family: dispatches on Func's signature,
// passing the whole cell_and_hash when the callback accepts one, and falling
// back to passing just the cell otherwise. This keeps existing callers that
// only take an atomic_cell_or_collection working unchanged.
template<typename Func, typename MaybeConstCellAndHash>
static constexpr auto maybe_invoke_with_hash(Func& func, column_id id, MaybeConstCellAndHash& c_a_h) {
if constexpr (std::is_invocable_v<Func, column_id, const cell_and_hash&>) {
return func(id, c_a_h);
} else {
return func(id, c_a_h.cell);
}
}
public:
// Calls Func(column_id, atomic_cell_or_collection&) for each cell in this row.
// Calls Func(column_id, cell_and_hash&) or Func(column_id, atomic_cell_and_collection&)
// for each cell in this row, depending on the concrete Func type.
// noexcept if Func doesn't throw.
template<typename Func>
void for_each_cell(Func&& func) {
if (_type == storage_type::vector) {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
func(i, _storage.vector.v[i]);
maybe_invoke_with_hash(func, i, _storage.vector.v[i]);
}
} else {
for (auto& cell : _storage.set) {
func(cell.id(), cell.cell());
maybe_invoke_with_hash(func, cell.id(), cell.get_cell_and_hash());
}
}
}
template<typename Func>
void for_each_cell(Func&& func) const {
for_each_cell_until([func = std::forward<Func>(func)] (column_id id, const atomic_cell_or_collection& c) {
func(id, c);
return stop_iteration::no;
});
if (_type == storage_type::vector) {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
maybe_invoke_with_hash(func, i, _storage.vector.v[i]);
}
} else {
for (auto& cell : _storage.set) {
maybe_invoke_with_hash(func, cell.id(), cell.get_cell_and_hash());
}
}
}
template<typename Func>
void for_each_cell_until(Func&& func) const {
if (_type == storage_type::vector) {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
auto& cell = _storage.vector.v[i];
if (func(i, cell) == stop_iteration::yes) {
if (maybe_invoke_with_hash(func, i, _storage.vector.v[i]) == stop_iteration::yes) {
break;
}
}
} else {
for (auto& cell : _storage.set) {
const auto& c = cell.cell();
if (func(cell.id(), c) == stop_iteration::yes) {
if (maybe_invoke_with_hash(func, cell.id(), cell.get_cell_and_hash()) == stop_iteration::yes) {
break;
}
}
@@ -240,15 +291,14 @@ public:
// Merges cell's value into the row.
// Weak exception guarantees.
void apply(const column_definition& column, const atomic_cell_or_collection& cell);
void apply(const column_definition& column, const atomic_cell_or_collection& cell, cell_hash_opt hash = cell_hash_opt());
// Merges cell's value into the row.
// Weak exception guarantees.
void apply(const column_definition& column, atomic_cell_or_collection&& cell);
void apply(const column_definition& column, atomic_cell_or_collection&& cell, cell_hash_opt hash = cell_hash_opt());
// Monotonic exception guarantees. In case of exception the sum of cell and this remains the same as before the exception.
void apply_monotonically(const column_definition& column, atomic_cell_or_collection&& cell);
void apply_monotonically(const column_definition& column, atomic_cell_or_collection&& cell, cell_hash_opt hash = cell_hash_opt());
// Adds cell to the row. The column must not be already set.
void append_cell(column_id id, atomic_cell_or_collection cell);
@@ -272,6 +322,10 @@ public:
size_t external_memory_usage() const;
cell_hash_opt cell_hash_for(column_id id) const;
void prepare_hash(const schema& s, column_kind kind) const;
friend std::ostream& operator<<(std::ostream& os, const row& r);
};