Files
scylladb/database.hh
Vlad Zolotarov 3520d4de10 locator: introduce a global distributed<snitch_ptr> i_endpoint_snitch::snitch_instance()
Snitch class semantics defined to be per-Node. To make it so we
introduce here a static member in an i_endpoint_snitch class that
has to contain the pointer to the relevant snitch class instance.

Since the snitch contents are not always pure const it has to be per
shard, therefore we'll make it a "distributed". All the I/O is going
to take place on a single shard and if there are changes - they are going
to be propagated to the rest of the shards.

The application is responsible to initialize this distributed<shnitch>
before it's used for the first time.

This patch effectively reverts most of the "locator: futurize
snitch creation" a2594015f9 patch - the part that modifies the
code that was creating the snitch instance. Since snitch is
created explicitly by the application and all the rest of the code
simply assumes that the above global is initialized we won't need
all those changes any more and the code will get back to be nice and simple
as it was before the patch above.

So, to summarize, this patch does the following:
   - Reverts the changes introduced by a2594015f9 related to the fact that
     every time a replication strategy was created there should have been created
     a snitch that would have been stored in this strategy object. More specifically,
     methods like keyspace::create_replication_strategy() do not return a future<>
     any more and this allows to simplify the code that calls it significantly.
   - Introduce the global distributed<snitch_ptr> object:
      - It belongs to the i_endpoint_snitch class.
      - There has been added a corresponding interface to access both global and
        shard-local instances.
      - locator::abstract_replication_strategy::create_replication_strategy() does
        not accept snitch_ptr&& - it'll get and pass the corresponding shard-local
        instance of the snitch to the replication strategy's constructor by itself.
      - Adjusted the existing snitch infrastructure to the new semantics:
         - Modified the create_snitch() to create and start all per-shard snitch
           instances and update the global variable.
         - Introduced a static i_endpoint_snitch::stop_snitch() function that properly
           stops the global distributed snitch.
         - Added the code to the gossiping_property_file_snitch that distributes the
           changed data to all per-shard snitch objects.
         - Made all existing snitches classes properly maintain their state in order
           to be able to shut down cleanly.
         - Patched both urchin and cql_query_test to initialize a snitch instance before
           all other services.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>

New in v6:
   - Rebased to the current master.
   - Extended a commit message a little - the summary.

New in v5:
   - database::create_keyspace(): added a missing _keyspaces.emplace()

New in v4:
   - Kept the database::create_keyspace() to return future<> by Glauber's request
     and added a description to this method that needs to be changed when Glauber
     adds his bits that require this interface.
2015-06-22 23:18:31 +03:00

401 lines
15 KiB
C++

