Merge seastar-dev avi/smp
This initial attemt at sharding broadcasts all writes while directing reads to the local shards. Further refinements will keep schema updates broadcasted, but will unicast row mutations to their shard (and conversely convert row reads from reading the local shard to unicast as well). Of particular interest is the change to the thrift handler, where a sequential application of mutations is converted to parallel application, in order to hide SMP latency and improve batching.
This commit is contained in:
@@ -88,7 +88,7 @@ query_processor::get_statement(const sstring_view& query, service::client_state&
|
||||
#if 0
|
||||
Tracing.trace("Preparing statement");
|
||||
#endif
|
||||
return statement->prepare(_db);
|
||||
return statement->prepare(_db.local());
|
||||
}
|
||||
|
||||
::shared_ptr<parsed_statement>
|
||||
|
||||
@@ -32,15 +32,16 @@
|
||||
#include "cql3/statements/cf_statement.hh"
|
||||
#include "service/query_state.hh"
|
||||
#include "log.hh"
|
||||
#include "core/distributed.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class query_processor {
|
||||
private:
|
||||
service::storage_proxy& _proxy;
|
||||
database& _db;
|
||||
distributed<database>& _db;
|
||||
public:
|
||||
query_processor(service::storage_proxy& proxy, database& db) : _proxy(proxy), _db(db) {}
|
||||
query_processor(service::storage_proxy& proxy, distributed<database>& db) : _proxy(proxy), _db(db) {}
|
||||
|
||||
#if 0
|
||||
public static final SemanticVersion CQL_VERSION = new SemanticVersion("3.2.0");
|
||||
|
||||
37
database.cc
37
database.cc
@@ -204,6 +204,13 @@ future<database> database::populate(sstring datadir) {
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
database::init_from_data_directory(sstring datadir) {
|
||||
return populate(datadir).then([this] (database&& db) {
|
||||
*this = db;
|
||||
});
|
||||
}
|
||||
|
||||
column_definition::column_definition(bytes name, data_type type, column_id id, column_kind kind)
|
||||
: _name(std::move(name))
|
||||
, type(std::move(type))
|
||||
@@ -257,9 +264,9 @@ database::find_keyspace(const sstring& name) {
|
||||
}
|
||||
|
||||
void
|
||||
column_family::apply(mutation&& m) {
|
||||
mutation_partition& p = find_or_create_partition(std::move(m.key));
|
||||
p.apply(_schema, std::move(m.p));
|
||||
column_family::apply(const mutation& m) {
|
||||
mutation_partition& p = find_or_create_partition(m.key);
|
||||
p.apply(_schema, m.p);
|
||||
}
|
||||
|
||||
// Based on org.apache.cassandra.db.AbstractCell#reconcile()
|
||||
@@ -299,39 +306,39 @@ compare_for_merge(const column_definition& def,
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition::apply(schema_ptr schema, mutation_partition&& p) {
|
||||
mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
|
||||
_tombstone.apply(p._tombstone);
|
||||
|
||||
for (auto&& entry : p._row_tombstones) {
|
||||
apply_row_tombstone(schema, std::move(entry));
|
||||
apply_row_tombstone(schema, entry);
|
||||
}
|
||||
|
||||
auto merge_cells = [this, schema] (row& old_row, row&& new_row) {
|
||||
auto merge_cells = [this, schema] (row& old_row, const row& new_row) {
|
||||
for (auto&& new_column : new_row) {
|
||||
auto col = new_column.first;
|
||||
auto i = old_row.find(col);
|
||||
if (i == old_row.end()) {
|
||||
_static_row.emplace_hint(i, std::move(new_column));
|
||||
_static_row.emplace_hint(i, new_column);
|
||||
} else {
|
||||
auto& old_column = *i;
|
||||
auto& def = schema->regular_column_at(col);
|
||||
if (compare_for_merge(def, old_column, new_column) < 0) {
|
||||
old_column.second = std::move(new_column.second);
|
||||
old_column.second = new_column.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
merge_cells(_static_row, std::move(p._static_row));
|
||||
merge_cells(_static_row, p._static_row);
|
||||
|
||||
for (auto&& entry : p._rows) {
|
||||
auto& key = entry.first;
|
||||
auto i = _rows.find(key);
|
||||
if (i == _rows.end()) {
|
||||
_rows.emplace_hint(i, std::move(entry));
|
||||
_rows.emplace_hint(i, entry);
|
||||
} else {
|
||||
i->second.t.apply(entry.second.t);
|
||||
merge_cells(i->second.cells, std::move(entry.second.cells));
|
||||
merge_cells(i->second.cells, entry.second.cells);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -388,3 +395,11 @@ bool column_definition::is_compact_value() const {
|
||||
unimplemented::compact_tables();
|
||||
return false;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const mutation& m) {
|
||||
return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p);
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) {
|
||||
return fprint(os, "{mutation_partition: ...}");
|
||||
}
|
||||
|
||||
18
database.hh
18
database.hh
@@ -162,13 +162,14 @@ public:
|
||||
apply_row_tombstone(schema, {std::move(prefix), std::move(t)});
|
||||
}
|
||||
void apply_row_tombstone(schema_ptr schema, std::pair<bytes, tombstone> row_tombstone);
|
||||
void apply(schema_ptr schema, mutation_partition&& p);
|
||||
void apply(schema_ptr schema, const mutation_partition& p);
|
||||
const row_tombstone_set& row_tombstones() const { return _row_tombstones; }
|
||||
row& static_row() { return _static_row; }
|
||||
row& clustered_row(const clustering_key& key) { return _rows[key].cells; }
|
||||
row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; }
|
||||
row* find_row(const clustering_key& key);
|
||||
tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key);
|
||||
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
|
||||
};
|
||||
|
||||
class mutation final {
|
||||
@@ -184,7 +185,7 @@ public:
|
||||
{ }
|
||||
|
||||
mutation(mutation&&) = default;
|
||||
mutation(const mutation&) = delete;
|
||||
mutation(const mutation&) = default;
|
||||
|
||||
void set_static_cell(const column_definition& def, boost::any value) {
|
||||
p.static_row()[def.id] = std::move(value);
|
||||
@@ -199,6 +200,7 @@ public:
|
||||
auto& row = p.clustered_row(key);
|
||||
row[def.id] = std::move(value);
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
|
||||
};
|
||||
|
||||
struct column_family {
|
||||
@@ -210,7 +212,7 @@ struct column_family {
|
||||
schema_ptr _schema;
|
||||
// partition key -> partition
|
||||
std::map<bytes, mutation_partition, key_compare> partitions;
|
||||
void apply(mutation&& m);
|
||||
void apply(const mutation& m);
|
||||
};
|
||||
|
||||
class keyspace {
|
||||
@@ -221,12 +223,20 @@ public:
|
||||
column_family* find_column_family(const sstring& cf_name);
|
||||
};
|
||||
|
||||
// Policy for distributed<database>:
|
||||
// broadcast writes
|
||||
// local reads
|
||||
|
||||
class database {
|
||||
public:
|
||||
std::unordered_map<sstring, keyspace> keyspaces;
|
||||
future<> init_from_data_directory(sstring datadir);
|
||||
static future<database> populate(sstring datadir);
|
||||
keyspace* find_keyspace(const sstring& name);
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
void assign(database&& db) {
|
||||
*this = std::move(db);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
#endif /* DATABASE_HH_ */
|
||||
|
||||
10
main.cc
10
main.cc
@@ -19,6 +19,7 @@ int main(int ac, char** av) {
|
||||
("datadir", bpo::value<std::string>()->default_value("/var/lib/cassandra/data"), "data directory");
|
||||
|
||||
auto server = std::make_unique<distributed<thrift_server>>();;
|
||||
distributed<database> db;
|
||||
|
||||
return app.run(ac, av, [&] {
|
||||
auto&& config = app.configuration();
|
||||
@@ -26,16 +27,17 @@ int main(int ac, char** av) {
|
||||
uint16_t cql_port = config["cql-port"].as<uint16_t>();
|
||||
sstring datadir = config["datadir"].as<std::string>();
|
||||
|
||||
return database::populate(datadir).then([cql_port, thrift_port] (database db) {
|
||||
auto pdb = new database(std::move(db));
|
||||
return db.start().then([datadir, &db] {
|
||||
return db.invoke_on_all(&database::init_from_data_directory, datadir);
|
||||
}).then([&db, cql_port, thrift_port] {
|
||||
auto cserver = new distributed<cql_server>;
|
||||
cserver->start(std::ref(*pdb)).then([server = std::move(cserver), cql_port] () mutable {
|
||||
cserver->start(std::ref(db)).then([server = std::move(cserver), cql_port] () mutable {
|
||||
server->invoke_on_all(&cql_server::listen, ipv4_addr{cql_port});
|
||||
}).then([cql_port] {
|
||||
std::cout << "CQL server listening on port " << cql_port << " ...\n";
|
||||
});
|
||||
auto tserver = new distributed<thrift_server>;
|
||||
tserver->start(std::ref(*pdb)).then([server = std::move(tserver), thrift_port] () mutable {
|
||||
tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port] () mutable {
|
||||
server->invoke_on_all(&thrift_server::listen, ipv4_addr{thrift_port});
|
||||
}).then([thrift_port] {
|
||||
std::cout << "Thrift server listening on port " << thrift_port << " ...\n";
|
||||
|
||||
@@ -477,18 +477,20 @@ namespace service {
|
||||
future<>
|
||||
storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
|
||||
// FIXME: send it to replicas instead of applying locally
|
||||
for (auto&& m : mutations) {
|
||||
// FIXME: lookup column_family by UUID
|
||||
keyspace* ks = _db.find_keyspace(m.schema->ks_name);
|
||||
assert(ks); // FIXME: load keyspace meta-data from storage
|
||||
column_family* cf = ks->find_column_family(m.schema->cf_name);
|
||||
if (cf) {
|
||||
cf->apply(std::move(m));
|
||||
} else {
|
||||
// TODO: log a warning
|
||||
auto pmut = make_lw_shared(std::move(mutations));
|
||||
return _db.invoke_on_all([pmut, cl] (database& db) {
|
||||
for (auto&& m : *pmut) {
|
||||
// FIXME: lookup column_family by UUID
|
||||
keyspace* ks = db.find_keyspace(m.schema->ks_name);
|
||||
assert(ks); // FIXME: load keyspace meta-data from storage
|
||||
column_family* cf = ks->find_column_family(m.schema->cf_name);
|
||||
if (cf) {
|
||||
cf->apply(m);
|
||||
} else {
|
||||
// TODO: log a warning
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
#if 0
|
||||
Tracing.trace("Determining replicas for mutation");
|
||||
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
|
||||
|
||||
@@ -25,15 +25,16 @@
|
||||
#pragma once
|
||||
|
||||
#include "database.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
class storage_proxy /*implements StorageProxyMBean*/ {
|
||||
private:
|
||||
database& _db;
|
||||
distributed<database>& _db;
|
||||
public:
|
||||
storage_proxy(database& db) : _db(db) {}
|
||||
storage_proxy(distributed<database>& db) : _db(db) {}
|
||||
|
||||
/**
|
||||
* Use this method to have these Mutations applied
|
||||
|
||||
@@ -2,11 +2,10 @@
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "tests/test-utils.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "core/app-template.hh"
|
||||
|
||||
struct conversation_state {
|
||||
service::storage_proxy proxy;
|
||||
@@ -15,7 +14,7 @@ struct conversation_state {
|
||||
service::query_state query_state;
|
||||
cql3::query_options& options;
|
||||
|
||||
conversation_state(database& db, const sstring& ks_name)
|
||||
conversation_state(distributed<database>& db, const sstring& ks_name)
|
||||
: proxy(db)
|
||||
, qp(proxy, db)
|
||||
, client_state(service::client_state::for_internal_calls())
|
||||
@@ -33,46 +32,48 @@ struct conversation_state {
|
||||
static const sstring ks_name = "ks";
|
||||
static const sstring table_name = "cf";
|
||||
|
||||
static void require_column_has_value(database& db, const sstring& ks_name, const sstring& table_name,
|
||||
static void require_column_has_value(distributed<database>& ddb, const sstring& ks_name, const sstring& table_name,
|
||||
std::vector<boost::any> pk, std::vector<boost::any> ck, const sstring& column_name, boost::any expected)
|
||||
{
|
||||
auto& db = ddb.local();
|
||||
auto ks = db.find_keyspace(ks_name);
|
||||
BOOST_REQUIRE(ks != nullptr);
|
||||
assert(ks != nullptr);
|
||||
auto cf = ks->find_column_family(table_name);
|
||||
BOOST_REQUIRE(cf != nullptr);
|
||||
assert(cf != nullptr);
|
||||
auto schema = cf->_schema;
|
||||
auto p = cf->find_partition(schema->partition_key_type->serialize_value_deep(pk));
|
||||
BOOST_REQUIRE(p != nullptr);
|
||||
assert(p != nullptr);
|
||||
auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck));
|
||||
BOOST_REQUIRE(row != nullptr);
|
||||
assert(row != nullptr);
|
||||
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
|
||||
BOOST_REQUIRE(col_def != nullptr);
|
||||
assert(col_def != nullptr);
|
||||
auto i = row->find(col_def->id);
|
||||
if (i == row->end()) {
|
||||
BOOST_FAIL("column not set");
|
||||
assert(((void)"column not set", 0));
|
||||
}
|
||||
auto& cell = boost::any_cast<const atomic_cell&>(i->second);
|
||||
BOOST_REQUIRE(cell.is_live());
|
||||
BOOST_REQUIRE(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected)));
|
||||
assert(cell.is_live());
|
||||
assert(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected)));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_insert_statement) {
|
||||
auto db = make_shared<database>();
|
||||
|
||||
// CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));
|
||||
keyspace ks;
|
||||
auto cf_schema = make_lw_shared<schema>(ks_name, table_name,
|
||||
std::vector<schema::column>({{"p1", utf8_type}}),
|
||||
std::vector<schema::column>({{"c1", int32_type}}),
|
||||
std::vector<schema::column>({{"r1", int32_type}}),
|
||||
utf8_type
|
||||
);
|
||||
ks.column_families.emplace(table_name, column_family(cf_schema));
|
||||
db->keyspaces.emplace(ks_name, std::move(ks));
|
||||
|
||||
future<> test_insert_statement() {
|
||||
auto db = make_shared<distributed<database>>();
|
||||
auto state = make_shared<conversation_state>(*db, ks_name);
|
||||
|
||||
return now().then([state, db] {
|
||||
// CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));
|
||||
return db->start().then([db] {
|
||||
return db->invoke_on_all([] (database& db) {
|
||||
keyspace ks;
|
||||
auto cf_schema = make_lw_shared<schema>(ks_name, table_name,
|
||||
std::vector<schema::column>({{"p1", utf8_type}}),
|
||||
std::vector<schema::column>({{"c1", int32_type}}),
|
||||
std::vector<schema::column>({{"r1", int32_type}}),
|
||||
utf8_type
|
||||
);
|
||||
ks.column_families.emplace(table_name, column_family(cf_schema));
|
||||
db.keyspaces.emplace(ks_name, std::move(ks));
|
||||
});
|
||||
}).then([state, db] {
|
||||
return state->execute_cql("insert into cf (p1, c1, r1) values ('key1', 1, 100);");
|
||||
}).then([state, db] {
|
||||
require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 100);
|
||||
@@ -80,5 +81,14 @@ SEASTAR_TEST_CASE(test_insert_statement) {
|
||||
return state->execute_cql("update cf set r1 = 66 where p1 = 'key1' and c1 = 1;");
|
||||
}).then([state, db] {
|
||||
require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 66);
|
||||
});
|
||||
}).then([db] {
|
||||
return db->stop();
|
||||
}).then([db] {
|
||||
// keep db alive until stop(), above
|
||||
engine().exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
return app_template().run(ac, av, test_insert_statement);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,13 @@
|
||||
* Copyright 2014 Cloudius Systems
|
||||
*/
|
||||
|
||||
// Some thrift headers include other files from within namespaces,
|
||||
// which is totally broken. Include those files here to avoid
|
||||
// breakage:
|
||||
#include <sys/param.h>
|
||||
// end thrift workaround
|
||||
#include "Cassandra.h"
|
||||
#include "core/distributed.hh"
|
||||
#include "database.hh"
|
||||
#include "core/sstring.hh"
|
||||
#include "core/print.hh"
|
||||
@@ -38,12 +44,34 @@ void complete_with_exception(tcxx::function<void (::apache::thrift::TDelayedExce
|
||||
exn_cob(TDelayedException::delayException(make_exception<Ex>(fmt, std::forward<Args>(args)...)));
|
||||
}
|
||||
|
||||
class delayed_exception_wrapper : public ::apache::thrift::TDelayedException {
|
||||
std::exception_ptr _ex;
|
||||
public:
|
||||
delayed_exception_wrapper(std::exception_ptr ex) : _ex(std::move(ex)) {}
|
||||
virtual void throw_it() override {
|
||||
std::rethrow_exception(std::move(_ex));
|
||||
}
|
||||
};
|
||||
|
||||
template <typename... T>
|
||||
void complete(future<T...>& fut,
|
||||
const tcxx::function<void (const T&...)>& cob,
|
||||
const tcxx::function<void (::apache::thrift::TDelayedException* _throw)>& exn_cob) {
|
||||
try {
|
||||
apply(std::move(cob), fut.get());
|
||||
} catch (...) {
|
||||
delayed_exception_wrapper dew(std::current_exception());
|
||||
exn_cob(&dew);
|
||||
}
|
||||
}
|
||||
|
||||
class CassandraAsyncHandler : public CassandraCobSvIf {
|
||||
database& _db;
|
||||
distributed<database>& _db;
|
||||
keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection?
|
||||
sstring _ks_name;
|
||||
sstring _cql_version;
|
||||
public:
|
||||
explicit CassandraAsyncHandler(database& db) : _db(db) {}
|
||||
explicit CassandraAsyncHandler(distributed<database>& db) : _db(db) {}
|
||||
void login(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const AuthenticationRequest& auth_request) {
|
||||
// FIXME: implement
|
||||
return unimplemented(exn_cob);
|
||||
@@ -51,7 +79,8 @@ public:
|
||||
|
||||
void set_keyspace(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
|
||||
try {
|
||||
_ks = &_db.keyspaces.at(keyspace);
|
||||
_ks = &_db.local().keyspaces.at(keyspace);
|
||||
_ks_name = keyspace;
|
||||
cob();
|
||||
} catch (std::out_of_range& e) {
|
||||
return complete_with_exception<InvalidRequestException>(std::move(exn_cob),
|
||||
@@ -183,61 +212,68 @@ public:
|
||||
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "keyspace not set");
|
||||
}
|
||||
static bytes null_clustering_key = to_bytes("");
|
||||
try {
|
||||
for (auto&& key_cf : mutation_map) {
|
||||
bytes key = to_bytes(key_cf.first);
|
||||
const std::map<std::string, std::vector<Mutation>>& cf_mutations_map = key_cf.second;
|
||||
for (auto&& cf_mutations : cf_mutations_map) {
|
||||
sstring cf_name = cf_mutations.first;
|
||||
const std::vector<Mutation>& mutations = cf_mutations.second;
|
||||
auto& cf = lookup_column_family(cf_name);
|
||||
mutation m_to_apply(key, cf._schema);
|
||||
for (const Mutation& m : mutations) {
|
||||
if (m.__isset.column_or_supercolumn) {
|
||||
auto&& cosc = m.column_or_supercolumn;
|
||||
if (cosc.__isset.column) {
|
||||
auto&& col = cosc.column;
|
||||
bytes cname = to_bytes(col.name);
|
||||
auto def = cf._schema->get_column_definition(cname);
|
||||
if (!def) {
|
||||
throw make_exception<InvalidRequestException>("column %s not found", col.name);
|
||||
}
|
||||
if (def->kind != column_definition::column_kind::REGULAR) {
|
||||
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
|
||||
}
|
||||
gc_clock::duration ttl;
|
||||
if (col.__isset.ttl) {
|
||||
ttl = std::chrono::duration_cast<gc_clock::duration>(std::chrono::seconds(col.ttl));
|
||||
}
|
||||
if (ttl.count() <= 0) {
|
||||
ttl = cf._schema->default_time_to_live;
|
||||
}
|
||||
auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt();
|
||||
m_to_apply.set_clustered_cell(null_clustering_key, *def,
|
||||
atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}});
|
||||
} else if (cosc.__isset.super_column) {
|
||||
// FIXME: implement
|
||||
} else if (cosc.__isset.counter_column) {
|
||||
// FIXME: implement
|
||||
} else if (cosc.__isset.counter_super_column) {
|
||||
// FIXME: implement
|
||||
} else {
|
||||
throw make_exception<InvalidRequestException>("Empty ColumnOrSuperColumn");
|
||||
// Would like to use move_iterator below, but Mutation is filled with some const stuff.
|
||||
parallel_for_each(mutation_map.begin(), mutation_map.end(),
|
||||
[this] (std::pair<std::string, std::map<std::string, std::vector<Mutation>>> key_cf) {
|
||||
bytes key = to_bytes(key_cf.first);
|
||||
std::map<std::string, std::vector<Mutation>>& cf_mutations_map = key_cf.second;
|
||||
return parallel_for_each(
|
||||
boost::make_move_iterator(cf_mutations_map.begin()),
|
||||
boost::make_move_iterator(cf_mutations_map.end()),
|
||||
[this, key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
|
||||
sstring cf_name = cf_mutations.first;
|
||||
const std::vector<Mutation>& mutations = cf_mutations.second;
|
||||
auto& cf = lookup_column_family(cf_name);
|
||||
mutation m_to_apply(key, cf._schema);
|
||||
for (const Mutation& m : mutations) {
|
||||
if (m.__isset.column_or_supercolumn) {
|
||||
auto&& cosc = m.column_or_supercolumn;
|
||||
if (cosc.__isset.column) {
|
||||
auto&& col = cosc.column;
|
||||
bytes cname = to_bytes(col.name);
|
||||
auto def = cf._schema->get_column_definition(cname);
|
||||
if (!def) {
|
||||
throw make_exception<InvalidRequestException>("column %s not found", col.name);
|
||||
}
|
||||
} else if (m.__isset.deletion) {
|
||||
if (def->kind != column_definition::column_kind::REGULAR) {
|
||||
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
|
||||
}
|
||||
gc_clock::duration ttl;
|
||||
if (col.__isset.ttl) {
|
||||
ttl = std::chrono::duration_cast<gc_clock::duration>(std::chrono::seconds(col.ttl));
|
||||
}
|
||||
if (ttl.count() <= 0) {
|
||||
ttl = cf._schema->default_time_to_live;
|
||||
}
|
||||
auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt();
|
||||
m_to_apply.set_clustered_cell(null_clustering_key, *def,
|
||||
atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}});
|
||||
} else if (cosc.__isset.super_column) {
|
||||
// FIXME: implement
|
||||
} else if (cosc.__isset.counter_column) {
|
||||
// FIXME: implement
|
||||
} else if (cosc.__isset.counter_super_column) {
|
||||
// FIXME: implement
|
||||
abort();
|
||||
} else {
|
||||
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
|
||||
throw make_exception<InvalidRequestException>("Empty ColumnOrSuperColumn");
|
||||
}
|
||||
} else if (m.__isset.deletion) {
|
||||
// FIXME: implement
|
||||
abort();
|
||||
} else {
|
||||
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
|
||||
}
|
||||
cf.apply(std::move(m_to_apply));
|
||||
}
|
||||
}
|
||||
} catch (std::exception& ex) {
|
||||
return exn_cob(TDelayedException::delayException(ex));
|
||||
}
|
||||
cob();
|
||||
return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) {
|
||||
auto& ks = db.keyspaces.at(_ks_name);
|
||||
auto& cf = ks.column_families.at(cf_name);
|
||||
cf.apply(std::move(m_to_apply));
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) {
|
||||
complete(ret, cob, exn_cob);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
void atomic_batch_mutate(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
|
||||
@@ -348,31 +384,37 @@ public:
|
||||
|
||||
void system_add_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
|
||||
std::string schema_id = "schema-id"; // FIXME: make meaningful
|
||||
if (_db.keyspaces.count(ks_def.name)) {
|
||||
if (_db.local().keyspaces.count(ks_def.name)) {
|
||||
InvalidRequestException ire;
|
||||
ire.why = sprint("Keyspace %s already exists", ks_def.name);
|
||||
exn_cob(TDelayedException::delayException(ire));
|
||||
}
|
||||
keyspace& ks = _db.keyspaces[ks_def.name];
|
||||
for (const CfDef& cf_def : ks_def.cf_defs) {
|
||||
std::vector<schema::column> partition_key;
|
||||
std::vector<schema::column> clustering_key;
|
||||
std::vector<schema::column> regular_columns;
|
||||
// FIXME: get this from comparator
|
||||
auto column_name_type = utf8_type;
|
||||
// FIXME: look at key_alias and key_validator first
|
||||
partition_key.push_back({"key", bytes_type});
|
||||
// FIXME: guess clustering keys
|
||||
for (const ColumnDef& col_def : cf_def.column_metadata) {
|
||||
// FIXME: look at all fields, not just name
|
||||
regular_columns.push_back({to_bytes(col_def.name), bytes_type});
|
||||
_db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) {
|
||||
keyspace& ks = db.keyspaces[ks_def.name];
|
||||
for (const CfDef& cf_def : ks_def.cf_defs) {
|
||||
std::vector<schema::column> partition_key;
|
||||
std::vector<schema::column> clustering_key;
|
||||
std::vector<schema::column> regular_columns;
|
||||
// FIXME: get this from comparator
|
||||
auto column_name_type = utf8_type;
|
||||
// FIXME: look at key_alias and key_validator first
|
||||
partition_key.push_back({"key", bytes_type});
|
||||
// FIXME: guess clustering keys
|
||||
for (const ColumnDef& col_def : cf_def.column_metadata) {
|
||||
// FIXME: look at all fields, not just name
|
||||
regular_columns.push_back({to_bytes(col_def.name), bytes_type});
|
||||
}
|
||||
auto s = make_lw_shared<schema>(ks_def.name, cf_def.name,
|
||||
std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type);
|
||||
column_family cf(s);
|
||||
ks.column_families.emplace(cf_def.name, std::move(cf));
|
||||
}
|
||||
auto s = make_lw_shared<schema>(ks_def.name, cf_def.name,
|
||||
std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type);
|
||||
column_family cf(s);
|
||||
ks.column_families.emplace(cf_def.name, std::move(cf));
|
||||
}
|
||||
cob(schema_id);
|
||||
}).then([schema_id = std::move(schema_id)] {
|
||||
return make_ready_future<std::string>(std::move(schema_id));
|
||||
}).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<std::string> result) {
|
||||
complete(result, cob, exn_cob);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
void system_drop_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
|
||||
@@ -449,9 +491,9 @@ private:
|
||||
};
|
||||
|
||||
class handler_factory : public CassandraCobSvIfFactory {
|
||||
database& _db;
|
||||
distributed<database>& _db;
|
||||
public:
|
||||
explicit handler_factory(database& db) : _db(db) {}
|
||||
explicit handler_factory(distributed<database>& db) : _db(db) {}
|
||||
typedef CassandraCobSvIf Handler;
|
||||
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
|
||||
return new CassandraAsyncHandler(_db);
|
||||
@@ -462,6 +504,6 @@ public:
|
||||
};
|
||||
|
||||
std::unique_ptr<CassandraCobSvIfFactory>
|
||||
create_handler_factory(database& db) {
|
||||
create_handler_factory(distributed<database>& db) {
|
||||
return std::make_unique<handler_factory>(db);
|
||||
}
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
|
||||
#include "Cassandra.h"
|
||||
#include "database.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include <memory>
|
||||
|
||||
std::unique_ptr<org::apache::cassandra::CassandraCobSvIfFactory> create_handler_factory(database& db);
|
||||
std::unique_ptr<org::apache::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db);
|
||||
|
||||
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
|
||||
|
||||
@@ -36,7 +36,7 @@ public:
|
||||
thrift_stats(thrift_server& server);
|
||||
};
|
||||
|
||||
thrift_server::thrift_server(database& db)
|
||||
thrift_server::thrift_server(distributed<database>& db)
|
||||
: _stats(new thrift_stats(*this))
|
||||
, _handler_factory(create_handler_factory(db).release())
|
||||
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
#define APPS_SEASTAR_THRIFT_SERVER_HH_
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include <memory>
|
||||
#include <cstdint>
|
||||
|
||||
@@ -42,7 +43,7 @@ class thrift_server {
|
||||
uint64_t _current_connections = 0;
|
||||
uint64_t _requests_served = 0;
|
||||
public:
|
||||
thrift_server(database& db);
|
||||
thrift_server(distributed<database>& db);
|
||||
future<> listen(ipv4_addr addr);
|
||||
void do_accepts(int which);
|
||||
class connection;
|
||||
|
||||
@@ -230,7 +230,7 @@ private:
|
||||
sstring make_frame(uint8_t version, size_t length);
|
||||
};
|
||||
|
||||
cql_server::cql_server(database& db)
|
||||
cql_server::cql_server(distributed<database>& db)
|
||||
: _proxy(db)
|
||||
, _query_processor(_proxy, db)
|
||||
{
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include "core/reactor.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "core/distributed.hh"
|
||||
|
||||
class database;
|
||||
|
||||
@@ -16,7 +17,7 @@ class cql_server {
|
||||
service::storage_proxy _proxy;
|
||||
cql3::query_processor _query_processor;
|
||||
public:
|
||||
cql_server(database& db);
|
||||
cql_server(distributed<database>& db);
|
||||
future<> listen(ipv4_addr addr);
|
||||
void do_accepts(int which);
|
||||
private:
|
||||
|
||||
5
types.cc
5
types.cc
@@ -5,6 +5,7 @@
|
||||
#include <boost/lexical_cast.hpp>
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include "types.hh"
|
||||
#include "core/print.hh"
|
||||
|
||||
template<typename T>
|
||||
struct simple_type_traits {
|
||||
@@ -396,6 +397,10 @@ struct uuid_type_impl : abstract_type {
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const bytes& b) {
|
||||
return os << to_hex(b);
|
||||
}
|
||||
|
||||
thread_local shared_ptr<abstract_type> int32_type(make_shared<int32_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> long_type(make_shared<long_type_impl>());
|
||||
thread_local shared_ptr<abstract_type> ascii_type(make_shared<string_type_impl>("ascii", cql3::native_cql3_type::ascii));
|
||||
|
||||
2
types.hh
2
types.hh
@@ -31,6 +31,8 @@ using sstring_view = std::experimental::string_view;
|
||||
sstring to_hex(const bytes& b);
|
||||
sstring to_hex(const bytes_opt& b);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const bytes& b);
|
||||
|
||||
using object_opt = std::experimental::optional<boost::any>;
|
||||
|
||||
class marshal_exception : public std::exception {
|
||||
|
||||
Reference in New Issue
Block a user