/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */

#include "log.hh"
#include "database.hh"
#include "net/byteorder.hh"
#include "db_clock.hh"
#include "core/future-util.hh"

thread_local logging::logger dblog("database");

// Generic ordering for types whose deserialized boost::any holds a T.
// A value that deserializes to a disengaged optional sorts before every
// engaged value; two disengaged values compare equal. Engaged values are
// ordered by `compare`.
template <typename T, typename Compare>
bool
abstract_type::default_less(const bytes& v1, const bytes& v2, Compare compare) {
    auto o1 = deserialize(v1);
    auto o2 = deserialize(v2);
    if (!o1) {
        return bool(o2);
    }
    if (!o2) {
        return false;
    }
    auto& x1 = boost::any_cast<const T&>(*o1);
    auto& x2 = boost::any_cast<const T&>(*o2);
    return compare(x1, x2);
}

// Base for types ordered by default_less<T> with its default comparator.
template <typename T>
struct simple_type_impl : abstract_type {
    simple_type_impl(sstring name) : abstract_type(std::move(name)) {}
    virtual bool less(const bytes& v1, const bytes& v2) override {
        return default_less<T>(v1, v2);
    }
};

// 32-bit signed integer, written as 4 bytes via net::hton (network byte order).
struct int32_type_impl : simple_type_impl<int32_t> {
    int32_type_impl() : simple_type_impl<int32_t>("int32") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto v = boost::any_cast<int32_t>(value);
        auto u = net::hton(uint32_t(v));
        out.write(reinterpret_cast<const char*>(&u), sizeof(u));
    }
    // Empty input yields a disengaged value; a partial read throws marshal_exception.
    virtual object_opt deserialize(std::istream& in) override {
        uint32_t u;
        auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&u), sizeof(u));
        if (!n) {
            return {};
        }
        if (n != 4) {
            throw marshal_exception();
        }
        auto v = int32_t(net::ntoh(u));
        return boost::any(v);
    }
};

// 64-bit signed integer, written as 8 bytes via net::hton (network byte order).
struct long_type_impl : simple_type_impl<int64_t> {
    long_type_impl() : simple_type_impl<int64_t>("long") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto v = boost::any_cast<int64_t>(value);
        auto u = net::hton(uint64_t(v));
        out.write(reinterpret_cast<const char*>(&u), sizeof(u));
    }
    // Empty input yields a disengaged value; a partial read throws marshal_exception.
    virtual object_opt deserialize(std::istream& in) override {
        uint64_t u;
        auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&u), sizeof(u));
        if (!n) {
            return {};
        }
        if (n != 8) {
            throw marshal_exception();
        }
        auto v = int64_t(net::ntoh(u));
        return boost::any(v);
    }
};

// Text type (instantiated below as "ascii" and "utf8"). Serialized as the raw
// characters with no length prefix; deserialization consumes the whole stream.
struct string_type_impl : public abstract_type {
    string_type_impl(sstring name) : abstract_type(std::move(name)) {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto& v = boost::any_cast<const sstring&>(value);
        out.write(v.c_str(), v.size());
    }
    virtual object_opt deserialize(std::istream& in) override {
        std::vector<char> tmp(std::istreambuf_iterator<char>(in.rdbuf()),
                              std::istreambuf_iterator<char>());
        // FIXME: validation?
        return boost::any(sstring(tmp.data(), tmp.size()));
    }
    virtual bool less(const bytes& v1, const bytes& v2) override {
        return less_unsigned(v1, v2);
    }
};

// Opaque blob type. Serialized as the raw bytes; deserialization consumes
// the whole stream.
struct bytes_type_impl : public abstract_type {
    bytes_type_impl() : abstract_type("bytes") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto& v = boost::any_cast<const bytes&>(value);
        out.write(v.c_str(), v.size());
    }
    virtual object_opt deserialize(std::istream& in) override {
        std::vector<char> tmp(std::istreambuf_iterator<char>(in.rdbuf()),
                              std::istreambuf_iterator<char>());
        return boost::any(bytes(reinterpret_cast<const char*>(tmp.data()), tmp.size()));
    }
    virtual bool less(const bytes& v1, const bytes& v2) override {
        return less_unsigned(v1, v2);
    }
};

// Boolean, written as one byte; any nonzero byte reads back as true.
struct boolean_type_impl : public simple_type_impl<bool> {
    boolean_type_impl() : simple_type_impl<bool>("boolean") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto v = boost::any_cast<bool>(value);
        char c = v;
        out.put(c);
    }
    virtual object_opt deserialize(std::istream& in) override {
        char tmp;
        auto n = in.rdbuf()->sgetn(&tmp, 1);
        if (n == 0) {
            return {};
        }
        return boost::any(tmp != 0);
    }
};

