Compare commits
32 Commits
next
...
scylla-4.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8a1cfb6f8 | ||
|
|
fc312b3021 | ||
|
|
7b82aaf939 | ||
|
|
894a4abfae | ||
|
|
4dcf023470 | ||
|
|
283788828e | ||
|
|
730a147ba6 | ||
|
|
9897e83029 | ||
|
|
1a9b64e6f6 | ||
|
|
49fe9e2c8e | ||
|
|
d0580c41ee | ||
|
|
542394c82f | ||
|
|
018ad3f6f4 | ||
|
|
9b8b7efb54 | ||
|
|
1c3e63975f | ||
|
|
11bb03e46d | ||
|
|
810e410c5d | ||
|
|
97f6da0c3e | ||
|
|
c229fe9694 | ||
|
|
ee1ca8ae4d | ||
|
|
6bfd322e3b | ||
|
|
afc18d5070 | ||
|
|
2ec22c2404 | ||
|
|
19da778271 | ||
|
|
cbd4c13ba6 | ||
|
|
338871802d | ||
|
|
8b5b1b8af6 | ||
|
|
ea89eff95d | ||
|
|
96421e7779 | ||
|
|
142336ca53 | ||
|
|
492f12248c | ||
|
|
7eb7a0e5fe |
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=4.6.dev
|
||||
VERSION=4.6.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1017,18 +1017,16 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
_stats.api_operations.update_table++;
|
||||
elogger.trace("Updating table {}", request);
|
||||
|
||||
std::string table_name = get_table_name(request);
|
||||
if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) {
|
||||
schema_ptr tab = get_table(_proxy, request);
|
||||
// the ugly but harmless conversion to string_view here is because
|
||||
// Seastar's sstring is missing a find(std::string_view) :-()
|
||||
if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) {
|
||||
return make_ready_future<request_return_type>(api_error::validation(
|
||||
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
|
||||
}
|
||||
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
|
||||
tracing::add_table_name(trace_state, keyspace_name, table_name);
|
||||
tracing::add_table_name(trace_state, tab->ks_name(), tab->cf_name());
|
||||
|
||||
auto& db = _proxy.get_db().local();
|
||||
auto& cf = db.find_column_family(keyspace_name, table_name);
|
||||
|
||||
schema_builder builder(cf.schema());
|
||||
schema_builder builder(tab);
|
||||
|
||||
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
|
||||
@@ -593,8 +593,8 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
|
||||
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
|
||||
auto& rows = _snp->version()->partition().mutable_clustered_rows();
|
||||
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
|
||||
return rows.insert_before(_next_row.get_iterator_in_latest_version(), *new_entry);
|
||||
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no));
|
||||
return rows.insert_before(_next_row.get_iterator_in_latest_version(), std::move(new_entry));
|
||||
});
|
||||
_snp->tracker()->insert(*it);
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it, true);
|
||||
|
||||
@@ -1634,7 +1634,7 @@ future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, co
|
||||
while (auto mf_opt = co_await reader()) {
|
||||
if (cdata.is_stop_requested()) [[unlikely]] {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
co_return coroutine::make_exception(compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested));
|
||||
throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
|
||||
}
|
||||
|
||||
const auto& mf = *mf_opt;
|
||||
|
||||
@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
|
||||
}
|
||||
|
||||
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
|
||||
paging_state_copy->set_remaining(internal_paging_size);
|
||||
paging_state_copy->set_partition_key(std::move(index_pk));
|
||||
paging_state_copy->set_clustering_key(std::move(index_ck));
|
||||
return std::move(paging_state_copy);
|
||||
|
||||
@@ -428,6 +428,8 @@ private:
|
||||
void abort_recycled_list(std::exception_ptr);
|
||||
void abort_deletion_promise(std::exception_ptr);
|
||||
|
||||
future<> recalculate_footprint();
|
||||
|
||||
future<> rename_file(sstring, sstring) const;
|
||||
size_t max_request_controller_units() const;
|
||||
segment_id_type _ids = 0;
|
||||
@@ -444,6 +446,7 @@ private:
|
||||
seastar::gate _gate;
|
||||
uint64_t _new_counter = 0;
|
||||
std::optional<size_t> _disk_write_alignment;
|
||||
seastar::semaphore _reserve_recalculation_guard;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
@@ -512,6 +515,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
|
||||
uint64_t _file_pos = 0;
|
||||
uint64_t _flush_pos = 0;
|
||||
uint64_t _size_on_disk = 0;
|
||||
uint64_t _waste = 0;
|
||||
|
||||
size_t _alignment;
|
||||
|
||||
@@ -598,7 +602,7 @@ public:
|
||||
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
|
||||
++_segment_manager->totals.segments_destroyed;
|
||||
_segment_manager->totals.active_size_on_disk -= file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
|
||||
_segment_manager->totals.wasted_size_on_disk -= _waste;
|
||||
_segment_manager->add_file_to_delete(_file_name, _desc);
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
@@ -725,7 +729,8 @@ public:
|
||||
auto s = co_await sync();
|
||||
co_await flush();
|
||||
co_await terminate();
|
||||
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
|
||||
_waste = _size_on_disk - file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk += _waste;
|
||||
co_return s;
|
||||
}
|
||||
future<sseg_ptr> do_flush(uint64_t pos) {
|
||||
@@ -1223,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
, _recycled_segments(std::numeric_limits<size_t>::max())
|
||||
, _reserve_replenisher(make_ready_future<>())
|
||||
, _background_sync(make_ready_future<>())
|
||||
, _reserve_recalculation_guard(1)
|
||||
{
|
||||
assert(max_size > 0);
|
||||
assert(max_mutation_size < segment::multi_entry_size_magic);
|
||||
@@ -1248,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
|
||||
}
|
||||
try {
|
||||
gate::holder g(_gate);
|
||||
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
|
||||
if (_reserve_segments.full()) {
|
||||
// can happen if we recalculate
|
||||
continue;
|
||||
}
|
||||
// note: if we were strict with disk size, we would refuse to do this
|
||||
// unless disk footprint is lower than threshold. but we cannot (yet?)
|
||||
// trust that flush logic will absolutely free up an existing
|
||||
@@ -1519,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
|
||||
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
|
||||
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
|
||||
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
|
||||
auto nf = co_await ext->wrap_file(filename, f, flags);
|
||||
if (nf) {
|
||||
f = std::move(nf);
|
||||
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
|
||||
@@ -1529,13 +1540,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
|
||||
f = make_checked_file(commit_error_handler, std::move(f));
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
commit_error_handler(ep);
|
||||
try {
|
||||
commit_error_handler(std::current_exception());
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
}
|
||||
if (ep && f) {
|
||||
co_await f.close();
|
||||
}
|
||||
if (ep) {
|
||||
add_file_to_delete(filename, d);
|
||||
co_return coroutine::exception(std::move(ep));
|
||||
}
|
||||
|
||||
@@ -1865,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
|
||||
std::exception_ptr recycle_error;
|
||||
|
||||
size_t num_deleted = 0;
|
||||
bool except = false;
|
||||
while (!files.empty()) {
|
||||
auto filename = std::move(files.back());
|
||||
files.pop_back();
|
||||
@@ -1914,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
}
|
||||
}
|
||||
co_await delete_file(filename);
|
||||
++num_deleted;
|
||||
} catch (...) {
|
||||
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
|
||||
except = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1928,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
if (recycle_error && _recycled_segments.empty()) {
|
||||
abort_recycled_list(recycle_error);
|
||||
}
|
||||
// If recycle failed and turned into a delete, we should fake-wakeup waiters
|
||||
// since we might still have cleaned up disk space.
|
||||
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
|
||||
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
|
||||
}
|
||||
|
||||
// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
|
||||
if (except) {
|
||||
co_await recalculate_footprint();
|
||||
}
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
|
||||
@@ -1942,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
|
||||
std::exchange(_disk_deletions, {}).set_exception(ep);
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::recalculate_footprint() {
|
||||
try {
|
||||
co_await do_pending_deletes();
|
||||
|
||||
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
|
||||
auto segments_copy = _segments;
|
||||
std::vector<sseg_ptr> reserves;
|
||||
std::vector<sstring> recycles;
|
||||
// this causes haywire things while we steal stuff, but...
|
||||
while (!_reserve_segments.empty()) {
|
||||
reserves.push_back(_reserve_segments.pop());
|
||||
}
|
||||
while (!_recycled_segments.empty()) {
|
||||
recycles.push_back(_recycled_segments.pop());
|
||||
}
|
||||
|
||||
// first, guesstimate sizes
|
||||
uint64_t recycle_size = recycles.size() * max_size;
|
||||
auto old = totals.total_size_on_disk;
|
||||
|
||||
totals.total_size_on_disk = recycle_size;
|
||||
for (auto& s : _segments) {
|
||||
totals.total_size_on_disk += s->_size_on_disk;
|
||||
}
|
||||
for (auto& s : reserves) {
|
||||
totals.total_size_on_disk += s->_size_on_disk;
|
||||
}
|
||||
|
||||
// now we need to adjust the actual sizes of recycled files
|
||||
|
||||
uint64_t actual_recycled_size = 0;
|
||||
|
||||
try {
|
||||
for (auto& filename : recycles) {
|
||||
auto s = co_await seastar::file_size(filename);
|
||||
actual_recycled_size += s;
|
||||
}
|
||||
} catch (...) {
|
||||
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
|
||||
actual_recycled_size = recycle_size; // best we got
|
||||
}
|
||||
|
||||
for (auto&& filename : recycles) {
|
||||
_recycled_segments.push(std::move(filename));
|
||||
}
|
||||
for (auto&& s : reserves) {
|
||||
_reserve_segments.push(std::move(s)); // you can have it back now.
|
||||
}
|
||||
|
||||
totals.total_size_on_disk += actual_recycled_size - recycle_size;
|
||||
// pushing things to reserve/recycled queues will have resumed any
|
||||
// waiters, so we should be done.
|
||||
} catch (...) {
|
||||
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
auto ftc = std::exchange(_files_to_close, {});
|
||||
auto ftd = std::exchange(_files_to_delete, {});
|
||||
|
||||
1
dist/common/scripts/scylla-housekeeping
vendored
1
dist/common/scripts/scylla-housekeeping
vendored
@@ -100,6 +100,7 @@ def version_compare(a, b):
|
||||
def create_uuid_file(fl):
|
||||
with open(args.uuid_file, 'w') as myfile:
|
||||
myfile.write(str(uuid.uuid1()) + "\n")
|
||||
os.fchmod(myfile, 0o644)
|
||||
|
||||
|
||||
def sanitize_version(version):
|
||||
|
||||
60
dist/common/scripts/scylla_io_setup
vendored
60
dist/common/scripts/scylla_io_setup
vendored
@@ -278,6 +278,66 @@ if __name__ == "__main__":
|
||||
disk_properties["read_bandwidth"] = 2527296683 * nr_disks
|
||||
disk_properties["write_iops"] = 156326 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1063657088 * nr_disks
|
||||
elif idata.instance() == "im4gn.large":
|
||||
disk_properties["read_iops"] = 33943
|
||||
disk_properties["read_bandwidth"] = 288433525
|
||||
disk_properties["write_iops"] = 27877
|
||||
disk_properties["write_bandwidth"] = 126864680
|
||||
elif idata.instance() == "im4gn.xlarge":
|
||||
disk_properties["read_iops"] = 68122
|
||||
disk_properties["read_bandwidth"] = 576603520
|
||||
disk_properties["write_iops"] = 55246
|
||||
disk_properties["write_bandwidth"] = 254534954
|
||||
elif idata.instance() == "im4gn.2xlarge":
|
||||
disk_properties["read_iops"] = 136422
|
||||
disk_properties["read_bandwidth"] = 1152663765
|
||||
disk_properties["write_iops"] = 92184
|
||||
disk_properties["write_bandwidth"] = 508926453
|
||||
elif idata.instance() == "im4gn.4xlarge":
|
||||
disk_properties["read_iops"] = 273050
|
||||
disk_properties["read_bandwidth"] = 1638427264
|
||||
disk_properties["write_iops"] = 92173
|
||||
disk_properties["write_bandwidth"] = 1027966826
|
||||
elif idata.instance() == "im4gn.8xlarge":
|
||||
disk_properties["read_iops"] = 250241 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 1163130709 * nr_disks
|
||||
disk_properties["write_iops"] = 86374 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 977617664 * nr_disks
|
||||
elif idata.instance() == "im4gn.16xlarge":
|
||||
disk_properties["read_iops"] = 273030 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 1638211413 * nr_disks
|
||||
disk_properties["write_iops"] = 92607 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1028340266 * nr_disks
|
||||
elif idata.instance() == "is4gen.medium":
|
||||
disk_properties["read_iops"] = 33965
|
||||
disk_properties["read_bandwidth"] = 288462506
|
||||
disk_properties["write_iops"] = 27876
|
||||
disk_properties["write_bandwidth"] = 126954200
|
||||
elif idata.instance() == "is4gen.large":
|
||||
disk_properties["read_iops"] = 68131
|
||||
disk_properties["read_bandwidth"] = 576654869
|
||||
disk_properties["write_iops"] = 55257
|
||||
disk_properties["write_bandwidth"] = 254551002
|
||||
elif idata.instance() == "is4gen.xlarge":
|
||||
disk_properties["read_iops"] = 136413
|
||||
disk_properties["read_bandwidth"] = 1152747904
|
||||
disk_properties["write_iops"] = 92180
|
||||
disk_properties["write_bandwidth"] = 508889546
|
||||
elif idata.instance() == "is4gen.2xlarge":
|
||||
disk_properties["read_iops"] = 273038
|
||||
disk_properties["read_bandwidth"] = 1628982613
|
||||
disk_properties["write_iops"] = 92182
|
||||
disk_properties["write_bandwidth"] = 1027983530
|
||||
elif idata.instance() == "is4gen.4xlarge":
|
||||
disk_properties["read_iops"] = 260493 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 1217396928 * nr_disks
|
||||
disk_properties["write_iops"] = 83169 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1000390784 * nr_disks
|
||||
elif idata.instance() == "is4gen.8xlarge":
|
||||
disk_properties["read_iops"] = 273021 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 1656354602 * nr_disks
|
||||
disk_properties["write_iops"] = 92233 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1028010325 * nr_disks
|
||||
properties_file = open(etcdir() + "/scylla.d/io_properties.yaml", "w")
|
||||
yaml.dump({ "disks": [ disk_properties ] }, properties_file, default_flow_style=False)
|
||||
ioconf = open(etcdir() + "/scylla.d/io.conf", "w")
|
||||
|
||||
6
dist/common/scripts/scylla_ntp_setup
vendored
6
dist/common/scripts/scylla_ntp_setup
vendored
@@ -66,18 +66,18 @@ if __name__ == '__main__':
|
||||
|
||||
target = None
|
||||
if os.path.exists('/lib/systemd/systemd-timesyncd'):
|
||||
if systemd_unit('systemd-timesyncd').is_active():
|
||||
if systemd_unit('systemd-timesyncd').is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
target = 'systemd-timesyncd'
|
||||
if shutil.which('chronyd'):
|
||||
if get_chrony_unit().is_active():
|
||||
if get_chrony_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
target = 'chrony'
|
||||
if shutil.which('ntpd'):
|
||||
if get_ntp_unit().is_active():
|
||||
if get_ntp_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
|
||||
21
dist/common/scripts/scylla_raid_setup
vendored
21
dist/common/scripts/scylla_raid_setup
vendored
@@ -30,6 +30,8 @@ import distro
|
||||
from pathlib import Path
|
||||
from scylla_util import *
|
||||
from subprocess import run
|
||||
import distro
|
||||
from pkg_resources import parse_version
|
||||
|
||||
if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
@@ -117,6 +119,25 @@ if __name__ == '__main__':
|
||||
pkg_install('xfsprogs')
|
||||
if not shutil.which('mdadm'):
|
||||
pkg_install('mdadm')
|
||||
# XXX: Workaround for mdmonitor.service issue on CentOS8
|
||||
if is_redhat_variant() and distro.version() == '8':
|
||||
mdadm_rpm = run('rpm -q mdadm', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
match = re.match(r'^mdadm-([0-9]+\.[0-9]+-[a-zA-Z0-9]+)\.', mdadm_rpm)
|
||||
mdadm_version = match.group(1)
|
||||
if parse_version('4.1-14') < parse_version(mdadm_version):
|
||||
repo_data = '''
|
||||
[BaseOS_8_3_2011]
|
||||
name=CentOS8.3.2011 - Base
|
||||
baseurl=http://vault.centos.org/8.3.2011/BaseOS/$basearch/os/
|
||||
gpgcheck=1
|
||||
enabled=0
|
||||
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-centosofficial
|
||||
'''[1:-1]
|
||||
with open('/etc/yum.repos.d/CentOS-Vault-8.3.repo', 'w') as f:
|
||||
f.write(repo_data)
|
||||
run('dnf downgrade --enablerepo=BaseOS_8_3_2011 -y mdadm', shell=True, check=True)
|
||||
run('dnf install -y python3-dnf-plugin-versionlock', shell=True, check=True)
|
||||
run('dnf versionlock add mdadm', shell=True, check=True)
|
||||
try:
|
||||
md_service = systemd_unit('mdmonitor.service')
|
||||
except SystemdException:
|
||||
|
||||
4
dist/common/scripts/scylla_setup
vendored
4
dist/common/scripts/scylla_setup
vendored
@@ -370,6 +370,10 @@ if __name__ == '__main__':
|
||||
version_check = interactive_ask_service('Do you want to enable Scylla to check if there is a newer version of Scylla available?', 'Yes - start the Scylla-housekeeping service to check for a newer version. This check runs periodically. No - skips this step.', version_check)
|
||||
args.no_version_check = not version_check
|
||||
if version_check:
|
||||
cfg = sysconfig_parser(sysconfdir_p() / 'scylla-housekeeping')
|
||||
repo_files = cfg.get('REPO_FILES')
|
||||
for f in glob.glob(repo_files):
|
||||
os.chmod(f, 0o644)
|
||||
with open('/etc/scylla.d/housekeeping.cfg', 'w') as f:
|
||||
f.write('[housekeeping]\ncheck-version: True\n')
|
||||
os.chmod('/etc/scylla.d/housekeeping.cfg', 0o644)
|
||||
|
||||
6
dist/common/scripts/scylla_util.py
vendored
6
dist/common/scripts/scylla_util.py
vendored
@@ -674,7 +674,7 @@ class aws_instance:
|
||||
return self._type.split(".")[0]
|
||||
|
||||
def is_supported_instance_class(self):
|
||||
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd']:
|
||||
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -683,7 +683,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd']:
|
||||
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
@@ -1041,7 +1041,7 @@ class systemd_unit:
|
||||
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
def is_active(self):
|
||||
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
|
||||
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
|
||||
def mask(self):
|
||||
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
6
dist/docker/debian/build_docker.sh
vendored
6
dist/docker/debian/build_docker.sh
vendored
@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
|
||||
version="$(<build/SCYLLA-VERSION-FILE)"
|
||||
release="$(<build/SCYLLA-RELEASE-FILE)"
|
||||
|
||||
if [[ "$version" = *rc* ]]; then
|
||||
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
|
||||
fi
|
||||
|
||||
mode="release"
|
||||
|
||||
if uname -m | grep x86_64 ; then
|
||||
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
|
||||
run locale-gen en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /etc/supervisor.conf.d
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
|
||||
@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
|
||||
_logger.info("exception while advertising new connection: {}", std::current_exception());
|
||||
}
|
||||
// Block while monitoring for lifetime/errors.
|
||||
return conn->process().finally([this, conn] {
|
||||
return unadvertise_connection(conn);
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
if (is_broken_pipe_or_connection_reset(ep)) {
|
||||
// expected if another side closes a connection or we're shutting down
|
||||
return;
|
||||
return conn->process().then_wrapped([this, conn] (auto f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
auto ep = std::current_exception();
|
||||
if (!is_broken_pipe_or_connection_reset(ep)) {
|
||||
// some exceptions are expected if another side closes a connection
|
||||
// or we're shutting down
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
}
|
||||
}
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
return unadvertise_connection(conn);
|
||||
});
|
||||
});
|
||||
return stop_iteration::no;
|
||||
|
||||
@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
|
||||
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
|
||||
}
|
||||
|
||||
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
|
||||
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
|
||||
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
|
||||
logger.warn("Failed to handle {}: {}", type, ep);
|
||||
});
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
}
|
||||
|
||||
void gossiper::init_messaging_service_handler() {
|
||||
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_syn_msg(from, std::move(syn_msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack2_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return handle_echo_msg(from, generation_number_opt);
|
||||
});
|
||||
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return gossiper.handle_shutdown_msg(from, generation_number_opt);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
|
||||
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
|
||||
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
|
||||
}
|
||||
|
||||
future<> gossiper::shutdown() {
|
||||
if (!_background_msg.is_closed()) {
|
||||
co_await _background_msg.close();
|
||||
}
|
||||
if (this_shard_id() == 0) {
|
||||
co_await do_stop_gossiping();
|
||||
}
|
||||
|
||||
@@ -41,7 +41,9 @@
|
||||
#include "unimplemented.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "utils/atomic_vector.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -138,12 +140,16 @@ private:
|
||||
bool _enabled = false;
|
||||
semaphore _callback_running{1};
|
||||
semaphore _apply_state_locally_semaphore{100};
|
||||
seastar::gate _background_msg;
|
||||
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
|
||||
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
|
||||
bool _advertise_myself = true;
|
||||
// Map ip address and generation number
|
||||
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
|
||||
future<> _failure_detector_loop_done{make_ready_future<>()} ;
|
||||
|
||||
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
|
||||
|
||||
public:
|
||||
// Get current generation number for the given nodes
|
||||
future<std::unordered_map<gms::inet_address, int32_t>>
|
||||
|
||||
@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
|
||||
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
|
||||
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
} else {
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
schema_ptr snp_schema = snp->schema();
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
|
||||
auto rd = std::move(rd_ref);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (!rd.is_end_of_stream()) {
|
||||
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
|
||||
co_await rd.fill_buffer();
|
||||
while (!rd.is_buffer_empty()) {
|
||||
co_await rd.pop_mutation_fragment().consume(wr);
|
||||
|
||||
@@ -411,11 +411,11 @@ public:
|
||||
} else {
|
||||
// Copy row from older version because rows in evictable versions must
|
||||
// hold values which are independently complete to be consistent on eviction.
|
||||
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
|
||||
e->set_continuous(latest_i && latest_i->continuous());
|
||||
_snp.tracker()->insert(*e);
|
||||
rows.insert_before(latest_i, *e);
|
||||
return {*e, true};
|
||||
auto e_i = rows.insert_before(latest_i, std::move(e));
|
||||
return ensure_result{*e_i, true};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,11 +447,11 @@ public:
|
||||
}
|
||||
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
|
||||
auto latest_i = get_iterator_in_latest_version();
|
||||
auto e = current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
|
||||
is_continuous(latest_i && latest_i->continuous()));
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
|
||||
is_continuous(latest_i && latest_i->continuous())));
|
||||
_snp.tracker()->insert(*e);
|
||||
rows.insert_before(latest_i, *e);
|
||||
return ensure_result{*e, true};
|
||||
auto e_i = rows.insert_before(latest_i, std::move(e));
|
||||
return ensure_result{*e_i, true};
|
||||
}
|
||||
|
||||
// Brings the entry pointed to by the cursor to the front of the LRU
|
||||
|
||||
@@ -3670,7 +3670,7 @@ shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3680,7 +3680,7 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
|
||||
void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3691,7 +3691,7 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
|
||||
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
|
||||
@@ -49,12 +49,13 @@ private:
|
||||
public:
|
||||
partition_index_cache* _parent;
|
||||
key_type _key;
|
||||
std::variant<shared_promise<>, partition_index_page> _page;
|
||||
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
|
||||
size_t _size_in_allocator = 0;
|
||||
public:
|
||||
entry(partition_index_cache* parent, key_type key)
|
||||
: _parent(parent)
|
||||
, _key(key)
|
||||
, _page(make_lw_shared<shared_promise<>>())
|
||||
{ }
|
||||
|
||||
void set_page(partition_index_page&& page) noexcept {
|
||||
@@ -76,7 +77,7 @@ private:
|
||||
// Always returns the same value for a given state of _page.
|
||||
size_t size_in_allocator() const { return _size_in_allocator; }
|
||||
|
||||
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
|
||||
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
|
||||
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
|
||||
partition_index_page& page() { return std::get<partition_index_page>(_page); }
|
||||
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
|
||||
@@ -207,9 +208,7 @@ public:
|
||||
return make_ready_future<entry_ptr>(std::move(ptr));
|
||||
} else {
|
||||
++_shard_stats.blocks;
|
||||
return _as(_region, [ptr] () mutable {
|
||||
return ptr.get_entry().promise().get_shared_future();
|
||||
}).then([ptr] () mutable {
|
||||
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
|
||||
return std::move(ptr);
|
||||
});
|
||||
}
|
||||
@@ -238,12 +237,12 @@ public:
|
||||
entry& e = ptr.get_entry();
|
||||
try {
|
||||
partition_index_page&& page = f.get0();
|
||||
e.promise().set_value();
|
||||
e.promise()->set_value();
|
||||
e.set_page(std::move(page));
|
||||
_shard_stats.used_bytes += e.size_in_allocator();
|
||||
++_shard_stats.populations;
|
||||
} catch (...) {
|
||||
e.promise().set_exception(std::current_exception());
|
||||
e.promise()->set_exception(std::current_exception());
|
||||
with_allocator(_region.allocator(), [&] {
|
||||
_cache.erase(key);
|
||||
});
|
||||
|
||||
43
test.py
43
test.py
@@ -291,6 +291,8 @@ class Test:
|
||||
def print_summary(self):
|
||||
pass
|
||||
|
||||
def get_junit_etree(self):
|
||||
return None
|
||||
|
||||
def check_log(self, trim):
|
||||
"""Check and trim logs and xml output for tests which have it"""
|
||||
@@ -338,9 +340,36 @@ class BoostTest(UnitTest):
|
||||
boost_args += ['--color_output=false']
|
||||
boost_args += ['--']
|
||||
self.args = boost_args + self.args
|
||||
self.casename = casename
|
||||
self.__junit_etree = None
|
||||
|
||||
def get_junit_etree(self):
|
||||
def adjust_suite_name(name):
|
||||
# Normalize "path/to/file.cc" to "path.to.file" to conform to
|
||||
# Jenkins expectations that the suite name is a class name. ".cc"
|
||||
# doesn't add any infomation. Add the mode, otherwise failures
|
||||
# in different modes are indistinguishable. The "test/" prefix adds
|
||||
# no information, so remove it.
|
||||
import re
|
||||
name = re.sub(r'^test/', '', name)
|
||||
name = re.sub(r'\.cc$', '', name)
|
||||
name = re.sub(r'/', '.', name)
|
||||
name = f'{name}.{self.mode}'
|
||||
return name
|
||||
if self.__junit_etree is None:
|
||||
self.__junit_etree = ET.parse(self.xmlout)
|
||||
root = self.__junit_etree.getroot()
|
||||
suites = root.findall('.//TestSuite')
|
||||
for suite in suites:
|
||||
suite.attrib['name'] = adjust_suite_name(suite.attrib['name'])
|
||||
skipped = suite.findall('./TestCase[@reason="disabled"]')
|
||||
for e in skipped:
|
||||
suite.remove(e)
|
||||
os.unlink(self.xmlout)
|
||||
return self.__junit_etree
|
||||
|
||||
def check_log(self, trim):
|
||||
ET.parse(self.xmlout)
|
||||
self.get_junit_etree()
|
||||
super().check_log(trim)
|
||||
|
||||
|
||||
@@ -800,6 +829,17 @@ def write_junit_report(tmpdir, mode):
|
||||
with open(junit_filename, "w") as f:
|
||||
ET.ElementTree(xml_results).write(f, encoding="unicode")
|
||||
|
||||
def write_consolidated_boost_junit_xml(tmpdir, mode):
|
||||
xml = ET.Element("TestLog")
|
||||
for suite in TestSuite.suites.values():
|
||||
for test in suite.tests:
|
||||
if test.mode != mode:
|
||||
continue
|
||||
test_xml = test.get_junit_etree()
|
||||
if test_xml is not None:
|
||||
xml.extend(test_xml.getroot().findall('.//TestSuite'))
|
||||
et = ET.ElementTree(xml)
|
||||
et.write(f'{tmpdir}/{mode}/xml/boost.xunit.xml', encoding='unicode')
|
||||
|
||||
def open_log(tmpdir):
|
||||
pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True)
|
||||
@@ -839,6 +879,7 @@ async def main():
|
||||
|
||||
for mode in options.modes:
|
||||
write_junit_report(options.tmpdir, mode)
|
||||
write_consolidated_boost_junit_xml(options.tmpdir, mode)
|
||||
|
||||
if 'coverage' in options.modes:
|
||||
coverage.generate_coverage_report("build/coverage", "tests")
|
||||
|
||||
@@ -16,6 +16,9 @@
|
||||
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Tests for basic table operations: CreateTable, DeleteTable, ListTables.
|
||||
# Also some basic tests for UpdateTable - although UpdateTable usually
|
||||
# enables more elaborate features (such as GSI or Streams) and those are
|
||||
# tested elsewhere.
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
@@ -311,3 +314,17 @@ def test_table_sse_off(dynamodb):
|
||||
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
|
||||
table.delete();
|
||||
|
||||
# Test that trying to delete a table that doesn't exist fails in the
|
||||
# appropriate way (ResourceNotFoundException)
|
||||
def test_delete_table_non_existent(dynamodb, test_table):
|
||||
client = dynamodb.meta.client
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException'):
|
||||
client.delete_table(TableName=random_string(20))
|
||||
|
||||
# Test that trying to update a table that doesn't exist fails in the
|
||||
# appropriate way (ResourceNotFoundException)
|
||||
def test_update_table_non_existent(dynamodb, test_table):
|
||||
client = dynamodb.meta.client
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException'):
|
||||
client.update_table(TableName=random_string(20), BillingMode='PAY_PER_REQUEST')
|
||||
|
||||
@@ -44,7 +44,9 @@
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog_extensions.hh"
|
||||
#include "db/commitlog/rp_set.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "log.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
@@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
|
||||
co_await log.clear();
|
||||
}
|
||||
}
|
||||
|
||||
static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
|
||||
commitlog::config cfg;
|
||||
|
||||
constexpr auto max_size_mb = 1;
|
||||
|
||||
cfg.commitlog_segment_size_in_mb = max_size_mb;
|
||||
cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
|
||||
cfg.commitlog_sync_period_in_ms = 10;
|
||||
cfg.reuse_segments = reuse;
|
||||
cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always
|
||||
cfg.use_o_dsync = true; // make sure we pre-allocate.
|
||||
|
||||
// not using cl_test, because we need to be able to abandon
|
||||
// the log.
|
||||
|
||||
tmpdir tmp;
|
||||
cfg.commit_log_location = tmp.path().string();
|
||||
|
||||
class myfail : public std::exception {
|
||||
public:
|
||||
using std::exception::exception;
|
||||
};
|
||||
|
||||
struct myext: public db::commitlog_file_extension {
|
||||
public:
|
||||
bool fail = false;
|
||||
bool thrown = false;
|
||||
bool do_file_delete;
|
||||
|
||||
myext(bool dd)
|
||||
: do_file_delete(dd)
|
||||
{}
|
||||
|
||||
seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
|
||||
if (fail && !thrown) {
|
||||
thrown = true;
|
||||
if (do_file_delete) {
|
||||
co_await f.close();
|
||||
co_await seastar::remove_file(filename);
|
||||
}
|
||||
throw myfail{};
|
||||
}
|
||||
co_return f;
|
||||
}
|
||||
seastar::future<> before_delete(const seastar::sstring&) override {
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
auto ep = std::make_unique<myext>(do_file_delete);
|
||||
auto& mx = *ep;
|
||||
|
||||
db::extensions myexts;
|
||||
myexts.add_commitlog_file_extension("hufflepuff", std::move(ep));
|
||||
|
||||
cfg.extensions = &myexts;
|
||||
|
||||
auto log = co_await commitlog::create_commitlog(cfg);
|
||||
|
||||
rp_set rps;
|
||||
// uncomment for verbosity
|
||||
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
|
||||
|
||||
auto uuid = utils::UUID_gen::get_time_UUID();
|
||||
auto size = log.max_record_size();
|
||||
|
||||
auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) {
|
||||
log.discard_completed_segments(id, rps);
|
||||
mx.fail = true;
|
||||
});
|
||||
|
||||
try {
|
||||
while (!mx.thrown) {
|
||||
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
|
||||
dst.fill('1', size);
|
||||
});
|
||||
rps.put(std::move(h));
|
||||
}
|
||||
} catch (...) {
|
||||
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
|
||||
}
|
||||
|
||||
co_await log.shutdown();
|
||||
co_await log.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test generating an exception in segment file allocation
|
||||
*/
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
|
||||
co_await do_test_exception_in_allocate_ex(false);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
|
||||
co_await do_test_exception_in_allocate_ex(false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test generating an exception in segment file allocation, but also
|
||||
* delete the file, which in turn should cause follow-up exceptions
|
||||
* in cleanup delete. Which CL should handle
|
||||
*/
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
|
||||
co_await do_test_exception_in_allocate_ex(true, false);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
|
||||
co_await do_test_exception_in_allocate_ex(true);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1395,6 +1395,8 @@ private:
|
||||
}
|
||||
|
||||
lsa_buffer alloc_buf(size_t buf_size) {
|
||||
// Note: Can be re-entered from allocation sites below due to memory reclamation which
|
||||
// invokes segment compaction.
|
||||
static_assert(segment::size % buf_align == 0);
|
||||
if (buf_size > segment::size) {
|
||||
throw_with_backtrace<std::runtime_error>(format("Buffer size {} too large", buf_size));
|
||||
@@ -1447,6 +1449,7 @@ private:
|
||||
|
||||
if (seg != _buf_active) {
|
||||
if (desc.is_empty()) {
|
||||
assert(desc._buf_pointers.empty());
|
||||
_segment_descs.erase(desc);
|
||||
desc._buf_pointers = std::vector<entangled>();
|
||||
free_segment(seg, desc);
|
||||
@@ -1457,7 +1460,7 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void compact_segment_locked(segment* seg, segment_descriptor& desc) {
|
||||
void compact_segment_locked(segment* seg, segment_descriptor& desc) noexcept {
|
||||
auto seg_occupancy = desc.occupancy();
|
||||
llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
|
||||
|
||||
@@ -1472,6 +1475,7 @@ private:
|
||||
for (entangled& e : _buf_ptrs_for_compact_segment) {
|
||||
if (e) {
|
||||
lsa_buffer* old_ptr = e.get(&lsa_buffer::_link);
|
||||
assert(&desc == old_ptr->_desc);
|
||||
lsa_buffer dst = alloc_buf(old_ptr->_size);
|
||||
memcpy(dst._buf, old_ptr->_buf, dst._size);
|
||||
old_ptr->_link = std::move(dst._link);
|
||||
@@ -1502,6 +1506,10 @@ private:
|
||||
std::vector<entangled> ptrs;
|
||||
ptrs.reserve(segment::size / buf_align);
|
||||
segment* new_active = new_segment();
|
||||
if (_buf_active) [[unlikely]] {
|
||||
// Memory allocation above could allocate active buffer during segment compaction.
|
||||
close_buf_active();
|
||||
}
|
||||
assert((uintptr_t)new_active->at(0) % buf_align == 0);
|
||||
segment_descriptor& desc = shard_segment_pool.descriptor(new_active);
|
||||
desc._buf_pointers = std::move(ptrs);
|
||||
|
||||
Reference in New Issue
Block a user