Merge 'pgo: enable tablets for SI and LWT' from Michael Litvak

PGO training for secondary indexes and LWT was configured with tablets
disabled because tablets did not support these features at the time. This
is no longer the case, so we remove the restrictions and run the training
in the default mode.

To make this work, we also need to make the training cluster RF-rack-valid,
because some workloads use RF=3 but the cluster has 3 nodes in a single rack.
We change the script to create a 3-rack cluster by writing a separate rackdc file
for each node.

no backport needed - small build improvement

Closes scylladb/scylladb#30002

* github.com:scylladb/scylladb:
  pgo: enable train with tablets for SI and LWT
  pgo: make training cluster RF-rack-valid
This commit is contained in:
Avi Kivity
2026-05-24 22:15:23 +03:00
4 changed files with 21 additions and 7 deletions

View File

@@ -1,2 +0,0 @@
dc=dc1
rack=rack1

View File

@@ -2,9 +2,8 @@
keyspace: ks
# The CQL for creating a keyspace (optional if it already exists)
# FIXME: use tablets after https://github.com/scylladb/scylladb/issues/18068 is done.
keyspace_definition: |
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND TABLETS = {'enabled': false};
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};
# Table name
table: targettable

View File

@@ -1,9 +1,8 @@
keyspace: sec_index
# FIXME: use tablets after https://github.com/scylladb/scylladb/issues/22677 is done.
keyspace_definition: |
CREATE KEYSPACE IF NOT EXISTS sec_index WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND TABLETS = {'enabled': false};
CREATE KEYSPACE IF NOT EXISTS sec_index WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};
table: users

View File

@@ -354,6 +354,22 @@ async def validate_addrs_unused(addresses: list[str]) -> None:
diagnostics = f"Command: {shlex.join(ss_command)}\nOutput (expected empty):\n{ss_output.decode()}"
raise AddressAlreadyInUseException(addresses, diagnostics)
def write_rackdc_properties(cluster_workdir: PathLike, addr: str, dc: str, rack: str) -> None:
    """Write the rack/DC properties file (read by the snitch) for one node.

    The file is created at ``<cluster_workdir>/<addr>/conf/cassandra-rackdc.properties``.
    The ``conf`` directory is created if it is missing, and an existing
    properties file is overwritten.

    Args:
        cluster_workdir: Root working directory of the cluster.
        addr: Node address; used as the name of the node's subdirectory.
        dc: Value written for the ``dc`` key.
        rack: Value written for the ``rack`` key.
    """
    node_conf = os.path.realpath(f"{cluster_workdir}/{addr}/conf")
    os.makedirs(node_conf, exist_ok=True)
    # One "key=value" entry per line, in the same order as before: dc, then rack.
    settings = (f"dc={dc}\n", f"rack={rack}\n")
    with open(f"{node_conf}/cassandra-rackdc.properties", "w") as out:
        out.writelines(settings)
def prepare_node_conf(cluster_workdir: PathLike, addr: str, dc: str, rack: str) -> None:
    """Set up ``<cluster_workdir>/<addr>/conf`` and record the node's rack/DC in it.

    If the node has no ``conf`` directory yet, it is created as a copy of
    ``../conf`` (resolved against the current working directory). An existing
    ``conf`` directory is left in place. In both cases the rack/DC properties
    file is then (re)written through ``write_rackdc_properties``.

    Args:
        cluster_workdir: Root working directory of the cluster.
        addr: Node address; used as the name of the node's subdirectory.
        dc: Datacenter name to assign to the node.
        rack: Rack name to assign to the node.
    """
    target_conf = os.path.realpath(f"{cluster_workdir}/{addr}") + "/conf"
    conf_already_present = os.path.exists(target_conf)
    if not conf_already_present:
        # Seed the node's configuration from the shared template directory.
        template_conf = os.path.realpath("../conf")
        shutil.copytree(template_conf, target_conf)
    write_rackdc_properties(cluster_workdir=cluster_workdir, addr=addr, dc=dc, rack=rack)
async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str, seed: str, cluster_name: str, extra_opts: list[str]) -> Process:
"""Starts a Scylla node.
Its --workdir will be $cluster_workdir/$addr/, its log file will be $cluster_workdir/$addr.log,
@@ -364,12 +380,13 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
# The directory change to it happens via the cwd=cluster_workdir in run()
llvm_profile_file = f"{addr}-%m.profraw"
scylla_workdir = f"{addr}"
scylla_home = os.path.realpath(f"{cluster_workdir}/{scylla_workdir}")
logfile = f"{addr}.log"
socket = maintenance_socket_path(cluster_workdir, addr)
command = [
"env",
f"LLVM_PROFILE_FILE={llvm_profile_file}",
f"SCYLLA_HOME={os.path.realpath(os.getcwd())}", # We assume that the script has Scylla's `conf/` as its filesystem neighbour.
f"SCYLLA_HOME={scylla_home}",
os.path.realpath(executable),
f"--workdir={scylla_workdir}",
f"--maintenance-socket={socket}",
@@ -433,6 +450,7 @@ async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optiona
seed = addrs[0]
try:
for i in range(0, len(addrs)):
prepare_node_conf(cluster_workdir=workdir, addr=addrs[i], dc="dc1", rack=f"rack{i + 1}")
proc = await start_node(executable, addr=addrs[i], seed=seed, cluster_workdir=workdir, cluster_name=cluster_name, extra_opts=extra_opts+cpuset_args[i])
procs.append(proc)
await wait_for_node(proc, addrs[i], timeout)