Files
scylladb/dist/common/scripts/scylla_util.py
Takuya ASADA eb30594a60 dist: detect corrupted NUMA topology information
There are some environments that have corrupted NUMA topology
information, such as some instance types on AWS EC2 with specific Linux
kernel images.
On such environments, we cannot get HW information correctly from hwloc,
so we cannot proceed with the optimization in perftune.
To avoid causing a script error, check the NUMA topology information and
skip running perftune if the information is corrupted.

Related scylladb/seastar#2925

Closes scylladb/scylladb#26344
2025-10-22 01:11:14 +03:00

522 lines
16 KiB
Python

# Copyright (C) 2017-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import configparser
import glob
import io
import os
import re
import shlex
import shutil
import subprocess
import yaml
import sys
import time
from pathlib import Path, PurePath
from subprocess import run, DEVNULL, PIPE, CalledProcessError
from datetime import datetime, timedelta
import distro
from scylla_sysconfdir import SYSCONFDIR
from scylla_product import PRODUCT
from multiprocessing import cpu_count
import traceback
import traceback_with_variables
import logging
def scylla_excepthook(etype, value, tb):
    """Report an uncaught exception and save a detailed debug log.

    The regular traceback is printed first.  The exception is then passed
    to traceback_with_variables, which writes through a logger attached to
    a per-script, per-process file under /var/tmp/scylla.  Finally the
    location of that file is printed.

    :param etype: exception class
    :param value: exception instance
    :param tb: traceback object
    """
    os.makedirs('/var/tmp/scylla', mode=0o755, exist_ok=True)
    traceback.print_exception(etype, value, tb)

    script_name = os.path.basename(sys.argv[0])
    debug_log = f'/var/tmp/scylla/{script_name}-{os.getpid()}-debug.log'

    debug_logger = logging.getLogger(__name__)
    debug_logger.setLevel(logging.DEBUG)
    debug_logger.addHandler(logging.FileHandler(debug_log))
    traceback_with_variables.print_exc(
        e=value,
        file_=traceback_with_variables.LoggerAsFile(debug_logger),
    )
    print(f'Debug log created: {debug_log}')


# Install the hook for every script that imports this module.
sys.excepthook = scylla_excepthook
def out(cmd, shell=True, timeout=None, encoding='utf-8', ignore_error=False, user=None, group=None):
    """Run *cmd* and return its captured stdout, stripped of surrounding whitespace.

    :param cmd: command passed to subprocess.run()
    :param shell: run the command through the shell
    :param timeout: timeout forwarded to subprocess.run()
    :param encoding: encoding used to decode stdout/stderr
    :param ignore_error: when True, a non-zero exit status is not reported
                         and does not raise
    :param user: forwarded to subprocess.run()
    :param group: forwarded to subprocess.run()
    :return: stripped stdout of the command
    :raises subprocess.CalledProcessError: on non-zero exit status unless
            ignore_error is set (stdout and stderr are printed first)
    """
    proc = subprocess.run(
        cmd,
        shell=shell,
        timeout=timeout,
        encoding=encoding,
        user=user,
        group=group,
        capture_output=True,
        check=False,
    )
    failed = proc.returncode != 0
    if failed and not ignore_error:
        print(f"Command '{cmd}' returned non-zero exit status: {proc.returncode}")
        # Dump both streams so the failure can be diagnosed from the console.
        for label, text in (('stdout', proc.stdout), ('stderr', proc.stderr)):
            print(f'---------- {label} ----------')
            print(text, end='')
            print('------------------------------')
        proc.check_returncode()
    return proc.stdout.strip()
def scriptsdir_p():
    """Return the directory of the running script as a Path.

    The script path is taken from sys.argv[0] and resolved.  When the
    script sits in a directory named 'libexec', the directory above
    'libexec' is returned instead.
    """
    script = Path(sys.argv[0]).resolve()
    script_dir = script.parent
    return script.parents[1] if script_dir.name == 'libexec' else script_dir
def scylladir_p():
    """Return the parent of the scripts directory as a Path."""
    return scriptsdir_p().parent
def is_nonroot():
    """Return True if the SCYLLA-NONROOT-FILE marker exists in the Scylla directory."""
    marker = scylladir_p() / 'SCYLLA-NONROOT-FILE'
    return marker.exists()