// Date: a db_clock::time_point whose tick count is written as 8 bytes via
// net::hton (network byte order).
struct date_type_impl : public abstract_type {
    date_type_impl() : abstract_type("date") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        auto v = boost::any_cast<db_clock::time_point>(value);
        int64_t i = v.time_since_epoch().count();
        i = net::hton(uint64_t(i));
        out.write(reinterpret_cast<const char*>(&i), 8);
    }
    virtual object_opt deserialize(std::istream& in) override {
        int64_t tmp;
        auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&tmp), 8);
        if (n == 0) {
            return {};
        }
        if (n != 8) {
            throw marshal_exception();
        }
        tmp = net::ntoh(uint64_t(tmp));
        return boost::any(db_clock::time_point(db_clock::duration(tmp)));
    }
    virtual bool less(const bytes& b1, const bytes& b2) override {
        // DateType has a bug where it compares the values as an unsigned type.
        // Preserve this bug.
        //
        // The cast must wrap each operand: casting the result of a signed
        // comparison (uint64_t(a < b)) still compares signed, which orders
        // negative (pre-epoch) counts differently from the unsigned rule.
        return default_less<db_clock::time_point>(b1, b2, [] (db_clock::time_point t1, db_clock::time_point t2) {
            return uint64_t(t1.time_since_epoch().count()) < uint64_t(t2.time_since_epoch().count());
        });
    }
};

// Time-based UUID, serialized as 16 bytes.
struct timeuuid_type_impl : public abstract_type {
    timeuuid_type_impl() : abstract_type("timeuuid") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        // FIXME: optimize
        auto& uuid = boost::any_cast<const utils::UUID&>(value);
        out.write(to_bytes(uuid).begin(), 16);
    }
    virtual object_opt deserialize(std::istream& in) override {
        struct tmp { uint64_t msb, lsb; } t;
        auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&t), 16);
        if (n == 0) {
            return {};
        }
        if (n != 16) {
            throw marshal_exception();
        }
        return boost::any(utils::UUID(net::ntoh(t.msb), net::ntoh(t.lsb)));
    }
    // Empty values sort first; otherwise order by the embedded timestamp
    // (compare_bytes), breaking ties with a plain lexicographical compare.
    virtual bool less(const bytes& b1, const bytes& b2) override {
        if (b1.empty()) {
            return !b2.empty();
        }
        if (b2.empty()) {
            return false;
        }
        // NOTE(review): compare_bytes reads bytes 0..7 unchecked; non-empty
        // values shorter than 8 bytes are presumably rejected elsewhere - confirm.
        auto r = compare_bytes(b1, b2);
        if (r != 0) {
            return r < 0;
        }
        return std::lexicographical_compare(b1.begin(), b1.end(), b2.begin(), b2.end());
    }
private:
    // Three-way comparison (<0, 0, >0) of the version-1 timestamp embedded in
    // two UUIDs. Bytes are visited most-significant timestamp part first:
    // byte 6 (low nibble only, skipping the version bits), 7, then 4, 5,
    // then 0..3. The first differing position decides.
    static int compare_bytes(const bytes& o1, const bytes& o2) {
        static const struct { unsigned pos; int mask; } order[] = {
            {6, 0xf}, {7, 0xff}, {4, 0xff}, {5, 0xff},
            {0, 0xff}, {1, 0xff}, {2, 0xff}, {3, 0xff},
        };
        for (auto& step : order) {
            int d = (o1[step.pos] & step.mask) - (o2[step.pos] & step.mask);
            if (d) {
                return d;
            }
        }
        return 0;
    }
    friend class uuid_type_impl;
};

// Timestamp: a db_clock::time_point whose tick count is written as 8 bytes via
// net::hton; ordered by default_less<db_clock::time_point>.
struct timestamp_type_impl : simple_type_impl<db_clock::time_point> {
    timestamp_type_impl() : simple_type_impl<db_clock::time_point>("timestamp") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        uint64_t v = boost::any_cast<db_clock::time_point>(value).time_since_epoch().count();
        v = net::hton(v);
        out.write(reinterpret_cast<const char*>(&v), 8);
    }
    virtual object_opt deserialize(std::istream& is) override {
        uint64_t v;
        auto n = is.rdbuf()->sgetn(reinterpret_cast<char*>(&v), 8);
        if (n == 0) {
            return {};
        }
        if (n != 8) {
            throw marshal_exception();
        }
        return boost::any(db_clock::time_point(db_clock::duration(net::ntoh(v))));
    }
    // FIXME: isCompatibleWith(timestampuuid)
};

