Compare commits
3 Commits
copilot/pr
...
scylla-4.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbef05ae3c | ||
|
|
6f324cb732 | ||
|
|
239499a35a |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.3.rc0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
25
cdc/log.cc
25
cdc/log.cc
@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
|
||||
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
|
||||
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
|
||||
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
|
||||
b.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
|
||||
for (const auto& column : columns) {
|
||||
@@ -880,14 +881,26 @@ public:
|
||||
return _base_schema;
|
||||
}
|
||||
|
||||
clustering_key create_ck(int batch) const {
|
||||
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
|
||||
}
|
||||
|
||||
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
|
||||
// The numbering of batch sequence numbers starts from 0.
|
||||
clustering_key allocate_new_log_row() {
|
||||
auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) });
|
||||
auto log_ck = create_ck(_batch_no++);
|
||||
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
bool has_rows() const {
|
||||
return _batch_no != 0;
|
||||
}
|
||||
|
||||
clustering_key last_row_key() const {
|
||||
return create_ck(_batch_no - 1);
|
||||
}
|
||||
|
||||
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
|
||||
clustering_key allocate_new_log_row(operation op) {
|
||||
auto log_ck = allocate_new_log_row();
|
||||
@@ -944,6 +957,11 @@ public:
|
||||
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
|
||||
}
|
||||
|
||||
void end_record() {
|
||||
if (has_rows()) {
|
||||
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
|
||||
}
|
||||
}
|
||||
private:
|
||||
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
|
||||
size_t pos = 0;
|
||||
@@ -1519,6 +1537,11 @@ public:
|
||||
cdc::inspect_mutation(m, v);
|
||||
}
|
||||
|
||||
void end_record() override {
|
||||
assert(_builder);
|
||||
_builder->end_record();
|
||||
}
|
||||
|
||||
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
|
||||
// The `transformer` object on which this method was called on should not be used anymore.
|
||||
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
|
||||
|
||||
@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
|
||||
processor.produce_postimage(&ck);
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
|
||||
processor.produce_postimage(&cr.key());
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -77,6 +77,10 @@ public:
|
||||
// both columns have different timestamp or TTL set.
|
||||
// m - the small mutation to be converted into CDC log rows.
|
||||
virtual void process_change(const mutation& m) = 0;
|
||||
|
||||
// Tells processor we have reached end of record - last part
|
||||
// of a given timestamp batch
|
||||
virtual void end_record() = 0;
|
||||
};
|
||||
|
||||
bool should_split(const mutation& base_mutation);
|
||||
|
||||
@@ -694,7 +694,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/scylla-4.3/latest/scylla.repo
|
||||
ARG VERSION=4.3.rc0
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -326,6 +326,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
|
||||
// cdc log clustering key
|
||||
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
|
||||
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
|
||||
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
|
||||
|
||||
// pk
|
||||
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
|
||||
@@ -534,6 +535,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
|
||||
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
|
||||
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
|
||||
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
|
||||
|
||||
auto val_type = int32_type;
|
||||
auto val = *first[0][val_index];
|
||||
@@ -583,10 +585,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
if (post_enabled) {
|
||||
val = *post_image.back()[val_index];
|
||||
val2 = *post_image.back()[val2_index];
|
||||
auto eor = *post_image.back()[eor_index];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
|
||||
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
|
||||
}
|
||||
|
||||
const auto& ttl_cell = second[second.size() - 2][ttl_index];
|
||||
|
||||
Reference in New Issue
Block a user