row_consumer: de-virtualize io_priority() and resource_tracker()

Fixes #2830

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <448a1f739ab8c88a7a5562bce8dce5ae6efdf934.1507302530.git.bdenes@scylladb.com>
This commit is contained in:
Botond Dénes
2017-10-06 18:09:41 +03:00
committed by Duarte Nunes
parent d2b294dc06
commit a43901f842
7 changed files with 82 additions and 54 deletions

View File

@@ -79,6 +79,7 @@
#include "utils/phased_barrier.hh"
#include "cpu_controller.hh"
#include "dirty_memory_manager.hh"
#include "reader_resource_tracker.hh"
class cell_locker;
class cell_locker_stats;

View File

@@ -27,6 +27,7 @@
#include "core/future-util.hh"
#include "utils/move.hh"
#include "stdx.hh"
#include "reader_resource_tracker.hh"
// Dumb selector implementation for combined_mutation_reader that simply
// forwards it's list of readers.

View File

@@ -374,31 +374,6 @@ public:
mutation_source make_empty_mutation_source();
snapshot_source make_empty_snapshot_source();
class reader_resource_tracker {
semaphore* _sem = nullptr;
public:
reader_resource_tracker() = default;
explicit reader_resource_tracker(semaphore* sem)
: _sem(sem) {
}
bool operator==(const reader_resource_tracker& other) const {
return _sem == other._sem;
}
file track(file f) const;
semaphore* get_semaphore() const {
return _sem;
}
};
inline reader_resource_tracker no_resource_tracking() {
return reader_resource_tracker(nullptr);
}
struct restricted_mutation_reader_config {
semaphore* resources_sem = nullptr;
uint64_t* active_reads = nullptr;

View File

@@ -0,0 +1,48 @@
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* Copyright (C) 2017 ScyllaDB
*/
#pragma once
#include <core/file.hh>
#include <core/semaphore.hh>
class reader_resource_tracker {
seastar::semaphore* _sem = nullptr;
public:
reader_resource_tracker() = default;
explicit reader_resource_tracker(seastar::semaphore* sem)
: _sem(sem) {
}
bool operator==(const reader_resource_tracker& other) const {
return _sem == other._sem;
}
file track(file f) const;
semaphore* get_semaphore() const {
return _sem;
}
};
inline reader_resource_tracker no_resource_tracking() {
return reader_resource_tracker(nullptr);
}

View File

@@ -53,7 +53,6 @@ public:
};
private:
schema_ptr _schema;
const io_priority_class& _pc;
const query::partition_slice& _slice;
reader_resource_tracker _resource_tracker;
bool _out_of_range = false;
@@ -309,8 +308,8 @@ public:
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
streamed_mutation::forwarding fwd)
: _schema(schema)
, _pc(pc)
: row_consumer(resource_tracker, pc)
, _schema(schema)
, _slice(slice)
, _resource_tracker(std::move(resource_tracker))
, _fwd(fwd)
@@ -672,13 +671,6 @@ public:
}
return proceed::yes;
}
virtual const io_priority_class& io_priority() override {
return _pc;
}
virtual reader_resource_tracker resource_tracker() override {
return _resource_tracker;
}
// Returns true if the consumer is positioned at partition boundary,
// meaning that after next read either get_mutation() will

View File

@@ -27,8 +27,7 @@
#include "core/temporary_buffer.hh"
#include "consumer.hh"
#include "sstables/types.hh"
class reader_resource_tracker;
#include "reader_resource_tracker.hh"
// sstables::data_consume_row feeds the contents of a single row into a
// row_consumer object:
@@ -48,9 +47,19 @@ class reader_resource_tracker;
// row into one buffer, the byte_views remain valid until consume_row_end()
// is called.]
class row_consumer {
reader_resource_tracker _resource_tracker;
const io_priority_class& _pc;
public:
using proceed = data_consumer::proceed;
row_consumer(reader_resource_tracker resource_tracker, const io_priority_class& pc)
: _resource_tracker(resource_tracker)
, _pc(pc) {
}
virtual ~row_consumer() = default;
// Consume the row's key and deletion_time. The latter determines if the
// row is a tombstone, and if so, when it has been deleted.
// Note that the key is in serialized form, and should be deserialized
@@ -94,10 +103,12 @@ public:
virtual void reset(sstables::indexable_element) = 0;
// Under which priority class to place I/O coming from this consumer
virtual const io_priority_class& io_priority() = 0;
const io_priority_class& io_priority() const {
return _pc;
}
// The restriction that applies to this consumer
virtual reader_resource_tracker resource_tracker() = 0;
virtual ~row_consumer() { }
reader_resource_tracker resource_tracker() const {
return _resource_tracker;
}
};

View File

@@ -367,12 +367,17 @@ SEASTAR_TEST_CASE(compressed_random_access_read) {
class test_row_consumer : public row_consumer {
public:
const int64_t desired_timestamp;
test_row_consumer(int64_t t) : desired_timestamp(t) { }
int count_row_start = 0;
int count_cell = 0;
int count_deleted_cell = 0;
int count_range_tombstone = 0;
int count_row_end = 0;
test_row_consumer(int64_t t)
: row_consumer(no_resource_tracking()
, default_priority_class()), desired_timestamp(t) {
}
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
BOOST_REQUIRE(bytes_view(key) == as_bytes("vinna"));
BOOST_REQUIRE(deltime.local_deletion_time == std::numeric_limits<int32_t>::max());
@@ -435,16 +440,12 @@ public:
count_range_tombstone++;
return proceed::yes;
}
virtual proceed consume_row_end() override {
count_row_end++;
return proceed::yes;
}
virtual const io_priority_class& io_priority() override {
return default_priority_class();
}
virtual reader_resource_tracker resource_tracker() override {
return no_resource_tracking();
}
virtual void reset(indexable_element) override { }
};
@@ -523,6 +524,11 @@ public:
int count_deleted_cell = 0;
int count_row_end = 0;
int count_range_tombstone = 0;
count_row_consumer()
: row_consumer(no_resource_tracking(), default_priority_class()) {
}
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
count_row_start++;
return proceed::yes;
@@ -554,12 +560,6 @@ public:
count_range_tombstone++;
return proceed::yes;
}
virtual const io_priority_class& io_priority() override {
return default_priority_class();
}
virtual reader_resource_tracker resource_tracker() override {
return no_resource_tracking();
}
virtual void reset(indexable_element) override { }
};