diff --git a/alternator/streams.cc b/alternator/streams.cc index 698812f6e1..581942b627 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -908,10 +908,10 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche break; case stream_view_type::NEW_AND_OLD_IMAGES: opts.postimage(true); - opts.preimage(true); + opts.preimage(cdc::image_mode::full); break; case stream_view_type::OLD_IMAGE: - opts.preimage(true); + opts.preimage(cdc::image_mode::full); break; case stream_view_type::NEW_IMAGE: opts.postimage(true); diff --git a/cdc/cdc_options.hh b/cdc/cdc_options.hh index def27bfc5f..b447b5edb4 100644 --- a/cdc/cdc_options.hh +++ b/cdc/cdc_options.hh @@ -27,15 +27,31 @@ namespace cdc { -enum class delta_mode { +enum class delta_mode : uint8_t { off, keys, full, }; +/** + * Image collection mode (for now only used for the preimage). + * Selects how much information is recorded in the image: + * off == none + * on == changed columns + * full == all (changed and unmodified columns) + */ +enum class image_mode : uint8_t { + off, + on, + full, +}; + +std::ostream& operator<<(std::ostream& os, delta_mode); +std::ostream& operator<<(std::ostream& os, image_mode); + class options final { bool _enabled = false; - bool _preimage = false; + image_mode _preimage = image_mode::off; bool _postimage = false; delta_mode _delta_mode = delta_mode::full; int _ttl = 86400; // 24h in seconds @@ -47,13 +63,15 @@ public: sstring to_sstring() const; bool enabled() const { return _enabled; } - bool preimage() const { return _preimage; } + bool preimage() const { return _preimage != image_mode::off; } + bool full_preimage() const { return _preimage == image_mode::full; } bool postimage() const { return _postimage; } delta_mode get_delta_mode() const { return _delta_mode; } int ttl() const { return _ttl; } void enabled(bool b) { _enabled = b; } - void preimage(bool b) { _preimage = b; } + void preimage(bool b) { preimage(b ? 
image_mode::on : image_mode::off); } + void preimage(image_mode m) { _preimage = m; } void postimage(bool b) { _postimage = b; } void ttl(int v) { _ttl = v; } diff --git a/cdc/log.cc b/cdc/log.cc index 46d4ca699c..a81c264b25 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -293,9 +293,13 @@ future<> cdc::cdc_service::stop() { cdc::cdc_service::~cdc_service() = default; namespace { -const sstring delta_mode_string_off = "off"; -const sstring delta_mode_string_keys = "keys"; -const sstring delta_mode_string_full = "full"; +static const sstring delta_mode_string_off = "off"; +static const sstring delta_mode_string_keys = "keys"; +static const sstring delta_mode_string_full = "full"; + +static const std::string_view image_mode_string_on = "on"; +static const std::string_view image_mode_string_off = delta_mode_string_off; +static const std::string_view image_mode_string_full = delta_mode_string_full; /* off/full are views onto the delta_mode strings defined above */ sstring to_string(cdc::delta_mode dm) { switch (dm) { @@ -305,29 +309,64 @@ sstring to_string(cdc::delta_mode dm) { } throw std::logic_error("Impossible value of cdc::delta_mode"); } + +sstring to_string(cdc::image_mode m) { + switch (m) { + case cdc::image_mode::off : return "false"; /* off/on keep the "false"/"true" spelling to_map() emitted while preimage was a bool */ + case cdc::image_mode::on : return "true"; + case cdc::image_mode::full : return sstring(image_mode_string_full); + } + throw std::logic_error("Impossible value of cdc::image_mode"); +} + } // anon. 
namespace +std::ostream& cdc::operator<<(std::ostream& os, delta_mode m) { + return os << to_string(m); +} + +std::ostream& cdc::operator<<(std::ostream& os, image_mode m) { + return os << to_string(m); +} + cdc::options::options(const std::map<sstring, sstring>& map) { if (!map.contains("enabled")) { return; } for (auto& p : map) { - if (p.first == "enabled") { - _enabled = p.second == "true"; - } else if (p.first == "preimage") { - _preimage = p.second == "true"; - } else if (p.first == "postimage") { - _postimage = p.second == "true"; - } else if (p.first == "delta") { - if (p.second == delta_mode_string_keys) { + auto key = p.first; + auto val = p.second; + // Option names and values are matched case-insensitively; ::tolower() is undefined for negative char values, so go through unsigned char. + std::transform(key.begin(), key.end(), key.begin(), [](unsigned char c) { return static_cast<char>(::tolower(c)); }); + std::transform(val.begin(), val.end(), val.begin(), [](unsigned char c) { return static_cast<char>(::tolower(c)); }); + + auto is_true = val == "true" || val == "1"; + auto is_false = val == "false" || val == "0"; + + if (key == "enabled") { + _enabled = is_true; + } else if (key == "preimage") { + if (is_true || val == image_mode_string_on) { + _preimage = image_mode::on; + } else if (val == image_mode_string_full) { + _preimage = image_mode::full; + } else if (val == image_mode_string_off || is_false) { + _preimage = image_mode::off; + } else { + throw exceptions::configuration_exception("Invalid value for CDC option \"preimage\": " + p.second); + } + } else if (key == "postimage") { + _postimage = is_true; + } else if (key == "delta") { + if (val == delta_mode_string_keys) { _delta_mode = delta_mode::keys; - } else if (p.second == delta_mode_string_off) { + } else if (val == delta_mode_string_off) { _delta_mode = delta_mode::off; - } else if (p.second != delta_mode_string_full) { + } else if (val != delta_mode_string_full) { throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second); } - } else if (p.first == "ttl") { + } else if (key == "ttl") { _ttl = std::stoi(p.second); if (_ttl < 0) { throw exceptions::configuration_exception("Invalid CDC option: ttl must be >= 0"); 
@@ -337,7 +376,7 @@ cdc::options::options(const std::map<sstring, sstring>& map) { } } - if (_enabled && !_preimage && !_postimage && _delta_mode == delta_mode::off) { + if (_enabled && !preimage() && !postimage() && get_delta_mode() == delta_mode::off) { throw exceptions::configuration_exception("Invalid combination of CDC options: neither of" " {preimage, postimage, delta} is enabled"); } @@ -349,7 +388,7 @@ std::map<sstring, sstring> cdc::options::to_map() const { } return { { "enabled", _enabled ? "true" : "false" }, - { "preimage", _preimage ? "true" : "false" }, + { "preimage", to_string(_preimage) }, { "postimage", _postimage ? "true" : "false" }, { "delta", to_string(_delta_mode) }, { "ttl", std::to_string(_ttl) }, @@ -1043,7 +1082,8 @@ public: } void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override { - generate_image(operation::pre_image, ck, &columns_to_include); + // iff we want full preimage, just ignore the affected columns and include everything. + generate_image(operation::pre_image, ck, _schema->cdc_options().full_preimage() ? nullptr : &columns_to_include); }; void produce_postimage(const clustering_key* ck) override { @@ -1417,7 +1457,7 @@ public: // TODO: this assumes all mutations touch the same set of columns. This might not be true, and we may need to do more horrible set operation here. if (!p.static_row().empty()) { - // for postimage we need everything... + // for postimage or full preimage we need everything... - if (_schema->cdc_options().postimage()) { + if (_schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) { for (const column_definition& c: _schema->static_columns()) { static_columns.emplace_back(c.id); columns.emplace_back(&c); @@ -1435,7 +1475,7 @@ public: return re.row().deleted_at(); }); - // for postimage we need everything... + // for postimage or full preimage we need everything... 
- if (has_row_delete || _schema->cdc_options().postimage()) { + if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) { for (const column_definition& c: _schema->regular_columns()) { regular_columns.emplace_back(c.id); columns.emplace_back(&c); diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index d6cd4f17fd..89cbc84c7e 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -517,7 +517,7 @@ SEASTAR_THREAD_TEST_CASE(test_primary_key_logging) { SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { do_with_cql_env_thread([](cql_test_env& e) { - auto test = [&e] (bool pre_enabled, bool post_enabled, bool with_ttl) { + auto test = [&e] (cdc::image_mode pre_enabled, bool post_enabled, bool with_ttl) { /* pre_enabled is substituted into the 'preimage' option of the CREATE TABLE statement below */ // note: 'val3' column is not used, but since not set in initial update, would provoke #6143 unless fixed. cquery_nofail(e, format("CREATE TABLE ks.tbl (pk int, pk2 int, ck int, ck2 int, val int, val2 int, val3 int, PRIMARY KEY((pk, pk2), ck, ck2)) " "WITH cdc = {{'enabled':'true', 'preimage':'{}', 'postimage':'{}'}}", pre_enabled, post_enabled)); @@ -525,7 +525,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { auto rows = select_log(e, "tbl"); - BOOST_REQUIRE_EQUAL(!pre_enabled, to_bytes_filtered(*rows, cdc::operation::pre_image).empty()); + BOOST_REQUIRE_EQUAL(pre_enabled == cdc::image_mode::off, to_bytes_filtered(*rows, cdc::operation::pre_image).empty()); BOOST_REQUIRE_EQUAL(!post_enabled, to_bytes_filtered(*rows, cdc::operation::post_image).empty()); auto first = to_bytes_filtered(*rows, cdc::operation::update); @@ -556,14 +556,14 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { auto second = to_bytes_filtered(*rows, cdc::operation::update); auto post_image = to_bytes_filtered(*rows, cdc::operation::post_image); - BOOST_REQUIRE_EQUAL(!pre_enabled, pre_image.empty()); + BOOST_REQUIRE_EQUAL(pre_enabled == cdc::image_mode::off, pre_image.empty()); 
BOOST_REQUIRE_EQUAL(!post_enabled, post_image.empty()); sort_by_time(*rows, second); sort_by_time(*rows, pre_image); sort_by_time(*rows, post_image); - if (pre_enabled) { + if (pre_enabled != cdc::image_mode::off) { BOOST_REQUIRE_EQUAL(pre_image.size(), i + 2); val = *pre_image.back()[val_index]; @@ -573,6 +573,13 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { BOOST_REQUIRE_EQUAL(bytes_opt(), pre_image.back()[ttl_index]); } + if (pre_enabled == cdc::image_mode::full) { + BOOST_REQUIRE_EQUAL(bytes_opt(int32_type->decompose(22222)), pre_image.back()[val2_index]); /* compare as optionals so a missing val2 fails the check instead of dereferencing an empty value */ + } + if (pre_enabled == cdc::image_mode::on) { + BOOST_REQUIRE(!pre_image.back()[val2_index]); + } + if (post_enabled) { val = *post_image.back()[val_index]; val2 = *post_image.back()[val2_index]; @@ -594,7 +601,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) { } e.execute_cql("DROP TABLE ks.tbl").get(); }; - for (auto pre : { true, false}) { + for (auto pre : { cdc::image_mode::on, cdc::image_mode::full, cdc::image_mode::off }) { for (auto post : { true, false }) { for (auto ttl : { true, false}) { test(pre, post, ttl);