diff --git a/db/view/row_locking.cc b/db/view/row_locking.cc index a8a1b15c89..b02eb965d0 100644 --- a/db/view/row_locking.cc +++ b/db/view/row_locking.cc @@ -8,7 +8,6 @@ #include "row_locking.hh" #include "log.hh" -#include "utils/latency.hh" static logging::logger mylog("row_locking"); @@ -50,23 +49,34 @@ row_locker::lock_holder::lock_holder(row_locker* locker, const dht::decorated_ke , _row_exclusive(exclusive) { } +row_locker::latency_stats_tracker::latency_stats_tracker(row_locker::single_lock_stats& stats) + : lock_stats(stats) +{ + waiting_latency.start(); + lock_stats.operations_currently_waiting_for_lock++; +} + +row_locker::latency_stats_tracker::~latency_stats_tracker() { + lock_stats.operations_currently_waiting_for_lock--; + waiting_latency.stop(); + lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency()); +} + +void row_locker::latency_stats_tracker::lock_acquired() { + lock_stats.lock_acquisitions++; +} + future row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) { mylog.debug("taking {} lock on entire partition {}", (exclusive ? "exclusive" : "shared"), pk); + auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_partition : stats.shared_partition); auto i = _two_level_locks.try_emplace(pk, this).first; - single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_partition : stats.shared_partition; - single_lock_stats.operations_currently_waiting_for_lock++; - utils::latency_counter waiting_latency; - waiting_latency.start(); auto f = exclusive ? i->second._partition_lock.write_lock(timeout) : i->second._partition_lock.read_lock(timeout); // Note: we rely on the fact that &i->first, the pointer to a key, never // becomes invalid (as long as the item is actually in the hash table), // even in the case of rehashing. - return f.then([this, pk = &i->first, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency)] () mutable { - waiting_latency.stop(); - single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency()); - single_lock_stats.lock_acquisitions++; - single_lock_stats.operations_currently_waiting_for_lock--; + return f.then([this, pk = &i->first, exclusive, tracker = std::move(tracker)] () mutable { + tracker.lock_acquired(); return lock_holder(this, pk, exclusive); }); } @@ -74,6 +84,7 @@ row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_cl future row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) { mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk); + auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_row : stats.shared_row); auto i = _two_level_locks.try_emplace(pk, this).first; future lock_partition = i->second._partition_lock.hold_read_lock(timeout); auto j = i->second._row_locks.find(cpk); @@ -96,19 +107,12 @@ row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& c }); } } - single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_row : stats.shared_row; - single_lock_stats.operations_currently_waiting_for_lock++; - utils::latency_counter waiting_latency; - waiting_latency.start(); - return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency), timeout] (auto lock1) mutable { + return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable { auto lock_row = exclusive ? row_lock.hold_write_lock(timeout) : row_lock.hold_read_lock(timeout); - return lock_row.then([this, pk, cpk, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency), lock1 = std::move(lock1)] (auto lock2) mutable { + return lock_row.then([this, pk, cpk, exclusive, tracker = std::move(tracker), lock1 = std::move(lock1)] (auto lock2) mutable { lock1.release(); lock2.release(); - waiting_latency.stop(); - single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency()); - single_lock_stats.lock_acquisitions++; - single_lock_stats.operations_currently_waiting_for_lock--; + tracker.lock_acquired(); return lock_holder(this, pk, cpk, exclusive); }); }); diff --git a/db/view/row_locking.hh b/db/view/row_locking.hh index a60716e83d..773c70e2c0 100644 --- a/db/view/row_locking.hh +++ b/db/view/row_locking.hh @@ -30,6 +30,7 @@ #include "dht/i_partitioner.hh" #include "query-request.hh" #include "utils/estimated_histogram.hh" +#include "utils/latency.hh" class row_locker { public: @@ -44,6 +45,15 @@ public: single_lock_stats exclusive_partition; single_lock_stats shared_partition; }; + struct latency_stats_tracker { + single_lock_stats& lock_stats; + utils::latency_counter waiting_latency; + + latency_stats_tracker(single_lock_stats& stats); + ~latency_stats_tracker(); + + void lock_acquired(); + }; // row_locker's locking functions lock_pk(), lock_ck() return a // "lock_holder" object. When the caller destroys the object it received, // the lock is released. The same type "lock_holder" is used regardless