def is_offline():
    """Return True if the SCYLLA-OFFLINE-FILE marker exists in the Scylla directory."""
    marker = scylladir_p() / 'SCYLLA-OFFLINE-FILE'
    return marker.exists()
def is_container():
    """Return True if the SCYLLA-CONTAINER-FILE marker exists in the Scylla directory."""
    marker = scylladir_p() / 'SCYLLA-CONTAINER-FILE'
    return marker.exists()
def bindir_p():
    """Return the binary directory: <scylladir>/bin for non-root installs, else /usr/bin."""
    return scylladir_p() / 'bin' if is_nonroot() else Path('/usr/bin')
def etcdir_p():
    """Return the configuration directory: <scylladir>/etc for non-root installs, else /etc."""
    return scylladir_p() / 'etc' if is_nonroot() else Path('/etc')
def datadir_p():
    """Return the data directory: the Scylla directory for non-root installs, else /var/lib/scylla."""
    return scylladir_p() if is_nonroot() else Path('/var/lib/scylla')
def scyllabindir_p():
    """Return <scylladir>/bin as a Path, regardless of install mode."""
    base = scylladir_p()
    return base / 'bin'
def sysconfdir_p():
    """Return the SYSCONFDIR setting (from scylla_sysconfdir) as a Path."""
    sysconf = Path(SYSCONFDIR)
    return sysconf
def scriptsdir():
    """Return scriptsdir_p() as a string."""
    path = scriptsdir_p()
    return str(path)
def scylladir():
    """Return scylladir_p() as a string."""
    path = scylladir_p()
    return str(path)
def bindir():
    """Return bindir_p() as a string."""
    path = bindir_p()
    return str(path)
def etcdir():
    """Return etcdir_p() as a string."""
    path = etcdir_p()
    return str(path)
def datadir():
    """Return datadir_p() as a string."""
    path = datadir_p()
    return str(path)
def scyllabindir():
    """Return scyllabindir_p() as a string."""
    path = scyllabindir_p()
    return str(path)
def sysconfdir():
    """Return sysconfdir_p() as a string."""
    path = sysconfdir_p()
    return str(path)
def get_id_like():
    """Return distro.like() split on single spaces, or None when it is empty."""
    id_like = distro.like()
    return id_like.split(' ') if id_like else None
def is_debian_variant():
    """Return True if 'debian' appears in the ID_LIKE list, or in distro.id() when there is none."""
    # With an ID_LIKE list this is an element test; with the plain id string
    # it is a substring test.
    ids = get_id_like() or distro.id()
    return 'debian' in ids
def is_redhat_variant():
    """Return True if 'rhel', 'fedora' or 'oracle' appears in the ID_LIKE list, or in distro.id() when there is none."""
    ids = get_id_like() or distro.id()
    return any(name in ids for name in ('rhel', 'fedora', 'oracle'))
def is_gentoo():
    """Return True if distro.id() contains 'gentoo'."""
    distro_id = distro.id()
    return 'gentoo' in distro_id
def is_arch():
    """Return True if distro.id() contains 'arch'."""
    distro_id = distro.id()
    return 'arch' in distro_id
def is_amzn2():
    """Return True only on Amazon Linux 2.

    The major version is compared for equality.  A substring test such as
    ``'2' in distro.version()`` also matches Amazon Linux 2023 (version
    "2023"), which would make pkg_distro() report 'amzn2' there and apply
    the amzn2-specific entries of pkg_xlat to the wrong release.
    """
    return 'amzn' in distro.id() and distro.major_version() == '2'
def is_suse_variant():
    """Return True if 'suse' appears in the ID_LIKE list, or in distro.id() when there is none."""
    ids = get_id_like() or distro.id()
    return 'suse' in ids
def is_developer_mode():
    """Return True if scylla.d/dev-mode.conf enables developer mode.

    A line counts when it is not a comment and contains ``developer-mode``
    followed by ``1`` or ``true`` (optionally separated by whitespace or
    ``=``).

    :raises OSError: if <etcdir>/scylla.d/dev-mode.conf cannot be opened
    """
    # non-advancing comment matcher
    _nocomment = r"^\s*(?!#)"
    # non-capturing grouping
    _scyllaeq = r"(?:\s*|=)"
    pattern = re.compile(_nocomment + r".*developer-mode" + _scyllaeq + "(1|true)")
    # Use a context manager so the file handle is closed deterministically.
    with open(etcdir() + "/scylla.d/dev-mode.conf", "r") as f:
        return any(pattern.match(line) for line in f)
