diff --git a/cql3/query_options.cc b/cql3/query_options.cc index 06117a90f1..6ec898e111 100644 --- a/cql3/query_options.cc +++ b/cql3/query_options.cc @@ -135,21 +135,6 @@ query_options::query_options(std::vector values) db::consistency_level::ONE, infinite_timeout_config, std::move(values)) {} -db::consistency_level query_options::get_consistency() const -{ - return _consistency; -} - -cql3::raw_value_view query_options::get_value_at(size_t idx) const -{ - return _value_views.at(idx); -} - -size_t query_options::get_values_count() const -{ - return _value_views.size(); -} - cql3::raw_value_view query_options::make_temporary(cql3::raw_value value) const { if (value) { @@ -178,56 +163,6 @@ bytes_view query_options::linearize(fragmented_temporary_buffer::view view) cons } } -bool query_options::skip_metadata() const -{ - return _skip_metadata; -} - -int32_t query_options::get_page_size() const -{ - return get_specific_options().page_size; -} - -::shared_ptr query_options::get_paging_state() const -{ - return get_specific_options().state; -} - -std::experimental::optional query_options::get_serial_consistency() const -{ - return get_specific_options().serial_consistency; -} - -api::timestamp_type query_options::get_timestamp(service::query_state& state) const -{ - auto tstamp = get_specific_options().timestamp; - return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp(); -} - -int query_options::get_protocol_version() const -{ - return _cql_serialization_format.protocol_version(); -} - -cql_serialization_format query_options::get_cql_serialization_format() const -{ - return _cql_serialization_format; -} - -const query_options::specific_options& query_options::get_specific_options() const -{ - return _options; -} - -const query_options& query_options::for_statement(size_t i) const -{ - if (!_batch_options) { - // No per-statement options supplied, so use the "global" options - return *this; - } - return _batch_options->at(i); -} - void query_options::prepare(const std::vector<::shared_ptr>& specs) { if (!_names) { diff --git a/cql3/query_options.hh b/cql3/query_options.hh index ac2c3cc4fd..e959e55a49 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -156,34 +156,76 @@ public: std::vector values, specific_options options = specific_options::DEFAULT); explicit query_options(std::unique_ptr, ::shared_ptr paging_state); - db::consistency_level get_consistency() const; const timeout_config& get_timeout_config() const { return _timeout_config; } - cql3::raw_value_view get_value_at(size_t idx) const; + + db::consistency_level get_consistency() const { + return _consistency; + } + + cql3::raw_value_view get_value_at(size_t idx) const { + return _value_views.at(idx); + } + + size_t get_values_count() const { + return _value_views.size(); + } + cql3::raw_value_view make_temporary(cql3::raw_value value) const; bytes_view linearize(fragmented_temporary_buffer::view) const; - size_t get_values_count() const; - bool skip_metadata() const; - /** The pageSize for this query. Will be <= 0 if not relevant for the query. */ - int32_t get_page_size() const; + + bool skip_metadata() const { + return _skip_metadata; + } + + int32_t get_page_size() const { + return get_specific_options().page_size; + } + /** The paging state for this query, or null if not relevant. */ - ::shared_ptr get_paging_state() const; + ::shared_ptr get_paging_state() const { + return get_specific_options().state; + } + /** Serial consistency for conditional updates. */ - std::experimental::optional get_serial_consistency() const; + std::experimental::optional get_serial_consistency() const { + return get_specific_options().serial_consistency; + } + + api::timestamp_type get_timestamp(service::query_state& state) const { + auto tstamp = get_specific_options().timestamp; + return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp(); + } + + /** + * The protocol version for the query. Will be 3 if the object don't come from + * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift). + */ + int get_protocol_version() const { + return _cql_serialization_format.protocol_version(); + } + + cql_serialization_format get_cql_serialization_format() const { + return _cql_serialization_format; + } + + const query_options::specific_options& get_specific_options() const { + return _options; + } + + // Mainly for the sake of BatchQueryOptions + const query_options& for_statement(size_t i) const { + if (!_batch_options) { + // No per-statement options supplied, so use the "global" options + return *this; + } + return _batch_options->at(i); + } + const std::experimental::optional>& get_names() const noexcept { return _names; } - api::timestamp_type get_timestamp(service::query_state& state) const; - /** - * The protocol version for the query. Will be 3 if the object don't come from - * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift). - */ - int get_protocol_version() const; - cql_serialization_format get_cql_serialization_format() const; - // Mainly for the sake of BatchQueryOptions - const specific_options& get_specific_options() const; - const query_options& for_statement(size_t i) const; void prepare(const std::vector<::shared_ptr>& specs); private: void fill_value_views(); diff --git a/cql3/result_set.cc b/cql3/result_set.cc index 5f12167da1..604f7e5ffa 100644 --- a/cql3/result_set.cc +++ b/cql3/result_set.cc @@ -45,27 +45,25 @@ namespace cql3 { metadata::metadata(std::vector<::shared_ptr> names_) : _flags(flag_enum_set()) - , names(std::move(names_)) { - _column_count = names.size(); -} + , _column_info(make_lw_shared(std::move(names_))) +{ } metadata::metadata(flag_enum_set flags, std::vector<::shared_ptr> names_, uint32_t column_count, ::shared_ptr paging_state) : _flags(flags) - , names(std::move(names_)) - , _column_count(column_count) + , _column_info(make_lw_shared(std::move(names_), column_count)) , _paging_state(std::move(paging_state)) { } // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911 uint32_t metadata::value_count() const { - return _flags.contains() ? _column_count : names.size(); + return _flags.contains() ? _column_info->_column_count : _column_info->_names.size(); } void metadata::add_non_serialized_column(::shared_ptr name) { // See comment above. Because columnCount doesn't account the newly added name, it // won't be serialized. - names.emplace_back(std::move(name)); + _column_info->_names.emplace_back(std::move(name)); } bool metadata::all_in_same_cf() const { @@ -73,7 +71,7 @@ bool metadata::all_in_same_cf() const { return false; } - return column_specification::all_in_same_table(names); + return column_specification::all_in_same_table(_column_info->_names); } void metadata::set_has_more_pages(::shared_ptr paging_state) { @@ -93,18 +91,10 @@ metadata::flag_enum_set metadata::flags() const { return _flags; } -uint32_t metadata::column_count() const { - return _column_count; -} - ::shared_ptr metadata::paging_state() const { return _paging_state; } -const std::vector<::shared_ptr>& metadata::get_names() const { - return names; -} - prepared_metadata::prepared_metadata(const std::vector<::shared_ptr>& names, const std::vector& partition_key_bind_indices) : _names{names} diff --git a/cql3/result_set.hh b/cql3/result_set.hh index 31ada2388b..c35c367595 100644 --- a/cql3/result_set.hh +++ b/cql3/result_set.hh @@ -70,18 +70,29 @@ public: using flag_enum_set = enum_set; -private: - flag_enum_set _flags; - -public: + struct column_info { // Please note that columnCount can actually be smaller than names, even if names is not null. This is // used to include columns in the resultSet that we need to do post-query re-orderings // (SelectStatement.orderResults) but that shouldn't be sent to the user as they haven't been requested // (CASSANDRA-4911). So the serialization code will exclude any columns in name whose index is >= columnCount. - std::vector<::shared_ptr> names; + std::vector<::shared_ptr> _names; + uint32_t _column_count; + + column_info(std::vector<::shared_ptr> names, uint32_t column_count) + : _names(std::move(names)) + , _column_count(column_count) + { } + + explicit column_info(std::vector<::shared_ptr> names) + : _names(std::move(names)) + , _column_count(_names.size()) + { } + }; +private: + flag_enum_set _flags; private: - uint32_t _column_count; + lw_shared_ptr _column_info; ::shared_ptr _paging_state; public: @@ -105,11 +116,13 @@ public: flag_enum_set flags() const; - uint32_t column_count() const; + uint32_t column_count() const { return _column_info->_column_count; } ::shared_ptr paging_state() const; - const std::vector<::shared_ptr>& get_names() const; + const std::vector<::shared_ptr>& get_names() const { + return _column_info->_names; + } }; ::shared_ptr make_empty_metadata(); @@ -223,14 +236,14 @@ public: class result { std::unique_ptr _result_set; result_generator _result_generator; - shared_ptr _metadata; + shared_ptr _metadata; public: explicit result(std::unique_ptr rs) : _result_set(std::move(rs)) , _metadata(_result_set->_metadata) { } - explicit result(result_generator generator, shared_ptr m) + explicit result(result_generator generator, shared_ptr m) : _result_generator(std::move(generator)) , _metadata(std::move(m)) { } @@ -240,7 +253,7 @@ public: if (_result_set) { return *_result_set; } else { - auto builder = result_set::builder(_metadata); + auto builder = result_set::builder(make_shared(*_metadata)); _result_generator.visit(builder); return std::move(builder).get_result_set(); } diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index e722cb9213..27e4ea4e4a 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -442,10 +442,15 @@ select_statement::do_execute(service::storage_proxy& proxy, if (_selection->is_trivial() && !_restrictions->need_filtering()) { return p->fetch_page_generator(page_size, now, _stats).then([this, p, limit] (result_generator generator) { - auto meta = make_shared(*_selection->get_result_metadata()); - if (!p->is_exhausted()) { - meta->set_has_more_pages(p->state()); - } + auto meta = [&] () -> shared_ptr { + if (!p->is_exhausted()) { + auto meta = make_shared(*_selection->get_result_metadata()); + meta->set_has_more_pages(p->state()); + return meta; + } else { + return _selection->get_result_metadata(); + } + }(); return shared_ptr( make_shared(result(std::move(generator), std::move(meta))) diff --git a/query-result-reader.hh b/query-result-reader.hh index 2b43bee2a6..38953324fb 100644 --- a/query-result-reader.hh +++ b/query-result-reader.hh @@ -156,6 +156,7 @@ class result_view { public: result_view(const bytes_ostream& v) : _v(ser::query_result_view{ser::as_input_stream(v)}) {} result_view(ser::query_result_view v) : _v(v) {} + explicit result_view(const query::result& res) : result_view(res.buf()) { } template static auto do_with(const query::result& res, Func&& func) { @@ -165,14 +166,12 @@ public: template static void consume(const query::result& res, const partition_slice& slice, ResultVisitor&& visitor) { - do_with(res, [&] (result_view v) { - v.consume(slice, visitor); - }); + result_view(res).consume(slice, visitor); } template GCC6_CONCEPT(requires ResultVisitor) - void consume(const partition_slice& slice, Visitor&& visitor) { + void consume(const partition_slice& slice, Visitor&& visitor) const { for (auto&& p : _v.partitions()) { auto rows = p.rows(); auto row_count = rows.size(); @@ -198,13 +197,21 @@ public: } } - std::tuple count_partitions_and_rows() { + std::tuple count_partitions_and_rows() const { auto&& ps = _v.partitions(); auto rows = boost::accumulate(ps | boost::adaptors::transformed([] (auto& p) { return std::max(p.rows().size(), size_t(1)); }), uint32_t(0)); return std::make_tuple(ps.size(), rows); } + + std::tuple> + get_last_partition_and_clustering_key() const { + auto ps = _v.partitions(); + auto& p = ps.back(); + auto rs = p.rows(); + return { p.key().value(), !rs.empty() ? rs.back().key() : stdx::optional() }; + } }; } diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index f3106c01b6..30cc101c0f 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -50,6 +50,14 @@ static logging::logger qlogger("paging"); namespace service::pager { +struct noop_visitor { + void accept_new_partition(uint32_t) { } + void accept_new_partition(const partition_key& key, uint32_t row_count) { } + void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) { } + void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { } + void accept_partition_end(const query::result_row_view& static_row) { } +}; + static bool has_clustering_keys(const schema& s, const query::read_command& cmd) { return s.clustering_key_size() > 0 && !cmd.slice.options.contains(); @@ -247,14 +255,6 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd) future query_pager::fetch_page_generator(uint32_t page_size, gc_clock::time_point now, cql3::cql_stats& stats) { return do_fetch_page(page_size, now).then([this, page_size, now, &stats] (service::storage_proxy::coordinator_query_result qr) { - struct noop_visitor { - void accept_new_partition(uint32_t) { } - void accept_new_partition(const partition_key& key, uint32_t row_count) { } - void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) { } - void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { } - void accept_partition_end(const query::result_row_view& static_row) { } - }; - _last_replicas = std::move(qr.last_replicas); _query_read_repair_decision = qr.read_repair_decision; handle_result(noop_visitor(), qr.query_result, page_size, now); @@ -335,24 +335,47 @@ public: const foreign_ptr>& results, uint32_t page_size, gc_clock::time_point now) { - query_result_visitor v(std::forward(visitor)); - query::result_view::consume(*results, _cmd->slice, v); - - if (_last_pkey) { + auto update_slice = [&] (const partition_key& last_pkey) { // refs #752, when doing aggregate queries we will re-use same // slice repeatedly. Since "specific ck ranges" only deal with // a single extra range, we must clear out the old one // Even if it was not so of course, leaving junk in the slice // is bad. - _cmd->slice.clear_range(*_schema, *_last_pkey); + _cmd->slice.clear_range(*_schema, last_pkey); + }; + + auto view = query::result_view(*results); + + uint32_t row_count; + if constexpr(!std::is_same_v, noop_visitor>) { + query_result_visitor v(std::forward(visitor)); + view.consume(_cmd->slice, v); + + if (_last_pkey) { + update_slice(*_last_pkey); + } + + row_count = v.total_rows; + _max = _max - row_count; + _exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0; + _last_pkey = v.last_pkey; + _last_ckey = v.last_ckey; + } else { + row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows()); + _max = _max - row_count; + _exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0; + + if (!_exhausted) { + if (_last_pkey) { + update_slice(*_last_pkey); + } + auto [ last_pkey, last_ckey ] = view.get_last_partition_and_clustering_key(); + _last_pkey = std::move(last_pkey); + _last_ckey = std::move(last_ckey); + } } - _max = _max - v.total_rows; - _exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0; - _last_pkey = v.last_pkey; - _last_ckey = v.last_ckey; - - qlogger.debug("Fetched {} rows, max_remain={} {}", v.total_rows, _max, _exhausted ? "(exh)" : ""); + qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : ""); if (_last_pkey) { qlogger.debug("Last partition key: {}", *_last_pkey); diff --git a/tests/UUID_test.cc b/tests/UUID_test.cc index e463753c73..633fb15487 100644 --- a/tests/UUID_test.cc +++ b/tests/UUID_test.cc @@ -50,3 +50,30 @@ BOOST_AUTO_TEST_CASE(test_UUID_comparison) { BOOST_REQUIRE_GT(p.first, p.second); } } + +BOOST_AUTO_TEST_CASE(test_from_string) { + auto check = [] (sstring_view sv) { + auto uuid = UUID(sv); + BOOST_CHECK_EQUAL(uuid.version(), 4); + BOOST_CHECK_EQUAL(uuid.to_sstring(), sv); + BOOST_CHECK_EQUAL((uuid.get_least_significant_bits() >> 62) & 0x3, 2); + }; + + check("b1415756-49c3-4fa8-9b72-d1b867b032af"); + check("85859d5c-fcf3-4b0b-9089-197b8b06735c"); + check("e596c2f2-d29d-44a0-bb89-0a90ff928490"); + check("f28f86f5-cbc2-4526-ba25-db90c226ec6a"); + check("ce84997b-6ea2-4468-9f02-8a65abf4141a"); +} + +BOOST_AUTO_TEST_CASE(test_make_random_uuid) { + std::vector uuids; + for (auto i = 0; i < 100; i++) { + auto uuid = utils::make_random_uuid(); + BOOST_CHECK_EQUAL(uuid.version(), 4); + BOOST_CHECK_EQUAL((uuid.get_least_significant_bits() >> 62) & 0x3, 2); + uuids.emplace_back(uuid); + } + std::sort(uuids.begin(), uuids.end()); + BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end()); +} diff --git a/utils/uuid.cc b/utils/uuid.cc index 578e6a601a..3cc68b0fdb 100644 --- a/utils/uuid.cc +++ b/utils/uuid.cc @@ -34,23 +34,16 @@ namespace utils { UUID make_random_uuid() { - // FIXME: keep in userspace - static thread_local std::random_device urandom; - static thread_local std::uniform_int_distribution dist(0, 255); - union { - uint8_t b[16]; - struct { - uint64_t msb, lsb; - } w; - } v; - for (auto& b : v.b) { - b = dist(urandom); - } - v.b[6] &= 0x0f; - v.b[6] |= 0x40; // version 4 - v.b[8] &= 0x3f; - v.b[8] |= 0x80; // IETF variant - return UUID(net::hton(v.w.msb), net::hton(v.w.lsb)); + static thread_local std::mt19937_64 engine(std::random_device().operator()()); + static thread_local std::uniform_int_distribution dist; + uint64_t msb, lsb; + msb = dist(engine); + lsb = dist(engine); + msb &= ~uint64_t(0x0f << 12); + msb |= 0x4 << 12; // version 4 + lsb &= ~(uint64_t(0x3) << 62); + lsb |= uint64_t(0x2) << 62; // IETF variant + return UUID(msb, lsb); } std::ostream& operator<<(std::ostream& out, const UUID& uuid) {