diff --git a/configure.py b/configure.py index 5b74af3d1b..611cb06b66 100755 --- a/configure.py +++ b/configure.py @@ -379,7 +379,6 @@ scylla_tests = set([ 'test/boost/storage_proxy_test', 'test/boost/top_k_test', 'test/boost/transport_test', - 'test/boost/truncation_migration_test', 'test/boost/types_test', 'test/boost/user_function_test', 'test/boost/user_types_test', diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 4d21672556..8546cdddf7 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1312,20 +1312,6 @@ typedef utils::UUID truncation_key; typedef std::unordered_map truncation_map; static constexpr uint8_t current_version = 1; -static bool need_legacy_truncation_records = true; - -static thread_local struct { - std::optional> done; - gms::feature::listener_registration reg; -} migration_complete; - -future<> wait_for_truncation_record_migration_complete() { - // caller of this helper (test) first synchronously - // enables the feature migration_complete listens on, - // thus making thie future emplaced here - assert(migration_complete.done); - return std::move(*migration_complete.done); -} /** * This method is used to remove information about truncation time for specified column family @@ -1378,149 +1364,10 @@ static future<> cache_truncation_record(distributed& db) { }); } -future<> migrate_truncation_records(const gms::feature& cluster_supports_truncation_table) { - sstring req = format("SELECT truncated_at FROM system.{} WHERE key = '{}'", LOCAL, LOCAL); - return qctx->qp().execute_internal(req).then([&cluster_supports_truncation_table](::shared_ptr rs) { - truncation_map tmp; - if (!rs->empty() && rs->one().has("truncated_at")) { - auto map = rs->one().get_map("truncated_at"); - for (auto& p : map) { - auto uuid = p.first; - auto buf = p.second; - - try { - truncation_record e; - - if (buf.size() & 1) { - // new record. - if (buf[0] != current_version) { - slogger.warn("Found truncation record of unknown version {}. Ignoring.", int(buf[0])); - continue; - } - e = ser::deserialize_from_buffer(buf, boost::type(), 1); - if (e.magic == truncation_record::current_magic) { - tmp[uuid] = e; - continue; - } - } else { - // old scylla records. (We hope) - // Read 64+64 bit RP:s, even though the - // struct (and official serial size) is 64+32. - data_input in(buf); - - slogger.debug("Reading old type record"); - while (in.avail() > sizeof(db_clock::rep)) { - auto id = in.read(); - auto pos = in.read(); - e.positions.emplace_back(id, position_type(pos)); - } - if (in.avail() == sizeof(db_clock::rep)) { - e.time_stamp = db_clock::time_point(db_clock::duration(in.read())); - tmp[uuid] = e; - continue; - } - } - } catch (std::out_of_range &) { - } - // Trying to load an origin table. - // This is useless to us, because the only usage for this - // data is commit log and batch replay, and we cannot replay - // either from origin anyway. - slogger.warn("Error reading truncation record for {}. " - "Most likely this is data from a cassandra instance." - "Make sure you have cleared commit and batch logs before upgrading.", - uuid - ); - } - } - - auto i = tmp.begin(); - auto e = tmp.end(); - return parallel_for_each(i, e, [](const truncation_map::value_type& p) { - const utils::UUID& uuid = p.first; - const truncation_record& tr = p.second; - return get_truncation_record(uuid).then([&](truncation_record new_record) { - if (!new_record.positions.empty() && new_record.time_stamp >= tr.time_stamp) { - return make_ready_future<>(); - } - return parallel_for_each(tr.positions, [&](replay_position rp) { - return save_truncation_record(uuid, tr.time_stamp, rp); - }); - }); - }).then([&cluster_supports_truncation_table, tmp = std::move(tmp)] { - if (!cluster_supports_truncation_table || !tmp.empty()) { - migration_complete.reg = cluster_supports_truncation_table.when_enabled([] { - // this potentially races with a truncation, i.e. someone could be inserting into - // the legacy column while we delete it. But this is ok, it will just mean we have - // some unneeded data and will do a merge again next boot, but eventually we - // will remove the legacy data... - auto level = need_legacy_truncation_records ? seastar::log_level::info : seastar::log_level::debug; - slogger.log(level, "Got cluster agreement on truncation table feature. Removing legacy records."); - need_legacy_truncation_records = false; - sstring req = format("DELETE truncated_at from system.{} WHERE key = '{}'", LOCAL, LOCAL); - - migration_complete.done = qctx->qp().execute_internal(req).discard_result().then([level] { - slogger.log(level, "Legacy records deleted."); - return force_blocking_flush(LOCAL); - }); - }); - } - return make_ready_future<>(); - }); - }); -} - -static future<> save_legacy_truncation_records(utils::UUID id, db_clock::time_point truncated_at, replay_positions positions) { - truncation_record r; - - r.magic = truncation_record::current_magic; - r.time_stamp = truncated_at; - r.positions = std::move(positions); - - auto buf = ser::serialize_to_buffer(r, sizeof(current_version)); - - buf[0] = current_version; - - static_assert(sizeof(current_version) == 1, "using this as mark"); - assert(buf.size() & 1); // verify we've created an odd-numbered buffer - - map_type_impl::native_type tmp; - tmp.emplace_back(id, data_value(buf)); - auto map_type = map_type_impl::get_instance(uuid_type, bytes_type, true); - - sstring req = format("UPDATE system.{} SET truncated_at = truncated_at + ? WHERE key = '{}'", LOCAL, LOCAL); - return qctx->qp().execute_internal(req, {make_map_value(map_type, tmp)}).then([](auto rs) { - return force_blocking_flush(LOCAL); - }); -} - -static future<> maybe_save_legacy_truncation_record(utils::UUID id, db_clock::time_point truncated_at, db::replay_position rp) { - if (!need_legacy_truncation_records) { - return make_ready_future<>(); - } - // TODO: this is horribly ineffective, we're doing a full flush of all system tables for all cores - // once, for each core (calling us). But right now, redesigning so that calling here (or, rather, - // save_truncation_records), is done from "somewhere higher, once per machine, not shard" is tricky. - // Mainly because drop_tables also uses truncate. And is run per-core as well. Gah. - return get_truncation_record(id).then([id, truncated_at, rp](truncation_record e) { - auto i = std::find_if(e.positions.begin(), e.positions.end(), [rp](replay_position& p) { - return p.shard_id() == rp.shard_id(); - }); - if (i == e.positions.end()) { - e.positions.emplace_back(rp); - } else { - *i = rp; - } - return save_legacy_truncation_records(id, std::max(truncated_at, e.time_stamp), e.positions); - }); -} - future<> save_truncation_record(utils::UUID id, db_clock::time_point truncated_at, db::replay_position rp) { sstring req = format("INSERT INTO system.{} (table_uuid, shard, position, segment_id, truncated_at) VALUES(?,?,?,?,?)", TRUNCATED); return qctx->qp().execute_internal(req, {id, int32_t(rp.shard_id()), int32_t(rp.pos), int64_t(rp.base_id()), truncated_at}).discard_result().then([] { return force_blocking_flush(TRUNCATED); - }).then([=] { - return maybe_save_legacy_truncation_record(id, truncated_at, rp); }); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 8fa171d5b0..6c83f56f7e 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -407,9 +407,6 @@ enum class bootstrap_state { typedef std::vector replay_positions; - future<> migrate_truncation_records(const gms::feature& cluster_supports_truncation_table); - // for tests - future<> wait_for_truncation_record_migration_complete(); future<> save_truncation_record(utils::UUID, db_clock::time_point truncated_at, db::replay_position); future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position); future<> remove_truncation_record(utils::UUID); diff --git a/main.cc b/main.cc index c20fc43eb8..b83e63d1b4 100644 --- a/main.cc +++ b/main.cc @@ -882,9 +882,6 @@ int main(int ac, char** av) { // schema migration, if needed, is also done on shard 0 db::legacy_schema_migrator::migrate(proxy, db, qp.local()).get(); - // truncation record migration - db::system_keyspace::migrate_truncation_records(feature_service.local().cluster_supports_truncation_table()).get(); - supervisor::notify("loading system sstables"); distributed_loader::ensure_system_table_directories(db).get(); diff --git a/test/boost/truncation_migration_test.cc b/test/boost/truncation_migration_test.cc deleted file mode 100644 index f594dbc858..0000000000 --- a/test/boost/truncation_migration_test.cc +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (C) 2019 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#include -#include - -#include "test/lib/cql_test_env.hh" -#include "test/lib/cql_assertions.hh" -#include "db/config.hh" -#include "db/system_keyspace.hh" -#include "service/storage_service.hh" -#include "service/storage_proxy.hh" -#include "utils/joinpoint.hh" - -SEASTAR_TEST_CASE(test_truncation_record_migration) { - cql_test_config cfg; - - cfg.disabled_features = { "TRUNCATION_TABLE" }; - - return do_with_cql_env_thread([](cql_test_env& e) { - e.execute_cql("CREATE TABLE test (a int, b int, PRIMARY KEY (a))").get(); - e.execute_cql("INSERT INTO test (a, b) VALUES (1, 100);").get(); - - assert_that(e.execute_cql("SELECT * FROM test").get0()) - .is_rows().with_size(1); - - assert_that(e.execute_cql("SELECT * FROM system.truncated").get0()) - .is_rows().is_empty(); - assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0()) - .is_rows().is_null(); - - // Do a truncation - // Cannot do via cql, because we don't have an actual functioning rpc active. - - do_with(utils::make_joinpoint([] { return db_clock::now();}), [](auto& tsf) { - return service::get_storage_proxy().invoke_on_all([&tsf](service::storage_proxy& sp) { - return sp.get_db().local().truncate("ks", "test", [&tsf] { return tsf.value(); }); - }); - }).get(); - - assert_that(e.execute_cql("SELECT * FROM test").get0()) - .is_rows().is_empty(); - - - assert_that(e.execute_cql("SELECT * FROM system.truncated").get0()) - .is_rows().is_not_empty(); - // should also have created legacy record - assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0()) - .is_rows().is_not_null(); - - // Now enable truncation_table feature. Should remove the - // legacy records. - - service::get_storage_service().invoke_on_all([] (service::storage_service& ss) { - ss.features().cluster_supports_truncation_table().enable(); - }).get(); - - db::system_keyspace::wait_for_truncation_record_migration_complete().get(); - - assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0()) - .is_rows().is_null(); - - }, cfg); -} - diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 66c74048db..4421700cb5 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -550,8 +550,6 @@ public: db::system_keyspace::init_local_cache().get(); auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); }); - db::system_keyspace::migrate_truncation_records(feature_service.local().cluster_supports_truncation_table()).get(); - service::get_local_storage_service().init_messaging_service_part().get(); service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get(); service::get_local_storage_service().join_cluster().get();