Files
scylladb/streamed_mutation.cc
2016-06-20 21:29:49 +01:00

217 lines
7.2 KiB
C++

/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/algorithm/heap_algorithm.hpp>
#include "mutation.hh"
#include "streamed_mutation.hh"
mutation_fragment::mutation_fragment(static_row&& r)
: _kind(kind::static_row), _data(std::make_unique<data>())
{
new (&_data->_static_row) static_row(std::move(r));
}
mutation_fragment::mutation_fragment(clustering_row&& r)
: _kind(kind::clustering_row), _data(std::make_unique<data>())
{
new (&_data->_clustering_row) clustering_row(std::move(r));
}
mutation_fragment::mutation_fragment(range_tombstone_begin&& r)
: _kind(kind::range_tombstone_begin), _data(std::make_unique<data>())
{
new (&_data->_range_tombstone_begin) range_tombstone_begin(std::move(r));
}
mutation_fragment::mutation_fragment(range_tombstone_end&& r)
: _kind(kind::range_tombstone_end), _data(std::make_unique<data>())
{
new (&_data->_range_tombstone_end) range_tombstone_end(std::move(r));
}
void mutation_fragment::destroy_data() noexcept
{
switch (_kind) {
case kind::static_row:
_data->_static_row.~static_row();
break;
case kind::clustering_row:
_data->_clustering_row.~clustering_row();
break;
case kind::range_tombstone_begin:
_data->_range_tombstone_begin.~range_tombstone_begin();
break;
case kind::range_tombstone_end:
_data->_range_tombstone_end.~range_tombstone_end();
break;
}
}
const clustering_key_prefix& mutation_fragment::key() const
{
assert(has_key());
switch (_kind) {
case kind::clustering_row:
return as_clustering_row().key();
case kind::range_tombstone_begin:
return as_range_tombstone_begin().key();
case kind::range_tombstone_end:
return as_range_tombstone_end().key();
default:
abort();
}
}
int mutation_fragment::bound_kind_weight() const {
assert(has_key());
switch (_kind) {
case kind::clustering_row:
return 0;
case kind::range_tombstone_begin:
return weight(as_range_tombstone_begin().bound().kind);
case kind::range_tombstone_end:
return weight(as_range_tombstone_end().bound().kind);
default:
abort();
}
}
void mutation_fragment::apply(const schema& s, mutation_fragment&& mf)
{
assert(_kind == mf._kind);
switch (_kind) {
case kind::static_row:
_data->_static_row.apply(s, std::move(mf._data->_static_row));
mf._data->_static_row.~static_row();
break;
case kind::clustering_row:
_data->_clustering_row.apply(s, std::move(mf._data->_clustering_row));
mf._data->_clustering_row.~clustering_row();
break;
case kind::range_tombstone_begin:
_data->_range_tombstone_begin.apply(std::move(mf._data->_range_tombstone_begin));
mf._data->_range_tombstone_begin.~range_tombstone_begin();
break;
case kind::range_tombstone_end:
mf._data->_range_tombstone_end.~range_tombstone_end();
break;
}
mf._data.reset();
}
position_in_partition mutation_fragment::position() const
{
switch (_kind) {
case kind::static_row:
return _data->_static_row.position();
case kind::clustering_row:
return _data->_clustering_row.position();
case kind::range_tombstone_begin:
return _data->_range_tombstone_begin.position();
case kind::range_tombstone_end:
return _data->_range_tombstone_end.position();
}
abort();
}
std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) {
auto& s = *sm.schema();
fprint(os, "{%s.%s key %s streamed mutation}", s.ks_name(), s.cf_name(), sm.decorated_key());
return os;
}
streamed_mutation streamed_mutation_from_mutation(mutation m)
{
class reader final : public streamed_mutation::impl {
mutation _mutation;
bound_view::compare _cmp;
bool _static_row_done = false;
range_tombstone::container_type::const_iterator _rt_it;
range_tombstone::container_type::const_iterator _rt_end;
mutation_partition::rows_type::const_iterator _cr_it;
mutation_partition::rows_type::const_iterator _cr_end;
stdx::optional<range_tombstone_end> _range_tombstone_end;
private:
mutation_fragment_opt read_next() {
if (_cr_it != _cr_end) {
bool return_ck = true;
if (_range_tombstone_end) {
return_ck = _cmp(_cr_it->key(), _range_tombstone_end->bound());
} else if (_rt_it != _rt_end) {
return_ck = _cmp(_cr_it->key(), _rt_it->start_bound());
}
if (return_ck) {
return mutation_fragment(std::move(*_cr_it++));
}
}
if (_range_tombstone_end) {
auto mf = mutation_fragment(std::move(*_range_tombstone_end));
_range_tombstone_end = { };
return mf;
} else if (_rt_it != _rt_end) {
auto rt = std::move(*_rt_it++);
_range_tombstone_end = range_tombstone_end(std::move(rt.end), rt.end_kind);
mutation_fragment mf = range_tombstone_begin(std::move(rt.start), rt.start_kind, rt.tomb);
return mf;
}
return { };
}
private:
void do_fill_buffer() {
if (!_static_row_done) {
_static_row_done = true;
if (!_mutation.partition().static_row().empty()) {
push_mutation_fragment(static_row(std::move(_mutation.partition().static_row())));
}
}
while (!is_end_of_stream() && !is_buffer_full()) {
auto mfopt = read_next();
if (mfopt) {
push_mutation_fragment(std::move(*mfopt));
} else {
_end_of_stream = true;
}
}
}
public:
explicit reader(mutation m)
: streamed_mutation::impl(m.schema(), m.decorated_key(), m.partition().partition_tombstone())
, _mutation(std::move(m))
, _cmp(*_mutation.schema())
{
_rt_it = _mutation.partition().row_tombstones().begin();
_cr_it = _mutation.partition().clustered_rows().begin();
_rt_end = _mutation.partition().row_tombstones().end();
_cr_end = _mutation.partition().clustered_rows().end();
do_fill_buffer();
}
virtual future<> fill_buffer() override {
do_fill_buffer();
return make_ready_future<>();
}
};
return make_streamed_mutation<reader>(std::move(m));
}