diff --git a/database.cc b/database.cc
index fc72936006..e922a3f9cc 100644
--- a/database.cc
+++ b/database.cc
@@ -4,6 +4,7 @@
 
 #include "database.hh"
 #include "net/byteorder.hh"
+#include "db_clock.hh"
 
 bool
 less_unsigned(const bytes& v1, const bytes& v2) {
@@ -11,13 +12,30 @@ less_unsigned(const bytes& v1, const bytes& v2) {
             [](int8_t v1, int8_t v2) { return uint8_t(v1) < uint8_t(v2); });
 }
 
+// Compare two serialized values. A value that deserializes to an empty
+// optional orders before any non-empty one; otherwise defer to compare.
+template
+bool
+abstract_type::default_less(const bytes& v1, const bytes& v2, Compare compare) {
+    auto o1 = deserialize(v1);
+    auto o2 = deserialize(v2);
+    if (!o1) {
+        return bool(o2);
+    }
+    if (!o2) {
+        return false;
+    }
+    auto& x1 = boost::any_cast(*o1);
+    auto& x2 = boost::any_cast(*o2);
+    return compare(x1, x2);
+}
+
+
 template
 struct simple_type_impl : abstract_type {
     simple_type_impl(sstring name) : abstract_type(std::move(name)) {}
     virtual bool less(const bytes& v1, const bytes& v2) override {
-        auto& x1 = boost::any_cast(deserialize(v1));
-        auto& x2 = boost::any_cast(deserialize(v2));
-        return x1 < x2;
+        return default_less(v1, v2);
     }
 };
 
@@ -28,9 +46,15 @@ struct int32_type_impl : simple_type_impl {
         auto u = net::hton(uint32_t(v));
         out.write(reinterpret_cast(&u), sizeof(u));
     }
