Introduce flat_mutation_reader
This reader operates on mutation_fragments instead of streamed_mutations. Each partition starts with a partition_header fragment and ends with end_of_partition fragment. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
@@ -329,6 +329,7 @@ scylla_core = (['database.cc',
|
||||
'mutation_partition_view.cc',
|
||||
'mutation_partition_serializer.cc',
|
||||
'mutation_reader.cc',
|
||||
'flat_mutation_reader.cc',
|
||||
'mutation_query.cc',
|
||||
'keys.cc',
|
||||
'counters.cc',
|
||||
|
||||
34
flat_mutation_reader.cc
Normal file
34
flat_mutation_reader.cc
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "flat_mutation_reader.hh"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
|
||||
void flat_mutation_reader::impl::forward_buffer_to(schema_ptr& schema, const position_in_partition& pos) {
|
||||
_buffer.erase(std::remove_if(_buffer.begin(), _buffer.end(), [this, &pos, &schema] (mutation_fragment& f) {
|
||||
return !f.relevant_for_range_assuming_after(*schema, pos);
|
||||
}), _buffer.end());
|
||||
|
||||
_buffer_size = boost::accumulate(_buffer | boost::adaptors::transformed(std::mem_fn(&mutation_fragment::memory_usage)), size_t(0));
|
||||
}
|
||||
185
flat_mutation_reader.hh
Normal file
185
flat_mutation_reader.hh
Normal file
@@ -0,0 +1,185 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "position_in_partition.hh"
|
||||
#include "streamed_mutation.hh"
|
||||
#include "mutation_reader.hh"
|
||||
|
||||
#include <seastar/util/gcc6-concepts.hh>
|
||||
|
||||
using seastar::future;
|
||||
|
||||
GCC6_CONCEPT(
|
||||
template<typename Consumer>
|
||||
concept bool FlatMutationReaderConsumer() {
|
||||
return requires(Consumer c, mutation_fragment mf) {
|
||||
{ c(std::move(mf)) } -> stop_iteration;
|
||||
};
|
||||
}
|
||||
)
|
||||
|
||||
/*
|
||||
* Allows iteration on mutations using mutation_fragments.
|
||||
* It iterates over mutations one by one and for each mutation
|
||||
* it returns:
|
||||
* 1. partition_start mutation_fragment
|
||||
* 2. static_row mutation_fragment if one exists
|
||||
* 3. mutation_fragments for all clustering rows and range tombstones
|
||||
* in clustering key order
|
||||
* 4. partition_end mutation_fragment
|
||||
* The best way to consume those mutation_fragments is to call
|
||||
* flat_mutation_reader::consume with a consumer that receives the fragments.
|
||||
*/
|
||||
class flat_mutation_reader final {
|
||||
public:
|
||||
class impl {
|
||||
circular_buffer<mutation_fragment> _buffer;
|
||||
size_t _buffer_size = 0;
|
||||
bool _consume_done = false;
|
||||
protected:
|
||||
static constexpr size_t max_buffer_size_in_bytes = 8 * 1024;
|
||||
bool _end_of_stream = false;
|
||||
protected:
|
||||
template<typename... Args>
|
||||
void push_mutation_fragment(Args&&... args) {
|
||||
_buffer.emplace_back(std::forward<Args>(args)...);
|
||||
_buffer_size += _buffer.back().memory_usage();
|
||||
}
|
||||
void clear_buffer() {
|
||||
_buffer.erase(_buffer.begin(), _buffer.end());
|
||||
_buffer_size = 0;
|
||||
}
|
||||
void forward_buffer_to(schema_ptr& s, const position_in_partition& pos);
|
||||
public:
|
||||
virtual ~impl() {}
|
||||
virtual future<> fill_buffer() = 0;
|
||||
virtual void next_partition() = 0;
|
||||
|
||||
bool is_end_of_stream() const { return _end_of_stream; }
|
||||
bool is_buffer_empty() const { return _buffer.empty(); }
|
||||
bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
|
||||
|
||||
mutation_fragment pop_mutation_fragment() {
|
||||
auto mf = std::move(_buffer.front());
|
||||
_buffer.pop_front();
|
||||
_buffer_size -= mf.memory_usage();
|
||||
return mf;
|
||||
}
|
||||
|
||||
future<mutation_fragment_opt> operator()() {
|
||||
if (is_buffer_empty()) {
|
||||
if (is_end_of_stream()) {
|
||||
return make_ready_future<mutation_fragment_opt>();
|
||||
}
|
||||
return fill_buffer().then([this] { return operator()(); });
|
||||
}
|
||||
return make_ready_future<mutation_fragment_opt>(pop_mutation_fragment());
|
||||
}
|
||||
|
||||
template<typename Consumer>
|
||||
GCC6_CONCEPT(
|
||||
requires FlatMutationReaderConsumer<Consumer>()
|
||||
)
|
||||
future<> consume_pausable(Consumer consumer) {
|
||||
_consume_done = false;
|
||||
return do_until([this] { return (is_end_of_stream() && is_buffer_empty()) || _consume_done; }, [this, consumer = std::move(consumer)] () mutable {
|
||||
if (is_buffer_empty()) {
|
||||
return fill_buffer();
|
||||
}
|
||||
|
||||
_consume_done = consumer(pop_mutation_fragment()) == stop_iteration::yes;
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(const dht::partition_range&) = 0;
|
||||
virtual future<> fast_forward_to(position_range) = 0;
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
public:
|
||||
flat_mutation_reader(std::unique_ptr<impl> impl) noexcept : _impl(std::move(impl)) {}
|
||||
|
||||
future<mutation_fragment_opt> operator()() {
|
||||
return _impl->operator()();
|
||||
}
|
||||
|
||||
template <typename Consumer>
|
||||
GCC6_CONCEPT(
|
||||
requires FlatMutationReaderConsumer<Consumer>()
|
||||
)
|
||||
auto consume_pausable(Consumer consumer) {
|
||||
return _impl->consume_pausable(std::move(consumer));
|
||||
}
|
||||
|
||||
void next_partition() { _impl->next_partition(); }
|
||||
|
||||
future<> fill_buffer() { return _impl->fill_buffer(); }
|
||||
|
||||
// Changes the range of partitions to pr. The range can only be moved
|
||||
// forwards. pr.begin() needs to be larger than pr.end() of the previousl
|
||||
// used range (i.e. either the initial one passed to the constructor or a
|
||||
// previous fast forward target).
|
||||
// pr needs to be valid until the reader is destroyed or fast_forward_to()
|
||||
// is called again.
|
||||
future<> fast_forward_to(const dht::partition_range& pr) {
|
||||
return _impl->fast_forward_to(pr);
|
||||
}
|
||||
// Skips to a later range of rows.
|
||||
// The new range must not overlap with the current range.
|
||||
//
|
||||
// In forwarding mode the stream does not return all fragments right away,
|
||||
// but only those belonging to the current clustering range. Initially
|
||||
// current range only covers the static row. The stream can be forwarded
|
||||
// (even before end-of- stream) to a later range with fast_forward_to().
|
||||
// Forwarding doesn't change initial restrictions of the stream, it can
|
||||
// only be used to skip over data.
|
||||
//
|
||||
// Monotonicity of positions is preserved by forwarding. That is fragments
|
||||
// emitted after forwarding will have greater positions than any fragments
|
||||
// emitted before forwarding.
|
||||
//
|
||||
// For any range, all range tombstones relevant for that range which are
|
||||
// present in the original stream will be emitted. Range tombstones
|
||||
// emitted before forwarding which overlap with the new range are not
|
||||
// necessarily re-emitted.
|
||||
//
|
||||
// When forwarding mode is not enabled, fast_forward_to()
|
||||
// cannot be used.
|
||||
future<> fast_forward_to(position_range cr) {
|
||||
return _impl->fast_forward_to(std::move(cr));
|
||||
}
|
||||
bool is_end_of_stream() const { return _impl->is_end_of_stream(); }
|
||||
bool is_buffer_empty() const { return _impl->is_buffer_empty(); }
|
||||
bool is_buffer_full() const { return _impl->is_buffer_full(); }
|
||||
};
|
||||
|
||||
template<typename Impl, typename... Args>
|
||||
flat_mutation_reader make_flat_mutation_reader(Args &&... args) {
|
||||
return flat_mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
|
||||
}
|
||||
Reference in New Issue
Block a user