Compare commits
11 Commits
next
...
scylla-4.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afc18d5070 | ||
|
|
2ec22c2404 | ||
|
|
19da778271 | ||
|
|
cbd4c13ba6 | ||
|
|
338871802d | ||
|
|
8b5b1b8af6 | ||
|
|
ea89eff95d | ||
|
|
96421e7779 | ||
|
|
142336ca53 | ||
|
|
492f12248c | ||
|
|
7eb7a0e5fe |
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=4.6.dev
|
||||
VERSION=4.6.rc0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
|
||||
}
|
||||
|
||||
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
|
||||
paging_state_copy->set_remaining(internal_paging_size);
|
||||
paging_state_copy->set_partition_key(std::move(index_pk));
|
||||
paging_state_copy->set_clustering_key(std::move(index_ck));
|
||||
return std::move(paging_state_copy);
|
||||
|
||||
@@ -512,6 +512,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
|
||||
uint64_t _file_pos = 0;
|
||||
uint64_t _flush_pos = 0;
|
||||
uint64_t _size_on_disk = 0;
|
||||
uint64_t _waste = 0;
|
||||
|
||||
size_t _alignment;
|
||||
|
||||
@@ -598,7 +599,7 @@ public:
|
||||
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
|
||||
++_segment_manager->totals.segments_destroyed;
|
||||
_segment_manager->totals.active_size_on_disk -= file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
|
||||
_segment_manager->totals.wasted_size_on_disk -= _waste;
|
||||
_segment_manager->add_file_to_delete(_file_name, _desc);
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
@@ -725,7 +726,8 @@ public:
|
||||
auto s = co_await sync();
|
||||
co_await flush();
|
||||
co_await terminate();
|
||||
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
|
||||
_waste = _size_on_disk - file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk += _waste;
|
||||
co_return s;
|
||||
}
|
||||
future<sseg_ptr> do_flush(uint64_t pos) {
|
||||
|
||||
6
dist/common/scripts/scylla_ntp_setup
vendored
6
dist/common/scripts/scylla_ntp_setup
vendored
@@ -66,18 +66,18 @@ if __name__ == '__main__':
|
||||
|
||||
target = None
|
||||
if os.path.exists('/lib/systemd/systemd-timesyncd'):
|
||||
if systemd_unit('systemd-timesyncd').is_active():
|
||||
if systemd_unit('systemd-timesyncd').is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
target = 'systemd-timesyncd'
|
||||
if shutil.which('chronyd'):
|
||||
if get_chrony_unit().is_active():
|
||||
if get_chrony_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
target = 'chrony'
|
||||
if shutil.which('ntpd'):
|
||||
if get_ntp_unit().is_active():
|
||||
if get_ntp_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -1041,7 +1041,7 @@ class systemd_unit:
|
||||
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
def is_active(self):
|
||||
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
|
||||
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
|
||||
def mask(self):
|
||||
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
6
dist/docker/debian/build_docker.sh
vendored
6
dist/docker/debian/build_docker.sh
vendored
@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
|
||||
version="$(<build/SCYLLA-VERSION-FILE)"
|
||||
release="$(<build/SCYLLA-RELEASE-FILE)"
|
||||
|
||||
if [[ "$version" = *rc* ]]; then
|
||||
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
|
||||
fi
|
||||
|
||||
mode="release"
|
||||
|
||||
if uname -m | grep x86_64 ; then
|
||||
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
|
||||
run locale-gen en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /etc/supervisor.conf.d
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
|
||||
@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
|
||||
_logger.info("exception while advertising new connection: {}", std::current_exception());
|
||||
}
|
||||
// Block while monitoring for lifetime/errors.
|
||||
return conn->process().finally([this, conn] {
|
||||
return unadvertise_connection(conn);
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
if (is_broken_pipe_or_connection_reset(ep)) {
|
||||
// expected if another side closes a connection or we're shutting down
|
||||
return;
|
||||
return conn->process().then_wrapped([this, conn] (auto f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
auto ep = std::current_exception();
|
||||
if (!is_broken_pipe_or_connection_reset(ep)) {
|
||||
// some exceptions are expected if another side closes a connection
|
||||
// or we're shutting down
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
}
|
||||
}
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
return unadvertise_connection(conn);
|
||||
});
|
||||
});
|
||||
return stop_iteration::no;
|
||||
|
||||
@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
|
||||
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
|
||||
}
|
||||
|
||||
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
|
||||
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
|
||||
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
|
||||
logger.warn("Failed to handle {}: {}", type, ep);
|
||||
});
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
}
|
||||
|
||||
void gossiper::init_messaging_service_handler() {
|
||||
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_syn_msg(from, std::move(syn_msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack2_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return handle_echo_msg(from, generation_number_opt);
|
||||
});
|
||||
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return gossiper.handle_shutdown_msg(from, generation_number_opt);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
|
||||
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
|
||||
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
|
||||
}
|
||||
|
||||
future<> gossiper::shutdown() {
|
||||
if (!_background_msg.is_closed()) {
|
||||
co_await _background_msg.close();
|
||||
}
|
||||
if (this_shard_id() == 0) {
|
||||
co_await do_stop_gossiping();
|
||||
}
|
||||
|
||||
@@ -41,7 +41,9 @@
|
||||
#include "unimplemented.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "utils/atomic_vector.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -138,12 +140,16 @@ private:
|
||||
bool _enabled = false;
|
||||
semaphore _callback_running{1};
|
||||
semaphore _apply_state_locally_semaphore{100};
|
||||
seastar::gate _background_msg;
|
||||
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
|
||||
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
|
||||
bool _advertise_myself = true;
|
||||
// Map ip address and generation number
|
||||
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
|
||||
future<> _failure_detector_loop_done{make_ready_future<>()} ;
|
||||
|
||||
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
|
||||
|
||||
public:
|
||||
// Get current generation number for the given nodes
|
||||
future<std::unordered_map<gms::inet_address, int32_t>>
|
||||
|
||||
@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
|
||||
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
|
||||
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
} else {
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
schema_ptr snp_schema = snp->schema();
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
|
||||
auto rd = std::move(rd_ref);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (!rd.is_end_of_stream()) {
|
||||
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
|
||||
co_await rd.fill_buffer();
|
||||
while (!rd.is_buffer_empty()) {
|
||||
co_await rd.pop_mutation_fragment().consume(wr);
|
||||
|
||||
@@ -49,12 +49,13 @@ private:
|
||||
public:
|
||||
partition_index_cache* _parent;
|
||||
key_type _key;
|
||||
std::variant<shared_promise<>, partition_index_page> _page;
|
||||
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
|
||||
size_t _size_in_allocator = 0;
|
||||
public:
|
||||
entry(partition_index_cache* parent, key_type key)
|
||||
: _parent(parent)
|
||||
, _key(key)
|
||||
, _page(make_lw_shared<shared_promise<>>())
|
||||
{ }
|
||||
|
||||
void set_page(partition_index_page&& page) noexcept {
|
||||
@@ -76,7 +77,7 @@ private:
|
||||
// Always returns the same value for a given state of _page.
|
||||
size_t size_in_allocator() const { return _size_in_allocator; }
|
||||
|
||||
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
|
||||
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
|
||||
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
|
||||
partition_index_page& page() { return std::get<partition_index_page>(_page); }
|
||||
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
|
||||
@@ -207,9 +208,7 @@ public:
|
||||
return make_ready_future<entry_ptr>(std::move(ptr));
|
||||
} else {
|
||||
++_shard_stats.blocks;
|
||||
return _as(_region, [ptr] () mutable {
|
||||
return ptr.get_entry().promise().get_shared_future();
|
||||
}).then([ptr] () mutable {
|
||||
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
|
||||
return std::move(ptr);
|
||||
});
|
||||
}
|
||||
@@ -238,12 +237,12 @@ public:
|
||||
entry& e = ptr.get_entry();
|
||||
try {
|
||||
partition_index_page&& page = f.get0();
|
||||
e.promise().set_value();
|
||||
e.promise()->set_value();
|
||||
e.set_page(std::move(page));
|
||||
_shard_stats.used_bytes += e.size_in_allocator();
|
||||
++_shard_stats.populations;
|
||||
} catch (...) {
|
||||
e.promise().set_exception(std::current_exception());
|
||||
e.promise()->set_exception(std::current_exception());
|
||||
with_allocator(_region.allocator(), [&] {
|
||||
_cache.erase(key);
|
||||
});
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user