From 5a0ae55f6d355fcac82fcc6f8f7a91ce2ff22ae4 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 8 Jun 2017 13:45:56 +0200 Subject: [PATCH] Introduce schema_upgrader --- schema_upgrader.hh | 68 +++++++++++++++++++++++++++++++++ tests/streamed_mutation_test.cc | 20 ++++++++++ 2 files changed, 88 insertions(+) create mode 100644 schema_upgrader.hh diff --git a/schema_upgrader.hh b/schema_upgrader.hh new file mode 100644 index 0000000000..2683cec7dd --- /dev/null +++ b/schema_upgrader.hh @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "streamed_mutation.hh" +#include "converting_mutation_partition_applier.hh" + +// A StreamedMutationTransformer which transforms the stream to a different schema +class schema_upgrader { + schema_ptr _prev; + schema_ptr _new; +private: + row transform(row&& r, column_kind kind) { + row new_row; + r.for_each_cell([&] (column_id id, atomic_cell_or_collection& cell) { + const column_definition& col = _prev->column_at(kind, id); + const column_definition* new_col = _new->get_column_definition(col.name()); + if (new_col) { + converting_mutation_partition_applier::append_cell(new_row, kind, *new_col, col.type, std::move(cell)); + } + }); + return new_row; + } +public: + schema_upgrader(schema_ptr s) + : _new(std::move(s)) + { } + schema_ptr operator()(schema_ptr old) { + _prev = std::move(old); + return _new; + } + mutation_fragment consume(static_row&& row) { + return mutation_fragment(static_row(transform(std::move(row.cells()), column_kind::static_column))); + } + mutation_fragment consume(clustering_row&& row) { + return mutation_fragment(clustering_row(row.key(), row.tomb(), row.marker(), + transform(std::move(row.cells()), column_kind::regular_column))); + } + mutation_fragment consume(range_tombstone&& rt) { + return std::move(rt); + } + mutation_fragment operator()(mutation_fragment&& mf) { + return std::move(mf).consume(*this); + } +}; + +GCC6_CONCEPT( +static_assert(StreamedMutationTranformer()); +) diff --git a/tests/streamed_mutation_test.cc b/tests/streamed_mutation_test.cc index 3879f41edf..2ff294f817 100644 --- a/tests/streamed_mutation_test.cc +++ b/tests/streamed_mutation_test.cc @@ -29,8 +29,10 @@ #include "tests/test_services.hh" #include "schema_builder.hh" #include "total_order_check.hh" +#include "schema_upgrader.hh" #include "disk-error-handler.hh" +#include "mutation_assertions.hh" thread_local disk_error_signal_type commit_error; thread_local disk_error_signal_type general_disk_error; @@ -538,3 +540,21 @@ SEASTAR_TEST_CASE(test_ordering_of_position_in_partition_and_composite_view_in_a .check(); }); } + +SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) { + return seastar::async([] { + for_each_mutation_pair([](const mutation& m1, const mutation& m2, are_equal eq) { + if (m1.schema()->version() != m2.schema()->version()) { + // upgrade m1 to m2's schema + + auto from_upgrader = mutation_from_streamed_mutation( + transform(streamed_mutation_from_mutation(m1), schema_upgrader(m2.schema()))).get0(); + + auto regular = m1; + regular.upgrade(m2.schema()); + + assert_that(from_upgrader).has_mutation().is_equal_to(regular); + } + }); + }); +}