def get_text_from_path(fpath):
    """Return the stripped text content of *fpath*, or "" if the path does not exist."""
    target = Path(fpath)
    if not target.exists():
        return ""
    return target.read_text().strip()
def match_patterns_in_files(list_of_patterns_files):
    """Return True if any (pattern, path) pair matches.

    Each pattern is matched case-insensitively, anchored at the start,
    against the stripped content of its file (empty when the file is
    missing).  Evaluation stops at the first match.

    :param list_of_patterns_files: iterable of (regex, file path) pairs
    """
    return any(
        re.match(regex, get_text_from_path(path), flags=re.IGNORECASE)
        for regex, path in list_of_patterns_files
    )
def hex2list(hex_str):
    """Convert a hexadecimal CPU mask into a CPU list string.

    "0x" prefixes and commas are removed before parsing, so comma-grouped
    masks such as "00000001,00000003" are accepted.  Consecutive set bits
    are collapsed into "first-last" ranges.

    :param hex_str: hexadecimal mask, e.g. "0xf" or "00000001,00000003"
    :return: comma-separated list such as "0-3" or "0-1,32"; "" when no
             bit is set
    """
    mask = int(hex_str.replace("0x", "").replace(",", ""), 16)
    nbits = len(format(mask, "b"))
    pieces = []
    pos = 0
    while pos < nbits:
        if not (mask >> pos) & 1:
            pos += 1
            continue
        # Extend the run while the next higher bit is also set.
        last = pos
        while last + 1 < nbits and (mask >> (last + 1)) & 1:
            last += 1
        pieces.append(str(pos) if last == pos else f"{pos}-{last}")
        pos = last + 1
    return ",".join(pieces)
# Partition type identifiers (as printed by `lsblk -o PARTTYPE`) that mark a
# partition as reserved for the system; see is_system_partition().
SYSTEM_PARTITION_UUIDS = [
        '21686148-6449-6e6f-744e-656564454649', # BIOS boot partition
        'c12a7328-f81f-11d2-ba4b-00a0c93ec93b', # EFI system partition
        '024dee41-33e7-11d3-9d69-0008c781f39f'  # MBR partition scheme
]
def get_partition_uuid(dev):
    """Return the PARTTYPE column that lsblk reports for *dev*."""
    lsblk_cmd = f'lsblk -n -oPARTTYPE {dev}'
    return out(lsblk_cmd)
def is_system_partition(dev):
    """Return True if the partition type of *dev* is listed in SYSTEM_PARTITION_UUIDS."""
    return get_partition_uuid(dev) in SYSTEM_PARTITION_UUIDS
def is_unused_disk(dev):
    """Return True if *dev* is a block device that can be opened exclusively
    and is not a system partition.

    :param dev: device path; symlinks are resolved first
    :return: False when the device has no /sys/class/block entry, when the
             exclusive open raises OSError, or when it is a system partition
    """
    # resolve symlink to real path
    real_dev = os.path.realpath(dev)
    # dev is not in /sys/class/block/, like /dev/nvme[0-9]+
    sysfs_entry = '/sys/class/block/{dev}'.format(dev=real_dev.replace('/dev/', ''))
    if not os.path.isdir(sysfs_entry):
        return False
    try:
        # Open with O_EXCL and close immediately; an OSError here is treated
        # as "in use" (presumably EBUSY for a mounted/claimed device).
        os.close(os.open(real_dev, os.O_EXCL))
        # dev is not reserved for system
        return not is_system_partition(real_dev)
    except OSError:
        return False
# ANSI escape sequences available as {green}, {red} and {nocolor} in colorprint().
CONCOLORS = {'green': '\033[1;32m', 'red': '\033[1;31m', 'nocolor': '\033[0m'}


def colorprint(msg, **kwargs):
    """Print *msg* after str.format() substitution.

    The color names in CONCOLORS are always available as fields; keyword
    arguments add further fields and override a color of the same name.
    """
    fields = {**CONCOLORS, **kwargs}
    print(msg.format(**fields))
