mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-02 13:06:57 +00:00
merge: CDC: allow "full" preimage in logs
Merged pull request https://github.com/scylladb/scylla/pull/7028 By Calle Wilund: Changes the "preimage" option from binary true/false to on/off/full (accepting true/false, and using old style notation for normal to string - for upgrade reasons), where "full" will force us to include all columns in pre image log rows. Adds small test (just adding the case to preimage test). Uses the feature in alternator Fixes #7030 alternator: Set "preimage" to "full" for streams cdc_test: Do small test of "full" cdc: Make pre image optionally "full" (include all columns)
This commit is contained in:
@@ -908,10 +908,10 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
|
||||
break;
|
||||
case stream_view_type::NEW_AND_OLD_IMAGES:
|
||||
opts.postimage(true);
|
||||
opts.preimage(true);
|
||||
opts.preimage(cdc::image_mode::full);
|
||||
break;
|
||||
case stream_view_type::OLD_IMAGE:
|
||||
opts.preimage(true);
|
||||
opts.preimage(cdc::image_mode::full);
|
||||
break;
|
||||
case stream_view_type::NEW_IMAGE:
|
||||
opts.postimage(true);
|
||||
|
||||
@@ -27,15 +27,31 @@
|
||||
|
||||
namespace cdc {
|
||||
|
||||
enum class delta_mode {
|
||||
enum class delta_mode : uint8_t {
|
||||
off,
|
||||
keys,
|
||||
full,
|
||||
};
|
||||
|
||||
/**
|
||||
* (for now only pre-) image collection mode.
|
||||
* Stating how much info to record.
|
||||
* off == none
|
||||
* on == changed columns
|
||||
* full == all (changed and unmodified columns)
|
||||
*/
|
||||
enum class image_mode : uint8_t {
|
||||
off,
|
||||
on,
|
||||
full,
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, delta_mode);
|
||||
std::ostream& operator<<(std::ostream& os, image_mode);
|
||||
|
||||
class options final {
|
||||
bool _enabled = false;
|
||||
bool _preimage = false;
|
||||
image_mode _preimage = image_mode::off;
|
||||
bool _postimage = false;
|
||||
delta_mode _delta_mode = delta_mode::full;
|
||||
int _ttl = 86400; // 24h in seconds
|
||||
@@ -47,13 +63,15 @@ public:
|
||||
sstring to_sstring() const;
|
||||
|
||||
bool enabled() const { return _enabled; }
|
||||
bool preimage() const { return _preimage; }
|
||||
bool preimage() const { return _preimage != image_mode::off; }
|
||||
bool full_preimage() const { return _preimage == image_mode::full; }
|
||||
bool postimage() const { return _postimage; }
|
||||
delta_mode get_delta_mode() const { return _delta_mode; }
|
||||
int ttl() const { return _ttl; }
|
||||
|
||||
void enabled(bool b) { _enabled = b; }
|
||||
void preimage(bool b) { _preimage = b; }
|
||||
void preimage(bool b) { preimage(b ? image_mode::on : image_mode::off); }
|
||||
void preimage(image_mode m) { _preimage = m; }
|
||||
void postimage(bool b) { _postimage = b; }
|
||||
void ttl(int v) { _ttl = v; }
|
||||
|
||||
|
||||
78
cdc/log.cc
78
cdc/log.cc
@@ -293,9 +293,13 @@ future<> cdc::cdc_service::stop() {
|
||||
cdc::cdc_service::~cdc_service() = default;
|
||||
|
||||
namespace {
|
||||
const sstring delta_mode_string_off = "off";
|
||||
const sstring delta_mode_string_keys = "keys";
|
||||
const sstring delta_mode_string_full = "full";
|
||||
static const sstring delta_mode_string_off = "off";
|
||||
static const sstring delta_mode_string_keys = "keys";
|
||||
static const sstring delta_mode_string_full = "full";
|
||||
|
||||
static const std::string_view image_mode_string_on = "on";
|
||||
static const std::string_view image_mode_string_off = delta_mode_string_off;
|
||||
static const std::string_view image_mode_string_full = delta_mode_string_full;
|
||||
|
||||
sstring to_string(cdc::delta_mode dm) {
|
||||
switch (dm) {
|
||||
@@ -305,29 +309,64 @@ sstring to_string(cdc::delta_mode dm) {
|
||||
}
|
||||
throw std::logic_error("Impossible value of cdc::delta_mode");
|
||||
}
|
||||
|
||||
sstring to_string(cdc::image_mode m) {
|
||||
switch (m) {
|
||||
case cdc::image_mode::off : return "false";
|
||||
case cdc::image_mode::on : return "true";
|
||||
case cdc::image_mode::full : return sstring(image_mode_string_full);
|
||||
}
|
||||
throw std::logic_error("Impossible value of cdc::image_mode");
|
||||
}
|
||||
|
||||
} // anon. namespace
|
||||
|
||||
std::ostream& cdc::operator<<(std::ostream& os, delta_mode m) {
|
||||
return os << to_string(m);
|
||||
}
|
||||
|
||||
std::ostream& cdc::operator<<(std::ostream& os, image_mode m) {
|
||||
return os << to_string(m);
|
||||
}
|
||||
|
||||
cdc::options::options(const std::map<sstring, sstring>& map) {
|
||||
if (!map.contains("enabled")) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& p : map) {
|
||||
if (p.first == "enabled") {
|
||||
_enabled = p.second == "true";
|
||||
} else if (p.first == "preimage") {
|
||||
_preimage = p.second == "true";
|
||||
} else if (p.first == "postimage") {
|
||||
_postimage = p.second == "true";
|
||||
} else if (p.first == "delta") {
|
||||
if (p.second == delta_mode_string_keys) {
|
||||
auto key = p.first;
|
||||
auto val = p.second;
|
||||
|
||||
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
|
||||
std::transform(val.begin(), val.end(), val.begin(), ::tolower);
|
||||
|
||||
auto is_true = val == "true" || val == "1";
|
||||
auto is_false = val == "false" || val == "0";
|
||||
|
||||
if (key == "enabled") {
|
||||
_enabled = is_true;
|
||||
} else if (key == "preimage") {
|
||||
if (is_true || val == image_mode_string_on) {
|
||||
_preimage = image_mode::on;
|
||||
} else if (val == image_mode_string_full) {
|
||||
_preimage = image_mode::full;
|
||||
} else if (val == image_mode_string_off || is_false) {
|
||||
_preimage = image_mode::off;
|
||||
} else {
|
||||
throw exceptions::configuration_exception("Invalid value for CDC option \"preimage\": " + p.second);
|
||||
}
|
||||
} else if (key == "postimage") {
|
||||
_postimage = is_true;
|
||||
} else if (key == "delta") {
|
||||
if (val == delta_mode_string_keys) {
|
||||
_delta_mode = delta_mode::keys;
|
||||
} else if (p.second == delta_mode_string_off) {
|
||||
} else if (val == delta_mode_string_off) {
|
||||
_delta_mode = delta_mode::off;
|
||||
} else if (p.second != delta_mode_string_full) {
|
||||
} else if (val != delta_mode_string_full) {
|
||||
throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second);
|
||||
}
|
||||
} else if (p.first == "ttl") {
|
||||
} else if (key == "ttl") {
|
||||
_ttl = std::stoi(p.second);
|
||||
if (_ttl < 0) {
|
||||
throw exceptions::configuration_exception("Invalid CDC option: ttl must be >= 0");
|
||||
@@ -337,7 +376,7 @@ cdc::options::options(const std::map<sstring, sstring>& map) {
|
||||
}
|
||||
}
|
||||
|
||||
if (_enabled && !_preimage && !_postimage && _delta_mode == delta_mode::off) {
|
||||
if (_enabled && !preimage() && !postimage() && get_delta_mode() == delta_mode::off) {
|
||||
throw exceptions::configuration_exception("Invalid combination of CDC options: neither of"
|
||||
" {preimage, postimage, delta} is enabled");
|
||||
}
|
||||
@@ -349,7 +388,7 @@ std::map<sstring, sstring> cdc::options::to_map() const {
|
||||
}
|
||||
return {
|
||||
{ "enabled", _enabled ? "true" : "false" },
|
||||
{ "preimage", _preimage ? "true" : "false" },
|
||||
{ "preimage", to_string(_preimage) },
|
||||
{ "postimage", _postimage ? "true" : "false" },
|
||||
{ "delta", to_string(_delta_mode) },
|
||||
{ "ttl", std::to_string(_ttl) },
|
||||
@@ -1043,7 +1082,8 @@ public:
|
||||
}
|
||||
|
||||
void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override {
|
||||
generate_image(operation::pre_image, ck, &columns_to_include);
|
||||
// iff we want full preimage, just ignore the affected columns and include everything.
|
||||
generate_image(operation::pre_image, ck, _schema->cdc_options().full_preimage() ? nullptr : &columns_to_include);
|
||||
};
|
||||
|
||||
void produce_postimage(const clustering_key* ck) override {
|
||||
@@ -1417,7 +1457,7 @@ public:
|
||||
// TODO: this assumes all mutations touch the same set of columns. This might not be true, and we may need to do more horrible set operation here.
|
||||
if (!p.static_row().empty()) {
|
||||
// for postimage we need everything...
|
||||
if (_schema->cdc_options().postimage()) {
|
||||
if (_schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) {
|
||||
for (const column_definition& c: _schema->static_columns()) {
|
||||
static_columns.emplace_back(c.id);
|
||||
columns.emplace_back(&c);
|
||||
@@ -1435,7 +1475,7 @@ public:
|
||||
return re.row().deleted_at();
|
||||
});
|
||||
// for postimage we need everything...
|
||||
if (has_row_delete || _schema->cdc_options().postimage()) {
|
||||
if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) {
|
||||
for (const column_definition& c: _schema->regular_columns()) {
|
||||
regular_columns.emplace_back(c.id);
|
||||
columns.emplace_back(&c);
|
||||
|
||||
@@ -517,7 +517,7 @@ SEASTAR_THREAD_TEST_CASE(test_primary_key_logging) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
auto test = [&e] (bool pre_enabled, bool post_enabled, bool with_ttl) {
|
||||
auto test = [&e] (cdc::image_mode pre_enabled, bool post_enabled, bool with_ttl) {
|
||||
// note: 'val3' column is not used, but since not set in initial update, would provoke #6143 unless fixed.
|
||||
cquery_nofail(e, format("CREATE TABLE ks.tbl (pk int, pk2 int, ck int, ck2 int, val int, val2 int, val3 int, PRIMARY KEY((pk, pk2), ck, ck2)) "
|
||||
"WITH cdc = {{'enabled':'true', 'preimage':'{}', 'postimage':'{}'}}", pre_enabled, post_enabled));
|
||||
@@ -525,7 +525,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
|
||||
auto rows = select_log(e, "tbl");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(!pre_enabled, to_bytes_filtered(*rows, cdc::operation::pre_image).empty());
|
||||
BOOST_REQUIRE_EQUAL(pre_enabled == cdc::image_mode::off, to_bytes_filtered(*rows, cdc::operation::pre_image).empty());
|
||||
BOOST_REQUIRE_EQUAL(!post_enabled, to_bytes_filtered(*rows, cdc::operation::post_image).empty());
|
||||
|
||||
auto first = to_bytes_filtered(*rows, cdc::operation::update);
|
||||
@@ -556,14 +556,14 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
auto second = to_bytes_filtered(*rows, cdc::operation::update);
|
||||
auto post_image = to_bytes_filtered(*rows, cdc::operation::post_image);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(!pre_enabled, pre_image.empty());
|
||||
BOOST_REQUIRE_EQUAL(pre_enabled == cdc::image_mode::off, pre_image.empty());
|
||||
BOOST_REQUIRE_EQUAL(!post_enabled, post_image.empty());
|
||||
|
||||
sort_by_time(*rows, second);
|
||||
sort_by_time(*rows, pre_image);
|
||||
sort_by_time(*rows, post_image);
|
||||
|
||||
if (pre_enabled) {
|
||||
if (pre_enabled != cdc::image_mode::off) {
|
||||
BOOST_REQUIRE_EQUAL(pre_image.size(), i + 2);
|
||||
|
||||
val = *pre_image.back()[val_index];
|
||||
@@ -573,6 +573,13 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
BOOST_REQUIRE_EQUAL(bytes_opt(), pre_image.back()[ttl_index]);
|
||||
}
|
||||
|
||||
if (pre_enabled == cdc::image_mode::full) {
|
||||
BOOST_REQUIRE_EQUAL(int32_type->decompose(22222), *pre_image.back()[val2_index]);
|
||||
}
|
||||
if (pre_enabled == cdc::image_mode::on) {
|
||||
BOOST_REQUIRE(!pre_image.back()[val2_index]);
|
||||
}
|
||||
|
||||
if (post_enabled) {
|
||||
val = *post_image.back()[val_index];
|
||||
val2 = *post_image.back()[val2_index];
|
||||
@@ -594,7 +601,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
}
|
||||
e.execute_cql("DROP TABLE ks.tbl").get();
|
||||
};
|
||||
for (auto pre : { true, false}) {
|
||||
for (auto pre : { cdc::image_mode::on, cdc::image_mode::full, cdc::image_mode::off }) {
|
||||
for (auto post : { true, false }) {
|
||||
for (auto ttl : { true, false}) {
|
||||
test(pre, post, ttl);
|
||||
|
||||
Reference in New Issue
Block a user