From 62a26ef411335bd3138f8b7a68b4e6937cff6494 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 18:13:20 -0500 Subject: [PATCH 01/12] row consumer: don't switch state implicitly Soon enough, all the state machine will be separated from the prestate handling. To make it easier, we will decouple them as much as we can. Not automatically switching states in the read functions is part of this. Signed-off-by: Glauber Costa Reviewed-by: Nadav Har'El --- sstables/row.cc | 51 +++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index 8fcc378992..1982380047 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -151,7 +151,7 @@ private: // (this is the common case), do this immediately. Otherwise, remember // what we have in the buffer, and remember to continue later by using // a "prestate": - inline void read_16(temporary_buffer& data, state next_state) { + inline void read_16(temporary_buffer& data) { if (data.size() >= sizeof(uint16_t)) { _u16 = consume_be(data); } else { @@ -160,9 +160,8 @@ private: data.trim(0); _prestate = prestate::READING_U16; } - _state = next_state; } - inline void read_32(temporary_buffer& data, state next_state) { + inline void read_32(temporary_buffer& data) { if (data.size() >= sizeof(uint32_t)) { _u32 = consume_be(data); } else { @@ -171,9 +170,8 @@ private: data.trim(0); _prestate = prestate::READING_U32; } - _state = next_state; } - inline void read_64(temporary_buffer& data, state next_state) { + inline void read_64(temporary_buffer& data) { if (data.size() >= sizeof(uint64_t)) { _u64 = consume_be(data); } else { @@ -182,9 +180,8 @@ private: data.trim(0); _prestate = prestate::READING_U64; } - _state = next_state; } - inline void read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where, state next_state) { + inline void read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { if (data.size() >= len) { where = data.share(0, len); data.trim_front(len); @@ -197,7 +194,6 @@ private: data.trim(0); _prestate = prestate::READING_BYTES; } - _state = next_state; } public: @@ -285,11 +281,13 @@ public: switch (_state) { case state::ROW_START: // read 2-byte key length into _u16 - read_16(data, state::ROW_KEY_BYTES); + read_16(data); + _state = state::ROW_KEY_BYTES; break; case state::ROW_KEY_BYTES: // After previously reading 16-bit length, read key's bytes. - read_bytes(data, _u16, _key, state::DELETION_TIME); + read_bytes(data, _u16, _key); + _state = state::DELETION_TIME; break; case state::DELETION_TIME: if (data.size() >= sizeof(uint32_t) + sizeof(uint64_t)) { @@ -304,11 +302,13 @@ public: _key.release(); _state = state::ATOM_START; } else { - read_32(data, state::DELETION_TIME_2); + read_32(data); + _state = state::DELETION_TIME_2; } break; case state::DELETION_TIME_2: - read_64(data, state::DELETION_TIME_3); + read_64(data); + _state = state::DELETION_TIME_3; break; case state::DELETION_TIME_3: { deletion_time del; @@ -356,7 +356,8 @@ public: } break; case state::ATOM_NAME_BYTES: - read_bytes(data, _u16, _key, state::ATOM_MASK); + read_bytes(data, _u16, _key); + _state = state::ATOM_MASK; break; case state::ATOM_MASK: { auto mask = consume_be(data); @@ -392,12 +393,14 @@ public: _expiration = consume_be(data); _state = state::CELL; } else { - read_32(data, state::EXPIRING_CELL_2); + read_32(data); + _state = state::EXPIRING_CELL_2; } break; case state::EXPIRING_CELL_2: _ttl = _u32; - read_32(data, state::EXPIRING_CELL_3); + read_32(data); + _state = state::EXPIRING_CELL_3; break; case state::EXPIRING_CELL_3: _expiration = _u32; @@ -409,11 +412,13 @@ public: _u32 = consume_be(data); _state = state::CELL_VALUE_BYTES; } else { - read_64(data, state::CELL_2); + read_64(data); + _state = state::CELL_2; } break; case state::CELL_2: - read_32(data, state::CELL_VALUE_BYTES); + read_32(data); + _state = state::CELL_VALUE_BYTES; break; case state::CELL_VALUE_BYTES: // TODO: use read_bytes(data, _u32, _key, state::ATOM_START), but need to know if it was successful to decide on next state and on running consumer @@ -472,17 +477,21 @@ public: _state = state::ATOM_START; break; case state::RANGE_TOMBSTONE: - read_16(data, state::RANGE_TOMBSTONE_2); + read_16(data); + _state = state::RANGE_TOMBSTONE_2; break; case state::RANGE_TOMBSTONE_2: // read the end column into _val. - read_bytes(data, _u16, _val, state::RANGE_TOMBSTONE_3); + read_bytes(data, _u16, _val); + _state = state::RANGE_TOMBSTONE_3; break; case state::RANGE_TOMBSTONE_3: - read_32(data, state::RANGE_TOMBSTONE_4); + read_32(data); + _state = state::RANGE_TOMBSTONE_4; break; case state::RANGE_TOMBSTONE_4: - read_64(data, state::RANGE_TOMBSTONE_5); + read_64(data); + _state = state::RANGE_TOMBSTONE_5; break; case state::RANGE_TOMBSTONE_5: { From 13af0ffbd25bff33e2e1406665329e0e50c36aca Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 18:21:07 -0500 Subject: [PATCH 02/12] row consumer: fix read_bytes temporary len It shouldn't be _u16, but rather whatever we passed as len. It currently works because all callers pass _u16 as len. But this will soon change. Signed-off-by: Glauber Costa Reviewed-by: Nadav Har'El --- sstables/row.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sstables/row.cc b/sstables/row.cc index 1982380047..43e9128dc2 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -187,7 +187,7 @@ private: data.trim_front(len); } else { // copy what we have so far, read the rest later - _read_bytes = temporary_buffer(_u16); + _read_bytes = temporary_buffer(len); std::copy(data.begin(), data.end(),_read_bytes.get_write()); _read_bytes_where = &where; _pos = data.size(); From 0ad8afb0eca5d0e175ab3e50c70720f387710782 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 18:25:44 -0500 Subject: [PATCH 03/12] row consumer: extend usage of the read_* functions In some places, we cannot use our read_* functions, because we don't know whether or not it succeeded, and that is important when passing the state along. The fix for this is trivial, since we can just return it from the reader. Note for reviewers: The commend in one of the functions say we should use: "read_bytes(data, _u32, _key ...". But in the actual code, the where buffer is _val, not _key. Signed-off-by: Glauber Costa --- sstables/row.cc | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index 43e9128dc2..d182784c83 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -147,44 +147,52 @@ public: } private: + enum class read_status { ready, waiting }; // Read a 16-bit integer into _u16. If the whole thing is in the buffer // (this is the common case), do this immediately. Otherwise, remember // what we have in the buffer, and remember to continue later by using // a "prestate": - inline void read_16(temporary_buffer& data) { + inline read_status read_16(temporary_buffer& data) { if (data.size() >= sizeof(uint16_t)) { _u16 = consume_be(data); + return read_status::ready; } else { std::copy(data.begin(), data.end(), _read_int.bytes); _pos = data.size(); data.trim(0); _prestate = prestate::READING_U16; + return read_status::waiting; } } - inline void read_32(temporary_buffer& data) { + inline read_status read_32(temporary_buffer& data) { if (data.size() >= sizeof(uint32_t)) { _u32 = consume_be(data); + return read_status::ready; } else { std::copy(data.begin(), data.end(), _read_int.bytes); _pos = data.size(); data.trim(0); _prestate = prestate::READING_U32; + return read_status::waiting; } } - inline void read_64(temporary_buffer& data) { + inline read_status read_64(temporary_buffer& data) { if (data.size() >= sizeof(uint64_t)) { _u64 = consume_be(data); + return read_status::ready; } else { std::copy(data.begin(), data.end(), _read_int.bytes); _pos = data.size(); data.trim(0); _prestate = prestate::READING_U64; + return read_status::waiting; } } - inline void read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { + inline read_status read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { if (data.size() >= len) { where = data.share(0, len); data.trim_front(len); + return read_status::ready; } else { // copy what we have so far, read the rest later _read_bytes = temporary_buffer(len); @@ -193,6 +201,7 @@ private: _pos = data.size(); data.trim(0); _prestate = prestate::READING_BYTES; + return read_status::waiting; } } @@ -322,9 +331,7 @@ public: break; } case state::ATOM_START: - // TODO: use read_16() here too. have read_16 return true if read now. - if (data.size() >= sizeof(uint16_t)) { - _u16 = consume_be(data); + if (read_16(data) == read_status::ready) { if (_u16 == 0) { // end of row marker _state = state::ROW_START; @@ -336,10 +343,6 @@ public: _state = state::ATOM_NAME_BYTES; } } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U16; _state = state::ATOM_START_2; } break; @@ -421,12 +424,10 @@ public: _state = state::CELL_VALUE_BYTES; break; case state::CELL_VALUE_BYTES: - // TODO: use read_bytes(data, _u32, _key, state::ATOM_START), but need to know if it was successful to decide on next state and on running consumer - if (data.size() >= _u32) { + if (read_bytes(data, _u32, _val) == read_status::ready) { // If the whole string is in our buffer, great, we don't - // need to copy, and can skip the CELL_VALUE_BYTES_2 state - _val = data.share(0, _u32); - data.trim_front(_u32); + // need to copy, and can skip the CELL_VALUE_BYTES_2 state. + // // finally pass it to the consumer: if (_deleted) { if (_val.size() != 4) { @@ -446,14 +447,6 @@ public: _val.release(); _state = state::ATOM_START; } else { - // copy what we have so far, read the rest later - _read_bytes = temporary_buffer(_u32); - std::copy(data.begin(), data.end(), - _read_bytes.get_write()); - _read_bytes_where = &_val; - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_BYTES; _state = state::CELL_VALUE_BYTES_2; } break; From 1f930cda4af999e3263687ef3e3a5da37ff14bd0 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 18:32:06 -0500 Subject: [PATCH 04/12] row consumer: extend use of read for multi-value fields In an attempt to gain some cycles, we are testing whether we can read many values at once, and if so, using consume_be directly for those. What we can do in this situation, is read the first value, and let the read fall through the next case if the read succeeds. The code actually looks a lot more elegant this way. Signed-off-by: Glauber Costa --- sstables/row.cc | 57 ++++++++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index d182784c83..eb75e3ebe8 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -299,26 +299,17 @@ public: _state = state::DELETION_TIME; break; case state::DELETION_TIME: - if (data.size() >= sizeof(uint32_t) + sizeof(uint64_t)) { - // If we can read the entire deletion time at once, we can - // skip the DELETION_TIME_2 and DELETION_TIME_3 states. - deletion_time del; - del.local_deletion_time = consume_be(data); - del.marked_for_delete_at = consume_be(data); - _consumer.consume_row_start(to_bytes_view(_key), del); - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _state = state::ATOM_START; - } else { - read_32(data); + if (read_32(data) != read_status::ready) { _state = state::DELETION_TIME_2; + break; } - break; + // fallthrough case state::DELETION_TIME_2: - read_64(data); - _state = state::DELETION_TIME_3; - break; + if (read_64(data) != read_status::ready) { + _state = state::DELETION_TIME_3; + break; + } + // fallthrough case state::DELETION_TIME_3: { deletion_time del; del.local_deletion_time = _u32; @@ -391,34 +382,32 @@ public: break; } case state::EXPIRING_CELL: - if (data.size() >= sizeof(uint32_t) + sizeof(uint32_t)) { - _ttl = consume_be(data); - _expiration = consume_be(data); - _state = state::CELL; - } else { - read_32(data); + if (read_32(data) != read_status::ready) { _state = state::EXPIRING_CELL_2; + break; } - break; + // fallthrough case state::EXPIRING_CELL_2: _ttl = _u32; - read_32(data); - _state = state::EXPIRING_CELL_3; - break; + if (read_32(data) != read_status::ready) { + _state = state::EXPIRING_CELL_3; + break; + } + // fallthrough case state::EXPIRING_CELL_3: _expiration = _u32; _state = state::CELL; break; - case state::CELL: - if (data.size() >= sizeof(uint64_t) + sizeof(uint32_t)) { - _u64 = consume_be(data); - _u32 = consume_be(data); + case state::CELL: { + auto status = read_64(data); + _state = state::CELL_2; + // Try to read both values in the same loop if possible + if (status == read_status::ready) { + read_32(data); _state = state::CELL_VALUE_BYTES; - } else { - read_64(data); - _state = state::CELL_2; } break; + } case state::CELL_2: read_32(data); _state = state::CELL_VALUE_BYTES; From 49ac04a60ad347bca9610151083d4852452d9530 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 27 Aug 2015 10:42:06 -0500 Subject: [PATCH 05/12] row consumer: fall through more often Because we didn't had before a way to know whether or not the read completed, we would always go back to the main loop, and would only optimize sequential reads for some kinds of data. However, As one could see in the previous patch, the new read_X functions will notify completion, allowing us to just fallthrough to the next case if that is the only possibility. In most cases, it isn't. With this, we can apply this optimization throughout all cases where we don't branch states, and with a very elegant resulting code. The performance actually increases by 0.75 %. It is not much, but it is more than the error margin (which sits at 0.20 %), and because the code is not made unreadable by it, this is a clear win to me. Signed-off-by: Glauber Costa --- sstables/row.cc | 68 +++++++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index eb75e3ebe8..74c68ba7fe 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -290,14 +290,16 @@ public: switch (_state) { case state::ROW_START: // read 2-byte key length into _u16 - read_16(data); - _state = state::ROW_KEY_BYTES; - break; + if (read_16(data) != read_status::ready) { + _state = state::ROW_KEY_BYTES; + break; + } case state::ROW_KEY_BYTES: // After previously reading 16-bit length, read key's bytes. - read_bytes(data, _u16, _key); - _state = state::DELETION_TIME; - break; + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::DELETION_TIME; + break; + } case state::DELETION_TIME: if (read_32(data) != read_status::ready) { _state = state::DELETION_TIME_2; @@ -319,7 +321,6 @@ public: // buffers we held for it. _key.release(); _state = state::ATOM_START; - break; } case state::ATOM_START: if (read_16(data) == read_status::ready) { @@ -350,9 +351,10 @@ public: } break; case state::ATOM_NAME_BYTES: - read_bytes(data, _u16, _key); - _state = state::ATOM_MASK; - break; + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::ATOM_MASK; + break; + } case state::ATOM_MASK: { auto mask = consume_be(data); enum mask_type { @@ -397,21 +399,17 @@ public: case state::EXPIRING_CELL_3: _expiration = _u32; _state = state::CELL; - break; case state::CELL: { - auto status = read_64(data); - _state = state::CELL_2; - // Try to read both values in the same loop if possible - if (status == read_status::ready) { - read_32(data); - _state = state::CELL_VALUE_BYTES; + if (read_64(data) != read_status::ready) { + _state = state::CELL_2; + break; } - break; } case state::CELL_2: - read_32(data); - _state = state::CELL_VALUE_BYTES; - break; + if (read_32(data) != read_status::ready) { + _state = state::CELL_VALUE_BYTES; + break; + } case state::CELL_VALUE_BYTES: if (read_bytes(data, _u32, _val) == read_status::ready) { // If the whole string is in our buffer, great, we don't @@ -459,22 +457,26 @@ public: _state = state::ATOM_START; break; case state::RANGE_TOMBSTONE: - read_16(data); - _state = state::RANGE_TOMBSTONE_2; - break; + if (read_16(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_2; + break; + } case state::RANGE_TOMBSTONE_2: // read the end column into _val. - read_bytes(data, _u16, _val); - _state = state::RANGE_TOMBSTONE_3; - break; + if (read_bytes(data, _u16, _val) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_3; + break; + } case state::RANGE_TOMBSTONE_3: - read_32(data); - _state = state::RANGE_TOMBSTONE_4; - break; + if (read_32(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_4; + break; + } case state::RANGE_TOMBSTONE_4: - read_64(data); - _state = state::RANGE_TOMBSTONE_5; - break; + if (read_64(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_5; + break; + } case state::RANGE_TOMBSTONE_5: { deletion_time del; From f45b807f34ab3b15483c2607b1085d4aab3d0815 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 19:09:46 -0500 Subject: [PATCH 06/12] row consumer: move proceed class to a separate class Continuing the work of decoupling the the prestate and state parts of the NSM so we can reuse it, move the proceed class to a different holding class. Proceeding or not has nothing to do with "rows". Signed-off-by: Glauber Costa Reviewed-by: Nadav Har'El --- sstables/consumer.hh | 10 ++++++++++ sstables/row.cc | 1 + sstables/row.hh | 3 ++- 3 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 sstables/consumer.hh diff --git a/sstables/consumer.hh b/sstables/consumer.hh new file mode 100644 index 0000000000..b8044cb38b --- /dev/null +++ b/sstables/consumer.hh @@ -0,0 +1,10 @@ +/* +* Copyright (C) 2015 Cloudius Systems, Ltd. +* +*/ + +#pragma once + +namespace data_consumer { +enum class proceed { yes, no }; +} diff --git a/sstables/row.cc b/sstables/row.cc index 74c68ba7fe..d1269b1a0a 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -3,6 +3,7 @@ */ #include "sstables.hh" +#include "consumer.hh" template static inline T consume_be(temporary_buffer& p) { diff --git a/sstables/row.hh b/sstables/row.hh index 74865e1cf0..df45671948 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -8,6 +8,7 @@ #include "bytes.hh" #include "key.hh" #include "core/temporary_buffer.hh" +#include "consumer.hh" // sstables::data_consume_row feeds the contents of a single row into a // row_consumer object: @@ -28,7 +29,7 @@ // is called.] class row_consumer { public: - enum class proceed { yes, no }; + using proceed = data_consumer::proceed; // Consume the row's key and deletion_time. The latter determines if the // row is a tombstone, and if so, when it has been deleted. From e1945e473bd2f4dbed63de2e8a13ceac66ee5488 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 19:18:25 -0500 Subject: [PATCH 07/12] row consumer: make non_consuming an instance member It is now a static member that gets the instance members as parameters. There is no reason for that, and this will complicate the decoupling, since the prestate reader won't know about state. Signed-off-by: Glauber Costa Reviewed-by: Nadav Har'El --- sstables/row.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index d1269b1a0a..e2ad57e08c 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -58,11 +58,11 @@ private: // action when finishing to read a primitive type via a prestate, in // the rare case that a primitive type crossed a buffer). Such // non-consuming states need to run even if the data buffer is empty. - static bool non_consuming(state s, prestate ps) { - return (((s == state::DELETION_TIME_3) - || (s == state::CELL_VALUE_BYTES_2) - || (s == state::ATOM_START_2) - || (s == state::EXPIRING_CELL_3)) && (ps == prestate::NONE)); + bool non_consuming() const { + return (((_state == state::DELETION_TIME_3) + || (_state == state::CELL_VALUE_BYTES_2) + || (_state == state::ATOM_START_2) + || (_state == state::EXPIRING_CELL_3)) && (_prestate == prestate::NONE)); } // state for non-NONE prestates uint32_t _pos; @@ -230,7 +230,7 @@ public: return row_consumer::proceed::yes; } #endif - while (data || non_consuming(_state, _prestate)) { + while (data || non_consuming()) { if (_prestate != prestate::NONE) { // We're in the middle of reading a basic type, which crossed // an input buffer. Resume that read before continuing to From fbd68c3b01a565144145e3bfd4578658e14b23b6 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 11 Aug 2015 19:22:27 -0500 Subject: [PATCH 08/12] row consumer: move consume_be to consumer.hh It will be reused by the continuous_data_consumer Signed-off-by: Glauber Costa --- sstables/consumer.hh | 7 +++++++ sstables/row.cc | 7 ------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sstables/consumer.hh b/sstables/consumer.hh index b8044cb38b..aa18846216 100644 --- a/sstables/consumer.hh +++ b/sstables/consumer.hh @@ -5,6 +5,13 @@ #pragma once +template +static inline T consume_be(temporary_buffer& p) { + T i = net::ntoh(*unaligned_cast(p.get())); + p.trim_front(sizeof(T)); + return i; +} + namespace data_consumer { enum class proceed { yes, no }; } diff --git a/sstables/row.cc b/sstables/row.cc index e2ad57e08c..876b539bd8 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -5,13 +5,6 @@ #include "sstables.hh" #include "consumer.hh" -template -static inline T consume_be(temporary_buffer& p) { - T i = net::ntoh(*unaligned_cast(p.get())); - p.trim_front(sizeof(T)); - return i; -} - namespace sstables { // data_consume_rows_context remembers the context that an ongoing From d9b7f4bde3d1252fa2c1338285cbd5aa69668ece Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 27 Aug 2015 11:06:45 -0500 Subject: [PATCH 09/12] row consumer: separate processing of buffers from the main loop In my previous attempt, I have separated the state processor for the main loop, leaving that to be filled by a derived class. That felt a lot more natural, because then we don't have to replicate the loop logic in the derived classes. But well, oh, well, life is hard. Specially on fast paths. Doing that makes us insert an extra call in this loop, and that is noticeable: we would be 1.5 % slower, and that is not even counting the cost of making the state processing a virtual function later on. I could just argue that this is acceptable due to decoupling gains, but why I would argue that, if I can just rewrite it in a way that no performance is lost? And then I did. The disadvantage of this, is that the derived class will now have to re-code the loop, but no performance is lost. Another advantage of this, is that the derived class will now be able to call into process_buffer directly, without using virtual functions in this path for any of them. Signed-off-by: Glauber Costa --- sstables/row.cc | 121 ++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/sstables/row.cc b/sstables/row.cc index 876b539bd8..3ad9a5f452 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -199,6 +199,70 @@ private: } } + // This is separated so that the compiler can inline "process_buffer". Because this chunk is too big, + // it usually won't if this is part of the main function + void do_process_buffer(temporary_buffer& data) { + // We're in the middle of reading a basic type, which crossed + // an input buffer. Resume that read before continuing to + // handle the current state: + if (_prestate == prestate::READING_BYTES) { + auto n = std::min(_read_bytes.size() - _pos, data.size()); + std::copy(data.begin(), data.begin() + n, + _read_bytes.get_write() + _pos); + data.trim_front(n); + _pos += n; + if (_pos == _read_bytes.size()) { + *_read_bytes_where = std::move(_read_bytes); + _prestate = prestate::NONE; + } + } else { + // in the middle of reading an integer + unsigned len; + switch (_prestate) { + case prestate::READING_U16: + len = sizeof(uint16_t); + break; + case prestate::READING_U32: + len = sizeof(uint32_t); + break; + case prestate::READING_U64: + len = sizeof(uint64_t); + break; + default: + throw malformed_sstable_exception("unknown prestate"); + } + assert(_pos < len); + auto n = std::min((size_t)(len - _pos), data.size()); + std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); + data.trim_front(n); + _pos += n; + if (_pos == len) { + // done reading the integer, store it in _u16, _u32 or _u64: + switch (_prestate) { + case prestate::READING_U16: + _u16 = net::ntoh(_read_int.uint16); + break; + case prestate::READING_U32: + _u32 = net::ntoh(_read_int.uint32); + break; + case prestate::READING_U64: + _u64 = net::ntoh(_read_int.uint64); + break; + default: + throw malformed_sstable_exception( + "unknown prestate"); + } + _prestate = prestate::NONE; + } + } + } + +protected: + inline void process_buffer(temporary_buffer& data) { + while (__builtin_expect((_prestate != prestate::NONE), 0)) { + do_process_buffer(data); + } + } public: // process() feeds the given data into the state machine. // The consumer may request at any point (e.g., after reading a whole @@ -224,62 +288,7 @@ public: } #endif while (data || non_consuming()) { - if (_prestate != prestate::NONE) { - // We're in the middle of reading a basic type, which crossed - // an input buffer. Resume that read before continuing to - // handle the current state: - if (_prestate == prestate::READING_BYTES) { - auto n = std::min(_read_bytes.size() - _pos, data.size()); - std::copy(data.begin(), data.begin() + n, - _read_bytes.get_write() + _pos); - data.trim_front(n); - _pos += n; - if (_pos == _read_bytes.size()) { - *_read_bytes_where = std::move(_read_bytes); - _prestate = prestate::NONE; - } - } else { - // in the middle of reading an integer - unsigned len; - switch (_prestate) { - case prestate::READING_U16: - len = sizeof(uint16_t); - break; - case prestate::READING_U32: - len = sizeof(uint32_t); - break; - case prestate::READING_U64: - len = sizeof(uint64_t); - break; - default: - throw malformed_sstable_exception("unknown prestate"); - } - assert(_pos < len); - auto n = std::min((size_t)(len - _pos), data.size()); - std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); - data.trim_front(n); - _pos += n; - if (_pos == len) { - // done reading the integer, store it in _u16, _u32 or _u64: - switch (_prestate) { - case prestate::READING_U16: - _u16 = net::ntoh(_read_int.uint16); - break; - case prestate::READING_U32: - _u32 = net::ntoh(_read_int.uint32); - break; - case prestate::READING_U64: - _u64 = net::ntoh(_read_int.uint64); - break; - default: - throw malformed_sstable_exception( - "unknown prestate"); - } - _prestate = prestate::NONE; - } - } - continue; - } + process_buffer(data); switch (_state) { case state::ROW_START: From f8d35ef5ecd7e36c47fefc24e9ab68eb31261868 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 27 Aug 2015 11:36:09 -0500 Subject: [PATCH 10/12] sstables: move exception to its own file. I am moving the malformed exception here, to avoid circular dependencies. But since the file now exists, let's move them all. Signed-off-by: Glauber Costa --- sstables/exceptions.hh | 22 ++++++++++++++++++++++ sstables/sstables.cc | 6 ------ sstables/sstables.hh | 10 +--------- 3 files changed, 23 insertions(+), 15 deletions(-) create mode 100644 sstables/exceptions.hh diff --git a/sstables/exceptions.hh b/sstables/exceptions.hh new file mode 100644 index 0000000000..ea102a4897 --- /dev/null +++ b/sstables/exceptions.hh @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + * + */ + +#pragma once +namespace sstables { +class malformed_sstable_exception : public std::exception { + sstring _msg; +public: + malformed_sstable_exception(sstring s) : _msg(s) {} + const char *what() const noexcept { + return _msg.c_str(); + } +}; + +struct bufsize_mismatch_exception : malformed_sstable_exception { + bufsize_mismatch_exception(size_t size, size_t expected) : + malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected)) + {} +}; +} diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 59a33c0f21..5625b7334a 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -110,12 +110,6 @@ static typename Map::key_type reverse_map(const typename Map::mapped_type& value throw std::out_of_range("unable to reverse map"); } -struct bufsize_mismatch_exception : malformed_sstable_exception { - bufsize_mismatch_exception(size_t size, size_t expected) : - malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected)) - {} -}; - // This should be used every time we use read_exactly directly. // // read_exactly is a lot more convenient of an interface to use, because we'll diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 69e53068cd..5a5fa35674 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -27,6 +27,7 @@ #include "writer.hh" #include "metadata_collector.hh" #include "filter.hh" +#include "exceptions.hh" namespace sstables { @@ -89,15 +90,6 @@ public: class key; -class malformed_sstable_exception : public std::exception { - sstring _msg; -public: - malformed_sstable_exception(sstring s) : _msg(s) {} - const char *what() const noexcept { - return _msg.c_str(); - } -}; - using index_list = std::vector; class sstable { From 4b174c754d8b44f6aad777bf1171721b2244b2b1 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 27 Aug 2015 12:04:58 -0500 Subject: [PATCH 11/12] commonize the NSM In order to reuse the NSM in other scenarios, we need to push as much code as possible into a common class. This patch does that, making the continuous_data_consumer class now the main placeholder for the NSM class. The actual readers will have to inherit from it. However, despite using inheritance, I am not using virtual functions at all instead, we let the continuous_data_consumer receive an instance of the derived class, and then it can safely call its methods without paying the cost of virtual functions. In other attempt, I had kept the main process() function in the derived class, that had the responsibility of then coding the loop. With the use of the new pattern, we can keep the loop logic in the base class, which is a lot cleaner. There is a performance penalty associated with it, but it is fairly small: 0.5 % in the sequential_read perf_sstable test. I think we can live with it. Signed-off-by: Glauber Costa --- sstables/consumer.hh | 234 ++++++++++++++++++ sstables/row.cc | 567 ++++++++++++++----------------------------- 2 files changed, 419 insertions(+), 382 deletions(-) diff --git a/sstables/consumer.hh b/sstables/consumer.hh index aa18846216..89bc3c7c9e 100644 --- a/sstables/consumer.hh +++ b/sstables/consumer.hh @@ -5,6 +5,11 @@ #pragma once +#include +#include "core/future.hh" +#include "core/iostream.hh" +#include "sstables/exceptions.hh" + template static inline T consume_be(temporary_buffer& p) { T i = net::ntoh(*unaligned_cast(p.get())); @@ -14,4 +19,233 @@ static inline T consume_be(temporary_buffer& p) { namespace data_consumer { enum class proceed { yes, no }; + +template +class continuous_data_consumer { + using proceed = data_consumer::proceed; + StateProcessor& _state_processor; +protected: + input_stream _input; + // remaining length of input to read (if <0, continue until end of file). + int64_t _remain; + + // state machine progress: + enum class prestate { + NONE, + READING_U16, + READING_U32, + READING_U64, + READING_BYTES, + } _prestate = prestate::NONE; + + // state for non-NONE prestates + uint32_t _pos; + // state for READING_U16, READING_U32, READING_U64 prestate + uint16_t _u16; + uint32_t _u32; + uint64_t _u64; + union { + char bytes[sizeof(uint64_t)]; + uint64_t uint64; + uint32_t uint32; + uint16_t uint16; + } _read_int; + // state for READING_BYTES prestate + temporary_buffer _read_bytes; + temporary_buffer* _read_bytes_where; // which temporary_buffer to set, _key or _val? + + enum class read_status { ready, waiting }; + // Read a 16-bit integer into _u16. If the whole thing is in the buffer + // (this is the common case), do this immediately. Otherwise, remember + // what we have in the buffer, and remember to continue later by using + // a "prestate": + inline read_status read_16(temporary_buffer& data) { + if (data.size() >= sizeof(uint16_t)) { + _u16 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U16; + return read_status::waiting; + } + } + inline read_status read_32(temporary_buffer& data) { + if (data.size() >= sizeof(uint32_t)) { + _u32 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U32; + return read_status::waiting; + } + } + inline read_status read_64(temporary_buffer& data) { + if (data.size() >= sizeof(uint64_t)) { + _u64 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U64; + return read_status::waiting; + } + } + inline read_status read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { + if (data.size() >= len) { + where = data.share(0, len); + data.trim_front(len); + return read_status::ready; + } else { + // copy what we have so far, read the rest later + _read_bytes = temporary_buffer(len); + std::copy(data.begin(), data.end(),_read_bytes.get_write()); + _read_bytes_where = &where; + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_BYTES; + return read_status::waiting; + } + } + + inline void process_buffer(temporary_buffer& data) { + while (__builtin_expect((_prestate != prestate::NONE), 0)) { + do_process_buffer(data); + } + } +private: + // This is separated so that the compiler can inline "process_buffer". Because this chunk is too big, + // it usually won't if this is part of the main function + void do_process_buffer(temporary_buffer& data) { + // We're in the middle of reading a basic type, which crossed + // an input buffer. Resume that read before continuing to + // handle the current state: + if (_prestate == prestate::READING_BYTES) { + auto n = std::min(_read_bytes.size() - _pos, data.size()); + std::copy(data.begin(), data.begin() + n, + _read_bytes.get_write() + _pos); + data.trim_front(n); + _pos += n; + if (_pos == _read_bytes.size()) { + *_read_bytes_where = std::move(_read_bytes); + _prestate = prestate::NONE; + } + } else { + // in the middle of reading an integer + unsigned len; + switch (_prestate) { + case prestate::READING_U16: + len = sizeof(uint16_t); + break; + case prestate::READING_U32: + len = sizeof(uint32_t); + break; + case prestate::READING_U64: + len = sizeof(uint64_t); + break; + default: + throw sstables::malformed_sstable_exception("unknown prestate"); + } + assert(_pos < len); + auto n = std::min((size_t)(len - _pos), data.size()); + std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); + data.trim_front(n); + _pos += n; + if (_pos == len) { + // done reading the integer, store it in _u16, _u32 or _u64: + switch (_prestate) { + case prestate::READING_U16: + _u16 = net::ntoh(_read_int.uint16); + break; + case prestate::READING_U32: + _u32 = net::ntoh(_read_int.uint32); + break; + case prestate::READING_U64: + _u64 = net::ntoh(_read_int.uint64); + break; + default: + throw sstables::malformed_sstable_exception( + "unknown prestate"); + } + _prestate = prestate::NONE; + } + } + } + + void verify_end_state() { + _state_processor.verify_end_state(); + } +public: + continuous_data_consumer(StateProcessor& state_processor, input_stream&& input, uint64_t maxlen) + : _state_processor(state_processor), _input(std::move(input)), _remain(maxlen) {} + + + continuous_data_consumer(continuous_data_consumer&&) = delete; + continuous_data_consumer(const continuous_data_consumer&) = default; + + template + future<> consume_input(Consumer& c) { + return _input.consume(c); + } + + // some states do not consume input (its only exists to perform some + // action when finishing to read a primitive type via a prestate, in + // the rare case that a primitive type crossed a buffer). Such + // non-consuming states need to run even if the data buffer is empty. + bool non_consuming() { + return _state_processor.non_consuming(); + } + + inline proceed process(temporary_buffer& data) { + while (data || non_consuming()) { + process_buffer(data); + auto ret = _state_processor.process_state(data); + if (__builtin_expect(ret == proceed::no, 0)) { + return ret; + } + } + return proceed::yes; + } + + using unconsumed_remainder = input_stream::unconsumed_remainder; + // called by input_stream::consume(): + future + operator()(temporary_buffer data) { + if (_remain >= 0 && data.size() >= (uint64_t)_remain) { + // We received more data than we actually care about, so process + // the beginning of the buffer, and return the rest to the stream + auto segment = data.share(0, _remain); + process(segment); + data.trim_front(_remain - segment.size()); + _remain -= (_remain - segment.size()); + if (_remain == 0) { + verify_end_state(); + } + return make_ready_future(std::move(data)); + } else if (data.empty()) { + // End of file + verify_end_state(); + return make_ready_future(std::move(data)); + } else { + // We can process the entire buffer (if the consumer wants to). + auto orig_data_size = data.size(); + if (process(data) == proceed::yes) { + assert(data.size() == 0); + if (_remain >= 0) { + _remain -= orig_data_size; + } + return make_ready_future(); + } else { + if (_remain >= 0) { + _remain -= orig_data_size - data.size(); + } + return make_ready_future(std::move(data)); + } + } + } +}; } diff --git a/sstables/row.cc b/sstables/row.cc index 3ad9a5f452..7424182d7c 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -9,21 +9,8 @@ namespace sstables { // data_consume_rows_context remembers the context that an ongoing // data_consume_rows() future is in. -class data_consume_rows_context { +class data_consume_rows_context : public data_consumer::continuous_data_consumer { private: - row_consumer& _consumer; - input_stream _input; - // remaining length of input to read (if <0, continue until end of file). - int64_t _remain; - - // state machine progress: - enum class prestate { - NONE, - READING_U16, - READING_U32, - READING_U64, - READING_BYTES, - } _prestate = prestate::NONE; enum class state { ROW_START, ROW_KEY_BYTES, @@ -47,31 +34,8 @@ private: RANGE_TOMBSTONE_4, RANGE_TOMBSTONE_5, } _state = state::ROW_START; - // some states do not consume input (its only exists to perform some - // action when finishing to read a primitive type via a prestate, in - // the rare case that a primitive type crossed a buffer). Such - // non-consuming states need to run even if the data buffer is empty. - bool non_consuming() const { - return (((_state == state::DELETION_TIME_3) - || (_state == state::CELL_VALUE_BYTES_2) - || (_state == state::ATOM_START_2) - || (_state == state::EXPIRING_CELL_3)) && (_prestate == prestate::NONE)); - } - // state for non-NONE prestates - uint32_t _pos; - // state for READING_U16, READING_U32, READING_U64 prestate - uint16_t _u16; - uint32_t _u32; - uint64_t _u64; - union { - char bytes[sizeof(uint64_t)]; - uint64_t uint64; - uint32_t uint32; - uint16_t uint16; - } _read_int; - // state for READING_BYTES prestate - temporary_buffer _read_bytes; - temporary_buffer* _read_bytes_where; // which temporary_buffer to set, _key or _val? + + row_consumer& _consumer; temporary_buffer _key; temporary_buffer _val; @@ -88,188 +52,19 @@ private: } public: - data_consume_rows_context(row_consumer& consumer, - input_stream && input, uint64_t maxlen) : - _consumer(consumer), _input(std::move(input)), _remain(maxlen) { - } - template - future<> consume_input(Consumer& c) { - return _input.consume(c); + bool non_consuming() const { + return (((_state == state::DELETION_TIME_3) + || (_state == state::CELL_VALUE_BYTES_2) + || (_state == state::ATOM_START_2) + || (_state == state::EXPIRING_CELL_3)) && (_prestate == prestate::NONE)); } - void verify_end_state() { - if (_state != state::ROW_START || _prestate != prestate::NONE) { - throw malformed_sstable_exception("end of input, but not end of row"); - } - } - - using unconsumed_remainder = input_stream::unconsumed_remainder; - // called by input_stream::consume(): - future - operator()(temporary_buffer data) { - if (_remain >= 0 && data.size() >= (uint64_t)_remain) { - // We received more data than we actually care about, so process - // the beginning of the buffer, and return the rest to the stream - auto segment = data.share(0, _remain); - process(segment); - data.trim_front(_remain - segment.size()); - _remain -= (_remain - segment.size()); - if (_remain == 0) { - verify_end_state(); - } - return make_ready_future(std::move(data)); - } else if (data.empty()) { - // End of file - verify_end_state(); - return make_ready_future(std::move(data)); - } else { - // We can process the entire buffer (if the consumer wants to). - auto orig_data_size = data.size(); - if (process(data) == row_consumer::proceed::yes) { - assert(data.size() == 0); - if (_remain >= 0) { - _remain -= orig_data_size; - } - return make_ready_future(); - } else { - if (_remain >= 0) { - _remain -= orig_data_size - data.size(); - } - return make_ready_future(std::move(data)); - } - } - } - -private: - enum class read_status { ready, waiting }; - // Read a 16-bit integer into _u16. If the whole thing is in the buffer - // (this is the common case), do this immediately. Otherwise, remember - // what we have in the buffer, and remember to continue later by using - // a "prestate": - inline read_status read_16(temporary_buffer& data) { - if (data.size() >= sizeof(uint16_t)) { - _u16 = consume_be(data); - return read_status::ready; - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U16; - return read_status::waiting; - } - } - inline read_status read_32(temporary_buffer& data) { - if (data.size() >= sizeof(uint32_t)) { - _u32 = consume_be(data); - return read_status::ready; - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U32; - return read_status::waiting; - } - } - inline read_status read_64(temporary_buffer& data) { - if (data.size() >= sizeof(uint64_t)) { - _u64 = consume_be(data); - return read_status::ready; - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U64; - return read_status::waiting; - } - } - inline read_status read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { - if (data.size() >= len) { - where = data.share(0, len); - data.trim_front(len); - return read_status::ready; - } else { - // copy what we have so far, read the rest later - _read_bytes = temporary_buffer(len); - std::copy(data.begin(), data.end(),_read_bytes.get_write()); - _read_bytes_where = &where; - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_BYTES; - return read_status::waiting; - } - } - - // This is separated so that the compiler can inline "process_buffer". Because this chunk is too big, - // it usually won't if this is part of the main function - void do_process_buffer(temporary_buffer& data) { - // We're in the middle of reading a basic type, which crossed - // an input buffer. Resume that read before continuing to - // handle the current state: - if (_prestate == prestate::READING_BYTES) { - auto n = std::min(_read_bytes.size() - _pos, data.size()); - std::copy(data.begin(), data.begin() + n, - _read_bytes.get_write() + _pos); - data.trim_front(n); - _pos += n; - if (_pos == _read_bytes.size()) { - *_read_bytes_where = std::move(_read_bytes); - _prestate = prestate::NONE; - } - } else { - // in the middle of reading an integer - unsigned len; - switch (_prestate) { - case prestate::READING_U16: - len = sizeof(uint16_t); - break; - case prestate::READING_U32: - len = sizeof(uint32_t); - break; - case prestate::READING_U64: - len = sizeof(uint64_t); - break; - default: - throw malformed_sstable_exception("unknown prestate"); - } - assert(_pos < len); - auto n = std::min((size_t)(len - _pos), data.size()); - std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); - data.trim_front(n); - _pos += n; - if (_pos == len) { - // done reading the integer, store it in _u16, _u32 or _u64: - switch (_prestate) { - case prestate::READING_U16: - _u16 = net::ntoh(_read_int.uint16); - break; - case prestate::READING_U32: - _u32 = net::ntoh(_read_int.uint32); - break; - case prestate::READING_U64: - _u64 = net::ntoh(_read_int.uint64); - break; - default: - throw malformed_sstable_exception( - "unknown prestate"); - } - _prestate = prestate::NONE; - } - } - } - -protected: - inline void process_buffer(temporary_buffer& data) { - while (__builtin_expect((_prestate != prestate::NONE), 0)) { - do_process_buffer(data); - } - } -public: // process() feeds the given data into the state machine. // The consumer may request at any point (e.g., after reading a whole // row) to stop the processing, in which case we trim the buffer to // leave only the unprocessed part. The caller must handle calling // process() again, and/or refilling the buffer, as needed. - row_consumer::proceed process(temporary_buffer& data) { + row_consumer::proceed process_state(temporary_buffer& data) { #if 0 // Testing hack: call process() for tiny chunks separately, to verify // that primitive types crossing input buffer are handled correctly. @@ -287,61 +82,43 @@ public: return row_consumer::proceed::yes; } #endif - while (data || non_consuming()) { - process_buffer(data); - - switch (_state) { - case state::ROW_START: - // read 2-byte key length into _u16 - if (read_16(data) != read_status::ready) { - _state = state::ROW_KEY_BYTES; - break; - } - case state::ROW_KEY_BYTES: - // After previously reading 16-bit length, read key's bytes. - if (read_bytes(data, _u16, _key) != read_status::ready) { - _state = state::DELETION_TIME; - break; - } - case state::DELETION_TIME: - if (read_32(data) != read_status::ready) { - _state = state::DELETION_TIME_2; - break; - } - // fallthrough - case state::DELETION_TIME_2: - if (read_64(data) != read_status::ready) { - _state = state::DELETION_TIME_3; - break; - } - // fallthrough - case state::DELETION_TIME_3: { - deletion_time del; - del.local_deletion_time = _u32; - del.marked_for_delete_at = _u64; - _consumer.consume_row_start(to_bytes_view(_key), del); - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _state = state::ATOM_START; - } - case state::ATOM_START: - if (read_16(data) == read_status::ready) { - if (_u16 == 0) { - // end of row marker - _state = state::ROW_START; - if (_consumer.consume_row_end() == - row_consumer::proceed::no) { - return row_consumer::proceed::no; - } - } else { - _state = state::ATOM_NAME_BYTES; - } - } else { - _state = state::ATOM_START_2; - } + switch (_state) { + case state::ROW_START: + // read 2-byte key length into _u16 + if (read_16(data) != read_status::ready) { + _state = state::ROW_KEY_BYTES; break; - case state::ATOM_START_2: + } + case state::ROW_KEY_BYTES: + // After previously reading 16-bit length, read key's bytes. + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::DELETION_TIME; + break; + } + case state::DELETION_TIME: + if (read_32(data) != read_status::ready) { + _state = state::DELETION_TIME_2; + break; + } + // fallthrough + case state::DELETION_TIME_2: + if (read_64(data) != read_status::ready) { + _state = state::DELETION_TIME_3; + break; + } + // fallthrough + case state::DELETION_TIME_3: { + deletion_time del; + del.local_deletion_time = _u32; + del.marked_for_delete_at = _u64; + _consumer.consume_row_start(to_bytes_view(_key), del); + // after calling the consume function, we can release the + // buffers we held for it. + _key.release(); + _state = state::ATOM_START; + } + case state::ATOM_START: + if (read_16(data) == read_status::ready) { if (_u16 == 0) { // end of row marker _state = state::ROW_START; @@ -352,95 +129,88 @@ public: } else { _state = state::ATOM_NAME_BYTES; } - break; - case state::ATOM_NAME_BYTES: - if (read_bytes(data, _u16, _key) != read_status::ready) { - _state = state::ATOM_MASK; - break; - } - case state::ATOM_MASK: { - auto mask = consume_be(data); - enum mask_type { - DELETION_MASK = 0x01, - EXPIRATION_MASK = 0x02, - COUNTER_MASK = 0x04, - COUNTER_UPDATE_MASK = 0x08, - RANGE_TOMBSTONE_MASK = 0x10, - }; - if (mask & RANGE_TOMBSTONE_MASK) { - _state = state::RANGE_TOMBSTONE; - } else if (mask & COUNTER_MASK) { - // FIXME: see ColumnSerializer.java:deserializeColumnBody - throw malformed_sstable_exception("FIXME COUNTER_MASK"); - } else if (mask & EXPIRATION_MASK) { - _deleted = false; - _state = state::EXPIRING_CELL; - } else { - // FIXME: see ColumnSerializer.java:deserializeColumnBody - if (mask & COUNTER_UPDATE_MASK) { - throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK"); - } - _ttl = _expiration = 0; - _deleted = mask & DELETION_MASK; - _state = state::CELL; + } else { + _state = state::ATOM_START_2; + } + break; + case state::ATOM_START_2: + if (_u16 == 0) { + // end of row marker + _state = state::ROW_START; + if (_consumer.consume_row_end() == + row_consumer::proceed::no) { + return row_consumer::proceed::no; } + } else { + _state = state::ATOM_NAME_BYTES; + } + break; + case state::ATOM_NAME_BYTES: + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::ATOM_MASK; break; } - case state::EXPIRING_CELL: - if (read_32(data) != read_status::ready) { - _state = state::EXPIRING_CELL_2; - break; + case state::ATOM_MASK: { + auto mask = consume_be(data); + enum mask_type { + DELETION_MASK = 0x01, + EXPIRATION_MASK = 0x02, + COUNTER_MASK = 0x04, + COUNTER_UPDATE_MASK = 0x08, + RANGE_TOMBSTONE_MASK = 0x10, + }; + if (mask & RANGE_TOMBSTONE_MASK) { + _state = state::RANGE_TOMBSTONE; + } else if (mask & COUNTER_MASK) { + // FIXME: see ColumnSerializer.java:deserializeColumnBody + throw malformed_sstable_exception("FIXME COUNTER_MASK"); + } else if (mask & EXPIRATION_MASK) { + _deleted = false; + _state = state::EXPIRING_CELL; + } else { + // FIXME: see ColumnSerializer.java:deserializeColumnBody + if (mask & COUNTER_UPDATE_MASK) { + throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK"); } - // fallthrough - case state::EXPIRING_CELL_2: - _ttl = _u32; - if (read_32(data) != read_status::ready) { - _state = state::EXPIRING_CELL_3; - break; - } - // fallthrough - case state::EXPIRING_CELL_3: - _expiration = _u32; + _ttl = _expiration = 0; + _deleted = mask & DELETION_MASK; _state = state::CELL; - case state::CELL: { - if (read_64(data) != read_status::ready) { - _state = state::CELL_2; - break; - } } - case state::CELL_2: - if (read_32(data) != read_status::ready) { - _state = state::CELL_VALUE_BYTES; - break; - } - case state::CELL_VALUE_BYTES: - if (read_bytes(data, _u32, _val) == read_status::ready) { - // If the whole string is in our buffer, great, we don't - // need to copy, and can skip the CELL_VALUE_BYTES_2 state. - // - // finally pass it to the consumer: - if (_deleted) { - if (_val.size() != 4) { - throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); - } - deletion_time del; - del.local_deletion_time = consume_be(_val); - del.marked_for_delete_at = _u64; - _consumer.consume_deleted_cell(to_bytes_view(_key), del); - } else { - _consumer.consume_cell(to_bytes_view(_key), - to_bytes_view(_val), _u64, _ttl, _expiration); - } - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _val.release(); - _state = state::ATOM_START; - } else { - _state = state::CELL_VALUE_BYTES_2; - } + break; + } + case state::EXPIRING_CELL: + if (read_32(data) != read_status::ready) { + _state = state::EXPIRING_CELL_2; break; - case state::CELL_VALUE_BYTES_2: + } + // fallthrough + case state::EXPIRING_CELL_2: + _ttl = _u32; + if (read_32(data) != read_status::ready) { + _state = state::EXPIRING_CELL_3; + break; + } + // fallthrough + case state::EXPIRING_CELL_3: + _expiration = _u32; + _state = state::CELL; + case state::CELL: { + if (read_64(data) != read_status::ready) { + _state = state::CELL_2; + break; + } + } + case state::CELL_2: + if (read_32(data) != read_status::ready) { + _state = state::CELL_VALUE_BYTES; + break; + } + case state::CELL_VALUE_BYTES: + if (read_bytes(data, _u32, _val) == read_status::ready) { + // If the whole string is in our buffer, great, we don't + // need to copy, and can skip the CELL_VALUE_BYTES_2 state. + // + // finally pass it to the consumer: if (_deleted) { if (_val.size() != 4) { throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); @@ -458,47 +228,80 @@ public: _key.release(); _val.release(); _state = state::ATOM_START; - break; - case state::RANGE_TOMBSTONE: - if (read_16(data) != read_status::ready) { - _state = state::RANGE_TOMBSTONE_2; - break; + } else { + _state = state::CELL_VALUE_BYTES_2; + } + break; + case state::CELL_VALUE_BYTES_2: + if (_deleted) { + if (_val.size() != 4) { + throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); } - case state::RANGE_TOMBSTONE_2: - // read the end column into _val. - if (read_bytes(data, _u16, _val) != read_status::ready) { - _state = state::RANGE_TOMBSTONE_3; - break; - } - case state::RANGE_TOMBSTONE_3: - if (read_32(data) != read_status::ready) { - _state = state::RANGE_TOMBSTONE_4; - break; - } - case state::RANGE_TOMBSTONE_4: - if (read_64(data) != read_status::ready) { - _state = state::RANGE_TOMBSTONE_5; - break; - } - case state::RANGE_TOMBSTONE_5: - { deletion_time del; - del.local_deletion_time = _u32; + del.local_deletion_time = consume_be(_val); del.marked_for_delete_at = _u64; - _consumer.consume_range_tombstone(to_bytes_view(_key), - to_bytes_view(_val), del); - _key.release(); - _val.release(); - _state = state::ATOM_START; + _consumer.consume_deleted_cell(to_bytes_view(_key), del); + } else { + _consumer.consume_cell(to_bytes_view(_key), + to_bytes_view(_val), _u64, _ttl, _expiration); + } + // after calling the consume function, we can release the + // buffers we held for it. + _key.release(); + _val.release(); + _state = state::ATOM_START; + break; + case state::RANGE_TOMBSTONE: + if (read_16(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_2; break; } - - default: - throw malformed_sstable_exception("unknown state"); + case state::RANGE_TOMBSTONE_2: + // read the end column into _val. + if (read_bytes(data, _u16, _val) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_3; + break; } + case state::RANGE_TOMBSTONE_3: + if (read_32(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_4; + break; + } + case state::RANGE_TOMBSTONE_4: + if (read_64(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_5; + break; + } + case state::RANGE_TOMBSTONE_5: + { + deletion_time del; + del.local_deletion_time = _u32; + del.marked_for_delete_at = _u64; + _consumer.consume_range_tombstone(to_bytes_view(_key), + to_bytes_view(_val), del); + _key.release(); + _val.release(); + _state = state::ATOM_START; + break; } + default: + throw malformed_sstable_exception("unknown state"); + } + return row_consumer::proceed::yes; } + + data_consume_rows_context(row_consumer& consumer, + input_stream && input, uint64_t maxlen) : + continuous_data_consumer(*this, std::move(input), maxlen) + , _consumer(consumer) { + } + + void verify_end_state() { + if (_state != state::ROW_START || _prestate != prestate::NONE) { + throw malformed_sstable_exception("end of input, but not end of row"); + } + } }; // data_consume_rows() and data_consume_rows_at_once() both can read just a From babccb11123a9fc0d24f33996ceceb1b9cdb1477 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 27 Aug 2015 14:02:07 -0500 Subject: [PATCH 12/12] read_indexes: convert to the NSM Reading each member individually is not as efficient. Better convert to the NSM. Before: 717101.20 +- 649.77 partitions / sec (30 runs, 1 concurrent ops) After: 838169.80 +- 575.04 partitions / sec (30 runs, 1 concurrent ops) Gains: 16.88 % Signed-off-by: Glauber Costa --- sstables/index_reader.hh | 118 +++++++++++++++++++++++++++++++++++++++ sstables/sstables.cc | 55 ++---------------- 2 files changed, 124 insertions(+), 49 deletions(-) create mode 100644 sstables/index_reader.hh diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh new file mode 100644 index 0000000000..3964173bfb --- /dev/null +++ b/sstables/index_reader.hh @@ -0,0 +1,118 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once +#include "sstables.hh" +#include "consumer.hh" + +namespace sstables { + +class index_consumer { + uint64_t max_quantity; +public: + index_list indexes; + + index_consumer(uint64_t q) : max_quantity(q) { + indexes.reserve(q); + } + + bool should_continue() { + return indexes.size() < max_quantity; + } + void consume_entry(index_entry&& ie) { + indexes.push_back(std::move(ie)); + } +}; + +class index_consume_entry_context: public data_consumer::continuous_data_consumer { + using proceed = data_consumer::proceed; +private: + index_consumer& _consumer; + + enum class state { + START, + KEY_SIZE, + KEY_BYTES, + POSITION, + PROMOTED_SIZE, + PROMOTED_BYTES, + CONSUME_ENTRY, + } _state = state::START; + + temporary_buffer _key; + temporary_buffer _promoted; + + static inline bytes to_bytes(temporary_buffer& b) { + using byte = bytes_view::value_type; + auto s = bytes(reinterpret_cast(b.get()), b.size()); + b.release(); + return s; + } + +public: + void verify_end_state() { + } + + bool non_consuming() const { + return ((_state == state::CONSUME_ENTRY) || (_state == state::START) || + ((_state == state::PROMOTED_BYTES) && (_prestate == prestate::NONE))); + } + + proceed process_state(temporary_buffer& data) { + switch (_state) { + // START comes first, to make the handling of the 0-quantity case simpler + case state::START: + if (!_consumer.should_continue()) { + return proceed::no; + } + _state = state::KEY_SIZE; + break; + case state::KEY_SIZE: + if (read_16(data) != read_status::ready) { + _state = state::KEY_BYTES; + break; + } + case state::KEY_BYTES: + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::POSITION; + break; + } + case state::POSITION: + if (read_64(data) != read_status::ready) { + _state = state::PROMOTED_SIZE; + break; + } + case state::PROMOTED_SIZE: + if (read_32(data) != read_status::ready) { + _state = state::PROMOTED_BYTES; + break; + } + case state::PROMOTED_BYTES: + if (read_bytes(data, _u32, _promoted) != read_status::ready) { + _state = state::CONSUME_ENTRY; + break; + } + case state::CONSUME_ENTRY: { + index_entry ie; + ie.key.value = to_bytes(_key); + ie.position = _u64; + ie.promoted_index.value = to_bytes(_promoted); + _consumer.consume_entry(std::move(ie)); + _state = state::START; + break; + } + default: + throw malformed_sstable_exception("unknown state"); + } + return proceed::yes; + } + + index_consume_entry_context(index_consumer& consumer, + input_stream&& input, uint64_t maxlen) + : continuous_data_consumer(*this, std::move(input), maxlen) + , _consumer(consumer) + {} + +}; +} diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5625b7334a..142b7719a6 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -19,6 +19,7 @@ #include "sstables.hh" #include "compress.hh" #include "unimplemented.hh" +#include "index_reader.hh" #include #include #include @@ -756,56 +757,12 @@ future sstable::read_indexes(uint64_t summary_idx) { estimated_size = std::min(uint64_t(sstable_buffer_size), align_up(estimated_size, uint64_t(8 << 10))); - struct reader { - uint64_t count = 0; - std::vector indexes; - shared_file_random_access_reader stream; - reader(file f, uint64_t quantity, uint64_t estimated_size) : stream(f, estimated_size) { indexes.reserve(quantity); } - }; - - auto r = make_lw_shared(_index_file, quantity, estimated_size); - - r->stream.seek(position); - - auto end = [r, quantity] { return r->count >= quantity; }; - - return do_until(end, [this, r] { - r->indexes.emplace_back(); - auto fut = parse(r->stream, r->indexes.back()); - return std::move(fut).then_wrapped([this, r] (future<> f) mutable { - try { - f.get(); - r->count++; - } catch (bufsize_mismatch_exception &e) { - // We have optimistically emplaced back one element of the - // vector. If we have failed to parse, we should remove it - // so size() gives us the right picture. - r->indexes.pop_back(); - - // FIXME: If the file ends at an index boundary, there is - // no problem. Essentially, we can't know how many indexes - // are in a sampling group, so there isn't really any way - // to know, other than reading. - // - // If, however, we end in the middle of an index, this is a - // corrupted file. This code is not perfect because we only - // know that an exception happened, and it happened due to - // eof. We don't really know if eof happened at the index - // boundary. To know that, we would have to keep track of - // the real position of the stream (including what's - // already in the buffer) before we start to read the - // index, and after. We won't go through such complexity at - // the moment. - if (r->stream.eof()) { - r->count = std::numeric_limitscount)>::type>::max(); - } else { - throw e; - } - } - return make_ready_future<>(); + return do_with(index_consumer(quantity), [this, position, estimated_size] (index_consumer& ic) { + auto stream = make_file_input_stream(this->_index_file, position, estimated_size); + auto ctx = make_lw_shared(ic, std::move(stream), this->index_size() - position); + return ctx->consume_input(*ctx).then([ctx, &ic] { + return make_ready_future(std::move(ic.indexes)); }); - }).then([r] { - return make_ready_future(std::move(r->indexes)); }); }