// Generic UUID, serialized as 16 bytes.
struct uuid_type_impl : abstract_type {
    uuid_type_impl() : abstract_type("uuid") {}
    virtual void serialize(const boost::any& value, std::ostream& out) override {
        // FIXME: optimize
        auto& uuid = boost::any_cast<const utils::UUID&>(value);
        out.write(to_bytes(uuid).begin(), 16);
    }
    virtual object_opt deserialize(std::istream& in) override {
        struct tmp { uint64_t msb, lsb; } t;
        auto n = in.rdbuf()->sgetn(reinterpret_cast<char*>(&t), 16);
        if (n == 0) {
            return {};
        }
        if (n != 16) {
            throw marshal_exception();
        }
        return boost::any(utils::UUID(net::ntoh(t.msb), net::ntoh(t.lsb)));
    }
    // Values shorter than 16 bytes sort first. Otherwise order by version
    // (high nibble of byte 6); version-1 UUIDs are then ordered by timestamp;
    // remaining ties fall back to less_unsigned.
    virtual bool less(const bytes& b1, const bytes& b2) override {
        if (b1.size() < 16) {
            return b2.size() >= 16;
        }
        if (b2.size() < 16) {
            return false;
        }
        auto v1 = (b1[6] >> 4) & 0x0f;
        auto v2 = (b2[6] >> 4) & 0x0f;
        if (v1 != v2) {
            return v1 < v2;
        }
        if (v1 == 1) {
            auto c = timeuuid_type_impl::compare_bytes(b1, b2);
            // compare_bytes is three-way; converting it straight to bool would
            // make less(a, b) and less(b, a) both true and break strict ordering.
            if (c != 0) {
                return c < 0;
            }
        }
        return less_unsigned(b1, b2);
    }
    // FIXME: isCompatibleWith(uuid)
};

thread_local shared_ptr<abstract_type> int32_type(make_shared<int32_type_impl>());
thread_local shared_ptr<abstract_type> long_type(make_shared<long_type_impl>());
thread_local shared_ptr<abstract_type> ascii_type(make_shared<string_type_impl>("ascii"));
thread_local shared_ptr<abstract_type> bytes_type(make_shared<bytes_type_impl>());
thread_local shared_ptr<abstract_type> utf8_type(make_shared<string_type_impl>("utf8"));
thread_local shared_ptr<abstract_type> boolean_type(make_shared<boolean_type_impl>());
thread_local shared_ptr<abstract_type> date_type(make_shared<date_type_impl>());
thread_local shared_ptr<abstract_type> timeuuid_type(make_shared<timeuuid_type_impl>());
thread_local shared_ptr<abstract_type> timestamp_type(make_shared<timestamp_type_impl>());
thread_local shared_ptr<abstract_type> uuid_type(make_shared<uuid_type_impl>());

// A partition's rows are keyed by clustering key, ordered by the owning
// column family's clustering key type.
partition::partition(column_family& cf)
        : rows(key_compare(cf.clustering_key_type)) {
}

column_family::column_family(shared_ptr<abstract_type> partition_key_type,
                             shared_ptr<abstract_type> clustering_key_type)
        : partition_key_type(std::move(partition_key_type))
        , clustering_key_type(std::move(clustering_key_type))
        // Must use the member (this->), not the moved-from parameter.
        , partitions(key_compare(this->partition_key_type)) {
}

// Returns the partition for `key`, or nullptr if absent.
partition*
column_family::find_partition(const bytes& key) {
    auto i = partitions.find(key);
    return i == partitions.end() ? nullptr : &i->second;
}

// Returns the row at (partition_key, clustering_key), or nullptr if either is absent.
row*
column_family::find_row(const bytes& partition_key, const bytes& clustering_key) {
    partition* p = find_partition(partition_key);
    if (!p) {
        return nullptr;
    }
    auto i = p->rows.find(clustering_key);
    return i == p->rows.end() ? nullptr : &i->second;
}

// Returns the partition for `key`, inserting an empty one if absent.
partition&
column_family::find_or_create_partition(const bytes& key) {
    // call lower_bound so we have a hint for the insert, just in case.
    auto i = partitions.lower_bound(key);
    if (i == partitions.end() || key != i->first) {
        i = partitions.emplace_hint(i, std::make_pair(key, partition(*this)));
    }
    return i->second;
}

