Merge tag 'avi/serialization_format/v2'

From Avi:

The protocol_version variable is problematic for two reasons:
- as an integer, it can get confused with a nearby unrelated integer argument
- it represents a layering violation, where the transport protocol leaks into
  the database layer

Fix by abstracting it with a new serialization_format class.  The transport
layer converts protocol_version to serialization_format.
This commit is contained in:
Tomasz Grabiec
2015-03-16 17:19:59 +01:00
10 changed files with 183 additions and 128 deletions

View File

@@ -185,12 +185,12 @@ public:
: map(std::move(map)) {
}
static value from_serialized(bytes value, map_type type, int version) {
static value from_serialized(bytes value, map_type type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserialize_for_native_protocol?!
auto m = boost::any_cast<map_type_impl::native_type>(type->deserialize(value, version));
auto m = boost::any_cast<map_type_impl::native_type>(type->deserialize(value, sf));
std::map<bytes, bytes, serialized_compare> map(type->get_keys_type()->as_less_comparator());
for (auto&& e : m) {
map.emplace(type->get_keys_type()->decompose(e.first),
@@ -203,22 +203,22 @@ public:
}
virtual bytes_opt get(const query_options& options) override {
return get_with_protocol_version(options.get_protocol_version());
return get_with_protocol_version(options.get_serialization_format());
}
virtual bytes get_with_protocol_version(int protocol_version) {
virtual bytes get_with_protocol_version(serialization_format sf) {
//FIXME: share code with serialize_partially_deserialized_form
size_t len = collection_value_len(protocol_version) * map.size() * 2 + collection_size_len(protocol_version);
size_t len = collection_value_len(sf) * map.size() * 2 + collection_size_len(sf);
for (auto&& e : map) {
len += e.first.size() + e.second.size();
}
bytes b(bytes::initialized_later(), len);
bytes::iterator out = b.begin();
write_collection_size(out, protocol_version, map.size());
write_collection_size(out, map.size(), sf);
for (auto&& e : map) {
write_collection_value(out, protocol_version, e.first);
write_collection_value(out, protocol_version, e.second);
write_collection_value(out, sf, e.first);
write_collection_value(out, sf, e.second);
}
return b;
}
@@ -413,7 +413,8 @@ public:
m.set_clustered_cell(prefix, column, params.make_dead_cell());
}
} else {
auto v = map_type_impl::serialize_partially_deserialized_form({map_value->map.begin(), map_value->map.end()}, 3);
auto v = map_type_impl::serialize_partially_deserialized_form({map_value->map.begin(), map_value->map.end()},
serialization_format::internal());
if (column.is_static()) {
m.set_static_cell(column, params.make_cell(std::move(v)));
} else {

View File

@@ -29,6 +29,6 @@ namespace cql3 {
const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp};
default_query_options query_options::DEFAULT{db::consistency_level::ONE,
{}, false, query_options::specific_options::DEFAULT, 3};
{}, false, query_options::specific_options::DEFAULT, 3, serialization_format::use_32_bit()};
}

View File

@@ -31,6 +31,7 @@
#include "service/pager/paging_state.hh"
#include "cql3/column_specification.hh"
#include "cql3/column_identifier.hh"
#include "serialization_format.hh"
namespace cql3 {
@@ -40,7 +41,9 @@ class default_query_options;
* Options for a query.
*/
class query_options {
serialization_format _serialization_format;
public:
explicit query_options(serialization_format sf) : _serialization_format(sf) {}
// Options that are likely to not be present in most queries
struct specific_options final {
static const specific_options DEFAULT;
@@ -117,6 +120,7 @@ public:
* a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
*/
virtual int get_protocol_version() const = 0;
serialization_format get_serialization_format() const { return _serialization_format; }
// Mainly for the sake of BatchQueryOptions
virtual const specific_options& get_specific_options() const = 0;
@@ -206,8 +210,9 @@ private:
const int32_t _protocol_version; // transient
public:
default_query_options(db::consistency_level consistency, std::vector<bytes_opt> values, bool skip_metadata, specific_options options,
int32_t protocol_version)
: _consistency(consistency)
int32_t protocol_version, serialization_format sf)
: query_options(sf)
, _consistency(consistency)
, _values(std::move(values))
, _skip_metadata(skip_metadata)
, _options(std::move(options))
@@ -234,7 +239,10 @@ class query_options_wrapper : public query_options {
protected:
std::unique_ptr<query_options> _wrapped;
public:
query_options_wrapper(std::unique_ptr<query_options> wrapped) : _wrapped(std::move(wrapped)) {}
query_options_wrapper(std::unique_ptr<query_options> wrapped)
: query_options(wrapped->get_serialization_format())
, _wrapped(std::move(wrapped)) {
}
virtual db::consistency_level get_consistency() const override {
return _wrapped->get_consistency();

View File

@@ -169,12 +169,12 @@ public:
: _elements(std::move(elements)) {
}
static value from_serialized(bytes_view v, set_type type, int version) {
static value from_serialized(bytes_view v, set_type type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserializeForNativeProtocol?!
auto s = boost::any_cast<set_type_impl::native_type>(type->deserialize(v, version));
auto s = boost::any_cast<set_type_impl::native_type>(type->deserialize(v, sf));
std::set<bytes, serialized_compare> elements(type->as_less_comparator());
for (auto&& element : s) {
elements.insert(elements.end(), type->get_elements_type()->decompose(element));
@@ -186,12 +186,12 @@ public:
}
virtual bytes_opt get(const query_options& options) override {
return get_with_protocol_version(options.get_protocol_version());
return get_with_protocol_version(options.get_serialization_format());
}
virtual bytes get_with_protocol_version(int protocol_version) override {
virtual bytes get_with_protocol_version(serialization_format sf) override {
return collection_type_impl::pack(_elements.begin(), _elements.end(),
_elements.size(), protocol_version);
_elements.size(), sf);
}
bool equals(set_type st, const value& v) {
@@ -337,7 +337,8 @@ public:
} else {
// for frozen sets, we're overwriting the whole cell
auto v = set_type->serialize_partially_deserialized_form(
{set_value->_elements.begin(), set_value->_elements.end()}, 3);
{set_value->_elements.begin(), set_value->_elements.end()},
serialization_format::internal());
if (set_value->_elements.empty()) {
m.set_cell(row_key, column, params.make_dead_cell());
} else {

View File

@@ -180,7 +180,7 @@ public:
public:
virtual ~collection_terminal() {}
/** Gets the value of the collection when serialized with the given protocol version format */
virtual bytes get_with_protocol_version(int protocol_version) = 0;
virtual bytes get_with_protocol_version(serialization_format sf) = 0;
};
/**

26
serialization_format.hh Normal file
View File

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
// Abstraction of transport protocol-dependent serialization format
// Protocols v1, v2 used 16 bits for collection sizes, while v3 and
// above use 32 bits. But letting every bit of the code know what
// transport protocol we're using (and in some cases, we aren't using
// any transport -- it's for internal storage) is bad, so abstract it
// away here.
class serialization_format {
bool _use_32_bit;
private:
explicit serialization_format(bool use_32_bit) : _use_32_bit(use_32_bit) {}
public:
static serialization_format use_16_bit() { return serialization_format(false); }
static serialization_format use_32_bit() { return serialization_format(true); }
static serialization_format internal() { return use_32_bit(); }
bool using_32_bits_for_collections() const { return _use_32_bit; }
};

View File

@@ -73,7 +73,8 @@ static future<> require_column_has_value(distributed<database>& ddb, const sstri
} else {
auto cell = i->second.as_collection_mutation();
auto type = dynamic_pointer_cast<collection_type_impl>(col_def->type);
actual = type->to_value(type->deserialize_mutation_form(cell.data), 3);
actual = type->to_value(type->deserialize_mutation_form(cell.data),
serialization_format::internal());
}
assert(col_def->type->equal(actual, col_def->type->decompose(expected)));
row->find(col_def->id);

View File

@@ -151,6 +151,7 @@ class cql_server::connection {
output_stream<char> _write_buf;
future<> _ready_to_respond = make_ready_future<>();
uint8_t _version = 0;
serialization_format _serialization_format = serialization_format::use_16_bit();
service::client_state _client_state;
std::unordered_map<uint16_t, cql_query_state> _query_states;
public:
@@ -205,6 +206,7 @@ private:
std::unique_ptr<cql3::query_options> read_options(temporary_buffer<char>& buf);
cql_query_state& get_query_state(uint16_t stream);
void init_serialization_format();
};
class cql_server::response {
@@ -326,6 +328,7 @@ cql_server::connection::read_frame() {
return make_ready_future<ret_type>();
}
_version = buf[0];
init_serialization_format();
if (_version < 1 || _version > 4) {
throw bad_cql_protocol_version();
}
@@ -409,6 +412,15 @@ cql_query_state& cql_server::connection::get_query_state(uint16_t stream)
return i->second;
}
void
cql_server::connection::init_serialization_format() {
if (_version < 3) {
_serialization_format = serialization_format::use_16_bit();
} else {
_serialization_format = serialization_format::use_32_bit();
}
}
future<> cql_server::connection::process_query(uint16_t stream, temporary_buffer<char> buf)
{
auto query = read_long_string_view(buf);
@@ -671,7 +683,7 @@ std::unique_ptr<cql3::query_options> cql_server::connection::read_options(tempor
auto consistency = read_consistency(buf);
if (_version == 1) {
return std::make_unique<cql3::default_query_options>(consistency, std::vector<bytes_opt>{},
false, cql3::query_options::specific_options::DEFAULT, 1);
false, cql3::query_options::specific_options::DEFAULT, 1, _serialization_format);
}
assert(_version >= 2);
@@ -718,10 +730,11 @@ std::unique_ptr<cql3::query_options> cql_server::connection::read_options(tempor
}
options = std::make_unique<cql3::default_query_options>(consistency, std::move(values), skip_metadata,
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, _version);
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, _version,
_serialization_format);
} else {
options = std::make_unique<cql3::default_query_options>(consistency, std::move(values), skip_metadata,
cql3::query_options::specific_options::DEFAULT, _version);
cql3::query_options::specific_options::DEFAULT, _version, _serialization_format);
}
if (names.empty()) {

166
types.cc
View File

@@ -674,44 +674,44 @@ collection_type_impl::as_cql3_type() {
abort();
}
size_t collection_size_len(int version) {
if (version >= 3) {
size_t collection_size_len(serialization_format sf) {
if (sf.using_32_bits_for_collections()) {
return sizeof(int32_t);
}
return sizeof(uint16_t);
}
size_t collection_value_len(int version) {
if (version >= 3) {
size_t collection_value_len(serialization_format sf) {
if (sf.using_32_bits_for_collections()) {
return sizeof(int32_t);
}
return sizeof(uint16_t);
}
int read_collection_size(bytes_view& in, int version) {
if (version >= 3) {
int read_collection_size(bytes_view& in, serialization_format sf) {
if (sf.using_32_bits_for_collections()) {
return read_simple<int32_t>(in);
} else {
return read_simple<uint16_t>(in);
}
}
void write_collection_size(bytes::iterator& out, int size, int version) {
if (version >= 3) {
void write_collection_size(bytes::iterator& out, int size, serialization_format sf) {
if (sf.using_32_bits_for_collections()) {
serialize_int32(out, size);
} else {
serialize_int16(out, uint16_t(size));
}
}
bytes_view read_collection_value(bytes_view& in, int version) {
auto size = version >= 3 ? read_simple<int32_t>(in) : read_simple<uint16_t>(in);
bytes_view read_collection_value(bytes_view& in, serialization_format sf) {
auto size = sf.using_32_bits_for_collections() ? read_simple<int32_t>(in) : read_simple<uint16_t>(in);
return read_simple_bytes(in, size);
}
void write_collection_value(bytes::iterator& out, int version, bytes_view val_bytes) {
if (version >= 3) {
void write_collection_value(bytes::iterator& out, serialization_format sf, bytes_view val_bytes) {
if (sf.using_32_bits_for_collections()) {
serialize_int32(out, int32_t(val_bytes.size()));
} else {
serialize_int16(out, uint16_t(val_bytes.size()));
@@ -719,10 +719,10 @@ void write_collection_value(bytes::iterator& out, int version, bytes_view val_by
out = std::copy_n(val_bytes.begin(), val_bytes.size(), out);
}
void write_collection_value(bytes::iterator& out, int version, data_type type, const boost::any& value) {
void write_collection_value(bytes::iterator& out, serialization_format sf, data_type type, const boost::any& value) {
size_t val_len = type->serialized_size(value);
if (version >= 3) {
if (sf.using_32_bits_for_collections()) {
serialize_int32(out, val_len);
} else {
serialize_int16(out, val_len);
@@ -786,19 +786,19 @@ map_type_impl::compare_maps(data_type keys, data_type values, bytes_view o1, byt
} else if (o2.empty()) {
return 1;
}
int protocol_version = 3;
int size1 = read_collection_size(o1, protocol_version);
int size2 = read_collection_size(o2, protocol_version);
auto sf = serialization_format::internal();
int size1 = read_collection_size(o1, sf);
int size2 = read_collection_size(o2, sf);
// FIXME: use std::lexicographical_compare()
for (int i = 0; i < std::min(size1, size2); ++i) {
auto k1 = read_collection_value(o1, protocol_version);
auto k2 = read_collection_value(o2, protocol_version);
auto k1 = read_collection_value(o1, sf);
auto k2 = read_collection_value(o2, sf);
auto cmp = keys->compare(k1, k2);
if (cmp != 0) {
return cmp;
}
auto v1 = read_collection_value(o1, protocol_version);
auto v2 = read_collection_value(o2, protocol_version);
auto v1 = read_collection_value(o1, sf);
auto v2 = read_collection_value(o2, sf);
cmp = values->compare(v1, v2);
if (cmp != 0) {
return cmp;
@@ -809,14 +809,14 @@ map_type_impl::compare_maps(data_type keys, data_type values, bytes_view o1, byt
void
map_type_impl::serialize(const boost::any& value, bytes::iterator& out) {
return serialize(value, out, 3);
return serialize(value, out, serialization_format::internal());
}
size_t
map_type_impl::serialized_size(const boost::any& value) {
auto& m = boost::any_cast<const native_type&>(value);
size_t len = collection_size_len(3);
size_t psz = collection_value_len(3);
size_t len = collection_size_len(serialization_format::internal());
size_t psz = collection_value_len(serialization_format::internal());
for (auto&& kv : m) {
len += psz + _keys->serialized_size(kv.first);
len += psz + _values->serialized_size(kv.second);
@@ -826,31 +826,31 @@ map_type_impl::serialized_size(const boost::any& value) {
}
void
map_type_impl::serialize(const boost::any& value, bytes::iterator& out, int protocol_version) {
map_type_impl::serialize(const boost::any& value, bytes::iterator& out, serialization_format sf) {
auto& m = boost::any_cast<const native_type&>(value);
write_collection_size(out, m.size(), protocol_version);
write_collection_size(out, m.size(), sf);
for (auto&& kv : m) {
write_collection_value(out, protocol_version, _keys, kv.first);
write_collection_value(out, protocol_version, _values, kv.second);
write_collection_value(out, sf, _keys, kv.first);
write_collection_value(out, sf, _values, kv.second);
}
}
object_opt
map_type_impl::deserialize(bytes_view v) {
return deserialize(v, 3);
return deserialize(v, serialization_format::internal());
}
object_opt
map_type_impl::deserialize(bytes_view in, int protocol_version) {
map_type_impl::deserialize(bytes_view in, serialization_format sf) {
if (in.empty()) {
return {};
}
native_type m;
auto size = read_collection_size(in, protocol_version);
auto size = read_collection_size(in, sf);
for (int i = 0; i < size; ++i) {
auto kb = read_collection_value(in, protocol_version);
auto kb = read_collection_value(in, sf);
auto k = _keys->deserialize(kb);
auto vb = read_collection_value(in, protocol_version);
auto vb = read_collection_value(in, sf);
auto v = _values->deserialize(vb);
m.insert(m.end(), std::make_pair(std::move(k), std::move(v)));
}
@@ -882,7 +882,7 @@ map_type_impl::serialized_values(std::vector<atomic_cell::one> cells) {
}
bytes
map_type_impl::to_value(mutation_view mut, int protocol_version) {
map_type_impl::to_value(mutation_view mut, serialization_format sf) {
std::vector<bytes_view> tmp;
tmp.reserve(mut.size() * 2);
for (auto&& e : mut) {
@@ -891,23 +891,23 @@ map_type_impl::to_value(mutation_view mut, int protocol_version) {
tmp.emplace_back(e.second.value());
}
}
return pack(tmp.begin(), tmp.end(), tmp.size() / 2, protocol_version);
return pack(tmp.begin(), tmp.end(), tmp.size() / 2, sf);
}
bytes
map_type_impl::serialize_partially_deserialized_form(
const std::vector<std::pair<bytes_view, bytes_view>>& v, int protocol_version) {
size_t len = collection_value_len(protocol_version) * v.size() * 2 + collection_size_len(protocol_version);
const std::vector<std::pair<bytes_view, bytes_view>>& v, serialization_format sf) {
size_t len = collection_value_len(sf) * v.size() * 2 + collection_size_len(sf);
for (auto&& e : v) {
len += e.first.size() + e.second.size();
}
bytes b(bytes::initialized_later(), len);
bytes::iterator out = b.begin();
write_collection_size(out, protocol_version, v.size());
write_collection_size(out, v.size(), sf);
for (auto&& e : v) {
write_collection_value(out, protocol_version, e.first);
write_collection_value(out, protocol_version, e.second);
write_collection_value(out, sf, e.first);
write_collection_value(out, sf, e.second);
}
return b;
@@ -995,16 +995,16 @@ class listlike_partial_deserializing_iterator
bytes_view* _in;
int _remain;
bytes_view _cur;
int _protocol_version;
serialization_format _sf;
private:
struct end_tag {};
listlike_partial_deserializing_iterator(bytes_view& in, int protocol_version)
: _in(&in), _protocol_version(protocol_version) {
_remain = read_collection_size(*_in, _protocol_version);
listlike_partial_deserializing_iterator(bytes_view& in, serialization_format sf)
: _in(&in), _sf(sf) {
_remain = read_collection_size(*_in, _sf);
parse();
}
listlike_partial_deserializing_iterator(end_tag)
: _remain(0) {
: _remain(0), _sf(serialization_format::internal()) { // _sf is bogus, but doesn't matter
}
public:
bytes_view operator*() const { return _cur; }
@@ -1023,16 +1023,16 @@ public:
bool operator!=(const listlike_partial_deserializing_iterator& x) const {
return _remain != x._remain;
}
static listlike_partial_deserializing_iterator begin(bytes_view& in, int protocol_version) {
return { in, protocol_version };
static listlike_partial_deserializing_iterator begin(bytes_view& in, serialization_format sf) {
return { in, sf };
}
static listlike_partial_deserializing_iterator end(bytes_view in, int protocol_version) {
static listlike_partial_deserializing_iterator end(bytes_view in, serialization_format sf) {
return { end_tag() };
}
private:
void parse() {
if (_remain) {
_cur = read_collection_value(*_in, _protocol_version);
_cur = read_collection_value(*_in, _sf);
} else {
_cur = {};
}
@@ -1083,22 +1083,23 @@ set_type_impl::is_value_compatible_with_frozen(collection_type_impl& previous) {
bool
set_type_impl::less(bytes_view o1, bytes_view o2) {
using llpdi = listlike_partial_deserializing_iterator;
auto sf = serialization_format::internal();
return std::lexicographical_compare(
llpdi::begin(o1, 3), llpdi::end(o1, 3),
llpdi::begin(o2, 3), llpdi::end(o2, 3),
llpdi::begin(o1, sf), llpdi::end(o1, sf),
llpdi::begin(o2, sf), llpdi::end(o2, sf),
[this] (bytes_view o1, bytes_view o2) { return _elements->less(o1, o2); });
}
void
set_type_impl::serialize(const boost::any& value, bytes::iterator& out) {
return serialize(value, out, 3);
return serialize(value, out, serialization_format::internal());
}
size_t
set_type_impl::serialized_size(const boost::any& value) {
auto& s = boost::any_cast<const native_type&>(value);
size_t len = collection_size_len(3);
size_t psz = collection_value_len(3);;
size_t len = collection_size_len(serialization_format::internal());
size_t psz = collection_value_len(serialization_format::internal());
for (auto&& e : s) {
len += psz + _elements->serialized_size(e);
}
@@ -1108,29 +1109,29 @@ set_type_impl::serialized_size(const boost::any& value) {
void
set_type_impl::serialize(const boost::any& value, bytes::iterator& out, int protocol_version) {
set_type_impl::serialize(const boost::any& value, bytes::iterator& out, serialization_format sf) {
auto& s = boost::any_cast<const native_type&>(value);
write_collection_size(out, s.size(), protocol_version);
write_collection_size(out, s.size(), sf);
for (auto&& e : s) {
write_collection_value(out, protocol_version, _elements, e);
write_collection_value(out, sf, _elements, e);
}
}
object_opt
set_type_impl::deserialize(bytes_view in) {
return deserialize(in, 3);
return deserialize(in, serialization_format::internal());
}
object_opt
set_type_impl::deserialize(bytes_view in, int protocol_version) {
set_type_impl::deserialize(bytes_view in, serialization_format sf) {
if (in.empty()) {
return {};
}
auto nr = read_collection_size(in, protocol_version);
auto nr = read_collection_size(in, sf);
native_type s;
s.reserve(nr);
for (int i = 0; i != nr; ++i) {
auto e = _elements->deserialize(read_collection_value(in, protocol_version));
auto e = _elements->deserialize(read_collection_value(in, sf));
if (!e) {
throw marshal_exception();
}
@@ -1145,7 +1146,8 @@ set_type_impl::to_string(const bytes& b) {
std::ostringstream out;
bool first = true;
auto v = bytes_view(b);
std::for_each(llpdi::begin(v, 3), llpdi::end(v, 3), [&first, &out, this] (bytes_view e) {
auto sf = serialization_format::internal();
std::for_each(llpdi::begin(v, sf), llpdi::end(v, sf), [&first, &out, this] (bytes_view e) {
if (first) {
first = false;
} else {
@@ -1175,7 +1177,7 @@ set_type_impl::serialized_values(std::vector<atomic_cell::one> cells) {
}
bytes
set_type_impl::to_value(mutation_view mut, int protocol_version) {
set_type_impl::to_value(mutation_view mut, serialization_format sf) {
std::vector<bytes_view> tmp;
tmp.reserve(mut.size());
for (auto&& e : mut) {
@@ -1183,13 +1185,13 @@ set_type_impl::to_value(mutation_view mut, int protocol_version) {
tmp.emplace_back(e.first);
}
}
return pack(tmp.begin(), tmp.end(), tmp.size(), protocol_version);
return pack(tmp.begin(), tmp.end(), tmp.size(), sf);
}
bytes
set_type_impl::serialize_partially_deserialized_form(
const std::vector<bytes_view>& v, int protocol_version) {
return pack(v.begin(), v.end(), v.size(), protocol_version);
const std::vector<bytes_view>& v, serialization_format sf) {
return pack(v.begin(), v.end(), v.size(), sf);
}
list_type
@@ -1242,31 +1244,32 @@ list_type_impl::is_value_compatible_with_frozen(collection_type_impl& previous)
bool
list_type_impl::less(bytes_view o1, bytes_view o2) {
using llpdi = listlike_partial_deserializing_iterator;
auto sf = serialization_format::internal();
return std::lexicographical_compare(
llpdi::begin(o1, 3), llpdi::end(o1, 3),
llpdi::begin(o2, 3), llpdi::end(o2, 3),
llpdi::begin(o1, sf), llpdi::end(o1, sf),
llpdi::begin(o2, sf), llpdi::end(o2, sf),
[this] (bytes_view o1, bytes_view o2) { return _elements->less(o1, o2); });
}
void
list_type_impl::serialize(const boost::any& value, bytes::iterator& out) {
return serialize(value, out, 3);
return serialize(value, out, serialization_format::internal());
}
void
list_type_impl::serialize(const boost::any& value, bytes::iterator& out, int protocol_version) {
list_type_impl::serialize(const boost::any& value, bytes::iterator& out, serialization_format sf) {
auto& s = boost::any_cast<const native_type&>(value);
write_collection_size(out, s.size(), protocol_version);
write_collection_size(out, s.size(), sf);
for (auto&& e : s) {
write_collection_value(out, protocol_version, _elements, e);
write_collection_value(out, sf, _elements, e);
}
}
size_t
list_type_impl::serialized_size(const boost::any& value) {
auto& s = boost::any_cast<const native_type&>(value);
size_t len = collection_size_len(3);
size_t psz = collection_value_len(3);
size_t len = collection_size_len(serialization_format::internal());
size_t psz = collection_value_len(serialization_format::internal());
for (auto&& e : s) {
len += psz + _elements->serialized_size(e);
}
@@ -1275,19 +1278,19 @@ list_type_impl::serialized_size(const boost::any& value) {
object_opt
list_type_impl::deserialize(bytes_view in) {
return deserialize(in, 3);
return deserialize(in, serialization_format::internal());
}
object_opt
list_type_impl::deserialize(bytes_view in, int protocol_version) {
list_type_impl::deserialize(bytes_view in, serialization_format sf) {
if (in.empty()) {
return {};
}
auto nr = read_collection_size(in, protocol_version);
auto nr = read_collection_size(in, sf);
native_type s;
s.reserve(nr);
for (int i = 0; i != nr; ++i) {
auto e = _elements->deserialize(read_collection_value(in, protocol_version));
auto e = _elements->deserialize(read_collection_value(in, sf));
if (!e) {
throw marshal_exception();
}
@@ -1302,7 +1305,8 @@ list_type_impl::to_string(const bytes& b) {
std::ostringstream out;
bool first = true;
auto v = bytes_view(b);
std::for_each(llpdi::begin(v, 3), llpdi::end(v, 3), [&first, &out, this] (bytes_view e) {
auto sf = serialization_format::internal();
std::for_each(llpdi::begin(v, sf), llpdi::end(v, sf), [&first, &out, this] (bytes_view e) {
if (first) {
first = false;
} else {
@@ -1332,7 +1336,7 @@ list_type_impl::serialized_values(std::vector<atomic_cell::one> cells) {
}
bytes
list_type_impl::to_value(mutation_view mut, int protocol_version) {
list_type_impl::to_value(mutation_view mut, serialization_format sf) {
std::vector<bytes_view> tmp;
tmp.reserve(mut.size());
for (auto&& e : mut) {
@@ -1340,7 +1344,7 @@ list_type_impl::to_value(mutation_view mut, int protocol_version) {
tmp.emplace_back(e.second.value());
}
}
return pack(tmp.begin(), tmp.end(), tmp.size(), protocol_version);
return pack(tmp.begin(), tmp.end(), tmp.size(), sf);
}
thread_local const shared_ptr<abstract_type> int32_type(make_shared<int32_type_impl>());

View File

@@ -18,6 +18,7 @@
#include "bytes.hh"
#include "log.hh"
#include "atomic_cell.hh"
#include "serialization_format.hh"
namespace cql3 {
@@ -202,9 +203,9 @@ public:
virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) = 0;
virtual shared_ptr<cql3::cql3_type> as_cql3_type() override;
template <typename BytesViewIterator>
static bytes pack(BytesViewIterator start, BytesViewIterator finish, int elements, int version);
static bytes pack(BytesViewIterator start, BytesViewIterator finish, int elements, serialization_format sf);
mutation_view deserialize_mutation_form(bytes_view in);
virtual bytes to_value(mutation_view mut, int protocol_version) = 0;
virtual bytes to_value(mutation_view mut, serialization_format sf) = 0;
// FIXME: use iterators?
collection_mutation::one serialize_mutation_form(const mutation& mut);
collection_mutation::one serialize_mutation_form(mutation_view mut);
@@ -283,17 +284,17 @@ public:
bytes_view o1, bytes_view o2);
virtual bool is_byte_order_comparable() const override { return false; }
virtual void serialize(const boost::any& value, bytes::iterator& out) override;
void serialize(const boost::any& value, bytes::iterator& out, int protocol_version);
void serialize(const boost::any& value, bytes::iterator& out, serialization_format sf);
virtual size_t serialized_size(const boost::any& value);
virtual object_opt deserialize(bytes_view v) override;
object_opt deserialize(bytes_view v, int protocol_version);
object_opt deserialize(bytes_view v, serialization_format sf);
virtual sstring to_string(const bytes& b) override;
virtual size_t hash(bytes_view v) override;
virtual bytes from_string(sstring_view text) override;
virtual std::vector<bytes> serialized_values(std::vector<atomic_cell::one> cells) override;
static bytes serialize_partially_deserialized_form(const std::vector<std::pair<bytes_view, bytes_view>>& v,
int protocol_version);
virtual bytes to_value(mutation_view mut, int protocol_version) override;
serialization_format sf);
virtual bytes to_value(mutation_view mut, serialization_format sf) override;
};
using map_type = shared_ptr<map_type_impl>;
@@ -319,17 +320,17 @@ public:
virtual bool less(bytes_view o1, bytes_view o2) override;
virtual bool is_byte_order_comparable() const override { return _elements->is_byte_order_comparable(); }
virtual void serialize(const boost::any& value, bytes::iterator& out) override;
void serialize(const boost::any& value, bytes::iterator& out, int protocol_version);
void serialize(const boost::any& value, bytes::iterator& out, serialization_format sf);
virtual size_t serialized_size(const boost::any& value) override;
virtual object_opt deserialize(bytes_view v) override;
object_opt deserialize(bytes_view v, int protocol_version);
object_opt deserialize(bytes_view v, serialization_format sf);
virtual sstring to_string(const bytes& b) override;
virtual size_t hash(bytes_view v) override;
virtual bytes from_string(sstring_view text) override;
virtual std::vector<bytes> serialized_values(std::vector<atomic_cell::one> cells) override;
virtual bytes to_value(mutation_view mut, int protocol_version) override;
virtual bytes to_value(mutation_view mut, serialization_format sf) override;
bytes serialize_partially_deserialized_form(
const std::vector<bytes_view>& v, int protocol_version);
const std::vector<bytes_view>& v, serialization_format sf);
};
@@ -356,15 +357,15 @@ public:
virtual bool less(bytes_view o1, bytes_view o2) override;
// FIXME: origin doesn't override is_byte_order_comparable(). Why?
virtual void serialize(const boost::any& value, bytes::iterator& out) override;
void serialize(const boost::any& value, bytes::iterator& out, int protocol_version);
void serialize(const boost::any& value, bytes::iterator& out, serialization_format sf);
virtual size_t serialized_size(const boost::any& value) override;
virtual object_opt deserialize(bytes_view v) override;
object_opt deserialize(bytes_view v, int protocol_version);
object_opt deserialize(bytes_view v, serialization_format sf);
virtual sstring to_string(const bytes& b) override;
virtual size_t hash(bytes_view v) override;
virtual bytes from_string(sstring_view text) override;
virtual std::vector<bytes> serialized_values(std::vector<atomic_cell::one> cells) override;
virtual bytes to_value(mutation_view mut, int protocol_version) override;
virtual bytes to_value(mutation_view mut, serialization_format sf) override;
};
using list_type = shared_ptr<list_type_impl>;
@@ -581,25 +582,25 @@ inline sstring read_simple_short_string(bytes_view& v) {
return ret;
}
size_t collection_size_len(int version);
size_t collection_value_len(int version);
void write_collection_size(bytes::iterator& out, int size, int version);
void write_collection_value(bytes::iterator& out, int version, bytes_view val_bytes);
void write_collection_value(bytes::iterator& out, int version, data_type type, const boost::any& value);
size_t collection_size_len(serialization_format sf);
size_t collection_value_len(serialization_format sf);
void write_collection_size(bytes::iterator& out, int size, serialization_format sf);
void write_collection_value(bytes::iterator& out, serialization_format sf, bytes_view val_bytes);
void write_collection_value(bytes::iterator& out, serialization_format sf, data_type type, const boost::any& value);
template <typename BytesViewIterator>
bytes
collection_type_impl::pack(BytesViewIterator start, BytesViewIterator finish, int elements, int protocol_version) {
size_t len = collection_size_len(protocol_version);
size_t psz = collection_value_len(protocol_version);
collection_type_impl::pack(BytesViewIterator start, BytesViewIterator finish, int elements, serialization_format sf) {
size_t len = collection_size_len(sf);
size_t psz = collection_value_len(sf);
for (auto j = start; j != finish; j++) {
len += j->size() + psz;
}
bytes out(bytes::initialized_later(), len);
bytes::iterator i = out.begin();
write_collection_size(i, elements, protocol_version);
write_collection_size(i, elements, sf);
while (start != finish) {
write_collection_value(i, protocol_version, *start++);
write_collection_value(i, sf, *start++);
}
return out;
}