system_keyspace: Remove support for legacy truncation records

Fixes #6341

Since Scylla no longer supports upgrading from a version without the
"new" (dedicated) truncation record table, we can remove support for
these records and the migration thereof.

Make sure the above holds wherever this is committed.

Note that this does not remove the "truncated_at" field in
system.local.
Calle Wilund
2020-08-03 13:33:57 +00:00
committed by Avi Kivity
parent a9013030cf
commit 30a700c5b0
6 changed files with 0 additions and 245 deletions


@@ -379,7 +379,6 @@ scylla_tests = set([
'test/boost/storage_proxy_test',
'test/boost/top_k_test',
'test/boost/transport_test',
'test/boost/truncation_migration_test',
'test/boost/types_test',
'test/boost/user_function_test',
'test/boost/user_types_test',


@@ -1312,20 +1312,6 @@ typedef utils::UUID truncation_key;
typedef std::unordered_map<truncation_key, truncation_record> truncation_map;
static constexpr uint8_t current_version = 1;
static bool need_legacy_truncation_records = true;
static thread_local struct {
std::optional<future<>> done;
gms::feature::listener_registration reg;
} migration_complete;
future<> wait_for_truncation_record_migration_complete() {
// The caller of this helper (a test) first synchronously
// enables the feature migration_complete listens on,
// which causes the future to be emplaced here.
assert(migration_complete.done);
return std::move(*migration_complete.done);
}
/**
* This method is used to remove information about truncation time for specified column family
@@ -1378,149 +1364,10 @@ static future<> cache_truncation_record(distributed<database>& db) {
});
}
future<> migrate_truncation_records(const gms::feature& cluster_supports_truncation_table) {
sstring req = format("SELECT truncated_at FROM system.{} WHERE key = '{}'", LOCAL, LOCAL);
return qctx->qp().execute_internal(req).then([&cluster_supports_truncation_table](::shared_ptr<cql3::untyped_result_set> rs) {
truncation_map tmp;
if (!rs->empty() && rs->one().has("truncated_at")) {
auto map = rs->one().get_map<utils::UUID, bytes>("truncated_at");
for (auto& p : map) {
auto uuid = p.first;
auto buf = p.second;
try {
truncation_record e;
if (buf.size() & 1) {
// new record.
if (buf[0] != current_version) {
slogger.warn("Found truncation record of unknown version {}. Ignoring.", int(buf[0]));
continue;
}
e = ser::deserialize_from_buffer(buf, boost::type<truncation_record>(), 1);
if (e.magic == truncation_record::current_magic) {
tmp[uuid] = e;
continue;
}
} else {
// old Scylla records (we hope).
// Read 64+64-bit RPs, even though the
// struct (and official serial size) is 64+32.
data_input in(buf);
slogger.debug("Reading old type record");
while (in.avail() > sizeof(db_clock::rep)) {
auto id = in.read<uint64_t>();
auto pos = in.read<uint64_t>();
e.positions.emplace_back(id, position_type(pos));
}
if (in.avail() == sizeof(db_clock::rep)) {
e.time_stamp = db_clock::time_point(db_clock::duration(in.read<db_clock::rep>()));
tmp[uuid] = e;
continue;
}
}
} catch (std::out_of_range &) {
}
// Trying to load an origin table.
// This is useless to us, because the only usage for this
// data is commit log and batch replay, and we cannot replay
// either from origin anyway.
slogger.warn("Error reading truncation record for {}. "
"Most likely this is data from a cassandra instance."
"Make sure you have cleared commit and batch logs before upgrading.",
uuid
);
}
}
auto i = tmp.begin();
auto e = tmp.end();
return parallel_for_each(i, e, [](const truncation_map::value_type& p) {
const utils::UUID& uuid = p.first;
const truncation_record& tr = p.second;
return get_truncation_record(uuid).then([&](truncation_record new_record) {
if (!new_record.positions.empty() && new_record.time_stamp >= tr.time_stamp) {
return make_ready_future<>();
}
return parallel_for_each(tr.positions, [&](replay_position rp) {
return save_truncation_record(uuid, tr.time_stamp, rp);
});
});
}).then([&cluster_supports_truncation_table, tmp = std::move(tmp)] {
if (!cluster_supports_truncation_table || !tmp.empty()) {
migration_complete.reg = cluster_supports_truncation_table.when_enabled([] {
// this potentially races with a truncation, i.e. someone could be inserting into
// the legacy column while we delete it. But this is ok, it will just mean we have
// some unneeded data and will do a merge again next boot, but eventually we
// will remove the legacy data...
auto level = need_legacy_truncation_records ? seastar::log_level::info : seastar::log_level::debug;
slogger.log(level, "Got cluster agreement on truncation table feature. Removing legacy records.");
need_legacy_truncation_records = false;
sstring req = format("DELETE truncated_at from system.{} WHERE key = '{}'", LOCAL, LOCAL);
migration_complete.done = qctx->qp().execute_internal(req).discard_result().then([level] {
slogger.log(level, "Legacy records deleted.");
return force_blocking_flush(LOCAL);
});
});
}
return make_ready_future<>();
});
});
}
static future<> save_legacy_truncation_records(utils::UUID id, db_clock::time_point truncated_at, replay_positions positions) {
truncation_record r;
r.magic = truncation_record::current_magic;
r.time_stamp = truncated_at;
r.positions = std::move(positions);
auto buf = ser::serialize_to_buffer<bytes>(r, sizeof(current_version));
buf[0] = current_version;
static_assert(sizeof(current_version) == 1, "using this as mark");
assert(buf.size() & 1); // verify we've created an odd-length buffer
map_type_impl::native_type tmp;
tmp.emplace_back(id, data_value(buf));
auto map_type = map_type_impl::get_instance(uuid_type, bytes_type, true);
sstring req = format("UPDATE system.{} SET truncated_at = truncated_at + ? WHERE key = '{}'", LOCAL, LOCAL);
return qctx->qp().execute_internal(req, {make_map_value(map_type, tmp)}).then([](auto rs) {
return force_blocking_flush(LOCAL);
});
}
static future<> maybe_save_legacy_truncation_record(utils::UUID id, db_clock::time_point truncated_at, db::replay_position rp) {
if (!need_legacy_truncation_records) {
return make_ready_future<>();
}
// TODO: this is horribly inefficient: we're doing a full flush of all system tables on all cores
// once, for each core calling us. But right now, redesigning so that calling here (or, rather,
// save_truncation_records) is done from "somewhere higher, once per machine, not per shard" is tricky.
// Mainly because drop_tables also uses truncate, and is run per-core as well. Gah.
return get_truncation_record(id).then([id, truncated_at, rp](truncation_record e) {
auto i = std::find_if(e.positions.begin(), e.positions.end(), [rp](replay_position& p) {
return p.shard_id() == rp.shard_id();
});
if (i == e.positions.end()) {
e.positions.emplace_back(rp);
} else {
*i = rp;
}
return save_legacy_truncation_records(id, std::max(truncated_at, e.time_stamp), e.positions);
});
}
future<> save_truncation_record(utils::UUID id, db_clock::time_point truncated_at, db::replay_position rp) {
sstring req = format("INSERT INTO system.{} (table_uuid, shard, position, segment_id, truncated_at) VALUES(?,?,?,?,?)", TRUNCATED);
return qctx->qp().execute_internal(req, {id, int32_t(rp.shard_id()), int32_t(rp.pos), int64_t(rp.base_id()), truncated_at}).discard_result().then([] {
return force_blocking_flush(TRUNCATED);
}).then([=] {
return maybe_save_legacy_truncation_record(id, truncated_at, rp);
});
}
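
A note on the format detection removed above: migrate_truncation_records told the two on-disk formats apart purely by buffer-length parity. An old Scylla record is a run of 64+64-bit replay positions followed by a 64-bit timestamp, so its size is always even; a new-style record is an even-sized serialized payload prefixed with a single version byte, so its size is always odd. A minimal, self-contained sketch of that arithmetic (not Scylla code; the payload size is illustrative):

#include <cassert>
#include <cstddef>
#include <cstdint>

int main() {
    // Old-format record: n replay positions, each stored as two
    // 64-bit integers, followed by one 64-bit timestamp -> always even.
    for (std::size_t n = 0; n < 4; ++n) {
        std::size_t old_size = n * 2 * sizeof(uint64_t) + sizeof(uint64_t);
        assert((old_size & 1) == 0);
    }
    // New-format record: an even-sized serialized payload (which
    // save_legacy_truncation_records asserted) plus one leading
    // version byte -> always odd, hence the (buf.size() & 1) test.
    std::size_t payload = 40; // illustrative even payload size
    std::size_t new_size = sizeof(uint8_t) + payload;
    assert(new_size & 1);
}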

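The per-shard merge in the removed maybe_save_legacy_truncation_record is the other piece worth spelling out: each shard contributes at most one replay position, and a newer truncation replaces the stored position for that shard. A rough stand-in with simplified types (not Scylla's db::replay_position):

#include <algorithm>
#include <cstdint>
#include <vector>

struct position {
    unsigned shard;
    uint64_t pos;
};

// Replace the stored position for rp's shard, or append rp if this
// shard has no recorded truncation position yet.
void merge_position(std::vector<position>& positions, position rp) {
    auto i = std::find_if(positions.begin(), positions.end(),
        [&](const position& p) { return p.shard == rp.shard; });
    if (i == positions.end()) {
        positions.push_back(rp);
    } else {
        *i = rp;
    }
}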

@@ -407,9 +407,6 @@ enum class bootstrap_state {
typedef std::vector<db::replay_position> replay_positions;
future<> migrate_truncation_records(const gms::feature& cluster_supports_truncation_table);
// for tests
future<> wait_for_truncation_record_migration_complete();
future<> save_truncation_record(utils::UUID, db_clock::time_point truncated_at, db::replay_position);
future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position);
future<> remove_truncation_record(utils::UUID);


@@ -882,9 +882,6 @@ int main(int ac, char** av) {
// schema migration, if needed, is also done on shard 0
db::legacy_schema_migrator::migrate(proxy, db, qp.local()).get();
// truncation record migration
db::system_keyspace::migrate_truncation_records(feature_service.local().cluster_supports_truncation_table()).get();
supervisor::notify("loading system sstables");
distributed_loader::ensure_system_table_directories(db).get();


@@ -1,83 +0,0 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string_view>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "utils/joinpoint.hh"
SEASTAR_TEST_CASE(test_truncation_record_migration) {
cql_test_config cfg;
cfg.disabled_features = { "TRUNCATION_TABLE" };
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("CREATE TABLE test (a int, b int, PRIMARY KEY (a))").get();
e.execute_cql("INSERT INTO test (a, b) VALUES (1, 100);").get();
assert_that(e.execute_cql("SELECT * FROM test").get0())
.is_rows().with_size(1);
assert_that(e.execute_cql("SELECT * FROM system.truncated").get0())
.is_rows().is_empty();
assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0())
.is_rows().is_null();
// Do a truncation
// Cannot do via cql, because we don't have an actual functioning rpc active.
do_with(utils::make_joinpoint([] { return db_clock::now();}), [](auto& tsf) {
return service::get_storage_proxy().invoke_on_all([&tsf](service::storage_proxy& sp) {
return sp.get_db().local().truncate("ks", "test", [&tsf] { return tsf.value(); });
});
}).get();
assert_that(e.execute_cql("SELECT * FROM test").get0())
.is_rows().is_empty();
assert_that(e.execute_cql("SELECT * FROM system.truncated").get0())
.is_rows().is_not_empty();
// should also have created legacy record
assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0())
.is_rows().is_not_null();
// Now enable truncation_table feature. Should remove the
// legacy records.
service::get_storage_service().invoke_on_all([] (service::storage_service& ss) {
ss.features().cluster_supports_truncation_table().enable();
}).get();
db::system_keyspace::wait_for_truncation_record_migration_complete().get();
assert_that(e.execute_cql("SELECT truncated_at FROM system.local WHERE key = 'local'").get0())
.is_rows().is_null();
}, cfg);
}
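
The removed test hinges on the listener that migrate_truncation_records registered via when_enabled: enabling the feature on all shards fires the callback, which emplaces the future that wait_for_truncation_record_migration_complete then hands back. A toy stand-in for that listener pattern (not the actual gms::feature API):

#include <functional>
#include <utility>
#include <vector>

struct toy_feature {
    bool enabled = false;
    std::vector<std::function<void()>> listeners;

    // Run f immediately if the feature is already on,
    // otherwise queue it until enable() is called.
    void when_enabled(std::function<void()> f) {
        if (enabled) {
            f();
        } else {
            listeners.push_back(std::move(f));
        }
    }

    // Flip the flag and fire every pending listener once.
    void enable() {
        enabled = true;
        for (auto& f : listeners) {
            f();
        }
        listeners.clear();
    }
};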


@@ -550,8 +550,6 @@ public:
db::system_keyspace::init_local_cache().get();
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });
db::system_keyspace::migrate_truncation_records(feature_service.local().cluster_supports_truncation_table()).get();
service::get_local_storage_service().init_messaging_service_part().get();
service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
service::get_local_storage_service().join_cluster().get();