From f237b5ff19f2162bada9f994657ef64f855fbe26 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 8 Jun 2016 16:14:11 +0200 Subject: [PATCH] thrift: Implement batch_mutate on top of storage_proxy So that the specified consistency level can be respected. Signed-off-by: Duarte Nunes --- thrift/handler.cc | 73 ++++++++++++----------------------------------- 1 file changed, 19 insertions(+), 54 deletions(-) diff --git a/thrift/handler.cc b/thrift/handler.cc index 6710e86a4c..edcf692ba6 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -444,60 +444,8 @@ public: void batch_mutate(tcxx::function cob, tcxx::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { return with_cob(std::move(cob), std::move(exn_cob), [&] { - if (current_keyspace().empty()) { - throw make_exception("keyspace not set"); - } - // Would like to use move_iterator below, but Mutation is filled with some const stuff. - return parallel_for_each(mutation_map.begin(), mutation_map.end(), - [this] (std::pair>> key_cf) { - bytes thrift_key = to_bytes(key_cf.first); - std::map>& cf_mutations_map = key_cf.second; - return parallel_for_each( - boost::make_move_iterator(cf_mutations_map.begin()), - boost::make_move_iterator(cf_mutations_map.end()), - [this, thrift_key] (std::pair> cf_mutations) { - sstring cf_name = cf_mutations.first; - const std::vector& mutations = cf_mutations.second; - auto schema = lookup_schema(_db.local(), current_keyspace(), cf_name); - mutation m_to_apply(key_from_thrift(*schema, thrift_key), schema); - auto empty_clustering_key = clustering_key::make_empty(); - for (const Mutation& m : mutations) { - if (m.__isset.column_or_supercolumn) { - auto&& cosc = m.column_or_supercolumn; - if (cosc.__isset.column) { - auto&& col = cosc.column; - bytes cname = to_bytes(col.name); - auto def = schema->get_column_definition(cname); - if (!def) { - throw make_exception("column %s not found", col.name); - } - if (def->kind != column_kind::regular_column) { - throw make_exception("Column %s is not settable", col.name); - } - m_to_apply.set_clustered_cell(empty_clustering_key, *def, - atomic_cell::make_live(col.timestamp, to_bytes(col.value), maybe_ttl(*schema, col))); - } else if (cosc.__isset.super_column) { - // FIXME: implement - } else if (cosc.__isset.counter_column) { - // FIXME: implement - } else if (cosc.__isset.counter_super_column) { - // FIXME: implement - } else { - throw make_exception("Empty ColumnOrSuperColumn"); - } - } else if (m.__isset.deletion) { - // FIXME: implement - abort(); - } else { - throw make_exception("Mutation must have either column or deletion"); - } - } - auto shard = _db.local().shard_of(m_to_apply); - return _db.invoke_on(shard, [this, gs = global_schema_ptr(schema), cf_name, m = freeze(m_to_apply)] (database& db) { - return db.apply(gs, m); - }); - }); - }); + auto muts = prepare_mutations(_db.local(), current_keyspace(), mutation_map); + return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level)); }); } @@ -1639,6 +1587,23 @@ private: throw make_exception("Mutation must have either column or deletion"); } } + static std::vector prepare_mutations(database& db, const sstring& ks_name, const std::map>>& mutation_map) { + std::vector muts; + for (auto&& key_cf : mutation_map) { + auto thrift_key = to_bytes_view(key_cf.first); + for (auto&& cf_mutations : key_cf.second) { + const sstring& cf_name = cf_mutations.first; + auto schema = lookup_schema(db, ks_name, cf_name); + const std::vector &mutations = cf_mutations.second; + mutation m_to_apply(key_from_thrift(*schema, thrift_key), schema); + for (const Mutation &m : mutations) { + add_to_mutation(*schema, m, m_to_apply); + } + muts.emplace_back(std::move(m_to_apply)); + } + } + return muts; + } }; class handler_factory : public CassandraCobSvIfFactory {