mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
db: add method to update the system table COMPACTION_HISTORY
It's supposed to be called at the end of compaction. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -1010,5 +1010,32 @@ query(distributed<service::storage_proxy>& proxy, const sstring& cf_name, const
|
||||
});
|
||||
}
|
||||
|
||||
static map_type_impl::native_type prepare_rows_merged(std::unordered_map<int32_t, int64_t>& rows_merged) {
|
||||
map_type_impl::native_type tmp;
|
||||
for (auto& r: rows_merged) {
|
||||
int32_t first = r.first;
|
||||
int64_t second = r.second;
|
||||
auto map_element = std::make_pair<data_value, data_value>(data_value(first), data_value(second));
|
||||
tmp.push_back(std::move(map_element));
|
||||
}
|
||||
return tmp;
|
||||
}
|
||||
|
||||
future<> update_compaction_history(sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
|
||||
std::unordered_map<int32_t, int64_t> rows_merged)
|
||||
{
|
||||
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
|
||||
if (ksname == "system" && cfname == COMPACTION_HISTORY) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto map_type = map_type_impl::get_instance(int32_type, long_type, true);
|
||||
|
||||
sstring req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
|
||||
return execute_cql(req, COMPACTION_HISTORY, utils::UUID_gen::get_time_UUID(), ksname, cfname, compacted_at, bytes_in, bytes_out,
|
||||
make_map_value(map_type, prepare_rows_merged(rows_merged))).discard_result();
|
||||
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
|
||||
|
||||
@@ -259,26 +259,15 @@ enum class bootstrap_state {
|
||||
compactionLog.truncateBlocking();
|
||||
}
|
||||
|
||||
public static void updateCompactionHistory(String ksname,
|
||||
String cfname,
|
||||
long compactedAt,
|
||||
long bytesIn,
|
||||
long bytesOut,
|
||||
Map<Integer, Long> rowsMerged)
|
||||
{
|
||||
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
|
||||
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
|
||||
return;
|
||||
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
|
||||
}
|
||||
|
||||
public static TabularData getCompactionHistory() throws OpenDataException
|
||||
{
|
||||
UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
|
||||
return CompactionHistoryTabularData.from(queryResultSet);
|
||||
}
|
||||
#endif
|
||||
future<> update_compaction_history(sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
|
||||
std::unordered_map<int32_t, int64_t> rows_merged);
|
||||
|
||||
typedef std::vector<db::replay_position> replay_positions;
|
||||
|
||||
future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position);
|
||||
|
||||
Reference in New Issue
Block a user