thrift: Implement batch_mutate on top of storage_proxy

So that the specified consistency level can be respected.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-06-08 16:14:11 +02:00
parent 12dca9fdc9
commit f237b5ff19

View File

@@ -444,60 +444,8 @@ public:
void batch_mutate(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
return with_cob(std::move(cob), std::move(exn_cob), [&] {
if (current_keyspace().empty()) {
throw make_exception<InvalidRequestException>("keyspace not set");
}
// Would like to use move_iterator below, but Mutation is filled with some const stuff.
return parallel_for_each(mutation_map.begin(), mutation_map.end(),
[this] (std::pair<std::string, std::map<std::string, std::vector<Mutation>>> key_cf) {
bytes thrift_key = to_bytes(key_cf.first);
std::map<std::string, std::vector<Mutation>>& cf_mutations_map = key_cf.second;
return parallel_for_each(
boost::make_move_iterator(cf_mutations_map.begin()),
boost::make_move_iterator(cf_mutations_map.end()),
[this, thrift_key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
sstring cf_name = cf_mutations.first;
const std::vector<Mutation>& mutations = cf_mutations.second;
auto schema = lookup_schema(_db.local(), current_keyspace(), cf_name);
mutation m_to_apply(key_from_thrift(*schema, thrift_key), schema);
auto empty_clustering_key = clustering_key::make_empty();
for (const Mutation& m : mutations) {
if (m.__isset.column_or_supercolumn) {
auto&& cosc = m.column_or_supercolumn;
if (cosc.__isset.column) {
auto&& col = cosc.column;
bytes cname = to_bytes(col.name);
auto def = schema->get_column_definition(cname);
if (!def) {
throw make_exception<InvalidRequestException>("column %s not found", col.name);
}
if (def->kind != column_kind::regular_column) {
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
}
m_to_apply.set_clustered_cell(empty_clustering_key, *def,
atomic_cell::make_live(col.timestamp, to_bytes(col.value), maybe_ttl(*schema, col)));
} else if (cosc.__isset.super_column) {
// FIXME: implement
} else if (cosc.__isset.counter_column) {
// FIXME: implement
} else if (cosc.__isset.counter_super_column) {
// FIXME: implement
} else {
throw make_exception<InvalidRequestException>("Empty ColumnOrSuperColumn");
}
} else if (m.__isset.deletion) {
// FIXME: implement
abort();
} else {
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
}
}
auto shard = _db.local().shard_of(m_to_apply);
return _db.invoke_on(shard, [this, gs = global_schema_ptr(schema), cf_name, m = freeze(m_to_apply)] (database& db) {
return db.apply(gs, m);
});
});
});
auto muts = prepare_mutations(_db.local(), current_keyspace(), mutation_map);
return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level));
});
}
@@ -1639,6 +1587,23 @@ private:
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
}
}
static std::vector<mutation> prepare_mutations(database& db, const sstring& ks_name, const std::map<std::string, std::map<std::string, std::vector<Mutation>>>& mutation_map) {
std::vector<mutation> muts;
for (auto&& key_cf : mutation_map) {
auto thrift_key = to_bytes_view(key_cf.first);
for (auto&& cf_mutations : key_cf.second) {
const sstring& cf_name = cf_mutations.first;
auto schema = lookup_schema(db, ks_name, cf_name);
const std::vector<Mutation> &mutations = cf_mutations.second;
mutation m_to_apply(key_from_thrift(*schema, thrift_key), schema);
for (const Mutation &m : mutations) {
add_to_mutation(*schema, m, m_to_apply);
}
muts.emplace_back(std::move(m_to_apply));
}
}
return muts;
}
};
class handler_factory : public CassandraCobSvIfFactory {