diff --git a/atomic_cell.hh b/atomic_cell.hh index a0ed9ace59..070d809e25 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -29,10 +29,11 @@ #include "net/byteorder.hh" #include #include +#include -template +template static inline -void set_field(managed_bytes& v, unsigned offset, T val) { +void set_field(Input& v, unsigned offset, T val) { reinterpret_cast*>(v.begin() + offset)->raw = net::hton(val); } @@ -58,6 +59,7 @@ private: static constexpr int8_t EXPIRY_FLAG = 0x02; // When present, expiry field is present. Set only for live cells static constexpr int8_t REVERT_FLAG = 0x04; // transient flag used to efficiently implement ReversiblyMergeable for atomic cells. static constexpr int8_t COUNTER_UPDATE_FLAG = 0x08; // Cell is a counter update. + static constexpr int8_t COUNTER_IN_PLACE_REVERT = 0x10; static constexpr unsigned flags_size = 1; static constexpr unsigned timestamp_offset = flags_size; static constexpr unsigned timestamp_size = 8; @@ -67,6 +69,7 @@ private: static constexpr unsigned deletion_time_size = 4; static constexpr unsigned ttl_offset = expiry_offset + expiry_size; static constexpr unsigned ttl_size = 4; + friend class counter_cell_builder; private: static bool is_counter_update(bytes_view cell) { return cell[0] & COUNTER_UPDATE_FLAG; @@ -74,10 +77,17 @@ private: static bool is_revert_set(bytes_view cell) { return cell[0] & REVERT_FLAG; } + static bool is_counter_in_place_revert_set(bytes_view cell) { + return cell[0] & COUNTER_IN_PLACE_REVERT; + } template static void set_revert(BytesContainer& cell, bool revert) { cell[0] = (cell[0] & ~REVERT_FLAG) | (revert * REVERT_FLAG); } + template + static void set_counter_in_place_revert(BytesContainer& cell, bool flag) { + cell[0] = (cell[0] & ~COUNTER_IN_PLACE_REVERT) | (flag * COUNTER_IN_PLACE_REVERT); + } static bool is_live(const bytes_view& cell) { return cell[0] & LIVE_FLAG; } @@ -91,13 +101,30 @@ private: static api::timestamp_type timestamp(const bytes_view& cell) { return get_field(cell, timestamp_offset); } + template + static void set_timestamp(BytesContainer& cell, api::timestamp_type ts) { + set_field(cell, timestamp_offset, ts); + } // Can be called on live cells only - static bytes_view value(bytes_view cell) { +private: + template + static BytesView do_get_value(BytesView cell) { auto expiry_field_size = bool(cell[0] & EXPIRY_FLAG) * (expiry_size + ttl_size); auto value_offset = flags_size + timestamp_size + expiry_field_size; cell.remove_prefix(value_offset); return cell; } +public: + static bytes_view value(bytes_view cell) { + return do_get_value(cell); + } + static bytes_mutable_view value(bytes_mutable_view cell) { + return do_get_value(cell); + } + // Can be called on live counter update cells only + static int64_t counter_update_value(bytes_view cell) { + return get_field(cell, flags_size + timestamp_size); + } // Can be called only when is_dead() is true. 
static gc_clock::time_point deletion_time(const bytes_view& cell) { assert(is_dead(cell)); @@ -130,12 +157,12 @@ private: std::copy_n(value.begin(), value.size(), b.begin() + value_offset); return b; } - static managed_bytes make_live_counter_update(api::timestamp_type timestamp, bytes_view value) { + static managed_bytes make_live_counter_update(api::timestamp_type timestamp, int64_t value) { auto value_offset = flags_size + timestamp_size; - managed_bytes b(managed_bytes::initialized_later(), value_offset + value.size()); + managed_bytes b(managed_bytes::initialized_later(), value_offset + sizeof(value)); b[0] = LIVE_FLAG | COUNTER_UPDATE_FLAG; set_field(b, timestamp_offset, timestamp); - std::copy_n(value.begin(), value.size(), b.begin() + value_offset); + set_field(b, value_offset, value); return b; } static managed_bytes make_live(api::timestamp_type timestamp, bytes_view value, gc_clock::time_point expiry, gc_clock::duration ttl) { @@ -148,6 +175,31 @@ private: std::copy_n(value.begin(), value.size(), b.begin() + value_offset); return b; } + // make_live_from_serializer() is intended for users that need to serialise + // some object or objects to the format used in atomic_cell::value(). + // With just make_live() the pattern would look as follows: + // 1. allocate a buffer and write the serialised objects to it + // 2. pass that buffer to make_live() + // 3. make_live() needs to prepend some metadata to the cell value so it + // allocates a new buffer and copies the content of the original one + // + // The allocation and copy of a buffer can be avoided. + // make_live_from_serializer() allows the user code to specify the timestamp + // and size of the cell value as well as provide the serialiser function + // object, which would write the serialised value of the cell to the buffer + // given to it by make_live_from_serializer(). + template + GCC6_CONCEPT(requires requires(Serializer serializer, bytes::iterator it) { + serializer(it); + }) + static managed_bytes make_live_from_serializer(api::timestamp_type timestamp, size_t size, Serializer&& serializer) { + auto value_offset = flags_size + timestamp_size; + managed_bytes b(managed_bytes::initialized_later(), value_offset + size); + b[0] = LIVE_FLAG; + set_field(b, timestamp_offset, timestamp); + serializer(b.begin() + value_offset); + return b; + } template friend class atomic_cell_base; friend class atomic_cell; @@ -167,6 +219,9 @@ public: bool is_revert_set() const { return atomic_cell_type::is_revert_set(_data); } + bool is_counter_in_place_revert_set() const { + return atomic_cell_type::is_counter_in_place_revert_set(_data); + } bool is_live() const { return atomic_cell_type::is_live(_data); } @@ -189,10 +244,17 @@ public: api::timestamp_type timestamp() const { return atomic_cell_type::timestamp(_data); } + void set_timestamp(api::timestamp_type ts) { + atomic_cell_type::set_timestamp(_data, ts); + } // Can be called on live cells only - bytes_view value() const { + auto value() const { return atomic_cell_type::value(_data); } + // Can be called on live counter update cells only + int64_t counter_update_value() const { + return atomic_cell_type::counter_update_value(_data); + } // Can be called only when is_dead(gc_clock::time_point) gc_clock::time_point deletion_time() const { return !is_live() ?
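A minimal stand-alone sketch of the pattern described in the make_live_from_serializer() comment above, using std::vector<uint8_t> in place of managed_bytes and a plain memcpy for the timestamp (the real helpers store fields in network byte order via set_field()); the point is that the header and the value share a single allocation and the caller's serializer writes the value in place:

#include <cstdint>
#include <cstring>
#include <vector>

// Simplified stand-ins for the flag and layout constants defined earlier in this header.
constexpr uint8_t LIVE_FLAG = 0x01;
constexpr size_t flags_size = 1;
constexpr size_t timestamp_size = 8;

// Serializer is any callable that writes exactly `size` bytes at the pointer it is given.
template <typename Serializer>
std::vector<uint8_t> make_live_from_serializer(int64_t timestamp, size_t size, Serializer&& serializer) {
    const size_t value_offset = flags_size + timestamp_size;
    std::vector<uint8_t> cell(value_offset + size);   // one allocation: header + value
    cell[0] = LIVE_FLAG;
    std::memcpy(cell.data() + flags_size, &timestamp, sizeof(timestamp));
    serializer(cell.data() + value_offset);           // caller serialises directly into the cell
    return cell;
}

// Usage: build a cell around an 8-byte payload with no intermediate buffer and no extra copy.
inline std::vector<uint8_t> example_cell() {
    int64_t payload = 42;
    return make_live_from_serializer(/*timestamp=*/1, sizeof(payload), [&](uint8_t* out) {
        std::memcpy(out, &payload, sizeof(payload));
    });
}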
atomic_cell_type::deletion_time(_data) : expiry() - ttl(); @@ -215,6 +277,9 @@ public: void set_revert(bool revert) { atomic_cell_type::set_revert(_data, revert); } + void set_counter_in_place_revert(bool flag) { + atomic_cell_type::set_counter_in_place_revert(_data, flag); + } }; class atomic_cell_view final : public atomic_cell_base { @@ -226,6 +291,14 @@ public: friend std::ostream& operator<<(std::ostream& os, const atomic_cell_view& acv); }; +class atomic_cell_mutable_view final : public atomic_cell_base { + atomic_cell_mutable_view(bytes_mutable_view data) : atomic_cell_base(std::move(data)) {} +public: + static atomic_cell_mutable_view from_bytes(bytes_mutable_view data) { return atomic_cell_mutable_view(data); } + + friend class atomic_cell; +}; + class atomic_cell_ref final : public atomic_cell_base { public: atomic_cell_ref(managed_bytes& buf) : atomic_cell_base(buf) {} @@ -254,12 +327,9 @@ public: static atomic_cell make_live(api::timestamp_type timestamp, const bytes& value) { return make_live(timestamp, bytes_view(value)); } - static atomic_cell make_live_counter_update(api::timestamp_type timestamp, bytes_view value) { + static atomic_cell make_live_counter_update(api::timestamp_type timestamp, int64_t value) { return atomic_cell_type::make_live_counter_update(timestamp, value); } - static atomic_cell make_live_counter_update(api::timestamp_type timestamp, const bytes& value) { - return atomic_cell_type::make_live_counter_update(timestamp, bytes_view(value)); - } static atomic_cell make_live(api::timestamp_type timestamp, bytes_view value, gc_clock::time_point expiry, gc_clock::duration ttl) { @@ -277,6 +347,10 @@ public: return atomic_cell_type::make_live(timestamp, value, gc_clock::now() + *ttl, *ttl); } } + template + static atomic_cell make_live_from_serializer(api::timestamp_type timestamp, size_t size, Serializer&& serializer) { + return atomic_cell_type::make_live_from_serializer(timestamp, size, std::forward(serializer)); + } friend class atomic_cell_or_collection; friend std::ostream& operator<<(std::ostream& os, const atomic_cell& ac); }; diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh index 93daf40fe2..4a3ea36588 100644 --- a/atomic_cell_or_collection.hh +++ b/atomic_cell_or_collection.hh @@ -39,10 +39,14 @@ public: static atomic_cell_or_collection from_atomic_cell(atomic_cell data) { return { std::move(data._data) }; } atomic_cell_view as_atomic_cell() const { return atomic_cell_view::from_bytes(_data); } atomic_cell_ref as_atomic_cell_ref() { return { _data }; } + atomic_cell_mutable_view as_mutable_atomic_cell() { return atomic_cell_mutable_view::from_bytes(_data); } atomic_cell_or_collection(collection_mutation cm) : _data(std::move(cm.data)) {} explicit operator bool() const { return !_data.empty(); } + bool can_use_mutable_view() const { + return !_data.is_fragmented(); + } static atomic_cell_or_collection from_collection_mutation(collection_mutation data) { return std::move(data.data); } diff --git a/bytes.hh b/bytes.hh index 95208f7be8..db03dcbb71 100644 --- a/bytes.hh +++ b/bytes.hh @@ -26,9 +26,11 @@ #include #include #include +#include "utils/mutable_view.hh" using bytes = basic_sstring; using bytes_view = std::experimental::basic_string_view; +using bytes_mutable_view = basic_mutable_view; using bytes_opt = std::experimental::optional; using sstring_view = std::experimental::string_view; diff --git a/cell_locking.hh b/cell_locking.hh index 3819356c24..8ac23136d8 100644 --- a/cell_locking.hh +++ b/cell_locking.hh @@ -136,7 
+136,17 @@ public: class locked_cell; +struct cell_locker_stats { + uint64_t lock_acquisitions = 0; + uint64_t operations_waiting_for_lock = 0; +}; + class cell_locker { +public: + using timeout_clock = lowres_clock; +private: + using semaphore_type = basic_semaphore; + class partition_entry; struct cell_address { @@ -148,7 +158,7 @@ class cell_locker { public enable_lw_shared_from_this { partition_entry& _parent; cell_address _address; - semaphore _semaphore { 0 }; + semaphore_type _semaphore { 0 }; friend class cell_locker; public: @@ -177,8 +187,8 @@ class cell_locker { return _address.position; } - future<> lock() { - return _semaphore.wait(); + future<> lock(timeout_clock::time_point _timeout) { + return _semaphore.wait(_timeout); } void unlock() { _semaphore.signal(); @@ -236,14 +246,14 @@ class cell_locker { bi::hash, bi::constant_time_size>; - static constexpr size_t initial_bucket_count = 64; + static constexpr size_t initial_bucket_count = 16; using max_load_factor = std::ratio<3, 4>; - dht::decorated_key _key; cell_locker& _parent; size_t _rehash_at_size = compute_rehash_at_size(initial_bucket_count); std::unique_ptr _buckets; // TODO: start with internal storage? size_t _cell_count = 0; // cells_type::empty() is not O(1) if the hook is auto-unlink + cells_type::bucket_type _internal_buckets[initial_bucket_count]; cells_type _cells; schema_ptr _schema; @@ -267,8 +277,7 @@ class cell_locker { partition_entry(schema_ptr s, cell_locker& parent, const dht::decorated_key& dk) : _key(dk) , _parent(parent) - , _buckets(std::make_unique(initial_bucket_count)) - , _cells(cells_type::bucket_traits(_buckets.get(), initial_bucket_count), + , _cells(cells_type::bucket_traits(_internal_buckets, initial_bucket_count), cell_entry::hasher(*s), cell_entry::equal_compare(*s)) , _schema(s) { } @@ -335,6 +344,7 @@ class cell_locker { // partitions_type uses equality comparator which keeps a reference to the // original schema, we must ensure that it doesn't die. 
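The cell_locker changes above bound each per-cell lock acquisition by a timeout_clock::time_point instead of waiting indefinitely. A rough, standard-library-only sketch of that contract (hypothetical cell_lock type; the patch instead waits on a seastar semaphore parameterised with the timeout clock and fails the returned future on expiry):

#include <chrono>
#include <mutex>
#include <stdexcept>

struct cell_lock {
    std::timed_mutex mutex;

    // Take the lock before the deadline or give up; here a timeout throws,
    // mirroring how an expired semaphore wait fails the waiting operation.
    void lock(std::chrono::steady_clock::time_point deadline) {
        if (!mutex.try_lock_until(deadline)) {
            throw std::runtime_error("timed out waiting for cell lock");
        }
    }
    void unlock() { mutex.unlock(); }
};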
schema_ptr _original_schema; + cell_locker_stats& _stats; friend class locked_cell; private: @@ -355,12 +365,13 @@ private: } } public: - explicit cell_locker(schema_ptr s) + explicit cell_locker(schema_ptr s, cell_locker_stats& stats) : _buckets(std::make_unique(initial_bucket_count)) , _partitions(partitions_type::bucket_traits(_buckets.get(), initial_bucket_count), partition_entry::hasher(), partition_entry::equal_compare(*s)) , _schema(s) , _original_schema(std::move(s)) + , _stats(stats) { } ~cell_locker() { @@ -375,7 +386,8 @@ public: } // partition_cells_range is required to be in cell_locker::schema() - future> lock_cells(const dht::decorated_key& dk, partition_cells_range&& range); + future> lock_cells(const dht::decorated_key& dk, partition_cells_range&& range, + timeout_clock::time_point timeout); }; @@ -404,7 +416,9 @@ struct cell_locker::locker { partition_cells_range::iterator _current_ck; cells_range::const_iterator _current_cell; + timeout_clock::time_point _timeout; std::vector _locks; + cell_locker_stats& _stats; private: void update_ck() { if (!is_done()) { @@ -416,12 +430,14 @@ private: bool is_done() const { return _current_ck == _range.end(); } public: - explicit locker(const ::schema& s, partition_entry& pe, partition_cells_range&& range) + explicit locker(const ::schema& s, cell_locker_stats& st, partition_entry& pe, partition_cells_range&& range, timeout_clock::time_point timeout) : _hasher(s) , _eq_cmp(s) , _partition_entry(pe) , _range(std::move(range)) , _current_ck(_range.begin()) + , _timeout(timeout) + , _stats(st) { update_ck(); } @@ -442,7 +458,7 @@ public: }; inline -future> cell_locker::lock_cells(const dht::decorated_key& dk, partition_cells_range&& range) { +future> cell_locker::lock_cells(const dht::decorated_key& dk, partition_cells_range&& range, timeout_clock::time_point timeout) { partition_entry::hasher pe_hash; partition_entry::equal_compare pe_eq(*_schema); @@ -464,6 +480,7 @@ future> cell_locker::lock_cells(const dht::decorated_ke } for (auto&& c : r) { auto cell = make_lw_shared(*partition, position_in_partition(r.position()), c); + _stats.lock_acquisitions++; partition->insert(cell); locks.emplace_back(std::move(cell)); } @@ -477,7 +494,7 @@ future> cell_locker::lock_cells(const dht::decorated_ke return make_ready_future>(std::move(locks)); } - auto l = std::make_unique(*_schema, *it, std::move(range)); + auto l = std::make_unique(*_schema, _stats, *it, std::move(range), timeout); auto f = l->lock_all(); return f.then([l = std::move(l)] { return std::move(*l).get(); @@ -498,12 +515,16 @@ future<> cell_locker::locker::lock_next() { cell_address ca { position_in_partition(_current_ck->position()), cid }; auto it = _partition_entry.cells().find(ca, _hasher, _eq_cmp); if (it != _partition_entry.cells().end()) { - return it->lock().then([this, ce = it->shared_from_this()] () mutable { + _stats.operations_waiting_for_lock++; + return it->lock(_timeout).then([this, ce = it->shared_from_this()] () mutable { + _stats.operations_waiting_for_lock--; + _stats.lock_acquisitions++; _locks.emplace_back(std::move(ce)); }); } auto cell = make_lw_shared(_partition_entry, position_in_partition(_current_ck->position()), cid); + _stats.lock_acquisitions++; _partition_entry.insert(cell); _locks.emplace_back(std::move(cell)); } diff --git a/counters.cc b/counters.cc index c0ef4ed40b..cc7dc7fbfd 100644 --- a/counters.cc +++ b/counters.cc @@ -42,10 +42,79 @@ std::ostream& operator<<(std::ostream& os, counter_cell_view ccv) { return os << "{counter_cell timestamp: " 
<< ccv.timestamp() << " shards: {" << ::join(", ", ccv.shards()) << "}}"; } +static bool apply_in_place(atomic_cell_or_collection& dst, atomic_cell_or_collection& src) +{ + auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell()); + auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell()); + auto dst_shards = dst_ccmv.shards(); + auto src_shards = src_ccmv.shards(); + + auto dst_it = dst_shards.begin(); + auto src_it = src_shards.begin(); + + while (src_it != src_shards.end()) { + while (dst_it != dst_shards.end() && dst_it->id() < src_it->id()) { + ++dst_it; + } + if (dst_it == dst_shards.end() || dst_it->id() != src_it->id()) { + // Fast-path failed. Revert and fall back to the slow path. + if (dst_it == dst_shards.end()) { + --dst_it; + } + while (src_it != src_shards.begin()) { + --src_it; + while (dst_it->id() != src_it->id()) { + --dst_it; + } + src_it->swap_value_and_clock(*dst_it); + } + return false; + } + if (dst_it->logical_clock() < src_it->logical_clock()) { + dst_it->swap_value_and_clock(*src_it); + } else { + src_it->set_value_and_clock(*dst_it); + } + ++src_it; + } + + auto dst_ts = dst_ccmv.timestamp(); + auto src_ts = src_ccmv.timestamp(); + dst_ccmv.set_timestamp(std::max(dst_ts, src_ts)); + src_ccmv.set_timestamp(dst_ts); + src.as_mutable_atomic_cell().set_counter_in_place_revert(true); + return true; +} + +static void revert_in_place_apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src) +{ + assert(dst.can_use_mutable_view() && src.can_use_mutable_view()); + auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell()); + auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell()); + auto dst_shards = dst_ccmv.shards(); + auto src_shards = src_ccmv.shards(); + + auto dst_it = dst_shards.begin(); + auto src_it = src_shards.begin(); + + while (src_it != src_shards.end()) { + while (dst_it != dst_shards.end() && dst_it->id() < src_it->id()) { + ++dst_it; + } + assert(dst_it != dst_shards.end() && dst_it->id() == src_it->id()); + dst_it->swap_value_and_clock(*src_it); + ++src_it; + } + + auto dst_ts = dst_ccmv.timestamp(); + auto src_ts = src_ccmv.timestamp(); + dst_ccmv.set_timestamp(src_ts); + src_ccmv.set_timestamp(dst_ts); + src.as_mutable_atomic_cell().set_counter_in_place_revert(false); +} + bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_cell_or_collection& src) { - // TODO: optimise for single shard existing in the other - // TODO: optimise for no new shards? 
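apply_in_place() and revert_in_place_apply() above merge two counter cells without allocating a third buffer: for every shard present in both cells the newer state (higher logical clock) ends up in dst, while the previous dst state is parked in src, so the merge can later be undone by swapping the same pairs back. A simplified, self-contained sketch of that swap-based reversible merge (plain structs instead of byte-level shard views; the timestamp handling and the mid-merge rollback of the real code are omitted):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct shard {
    int64_t id;             // owning replica id, simplified to an integer
    int64_t value;          // accumulated counter value for this shard
    int64_t logical_clock;  // per-shard clock; the higher clock wins a merge
};

// Merge src into dst in place; both vectors are sorted by id. Returns false
// if some src shard has no counterpart in dst (the caller then falls back to
// the allocating slow path).
bool apply_in_place(std::vector<shard>& dst, std::vector<shard>& src) {
    auto d = dst.begin();
    for (auto& s : src) {
        while (d != dst.end() && d->id < s.id) { ++d; }
        if (d == dst.end() || d->id != s.id) {
            return false;
        }
        if (d->logical_clock < s.logical_clock) {
            std::swap(d->value, s.value);                   // dst takes the newer state,
            std::swap(d->logical_clock, s.logical_clock);   // src keeps dst's old state for revert
        } else {
            s = *d;                                         // dst already newer; remember it in src
        }
    }
    return true;
}

// Undo a successful apply_in_place() by swapping the parked state back.
void revert_in_place(std::vector<shard>& dst, std::vector<shard>& src) {
    auto d = dst.begin();
    for (auto& s : src) {
        while (d != dst.end() && d->id < s.id) { ++d; }
        assert(d != dst.end() && d->id == s.id);
        std::swap(d->value, s.value);
        std::swap(d->logical_clock, s.logical_clock);
    }
}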
auto dst_ac = dst.as_atomic_cell(); auto src_ac = src.as_atomic_cell(); @@ -58,23 +127,29 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_ } if (dst_ac.is_counter_update() && src_ac.is_counter_update()) { - // FIXME: store deltas just as a normal int64_t and get rid of these calls - // to long_type - auto src_v = value_cast(long_type->deserialize_value(src_ac.value())); - auto dst_v = value_cast(long_type->deserialize_value(dst_ac.value())); + auto src_v = src_ac.counter_update_value(); + auto dst_v = dst_ac.counter_update_value(); dst = atomic_cell::make_live_counter_update(std::max(dst_ac.timestamp(), src_ac.timestamp()), - long_type->decompose(src_v + dst_v)); + src_v + dst_v); return true; } assert(!dst_ac.is_counter_update()); assert(!src_ac.is_counter_update()); - auto a_shards = counter_cell_view(dst_ac).shards(); - auto b_shards = counter_cell_view(src_ac).shards(); + if (counter_cell_view(dst_ac).shard_count() >= counter_cell_view(src_ac).shard_count() + && dst.can_use_mutable_view() && src.can_use_mutable_view()) { + if (apply_in_place(dst, src)) { + return true; + } + } + + src.as_mutable_atomic_cell().set_counter_in_place_revert(false); + auto dst_shards = counter_cell_view(dst_ac).shards(); + auto src_shards = counter_cell_view(src_ac).shards(); counter_cell_builder result; - combine(a_shards.begin(), a_shards.end(), b_shards.begin(), b_shards.end(), + combine(dst_shards.begin(), dst_shards.end(), src_shards.begin(), src_shards.end(), result.inserter(), counter_shard_view::less_compare_by_id(), [] (auto& x, auto& y) { return x.logical_clock() < y.logical_clock() ? y : x; }); @@ -87,10 +162,12 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_ void counter_cell_view::revert_apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src) { if (dst.as_atomic_cell().is_counter_update()) { - auto src_v = value_cast(long_type->deserialize_value(src.as_atomic_cell().value())); - auto dst_v = value_cast(long_type->deserialize_value(dst.as_atomic_cell().value())); + auto src_v = src.as_atomic_cell().counter_update_value(); + auto dst_v = dst.as_atomic_cell().counter_update_value(); dst = atomic_cell::make_live(dst.as_atomic_cell().timestamp(), long_type->decompose(dst_v - src_v)); + } else if (src.as_atomic_cell().is_counter_in_place_revert_set()) { + revert_in_place_apply(dst, src); } else { std::swap(dst, src); } @@ -145,10 +222,9 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st if (!acv.is_live()) { return; // continue -- we are in lambda } - auto delta = value_cast(long_type->deserialize_value(acv.value())); - counter_cell_builder ccb; - ccb.add_shard(counter_shard(counter_id::local(), delta, clock_offset + 1)); - ac_o_c = ccb.build(acv.timestamp()); + auto delta = acv.counter_update_value(); + auto cs = counter_shard(counter_id::local(), delta, clock_offset + 1); + ac_o_c = counter_cell_builder::from_single_shard(acv.timestamp(), cs); }); }; @@ -173,17 +249,10 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st continue; } - struct counter_shard_or_tombstone { - stdx::optional shard; - tombstone tomb; - }; - std::deque> shards; + std::deque> shards; it->row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) { auto acv = ac_o_c.as_atomic_cell(); if (!acv.is_live()) { - counter_shard_or_tombstone cs_o_t { { }, - tombstone(acv.timestamp(), acv.deletion_time()) }; - shards.emplace_back(std::make_pair(id, cs_o_t)); 
return; // continue -- we are in lambda } counter_cell_view ccv(acv); @@ -191,7 +260,7 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st if (!cs) { return; // continue } - shards.emplace_back(std::make_pair(id, counter_shard_or_tombstone { counter_shard(*cs), tombstone() })); + shards.emplace_back(std::make_pair(id, counter_shard(*cs))); }); cr.row().cells().for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) { @@ -203,26 +272,17 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st shards.pop_front(); } - auto delta = value_cast(long_type->deserialize_value(acv.value())); + auto delta = acv.counter_update_value(); - counter_cell_builder ccb; if (shards.empty() || shards.front().first > id) { - ccb.add_shard(counter_shard(counter_id::local(), delta, clock_offset + 1)); - } else if (shards.front().second.tomb.timestamp == api::missing_timestamp) { - auto& cs = *shards.front().second.shard; - cs.update(delta, clock_offset + 1); - ccb.add_shard(cs); - shards.pop_front(); + auto cs = counter_shard(counter_id::local(), delta, clock_offset + 1); + ac_o_c = counter_cell_builder::from_single_shard(acv.timestamp(), cs); } else { - // We are apply the tombstone that's already there second time. - // It is not necessary but there is no easy way to remove cell - // from a mutation. - tombstone t = shards.front().second.tomb; - ac_o_c = atomic_cell::make_dead(t.timestamp, t.deletion_time); + auto& cs = shards.front().second; + cs.update(delta, clock_offset + 1); + ac_o_c = counter_cell_builder::from_single_shard(acv.timestamp(), cs); shards.pop_front(); - return; // continue -- we are in lambda } - ac_o_c = ccb.build(acv.timestamp()); }); } } diff --git a/counters.hh b/counters.hh index 7f362f7114..030ac1dbb5 100644 --- a/counters.hh +++ b/counters.hh @@ -67,7 +67,8 @@ static_assert(std::is_pod::value, "counter_id should be a POD type") std::ostream& operator<<(std::ostream& os, const counter_id& id); -class counter_shard_view { +template +class basic_counter_shard_view { enum class offset : unsigned { id = 0u, value = unsigned(id) + sizeof(counter_id), @@ -75,32 +76,58 @@ class counter_shard_view { total_size = unsigned(logical_clock) + sizeof(int64_t), }; private: - bytes_view::const_pointer _base; + typename View::pointer _base; private: template T read(offset off) const { T value; - std::copy_n(_base + static_cast(off), sizeof(T), reinterpret_cast(&value)); + std::copy_n(_base + static_cast(off), sizeof(T), reinterpret_cast(&value)); return value; } public: static constexpr auto size = size_t(offset::total_size); public: - counter_shard_view() = default; - explicit counter_shard_view(bytes_view::const_pointer ptr) noexcept + basic_counter_shard_view() = default; + explicit basic_counter_shard_view(typename View::pointer ptr) noexcept : _base(ptr) { } counter_id id() const { return read(offset::id); } int64_t value() const { return read(offset::value); } int64_t logical_clock() const { return read(offset::logical_clock); } + void swap_value_and_clock(basic_counter_shard_view& other) noexcept { + static constexpr size_t off = size_t(offset::value); + static constexpr size_t size = size_t(offset::total_size) - off; + + typename View::value_type tmp[size]; + std::copy_n(_base + off, size, tmp); + std::copy_n(other._base + off, size, _base + off); + std::copy_n(tmp, size, other._base + off); + } + + void set_value_and_clock(const basic_counter_shard_view& other) noexcept { + static constexpr size_t off = 
size_t(offset::value); + static constexpr size_t size = size_t(offset::total_size) - off; + std::copy_n(other._base + off, size, _base + off); + } + + bool operator==(const basic_counter_shard_view& other) const { + return id() == other.id() && value() == other.value() + && logical_clock() == other.logical_clock(); + } + bool operator!=(const basic_counter_shard_view& other) const { + return !(*this == other); + } + struct less_compare_by_id { - bool operator()(const counter_shard_view& x, const counter_shard_view& y) const { + bool operator()(const basic_counter_shard_view& x, const basic_counter_shard_view& y) const { return x.id() < y.id(); } }; }; +using counter_shard_view = basic_counter_shard_view; + std::ostream& operator<<(std::ostream& os, counter_shard_view csv); class counter_shard { @@ -110,7 +137,7 @@ class counter_shard { private: template static void write(const T& value, bytes::iterator& out) { - out = std::copy_n(reinterpret_cast(&value), sizeof(T), out); + out = std::copy_n(reinterpret_cast(&value), sizeof(T), out); } public: counter_shard(counter_id id, int64_t value, int64_t logical_clock) noexcept @@ -180,10 +207,15 @@ public: } atomic_cell build(api::timestamp_type timestamp) const { - bytes b(bytes::initialized_later(), serialized_size()); - auto out = b.begin(); - serialize(out); - return atomic_cell::make_live(timestamp, b); + return atomic_cell::make_live_from_serializer(timestamp, serialized_size(), [this] (bytes::iterator out) { + serialize(out); + }); + } + + static atomic_cell from_single_shard(api::timestamp_type timestamp, const counter_shard& cs) { + return atomic_cell::make_live_from_serializer(timestamp, counter_shard::serialized_size(), [&cs] (bytes::iterator out) { + cs.serialize(out); + }); } class inserter_iterator : public std::iterator { @@ -210,26 +242,28 @@ public: // := // := // := * -class counter_cell_view { - atomic_cell_view _cell; +template +class basic_counter_cell_view { +protected: + atomic_cell_base _cell; private: - class shard_iterator : public std::iterator { - bytes_view::const_pointer _current; - counter_shard_view _current_view; + class shard_iterator : public std::iterator> { + typename View::pointer _current; + basic_counter_shard_view _current_view; public: shard_iterator() = default; - shard_iterator(bytes_view::const_pointer ptr) noexcept + shard_iterator(typename View::pointer ptr) noexcept : _current(ptr), _current_view(ptr) { } - const counter_shard_view& operator*() const noexcept { + basic_counter_shard_view& operator*() noexcept { return _current_view; } - const counter_shard_view* operator->() const noexcept { + basic_counter_shard_view* operator->() noexcept { return &_current_view; } shard_iterator& operator++() noexcept { _current += counter_shard_view::size; - _current_view = counter_shard_view(_current); + _current_view = basic_counter_shard_view(_current); return *this; } shard_iterator operator++(int) noexcept { @@ -237,6 +271,16 @@ private: operator++(); return it; } + shard_iterator& operator--() noexcept { + _current -= counter_shard_view::size; + _current_view = basic_counter_shard_view(_current); + return *this; + } + shard_iterator operator--(int) noexcept { + auto it = *this; + operator--(); + return it; + } bool operator==(const shard_iterator& other) const noexcept { return _current == other._current; } @@ -257,7 +301,7 @@ public: } public: // ac must be a live counter cell - explicit counter_cell_view(atomic_cell_view ac) noexcept : _cell(ac) { + explicit basic_counter_cell_view(atomic_cell_base ac) 
noexcept : _cell(ac) { assert(_cell.is_live()); assert(!_cell.is_counter_update()); } @@ -287,6 +331,14 @@ public: return get_shard(counter_id::local()); } + bool operator==(const basic_counter_cell_view& other) const { + return timestamp() == other.timestamp() && boost::equal(shards(), other.shards()); + } +}; + +struct counter_cell_view : basic_counter_cell_view { + using basic_counter_cell_view::basic_counter_cell_view; + // Reversibly applies two counter cells, at least one of them must be live. // Returns true iff dst was modified. static bool apply_reversibly(atomic_cell_or_collection& dst, atomic_cell_or_collection& src); @@ -301,6 +353,12 @@ public: friend std::ostream& operator<<(std::ostream& os, counter_cell_view ccv); }; +struct counter_cell_mutable_view : basic_counter_cell_view { + using basic_counter_cell_view::basic_counter_cell_view; + + void set_timestamp(api::timestamp_type ts) { _cell.set_timestamp(ts); } +}; + // Transforms mutation dst from counter updates to counter shards using state // stored in current_state. // If current_state is present it has to be in the same schema as dst. diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index 9cfb7e1f48..87ca91a492 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -147,8 +147,7 @@ public: }; atomic_cell make_counter_update_cell(int64_t delta) const { - // FIXME: create directly from int64_t to avoid allocation - return atomic_cell::make_live_counter_update(_timestamp, long_type->decompose(delta)); + return atomic_cell::make_live_counter_update(_timestamp, delta); } tombstone make_tombstone() const { diff --git a/database.cc b/database.cc index 624bdde93a..e1569f2871 100644 --- a/database.cc +++ b/database.cc @@ -52,6 +52,7 @@ #include #include #include +#include #include #include "frozen_mutation.hh" #include "mutation_partition_applier.hh" @@ -124,7 +125,7 @@ column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) { return make_lw_shared(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager); } -column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) +column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager, cell_locker_stats& cl_stats) : _schema(std::move(schema)) , _config(std::move(config)) , _memtables(_config.enable_disk_writes ? 
make_memtable_list() : make_memory_only_memtable_list()) @@ -135,7 +136,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl , _commitlog(cl) , _compaction_manager(compaction_manager) , _flush_queue(std::make_unique()) - , _counter_cell_locks(std::make_unique(_schema)) + , _counter_cell_locks(std::make_unique(_schema, cl_stats)) { if (!_config.enable_disk_writes) { dblog.warn("Writes disabled, column family no durable."); @@ -668,9 +669,9 @@ column_family::make_streaming_reader(schema_ptr s, return make_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, streamed_mutation::forwarding::no); } -future> column_family::lock_counter_cells(const mutation& m) { +future> column_family::lock_counter_cells(const mutation& m, timeout_clock::time_point timeout) { assert(m.schema() == _counter_cell_locks->schema()); - return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition())); + return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout); } // Not performance critical. Currently used for testing only. @@ -1696,6 +1697,7 @@ database::database() : database(db::config()) database::database(const db::config& cfg) : _stats(make_lw_shared()) + , _cl_stats(std::make_unique()) , _cfg(std::make_unique(cfg)) // Allow system tables a pool of 10 MB memory to write, but never block on other regions. , _system_dirty_memory_manager(*this, 10 << 20) @@ -1823,6 +1825,12 @@ database::setup_metrics() { sm::make_derive("short_mutation_queries", _stats->short_mutation_queries, sm::description("The rate of mutation queries that returned less rows than requested due to result size limiting.")), + + sm::make_total_operations("counter_cell_lock_acquisition", _cl_stats->lock_acquisitions, + sm::description("The number of acquired counter cell locks.")), + + sm::make_queue_length("counter_cell_lock_pending", _cl_stats->operations_waiting_for_lock, + sm::description("The number of counter updates waiting for a lock.")), }); } @@ -2064,9 +2072,9 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family: lw_shared_ptr cf; if (cfg.enable_commitlog && _commitlog) { - cf = make_lw_shared(schema, std::move(cfg), *_commitlog, _compaction_manager); + cf = make_lw_shared(schema, std::move(cfg), *_commitlog, _compaction_manager, *_cl_stats); } else { - cf = make_lw_shared(schema, std::move(cfg), column_family::no_commitlog(), _compaction_manager); + cf = make_lw_shared(schema, std::move(cfg), column_family::no_commitlog(), _compaction_manager, *_cl_stats); } auto uuid = schema->id(); @@ -2619,62 +2627,58 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const do_apply(rp, m, m_schema); } -future database::do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema) { +future database::do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, + timeout_clock::time_point timeout,tracing::trace_state_ptr trace_state) { auto m = fm.unfreeze(m_schema); m.upgrade(cf.schema()); // prepare partition slice - query::clustering_row_ranges cr_ranges; - std::vector static_columns; static_columns.reserve(m.partition().static_row().size()); - m.partition().static_row().for_each_cell([&] (auto id, auto) { + m.partition().static_row().for_each_cell([&] (auto id, auto&&) { static_columns.emplace_back(id); }); - std::set regular_columns; + query::clustering_row_ranges cr_ranges; + cr_ranges.reserve(8); + 
std::vector regular_columns; + regular_columns.reserve(32); + for (auto&& cr : m.partition().clustered_rows()) { cr_ranges.emplace_back(query::clustering_range::make_singular(cr.key())); - cr.row().cells().for_each_cell([&] (auto id, auto) { - regular_columns.emplace(id); + cr.row().cells().for_each_cell([&] (auto id, auto&&) { + regular_columns.emplace_back(id); }); } - auto slice = query::partition_slice(std::move(cr_ranges), std::move(static_columns), - boost::copy_range>(regular_columns), { }, { }, - cql_serialization_format::internal(), query::max_rows); + boost::sort(regular_columns); + regular_columns.erase(std::unique(regular_columns.begin(), regular_columns.end()), + regular_columns.end()); - return do_with(std::move(slice), std::move(m), std::vector(), stdx::optional(), - [this, &cf] (const query::partition_slice& slice, mutation& m, std::vector& locks, - stdx::optional& fm) mutable { - return cf.lock_counter_cells(m).then([&, m_schema = cf.schema(), this] (std::vector lcs) { + auto slice = query::partition_slice(std::move(cr_ranges), std::move(static_columns), + std::move(regular_columns), { }, { }, cql_serialization_format::internal(), query::max_rows); + + return do_with(std::move(slice), std::move(m), std::vector(), + [this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, std::vector& locks) mutable { + tracing::trace(trace_state, "Acquiring counter locks"); + return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector lcs) mutable { locks = std::move(lcs); // Before counter update is applied it needs to be transformed from // deltas to counter shards. To do that, we need to read the current // counter state for each modified cell... - // FIXME: tracing - return mutation_query(m_schema, cf.as_mutation_source(), - dht::partition_range::make_singular(m.decorated_key()), - slice, query::max_rows, query::max_partitions, - gc_clock::now(), { }).then([this, &cf, &m, &fm, m_schema] (auto result) { - + tracing::trace(trace_state, "Reading counter values from the CF"); + return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state) + .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter // cells we can look for our shard in each of them, increment // its clock and apply the delta. - - auto& partitions = result.partitions(); - mutation_opt mopt = partitions.empty() ? mutation_opt() - : partitions[0].mut().unfreeze(m_schema); transform_counter_updates_to_shards(m, mopt ? 
&*mopt : nullptr, cf.failed_counter_applies_to_memtable()); - - // FIXME: oh dear, another freeze - // FIXME: timeout - fm = freeze(m); - return this->do_apply(m_schema, *fm, { }); - }).then([&fm] { - return std::move(*fm); + tracing::trace(trace_state, "Applying counter update"); + return this->apply_with_commitlog(cf, m, timeout); + }).then([&m] { + return std::move(m); }); }); }); @@ -2825,20 +2829,50 @@ future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema }, timeout); } -future database::apply_counter_update(schema_ptr s, const frozen_mutation& m, timeout_clock::time_point timeout) { +future<> database::apply_in_memory(const mutation& m, column_family& cf, db::replay_position rp, timeout_clock::time_point timeout) { + return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, &cf, rp = std::move(rp)] { + cf.apply(m, rp); + }, timeout); +} + +future database::apply_counter_update(schema_ptr s, const frozen_mutation& m, timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) { if (!s->is_synced()) { throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } try { auto& cf = find_column_family(m.column_family_id()); - return do_apply_counter_update(cf, m, s); + return do_apply_counter_update(cf, m, s, timeout, std::move(trace_state)); } catch (no_such_column_family&) { dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); throw; } } +future<> database::apply_with_commitlog(column_family& cf, const mutation& m, timeout_clock::time_point timeout) { + if (cf.commitlog() != nullptr) { + return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) { + commitlog_entry_writer cew(m.schema(), fm); + return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); + }).then([this, &m, &cf, timeout] (db::replay_position rp) { + return apply_in_memory(m, cf, rp, timeout).handle_exception([this, &cf, &m, timeout] (auto ep) { + try { + std::rethrow_exception(ep); + } catch (replay_position_reordered_exception&) { + // expensive, but we're assuming this is super rare. + // if we failed to apply the mutation due to future re-ordering + // (which should be the only reason ever for rp mismatch in CF) + // let's just try again, add the mutation to the CL once more, + // and assume success is inevitable eventually.
+ dblog.debug("replay_position reordering detected"); + return this->apply_with_commitlog(cf, m, timeout); + } + }); + }); + } + return apply_in_memory(m, cf, db::replay_position(), timeout); +} + future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::UUID uuid, const frozen_mutation& m, timeout_clock::time_point timeout) { if (cf.commitlog() != nullptr) { commitlog_entry_writer cew(s, m); diff --git a/database.hh b/database.hh index 4a654de3eb..9ee817d052 100644 --- a/database.hh +++ b/database.hh @@ -79,6 +79,7 @@ #include "lister.hh" class cell_locker; +class cell_locker_stats; class locked_cell; class frozen_mutation; @@ -393,6 +394,8 @@ struct cf_stats { class column_family { public: + using timeout_clock = lowres_clock; + struct config { sstring datadir; bool enable_disk_writes = true; @@ -662,16 +665,16 @@ public: return _cache; } - future> lock_counter_cells(const mutation& m); + future> lock_counter_cells(const mutation& m, timeout_clock::time_point timeout); logalloc::occupancy_stats occupancy() const; private: - column_family(schema_ptr schema, config cfg, db::commitlog* cl, compaction_manager&); + column_family(schema_ptr schema, config cfg, db::commitlog* cl, compaction_manager&, cell_locker_stats& cl_stats); public: - column_family(schema_ptr schema, config cfg, db::commitlog& cl, compaction_manager& cm) - : column_family(schema, std::move(cfg), &cl, cm) {set_metrics();} - column_family(schema_ptr schema, config cfg, no_commitlog, compaction_manager& cm) - : column_family(schema, std::move(cfg), nullptr, cm) {set_metrics();} + column_family(schema_ptr schema, config cfg, db::commitlog& cl, compaction_manager& cm, cell_locker_stats& cl_stats) + : column_family(schema, std::move(cfg), &cl, cm, cl_stats) {set_metrics();} + column_family(schema_ptr schema, config cfg, no_commitlog, compaction_manager& cm, cell_locker_stats& cl_stats) + : column_family(schema, std::move(cfg), nullptr, cm, cl_stats) {set_metrics();} column_family(column_family&&) = delete; // 'this' is being captured during construction ~column_family(); const schema_ptr& schema() const { return _schema; } @@ -1098,6 +1101,7 @@ private: }; lw_shared_ptr _stats; + std::unique_ptr _cl_stats; std::unique_ptr _cfg; @@ -1122,7 +1126,7 @@ private: future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position, timeout_clock::time_point timeout); - + future<> apply_in_memory(const mutation& m, column_family& cf, db::replay_position rp, timeout_clock::time_point timeout); private: // Unless you are an earlier boostraper or the database itself, you should // not be using this directly. Go for the public create_keyspace instead. @@ -1133,10 +1137,12 @@ private: future<> do_apply(schema_ptr, const frozen_mutation&, timeout_clock::time_point timeout); future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, timeout_clock::time_point timeout); + future<> apply_with_commitlog(column_family& cf, const mutation& m, timeout_clock::time_point timeout); query::result_memory_limiter _result_memory_limiter; - future do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema); + future do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, timeout_clock::time_point timeout, + tracing::trace_state_ptr trace_state); public: static utils::UUID empty_version; @@ -1211,7 +1217,7 @@ public: // Throws timed_out_error when timeout is reached. 
future<> apply(schema_ptr, const frozen_mutation&, timeout_clock::time_point timeout = timeout_clock::time_point::max()); future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented); - future apply_counter_update(schema_ptr, const frozen_mutation& m, timeout_clock::time_point timeout = timeout_clock::time_point::max()); + future apply_counter_update(schema_ptr, const frozen_mutation& m, timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; future<> clear_snapshot(sstring tag, std::vector keyspace_names); diff --git a/db/config.hh b/db/config.hh index 30679bbed6..e63866c2a2 100644 --- a/db/config.hh +++ b/db/config.hh @@ -489,7 +489,7 @@ public: val(read_request_timeout_in_ms, uint32_t, 5000, Used, \ "The time that the coordinator waits for read operations to complete" \ ) \ - val(counter_write_request_timeout_in_ms, uint32_t, 5000, Unused, \ + val(counter_write_request_timeout_in_ms, uint32_t, 5000, Used, \ "The time that the coordinator waits for counter writes to complete." \ ) \ val(cas_contention_timeout_in_ms, uint32_t, 5000, Unused, \ diff --git a/mutation_partition.cc b/mutation_partition.cc index 2a387eb92c..662f749dce 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1998,3 +1998,50 @@ mutation_query(schema_ptr s, bool row_tombstone_is_shadowed(const schema& schema, const tombstone& row_tombstone, const row_marker& marker) { return schema.is_view() && marker.timestamp() > row_tombstone.timestamp; } + +deletable_row::deletable_row(clustering_row&& cr) + : _deleted_at(cr.tomb()) + , _marker(std::move(cr.marker())) + , _cells(std::move(cr.cells())) +{ } + +class counter_write_query_result_builder { + const schema& _schema; + mutation_opt _mutation; +public: + counter_write_query_result_builder(const schema& s) : _schema(s) { } + void consume_new_partition(const dht::decorated_key& dk) { + _mutation = mutation(dk, _schema.shared_from_this()); + } + void consume(tombstone) { } + stop_iteration consume(static_row&& sr, tombstone, bool) { + _mutation->partition().static_row() = std::move(sr.cells()); + return stop_iteration::no; + } + stop_iteration consume(clustering_row&& cr, tombstone, bool) { + _mutation->partition().insert_row(_schema, cr.key(), deletable_row(std::move(cr))); + return stop_iteration::no; + } + stop_iteration consume(range_tombstone&& rt) { + return stop_iteration::no; + } + stop_iteration consume_end_of_partition() { + return stop_iteration::no; + } + mutation_opt consume_end_of_stream() { + return std::move(_mutation); + } +}; + +future counter_write_query(schema_ptr s, const mutation_source& source, + const dht::decorated_key& dk, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_ptr) +{ + auto cwqrb = counter_write_query_result_builder(*s); + auto cfq = make_stable_flattened_mutations_consumer>( + *s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb)); + auto reader = source(s, dht::partition_range::make_singular(dk), slice, + service::get_local_sstable_query_read_priority(), std::move(trace_ptr)); + return consume_flattened(std::move(reader), std::move(cfq), false); +} diff --git a/mutation_partition.hh b/mutation_partition.hh index 679f20db99..f66b15d929 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -411,12 +411,15 @@ struct appending_hash { } }; +class clustering_row; + class deletable_row final { 
tombstone _deleted_at; row_marker _marker; row _cells; public: deletable_row() {} + explicit deletable_row(clustering_row&&); void apply(tombstone deleted_at) { _deleted_at.apply(deleted_at); @@ -724,6 +727,7 @@ public: private: template void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const; + friend class counter_write_query_result_builder; }; // A shadowable row tombstone is valid only if the row has no live marker. In other words, diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc index c33f7e5815..2214d7e68c 100644 --- a/mutation_partition_serializer.cc +++ b/mutation_partition_serializer.cc @@ -55,7 +55,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c) auto value = std::move(writer).write_created_at(c.timestamp()); return [&c, value = std::move(value)] () mutable { if (c.is_counter_update()) { - auto delta = value_cast(long_type->deserialize_value(c.value())); + auto delta = c.counter_update_value(); return std::move(value).start_value_counter_cell_update() .write_delta(delta) .end_counter_cell_update(); diff --git a/mutation_partition_view.cc b/mutation_partition_view.cc index 08f45d7f2a..c36e564f6a 100644 --- a/mutation_partition_view.cc +++ b/mutation_partition_view.cc @@ -78,7 +78,7 @@ atomic_cell read_atomic_cell(atomic_cell_variant cv) return ccb.build(_created_at); } atomic_cell operator()(ser::counter_cell_update_view& ccv) const { - return atomic_cell::make_live_counter_update(_created_at, long_type->decompose(ccv.delta())); + return atomic_cell::make_live_counter_update(_created_at, ccv.delta()); } atomic_cell operator()(ser::unknown_variant_type&) const { throw std::runtime_error("Trying to deserialize counter cell in unknown state"); diff --git a/mutation_query.hh b/mutation_query.hh index 1df08275e9..0107e2493e 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -139,3 +139,10 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, tracing::trace_state_ptr trace_ptr = nullptr); + +// Performs a query for counter updates. 
+future counter_write_query(schema_ptr, const mutation_source&, + const dht::decorated_key& dk, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_ptr); + diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index ae8ad5b932..a0165d9d06 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -229,6 +229,9 @@ public: _ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, total_block_for(), _type)); } }; + bool is_counter() const { + return _type == db::write_type::COUNTER; + } void unthrottle() { _proxy->_stats.background_writes++; _proxy->_stats.background_write_bytes += _mutation_holder->size(); @@ -532,6 +535,9 @@ storage_proxy::storage_proxy(distributed& db) : _db(db) { }); _metrics.add_group(REPLICA_STATS_CATEGORY, { + sm::make_total_operations("received_counter_updates", _stats.received_counter_updates, + sm::description("number of counter updates received by this node acting as an update leader")), + sm::make_total_operations("received_mutations", _stats.received_mutations, sm::description("number of mutations received by a replica Node")), @@ -977,30 +983,25 @@ storage_proxy::mutate_locally(std::vector mutations, clock_type::time_ } future<> -storage_proxy::mutate_counters_on_leader(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout) { - return do_with(std::vector(), [this, cl, timeout, mutations = std::move(mutations)] (std::vector& ms_for_replication) mutable { - ms_for_replication.reserve(mutations.size()); - return parallel_for_each(std::move(mutations), [this, cl, timeout, &ms_for_replication] (const mutation& m) { - return mutate_counter_on_leader(m, timeout).then([&] (mutation m) { - ms_for_replication.emplace_back(std::move(m)); - }); - }).then([&, this, cl] { - return replicate_counters_from_leader(std::move(ms_for_replication), cl, { }); +storage_proxy::mutate_counters_on_leader(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout, + tracing::trace_state_ptr trace_state) { + _stats.received_counter_updates += mutations.size(); + return do_with(std::move(mutations), [this, cl, timeout, trace_state = std::move(trace_state)] (std::vector& update_ms) mutable { + return parallel_for_each(update_ms, [this, cl, timeout, trace_state] (frozen_mutation_and_schema& fm_a_s) { + return mutate_counter_on_leader_and_replicate(fm_a_s.s, std::move(fm_a_s.fm), cl, timeout, trace_state); }); }); } -future -storage_proxy::mutate_counter_on_leader(const mutation& m, clock_type::time_point timeout) { - auto fm = freeze(m); +future<> +storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation fm, db::consistency_level cl, clock_type::time_point timeout, + tracing::trace_state_ptr trace_state) { auto shard = _db.local().shard_of(fm); - return _db.invoke_on(shard, [gs = global_schema_ptr(m.schema()), fm = std::move(fm), timeout] (database& db) { - return db.apply_counter_update(gs, fm, timeout).then([] (frozen_mutation fm) { - return make_foreign(std::make_unique(std::move(fm))); + return _db.invoke_on(shard, [gs = global_schema_ptr(s), fm = std::move(fm), cl, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) { + auto trace_state = gt.get(); + return db.apply_counter_update(gs, fm, timeout, trace_state).then([cl, timeout, trace_state] (mutation m) mutable { + return service::get_local_storage_proxy().replicate_counter_from_leader(std::move(m), cl, 
std::move(trace_state), timeout); }); - }).then([s = m.schema()] (foreign_ptr> fm) { - // FIXME: way too many freeze/unfreeze cycles - return fm->unfreeze(s); }); } @@ -1112,8 +1113,9 @@ future> storage_proxy::mutat }); } -future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl) { - return parallel_for_each(ids, [this, cl] (unique_response_handler& protected_response) { +future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl, + stdx::optional timeout_opt) { + return parallel_for_each(ids, [this, cl, timeout_opt] (unique_response_handler& protected_response) { auto response_id = protected_response.id; // it is better to send first and hint afterwards to reduce latency // but request may complete before hint_to_dead_endpoints() is called and @@ -1121,7 +1123,7 @@ future<> storage_proxy::mutate_begin(std::vector ids, d // frozen_mutation copy, or manage handler live time differently. hint_to_dead_endpoints(response_id, cl); - auto timeout = clock_type::now() + std::chrono::milliseconds(_db.local().get_config().write_request_timeout_in_ms()); + auto timeout = timeout_opt.value_or(clock_type::now() + std::chrono::milliseconds(_db.local().get_config().write_request_timeout_in_ms())); // call before send_to_live_endpoints() for the same reason as above auto f = response_wait(response_id, timeout); send_to_live_endpoints(protected_response.release(), timeout); // response is now running and it will either complete or timeout @@ -1202,31 +1204,49 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level } // Choose a leader for each mutation - std::unordered_map> leaders; + std::unordered_map> leaders; for (auto& m : mutations) { auto leader = find_leader_for_counter_update(m, cl); - leaders[leader].emplace_back(std::move(m)); + leaders[leader].emplace_back(frozen_mutation_and_schema { freeze(m), m.schema() }); // FIXME: check if CL can be reached } // Forward mutations to the leaders chosen for them - auto timeout = clock_type::now() + std::chrono::milliseconds(_db.local().get_config().write_request_timeout_in_ms()); + auto timeout = clock_type::now() + std::chrono::milliseconds(_db.local().get_config().counter_write_request_timeout_in_ms()); auto my_address = utils::fb_utilities::get_broadcast_address(); return parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), my_address] (auto& endpoint_and_mutations) { auto endpoint = endpoint_and_mutations.first; + // The leader receives a vector of mutations and processes them together, + // so if there is a timeout we don't really know which one is to "blame" + // and what to put in ks and cf fields of write timeout exception. + // Let's just use the schema of the first mutation in a vector. 
+ auto handle_error = [this, sp = this->shared_from_this(), s = endpoint_and_mutations.second[0].s, cl] (std::exception_ptr exp) { + auto& ks = _db.local().find_keyspace(s->ks_name()); + try { + std::rethrow_exception(std::move(exp)); + } catch (rpc::timeout_error&) { + return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER)); + } catch (timed_out_error&) { + return make_exception_future<>(mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(ks, cl), db::write_type::COUNTER)); + } + }; + + auto f = make_ready_future<>(); if (endpoint == my_address) { - return this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout); + f = this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout, tr_state); } else { auto& mutations = endpoint_and_mutations.second; auto fms = boost::copy_range>(mutations | boost::adaptors::transformed([] (auto& m) { - return freeze(m); + return std::move(m.fm); })); auto& ms = net::get_local_messaging_service(); auto msg_addr = net::messaging_service::msg_addr{ endpoint_and_mutations.first, 0 }; - return ms.send_counter_mutation(msg_addr, timeout, std::move(fms), cl, tracing::make_trace_info(tr_state)); + tracing::trace(tr_state, "Enqueuing counter update to {}", msg_addr); + f = ms.send_counter_mutation(msg_addr, timeout, std::move(fms), cl, tracing::make_trace_info(tr_state)); } + return f.handle_exception(std::move(handle_error)); }); } @@ -1250,9 +1270,10 @@ future<> storage_proxy::mutate(std::vector mutations, db::consistency_ ); } -future<> storage_proxy::replicate_counters_from_leader(std::vector mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state) { +future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistency_level cl, tracing::trace_state_ptr tr_state, + clock_type::time_point timeout) { // FIXME: do not send the mutation to itself, it has already been applied (it is not incorrect to do so, though) - return mutate_internal(std::move(mutations), cl, true, std::move(tr_state)); + return mutate_internal(std::array{std::move(m)}, cl, true, std::move(tr_state), timeout); } /* @@ -1262,7 +1283,8 @@ future<> storage_proxy::replicate_counters_from_leader(std::vector mut */ template future<> -storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool counters, tracing::trace_state_ptr tr_state) { +storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool counters, tracing::trace_state_ptr tr_state, + stdx::optional timeout_opt) { logger.trace("mutate cl={}", cl); mlogger.trace("mutations={}", mutations); if (boost::empty(mutations)) { @@ -1277,8 +1299,8 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c utils::latency_counter lc; lc.start(); - return mutate_prepare(mutations, cl, type, tr_state).then([this, cl] (std::vector ids) { - return mutate_begin(std::move(ids), cl); + return mutate_prepare(mutations, cl, type, tr_state).then([this, cl, timeout_opt] (std::vector ids) { + return mutate_begin(std::move(ids), cl, timeout_opt); }).then_wrapped([p = shared_from_this(), lc, tr_state] (future<> f) mutable { return p->mutate_end(std::move(f), lc, std::move(tr_state)); }); @@ -1504,7 +1526,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo lw_shared_ptr m = handler.get_mutation_for(coordinator); - if (!m) { + if (!m || (handler.is_counter() && coordinator 
== my_address)) { got_response(response_id, coordinator); } else { if (!handler.read_repair_write()) { @@ -3531,18 +3553,25 @@ void storage_proxy::init_messaging_service() { auto& ms = net::get_local_messaging_service(); ms.register_counter_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector fms, db::consistency_level cl, stdx::optional trace_info) { auto src_addr = net::messaging_service::get_source(cinfo); - // FIXME: tracing - return do_with(std::vector(), [cl, src_addr, timeout = *t, fms = std::move(fms)] (std::vector& mutations) mutable { + + tracing::trace_state_ptr trace_state_ptr; + if (trace_info) { + trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*trace_info); + tracing::begin(trace_state_ptr); + tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr); + } + + return do_with(std::vector(), + [cl, src_addr, timeout = *t, fms = std::move(fms), trace_state_ptr = std::move(trace_state_ptr)] (std::vector& mutations) mutable { return parallel_for_each(std::move(fms), [&mutations, src_addr] (frozen_mutation& fm) { // FIXME: optimise for cases when all fms are in the same schema auto schema_version = fm.schema_version(); return get_schema_for_write(schema_version, std::move(src_addr)).then([&mutations, fm = std::move(fm)] (schema_ptr s) mutable { - // FIXME: unfreeze/freeze/unfreeze/freeze... - mutations.emplace_back(fm.unfreeze(s)); + mutations.emplace_back(frozen_mutation_and_schema { std::move(fm), std::move(s) }); }); - }).then([&, cl, timeout] { + }).then([trace_state_ptr = std::move(trace_state_ptr), &mutations, cl, timeout] { auto sp = get_local_shared_storage_proxy(); - return sp->mutate_counters_on_leader(std::move(mutations), cl, timeout); + return sp->mutate_counters_on_leader(std::move(mutations), cl, timeout, std::move(trace_state_ptr)); }); }); }); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 860ee0db6e..88a16d0736 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -52,6 +52,7 @@ #include "utils/estimated_histogram.hh" #include "tracing/trace_state.hh" #include +#include "frozen_mutation.hh" namespace compat { @@ -156,6 +157,9 @@ public: // number of mutations received as a coordinator uint64_t received_mutations = 0; + // number of counter updates received as a leader + uint64_t received_counter_updates = 0; + // number of forwarded mutations uint64_t forwarded_mutations = 0; uint64_t forwarding_errors = 0; @@ -266,19 +270,25 @@ private: future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); template future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state); - future<> mutate_begin(std::vector ids, db::consistency_level cl); + future<> mutate_begin(std::vector ids, db::consistency_level cl, stdx::optional timeout_opt = { }); future<> mutate_end(future<> mutate_result, utils::latency_counter, tracing::trace_state_ptr trace_state); future<> schedule_repair(std::unordered_map>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state); bool need_throttle_writes() const; void unthrottle(); void handle_read_error(std::exception_ptr eptr, bool range); template - future<> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr tr_state); + future<> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr 
tr_state, stdx::optional timeout_opt = { }); future>> query_nonsingular_mutations_locally( schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector& pr, tracing::trace_state_ptr trace_state, uint64_t max_size); - future<> mutate_counters_on_leader(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout); - future mutate_counter_on_leader(const mutation& m, clock_type::time_point timeout); + struct frozen_mutation_and_schema { + frozen_mutation fm; + schema_ptr s; + }; + future<> mutate_counters_on_leader(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout, + tracing::trace_state_ptr trace_state); + future<> mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation m, db::consistency_level cl, clock_type::time_point timeout, + tracing::trace_state_ptr trace_state); gms::inet_address find_leader_for_counter_update(const mutation& m, db::consistency_level cl); public: @@ -314,7 +324,8 @@ public: */ future<> mutate(std::vector mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, bool raw_counters = false); - future<> replicate_counters_from_leader(std::vector mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state); + future<> replicate_counter_from_leader(mutation m, db::consistency_level cl, tracing::trace_state_ptr tr_state, + clock_type::time_point timeout); template future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state); diff --git a/tests/cell_locker_test.cc b/tests/cell_locker_test.cc index f6ac073fb8..498e2402f4 100644 --- a/tests/cell_locker_test.cc +++ b/tests/cell_locker_test.cc @@ -75,6 +75,10 @@ static schema_ptr make_schema_disjoint_with_others() static data_value empty_value = data_value(to_bytes("")); +static cell_locker::timeout_clock::time_point no_timeout { + cell_locker::timeout_clock::duration(std::numeric_limits::max()) +}; + static auto make_row(const sstring& key, std::initializer_list cells) { return std::pair>(key, cells); } @@ -100,15 +104,16 @@ SEASTAR_TEST_CASE(test_simple_locking_cells) { auto destroy = [] (auto) { }; auto s = make_schema(); - cell_locker cl(s); + cell_locker_stats cl_stats; + cell_locker cl(s, cl_stats); auto m = make_mutation(s, "0", { "s1", "s3" }, { make_row("one", { "r1", "r2" }), make_row("two", { "r2", "r3" }), }); - auto l1 = cl.lock_cells(m.decorated_key(), partition_cells_range(m.partition())).get0(); - auto f2 = cl.lock_cells(m.decorated_key(), partition_cells_range(m.partition())); + auto l1 = cl.lock_cells(m.decorated_key(), partition_cells_range(m.partition()), no_timeout).get0(); + auto f2 = cl.lock_cells(m.decorated_key(), partition_cells_range(m.partition()), no_timeout); BOOST_REQUIRE(!f2.available()); destroy(std::move(l1)); @@ -119,7 +124,8 @@ SEASTAR_TEST_CASE(test_simple_locking_cells) { SEASTAR_TEST_CASE(test_disjoint_mutations) { return seastar::async([&] { auto s = make_schema(); - cell_locker cl(s); + cell_locker_stats cl_stats; + cell_locker cl(s, cl_stats); auto m1 = make_mutation(s, "0", { "s1" }, { make_row("one", { "r1", "r2" }), @@ -133,9 +139,9 @@ SEASTAR_TEST_CASE(test_disjoint_mutations) { auto m3 = mutation(partition_key::from_single_value(*s, to_bytes("1")), s); m3.partition() = m1.partition(); - auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition())).get0(); - auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition())).get0(); - auto l3 = cl.lock_cells(m3.decorated_key(), 
partition_cells_range(m3.partition())).get0(); + auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0(); + auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout).get0(); + auto l3 = cl.lock_cells(m3.decorated_key(), partition_cells_range(m3.partition()), no_timeout).get0(); }); } @@ -144,7 +150,8 @@ SEASTAR_TEST_CASE(test_single_cell_overlap) { auto destroy = [] (auto) { }; auto s = make_schema(); - cell_locker cl(s); + cell_locker_stats cl_stats; + cell_locker cl(s, cl_stats); auto m1 = make_mutation(s, "0", { "s1" }, { make_row("one", { "r1", "r2" }), @@ -159,12 +166,12 @@ SEASTAR_TEST_CASE(test_single_cell_overlap) { make_row("one", { "r2", "r3" }), }); - auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition())).get0(); - auto f2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition())); + auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0(); + auto f2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout); BOOST_REQUIRE(!f2.available()); destroy(std::move(l1)); auto l2 = f2.get0(); - auto f3 = cl.lock_cells(m3.decorated_key(), partition_cells_range(m3.partition())); + auto f3 = cl.lock_cells(m3.decorated_key(), partition_cells_range(m3.partition()), no_timeout); BOOST_REQUIRE(!f3.available()); destroy(std::move(l2)); auto l3 = f3.get0(); @@ -177,7 +184,8 @@ SEASTAR_TEST_CASE(test_schema_change) { auto s1 = make_schema(); auto s2 = make_alternative_schema(); - cell_locker cl(s1); + cell_locker_stats cl_stats; + cell_locker cl(s1, cl_stats); auto m1 = make_mutation(s1, "0", { "s1", "s2", "s3"}, { make_row("one", { "r1", "r2", "r3" }), @@ -194,14 +202,14 @@ SEASTAR_TEST_CASE(test_schema_change) { make_row("one", { "r1", "r3" }), }); - auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition())).get0(); + auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0(); destroy(std::move(m1)); destroy(std::move(s1)); cl.set_schema(s2); - auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition())).get0(); - auto f3 = cl.lock_cells(m3.decorated_key(), partition_cells_range(m3.partition())); + auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout).get0(); + auto f3 = cl.lock_cells(m3.decorated_key(), partition_cells_range(m3.partition()), no_timeout); BOOST_REQUIRE(!f3.available()); destroy(std::move(l1)); auto l3 = f3.get0(); @@ -213,6 +221,66 @@ SEASTAR_TEST_CASE(test_schema_change) { make_row("one", { "r8", "r9" }), make_row("two", { "r8", "r9" }), }); - auto l4 = cl.lock_cells(m4.decorated_key(), partition_cells_range(m4.partition())).get0(); + auto l4 = cl.lock_cells(m4.decorated_key(), partition_cells_range(m4.partition()), no_timeout).get0(); + }); +} + +SEASTAR_TEST_CASE(test_timed_out) { + return seastar::async([&] { + auto destroy = [] (auto) { }; + + auto s = make_schema(); + cell_locker_stats cl_stats; + cell_locker cl(s, cl_stats); + + auto m1 = make_mutation(s, "0", { "s1", "s2", "s3"}, { + make_row("one", { "r2", "r3" }), + }); + auto m2 = make_mutation(s, "0", { }, { + make_row("one", { "r1", "r2" }), + }); + + auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0(); + + auto timeout = cell_locker::timeout_clock::now() - std::chrono::hours(1); + BOOST_REQUIRE_THROW(cl.lock_cells(m2.decorated_key(), 
partition_cells_range(m2.partition()), timeout).get0(), + timed_out_error); + + auto f2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout); + BOOST_REQUIRE(!f2.available()); + destroy(std::move(l1)); + auto l2 = f2.get0(); + }); +} + +SEASTAR_TEST_CASE(test_locker_stats) { + return seastar::async([&] { + auto destroy = [] (auto) { }; + + auto s = make_schema(); + cell_locker_stats cl_stats; + cell_locker cl(s, cl_stats); + + auto m1 = make_mutation(s, "0", { "s2", "s3" }, { + make_row("one", { "r1", "r2" }), + }); + + auto m2 = make_mutation(s, "0", { "s1", "s3" }, { + make_row("one", { "r2", "r3" }), + }); + + auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0(); + BOOST_REQUIRE_EQUAL(cl_stats.lock_acquisitions, 4); + BOOST_REQUIRE_EQUAL(cl_stats.operations_waiting_for_lock, 0); + + auto f2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout); + BOOST_REQUIRE_EQUAL(cl_stats.lock_acquisitions, 5); + BOOST_REQUIRE_EQUAL(cl_stats.operations_waiting_for_lock, 1); + BOOST_REQUIRE(!f2.available()); + + destroy(std::move(l1)); + destroy(f2.get0()); + BOOST_REQUIRE_EQUAL(cl_stats.lock_acquisitions, 8); + BOOST_REQUIRE_EQUAL(cl_stats.operations_waiting_for_lock, 0); }); } diff --git a/tests/counter_test.cc b/tests/counter_test.cc index 4349413470..1ef2c7ab3f 100644 --- a/tests/counter_test.cc +++ b/tests/counter_test.cc @@ -69,6 +69,76 @@ SEASTAR_TEST_CASE(test_counter_cell) { }); } +SEASTAR_TEST_CASE(test_reversability_of_apply) { + return seastar::async([] { + auto verify_applies_reversibly = [] (atomic_cell_or_collection dst, atomic_cell_or_collection src, int64_t value) { + auto original_dst = dst; + + auto applied = counter_cell_view::apply_reversibly(dst, src); + auto applied_dst = dst; + + auto cv = counter_cell_view(dst.as_atomic_cell()); + BOOST_REQUIRE_EQUAL(cv.total_value(), value); + BOOST_REQUIRE_EQUAL(cv.timestamp(), std::max(dst.as_atomic_cell().timestamp(), src.as_atomic_cell().timestamp())); + + if (applied) { + counter_cell_view::revert_apply(dst, src); + } + BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()), + counter_cell_view(original_dst.as_atomic_cell())); + + applied = counter_cell_view::apply_reversibly(dst, src); + BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()), + counter_cell_view(applied_dst.as_atomic_cell())); + + if (applied) { + counter_cell_view::revert_apply(dst, src); + } + BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()), + counter_cell_view(original_dst.as_atomic_cell())); + }; + auto id = generate_ids(5); + + counter_cell_builder b1; + b1.add_shard(counter_shard(id[0], 3, 1)); + b1.add_shard(counter_shard(id[2], 2, 2)); + b1.add_shard(counter_shard(id[4], 1, 3)); + auto c1 = atomic_cell_or_collection(b1.build(1)); + + auto c2 = counter_cell_builder::from_single_shard(2, counter_shard(id[2], 8, 3)); + + verify_applies_reversibly(c1, c2, 12); + verify_applies_reversibly(c2, c1, 12); + + counter_cell_builder b2; + b2.add_shard(counter_shard(id[1], 4, 5)); + b2.add_shard(counter_shard(id[3], 5, 4)); + auto c3 = atomic_cell_or_collection(b2.build(2)); + + verify_applies_reversibly(c1, c3, 15); + verify_applies_reversibly(c3, c1, 15); + + auto c4 = counter_cell_builder::from_single_shard(0, counter_shard(id[2], 8, 1)); + + verify_applies_reversibly(c1, c4, 6); + verify_applies_reversibly(c4, c1, 6); + + counter_cell_builder b3; + b3.add_shard(counter_shard(id[0], 9, 0)); + b3.add_shard(counter_shard(id[2], 12, 3)); + 
b3.add_shard(counter_shard(id[3], 5, 4)); + auto c5 = atomic_cell_or_collection(b3.build(2)); + + verify_applies_reversibly(c1, c5, 21); + verify_applies_reversibly(c5, c1, 21); + + auto c6 = counter_cell_builder::from_single_shard(3, counter_shard(id[2], 8, 1)); + + verify_applies_reversibly(c1, c6, 6); + verify_applies_reversibly(c6, c1, 6); + }); +} + schema_ptr get_schema() { return schema_builder("ks", "cf") .with_column("pk", int32_type, column_kind::partition_key) @@ -207,11 +277,11 @@ SEASTAR_TEST_CASE(test_counter_update_mutations) { auto ck = clustering_key::from_single_value(*s, int32_type->decompose(0)); auto& col = *s->get_column_definition(utf8_type->decompose(sstring("c1"))); - auto c1 = atomic_cell::make_live_counter_update(api::new_timestamp(), long_type->decompose(int64_t(5))); + auto c1 = atomic_cell::make_live_counter_update(api::new_timestamp(), 5); mutation m1(pk, s); m1.set_clustered_cell(ck, col, c1); - auto c2 = atomic_cell::make_live_counter_update(api::new_timestamp(), long_type->decompose(int64_t(9))); + auto c2 = atomic_cell::make_live_counter_update(api::new_timestamp(), 9); mutation m2(pk, s); m2.set_clustered_cell(ck, col, c2); @@ -219,16 +289,12 @@ SEASTAR_TEST_CASE(test_counter_update_mutations) { mutation m3(pk, s); m3.set_clustered_cell(ck, col, c3); - auto counter_update_value = [&] (atomic_cell_view acv) { - return value_cast(long_type->deserialize_value(acv.value())); - }; - auto m12 = m1; m12.apply(m2); auto ac = get_counter_cell(m12); BOOST_REQUIRE(ac.is_live()); BOOST_REQUIRE(ac.is_counter_update()); - BOOST_REQUIRE_EQUAL(counter_update_value(ac), 14); + BOOST_REQUIRE_EQUAL(ac.counter_update_value(), 14); auto m123 = m12; m123.apply(m3); diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index fa8c1ee76c..101a4af67c 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -45,6 +45,7 @@ #include "tests/mutation_reader_assertions.hh" #include "tests/result_set_assertions.hh" #include "mutation_source_test.hh" +#include "cell_locking.hh" #include "disk-error-handler.hh" @@ -74,11 +75,12 @@ with_column_family(schema_ptr s, column_family::config cfg, Func func) { auto dir = make_lw_shared(); cfg.datadir = { dir->path }; auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); return func(*cf).then([cf, cm] { return cf->stop(); - }).finally([cf, cm, dir] {}); + }).finally([cf, cm, dir, cl_stats] {}); } SEASTAR_TEST_CASE(test_mutation_is_applied) { diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 212587bb1f..0504f91df6 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -44,6 +44,7 @@ #include "mutation_assertions.hh" #include "mutation_reader_assertions.hh" #include "counters.hh" +#include "cell_locking.hh" #include #include @@ -1017,7 +1018,8 @@ SEASTAR_TEST_CASE(compaction_manager_test) { cfg.datadir = tmp->path; cfg.enable_commitlog = false; cfg.enable_incremental_backups = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, *cl_stats); cf->start(); cf->mark_ready_for_writes(); cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); @@ -1075,7 +1077,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) { }); }); }); - 
}).finally([s, cm, tmp] { + }).finally([s, cm, tmp, cl_stats] { return cm->stop().then([cm] {}); }); } @@ -1098,7 +1100,8 @@ SEASTAR_TEST_CASE(compact) { builder.set_gc_grace_seconds(std::numeric_limits::max()); auto s = builder.build(); auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); return open_sstables(s, "tests/sstables/compaction", {1,2,3}).then([s, cf, cm, generation] (auto sstables) { @@ -1174,7 +1177,7 @@ SEASTAR_TEST_CASE(compact) { }); }); }); - }); + }).finally([cl_stats] { }); // verify that the compacted sstable look like } @@ -1208,7 +1211,8 @@ static future> compact_sstables(std::vector(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); auto generations = make_lw_shared>(std::move(generations_to_compact)); @@ -1288,7 +1292,7 @@ static future> compact_sstables(std::vector(); - }).then([cf, cm, created] { + }).then([cf, cm, created, cl_stats] { return std::move(*created); }); } @@ -1789,10 +1793,11 @@ SEASTAR_TEST_CASE(leveled_01) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); column_family::config cfg; + cell_locker_stats cl_stats; compaction_manager cm; cfg.enable_disk_writes = false; cfg.enable_commitlog = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto key_and_token_pair = token_generation_for_current_shard(50); @@ -1836,10 +1841,11 @@ SEASTAR_TEST_CASE(leveled_02) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); column_family::config cfg; + cell_locker_stats cl_stats; compaction_manager cm; cfg.enable_disk_writes = false; cfg.enable_commitlog = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto key_and_token_pair = token_generation_for_current_shard(50); @@ -1893,10 +1899,11 @@ SEASTAR_TEST_CASE(leveled_03) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); column_family::config cfg; + cell_locker_stats cl_stats; compaction_manager cm; cfg.enable_disk_writes = false; cfg.enable_commitlog = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto key_and_token_pair = token_generation_for_current_shard(50); @@ -1954,10 +1961,11 @@ SEASTAR_TEST_CASE(leveled_04) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); column_family::config cfg; + cell_locker_stats cl_stats; compaction_manager cm; cfg.enable_disk_writes = false; cfg.enable_commitlog = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto key_and_token_pair = token_generation_for_current_shard(50); @@ -2046,10 +2054,11 @@ SEASTAR_TEST_CASE(leveled_06) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); column_family::config cfg; + cell_locker_stats cl_stats; compaction_manager cm; cfg.enable_disk_writes = false; cfg.enable_commitlog 
= false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto max_sstable_size_in_mb = 1; @@ -2085,9 +2094,10 @@ SEASTAR_TEST_CASE(leveled_07) { column_family::config cfg; compaction_manager cm; + cell_locker_stats cl_stats; cfg.enable_disk_writes = false; cfg.enable_commitlog = false; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); cf->mark_ready_for_writes(); auto key_and_token_pair = token_generation_for_current_shard(5); @@ -2123,7 +2133,8 @@ SEASTAR_TEST_CASE(check_overlapping) { column_family::config cfg; compaction_manager cm; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); auto key_and_token_pair = token_generation_for_current_shard(4); auto min_key = key_and_token_pair[0].first; @@ -2197,6 +2208,8 @@ static shared_sstable make_sstable_containing(std::function ss SEASTAR_TEST_CASE(tombstone_purge_test) { BOOST_REQUIRE(smp::count == 1); return seastar::async([] { + cell_locker_stats cl_stats; + // In a column family with gc_grace_seconds set to 0, check that a tombstone // is purged after compaction. auto builder = schema_builder("tests", "tombstone_purge") @@ -2210,9 +2223,9 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { return make_lw_shared(s, tmp->path, (*gen)++, la, big); }; - auto compact = [&sst_gen, s] (std::vector all, std::vector to_compact) -> std::vector { + auto compact = [&, s] (std::vector all, std::vector to_compact) -> std::vector { auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, cl_stats); cf->mark_ready_for_writes(); for (auto&& sst : all) { column_family_test(cf).add_sstable(sst); @@ -2399,7 +2412,8 @@ SEASTAR_TEST_CASE(sstable_rewrite) { return sst; }; auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); cf->mark_ready_for_writes(); std::vector sstables; sstables.push_back(std::move(sstp)); @@ -2418,7 +2432,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) { }).then([reader] (streamed_mutation_opt m) { BOOST_REQUIRE(!m); }); - }).then([cm, cf] {}); + }).then([cm, cf, cl_stats] {}); }).then([sst, mt, s] {}); }); } @@ -2739,7 +2753,8 @@ SEASTAR_TEST_CASE(test_sstable_max_local_deletion_time_2) { auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type)); auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, cl_stats); auto mt = make_lw_shared(s); auto now = gc_clock::now(); int32_t last_expiry = 0; @@ -2793,13 +2808,14 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); compaction_manager cm; column_family::config cfg; + cell_locker_stats cl_stats; auto key_and_token_pair = token_generation_for_current_shard(4); auto min_key = 
key_and_token_pair[0].first; auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first; { - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); auto sst1 = add_sstable_for_overlapping_test(cf, /*gen*/1, min_key, key_and_token_pair[1].first, build_stats(0, 10, 10)); auto sst2 = add_sstable_for_overlapping_test(cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(0, 10, std::numeric_limits::max())); auto sst3 = add_sstable_for_overlapping_test(cf, /*gen*/3, min_key, max_key, build_stats(20, 25, std::numeric_limits::max())); @@ -2809,7 +2825,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) { } { - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); auto sst1 = add_sstable_for_overlapping_test(cf, /*gen*/1, min_key, key_and_token_pair[1].first, build_stats(0, 10, 10)); auto sst2 = add_sstable_for_overlapping_test(cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(15, 20, std::numeric_limits::max())); auto sst3 = add_sstable_for_overlapping_test(cf, /*gen*/3, min_key, max_key, build_stats(30, 40, std::numeric_limits::max())); @@ -2827,7 +2843,8 @@ SEASTAR_TEST_CASE(basic_date_tiered_strategy_test) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); compaction_manager cm; column_family::config cfg; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); std::vector candidates; int min_threshold = cf->schema()->min_compaction_threshold(); @@ -2863,7 +2880,8 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) { {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); compaction_manager cm; column_family::config cfg; - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); // deterministic timestamp for Fri, 01 Jan 2016 00:00:00 GMT. 
auto tp = db_clock::from_time_t(1451606400); @@ -3069,7 +3087,8 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test_2) { .with_column("r1", int32_type) .build(); auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, *cl_stats); auto tmp = make_lw_shared(); auto mt = make_lw_shared(s); const column_definition& r1_col = *s->get_column_definition("r1"); @@ -3284,7 +3303,8 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) { auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); auto cm = make_lw_shared(); - auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, column_family::config(), column_family::no_commitlog(), *cm, cl_stats); auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options()); std::vector candidates; diff --git a/tests/sstable_test.cc b/tests/sstable_test.cc index e6ca912d6b..9f668db531 100644 --- a/tests/sstable_test.cc +++ b/tests/sstable_test.cc @@ -38,6 +38,7 @@ #include "tmpdir.hh" #include "partition_slice_builder.hh" #include "tests/test_services.hh" +#include "cell_locking.hh" #include "disk-error-handler.hh" @@ -934,7 +935,8 @@ SEASTAR_TEST_CASE(reshuffle) { cfg.datadir = "tests/sstables/generation"; cfg.enable_commitlog = false; cfg.enable_incremental_backups = false; - auto cf = make_lw_shared(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm); + auto cl_stats = make_lw_shared(); + auto cf = make_lw_shared(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm, *cl_stats); cf->start(); cf->mark_ready_for_writes(); std::set existing_sstables = { 1, 5 }; @@ -952,7 +954,7 @@ SEASTAR_TEST_CASE(reshuffle) { ).discard_result().then([cm] { return cm->stop(); }); - }).then([cm, cf] {}); + }).then([cm, cf, cl_stats] {}); }); }, "tests/sstables/generation"); } diff --git a/thrift/handler.cc b/thrift/handler.cc index e2e2accfc3..a154acdcc2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -1767,7 +1767,7 @@ private: } static void add_live_cell(const schema& s, const CounterColumn& col, const column_definition& def, clustering_key_prefix ckey, mutation& m_to_apply) { //thrift_validation::validate_column(col, def); - auto cell = atomic_cell::make_live_counter_update(api::new_timestamp(), long_type->decompose(col.value)); + auto cell = atomic_cell::make_live_counter_update(api::new_timestamp(), col.value); m_to_apply.set_clustered_cell(std::move(ckey), def, std::move(cell)); } static void add_to_mutation(const schema& s, const CounterColumn& col, mutation& m_to_apply) { diff --git a/to_string.hh b/to_string.hh index ffeb05c176..4cd064f506 100644 --- a/to_string.hh +++ b/to_string.hh @@ -87,6 +87,12 @@ std::ostream& operator<<(std::ostream& os, const std::set& items) { return os; } +template +std::ostream& operator<<(std::ostream& os, const std::array& items) { + os << "{" << join(", ", items) << "}"; + return os; +} + template std::ostream& operator<<(std::ostream& os, const std::experimental::optional& opt) { if (opt) { diff --git a/utils/managed_bytes.hh b/utils/managed_bytes.hh index 5af66233f1..d3297d95dc 100644 --- a/utils/managed_bytes.hh +++ b/utils/managed_bytes.hh @@ -319,6 +319,15 @@ public: return { data(), size() }; } + bool 
is_fragmented() const {
+        return external() && _u.ptr->next;
+    }
+
+    operator bytes_mutable_view() {
+        assert(!is_fragmented());
+        return { data(), size() };
+    };
+
     bytes_view::value_type& operator[](size_type index) {
         return value_at_index(index);
     }
diff --git a/utils/mutable_view.hh b/utils/mutable_view.hh
new file mode 100644
index 0000000000..b4e4424215
--- /dev/null
+++ b/utils/mutable_view.hh
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "stdx.hh"
+
+template
+class basic_mutable_view {
+    CharT* _begin = nullptr;
+    CharT* _end = nullptr;
+public:
+    using value_type = CharT;
+    using pointer = CharT*;
+    using iterator = CharT*;
+
+    template
+    basic_mutable_view(basic_sstring& str)
+        : _begin(str.begin())
+        , _end(str.end())
+    { }
+
+    basic_mutable_view(CharT* ptr, size_t length)
+        : _begin(ptr)
+        , _end(ptr + length)
+    { }
+
+    operator stdx::basic_string_view() const noexcept {
+        return stdx::basic_string_view(begin(), size());
+    }
+
+    CharT& operator[](size_t idx) const { return _begin[idx]; }
+
+    iterator begin() const { return _begin; }
+    iterator end() const { return _end; }
+
+    CharT* data() const { return _begin; }
+    size_t size() const { return _end - _begin; }
+    bool empty() const { return _begin == _end; }
+
+    void remove_prefix(size_t n) {
+        _begin += n;
+    }
+    void remove_suffix(size_t n) {
+        _end -= n;
+    }
+};
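
The new basic_mutable_view is a minimal writable counterpart to string_view, and is what bytes_mutable_view (and hence the in-place counter updates above) relies on. Below is a hedged usage sketch; it is not part of the patch and assumes only that utils/mutable_view.hh is on the include path. It exercises the (pointer, length) constructor, element write-through, and prefix/suffix trimming.

// Illustration only, not part of the patch. Assumes "utils/mutable_view.hh" is available.
#include "utils/mutable_view.hh"
#include <cassert>
#include <cstring>

int main() {
    char buf[] = "counter";
    basic_mutable_view<char> v(buf, sizeof(buf) - 1);

    assert(!v.empty() && v.size() == 7);
    v[0] = 'C';            // writes go through to the underlying buffer
    v.remove_prefix(1);    // view now spans "ounter"
    v.remove_suffix(2);    // view now spans "ount"

    assert(v.size() == 4);
    assert(std::strncmp(v.data(), "ount", v.size()) == 0);
    assert(buf[0] == 'C'); // the mutation is visible outside the view
    return 0;
}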
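
The operator<< added to to_string.hh is presumably what allows the single-element std::array built in replicate_counter_from_leader() to be printed by the "mutations={}" trace line in mutate_internal(). A hedged, self-contained illustration of the formatting it produces, assuming "to_string.hh" (and the join() helper it uses) is included:

// Illustration only: formatting a std::array via the operator<< added above.
#include "to_string.hh"
#include <array>
#include <iostream>

int main() {
    std::array<int, 3> a{{1, 2, 3}};
    std::cout << a << "\n";   // expected output: {1, 2, 3}
    return 0;
}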
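
As the thrift/handler.cc and tests/counter_test.cc hunks show, counter-update cells now carry the delta as a raw int64_t instead of a long_type-serialised blob, and counter_update_value() reads it back directly. A hedged round-trip sketch mirroring the test usage (assumes "atomic_cell.hh" and the api timestamp helpers are available):

// Illustration only, mirroring tests/counter_test.cc.
auto c = atomic_cell::make_live_counter_update(api::new_timestamp(), 5);
assert(c.is_live());
assert(c.is_counter_update());
assert(c.counter_update_value() == 5);   // raw int64_t, no long_type round-trip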