// Returns the row at (partition_key, clustering_key), creating the partition
// and/or an empty row as needed.
row&
column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) {
    partition& p = find_or_create_partition(partition_key);
    // call lower_bound so we have a hint for the insert, just in case.
    auto i = p.rows.lower_bound(clustering_key);
    if (i == p.rows.end() || clustering_key != i->first) {
        i = p.rows.emplace_hint(i, std::make_pair(clustering_key, row()));
    }
    return i->second;
}

// Lower-case hex encoding, two digits per byte.
sstring to_hex(const bytes& b) {
    static const char digits[] = "0123456789abcdef";
    sstring out(sstring::initialized_later(), b.size() * 2);
    unsigned end = b.size();
    for (unsigned i = 0; i != end; ++i) {
        uint8_t x = b[i];
        out[2*i] = digits[x >> 4];
        out[2*i+1] = digits[x & 0xf];
    }
    return out;
}

// Hex encoding of an optional value; a disengaged value renders as "null".
sstring to_hex(const bytes_opt& b) {
    return b ? to_hex(*b) : sstring("null");
}

// Lists one directory and calls `walker` for every entry whose type equals
// `_expected_type` and whose name does not start with '.'. Entries whose type
// is unknown are logged and skipped.
// The listing callback captures `this`, so a lister must not be copied or
// moved after construction; scan_dir keeps it in a lw_shared_ptr.
class lister {
    file _f;
    std::function<future<> (directory_entry de)> _walker;
    directory_entry_type _expected_type;
    subscription<directory_entry> _listing;
public:
    lister(file f, directory_entry_type type, std::function<future<> (directory_entry)> walker)
            : _f(std::move(f))
            , _walker(std::move(walker))
            , _expected_type(type)
            , _listing(_f.list_directory([this] (directory_entry de) { return _visit(de); })) {
    }

    static future<> scan_dir(sstring name, directory_entry_type type, std::function<future<> (directory_entry)> walker);
protected:
    future<> _visit(directory_entry de) {
        // FIXME: stat and try to recover
        if (!de.type) {
            dblog.error("database found file with unknown type {}", de.name);
            return make_ready_future<>();
        }

        // Hide all synthetic directories and hidden files.
        if ((de.type != _expected_type) || (de.name[0] == '.')) {
            return make_ready_future<>();
        }
        return _walker(de);
    }
    future<> done() { return _listing.done(); }
};

// Opens directory `name` and visits its entries of type `type` with `walker`.
// The returned future resolves when the listing is done; the lister is kept
// alive until then by the `l` capture in the continuation.
future<> lister::scan_dir(sstring name, directory_entry_type type, std::function<future<> (directory_entry)> walker) {
    return engine.open_directory(name).then([type, walker = std::move(walker)] (file f) {
        auto l = make_lw_shared<lister>(std::move(f), type, walker);
        return l->done().then([l] { });
    });
}

// Splits a file name into components separated by '.' or '-'.
static std::vector<sstring> parse_fname(sstring filename) {
    std::vector<sstring> comps;
    boost::split(comps, filename, boost::is_any_of(".-"));
    return comps;
}

// Scans keyspace directory `ksdir`, treating each subdirectory as a column
// family. Names that do not split into exactly two components are logged and
// skipped.
future<keyspace> keyspace::populate(sstring ksdir) {
    auto ks = make_lw_shared<keyspace>();
    return lister::scan_dir(ksdir, directory_entry_type::directory, [ks, ksdir] (directory_entry de) {
        auto comps = parse_fname(de.name);
        if (comps.size() != 2) {
            dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name);
            return make_ready_future<>();
        }
        sstring cfname = comps[0];
        // FIXME: the column family's contents are not loaded yet; this only logs.
        dblog.warn("Keyspace {}: Reading CF {} ", ksdir, cfname);
        return make_ready_future<>();
    }).then([ks] {
        return make_ready_future<keyspace>(std::move(*ks));
    });
}

// Scans `datadir`, populating one keyspace per subdirectory (keyed by the
// subdirectory name).
future<database> database::populate(sstring datadir) {
    auto db = make_lw_shared<database>();
    return lister::scan_dir(datadir, directory_entry_type::directory, [db, datadir] (directory_entry de) {
        dblog.warn("Populating Keyspace {}", de.name);
        auto ksdir = datadir + "/" + de.name;
        return keyspace::populate(ksdir).then([db, de] (keyspace ks) {
            db->keyspaces[de.name] = std::move(ks);
        });
    }).then([db] {
        return make_ready_future<database>(std::move(*db));
    });
}