def parse_scylla_dirs_with_default(conf='/etc/scylla/scylla.yaml'):
    """Load scylla.yaml and fill in defaults for unset directory options.

    Defaults applied when a key is missing or empty:
      * ``workdir``               -> datadir()
      * ``data_file_directories`` -> [<workdir>/data]
      * ``<t>_directory``         -> <workdir>/<t> for t in commitlog,
        schema_commitlog, hints, view_hints, saved_caches

    :param conf: path of the YAML configuration file
    :return: the parsed configuration dict with the defaults filled in
    :raises OSError: if *conf* cannot be opened
    """
    # Close the file deterministically instead of leaking the handle.
    with open(conf) as f:
        # An empty YAML document parses to None; treat it as "nothing set"
        # so that the defaults below apply instead of raising TypeError.
        y = yaml.safe_load(f) or {}

    if not y.get('workdir'):
        y['workdir'] = datadir()

    data_dirs = y.get('data_file_directories')
    # Also treat a list of blank strings as unset.
    if not data_dirs or not " ".join(data_dirs).strip():
        y['data_file_directories'] = [os.path.join(y['workdir'], 'data')]

    for t in ("commitlog", "schema_commitlog", "hints", "view_hints", "saved_caches"):
        key = "%s_directory" % t
        if not y.get(key):
            y[key] = os.path.join(y['workdir'], t)
    return y
def get_scylla_dirs():
    """
    Returns a list of scylla directories configured in /etc/scylla/scylla.yaml.
    Verifies that mandatory parameters are set.

    The list holds the data file directories, the commitlog and schema
    commitlog directories, then the hints and view hints directories when
    they are set.  None entries are dropped.
    """
    conf = parse_scylla_dirs_with_default()
    mandatory = [
        *conf['data_file_directories'],
        conf['commitlog_directory'],
        conf['schema_commitlog_directory'],
    ]
    optional = [
        conf[key]
        for key in ('hints_directory', 'view_hints_directory')
        if key in conf and conf[key]
    ]
    return [d for d in mandatory + optional if d is not None]
def perftune_base_command():
    """Return the perftune.py command line that tunes disks for every Scylla directory."""
    dir_args = " ".join(f"--dir {d}" for d in get_scylla_dirs())
    return f'/opt/scylladb/scripts/perftune.py --tune disks {dir_args}'
def is_valid_nic(nic):
    """Return True if *nic* is non-empty and /sys/class/net/<nic> exists."""
    return len(nic) != 0 and os.path.exists(f'/sys/class/net/{nic}')
# Remove this when we do not support SET_NIC configuration value anymore
def get_set_nic_and_disks_config_value(cfg):
    """
    Get the SET_NIC_AND_DISKS configuration value.
    Return the SET_NIC configuration value if SET_NIC_AND_DISKS is not found (old releases case).
    :param cfg: sysconfig_parser object
    :return configuration value
    :except If the configuration value is not found
    """
    new_key, legacy_key = 'SET_NIC_AND_DISKS', 'SET_NIC'

    # Sanity check
    both_present = cfg.has_option(new_key) and cfg.has_option(legacy_key)
    if both_present:
        raise Exception("Only one of 'SET_NIC_AND_DISKS' and 'SET_NIC' is allowed to be present")

    try:
        value = cfg.get(new_key)
    except Exception:
        # For backwards compatibility
        value = cfg.get(legacy_key)
    return value
def swap_exists():
    """Return True if `swapon --noheadings --raw` prints anything."""
    return out('swapon --noheadings --raw') != ''
def check_sysfs_numa_topology_is_valid():
    """Return True if sysfs exposes CPU topology files for cpu0.

    Verify that the sysfs entry exists correctly, same as the checks
    performed by hwloc code (check_sysfs_cpu_path() on topology-linux.c):
    /sys/devices/system/cpu must be a directory and at least one of the
    cpu0 topology files listed below must exist.
    """
    if not os.path.isdir("/sys/devices/system/cpu"):
        return False
    topology_files = (
        "/sys/devices/system/cpu/cpu0/topology/package_cpus",
        "/sys/devices/system/cpu/cpu0/topology/core_cpus",
        "/sys/devices/system/cpu/cpu0/topology/core_siblings",
        "/sys/devices/system/cpu/cpu0/topology/thread_siblings",
    )
    return any(os.path.exists(path) for path in topology_files)
def pkg_error_exit(pkg, offline_exit=True):
    """Print that *pkg* is required and, unless *offline_exit* is false, exit with status 1."""
    print(f'Package "{pkg}" required.')
    if not offline_exit:
        return
    sys.exit(1)
