logstor: truncate table
Implement freeing all segments of a table for table truncate. First do a barrier to flush all active and mixed segments, putting all of the table's data in compaction groups; then stop compaction for the table; then free the table's segments and remove the live entries from the index.
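
In outline, the flow added by this commit (a condensed sketch of segment_manager_impl::truncate_table from the diff below; erase_from_index is a hypothetical stand-in for the inline lambda):

    future<> segment_manager_impl::truncate_table(table_id table) {
        // 1. barrier: flush active and mixed segments into the table's compaction groups
        co_await do_barrier();
        // 2. stop compaction for the table and detach its groups, collecting their segments
        auto segments = co_await _compaction_mgr.free_compaction_groups(table);
        // 3. for each segment, drop its live index entries, then free it
        for (auto seg_id : segments) {
            co_await for_each_record(seg_id, erase_from_index);
            co_await free_segment(seg_id);
        }
    }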
@@ -3041,6 +3041,10 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, std:
    dblog.debug("Discarding sstable data for truncated CF + indexes");
    // TODO: notify truncation

    if (cf.uses_kv_storage()) {
        co_await cf.discard_kv_storage();
    }

    db::replay_position rp = co_await cf.discard_sstables(truncated_at);
    // TODO: indexes.
    // Note: since discard_sstables was changed to only count tables owned by this shard,
@@ -1066,6 +1066,7 @@ public:
    bool needs_flush() const;
    future<> clear(); // discards memtable(s) without flushing them to disk.
    future<db::replay_position> discard_sstables(db_clock::time_point);
    future<> discard_kv_storage();

    bool can_flush() const;
@@ -73,6 +73,13 @@ public:
        }
    }

    void erase(const index_key& key, log_location loc) {
        auto it = _index.find(key);
        if (it != _index.end() && it->location == loc) {
            _index.erase(key);
        }
    }

    auto begin() const { return _index.begin(); }
    auto end() const { return _index.end(); }
};
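The guard it->location == loc makes the erase conditional on the entry still pointing into the segment being freed: if the key was overwritten after that record was written, the index holds a newer location and the entry must survive. A minimal standalone sketch of the idiom, with hypothetical toy types (loc, toy_index) in place of the real index_key and log_location:

    #include <unordered_map>

    // Toy stand-in for log_location.
    struct loc { unsigned segment; unsigned offset; bool operator==(const loc&) const = default; };

    struct toy_index {
        std::unordered_map<int, loc> entries;

        // Erase only if the entry still points at the record being freed;
        // an entry superseded by a newer write is left untouched.
        void erase(int key, loc l) {
            auto it = entries.find(key);
            if (it != entries.end() && it->second == l) {
                entries.erase(it);
            }
        }
    };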
@@ -118,6 +125,10 @@ public:
        return get_bucket(key).insert_if_newer(key, std::move(new_entry));
    }

    void erase(const index_key& key, log_location loc) {
        get_bucket(key).erase(key, loc);
    }

    class const_iterator {
        using bucket_array = log_index::bucket_array;
        using bucket_iterator = decltype(std::declval<const log_index_bucket>().begin());
@@ -72,6 +72,10 @@ future<> logstor::do_barrier() {
    return _segment_manager.do_barrier();
}

future<> logstor::truncate_table(table_id tid) {
    return _segment_manager.truncate_table(tid);
}

future<> logstor::write(const mutation& m, group_id group) {
    auto key = calculate_key(*m.schema(), m.decorated_key());
@@ -58,6 +58,8 @@ public:

    future<> do_barrier();

    future<> truncate_table(table_id);

    static index_key calculate_key(const schema&, const dht::decorated_key&);

    future<> write(const mutation&, group_id);
@@ -688,6 +688,8 @@ public:
        }
    }

    future<std::vector<log_segment_id>> free_compaction_groups(table_id);

    future<> write_to_separator(separator&, write_buffer&, log_location base_location);
    future<> write_to_separator(separator&, log_segment_id);
    future<> flush_separator(separator&);
@@ -704,6 +706,7 @@ private:
    future<> compact_segments(group_id gid, std::vector<log_segment_id> segments);

    void remove_segment(segment_descriptor& desc);
    future<std::pair<compaction_group_it, std::vector<log_segment_id>>> remove_compaction_group(compaction_group_it);

    separator::buffer& get_separator_buffer(separator& sep, const log_record_writer& writer);
@@ -933,6 +936,8 @@ public:

    future<> do_barrier();

    future<> truncate_table(table_id table);

private:

    struct segment_allocation_guard {
@@ -1510,6 +1515,31 @@ future<> segment_manager_impl::free_segment(log_segment_id segment_id) {
    co_return;
}

future<> segment_manager_impl::truncate_table(table_id table) {
    logstor_logger.info("Truncating table {}", table);

    auto holder = _async_gate.hold();

    // do a barrier to ensure all the table's data is flushed to segments
    // in the table's compaction groups. we assume there are no new writes
    // to the table by this point.
    co_await do_barrier();

    // stop and free the compaction groups of the table.
    auto segments = co_await _compaction_mgr.free_compaction_groups(table);

    // free all segments and remove their records from the index.
    for (auto seg_id : segments) {
        logstor_logger.debug("Freeing segment {} of truncated table {}", seg_id, table);
        co_await for_each_record(seg_id, [this] (log_location loc, log_record record) {
            _index.erase(record.key, loc);
            return make_ready_future<>();
        });

        co_await free_segment(seg_id);
    }
}

future<> segment_manager_impl::for_each_record(log_segment_id segment_id,
        std::function<future<>(log_location, log_record)> callback) {
    auto holder = _async_gate.hold();
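Note the ordering inside the loop above: a segment's index entries are erased before free_segment runs, so the index never points into a segment that has already been recycled. for_each_record is the generic per-record scan that makes this possible; a hypothetical usage sketch (assuming seastar coroutines and the signatures from this hunk; count_records is not part of the patch) that counts a segment's records:

    future<size_t> count_records(segment_manager_impl& sm, log_segment_id seg_id) {
        size_t count = 0;
        // The callback is invoked once per record; the count lives in the
        // coroutine frame, which outlives the awaited scan.
        co_await sm.for_each_record(seg_id, [&count] (log_location, log_record) {
            ++count;
            return make_ready_future<>();
        });
        co_return count;
    }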
@@ -1836,6 +1866,42 @@ void compaction_manager::remove_segment(segment_descriptor& desc) {
    }
}

future<std::pair<compaction_manager::compaction_group_it, std::vector<log_segment_id>>>
compaction_manager::remove_compaction_group(compaction_group_it it) {
    std::vector<log_segment_id> segments;
    auto& cg = it->second;
    auto& hist = cg.segment_hist;
    while (!hist.empty()) {
        co_await coroutine::maybe_yield();
        auto& desc = hist.one_of_largest();
        auto seg_id = _sm.desc_to_segment_id(desc);
        segments.push_back(seg_id);
        hist.erase(desc);
    }

    if (_next_group_for_compaction == it) {
        ++_next_group_for_compaction;
    }
    it = _compaction_groups.erase(it);
    co_return std::make_pair(it, std::move(segments));
}

future<std::vector<log_segment_id>> compaction_manager::free_compaction_groups(table_id table) {
    co_await disable_auto_compaction(table);

    std::vector<log_segment_id> segments_to_free;

    auto it = _compaction_groups.lower_bound(group_id{table, 0});
    while (it != _compaction_groups.end() && it->first.table == table) {
        co_await coroutine::maybe_yield();
        auto [next_it, segments] = co_await remove_compaction_group(it);
        segments_to_free.insert(segments_to_free.end(), segments.begin(), segments.end());
        it = next_it;
    }

    co_return std::move(segments_to_free);
}

separator::buffer& compaction_manager::get_separator_buffer(separator& sep, const log_record_writer& writer) {
    auto gid = writer.record().group;
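free_compaction_groups walks all of a table's groups with a single ordered-map range scan: because group_id sorts by (table, group), lower_bound on group_id{table, 0} lands on the table's first group, and the loop stops at the first key belonging to another table. A self-contained sketch of that idiom, with a hypothetical group_key standing in for group_id:

    #include <compare>
    #include <map>
    #include <vector>

    // Hypothetical composite key ordered by (table, group), mirroring group_id.
    struct group_key {
        int table;
        int group;
        auto operator<=>(const group_key&) const = default;
    };

    // Collect the group numbers belonging to one table: start at the table's
    // smallest key and stop at the first key of another table.
    std::vector<int> groups_of(const std::map<group_key, int>& groups, int table) {
        std::vector<int> out;
        for (auto it = groups.lower_bound(group_key{table, 0});
             it != groups.end() && it->first.table == table; ++it) {
            out.push_back(it->first.group);
        }
        return out;
    }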
@@ -2206,6 +2272,10 @@ future<> segment_manager::do_barrier() {
    return _impl->do_barrier();
}

future<> segment_manager::truncate_table(table_id table) {
    return _impl->truncate_table(table);
}

}

template<>
@@ -83,6 +83,8 @@ public:
    // to their group_id.
    future<> do_barrier();

    future<> truncate_table(table_id);

    friend class segment_manager_impl;

};
@@ -4281,6 +4281,12 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
    co_return rp;
}

future<> table::discard_kv_storage() {
    if (_logstor) {
        co_await _logstor->truncate_table(_schema->id());
    }
}

void table::mark_ready_for_writes(db::commitlog* cl) {
    if (!_readonly) {
        on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name()));
@@ -330,3 +330,53 @@ async def test_compaction(manager: ManagerClient):
    metrics = await manager.metrics.query(servers[0].ip_addr)
    segments_compacted = metrics.get("scylla_logstor_sm_segments_compacted") or 0
    assert segments_compacted == 4, f"Expected 4 segments to be compacted, but got {segments_compacted}"

@pytest.mark.asyncio
async def test_drop_table(manager: ManagerClient):
    """
    Test that dropping a logstor table frees its segments without affecting other tables.
    """
    cmdline = ['--logger-log-level', 'logstor=trace']
    cfg = {'enable_logstor': True, 'experimental_features': ['logstor']}
    servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH tablets={'initial': 4}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'")

        # create another table that will not be dropped to verify it's not affected
        await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'")

        # write data to fill a few segments
        value_size = 30 * 1024
        value = 'x' * value_size
        for i in range(20):
            await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES ({i}, '{value}')")
            await cql.run_async(f"INSERT INTO {ks}.test2 (pk, v) VALUES ({i}, '{value}')")

        await cql.run_async(f"DROP TABLE {ks}.test1")

        # verify test2 is not affected
        for i in range(20):
            rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}")
            assert len(rows) == 1, f"Expected 1 row for key {i} in test2, but got {len(rows)}"
            assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2, but got {len(rows[0].v)}"

        # recreate the table and verify that old data is not visible
        await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'")

        for i in range(20):
            rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = {i}")
            assert len(rows) == 0, f"Expected no rows for key {i} after table drop, but got {len(rows)}"

        # write new data to the recreated table and verify
        await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES (1, 'new_value')")
        rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = 1")
        assert len(rows) == 1, f"Expected 1 row for key 1 after new insert, but got {len(rows)}"
        assert rows[0].v == 'new_value', f"Expected value 'new_value' for key 1 after new insert, but got {rows[0].v}"

        # verify test2 again
        for i in range(20):
            rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}")
            assert len(rows) == 1, f"Expected 1 row for key {i} in test2 after all operations, but got {len(rows)}"
            assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2 after all operations, but got {len(rows[0].v)}"