From a195df836d08150c1e18fcc844463b200d442ef7 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:21:25 +0100 Subject: [PATCH 01/13] service: Cleanup formatting in storage_proxy.cc --- service/storage_proxy.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3fe8600faf..a91c68ae49 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2128,7 +2128,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev } Set allEndpoints = Gossiper.instance.getLiveTokenOwners(); - + int blockFor = allEndpoints.size(); final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); @@ -2159,7 +2159,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev { return !Gossiper.instance.getUnreachableTokenOwners().isEmpty(); } - + public interface WritePerformer { public void apply(IMutation mutation, @@ -2320,15 +2320,15 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); } public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); } - + public long getReadRepairAttempted() { return ReadRepairMetrics.attempted.count(); } - + public long getReadRepairRepairedBlocking() { return ReadRepairMetrics.repairedBlocking.count(); } - + public long getReadRepairRepairedBackground() { return ReadRepairMetrics.repairedBackground.count(); } From ecf0db17cebccd4df8aab0a2be78c60e26e9c417 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:18:55 +0100 Subject: [PATCH 02/13] db: Drop comment which doesn't seem to be relevant any more --- database.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/database.cc b/database.cc index 3c531f3bd9..eef94d8c64 100644 --- a/database.cc +++ b/database.cc @@ -113,7 +113,6 @@ column_family::find_or_create_partition(const bytes& key) { row& column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) { mutation_partition& p = find_or_create_partition(partition_key); - // call lower_bound so we have a hint for the insert, just in case. return p.clustered_row(clustering_key); } From 1f6360ec3bd422445e4240f826a89803371f5fd3 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:17:54 +0100 Subject: [PATCH 03/13] cql3: Drop redundant key validation Keys are also validated right before in build_partition_keys(). --- cql3/statements/modification_statement.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 36ccd068ee..1fcb3ee251 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -59,7 +59,6 @@ modification_statement::get_mutations(const query_options& options, bool local, std::vector mutations; mutations.reserve(keys->size()); for (auto key : *keys) { - validation::validate_cql_key(s, key); mutations.emplace_back(std::move(key), s); auto& m = mutations.back(); this->add_update_for_key(m, *prefix, *params_ptr); From f321b9e9b55ea238abacf1fe277547451a9d52ae Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 13 Mar 2015 18:16:59 +0100 Subject: [PATCH 04/13] util: Make hash functions work on bytes_view --- dht/murmur3_partitioner.cc | 2 +- utils/murmur_hash.cc | 92 +++++++++++++++++++------------------- utils/murmur_hash.hh | 9 ++-- 3 files changed, 51 insertions(+), 52 deletions(-) diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index 19ef36eda1..199456f029 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -21,7 +21,7 @@ murmur3_partitioner::get_token(const bytes& key) { return minimum_token(); } std::array hash; - utils::murmur_hash::hash3_x64_128(key, 0, key.size(), 0, hash); + utils::murmur_hash::hash3_x64_128(key, 0, hash); // We don't normalize() the value, since token includes an is-before-everything // indicator. // FIXME: will this require a repair when importing a database? diff --git a/utils/murmur_hash.cc b/utils/murmur_hash.cc index a5e6357494..ec013aa2b5 100644 --- a/utils/murmur_hash.cc +++ b/utils/murmur_hash.cc @@ -26,8 +26,9 @@ namespace utils { namespace murmur_hash { -uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t seed) +uint32_t hash32(bytes_view data, uint32_t seed) { + uint32_t length = data.size(); uint32_t m = 0x5bd1e995; uint32_t r = 24; @@ -38,13 +39,13 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se for (uint32_t i = 0; i < len_4; i++) { uint32_t i_4 = i << 2; - uint32_t k = data[offset + i_4 + 3]; + uint32_t k = data[i_4 + 3]; k = k << 8; - k = k | (data[offset + i_4 + 2] & 0xff); + k = k | (data[i_4 + 2] & 0xff); k = k << 8; - k = k | (data[offset + i_4 + 1] & 0xff); + k = k | (data[i_4 + 1] & 0xff); k = k << 8; - k = k | (data[offset + i_4 + 0] & 0xff); + k = k | (data[i_4 + 0] & 0xff); k *= m; k ^= (uint32_t)k >> r; k *= m; @@ -60,15 +61,15 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se { if (left >= 3) { - h ^= (uint32_t) data[offset + length - 3] << 16; + h ^= (uint32_t) data[length - 3] << 16; } if (left >= 2) { - h ^= (uint32_t) data[offset + length - 2] << 8; + h ^= (uint32_t) data[length - 2] << 8; } if (left >= 1) { - h ^= (uint32_t) data[offset + length - 1]; + h ^= (uint32_t) data[length - 1]; } h *= m; @@ -81,8 +82,9 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se return h; } -uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed) +uint64_t hash2_64(bytes_view key, uint64_t seed) { + uint32_t length = key.size(); uint64_t m64 = 0xc6a4a7935bd1e995L; uint32_t r64 = 47; @@ -94,10 +96,10 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s { uint32_t i_8 = i << 3; - uint64_t k64 = ((uint64_t) key[offset+i_8+0] & 0xff) + (((uint64_t) key[offset+i_8+1] & 0xff)<<8) + - (((uint64_t) key[offset+i_8+2] & 0xff)<<16) + (((uint64_t) key[offset+i_8+3] & 0xff)<<24) + - (((uint64_t) key[offset+i_8+4] & 0xff)<<32) + (((uint64_t) key[offset+i_8+5] & 0xff)<<40) + - (((uint64_t) key[offset+i_8+6] & 0xff)<<48) + (((uint64_t) key[offset+i_8+7] & 0xff)<<56); + uint64_t k64 = ((uint64_t) key[i_8+0] & 0xff) + (((uint64_t) key[i_8+1] & 0xff)<<8) + + (((uint64_t) key[i_8+2] & 0xff)<<16) + (((uint64_t) key[i_8+3] & 0xff)<<24) + + (((uint64_t) key[i_8+4] & 0xff)<<32) + (((uint64_t) key[i_8+5] & 0xff)<<40) + + (((uint64_t) key[i_8+6] & 0xff)<<48) + (((uint64_t) key[i_8+7] & 0xff)<<56); k64 *= m64; k64 ^= k64 >> r64; @@ -114,19 +116,19 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s case 0: break; case 7: - h64 ^= (uint64_t) key[offset + length - rem + 6] << 48; + h64 ^= (uint64_t) key[length - rem + 6] << 48; case 6: - h64 ^= (uint64_t) key[offset + length - rem + 5] << 40; + h64 ^= (uint64_t) key[length - rem + 5] << 40; case 5: - h64 ^= (uint64_t) key[offset + length - rem + 4] << 32; + h64 ^= (uint64_t) key[length - rem + 4] << 32; case 4: - h64 ^= (uint64_t) key[offset + length - rem + 3] << 24; + h64 ^= (uint64_t) key[length - rem + 3] << 24; case 3: - h64 ^= (uint64_t) key[offset + length - rem + 2] << 16; + h64 ^= (uint64_t) key[length - rem + 2] << 16; case 2: - h64 ^= (uint64_t) key[offset + length - rem + 1] << 8; + h64 ^= (uint64_t) key[length - rem + 1] << 8; case 1: - h64 ^= (uint64_t) key[offset + length - rem]; + h64 ^= (uint64_t) key[length - rem]; h64 *= m64; } @@ -137,14 +139,13 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s return h64; } -static uint64_t getblock(const bytes &key, uint32_t offset, uint32_t index) +static uint64_t getblock(bytes_view key, uint32_t index) { uint32_t i_8 = index << 3; - uint32_t blockOffset = offset + i_8; - return ((uint64_t) key[blockOffset + 0] & 0xff) + (((uint64_t) key[blockOffset + 1] & 0xff) << 8) + - (((uint64_t) key[blockOffset + 2] & 0xff) << 16) + (((uint64_t) key[blockOffset + 3] & 0xff) << 24) + - (((uint64_t) key[blockOffset + 4] & 0xff) << 32) + (((uint64_t) key[blockOffset + 5] & 0xff) << 40) + - (((uint64_t) key[blockOffset + 6] & 0xff) << 48) + (((uint64_t) key[blockOffset + 7] & 0xff) << 56); + return ((uint64_t) key[i_8 + 0] & 0xff) + (((uint64_t) key[i_8 + 1] & 0xff) << 8) + + (((uint64_t) key[i_8 + 2] & 0xff) << 16) + (((uint64_t) key[i_8 + 3] & 0xff) << 24) + + (((uint64_t) key[i_8 + 4] & 0xff) << 32) + (((uint64_t) key[i_8 + 5] & 0xff) << 40) + + (((uint64_t) key[i_8 + 6] & 0xff) << 48) + (((uint64_t) key[i_8 + 7] & 0xff) << 56); } static uint64_t rotl64(uint64_t v, uint32_t n) @@ -163,8 +164,9 @@ static uint64_t fmix(uint64_t k) return k; } -void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed, std::array &result) +void hash3_x64_128(bytes_view key, uint64_t seed, std::array &result) { + uint32_t length = key.size(); const uint32_t nblocks = length >> 4; // Process as 128-bit blocks. uint64_t h1 = seed; @@ -178,8 +180,8 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t for(uint32_t i = 0; i < nblocks; i++) { - uint64_t k1 = getblock(key, offset, i*2+0); - uint64_t k2 = getblock(key, offset, i*2+1); + uint64_t k1 = getblock(key, i*2+0); + uint64_t k2 = getblock(key, i*2+1); k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1; @@ -194,29 +196,29 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t // tail // Advance offset to the unprocessed tail of the data. - offset += nblocks * 16; + key.remove_prefix(nblocks * 16); uint64_t k1 = 0; uint64_t k2 = 0; switch(length & 15) { - case 15: k2 ^= ((uint64_t) key[offset+14]) << 48; - case 14: k2 ^= ((uint64_t) key[offset+13]) << 40; - case 13: k2 ^= ((uint64_t) key[offset+12]) << 32; - case 12: k2 ^= ((uint64_t) key[offset+11]) << 24; - case 11: k2 ^= ((uint64_t) key[offset+10]) << 16; - case 10: k2 ^= ((uint64_t) key[offset+9]) << 8; - case 9: k2 ^= ((uint64_t) key[offset+8]) << 0; + case 15: k2 ^= ((uint64_t) key[14]) << 48; + case 14: k2 ^= ((uint64_t) key[13]) << 40; + case 13: k2 ^= ((uint64_t) key[12]) << 32; + case 12: k2 ^= ((uint64_t) key[11]) << 24; + case 11: k2 ^= ((uint64_t) key[10]) << 16; + case 10: k2 ^= ((uint64_t) key[9]) << 8; + case 9: k2 ^= ((uint64_t) key[8]) << 0; k2 *= c2; k2 = rotl64(k2,33); k2 *= c1; h2 ^= k2; - case 8: k1 ^= ((uint64_t) key[offset+7]) << 56; - case 7: k1 ^= ((uint64_t) key[offset+6]) << 48; - case 6: k1 ^= ((uint64_t) key[offset+5]) << 40; - case 5: k1 ^= ((uint64_t) key[offset+4]) << 32; - case 4: k1 ^= ((uint64_t) key[offset+3]) << 24; - case 3: k1 ^= ((uint64_t) key[offset+2]) << 16; - case 2: k1 ^= ((uint64_t) key[offset+1]) << 8; - case 1: k1 ^= ((uint64_t) key[offset]); + case 8: k1 ^= ((uint64_t) key[7]) << 56; + case 7: k1 ^= ((uint64_t) key[6]) << 48; + case 6: k1 ^= ((uint64_t) key[5]) << 40; + case 5: k1 ^= ((uint64_t) key[4]) << 32; + case 4: k1 ^= ((uint64_t) key[3]) << 24; + case 3: k1 ^= ((uint64_t) key[2]) << 16; + case 2: k1 ^= ((uint64_t) key[1]) << 8; + case 1: k1 ^= ((uint64_t) key[0]); k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1; }; diff --git a/utils/murmur_hash.hh b/utils/murmur_hash.hh index fa20c5f601..28b73fd96d 100644 --- a/utils/murmur_hash.hh +++ b/utils/murmur_hash.hh @@ -39,12 +39,9 @@ namespace utils { namespace murmur_hash { - uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, - int32_t seed); - uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, - uint64_t seed); - void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, - uint64_t seed, std::array &result); + uint32_t hash32(bytes_view data, int32_t seed); + uint64_t hash2_64(bytes_view key, uint64_t seed); + void hash3_x64_128(bytes_view key, uint64_t seed, std::array &result); }; } // namespace utils From 1d5de9e428d6d365c1c311714c5848f6a0a7995e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 12 Mar 2015 20:29:40 +0100 Subject: [PATCH 05/13] types: Introduce lexicographical_compare() version with third sequence --- types.hh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/types.hh b/types.hh index 993741cdc0..b1744da438 100644 --- a/types.hh +++ b/types.hh @@ -27,6 +27,25 @@ class column_specification; } +// Like std::lexicographical_compare but injects values from shared sequence (types) to the comparator +// Compare is an abstract_type-aware less comparator, which takes the type as first argument. +template +bool lexicographical_compare(TypesIterator types, InputIt1 first1, InputIt1 last1, + InputIt2 first2, InputIt2 last2, Compare comp) { + while (first1 != last1 && first2 != last2) { + if (comp(*types, *first1, *first2)) { + return true; + } + if (comp(*types, *first2, *first1)) { + return false; + } + ++first1; + ++first2; + ++types; + } + return (first1 == last1) && (first2 != last2); +} + using object_opt = std::experimental::optional; class marshal_exception : public std::exception { From e3a04ae21bd7eb507168e80207d49285b3b2915d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 12 Mar 2015 20:30:47 +0100 Subject: [PATCH 06/13] types: Introduce generic is_prefixed_by() --- types.hh | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/types.hh b/types.hh index b1744da438..2a095719f7 100644 --- a/types.hh +++ b/types.hh @@ -46,6 +46,22 @@ bool lexicographical_compare(TypesIterator types, InputIt1 first1, InputIt1 last return (first1 == last1) && (first2 != last2); } +// Returns true iff the second sequence is a prefix of the first sequence +// Equality is an abstract_type-aware equality checker which takes the type as first argument. +template +bool is_prefixed_by(TypesIterator types, InputIt1 first1, InputIt1 last1, + InputIt2 first2, InputIt2 last2, Equality equality) { + while (first1 != last1 && first2 != last2) { + if (!equality(*types, *first1, *first2)) { + return false; + } + ++first1; + ++first2; + ++types; + } + return first2 == last2; +} + using object_opt = std::experimental::optional; class marshal_exception : public std::exception { From 4b33888c23836affcd0cbe367d94ce782a65912f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:02:26 +0100 Subject: [PATCH 07/13] types: Add algorithms for comparing optional types --- types.hh | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/types.hh b/types.hh index 2a095719f7..4b9fcad9b2 100644 --- a/types.hh +++ b/types.hh @@ -199,6 +199,29 @@ public: }; using data_type = shared_ptr; +using bytes_view_opt = std::experimental::optional; + +static inline +bool optional_less_compare(data_type t, bytes_view_opt e1, bytes_view_opt e2) { + if (bool(e1) != bool(e2)) { + return bool(e2); + } + if (!e1) { + return false; + } + return t->less(*e1, *e2); +} + +static inline +bool optional_equal(data_type t, bytes_view_opt e1, bytes_view_opt e2) { + if (bool(e1) != bool(e2)) { + return false; + } + if (!e1) { + return true; + } + return t->equal(*e1, *e2); +} class collection_type_impl : public abstract_type { static thread_local logging::logger _logger; From fffa35ac6b6ecea884b9d1df77f8cb028ccb6fa8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:03:37 +0100 Subject: [PATCH 08/13] types: Make abstract_type::validate() work on bytes_view --- types.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/types.hh b/types.hh index 4b9fcad9b2..d28f0040bd 100644 --- a/types.hh +++ b/types.hh @@ -116,10 +116,10 @@ public: } } virtual object_opt deserialize(bytes_view v) = 0; - virtual void validate(const bytes& v) { + virtual void validate(bytes_view v) { // FIXME } - virtual void validate_collection_member(const bytes& v, const bytes& collection_name) { + virtual void validate_collection_member(bytes_view v, const bytes& collection_name) { validate(v); } virtual bool is_compatible_with(abstract_type& previous) { From 1b1af8cdfd4f2d1d923b029aba5aa2586c724587 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:32:29 +0100 Subject: [PATCH 09/13] db: Introduce types to hold keys Holding keys and their prefixes as "bytes" is error prone. It's easy to mix them up (or use wrong types). This change adds wrappers for keys with accessors which are meant to make misuses as difficult as possible. Prefix and full keys are now distinguished. Places which assumed that the representation is the same (it currently is) were changed not to do so. This will allow us to introduce more compact storage for non-prefix keys. --- cql3/operation.hh | 2 +- .../forwarding_primary_key_restrictions.hh | 19 +- cql3/restrictions/primary_key_restrictions.hh | 5 +- .../reverse_primary_key_restrictions.hh | 11 +- .../single_column_primary_key_restrictions.hh | 63 ++--- cql3/restrictions/statement_restrictions.hh | 21 +- cql3/statements/modification_statement.cc | 18 +- cql3/statements/modification_statement.hh | 6 +- cql3/statements/select_statement.cc | 4 +- cql3/update_parameters.hh | 2 +- database.cc | 64 +++-- database.hh | 48 ++-- dht/i_partitioner.hh | 18 +- dht/murmur3_partitioner.cc | 3 +- dht/murmur3_partitioner.hh | 2 +- keys.hh | 236 ++++++++++++++++++ query.hh | 108 ++++---- service/storage_proxy.cc | 8 +- tests/perf/perf_mutation.cc | 4 +- tests/urchin/cql_query_test.cc | 4 +- tests/urchin/mutation_test.cc | 33 ++- thrift/handler.cc | 37 ++- tuple.hh | 4 + unimplemented.cc | 1 + unimplemented.hh | 3 +- validation.cc | 11 +- validation.hh | 2 +- 27 files changed, 499 insertions(+), 238 deletions(-) create mode 100644 keys.hh diff --git a/cql3/operation.hh b/cql3/operation.hh index 58e44c535e..b0d8e4ee8d 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -109,7 +109,7 @@ public: /** * Execute the operation. */ - virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0; + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0; /** * A parsed raw UPDATE operation. diff --git a/cql3/restrictions/forwarding_primary_key_restrictions.hh b/cql3/restrictions/forwarding_primary_key_restrictions.hh index e9b5fb770e..c7853b87e2 100644 --- a/cql3/restrictions/forwarding_primary_key_restrictions.hh +++ b/cql3/restrictions/forwarding_primary_key_restrictions.hh @@ -31,16 +31,17 @@ namespace cql3 { namespace restrictions { /** - * A primary_key_restrictions which forwards all its method calls to another - * primary_key_restrictions. Subclasses should override one or more methods to modify the behavior - * of the backing primary_key_restrictions as desired per the decorator pattern. + * A primary_key_restrictions which forwards all its method calls to another + * primary_key_restrictions. Subclasses should override one or more methods to modify the behavior + * of the backing primary_key_restrictions as desired per the decorator pattern. */ -class forwarding_primary_key_restrictions : public primary_key_restrictions { +template +class forwarding_primary_key_restrictions : public primary_key_restrictions { protected: /** * Returns the backing delegate instance that methods are forwarded to. */ - virtual ::shared_ptr get_delegate() = 0; + virtual ::shared_ptr> get_delegate() = 0; public: virtual bool uses_function(const sstring& ks_name, const sstring& function_name) override { @@ -61,11 +62,11 @@ public: } #endif - virtual std::vector values_as_serialized_tuples(const query_options& options) override { - return get_delegate()->values_as_serialized_tuples(options); + virtual std::vector values(const query_options& options) override { + return get_delegate()->values(options); } - virtual std::vector bounds(const query_options& options) override { + virtual std::vector> bounds(const query_options& options) override { return get_delegate()->bounds(options); } @@ -101,7 +102,7 @@ public: virtual void addIndexExpressionTo(List expressions, QueryOptions options) { get_delegate()->addIndexExpressionTo(expressions, options); } -#endif +#endif }; } diff --git a/cql3/restrictions/primary_key_restrictions.hh b/cql3/restrictions/primary_key_restrictions.hh index 6312458878..4684081238 100644 --- a/cql3/restrictions/primary_key_restrictions.hh +++ b/cql3/restrictions/primary_key_restrictions.hh @@ -47,13 +47,14 @@ namespace restrictions { * What was in AbstractPrimaryKeyRestrictions was moved here (In pre 1.8 Java interfaces could not have default * implementations of methods). */ +template class primary_key_restrictions : public restrictions { public: virtual void merge_with(::shared_ptr restriction) = 0; - virtual std::vector values_as_serialized_tuples(const query_options& options) = 0; + virtual std::vector values(const query_options& options) = 0; - virtual std::vector bounds(const query_options& options) = 0; + virtual std::vector> bounds(const query_options& options) = 0; virtual bool is_inclusive(statements::bound b) { return true; } diff --git a/cql3/restrictions/reverse_primary_key_restrictions.hh b/cql3/restrictions/reverse_primary_key_restrictions.hh index 2629d14ab9..3b40456fb0 100644 --- a/cql3/restrictions/reverse_primary_key_restrictions.hh +++ b/cql3/restrictions/reverse_primary_key_restrictions.hh @@ -33,19 +33,20 @@ namespace restrictions { /** * PrimaryKeyRestrictions decorator that reverse the slices. */ -class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions { +template +class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions { private: - ::shared_ptr _restrictions; + ::shared_ptr> _restrictions; protected: - virtual ::shared_ptr get_delegate() override { + virtual ::shared_ptr> get_delegate() override { return _restrictions; } public: - reversed_primary_key_restrictions(shared_ptr restrictions) + reversed_primary_key_restrictions(shared_ptr> restrictions) : _restrictions(std::move(restrictions)) { } - virtual std::vector bounds(const query_options& options) override { + virtual std::vector> bounds(const query_options& options) override { auto ranges = _restrictions->bounds(options); for (auto&& range : ranges) { range.reverse(); diff --git a/cql3/restrictions/single_column_primary_key_restrictions.hh b/cql3/restrictions/single_column_primary_key_restrictions.hh index 2125d8b49f..93f39a9e8e 100644 --- a/cql3/restrictions/single_column_primary_key_restrictions.hh +++ b/cql3/restrictions/single_column_primary_key_restrictions.hh @@ -37,19 +37,20 @@ namespace restrictions { /** * A set of single column restrictions on a primary key part (partition key or clustering key). */ -class single_column_primary_key_restrictions : public primary_key_restrictions { +template +class single_column_primary_key_restrictions : public primary_key_restrictions { + using range_type = query::range; + using range_bound = typename range_type::bound; private: schema_ptr _schema; ::shared_ptr _restrictions; - ::shared_ptr> _tuple; bool _slice; bool _contains; bool _in; public: - single_column_primary_key_restrictions(schema_ptr schema, ::shared_ptr> tuple) + single_column_primary_key_restrictions(schema_ptr schema) : _schema(schema) , _restrictions(::make_shared(schema)) - , _tuple(std::move(tuple)) , _slice(false) , _contains(false) , _in(false) @@ -121,7 +122,7 @@ public: do_merge_with(::static_pointer_cast(restriction)); } - virtual std::vector values_as_serialized_tuples(const query_options& options) override { + virtual std::vector values(const query_options& options) override { std::vector> value_vector; value_vector.reserve(_restrictions->size()); for (auto def : _restrictions->get_column_defs()) { @@ -140,16 +141,16 @@ public: value_vector.emplace_back(std::move(values)); } - std::vector result; + std::vector result; result.reserve(cartesian_product_size(value_vector)); for (auto&& v : make_cartesian_product(value_vector)) { - result.emplace_back(_tuple->serialize_value(v)); + result.emplace_back(ValueType::from_exploded(*_schema, v)); } return result; } - virtual std::vector bounds(const query_options& options) override { - std::vector ranges; + virtual std::vector bounds(const query_options& options) override { + std::vector ranges; std::vector> vec_of_values; // TODO: optimize for all EQ case @@ -164,26 +165,20 @@ public: } if (r->is_slice()) { - // TODO: make restriction::bounds() return query::range to simplify all this if (cartesian_product_is_empty(vec_of_values)) { - auto read_value = [r, &options] (statements::bound b) { + auto read_bound = [r, &options, this] (statements::bound b) -> std::experimental::optional { + if (!r->has_bound(b)) { + return {}; + } auto value = r->bounds(b, options)[0]; if (!value) { throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string())); } - return *value; + return {range_bound(ValueType::from_exploded(*_schema, {*value}), r->is_inclusive(b))}; }; - if (r->has_bound(statements::bound::START) && r->has_bound(statements::bound::END)) { - ranges.emplace_back(query::range(read_value(statements::bound::START), read_value(statements::bound::END), - r->is_inclusive(statements::bound::START), r->is_inclusive(statements::bound::END))); - } else if (r->has_bound(statements::bound::START)) { - ranges.emplace_back(query::range::make_starting_with(read_value(statements::bound::START), - r->is_inclusive(statements::bound::START))); - } else { - assert(r->has_bound(statements::bound::END)); - ranges.emplace_back(query::range::make_ending_with(read_value(statements::bound::END), - r->is_inclusive(statements::bound::END))); - } + ranges.emplace_back(range_type( + read_bound(statements::bound::START), + read_bound(statements::bound::END))); if (def->type->is_reversed()) { ranges.back().reverse(); } @@ -192,31 +187,25 @@ public: ranges.reserve(cartesian_product_size(vec_of_values)); for (auto&& prefix : make_cartesian_product(vec_of_values)) { - auto read_bounds = [r, &prefix, &options, this](bytes& value_holder, bool& inclusive_holder, statements::bound bound) { + auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound { if (r->has_bound(bound)) { auto value = std::move(r->bounds(bound, options)[0]); if (!value) { throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string())); } prefix.emplace_back(std::move(value)); - value_holder = _tuple->serialize_value(prefix); + auto val = ValueType::from_exploded(*_schema, prefix); prefix.pop_back(); - inclusive_holder = r->is_inclusive(bound); + return range_bound(std::move(val), r->is_inclusive(bound)); } else { - value_holder = _tuple->serialize_value(prefix); - inclusive_holder = true; + return range_bound(ValueType::from_exploded(*_schema, prefix)); } }; - bytes start_tuple; - bytes end_tuple; - bool start_inclusive; - bool end_inclusive; + ranges.emplace_back(range_type( + read_bound(statements::bound::START), + read_bound(statements::bound::END))); - read_bounds(start_tuple, start_inclusive, statements::bound::START); - read_bounds(end_tuple, end_inclusive, statements::bound::END); - ranges.emplace_back(query::range(std::move(start_tuple), std::move(end_tuple), - start_inclusive, end_inclusive)); if (def->type->is_reversed()) { ranges.back().reverse(); } @@ -239,7 +228,7 @@ public: ranges.reserve(cartesian_product_size(vec_of_values)); for (auto&& prefix : make_cartesian_product(vec_of_values)) { - ranges.emplace_back(query::range::make_singular(_tuple->serialize_value(prefix))); + ranges.emplace_back(range_type::make_singular(ValueType::from_exploded(*_schema, prefix))); } return std::move(ranges); diff --git a/cql3/restrictions/statement_restrictions.hh b/cql3/restrictions/statement_restrictions.hh index 7b9df469de..272c960f51 100644 --- a/cql3/restrictions/statement_restrictions.hh +++ b/cql3/restrictions/statement_restrictions.hh @@ -50,12 +50,12 @@ private: /** * Restrictions on partitioning columns */ - ::shared_ptr _partition_key_restrictions; + ::shared_ptr> _partition_key_restrictions; /** * Restrictions on clustering columns */ - ::shared_ptr _clustering_columns_restrictions; + ::shared_ptr> _clustering_columns_restrictions; /** * Restriction on non-primary key columns (i.e. secondary index restrictions) @@ -212,14 +212,12 @@ private: auto& def = restriction->get_column_def(); if (def.is_partition_key()) { if (!_partition_key_restrictions) { - _partition_key_restrictions = ::make_shared(_schema, - _schema->partition_key_prefix_type); + _partition_key_restrictions = ::make_shared>(_schema); } _partition_key_restrictions->merge_with(restriction); } else if (def.is_clustering_key()) { if (!_clustering_columns_restrictions) { - _clustering_columns_restrictions = ::make_shared(_schema, - _schema->clustering_key_prefix_type); + _clustering_columns_restrictions = ::make_shared>(_schema); } _clustering_columns_restrictions->merge_with(restriction); } else { @@ -383,9 +381,9 @@ public: * @return the specified bound of the partition key * @throws InvalidRequestException if the boundary cannot be retrieved */ - std::vector get_partition_key_ranges(const query_options& options) const { + std::vector get_partition_key_ranges(const query_options& options) const { if (!_partition_key_restrictions) { - return {query::range::make_open_ended_both_sides()}; + return {query::partition_range::make_open_ended_both_sides()}; } return _partition_key_restrictions->bounds(options); } @@ -510,9 +508,9 @@ public: #endif public: - std::vector get_clustering_bounds(const query_options& options) const { + std::vector get_clustering_bounds(const query_options& options) const { if (!_clustering_columns_restrictions) { - return {query::range::make_open_ended_both_sides()}; + return {query::clustering_range::make_open_ended_both_sides()}; } return _clustering_columns_restrictions->bounds(options); } @@ -559,7 +557,8 @@ public: void reverse() { if (_clustering_columns_restrictions) { - _clustering_columns_restrictions = ::make_shared(_clustering_columns_restrictions); + _clustering_columns_restrictions = ::make_shared>( + _clustering_columns_restrictions); } } }; diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 1fcb3ee251..26bc027e6a 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -69,7 +69,7 @@ modification_statement::get_mutations(const query_options& options, bool local, future> modification_statement::make_update_parameters( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, const query_options& options, bool local, @@ -86,7 +86,7 @@ modification_statement::make_update_parameters( future modification_statement::read_required_rows( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, bool local, db::consistency_level cl) { @@ -180,7 +180,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o components.push_back(val); } } - return components; + return clustering_prefix(std::move(components)); } clustering_prefix @@ -221,9 +221,9 @@ modification_statement::create_clustering_prefix(const query_options& options) { return create_clustering_prefix_internal(options); } -std::vector +std::vector modification_statement::build_partition_keys(const query_options& options) { - std::vector result; + std::vector result; std::vector components; auto remaining = s->partition_key_size(); @@ -243,9 +243,9 @@ modification_statement::build_partition_keys(const query_options& options) { throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text())); } components.push_back(val); - partition_key key = serialize_value(*s->partition_key_type, components); + auto key = partition_key::one::from_exploded(*s, components); validation::validate_cql_key(s, key); - result.push_back(key); + result.emplace_back(std::move(key)); } else { for (auto&& val : values) { if (!val) { @@ -255,9 +255,9 @@ modification_statement::build_partition_keys(const query_options& options) { full_components.reserve(components.size() + 1); auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components)); *i = val; - partition_key key = serialize_value(*s->partition_key_type, full_components); + auto key = partition_key::one::from_exploded(*s, full_components); validation::validate_cql_key(s, key); - result.push_back(key); + result.emplace_back(std::move(key)); } } } else { diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index 8f5407949c..3bd62776b9 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -255,7 +255,7 @@ private: public: void add_key_value(column_definition& def, ::shared_ptr value); void process_where_clause(std::vector where_clause, ::shared_ptr names); - std::vector build_partition_keys(const query_options& options); + std::vector build_partition_keys(const query_options& options); private: clustering_prefix create_clustering_prefix(const query_options& options); @@ -273,7 +273,7 @@ public: protected: future read_required_rows( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, bool local, db::consistency_level cl); @@ -427,7 +427,7 @@ private: public: future> make_update_parameters( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, const query_options& options, bool local, diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 3375ddd00a..5b84dec166 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -124,7 +124,7 @@ select_statement::process_results(foreign_ptr> resu for (auto&& e : results->partitions) { // FIXME: deserialize into views - auto key = _schema->partition_key_type->deserialize_value(e.first); + auto key = e.first.explode(*_schema); auto& partition = e.second; if (!partition.static_row.empty() && partition.rows.empty() @@ -143,7 +143,7 @@ select_statement::process_results(foreign_ptr> resu } } else { for (auto&& e : partition.rows) { - auto c_key = _schema->clustering_key_type->deserialize_value(e.first); + auto c_key = e.first.explode(*_schema); auto& cells = e.second.cells; uint32_t static_id = 0; uint32_t regular_id = 0; diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index 91a8a636fc..1c685964eb 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -36,7 +36,7 @@ namespace cql3 { class update_parameters final { public: using prefetched_rows_type = std::experimental::optional< - std::unordered_map>; + std::unordered_map>; private: const gc_clock::duration _ttl; const prefetched_rows_type _prefetched; // For operation that require a read-before-write diff --git a/database.cc b/database.cc index eef94d8c64..b55aca3471 100644 --- a/database.cc +++ b/database.cc @@ -82,17 +82,17 @@ schema::schema(sstring ks_name, sstring cf_name, std::vector partition_k column_family::column_family(schema_ptr schema) : _schema(std::move(schema)) - , partitions(key_compare(_schema->thrift.partition_key_type)) { + , partitions(partition_key::one::less_compare(*_schema)) { } mutation_partition* -column_family::find_partition(const bytes& key) { +column_family::find_partition(const partition_key::one& key) { auto i = partitions.find(key); return i == partitions.end() ? nullptr : &i->second; } row* -column_family::find_row(const bytes& partition_key, const bytes& clustering_key) { +column_family::find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) { mutation_partition* p = find_partition(partition_key); if (!p) { return nullptr; @@ -101,17 +101,17 @@ column_family::find_row(const bytes& partition_key, const bytes& clustering_key) } mutation_partition& -column_family::find_or_create_partition(const bytes& key) { +column_family::find_or_create_partition(const partition_key::one& key) { // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key); - if (i == partitions.end() || key != i->first) { + if (i == partitions.end() || !key.equal(*_schema, i->first)) { i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); } return i->second; } row& -column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) { +column_family::find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) { mutation_partition& p = find_or_create_partition(partition_key); return p.clustered_row(clustering_key); } @@ -312,6 +312,15 @@ keyspace::find_schema(const sstring& cf_name) { return cf->_schema; } +schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) { + auto ks = find_keyspace(ks_name); + if (!ks) { + return {}; + } + + return ks->find_schema(cf_name); +} + keyspace* database::find_keyspace(const sstring& name) { auto i = keyspaces.find(name); @@ -403,12 +412,14 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { } tombstone -mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key& key) { +mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::one& key) { tombstone t = _tombstone; - auto i = _row_tombstones.lower_bound(key); - if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) { - t.apply(i->second); + // FIXME: Optimize this + for (auto&& e : _row_tombstones) { + if (key.is_prefixed_by(*schema, e.first)) { + t.apply(e.second); + } } auto j = _rows.find(key); @@ -420,10 +431,10 @@ mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key& k } void -mutation_partition::apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { +mutation_partition::apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { auto& prefix = row_tombstone.first; auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) { + if (i == _row_tombstones.end() || !prefix.equal(*schema, i->first)) { _row_tombstones.emplace_hint(i, std::move(row_tombstone)); } else if (row_tombstone.second > i->second) { i->second = row_tombstone.second; @@ -432,17 +443,17 @@ mutation_partition::apply_row_tombstone(schema_ptr schema, std::pairclustering_key_size()) { - _rows[schema->clustering_key_type->serialize_value(prefix)].t.apply(t); + } else if (prefix.is_full(*schema)) { + _rows[clustering_key::one::from_clustering_prefix(*schema, prefix)].t.apply(t); } else { - apply_row_tombstone(schema, {schema->clustering_key_prefix_type->serialize_value(prefix), t}); + apply_row_tombstone(schema, {clustering_key::prefix::one::from_clustering_prefix(*schema, prefix), t}); } } row* -mutation_partition::find_row(const clustering_key& key) { +mutation_partition::find_row(const clustering_key::one& key) { auto i = _rows.find(key); if (i == _rows.end()) { return nullptr; @@ -456,7 +467,7 @@ bool column_definition::is_compact_value() const { } std::ostream& operator<<(std::ostream& os, const mutation& m) { - return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p); + return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), static_cast(m.key), m.p); } std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) { @@ -474,17 +485,19 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p if (!range.is_singular()) { fail(unimplemented::cause::RANGE_QUERIES); } - auto& key = range.start(); - if (!_schema->clustering_key_prefix_type->is_full(key)) { + auto& key = range.start_value(); + if (!key.is_full(*_schema)) { fail(unimplemented::cause::RANGE_QUERIES); } - auto row = partition.find_row(key); + // FIXME: Support looking up by prefixes + auto full_key = key.to_full(*_schema); + auto row = partition.find_row(full_key); if (!row) { continue; } // FIXME: handle removed rows properly. In CQL rows are separate entities (can be live or dead). - auto row_tombstone = partition.tombstone_for_row(_schema, key); + auto row_tombstone = partition.tombstone_for_row(_schema, full_key); query::result::row result_row; result_row.cells.reserve(slice.regular_columns.size()); @@ -509,7 +522,7 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p } } } - result.rows.emplace_back(key, std::move(result_row)); + result.rows.emplace_back(full_key, std::move(result_row)); --limit; } @@ -531,10 +544,7 @@ column_family::query(const query::read_command& cmd) { uint32_t limit = cmd.row_limit; for (auto&& range : cmd.partition_ranges) { if (range.is_singular()) { - auto& key = range.start(); - if (!_schema->partition_key_prefix_type->is_full(key)) { - fail(unimplemented::cause::RANGE_QUERIES); - } + auto& key = range.start_value(); auto partition = find_partition(key); if (!partition) { return make_ready_future>(result); diff --git a/database.hh b/database.hh index ebc650b473..d34f95a6dd 100644 --- a/database.hh +++ b/database.hh @@ -34,15 +34,8 @@ #include "timestamp.hh" #include "tombstone.hh" #include "atomic_cell.hh" -#include "bytes.hh" #include "query.hh" - -using partition_key_type = tuple_type<>; -using clustering_key_type = tuple_type<>; -using clustering_prefix_type = tuple_prefix; -using partition_key = bytes; -using clustering_key = bytes; -using clustering_prefix = clustering_prefix_type::value_type; +#include "keys.hh" using row = std::map; @@ -51,42 +44,42 @@ struct deletable_row final { row cells; }; -using row_tombstone_set = std::map; +using row_tombstone_set = std::map; class mutation_partition final { private: tombstone _tombstone; row _static_row; - std::map _rows; + std::map _rows; row_tombstone_set _row_tombstones; public: mutation_partition(schema_ptr s) - : _rows(key_compare(s->clustering_key_type)) - , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) + : _rows(clustering_key::one::less_compare(*s)) + , _row_tombstones(clustering_key::prefix::one::less_compare(*s)) { } void apply(tombstone t) { _tombstone.apply(t); } void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t); - void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) { + void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) { apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); } - void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); + void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); void apply(schema_ptr schema, const mutation_partition& p); const row_tombstone_set& row_tombstones() const { return _row_tombstones; } row& static_row() { return _static_row; } - row& clustered_row(const clustering_key& key) { return _rows[key].cells; } - row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; } - row* find_row(const clustering_key& key); - tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key); + row& clustered_row(const clustering_key::one& key) { return _rows[key].cells; } + row& clustered_row(clustering_key::one&& key) { return _rows[std::move(key)].cells; } + row* find_row(const clustering_key::one& key); + tombstone tombstone_for_row(schema_ptr schema, const clustering_key::one& key); friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); }; class mutation final { public: schema_ptr schema; - partition_key key; + partition_key::one key; mutation_partition p; public: - mutation(partition_key key_, schema_ptr schema_) + mutation(partition_key::one key_, schema_ptr schema_) : schema(std::move(schema_)) , key(std::move(key_)) , p(schema) @@ -100,11 +93,11 @@ public: } void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { - auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); + auto& row = p.clustered_row(clustering_key::one::from_clustering_prefix(*schema, prefix)); update_column(row, def, std::move(value)); } - void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { + void set_clustered_cell(const clustering_key::one& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(key); update_column(row, def, std::move(value)); } @@ -133,13 +126,13 @@ private: struct column_family { column_family(schema_ptr schema); - mutation_partition& find_or_create_partition(const bytes& key); - row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key); - mutation_partition* find_partition(const bytes& key); - row* find_row(const bytes& partition_key, const bytes& clustering_key); + mutation_partition& find_or_create_partition(const partition_key::one& key); + row& find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key); + mutation_partition* find_partition(const partition_key::one& key); + row* find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key); schema_ptr _schema; // partition key -> partition - std::map partitions; + std::map partitions; void apply(const mutation& m); // Returns at most "cmd.limit" rows future> query(const query::read_command& cmd); @@ -168,6 +161,7 @@ public: future<> init_from_data_directory(sstring datadir); future<> populate(sstring datadir); keyspace* find_keyspace(const sstring& name); + schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name); future<> stop() { return make_ready_future<>(); } void assign(database&& db) { *this = std::move(db); diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 65c6a9c528..16a725ba39 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -25,6 +25,7 @@ #include "core/shared_ptr.hh" #include "types.hh" +#include "keys.hh" #include namespace dht { @@ -66,7 +67,7 @@ token minimum_token(); class decorated_key { public: token _token; - bytes _key; + partition_key::one _key; }; class i_partitioner { @@ -78,10 +79,21 @@ public: * @param key the raw, client-facing key * @return decorated version of key */ - decorated_key decorate_key(const bytes& key) { + decorated_key decorate_key(const partition_key::one& key) { return { get_token(key), key }; } + /** + * Transform key to object representation of the on-disk format. + * + * @param key the raw, client-facing key + * @return decorated version of key + */ + decorated_key decorate_key(partition_key::one&& key) { + auto token = get_token(key); + return { std::move(token), std::move(key) }; + } + /** * Calculate a token representing the approximate "middle" of the given * range. @@ -105,7 +117,7 @@ public: * (This is NOT a method to create a token from its string representation; * for that, use tokenFactory.fromString.) */ - virtual token get_token(const bytes& key) = 0; + virtual token get_token(const partition_key::one& key) = 0; /** * @return a randomly generated token diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index 199456f029..d55ef11258 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -16,7 +16,8 @@ murmur3_partitioner::normalize(int64_t in) { } token -murmur3_partitioner::get_token(const bytes& key) { +murmur3_partitioner::get_token(const partition_key::one& key_) { + bytes_view key(key_); if (key.empty()) { return minimum_token(); } diff --git a/dht/murmur3_partitioner.hh b/dht/murmur3_partitioner.hh index 2255ab36b3..12e2ffd6b0 100644 --- a/dht/murmur3_partitioner.hh +++ b/dht/murmur3_partitioner.hh @@ -10,7 +10,7 @@ namespace dht { class murmur3_partitioner final : public i_partitioner { public: - virtual token get_token(const bytes& key) override; + virtual token get_token(const partition_key::one& key) override; virtual bool preserves_order() override { return false; } virtual std::map describe_ownership(const std::vector& sorted_tokens); virtual data_type get_token_validator(); diff --git a/keys.hh b/keys.hh new file mode 100644 index 0000000000..5ac9a607e1 --- /dev/null +++ b/keys.hh @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include "schema.hh" +#include "bytes.hh" +#include "types.hh" + +// +// This header defines type system for primary key holders. +// +// We distinguish partition keys and clustering keys. API-wise they are almost +// the same, but they're separate type hierarchies. +// +// Clustering keys are further divided into prefixed and non-prefixed (full). +// Non-prefixed keys always have full component set, as defined by schema. +// Prefixed ones can have any number of trailing components missing. They may +// differ in underlying representation. +// +// The main classes are: +// +// partition_key::one - full partition key +// clustering_key::one - full clustering key +// clustering_key::prefix::one - clustering key prefix +// +// These classes wrap only the minimum information required to store the key +// (the key value itself). Any information which can be inferred from schema +// is not stored. Therefore accessors need to be provided with a pointer to +// schema, from which information about structure is extracted. + +// FIXME: Keys can't contain nulls, so we could get rid of optionals + +// Abstracts serialized tuple, managed by tuple_type. +template +class tuple_wrapper { +protected: + bytes _bytes; +protected: + tuple_wrapper(bytes&& b) : _bytes(std::move(b)) {} + + static inline auto type(const schema& s) { + return TopLevel::tuple_type(s); + } +public: + static TopLevel make_empty(const schema& s) { + std::vector v; + v.resize(type(s)->types().size()); + return from_exploded(s, v); + } + + static TopLevel from_exploded(const schema& s, const std::vector& v) { + return TopLevel::from_bytes(type(s)->serialize_value(v)); + } + + static TopLevel from_deeply_exploded(const schema& s, const std::vector& v) { + return TopLevel::from_bytes(type(s)->serialize_value_deep(v)); + } + + static TopLevel from_single_value(const schema& s, bytes v) { + // FIXME: optimize + std::vector values; + values.emplace_back(bytes_opt(std::move(v))); + return from_exploded(s, values); + } + + // FIXME: get rid of optional<> and return views + std::vector explode(const schema& s) const { + return type(s)->deserialize_value(_bytes); + } + + struct less_compare { + data_type _t; + less_compare(const schema& s) : _t(type(s)) {} + bool operator()(const TopLevel& k1, const TopLevel& k2) const { + return _t->less(k1, k2); + } + }; + + struct hashing { + data_type _t; + hashing(const schema& s) : _t(type(s)) {} + size_t operator()(const TopLevel& o) const { + return _t->hash(o); + } + }; + + struct equality { + data_type _t; + equality(const schema& s) : _t(type(s)) {} + bool operator()(const TopLevel& o1, const TopLevel& o2) const { + return _t->equal(o1, o2); + } + }; + + bool equal(const schema& s, const TopLevel& other) const { + return type(s)->equal(*this, other); + } + + operator bytes_view() const { + return _bytes; + } +}; + +template +class prefixable_full_tuple : public tuple_wrapper { + using base = tuple_wrapper; +protected: + prefixable_full_tuple(bytes&& b) : base(std::move(b)) {} +public: + bool is_prefixed_by(const schema& s, const PrefixTopLevel& prefix) const { + auto t = base::type(s); + auto prefix_type = PrefixTopLevel::tuple_type(s); + return ::is_prefixed_by(t->types().begin(), + t->begin(*this), t->end(*this), + prefix_type->begin(prefix), prefix_type->end(prefix), + optional_equal); + } + + struct less_compare_with_prefix { + shared_ptr> prefix_type; + shared_ptr> full_type; + + less_compare_with_prefix(const schema& s) + : prefix_type(PrefixTopLevel::tuple_type(s)) + , full_type(TopLevel::tuple_type(s)) + { } + + bool operator()(const TopLevel& k1, const PrefixTopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + full_type->begin(k1), full_type->end(k1), + prefix_type->begin(k2), prefix_type->end(k2), + optional_less_compare); + } + + bool operator()(const PrefixTopLevel& k1, const TopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + prefix_type->begin(k1), prefix_type->end(k1), + full_type->begin(k2), full_type->end(k2), + optional_less_compare); + } + }; +}; + +template +class prefix_tuple_wrapper : public tuple_wrapper { + using base = tuple_wrapper; +protected: + prefix_tuple_wrapper(bytes&& b) : base(std::move(b)) {} +public: + bool is_full(const schema& s) const { + return TopLevel::tuple_type(s)->is_full(base::_bytes); + } + + // Can be called only if is_full() + FullTopLevel to_full(const schema& s) const { + return FullTopLevel::from_exploded(s, base::explode(s)); + } + + bool is_prefixed_by(const schema& s, const TopLevel& prefix) const { + auto t = base::type(s); + return ::is_prefixed_by(t->types().begin(), + t->begin(*this), t->end(*this), + t->begin(prefix), t->end(prefix), + optional_equal); + } +}; + +class partition_key { +public: + class one; + + using full_base = tuple_wrapper; + + class one : public full_base { + one(bytes&& b) : full_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.partition_key_type; } + }; +}; + +class clustering_prefix { + std::vector _v; +public: + clustering_prefix(std::vector&& v) : _v(std::move(v)) {} + clustering_prefix() {} + size_t size() const { + return _v.size(); + } + auto const& components() const { + return _v; + } + explicit operator bool() const { + return !_v.empty(); + } + bool is_full(const schema& s) const { + return _v.size() == s.clustering_key_size(); + } +}; + +class clustering_key { +public: + class one; + + struct prefix { + class one; + }; + + using full_base = prefixable_full_tuple; + using prefix_base = prefix_tuple_wrapper; + + class prefix::one : public prefix_base { + one(bytes&& b) : prefix_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.clustering_key_prefix_type; } + + static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) { + return from_exploded(s, prefix.components()); + } + }; + + class one : public full_base { + one(bytes&& b) : full_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.clustering_key_type; } + + static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) { + assert(prefix.is_full(s)); + return from_exploded(s, prefix.components()); + } + }; +}; diff --git a/query.hh b/query.hh index ae006323a2..a690308502 100644 --- a/query.hh +++ b/query.hh @@ -1,79 +1,83 @@ #pragma once +#include #include "schema.hh" #include "types.hh" #include "atomic_cell.hh" +#include "keys.hh" namespace query { // A range which can have inclusive, exclusive or open-ended bounds on each end. +template class range { - bytes _start; // empty if range is open on this end - bytes _end; // empty if range is open on this end (_end_inclusive == true) or same as start (_end_inclusive == false) - bool _start_inclusive; - bool _end_inclusive; + template + using optional = std::experimental::optional; public: - range(bytes start, bytes end, bool start_inclusive, bool end_inclusive) + class bound { + T _value; + bool _inclusive; + public: + bound(T value, bool inclusive = true) + : _value(std::move(value)) + , _inclusive(inclusive) + { } + const T& value() const { return _value; } + bool is_inclusive() const { return _inclusive; } + }; +private: + optional _start; + optional _end; + bool _singular; +public: + range(optional start, optional end) : _start(std::move(start)) , _end(std::move(end)) - , _start_inclusive(start_inclusive) - , _end_inclusive(end_inclusive) + , _singular(false) + { } + range(T value) + : _start(bound(std::move(value), true)) + , _end() + , _singular(true) { } public: + static range make(bound start, bound end) { + return range({std::move(start)}, {std::move(end)}); + } static range make_open_ended_both_sides() { - return {{}, {}, true, true}; + return {{}, {}}; } - static range make_singular(bytes key) { - return {std::move(key), {}, true, false}; + static range make_singular(T value) { + return {std::move(value)}; } - static range make_starting_with(bytes key, bool inclusive = true) { - assert(!key.empty()); - return {std::move(key), {}, inclusive, true}; + static range make_starting_with(bound b) { + return {{std::move(b)}, {}}; } - static range make_ending_with(bytes key, bool inclusive = true) { - assert(!key.empty()); - return {{}, std::move(key), true, inclusive}; - } - static range make_both_inclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), true, true}; - } - static range make_inclusive_exclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), true, false}; - } - static range make_exclusive_inclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), false, true}; - } - static range make_both_exclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), false, false}; + static range make_ending_with(bound b) { + return {{}, {std::move(b)}}; } bool is_singular() const { - return _end.empty() && !_end_inclusive; + return _singular; } bool is_full() const { - return _start.empty() && _end.empty(); + return !_start && !_end; } void reverse() { - if (!is_singular()) { + if (!_singular) { std::swap(_start, _end); - std::swap(_end_inclusive, _start_inclusive); } } - const bytes& start() const { - return _start; + const T& start_value() const { + return _start->value(); } - const bytes& end() const { - return _end; + const T& end_value() const { + return _end->value(); } }; +using partition_range = range; +using clustering_range = range; + class result { public: class partition; @@ -81,9 +85,7 @@ public: // TODO: Optimize for singular partition range. In such case the caller // knows the partition key, no need to send it back. - - // std::pair::first is a serialized partition key. - std::vector> partitions; + std::vector> partitions; }; class result::row { @@ -101,9 +103,7 @@ public: // TODO: for some queries we could avoid sending keys back, because the client knows // what the key is (single row query for instance). - // - // std::pair::first is a serialized clustering row key. - std::vector> rows; + std::vector> rows; public: // Returns row count in this result. If there is a static row and no clustering rows, that counts as one row. // Otherwise, if there are some clustering rows, the static row doesn't count. @@ -114,11 +114,11 @@ public: class partition_slice { public: - std::vector row_ranges; + std::vector row_ranges; std::vector static_columns; // TODO: consider using bitmap std::vector regular_columns; // TODO: consider using bitmap public: - partition_slice(std::vector row_ranges, std::vector static_columns, + partition_slice(std::vector row_ranges, std::vector static_columns, std::vector regular_columns) : row_ranges(std::move(row_ranges)) , static_columns(std::move(static_columns)) @@ -130,11 +130,11 @@ class read_command { public: sstring keyspace; sstring column_family; - std::vector partition_ranges; // ranges must be non-overlapping + std::vector partition_ranges; // ranges must be non-overlapping partition_slice slice; uint32_t row_limit; public: - read_command(const sstring& keyspace, const sstring& column_family, std::vector partition_ranges, + read_command(const sstring& keyspace, const sstring& column_family, std::vector partition_ranges, partition_slice slice, uint32_t row_limit) : keyspace(keyspace) , column_family(column_family) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index a91c68ae49..0590f2c010 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1155,8 +1155,12 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future>> storage_proxy::query(lw_shared_ptr cmd, db::consistency_level cl) { - if (cmd->partition_ranges.empty()) { + static auto make_empty = [] { return make_ready_future>>(make_foreign(make_lw_shared())); + }; + + if (cmd->partition_ranges.empty()) { + return make_empty(); } if (cmd->partition_ranges.size() != 1) { @@ -1167,7 +1171,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev auto& range = cmd->partition_ranges[0]; if (range.is_singular()) { - auto& key = range.start(); + auto& key = range.start_value(); auto dk = dht::global_partitioner().decorate_key(key); auto shard = _db.local().shard_of(dk._token); return _db.invoke_on(shard, [cmd] (database& db) { diff --git a/tests/perf/perf_mutation.cc b/tests/perf/perf_mutation.cc index 08a83755d4..4231e86376 100644 --- a/tests/perf/perf_mutation.cc +++ b/tests/perf/perf_mutation.cc @@ -13,8 +13,8 @@ int main(int argc, char* argv[]) { std::cout << "Timing mutation of single column within one row...\n"; - partition_key key = to_bytes("key1"); - clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)}); bytes value = int32_type->decompose(3); time_it([&] { diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index f6649f45d3..d3894b1669 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -41,7 +41,7 @@ static future<> require_column_has_value(distributed& ddb, const sstri auto cf = ks->find_column_family(table_name); assert(cf != nullptr); auto schema = cf->_schema; - auto pkey = schema->partition_key_type->serialize_value_deep(pk); + auto pkey = partition_key::one::from_deeply_exploded(*schema, pk); auto dk = dht::global_partitioner().decorate_key(pkey); auto shard = db.shard_of(dk._token); return ddb.invoke_on(shard, [pkey = std::move(pkey), @@ -57,7 +57,7 @@ static future<> require_column_has_value(distributed& ddb, const sstri auto schema = cf->_schema; auto p = cf->find_partition(pkey); assert(p != nullptr); - auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); + auto row = p->find_row(clustering_key::one::from_deeply_exploded(*schema, ck)); assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); assert(col_def != nullptr); diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 41a1188ff9..01cce2119a 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -24,8 +24,8 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { column_family cf(s); column_definition& r1_col = *s->get_column_definition("r1"); - partition_key key = to_bytes("key1"); - clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)}); mutation m(key, s); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3))); @@ -41,30 +41,26 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, - {{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); + {{"p1", utf8_type}}, {{"c1", int32_type}, {"c2", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); - - clustering_key c_key1 = s->clustering_key_type->decompose_value( - {int32_type->decompose(1)} - ); - - clustering_key c_key2 = s->clustering_key_type->decompose_value( - {int32_type->decompose(2)} - ); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key1 = clustering_key::one::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(0)}); + auto c_key1_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(1)}); + auto c_key2 = clustering_key::one::from_exploded(*s, {int32_type->decompose(2), int32_type->decompose(0)}); + auto c_key2_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(2)}); auto ttl = gc_clock::now() + std::chrono::seconds(1); mutation m(key, s); - m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl)); - m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl)); + m.p.apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl)); - m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); } @@ -73,7 +69,7 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}}; mutation m1(key, s); @@ -106,7 +102,7 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_set_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell({})}}; mutation m1(key, s); @@ -139,8 +135,7 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_list_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); - auto key_type = timeuuid_type; + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); }; collection_type_impl::mutation mmut1{{make_key(), make_atomic_cell(int32_type->decompose(101))}}; diff --git a/thrift/handler.cc b/thrift/handler.cc index a937187a3a..9c64d952a4 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -107,13 +107,20 @@ public: } void get_slice(tcxx::function const& _return)> cob, tcxx::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { - auto keyb = to_bytes(key); + auto& ks = lookup_keyspace(_db.local(), _ks_name); + auto schema = ks.find_schema(column_parent.column_family); + if (!_ks) { + return complete_with_exception(std::move(exn_cob), "column family %s not found", column_parent.column_family); + } + auto pk = key_from_thrift(schema, to_bytes(key)); + auto dk = dht::global_partitioner().decorate_key(pk); + auto shard = _db.local().shard_of(dk._token); + auto do_get = [this, - key = std::move(key), + pk = std::move(pk), column_parent = std::move(column_parent), predicate = std::move(predicate)] (database& db) { std::vector ret; - auto keyb = to_bytes(key); if (!column_parent.super_column.empty()) { throw unimplemented_exception(); } @@ -123,7 +130,7 @@ public: throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { auto&& range = predicate.slice_range; - row* rw = cf.find_row(keyb, bytes()); + row* rw = cf.find_row(pk, clustering_key::one::make_empty(*cf._schema)); if (rw) { auto beg = cf._schema->regular_begin(); if (!range.start.empty()) { @@ -157,8 +164,6 @@ public: throw make_exception("empty SlicePredicate"); } }; - auto dk = dht::global_partitioner().decorate_key(keyb); - auto shard = _db.local().shard_of(dk._token); _db.invoke_on(shard, [do_get = std::move(do_get)] (database& db) { return do_get(db); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] @@ -233,20 +238,20 @@ public: if (!_ks) { return complete_with_exception(std::move(exn_cob), "keyspace not set"); } - static bytes null_clustering_key = to_bytes(""); // Would like to use move_iterator below, but Mutation is filled with some const stuff. parallel_for_each(mutation_map.begin(), mutation_map.end(), [this] (std::pair>> key_cf) { - bytes key = to_bytes(key_cf.first); + bytes thrift_key = to_bytes(key_cf.first); std::map>& cf_mutations_map = key_cf.second; return parallel_for_each( boost::make_move_iterator(cf_mutations_map.begin()), boost::make_move_iterator(cf_mutations_map.end()), - [this, key] (std::pair> cf_mutations) { + [this, thrift_key] (std::pair> cf_mutations) { sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; auto& cf = lookup_column_family(*_ks, cf_name); - mutation m_to_apply(key, cf._schema); + mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema); + auto empty_clustering_key = clustering_key::one::make_empty(*cf._schema); for (const Mutation& m : mutations) { if (m.__isset.column_or_supercolumn) { auto&& cosc = m.column_or_supercolumn; @@ -268,7 +273,7 @@ public: ttl = cf._schema->default_time_to_live; } auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); - m_to_apply.set_clustered_cell(null_clustering_key, *def, + m_to_apply.set_clustered_cell(empty_clustering_key, *def, atomic_cell::one::make_live(col.timestamp, ttl_option, to_bytes(col.value))); } else if (cosc.__isset.super_column) { // FIXME: implement @@ -291,7 +296,7 @@ public: return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { auto& ks = db.keyspaces.at(_ks_name); auto& cf = ks.column_families.at(cf_name); - cf.apply(std::move(m_to_apply)); + cf.apply(m_to_apply); }); }); }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { @@ -513,13 +518,19 @@ private: throw make_exception("column family %s not found", cf_name); } } - keyspace& lookup_keyspace(database& db, const sstring ks_name) { + static keyspace& lookup_keyspace(database& db, const sstring& ks_name) { try { return db.keyspaces.at(ks_name); } catch (std::out_of_range&) { throw make_exception("Keyspace %s not found", ks_name); } } + static partition_key::one key_from_thrift(schema_ptr s, bytes k) { + if (s->partition_key_size() != 1) { + fail(unimplemented::cause::THRIFT); + } + return partition_key::one::from_single_value(*s, std::move(k)); + } }; class handler_factory : public CassandraCobSvIfFactory { diff --git a/tuple.hh b/tuple.hh index 9433297221..d90463d0c8 100644 --- a/tuple.hh +++ b/tuple.hh @@ -32,6 +32,10 @@ public: tuple_type(tuple_type&&) = default; + auto const& types() { + return _types; + } + prefix_type as_prefix() { return prefix_type(_types); } diff --git a/unimplemented.cc b/unimplemented.cc index 0b9aa3c3c0..b3b9a86306 100644 --- a/unimplemented.cc +++ b/unimplemented.cc @@ -29,6 +29,7 @@ std::ostream& operator<<(std::ostream& out, cause c) { case cause::LEGACY_COMPOSITE_KEYS: return out << "LEGACY_COMPOSITE_KEYS"; case cause::COLLECTION_RANGE_TOMBSTONES: return out << "COLLECTION_RANGE_TOMBSTONES"; case cause::RANGE_QUERIES: return out << "RANGE_QUERIES"; + case cause::THRIFT: return out << "THRIFT"; } assert(0); } diff --git a/unimplemented.hh b/unimplemented.hh index fb4a182c16..42b6f1194c 100644 --- a/unimplemented.hh +++ b/unimplemented.hh @@ -27,7 +27,8 @@ enum class cause { TOKEN_RESTRICTION, LEGACY_COMPOSITE_KEYS, COLLECTION_RANGE_TOMBSTONES, - RANGE_QUERIES + RANGE_QUERIES, + THRIFT }; void fail(cause what) __attribute__((noreturn)); diff --git a/validation.cc b/validation.cc index 7963b3c2e6..34d20e0420 100644 --- a/validation.cc +++ b/validation.cc @@ -31,18 +31,19 @@ namespace validation { * Based on org.apache.cassandra.thrift.ThriftValidation#validate_key() */ void -validate_cql_key(schema_ptr schema, const partition_key& key) { - if (key.empty()) { +validate_cql_key(schema_ptr schema, const partition_key::one& key) { + bytes_view b(key); + if (b.empty()) { throw exceptions::invalid_request_exception("Key may not be empty"); } // check that key can be handled by FBUtilities.writeShortByteArray - if (key.size() > max_key_size) { - throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", key.size(), max_key_size)); + if (b.size() > max_key_size) { + throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", b.size(), max_key_size)); } try { - schema->partition_key_type->validate(key); + schema->partition_key_type->validate(b); } catch (const marshal_exception& e) { throw exceptions::invalid_request_exception(e.why()); } diff --git a/validation.hh b/validation.hh index f19e7723f0..5ff3fe2291 100644 --- a/validation.hh +++ b/validation.hh @@ -31,7 +31,7 @@ namespace validation { constexpr size_t max_key_size = std::numeric_limits::max(); -void validate_cql_key(schema_ptr schema, const partition_key& key); +void validate_cql_key(schema_ptr schema, const partition_key::one& key); schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name); } From 9f60853271712787c17a921761d6b40ad1afd668 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 14 Mar 2015 13:32:40 +0100 Subject: [PATCH 10/13] db: Switch clustering key map and row tombstones to boost::intrusive::set std::map<> does not support lookup using different comparator than the one used to compare keys. For range prefix queries and for row prefix tombstone queries we will need to perform lookups using different comparators. --- database.cc | 115 ++++++++++++++++++++++++++++++++----------- database.hh | 139 +++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 213 insertions(+), 41 deletions(-) diff --git a/database.cc b/database.cc index b55aca3471..8d03045996 100644 --- a/database.cc +++ b/database.cc @@ -372,12 +372,17 @@ merge_column(const column_definition& def, } } +mutation_partition::~mutation_partition() { + _rows.clear_and_dispose(std::default_delete()); + _row_tombstones.clear_and_dispose(std::default_delete()); +} + void mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); - for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(schema, entry); + for (auto&& e : p._row_tombstones) { + apply_row_tombstone(schema, e.prefix(), e.t()); } auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { @@ -400,13 +405,14 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { - auto& key = entry.first; - auto i = _rows.find(key); + auto& key = entry.key(); + auto i = _rows.find(key, rows_entry::compare(*schema)); if (i == _rows.end()) { - _rows.emplace_hint(i, entry); + auto e = new rows_entry(entry); + _rows.insert(i, *e); } else { - i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, entry.second.cells, find_regular_column_def); + i->apply(entry.row().t); + merge_cells(i->row().cells, entry.row().cells, find_regular_column_def); } } } @@ -417,27 +423,47 @@ mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::o // FIXME: Optimize this for (auto&& e : _row_tombstones) { - if (key.is_prefixed_by(*schema, e.first)) { - t.apply(e.second); + if (key.is_prefixed_by(*schema, e.prefix())) { + t.apply(e.t()); } } - auto j = _rows.find(key); + auto j = _rows.find(key, rows_entry::compare_key(*schema)); if (j != _rows.end()) { - t.apply(j->second.t); + t.apply(j->row().t); + } + + return t; +} + +tombstone +mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::prefix::one& key) { + assert(key.is_full(*schema)); + tombstone t = _tombstone; + + // FIXME: Optimize this + for (auto&& e : _row_tombstones) { + if (key.is_prefixed_by(*schema, e.prefix())) { + t.apply(e.t()); + } + } + + auto j = _rows.find(key, rows_entry::compare_prefix(*schema)); + if (j != _rows.end()) { + t.apply(j->row().t); } return t; } void -mutation_partition::apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { - auto& prefix = row_tombstone.first; - auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !prefix.equal(*schema, i->first)) { - _row_tombstones.emplace_hint(i, std::move(row_tombstone)); - } else if (row_tombstone.second > i->second) { - i->second = row_tombstone.second; +mutation_partition::apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) { + auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(*schema)); + if (i == _row_tombstones.end() || !prefix.equal(*schema, i->prefix())) { + auto e = new row_tombstones_entry(std::move(prefix), t); + _row_tombstones.insert(i, *e); + } else { + i->apply(t); } } @@ -446,19 +472,51 @@ mutation_partition::apply_delete(schema_ptr schema, const clustering_prefix& pre if (!prefix) { apply(t); } else if (prefix.is_full(*schema)) { - _rows[clustering_key::one::from_clustering_prefix(*schema, prefix)].t.apply(t); + apply_delete(schema, clustering_key::one::from_clustering_prefix(*schema, prefix), t); } else { - apply_row_tombstone(schema, {clustering_key::prefix::one::from_clustering_prefix(*schema, prefix), t}); + apply_row_tombstone(schema, clustering_key::prefix::one::from_clustering_prefix(*schema, prefix), t); } } +void +mutation_partition::apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t) { + auto i = _rows.lower_bound(key, rows_entry::compare(*schema)); + if (i == _rows.end() || !i->key().equal(*schema, key)) { + auto e = new rows_entry(std::move(key)); + e->row().apply(t); + _rows.insert(i, *e); + } else { + i->row().apply(t); + } +} + +rows_entry* +mutation_partition::find_entry(schema_ptr schema, const clustering_key::prefix::one& key) { + auto i = _rows.find(key, rows_entry::compare_prefix(*schema)); + if (i == _rows.end()) { + return nullptr; + } + return &*i; +} + row* mutation_partition::find_row(const clustering_key::one& key) { auto i = _rows.find(key); if (i == _rows.end()) { return nullptr; } - return &i->second.cells; + return &i->row().cells; +} + +row& +mutation_partition::clustered_row(const clustering_key::one& key) { + auto i = _rows.find(key); + if (i == _rows.end()) { + auto e = new rows_entry(key); + _rows.insert(i, *e); + return e->row().cells; + } + return i->row().cells; } bool column_definition::is_compact_value() const { @@ -489,22 +547,23 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p if (!key.is_full(*_schema)) { fail(unimplemented::cause::RANGE_QUERIES); } - // FIXME: Support looking up by prefixes - auto full_key = key.to_full(*_schema); - auto row = partition.find_row(full_key); + + rows_entry* row = partition.find_entry(_schema, key); if (!row) { continue; } + auto&& cells = &row->row().cells; + // FIXME: handle removed rows properly. In CQL rows are separate entities (can be live or dead). - auto row_tombstone = partition.tombstone_for_row(_schema, full_key); + auto row_tombstone = partition.tombstone_for_row(_schema, row->key()); query::result::row result_row; result_row.cells.reserve(slice.regular_columns.size()); for (auto id : slice.regular_columns) { - auto i = row->find(id); - if (i == row->end()) { + auto i = cells->find(id); + if (i == cells->end()) { result_row.cells.emplace_back(); } else { auto def = _schema->regular_column_at(id); @@ -522,7 +581,7 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p } } } - result.rows.emplace_back(full_key, std::move(result_row)); + result.rows.emplace_back(row->key(), std::move(result_row)); --limit; } diff --git a/database.hh b/database.hh index d34f95a6dd..7344f23e3b 100644 --- a/database.hh +++ b/database.hh @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -36,40 +35,153 @@ #include "atomic_cell.hh" #include "query.hh" #include "keys.hh" +#include using row = std::map; struct deletable_row final { tombstone t; row cells; + + void apply(tombstone t_) { + t.apply(t_); + } }; -using row_tombstone_set = std::map; +class row_tombstones_entry : public boost::intrusive::set_base_hook<> { + clustering_key::prefix::one _prefix; + tombstone _t; +public: + row_tombstones_entry(clustering_key::prefix::one&& prefix, tombstone t) + : _prefix(std::move(prefix)) + , _t(std::move(t)) + { } + clustering_key::prefix::one& prefix() { + return _prefix; + } + const clustering_key::prefix::one& prefix() const { + return _prefix; + } + tombstone& t() { + return _t; + } + const tombstone& t() const { + return _t; + } + void apply(tombstone t) { + _t.apply(t); + } + struct compare { + clustering_key::prefix::one::less_compare _c; + compare(const schema& s) : _c(s) {} + bool operator()(const row_tombstones_entry& e1, const row_tombstones_entry& e2) const { + return _c(e1._prefix, e2._prefix); + } + bool operator()(const clustering_key::prefix::one& prefix, const row_tombstones_entry& e) const { + return _c(prefix, e._prefix); + } + bool operator()(const row_tombstones_entry& e, const clustering_key::prefix::one& prefix) const { + return _c(e._prefix, prefix); + } + }; + template + struct delegating_compare { + Comparator _c; + delegating_compare(Comparator&& c) : _c(std::move(c)) {} + template + bool operator()(const Comparable& prefix, const row_tombstones_entry& e) const { + return _c(prefix, e._prefix); + } + template + bool operator()(const row_tombstones_entry& e, const Comparable& prefix) const { + return _c(e._prefix, prefix); + } + }; + template + static auto key_comparator(Comparator&& c) { + return delegating_compare(std::move(c)); + } +}; + +class rows_entry : public boost::intrusive::set_base_hook<> { + clustering_key::one _key; + deletable_row _row; +public: + rows_entry(clustering_key::one&& key) + : _key(std::move(key)) + { } + rows_entry(const clustering_key::one& key) + : _key(key) + { } + rows_entry(const rows_entry& e) + : _key(e._key) + , _row(e._row) + { } + clustering_key::one& key() { + return _key; + } + const clustering_key::one& key() const { + return _key; + } + deletable_row& row() { + return _row; + } + const deletable_row& row() const { + return _row; + } + void apply(tombstone t) { + _row.apply(t); + } + struct compare { + clustering_key::one::less_compare _c; + compare(const schema& s) : _c(s) {} + bool operator()(const rows_entry& e1, const rows_entry& e2) const { + return _c(e1._key, e2._key); + } + bool operator()(const clustering_key::one& key, const rows_entry& e) const { + return _c(key, e._key); + } + bool operator()(const rows_entry& e, const clustering_key::one& key) const { + return _c(e._key, key); + } + }; + struct compare_prefix { + clustering_key::one::less_compare_with_prefix _c; + compare_prefix(const schema& s) : _c(s) {} + bool operator()(const clustering_key::prefix::one& prefix, const rows_entry& e) const { + return _c(prefix, e._key); + } + bool operator()(const rows_entry& e, const clustering_key::prefix::one& prefix) const { + return _c(e._key, prefix); + } + }; +}; class mutation_partition final { private: tombstone _tombstone; row _static_row; - std::map _rows; - row_tombstone_set _row_tombstones; + boost::intrusive::set> _rows; + boost::intrusive::set> _row_tombstones; public: mutation_partition(schema_ptr s) - : _rows(clustering_key::one::less_compare(*s)) - , _row_tombstones(clustering_key::prefix::one::less_compare(*s)) + : _rows(rows_entry::compare(*s)) + , _row_tombstones(row_tombstones_entry::compare(*s)) { } + mutation_partition(mutation_partition&&) = default; + ~mutation_partition(); void apply(tombstone t) { _tombstone.apply(t); } void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t); - void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) { - apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); - } - void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); + void apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t); + void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t); void apply(schema_ptr schema, const mutation_partition& p); - const row_tombstone_set& row_tombstones() const { return _row_tombstones; } row& static_row() { return _static_row; } - row& clustered_row(const clustering_key::one& key) { return _rows[key].cells; } - row& clustered_row(clustering_key::one&& key) { return _rows[std::move(key)].cells; } + row& clustered_row(const clustering_key::one& key); row* find_row(const clustering_key::one& key); + row* find_row(schema_ptr schema, const clustering_key::prefix::one& key); + rows_entry* find_entry(schema_ptr schema, const clustering_key::prefix::one& key); tombstone tombstone_for_row(schema_ptr schema, const clustering_key::one& key); + tombstone tombstone_for_row(schema_ptr schema, const clustering_key::prefix::one& key); friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); }; @@ -126,6 +238,7 @@ private: struct column_family { column_family(schema_ptr schema); + column_family(column_family&&) = default; mutation_partition& find_or_create_partition(const partition_key::one& key); row& find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key); mutation_partition* find_partition(const partition_key::one& key); From 9d1fe2c8d9e88c5f9ec930bee01ebadbe057808c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 17 Mar 2015 14:55:28 +0100 Subject: [PATCH 11/13] types: Rename tuple_type::component_iterator to 'iterator' --- tuple.hh | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tuple.hh b/tuple.hh index d90463d0c8..5064877433 100644 --- a/tuple.hh +++ b/tuple.hh @@ -79,7 +79,7 @@ public: bytes decompose_value(const value_type& values) { return ::serialize_value(*this, values); } - class component_iterator : public std::iterator> { + class iterator : public std::iterator> { private: ssize_t _types_left; bytes_view _v; @@ -115,27 +115,27 @@ public: } public: struct end_iterator_tag {}; - component_iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) { + iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) { read_current(); } - component_iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {} - component_iterator& operator++() { + iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {} + iterator& operator++() { --_types_left; read_current(); return *this; } const value_type& operator*() const { return _current; } - bool operator!=(const component_iterator& i) const { return _v.begin() != i._v.begin(); } - bool operator==(const component_iterator& i) const { return _v.begin() == i._v.begin(); } + bool operator!=(const iterator& i) const { return _v.begin() != i._v.begin(); } + bool operator==(const iterator& i) const { return _v.begin() == i._v.begin(); } }; - component_iterator begin(const bytes_view& v) const { - return component_iterator(*this, v); + iterator begin(const bytes_view& v) const { + return iterator(*this, v); } - component_iterator end(const bytes_view& v) const { - return component_iterator(typename component_iterator::end_iterator_tag(), v); + iterator end(const bytes_view& v) const { + return iterator(typename iterator::end_iterator_tag(), v); } auto iter_items(const bytes_view& v) { - return boost::iterator_range(begin(v), end(v)); + return boost::iterator_range(begin(v), end(v)); } value_type deserialize_value(bytes_view v) { std::vector result; From 6197c5306db846caa3f9d7e947400dc68a68e359 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 17 Mar 2015 15:17:10 +0100 Subject: [PATCH 12/13] db: Optimize range tombstone lookups From O(N) to O(log(N)) where N is the number of range tombstones. --- database.cc | 35 ++++++++++------------------------- database.hh | 3 +++ keys.hh | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/database.cc b/database.cc index 8d03045996..e79646bc66 100644 --- a/database.cc +++ b/database.cc @@ -421,34 +421,18 @@ tombstone mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::one& key) { tombstone t = _tombstone; - // FIXME: Optimize this - for (auto&& e : _row_tombstones) { - if (key.is_prefixed_by(*schema, e.prefix())) { - t.apply(e.t()); + auto c = row_tombstones_entry::key_comparator( + clustering_key::one::prefix_view_type::less_compare_with_prefix(*schema)); + + // _row_tombstones contains only strict prefixes + for (unsigned prefix_len = 1; prefix_len < schema->clustering_key_size(); ++prefix_len) { + auto i = _row_tombstones.find(key.prefix_view(*schema, prefix_len), c); + if (i != _row_tombstones.end()) { + t.apply(i->t()); } } - auto j = _rows.find(key, rows_entry::compare_key(*schema)); - if (j != _rows.end()) { - t.apply(j->row().t); - } - - return t; -} - -tombstone -mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::prefix::one& key) { - assert(key.is_full(*schema)); - tombstone t = _tombstone; - - // FIXME: Optimize this - for (auto&& e : _row_tombstones) { - if (key.is_prefixed_by(*schema, e.prefix())) { - t.apply(e.t()); - } - } - - auto j = _rows.find(key, rows_entry::compare_prefix(*schema)); + auto j = _rows.find(key, rows_entry::compare(*schema)); if (j != _rows.end()) { t.apply(j->row().t); } @@ -458,6 +442,7 @@ mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::p void mutation_partition::apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) { + assert(!prefix.is_full(*schema)); auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(*schema)); if (i == _row_tombstones.end() || !prefix.equal(*schema, i->prefix())) { auto e = new row_tombstones_entry(std::move(prefix), t); diff --git a/database.hh b/database.hh index 7344f23e3b..486ca42788 100644 --- a/database.hh +++ b/database.hh @@ -162,6 +162,8 @@ private: tombstone _tombstone; row _static_row; boost::intrusive::set> _rows; + // Contains only strict prefixes so that we don't have to lookup full keys + // in both _row_tombstones and _rows. boost::intrusive::set> _row_tombstones; public: mutation_partition(schema_ptr s) @@ -173,6 +175,7 @@ public: void apply(tombstone t) { _tombstone.apply(t); } void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t); void apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t); + // prefix must not be full void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t); void apply(schema_ptr schema, const mutation_partition& p); row& static_row() { return _static_row; } diff --git a/keys.hh b/keys.hh index 5ac9a607e1..d241a94a7f 100644 --- a/keys.hh +++ b/keys.hh @@ -103,12 +103,59 @@ public: } }; +template +class prefix_view_on_full_tuple { +public: + using iterator = typename tuple_type::iterator; +private: + bytes_view _b; + unsigned _prefix_len; + iterator _begin; + iterator _end; +public: + prefix_view_on_full_tuple(const schema& s, bytes_view b, unsigned prefix_len) + : _b(b) + , _prefix_len(prefix_len) + , _begin(TopLevel::tuple_type(s)->begin(_b)) + , _end(_begin) + { + std::advance(_end, prefix_len); + } + + iterator begin() const { return _begin; } + iterator end() const { return _end; } + + struct less_compare_with_prefix { + shared_ptr> prefix_type; + + less_compare_with_prefix(const schema& s) + : prefix_type(PrefixTopLevel::tuple_type(s)) + { } + + bool operator()(const prefix_view_on_full_tuple& k1, const PrefixTopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + k1.begin(), k1.end(), + prefix_type->begin(k2), prefix_type->end(k2), + optional_less_compare); + } + + bool operator()(const PrefixTopLevel& k1, const prefix_view_on_full_tuple& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + prefix_type->begin(k1), prefix_type->end(k1), + k2.begin(), k2.end(), + optional_less_compare); + } + }; +}; + template class prefixable_full_tuple : public tuple_wrapper { using base = tuple_wrapper; protected: prefixable_full_tuple(bytes&& b) : base(std::move(b)) {} public: + using prefix_view_type = prefix_view_on_full_tuple; + bool is_prefixed_by(const schema& s, const PrefixTopLevel& prefix) const { auto t = base::type(s); auto prefix_type = PrefixTopLevel::tuple_type(s); @@ -141,6 +188,10 @@ public: optional_less_compare); } }; + + auto prefix_view(const schema& s, unsigned prefix_len) const { + return prefix_view_type(s, *this, prefix_len); + } }; template From 83e91d6de4496c1c47123a63cb346c85966e0615 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 17 Mar 2015 15:19:30 +0100 Subject: [PATCH 13/13] tests: Add more tests for range tombstones --- tests/urchin/mutation_test.cc | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 01cce2119a..edf184411d 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -39,6 +39,40 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); } +BOOST_AUTO_TEST_CASE(test_multi_level_row_tombstones) { + auto s = make_lw_shared(schema(some_keyspace, some_column_family, + {{"p1", utf8_type}}, + {{"c1", int32_type}, {"c2", int32_type}, {"c3", int32_type}}, + {{"r1", int32_type}}, {}, utf8_type)); + + auto ttl = gc_clock::now() + std::chrono::seconds(1); + + mutation m(partition_key::one::from_exploded(*s, {to_bytes("key1")}), s); + + auto make_prefix = [s] (const std::vector& v) { + return clustering_key::prefix::one::from_deeply_exploded(*s, v); + }; + auto make_key = [s] (const std::vector& v) { + return clustering_key::one::from_deeply_exploded(*s, v); + }; + + m.p.apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 3})), tombstone(9, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(9, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(8, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 4, 0})), tombstone(11, ttl)); +} + BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}, {"c2", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));