def yum_install(pkg, offline_exit=True):
    """Install *pkg* with yum.

    In offline mode pkg_error_exit() is called first.  When it returns
    (offline_exit is false) the install command is still attempted.

    :return: the CompletedProcess of the yum command
    :raises subprocess.CalledProcessError: if yum exits non-zero
    """
    offline = is_offline()
    if offline:
        pkg_error_exit(pkg, offline_exit)
    install_cmd = f'yum install -y {pkg}'
    return run(install_cmd, shell=True, check=True)
def apt_is_updated():
    """Return True if the apt package index was refreshed within the last day.

    The modification time of the periodic update stamp is used when that
    file exists, otherwise that of /var/lib/apt/lists.  Returns False when
    neither path exists.
    """
    candidates = (
        '/var/lib/apt/periodic/update-success-stamp',
        '/var/lib/apt/lists',
    )
    marker = next((path for path in candidates if os.path.exists(path)), None)
    if marker is None:
        return False
    age = datetime.now() - datetime.fromtimestamp(os.stat(marker).st_mtime)
    return age <= timedelta(days=1)
# Maximum number of `apt-get update` attempts and the pause (in seconds)
# between attempts while another process holds the apt lock.
APT_GET_UPDATE_NUM_RETRY = 30
APT_GET_UPDATE_RETRY_INTERVAL = 10


def apt_install(pkg, offline_exit=True):
    """Install *pkg* with apt-get, refreshing the package index first if stale.

    In offline mode pkg_error_exit() is called first.  When it returns
    (offline_exit is false) the install is still attempted.

    :param pkg: package name(s), interpolated into the shell command
    :param offline_exit: forwarded to pkg_error_exit() in offline mode
    :return: the CompletedProcess of the `apt-get install` command
    :raises subprocess.CalledProcessError: if `apt-get update` fails for a
            reason other than a busy lock, if the lock is still busy on the
            last attempt, or if `apt-get install` fails
    """
    if is_offline():
        pkg_error_exit(pkg, offline_exit)

    # The lock for update and install/remove are different, and
    # DPkg::Lock::Timeout will only wait for install/remove lock.
    # So we need to manually retry apt-get update.
    for attempt in range(APT_GET_UPDATE_NUM_RETRY):
        if apt_is_updated():
            break
        try:
            run('apt-get update', shell=True, check=True, stderr=PIPE, encoding='utf-8')
            break
        except CalledProcessError as e:
            print(e.stderr, end='')
            # re.search is required: re.match only matches at the very start
            # of the string even with re.MULTILINE, so a lock error preceded
            # by any other stderr line would not be recognised.
            lock_busy = re.search(r'^E: Could not get lock ', e.stderr, re.MULTILINE)
            # Re-raise on any other error, or when this was the last attempt
            # (no point in sleeping before giving up).
            if not lock_busy or attempt == APT_GET_UPDATE_NUM_RETRY - 1:
                raise
            print(f'Sleep {APT_GET_UPDATE_RETRY_INTERVAL} seconds to wait for apt lock...')
            time.sleep(APT_GET_UPDATE_RETRY_INTERVAL)

    apt_env = os.environ.copy()
    apt_env['DEBIAN_FRONTEND'] = 'noninteractive'
    return run(f'apt-get -o DPkg::Lock::Timeout=300 install -y {pkg}', shell=True, check=True, env=apt_env)
def emerge_install(pkg, offline_exit=True):
    """Install *pkg* with emerge.

    In offline mode pkg_error_exit() is called first.  When it returns
    (offline_exit is false) the install command is still attempted.

    :return: the CompletedProcess of the emerge command
    :raises subprocess.CalledProcessError: if emerge exits non-zero
    """
    offline = is_offline()
    if offline:
        pkg_error_exit(pkg, offline_exit)
    install_cmd = f'emerge -uq {pkg}'
    return run(install_cmd, shell=True, check=True)
def zypper_install(pkg, offline_exit=True):
    """Install *pkg* with zypper.

    In offline mode pkg_error_exit() is called first.  When it returns
    (offline_exit is false) the install command is still attempted.

    :return: the CompletedProcess of the zypper command
    :raises subprocess.CalledProcessError: if zypper exits non-zero
    """
    offline = is_offline()
    if offline:
        pkg_error_exit(pkg, offline_exit)
    install_cmd = f'zypper install -y {pkg}'
    return run(install_cmd, shell=True, check=True)