/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#ifndef DATABASE_HH_
#define DATABASE_HH_
#include "dht/i_partitioner.hh"
#include "locator/abstract_replication_strategy.hh"
#include "core/sstring.hh"
#include "core/shared_ptr.hh"
#include "net/byteorder.hh"
#include "utils/UUID.hh"
#include "utils/hash.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
#include "core/distributed.hh"
#include <functional>
#include <cstdint>
#include <unordered_map>
#include <map>
#include <set>
#include <iostream>
#include <boost/functional/hash.hpp>
#include <experimental/optional>
#include <string.h>
#include "types.hh"
#include "compound.hh"
#include "core/future.hh"
#include "core/gate.hh"
#include "cql3/column_specification.hh"
#include "db/commitlog/replay_position.hh"
#include <limits>
#include <cstddef>
#include "schema.hh"
#include "timestamp.hh"
#include "tombstone.hh"
#include "atomic_cell.hh"
#include "query-request.hh"
#include "query-result.hh"
#include "keys.hh"
#include "mutation.hh"
#include "memtable.hh"
#include <list>
#include "mutation_reader.hh"
class frozen_mutation;
namespace sstables {
class sstable;
}
namespace db {
template<typename T>
class serializer;
class commitlog;
class config;
}
class replay_position_reordered_exception : public std::exception {};
using memtable_list = std::vector<lw_shared_ptr<memtable>>;
using sstable_list = std::map<unsigned long, lw_shared_ptr<sstables::sstable>>;
class column_family {
public:
struct config {
sstring datadir;
bool enable_disk_writes = true;
bool enable_disk_reads = true;
};
private:
schema_ptr _schema;
config _config;
lw_shared_ptr<memtable_list> _memtables;
// generation -> sstable. Ordered by key so we can easily get the most recent.
lw_shared_ptr<sstable_list> _sstables;
unsigned _sstable_generation = 1;
unsigned _mutation_count = 0;
db::replay_position _highest_flushed_rp;
private:
void add_sstable(sstables::sstable&& sstable);
void add_memtable();
memtable& active_memtable() { return *_memtables->back(); }
struct merge_comparator;
private:
// Creates a mutation reader which covers sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// The 'range' parameter must be live as long as the reader is used.
mutation_reader make_sstable_reader(const query::partition_range& range) const;
public:
// Creates a mutation reader which covers all data sources for this column family.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// Note: for data queries use query() instead.
// The 'range' parameter must be live as long as the reader is used.
mutation_reader make_reader(const query::partition_range& range = query::full_partition_range) const;
// Queries can be satisfied from multiple data sources, so they are returned
// as temporaries.
//
// FIXME: in case a query is satisfied from a single memtable, avoid a copy
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
using const_row_ptr = std::unique_ptr<const row>;
public:
column_family(schema_ptr schema, config cfg);
column_family(column_family&&);
~column_family();
schema_ptr schema() const { return _schema; }
future<const_mutation_partition_ptr> find_partition(const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(const partition_key& key) const;
future<const_row_ptr> find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const;
void apply(const frozen_mutation& m, const db::replay_position& = db::replay_position(), database* = nullptr);
void apply(const mutation& m, const db::replay_position& = db::replay_position(), database* = nullptr);
// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges) const;
future<> populate(sstring datadir);
void seal_active_memtable(database* = nullptr);
future<> stop(database* db = nullptr) {
seal_active_memtable(db);
return _in_flight_seals.close();
}
private:
seastar::gate _in_flight_seals;
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
template <typename Func>
future<bool> for_all_partitions(Func&& func) const;
future<> probe_file(sstring sstdir, sstring fname);
void seal_on_overflow(database*);
void check_valid_rp(const db::replay_position&) const;
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
future<bool> for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
};
class user_types_metadata {
std::unordered_map<bytes, user_type> _user_types;
public:
user_type get_type(bytes name) const {
return _user_types.at(name);
}
const std::unordered_map<bytes, user_type>& get_all_types() const {
return _user_types;
}
void add_type(user_type type) {
auto i = _user_types.find(type->_name);
assert(i == _user_types.end() || type->is_compatible_with(*i->second));
_user_types[type->_name] = std::move(type);
}
void remove_type(user_type type) {
_user_types.erase(type->_name);
}
};
class keyspace_metadata final {
sstring _name;
sstring _strategy_name;
std::map<sstring, sstring> _strategy_options;
std::unordered_map<sstring, schema_ptr> _cf_meta_data;
bool _durable_writes;
lw_shared_ptr<user_types_metadata> _user_types;
public:
keyspace_metadata(sstring name,
sstring strategy_name,
std::map<sstring, sstring> strategy_options,
bool durable_writes,
std::vector<schema_ptr> cf_defs = std::vector<schema_ptr>{},
lw_shared_ptr<user_types_metadata> user_types = make_lw_shared<user_types_metadata>())
: _name{std::move(name)}
, _strategy_name{strategy_name.empty() ? "NetworkTopologyStrategy" : strategy_name}
, _strategy_options{std::move(strategy_options)}
, _durable_writes{durable_writes}
, _user_types{std::move(user_types)}
{
for (auto&& s : cf_defs) {
_cf_meta_data.emplace(s->cf_name(), s);
}
}
static lw_shared_ptr<keyspace_metadata>
new_keyspace(sstring name,
sstring strategy_name,
std::map<sstring, sstring> options,
bool durables_writes,
std::vector<schema_ptr> cf_defs = std::vector<schema_ptr>{})
{
return ::make_lw_shared<keyspace_metadata>(name, strategy_name, options, durables_writes, cf_defs);
}
const sstring& name() const {
return _name;
}
const sstring& strategy_name() const {
return _strategy_name;
}
const std::map<sstring, sstring>& strategy_options() const {
return _strategy_options;
}
const std::unordered_map<sstring, schema_ptr>& cf_meta_data() const {
return _cf_meta_data;
}
bool durable_writes() const {
return _durable_writes;
}
const lw_shared_ptr<user_types_metadata>& user_types() const {
return _user_types;
}
void add_column_family(const schema_ptr& s) {
_cf_meta_data.emplace(s->cf_name(), s);
}
};
class keyspace {
public:
struct config {
sstring datadir;
bool enable_disk_reads = true;
bool enable_disk_writes = true;
};
private:
std::unique_ptr<locator::abstract_replication_strategy> _replication_strategy;
lw_shared_ptr<keyspace_metadata> _metadata;
config _config;
public:
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg)
: _metadata(std::move(metadata))
, _config(std::move(cfg))
{}
user_types_metadata _user_types;
const lw_shared_ptr<keyspace_metadata>& metadata() const {
return _metadata;
}
void create_replication_strategy(const std::map<sstring, sstring>& options);
locator::abstract_replication_strategy& get_replication_strategy();
column_family::config make_column_family_config(const schema& s) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
void add_column_family(const schema_ptr& s) {
_metadata->add_column_family(s);
}
// FIXME to allow simple registration at boostrap
void set_replication_strategy(std::unique_ptr<locator::abstract_replication_strategy> replication_strategy);
private:
sstring column_family_directory(const sstring& name, utils::UUID uuid) const;
};
class no_such_keyspace : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
class no_such_column_family : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
// Policy for distributed<database>:
// broadcast metadata writes
// local metadata reads
// use shard_of() for data
class database {
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, column_family> _column_families;
std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid;
std::unique_ptr<db::commitlog> _commitlog;
std::unique_ptr<db::config> _cfg;
future<> init_commitlog();
future<> apply_in_memory(const frozen_mutation&, const db::replay_position&);
future<> populate(sstring datadir);
future<> populate_keyspace(sstring datadir, sstring ks_name);
public:
database();
database(const db::config&);
database(database&&) = default;
~database();
db::commitlog* commitlog() const {
return _commitlog.get();
}
future<> init_from_data_directory();
// but see: create_keyspace(distributed<database>&, sstring)
void add_keyspace(sstring name, keyspace k);
/** Adds cf with auto-generated UUID. */
void add_column_family(column_family&&);
void add_column_family(const utils::UUID&, column_family&&);
void update_column_family(const sstring& ks_name, const sstring& cf_name);
void drop_column_family(const sstring& ks_name, const sstring& cf_name);
/* throws std::out_of_range if missing */
const utils::UUID& find_uuid(const sstring& ks, const sstring& cf) const throw (std::out_of_range);
const utils::UUID& find_uuid(const schema_ptr&) const throw (std::out_of_range);
/**
* Creates a keyspace for a given metadata if it still doesn't exist.
*
* Fixme: this interface is left to return a future by a Glauber's request
* since it's upcoming patch is going to need this. Currently there is no
* reason for it to return a future. Therefore there should be a proper
* comment here describing the reason why this function returns a future
* since running a "git blame" on this function's header would give a
* confusing result about the reason it became futurized.
*
* @return ready future when the operation is complete
*/
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&);
/* below, find_keyspace throws no_such_<type> on fail */
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
bool has_keyspace(const sstring& name) const;
void update_keyspace(const sstring& name);
void drop_keyspace(const sstring& name);
const auto& keyspaces() const { return _keyspaces; }
column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family);
const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family);
column_family& find_column_family(const utils::UUID&) throw (no_such_column_family);
const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family);
column_family& find_column_family(const schema_ptr&) throw (no_such_column_family);
const column_family& find_column_family(const schema_ptr&) const throw (no_such_column_family);
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family);
schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family);
bool has_schema(const sstring& ks_name, const sstring& cf_name) const;
std::set<sstring> existing_index_names(const sstring& cf_to_exclude = sstring()) const;
future<> stop();
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
unsigned shard_of(const frozen_mutation& m);
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
future<> apply(const frozen_mutation&);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm) const;
const sstring& get_snitch_name() const;
friend std::ostream& operator<<(std::ostream& out, const database& db);
friend future<> create_keyspace(distributed<database>&, const lw_shared_ptr<keyspace_metadata>&);
const std::unordered_map<sstring, keyspace>& get_keyspaces() const {
return _keyspaces;
}
const std::unordered_map<utils::UUID, column_family>& get_column_families() const {
return _column_families;
}
const std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash>&
get_column_families_mapping() const {
return _ks_cf_to_uuid;
}
};
// Creates a keyspace. Keyspaces have a non-sharded
// component (the directory), so a global function is needed.
future<> create_keyspace(distributed<database>& db, const lw_shared_ptr<keyspace_metadata>&);
// FIXME: stub
class secondary_index_manager {};
inline
void
column_family::apply(const mutation& m, const db::replay_position& rp, database* db) {
active_memtable().apply(m, rp);
seal_on_overflow(db);
}
inline
void
column_family::seal_on_overflow(database* db) {
// FIXME: something better
if (++_mutation_count == 100000) {
_mutation_count = 0;
seal_active_memtable(db);
}
}
inline
void
column_family::check_valid_rp(const db::replay_position& rp) const {
if (rp < _highest_flushed_rp) {
throw replay_position_reordered_exception();
}
}
inline
void
column_family::apply(const frozen_mutation& m, const db::replay_position& rp, database* db) {
check_valid_rp(rp);
active_memtable().apply(m, rp);
seal_on_overflow(db);
}
#endif /* DATABASE_HH_ */