view: row_locker: add latency_stats_tracker

Refactor the existing stats tracking and updating
code into struct latency_stats_tracker and while at it,
count lock_acquisitions only on success.

Decrement operations_currently_waiting_for_lock in the destructor
so it's always balanced with the uncoditional increment
in the ctor.

As for updating estimated_waiting_for_lock, it is always
updated in the dtor, both on success and failure since
the wait for the lock happened, whether waiting
timed out or not.

Fixes #12190

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #12225
This commit is contained in:
Benny Halevy
2022-12-04 16:21:27 +02:00
committed by Avi Kivity
parent 9ee78975b7
commit bdb6550305
2 changed files with 34 additions and 20 deletions

View File

@@ -8,7 +8,6 @@
#include "row_locking.hh"
#include "log.hh"
#include "utils/latency.hh"
static logging::logger mylog("row_locking");
@@ -50,23 +49,34 @@ row_locker::lock_holder::lock_holder(row_locker* locker, const dht::decorated_ke
, _row_exclusive(exclusive) {
}
row_locker::latency_stats_tracker::latency_stats_tracker(row_locker::single_lock_stats& stats)
: lock_stats(stats)
{
waiting_latency.start();
lock_stats.operations_currently_waiting_for_lock++;
}
row_locker::latency_stats_tracker::~latency_stats_tracker() {
lock_stats.operations_currently_waiting_for_lock--;
waiting_latency.stop();
lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency());
}
void row_locker::latency_stats_tracker::lock_acquired() {
lock_stats.lock_acquisitions++;
}
future<row_locker::lock_holder>
row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
mylog.debug("taking {} lock on entire partition {}", (exclusive ? "exclusive" : "shared"), pk);
auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_partition : stats.shared_partition);
auto i = _two_level_locks.try_emplace(pk, this).first;
single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_partition : stats.shared_partition;
single_lock_stats.operations_currently_waiting_for_lock++;
utils::latency_counter waiting_latency;
waiting_latency.start();
auto f = exclusive ? i->second._partition_lock.write_lock(timeout) : i->second._partition_lock.read_lock(timeout);
// Note: we rely on the fact that &i->first, the pointer to a key, never
// becomes invalid (as long as the item is actually in the hash table),
// even in the case of rehashing.
return f.then([this, pk = &i->first, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency)] () mutable {
waiting_latency.stop();
single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency());
single_lock_stats.lock_acquisitions++;
single_lock_stats.operations_currently_waiting_for_lock--;
return f.then([this, pk = &i->first, exclusive, tracker = std::move(tracker)] () mutable {
tracker.lock_acquired();
return lock_holder(this, pk, exclusive);
});
}
@@ -74,6 +84,7 @@ row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_cl
future<row_locker::lock_holder>
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk);
auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_row : stats.shared_row);
auto i = _two_level_locks.try_emplace(pk, this).first;
future<lock_type::holder> lock_partition = i->second._partition_lock.hold_read_lock(timeout);
auto j = i->second._row_locks.find(cpk);
@@ -96,19 +107,12 @@ row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& c
});
}
}
single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_row : stats.shared_row;
single_lock_stats.operations_currently_waiting_for_lock++;
utils::latency_counter waiting_latency;
waiting_latency.start();
return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency), timeout] (auto lock1) mutable {
return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable {
auto lock_row = exclusive ? row_lock.hold_write_lock(timeout) : row_lock.hold_read_lock(timeout);
return lock_row.then([this, pk, cpk, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency), lock1 = std::move(lock1)] (auto lock2) mutable {
return lock_row.then([this, pk, cpk, exclusive, tracker = std::move(tracker), lock1 = std::move(lock1)] (auto lock2) mutable {
lock1.release();
lock2.release();
waiting_latency.stop();
single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency());
single_lock_stats.lock_acquisitions++;
single_lock_stats.operations_currently_waiting_for_lock--;
tracker.lock_acquired();
return lock_holder(this, pk, cpk, exclusive);
});
});

View File

@@ -30,6 +30,7 @@
#include "dht/i_partitioner.hh"
#include "query-request.hh"
#include "utils/estimated_histogram.hh"
#include "utils/latency.hh"
class row_locker {
public:
@@ -44,6 +45,15 @@ public:
single_lock_stats exclusive_partition;
single_lock_stats shared_partition;
};
struct latency_stats_tracker {
single_lock_stats& lock_stats;
utils::latency_counter waiting_latency;
latency_stats_tracker(single_lock_stats& stats);
~latency_stats_tracker();
void lock_acquired();
};
// row_locker's locking functions lock_pk(), lock_ck() return a
// "lock_holder" object. When the caller destroys the object it received,
// the lock is released. The same type "lock_holder" is used regardless