def pkg_distro():
    """Return the distro key used to look up package name translations.

    'debian' and 'suse' cover their whole families, 'amzn2' is reported
    when is_amzn2() is true, and anything else falls back to distro.id().
    """
    if is_debian_variant():
        return 'debian'
    if is_suse_variant():
        return 'suse'
    if is_amzn2():
        return 'amzn2'
    return distro.id()
# Package name translations: generic package name -> {pkg_distro() key:
# distro-specific package name}.  Consulted by pkg_install().
pkg_xlat = {'cpupowerutils': {'debian': 'linux-cpupower', 'gentoo':'sys-power/cpupower', 'arch':'cpupower', 'suse': 'cpupower'},
            'policycoreutils-python-utils': {'amzn2': 'policycoreutils-python'}}
def pkg_install(pkg, offline_exit=True):
    """Install *pkg* with the package manager of the running distribution.

    The name is first translated through pkg_xlat for the current
    pkg_distro().  On a distribution with no supported package manager
    pkg_error_exit() is called with its default, which exits.

    :param pkg: generic package name
    :param offline_exit: forwarded to the distro-specific installer
    :return: the installer's result
    """
    per_distro = pkg_xlat.get(pkg)
    if per_distro is not None:
        distro_key = pkg_distro()
        if distro_key in per_distro:
            pkg = per_distro[distro_key]

    installers = (
        (is_redhat_variant, yum_install),
        (is_debian_variant, apt_install),
        (is_gentoo, emerge_install),
        (is_suse_variant, zypper_install),
    )
    for matches, install in installers:
        if matches():
            return install(pkg, offline_exit)
    pkg_error_exit(pkg)
def yum_uninstall(pkg):
    """Remove *pkg* with yum; raises CalledProcessError if yum exits non-zero."""
    remove_cmd = f'yum remove -y {pkg}'
    return run(remove_cmd, shell=True, check=True)
def apt_uninstall(pkg):
    """Remove *pkg* with apt-get in non-interactive mode.

    :raises subprocess.CalledProcessError: if apt-get exits non-zero
    """
    # Copy of the current environment with the frontend forced to noninteractive.
    env = dict(os.environ, DEBIAN_FRONTEND='noninteractive')
    remove_cmd = f'apt-get -o DPkg::Lock::Timeout=300 remove -y {pkg}'
    return run(remove_cmd, shell=True, check=True, env=env)
def emerge_uninstall(pkg):
    """Deselect *pkg* with emerge; raises CalledProcessError if emerge exits non-zero."""
    remove_cmd = f'emerge --deselect {pkg}'
    return run(remove_cmd, shell=True, check=True)
def pkg_uninstall(pkg):
    """Remove *pkg* with the package manager of the running distribution.

    On a distribution without a supported remover, only a warning asking
    for manual removal is printed.
    """
    removers = (
        (is_redhat_variant, yum_uninstall),
        (is_debian_variant, apt_uninstall),
        (is_gentoo, emerge_uninstall),
    )
    for matches, remove in removers:
        if matches():
            return remove(pkg)
    print(f'WARNING: Package "{pkg}" should be removed.')
class SystemdException(Exception):
    """Raised by systemd_unit when a unit is not found or is invalid."""
