Merge branch 'types' into db
More type work. Reviewed-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
94
database.cc
94
database.cc
@@ -4,6 +4,7 @@
|
||||
|
||||
#include "database.hh"
|
||||
#include "net/byteorder.hh"
|
||||
#include "db_clock.hh"
|
||||
|
||||
bool
|
||||
less_unsigned(const bytes& v1, const bytes& v2) {
|
||||
@@ -11,13 +12,28 @@ less_unsigned(const bytes& v1, const bytes& v2) {
|
||||
[](int8_t v1, int8_t v2) { return uint8_t(v1) < uint8_t(v2); });
|
||||
}
|
||||
|
||||
template <typename T, typename Compare>
|
||||
bool
|
||||
abstract_type::default_less(const bytes& v1, const bytes& v2, Compare compare) {
|
||||
auto o1 = deserialize(v1);
|
||||
auto o2 = deserialize(v2);
|
||||
if (!o1) {
|
||||
return bool(o2);
|
||||
}
|
||||
if (!o2) {
|
||||
return false;
|
||||
}
|
||||
auto& x1 = boost::any_cast<const T&>(*o1);
|
||||
auto& x2 = boost::any_cast<const T&>(*o2);
|
||||
return compare(x1, x2);
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
struct simple_type_impl : abstract_type {
|
||||
simple_type_impl(sstring name) : abstract_type(std::move(name)) {}
|
||||
virtual bool less(const bytes& v1, const bytes& v2) override {
|
||||
auto& x1 = boost::any_cast<const T&>(deserialize(v1));
|
||||
auto& x2 = boost::any_cast<const T&>(deserialize(v2));
|
||||
return x1 < x2;
|
||||
return default_less<T>(v1, v2);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -28,9 +44,15 @@ struct int32_type_impl : simple_type_impl<int32_t> {
|
||||
auto u = net::hton(uint32_t(v));
|
||||
out.write(reinterpret_cast<const char*>(&u), sizeof(u));
|
||||
}
|
||||
virtual boost::any deserialize(std::istream& in) {
|
||||
virtual object_opt deserialize(std::istream& in) {
|
||||
uint32_t u;
|
||||
in.read(reinterpret_cast<char*>(&u), sizeof(u));
|
||||
auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&u), sizeof(u));
|
||||
if (!n) {
|
||||
return {};
|
||||
}
|
||||
if (n != 4) {
|
||||
throw marshal_exception();
|
||||
}
|
||||
auto v = int32_t(net::ntoh(u));
|
||||
return boost::any(v);
|
||||
}
|
||||
@@ -43,9 +65,15 @@ struct long_type_impl : simple_type_impl<int64_t> {
|
||||
auto u = net::hton(uint64_t(v));
|
||||
out.write(reinterpret_cast<const char*>(&u), sizeof(u));
|
||||
}
|
||||
virtual boost::any deserialize(std::istream& in) {
|
||||
virtual object_opt deserialize(std::istream& in) {
|
||||
uint64_t u;
|
||||
in.read(reinterpret_cast<char*>(&u), sizeof(u));
|
||||
auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&u), sizeof(u));
|
||||
if (!n) {
|
||||
return {};
|
||||
}
|
||||
if (n != 8) {
|
||||
throw marshal_exception();
|
||||
}
|
||||
auto v = int64_t(net::ntoh(u));
|
||||
return boost::any(v);
|
||||
}
|
||||
@@ -57,7 +85,7 @@ struct string_type_impl : public abstract_type {
|
||||
auto& v = boost::any_cast<const sstring&>(value);
|
||||
out.write(v.c_str(), v.size());
|
||||
}
|
||||
virtual boost::any deserialize(std::istream& in) {
|
||||
virtual object_opt deserialize(std::istream& in) {
|
||||
std::vector<char> tmp(std::istreambuf_iterator<char>(in.rdbuf()),
|
||||
std::istreambuf_iterator<char>());
|
||||
// FIXME: validation?
|
||||
@@ -74,7 +102,7 @@ struct bytes_type_impl : public abstract_type {
|
||||
auto& v = boost::any_cast<const bytes&>(value);
|
||||
out.write(v.c_str(), v.size());
|
||||
}
|
||||
virtual boost::any deserialize(std::istream& in) {
|
||||
virtual object_opt deserialize(std::istream& in) {
|
||||
std::vector<char> tmp(std::istreambuf_iterator<char>(in.rdbuf()),
|
||||
std::istreambuf_iterator<char>());
|
||||
return boost::any(bytes(reinterpret_cast<const char*>(tmp.data()), tmp.size()));
|
||||
@@ -84,11 +112,59 @@ struct bytes_type_impl : public abstract_type {
|
||||
}
|
||||
};
|
||||
|
||||
struct boolean_type_impl : public simple_type_impl<bool> {
|
||||
boolean_type_impl() : simple_type_impl<bool>("boolean") {}
|
||||
virtual void serialize(const boost::any& value, std::ostream& out) override {
|
||||
auto v = boost::any_cast<bool>(value);
|
||||
char c = v;
|
||||
out.put(c);
|
||||
}
|
||||
virtual object_opt deserialize(std::istream& in) override {
|
||||
char tmp;
|
||||
auto n = in.rdbuf()->sgetn(&tmp, 1);
|
||||
if (n == 0) {
|
||||
return {};
|
||||
}
|
||||
return boost::any(tmp != 0);
|
||||
}
|
||||
};
|
||||
|
||||
struct date_type_impl : public abstract_type {
|
||||
date_type_impl() : abstract_type("date") {}
|
||||
virtual void serialize(const boost::any& value, std::ostream& out) override {
|
||||
auto v = boost::any_cast<db_clock::time_point>(value);
|
||||
int64_t i = v.time_since_epoch().count();
|
||||
i = net::hton(uint64_t(i));
|
||||
out.write(reinterpret_cast<char*>(&i), 8);
|
||||
}
|
||||
virtual object_opt deserialize(std::istream& in) override {
|
||||
int64_t tmp;
|
||||
auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&tmp), 8);
|
||||
if (n == 0) {
|
||||
return {};
|
||||
}
|
||||
if (n != 8) {
|
||||
throw marshal_exception();
|
||||
}
|
||||
tmp = net::ntoh(uint64_t(tmp));
|
||||
return boost::any(db_clock::time_point(db_clock::duration(tmp)));
|
||||
}
|
||||
virtual bool less(const bytes& b1, const bytes& b2) override {
|
||||
// DateType has a bug where it compares the values as an unsigned type.
|
||||
// Preserve this bug.
|
||||
return default_less<db_clock::time_point>(b1, b2, [] (db_clock::time_point t1, db_clock::time_point t2) {
|
||||
return uint64_t(t1.time_since_epoch().count() < t2.time_since_epoch().count());
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
thread_local shared_ptr<abstract_type> int_type(make_shared<int32_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> long_type(make_shared<long_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> ascii_type(make_shared<string_type_impl>("ascii"));
|
||||
thread_local shared_ptr<abstract_type> bytes_type(make_shared<bytes_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> utf8_type(make_shared<string_type_impl>("utf8"));
|
||||
thread_local shared_ptr<abstract_type> boolean_type(make_shared<boolean_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> date_type(make_shared<date_type_impl>());
|
||||
|
||||
partition::partition(column_family& cf)
|
||||
: rows(key_compare(cf.clustering_key_type)) {
|
||||
|
||||
19
database.hh
19
database.hh
@@ -17,19 +17,30 @@
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <experimental/optional>
|
||||
|
||||
// FIXME: should be int8_t
|
||||
using bytes = basic_sstring<char, uint32_t, 31>;
|
||||
|
||||
using object_opt = std::experimental::optional<boost::any>;
|
||||
|
||||
class marshal_exception : public std::exception {
|
||||
sstring _why;
|
||||
public:
|
||||
marshal_exception() : _why("marshalling error") {}
|
||||
marshal_exception(sstring why) : _why(sstring("marshaling error: ") + why) {}
|
||||
virtual const char* why() const { return _why.c_str(); }
|
||||
};
|
||||
|
||||
class abstract_type {
|
||||
sstring _name;
|
||||
public:
|
||||
abstract_type(sstring name) : _name(name) {}
|
||||
virtual ~abstract_type() {}
|
||||
virtual void serialize(const boost::any& value, std::ostream& out) = 0;
|
||||
virtual boost::any deserialize(std::istream& in) = 0;
|
||||
virtual object_opt deserialize(std::istream& in) = 0;
|
||||
virtual bool less(const bytes& v1, const bytes& v2) = 0;
|
||||
boost::any deserialize(const bytes& v) {
|
||||
object_opt deserialize(const bytes& v) {
|
||||
// FIXME: optimize
|
||||
std::istringstream iss(v);
|
||||
return deserialize(iss);
|
||||
@@ -44,6 +55,9 @@ public:
|
||||
sstring name() const {
|
||||
return _name;
|
||||
}
|
||||
protected:
|
||||
template <typename T, typename Compare = std::less<T>>
|
||||
bool default_less(const bytes& b1, const bytes& b2, Compare compare = Compare());
|
||||
};
|
||||
|
||||
using data_type = shared_ptr<abstract_type>;
|
||||
@@ -86,6 +100,7 @@ extern thread_local shared_ptr<abstract_type> long_type;
|
||||
extern thread_local shared_ptr<abstract_type> ascii_type;
|
||||
extern thread_local shared_ptr<abstract_type> bytes_type;
|
||||
extern thread_local shared_ptr<abstract_type> utf8_type;
|
||||
extern thread_local shared_ptr<abstract_type> boolean_type;
|
||||
|
||||
template <>
|
||||
inline
|
||||
|
||||
34
db_clock.hh
Normal file
34
db_clock.hh
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#ifndef DB_CLOCK_HH_
|
||||
#define DB_CLOCK_HH_
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
|
||||
// the database clock follows Java - 1ms granularity, 64-bit counter, 1970 epoch
|
||||
|
||||
class db_clock {
|
||||
using base = std::chrono::system_clock;
|
||||
public:
|
||||
using rep = int64_t;
|
||||
using period = std::ratio<1, 1000>; // milliseconds
|
||||
using duration = std::chrono::duration<rep, period>;
|
||||
using time_point = std::chrono::time_point<db_clock, duration>;
|
||||
|
||||
static constexpr bool is_steady = base::is_steady;
|
||||
static std::time_t to_time_t(time_point t) {
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count();
|
||||
}
|
||||
static time_point from_time_t(std::time_t t) {
|
||||
return time_point(std::chrono::duration_cast<duration>(std::chrono::seconds(t)));
|
||||
}
|
||||
static time_point now() {
|
||||
auto now_since_epoch = base::now() - base::from_time_t(0);
|
||||
return time_point(std::chrono::duration_cast<duration>(now_since_epoch));
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* DB_CLOCK_HH_ */
|
||||
Reference in New Issue
Block a user