-    virtual boost::any deserialize(std::istream& in) {
+    virtual object_opt deserialize(std::istream& in) {
         uint32_t u;
-        in.read(reinterpret_cast(&u), sizeof(u));
+        auto n = in.rdbuf()->sgetn(reinterpret_cast(&u), sizeof(u));
+        if (!n) {
+            return {};
+        }
+        if (n != 4) {
+            throw marshal_exception();
+        }
         auto v = int32_t(net::ntoh(u));
         return boost::any(v);
     }
@@ -43,9 +67,15 @@ struct long_type_impl : simple_type_impl {
         auto u = net::hton(uint64_t(v));
         out.write(reinterpret_cast(&u), sizeof(u));
     }
-    virtual boost::any deserialize(std::istream& in) {
+    virtual object_opt deserialize(std::istream& in) {
         uint64_t u;
-        in.read(reinterpret_cast(&u), sizeof(u));
+        auto n = in.rdbuf()->sgetn(reinterpret_cast(&u), sizeof(u));
+        if (!n) {
+            return {};
+        }
+        if (n != 8) {
+            throw marshal_exception();
+        }
         auto v = int64_t(net::ntoh(u));
         return boost::any(v);
     }
@@ -57,7 +87,7 @@ struct string_type_impl : public abstract_type {
         auto& v = boost::any_cast(value);
         out.write(v.c_str(), v.size());
     }
-    virtual boost::any deserialize(std::istream& in) {
+    virtual object_opt deserialize(std::istream& in) {
         std::vector tmp(std::istreambuf_iterator(in.rdbuf()),
                           std::istreambuf_iterator());
         // FIXME: validation?
@@ -74,7 +104,7 @@ struct bytes_type_impl : public abstract_type {
         auto& v = boost::any_cast(value);
         out.write(v.c_str(), v.size());
     }
-    virtual boost::any deserialize(std::istream& in) {
+    virtual object_opt deserialize(std::istream& in) {
         std::vector tmp(std::istreambuf_iterator(in.rdbuf()),
                           std::istreambuf_iterator());
         return boost::any(bytes(reinterpret_cast(tmp.data()), tmp.size()));
@@ -84,11 +114,59 @@ struct bytes_type_impl : public abstract_type {
     }
 };
 
+struct boolean_type_impl : public simple_type_impl {
+    boolean_type_impl() : simple_type_impl("boolean") {}
+    virtual void serialize(const boost::any& value, std::ostream& out) override {
+        auto v = boost::any_cast(value);
+        char c = v;
+        out.put(c);
+    }
+    virtual object_opt deserialize(std::istream& in) override {
+        char tmp;
+        auto n = in.rdbuf()->sgetn(&tmp, 1);
+        if (n == 0) {
+            return {};
+        }
+        return boost::any(tmp != 0);
+    }
+};
+
+struct date_type_impl : public abstract_type {
+    date_type_impl() : abstract_type("date") {}
+    virtual void serialize(const boost::any& value, std::ostream& out) override {
+        auto v = boost::any_cast(value);
+        int64_t i = v.time_since_epoch().count();
+        i = net::hton(uint64_t(i));
+        out.write(reinterpret_cast(&i), 8);
+    }
+    virtual object_opt deserialize(std::istream& in) override {
+        int64_t tmp;
+        auto n = in.rdbuf()->sgetn(reinterpret_cast(&tmp), 8);
+        if (n == 0) {
+            return {};
+        }
+        if (n != 8) {
+            throw marshal_exception();
+        }
+        tmp = net::ntoh(uint64_t(tmp));
+        return boost::any(db_clock::time_point(db_clock::duration(tmp)));
+    }
+    virtual bool less(const bytes& b1, const bytes& b2) override {
+        // DateType has a bug where it compares the values as an unsigned type.
+        // Preserve this bug: cast each operand to uint64_t before comparing.
+        return default_less(b1, b2, [] (db_clock::time_point t1, db_clock::time_point t2) {
+            return uint64_t(t1.time_since_epoch().count()) < uint64_t(t2.time_since_epoch().count());
+        });
+    }
+};
+
 thread_local shared_ptr int_type(make_shared());
 thread_local shared_ptr long_type(make_shared());
 thread_local shared_ptr ascii_type(make_shared("ascii"));
 thread_local shared_ptr bytes_type(make_shared());
 thread_local shared_ptr utf8_type(make_shared("utf8"));
+thread_local shared_ptr boolean_type(make_shared());
+thread_local shared_ptr date_type(make_shared());
 
 partition::partition(column_family& cf)
         : rows(key_compare(cf.clustering_key_type)) {
diff --git a/database.hh b/database.hh
index 31c1deb669..b9e0eb40d2 100644
--- a/database.hh
+++ b/database.hh
@@ -17,19 +17,33 @@
 #include
 #include
 #include
+#include
 
 // FIXME: should be int8_t
 using bytes = basic_sstring;
 
+using object_opt = std::experimental::optional;
+
+// Thrown when a serialized value cannot be decoded.
+class marshal_exception : public std::exception {
+    sstring _why;
+public:
+    marshal_exception() : _why("marshalling error") {}
+    marshal_exception(sstring why) : _why(sstring("marshaling error: ") + why) {}
+    virtual const char* why() const { return _why.c_str(); }
+    // Report the same message through the standard std::exception interface.
+    virtual const char* what() const noexcept override { return _why.c_str(); }
+};
+
 class abstract_type {
     sstring _name;
 public:
     abstract_type(sstring name) : _name(name) {}
     virtual ~abstract_type() {}
     virtual void serialize(const boost::any& value, std::ostream& out) = 0;
-    virtual boost::any deserialize(std::istream& in) = 0;
+    virtual object_opt deserialize(std::istream& in) = 0;
     virtual bool less(const bytes& v1, const bytes& v2) = 0;
-    boost::any deserialize(const bytes& v) {
+    object_opt deserialize(const bytes& v) {
         // FIXME: optimize
         std::istringstream iss(v);
         return deserialize(iss);
@@ -44,6 +58,9 @@ public:
     sstring name() const {
         return _name;
     }
+protected:
+    template >
+    bool default_less(const bytes& b1, const bytes& b2, Compare compare = Compare());
 };
 
 using data_type = shared_ptr;
@@ -86,6 +103,7 @@ extern thread_local shared_ptr long_type;
 extern thread_local shared_ptr ascii_type;
 extern thread_local shared_ptr bytes_type;
 extern thread_local shared_ptr utf8_type;
+extern thread_local shared_ptr boolean_type;
 
 template <>
 inline
diff --git a/db_clock.hh b/db_clock.hh
new file mode 100644
index 0000000000..9c4af2516b
--- /dev/null
+++ b/db_clock.hh
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#ifndef DB_CLOCK_HH_
+#define DB_CLOCK_HH_
+
+#include
+#include
+
+// the database clock follows Java - 1ms granularity, 64-bit counter, 1970 epoch
+
+class db_clock {
+    using base = std::chrono::system_clock;
+public:
+    using rep = int64_t;
+    using period = std::ratio<1, 1000>; // milliseconds
+    using duration = std::chrono::duration;
+    using time_point = std::chrono::time_point;
+
+    static constexpr bool is_steady = base::is_steady;
+    static std::time_t to_time_t(time_point t) {
+        return std::chrono::duration_cast(t.time_since_epoch()).count();
+    }
+    static time_point from_time_t(std::time_t t) {
+        return time_point(std::chrono::duration_cast(std::chrono::seconds(t)));
+    }
+    static time_point now() {
+        auto now_since_epoch = base::now() - base::from_time_t(0);
+        return time_point(std::chrono::duration_cast(now_since_epoch));
+    }
+};
+
+#endif /* DB_CLOCK_HH_ */