class systemd_unit:
    """Thin wrapper around `systemctl` for a single unit.

    In non-root mode (see is_nonroot()) instance operations pass `--user`
    to systemctl.
    """

    def __init__(self, unit):
        """Bind to *unit*, verifying with `systemctl cat` that it exists.

        :raises SystemdException: if `systemctl cat` fails for the unit
        """
        self.ctlparam = '--user' if is_nonroot() else ''
        probe_cmd = 'systemctl {} cat {}'.format(self.ctlparam, unit)
        try:
            run(probe_cmd, shell=True, check=True, stdout=DEVNULL, stderr=DEVNULL)
        except subprocess.CalledProcessError:
            raise SystemdException('unit {} is not found or invalid'.format(unit))
        self._unit = unit

    def __str__(self):
        return self._unit

    def _systemctl(self, verb):
        """Run `systemctl <ctlparam> <verb> <unit>`, raising CalledProcessError on failure."""
        return run(f'systemctl {self.ctlparam} {verb} {self._unit}', shell=True, check=True)

    def start(self):
        return self._systemctl('start')

    def stop(self):
        return self._systemctl('stop')

    def restart(self):
        return self._systemctl('restart')

    def enable(self):
        return self._systemctl('enable')

    def disable(self):
        return self._systemctl('disable')

    def is_active(self):
        """Return the text printed by `systemctl is-active`; a non-zero exit is ignored."""
        return out(f'systemctl {self.ctlparam} is-active {self._unit}', ignore_error=True)

    def mask(self):
        return self._systemctl('mask')

    def unmask(self):
        return self._systemctl('unmask')

    # NOTE(review): unlike the instance methods, the two classmethods below
    # never pass --user, even in non-root mode -- confirm this is intended.
    @classmethod
    def reload(cls):
        """Run `systemctl daemon-reload`."""
        run('systemctl daemon-reload', shell=True, check=True)

    @classmethod
    def available(cls, unit):
        """Return True if `systemctl cat <unit>` exits with status 0."""
        probe = run(f'systemctl cat {unit}', shell=True, check=False, stdout=DEVNULL, stderr=DEVNULL)
        return probe.returncode == 0
class sysconfig_parser:
    """Read and update a shell-style ``KEY=VALUE`` sysconfig file.

    The file text is kept verbatim in memory and edited line by line, so
    comments and ordering survive.  configparser is used only to look
    values up.  Changes reach the disk when commit() is called.
    """

    def __load(self):
        """Re-parse the in-memory text into the lookup parser."""
        # configparser needs a section header, so wrap the text in [global].
        f = io.StringIO('[global]\n{}'.format(self._data))
        # interpolation=None: sysconfig values are plain shell text, so a
        # literal '%' must not be treated as configparser interpolation
        # (the default would raise on get() for values such as "100%").
        self._cfg = configparser.ConfigParser(interpolation=None)
        # Keep option names case-sensitive.
        self._cfg.optionxform = str
        self._cfg.read_file(f)

    def __escape(self, val):
        """Backslash-escape double quotes in *val*."""
        return re.sub(r'"', r'\"', val)

    def __unescape(self, val):
        """Undo __escape(): turn ``\\"`` back into ``"``."""
        return re.sub(r'\\"', r'"', val)

    def __format_line(self, key, val):
        """Return the ``key=value`` line; the value is quoted when it contains whitespace."""
        need_quotes = any(ch.isspace() for ch in val)
        esc_val = self.__escape(val)
        return f'{key}="{esc_val}"' if need_quotes else f'{key}={esc_val}'

    def __add(self, key, val):
        """Append a new ``key=value`` line and re-parse."""
        self._data += self.__format_line(key, val) + '\n'
        self.__load()

    def __init__(self, filename):
        """Load *filename* (str or Path), creating an empty file if it does not exist."""
        if isinstance(filename, PurePath):
            self._filename = str(filename)
        else:
            self._filename = filename
        if not os.path.exists(filename):
            open(filename, 'a').close()
        with open(filename) as f:
            self._data = f.read()
        self.__load()

    def get(self, key):
        """Return the value of *key* without surrounding quotes and with escapes undone.

        :raises configparser.NoOptionError: if *key* is not present
        """
        val = self._cfg.get('global', key)
        if len(val) >= 2 and val[0] == '"' and val[-1] == '"':
            # Remove exactly one pair of enclosing quotes.  strip('"') would
            # also eat the quote of a trailing escaped \" and corrupt values
            # that end with a double quote.
            val = val[1:-1]
        else:
            val = val.strip('"')
        return self.__unescape(val)

    def has_option(self, key):
        """Return True if *key* is present in the file."""
        return self._cfg.has_option('global', key)

    def set(self, key, val):
        """Set *key* to *val*, replacing its existing line or appending a new one."""
        if not self.has_option(key):
            return self.__add(key, val)
        new_line = self.__format_line(key, val)
        pattern = re.compile(rf'^{re.escape(key)}=[^\n]*$', re.MULTILINE)
        # A callable replacement inserts new_line literally.  Passed as a
        # string, backslashes in the value would be interpreted as regex
        # replacement escapes (or raise "bad escape").
        self._data = pattern.sub(lambda _match: new_line, self._data)
        self.__load()

    def commit(self):
        """Write the in-memory text back to the file."""
        with open(self._filename, 'w') as f:
            f.write(self._data)