row_consumer: de-virtualize io_priority() and resource_tracker()
Fixes #2830 Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <448a1f739ab8c88a7a5562bce8dce5ae6efdf934.1507302530.git.bdenes@scylladb.com>
This commit is contained in:
committed by
Duarte Nunes
parent
d2b294dc06
commit
a43901f842
@@ -79,6 +79,7 @@
|
||||
#include "utils/phased_barrier.hh"
|
||||
#include "cpu_controller.hh"
|
||||
#include "dirty_memory_manager.hh"
|
||||
#include "reader_resource_tracker.hh"
|
||||
|
||||
class cell_locker;
|
||||
class cell_locker_stats;
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "core/future-util.hh"
|
||||
#include "utils/move.hh"
|
||||
#include "stdx.hh"
|
||||
#include "reader_resource_tracker.hh"
|
||||
|
||||
// Dumb selector implementation for combined_mutation_reader that simply
|
||||
// forwards it's list of readers.
|
||||
|
||||
@@ -374,31 +374,6 @@ public:
|
||||
mutation_source make_empty_mutation_source();
|
||||
snapshot_source make_empty_snapshot_source();
|
||||
|
||||
|
||||
class reader_resource_tracker {
|
||||
semaphore* _sem = nullptr;
|
||||
public:
|
||||
reader_resource_tracker() = default;
|
||||
explicit reader_resource_tracker(semaphore* sem)
|
||||
: _sem(sem) {
|
||||
}
|
||||
|
||||
bool operator==(const reader_resource_tracker& other) const {
|
||||
return _sem == other._sem;
|
||||
}
|
||||
|
||||
file track(file f) const;
|
||||
|
||||
semaphore* get_semaphore() const {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
inline reader_resource_tracker no_resource_tracking() {
|
||||
return reader_resource_tracker(nullptr);
|
||||
}
|
||||
|
||||
|
||||
struct restricted_mutation_reader_config {
|
||||
semaphore* resources_sem = nullptr;
|
||||
uint64_t* active_reads = nullptr;
|
||||
|
||||
48
reader_resource_tracker.hh
Normal file
48
reader_resource_tracker.hh
Normal file
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <core/file.hh>
|
||||
#include <core/semaphore.hh>
|
||||
|
||||
class reader_resource_tracker {
|
||||
seastar::semaphore* _sem = nullptr;
|
||||
public:
|
||||
reader_resource_tracker() = default;
|
||||
explicit reader_resource_tracker(seastar::semaphore* sem)
|
||||
: _sem(sem) {
|
||||
}
|
||||
|
||||
bool operator==(const reader_resource_tracker& other) const {
|
||||
return _sem == other._sem;
|
||||
}
|
||||
|
||||
file track(file f) const;
|
||||
|
||||
semaphore* get_semaphore() const {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
inline reader_resource_tracker no_resource_tracking() {
|
||||
return reader_resource_tracker(nullptr);
|
||||
}
|
||||
@@ -53,7 +53,6 @@ public:
|
||||
};
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
const io_priority_class& _pc;
|
||||
const query::partition_slice& _slice;
|
||||
reader_resource_tracker _resource_tracker;
|
||||
bool _out_of_range = false;
|
||||
@@ -309,8 +308,8 @@ public:
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _schema(schema)
|
||||
, _pc(pc)
|
||||
: row_consumer(resource_tracker, pc)
|
||||
, _schema(schema)
|
||||
, _slice(slice)
|
||||
, _resource_tracker(std::move(resource_tracker))
|
||||
, _fwd(fwd)
|
||||
@@ -672,13 +671,6 @@ public:
|
||||
}
|
||||
return proceed::yes;
|
||||
}
|
||||
virtual const io_priority_class& io_priority() override {
|
||||
return _pc;
|
||||
}
|
||||
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return _resource_tracker;
|
||||
}
|
||||
|
||||
// Returns true if the consumer is positioned at partition boundary,
|
||||
// meaning that after next read either get_mutation() will
|
||||
|
||||
@@ -27,8 +27,7 @@
|
||||
#include "core/temporary_buffer.hh"
|
||||
#include "consumer.hh"
|
||||
#include "sstables/types.hh"
|
||||
|
||||
class reader_resource_tracker;
|
||||
#include "reader_resource_tracker.hh"
|
||||
|
||||
// sstables::data_consume_row feeds the contents of a single row into a
|
||||
// row_consumer object:
|
||||
@@ -48,9 +47,19 @@ class reader_resource_tracker;
|
||||
// row into one buffer, the byte_views remain valid until consume_row_end()
|
||||
// is called.]
|
||||
class row_consumer {
|
||||
reader_resource_tracker _resource_tracker;
|
||||
const io_priority_class& _pc;
|
||||
|
||||
public:
|
||||
using proceed = data_consumer::proceed;
|
||||
|
||||
row_consumer(reader_resource_tracker resource_tracker, const io_priority_class& pc)
|
||||
: _resource_tracker(resource_tracker)
|
||||
, _pc(pc) {
|
||||
}
|
||||
|
||||
virtual ~row_consumer() = default;
|
||||
|
||||
// Consume the row's key and deletion_time. The latter determines if the
|
||||
// row is a tombstone, and if so, when it has been deleted.
|
||||
// Note that the key is in serialized form, and should be deserialized
|
||||
@@ -94,10 +103,12 @@ public:
|
||||
virtual void reset(sstables::indexable_element) = 0;
|
||||
|
||||
// Under which priority class to place I/O coming from this consumer
|
||||
virtual const io_priority_class& io_priority() = 0;
|
||||
const io_priority_class& io_priority() const {
|
||||
return _pc;
|
||||
}
|
||||
|
||||
// The restriction that applies to this consumer
|
||||
virtual reader_resource_tracker resource_tracker() = 0;
|
||||
|
||||
virtual ~row_consumer() { }
|
||||
reader_resource_tracker resource_tracker() const {
|
||||
return _resource_tracker;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -367,12 +367,17 @@ SEASTAR_TEST_CASE(compressed_random_access_read) {
|
||||
class test_row_consumer : public row_consumer {
|
||||
public:
|
||||
const int64_t desired_timestamp;
|
||||
test_row_consumer(int64_t t) : desired_timestamp(t) { }
|
||||
int count_row_start = 0;
|
||||
int count_cell = 0;
|
||||
int count_deleted_cell = 0;
|
||||
int count_range_tombstone = 0;
|
||||
int count_row_end = 0;
|
||||
|
||||
test_row_consumer(int64_t t)
|
||||
: row_consumer(no_resource_tracking()
|
||||
, default_priority_class()), desired_timestamp(t) {
|
||||
}
|
||||
|
||||
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
|
||||
BOOST_REQUIRE(bytes_view(key) == as_bytes("vinna"));
|
||||
BOOST_REQUIRE(deltime.local_deletion_time == std::numeric_limits<int32_t>::max());
|
||||
@@ -435,16 +440,12 @@ public:
|
||||
count_range_tombstone++;
|
||||
return proceed::yes;
|
||||
}
|
||||
|
||||
virtual proceed consume_row_end() override {
|
||||
count_row_end++;
|
||||
return proceed::yes;
|
||||
}
|
||||
virtual const io_priority_class& io_priority() override {
|
||||
return default_priority_class();
|
||||
}
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return no_resource_tracking();
|
||||
}
|
||||
|
||||
virtual void reset(indexable_element) override { }
|
||||
};
|
||||
|
||||
@@ -523,6 +524,11 @@ public:
|
||||
int count_deleted_cell = 0;
|
||||
int count_row_end = 0;
|
||||
int count_range_tombstone = 0;
|
||||
|
||||
count_row_consumer()
|
||||
: row_consumer(no_resource_tracking(), default_priority_class()) {
|
||||
}
|
||||
|
||||
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
|
||||
count_row_start++;
|
||||
return proceed::yes;
|
||||
@@ -554,12 +560,6 @@ public:
|
||||
count_range_tombstone++;
|
||||
return proceed::yes;
|
||||
}
|
||||
virtual const io_priority_class& io_priority() override {
|
||||
return default_priority_class();
|
||||
}
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return no_resource_tracking();
|
||||
}
|
||||
virtual void reset(indexable_element) override { }
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user