`GetInt()` was observed to fail when the integer JSON value overflows the `int32_t` type, which `GetInt()` uses for storage. When this happens, rapidjson assigns a distinct 64-bit integer type to the value, and attempting to access it as a 32-bit integer triggers the wrong-type error, resulting in an assertion failure. This was hit in the field: invoking `nodetool netstats` crashed nodetool when the streamed byte amounts were higher than maxint. To avoid such bugs in the future, replace all usages of `GetInt()` in nodetool with `GetInt64()`, just to be sure. A reproducer for the nodetool netstats crash is added. Fixes: scylladb/scylladb#23394 Closes scylladb/scylladb#23395
433 lines
14 KiB
Python
433 lines
14 KiB
Python
#
|
|
# Copyright 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from test.nodetool.rest_api_mock import expected_request
|
|
from enum import Enum
|
|
|
|
import pytest
|
|
|
|
|
|
class operating_mode(Enum):
    """Operation modes returned by the mocked ``/storage_service/operation_mode`` endpoint.

    NONE and STARTING deliberately share the value "STARTING". Because NONE is
    defined first, Enum makes STARTING an alias of NONE: both names resolve to
    the very same member (whose ``name`` is "NONE").
    """
    NONE = "STARTING"
    STARTING = "STARTING"
    NORMAL = "NORMAL"
    JOINING = "JOINING"
    BOOTSTRAP = "BOOTSTRAP"
    LEAVING = "LEAVING"
    DECOMMISSIONED = "DECOMMISSIONED"
    MOVING = "MOVING"
    DRAINING = "DRAINING"
    DRAINED = "DRAINED"
    MAINTENANCE = "MAINTENANCE"
|
|
|
|
|
|
class stream_state(Enum):
    """Stream session states, used as the ``state`` field of mocked ``/stream_manager/`` sessions."""
    INITIALIZED = "INITIALIZED"
    PREPARING = "PREPARING"
    STREAMING = "STREAMING"
    WAIT_COMPLETE = "WAIT_COMPLETE"
    COMPLETE = "COMPLETE"
    FAILED = "FAILED"
|
|
|
|
|
|
class direction(Enum):
    """Transfer direction, used as the ``direction`` field of mocked stream file entries."""
    IN = "IN"
    OUT = "OUT"
|
|
|
|
|
|
class test_flag(Enum):
    """Variants of ``test_netstats``, selected through pytest parametrization.

    Each flag either alters the mocked REST data (mode, streams) or the
    nodetool command line (human-readable switches).
    """
    normal = 0
    starting_mode = 1
    private_ip = 2
    one_stream = 3
    no_streams = 4
    sender_only = 5
    receiver_only = 6
    human_readable_short = 7
    human_readable_long = 8
|
|
|
|
|
|
class SessionSummary:
    """Expected totals for one direction (receiving or sending) of a stream session.

    ``summaries`` is the list of per-table summary dicts (read keys: ``files``,
    ``total_size``). ``files`` is the list of per-file progress dicts (read keys:
    ``current_bytes``, ``total_bytes``), already unwrapped by the caller from
    their ``{"key": ..., "value": ...}`` envelope.

    Attributes:
        total_count: number of files announced by the summaries.
        total_size: bytes announced by the summaries.
        done_count: number of files whose transfer is complete.
        done_size: bytes transferred so far, across all files.
    """

    def __init__(self, summaries, files):
        # Totals announced up front by the per-table summaries.
        self.total_count = sum(cf["files"] for cf in summaries)
        self.total_size = sum(cf["total_size"] for cf in summaries)
        # A file only counts as done once all of its bytes have been transferred...
        self.done_count = sum(1 for f in files if f["current_bytes"] == f["total_bytes"])
        # ...but the transferred size includes the partial progress of every file.
        self.done_size = sum(f["current_bytes"] for f in files)
|
|
|
|
|
|
def _is_starting(mode):
    """Return True if ``mode`` is NONE or STARTING, i.e. the node reports "STARTING".

    Since NONE and STARTING share a value they are the same Enum member; both
    names are listed anyway to keep the intent explicit.
    """
    return mode in (operating_mode.NONE, operating_mode.STARTING)
|
|
|
|
|
|
def _format_bytes(v, human_readable):
|
|
if v < 1024 or not human_readable:
|
|
return f"{v} bytes"
|
|
|
|
# Not general, but good enough for this test
|
|
for power, name in [(3, "GiB"), (2, "MiB"), (1, "KiB")]:
|
|
dividend = 1024 ** power
|
|
if v >= dividend:
|
|
return "{:.2f} {}".format(v / dividend, name)
|
|
|
|
|
|
def _check_output(
        res,
        human_readable,
        mode,
        streams,
        read_repair_attempted,
        read_repair_repaired_blocking,
        read_repair_repaired_background,
        messages_pending,
        messages_sent,
        messages_dropped,
        messages_respond_pending,
        messages_respond_completed):
    """Assert that ``res`` (nodetool netstats stdout) matches the mocked REST data.

    The output is walked line by line with the cursor ``i``, expecting:

    * ``Mode: <mode>``;
    * for each stream plan (in any order): ``<description> <plan_id>``, then per
      session a peer line, and for each direction that has summaries a summary
      line followed by one progress line per file; or, with no streams at all,
      ``Not sending any streams.``;
    * unless the node is starting: read repair statistics and the messaging
      pool table;
    * a final empty element, produced by the trailing newline.

    ``messages_dropped`` is accepted for symmetry with the other counters but is
    not checked: the expected "Dropped" column is always 0.
    """
    lines = res.split("\n")

    i = 0
    assert lines[i] == f"Mode: {mode.value}"

    if streams:
        streams_by_id = {s["plan_id"]: s for s in streams}
        # Streams are printed out-of-order: identify each one by its plan id.
        while streams_by_id:
            i += 1
            # The plan id is the last word of the header; the rest is the description.
            description, plan_id = lines[i].rsplit(maxsplit=1)
            assert plan_id in streams_by_id
            stream = streams_by_id[plan_id]
            assert description == stream["description"]

            for session in stream["sessions"]:
                i += 1
                peer = session["peer"]
                connecting = session["connecting"]
                if peer == connecting:
                    assert lines[i] == f"    /{peer}"
                else:
                    assert lines[i] == f"    /{peer} (using /{connecting})"

                for summaries, files, action_continuous, action_perfect, target in (
                        (session.get("receiving_summaries", []), session.get("receiving_files", []),
                         "Receiving", "received", "from"),
                        (session.get("sending_summaries", []), session.get("sending_files", []),
                         "Sending", "sent", "to")):
                    # A direction without summaries prints nothing at all.
                    if not summaries:
                        continue
                    i += 1
                    # File entries are {"key": ..., "value": {...}} pairs in the REST response.
                    files = [f["value"] for f in files]
                    summary = SessionSummary(summaries, files)
                    assert lines[i] == "        {} {} files, {} total. Already {} {} files, {} total".format(
                        action_continuous,
                        summary.total_count,
                        _format_bytes(summary.total_size, human_readable),
                        action_perfect,
                        summary.done_count,
                        _format_bytes(summary.done_size, human_readable))
                    for file in files:
                        i += 1
                        # Exact integer arithmetic: the float form int(a / b * 100)
                        # can truncate an exact ratio (29 / 100 * 100 == 28.999...).
                        percent = file["current_bytes"] * 100 // file["total_bytes"]
                        assert lines[i] == "            {} {}/{} bytes({}%) {} {} idx:{}/{}".format(
                            file["file_name"],
                            file["current_bytes"],
                            file["total_bytes"],
                            percent,
                            action_perfect,
                            target,
                            file["session_index"],
                            file["peer"])

            del streams_by_id[plan_id]
    else:
        i += 1
        assert lines[i] == "Not sending any streams."

    i += 1

    if _is_starting(mode):
        # A starting node prints no statistics; only the trailing newline remains.
        assert lines[i] == ""
        assert i == len(lines) - 1
        return

    assert lines[i] == "Read Repair Statistics:"
    i += 1
    assert lines[i] == f"Attempted: {read_repair_attempted}"
    i += 1
    assert lines[i] == f"Mismatch (Blocking): {read_repair_repaired_blocking}"
    i += 1
    assert lines[i] == f"Mismatch (Background): {read_repair_repaired_background}"

    line_fmt = "{:<25}{:>10}{:>10}{:>15}{:>10}"

    i += 1
    assert lines[i] == line_fmt.format("Pool Name", "Active", "Pending", "Completed", "Dropped")

    def sum_nodes(nodes):
        # Per-node counters arrive as [{"key": <node>, "value": <count>}, ...].
        return sum(n["value"] for n in nodes)

    i += 1
    assert lines[i] == line_fmt.format("Large messages", "n/a", sum_nodes(messages_pending),
                                       sum_nodes(messages_sent), 0)
    i += 1
    assert lines[i] == line_fmt.format("Small messages", "n/a", sum_nodes(messages_respond_pending),
                                       sum_nodes(messages_respond_completed), 0)
    i += 1
    assert lines[i] == line_fmt.format("Gossip messages", "n/a", 0, 0, 0)

    i += 1
    assert lines[i] == ""
    assert i == len(lines) - 1
|
|
|
|
|
|
@pytest.mark.parametrize("flag", (
        test_flag.normal,
        test_flag.starting_mode,
        test_flag.private_ip,
        test_flag.one_stream,
        test_flag.no_streams,
        test_flag.sender_only,
        test_flag.receiver_only,
        test_flag.human_readable_short,
        test_flag.human_readable_long))
def test_netstats(nodetool, flag):
    """Run ``nodetool netstats`` against mocked REST responses and verify its output.

    Variants selected by ``flag``:

    * ``normal``: two stream plans, as defined below;
    * ``starting_mode``: the node reports STARTING and there are no streams, so
      the statistics endpoints are not expected to be queried;
    * ``private_ip``: the first session's connecting address becomes 52.65.9.1;
    * ``one_stream``: only the first stream plan is returned;
    * ``no_streams``: the stream list is empty;
    * ``sender_only`` / ``receiver_only``: the receiving / sending data is
      removed from the first plan's sessions;
    * ``human_readable_short`` / ``human_readable_long``: pass ``-H`` /
      ``--human-readable``.
    """
    mode = operating_mode.STARTING if flag == test_flag.starting_mode else operating_mode.NORMAL

    streams = [
        {
            "plan_id": "1e77eb26-a372-4eb4-aeaa-72f224cf6b4c",
            "description": "description1",
            "sessions": [
                {
                    "peer": "127.0.0.1",
                    "session_index": 1,
                    "connecting": "127.1.0.1",
                    "receiving_summaries": [
                        {
                            "cf_id": "eee7eb26-a372-4eb4-aeaa-72f224cf6b4c",
                            "files": 3,
                            "total_size": 13832,
                        },
                        {
                            "cf_id": "fff7eb26-a372-4eb4-aeaa-72f224cf6b4c",
                            "files": 2,
                            "total_size": 3832,
                        },
                    ],
                    "sending_summaries": [
                        {
                            "cf_id": "eee7eb26-a372-4eb4-aeaa-72f224cf6b4c",
                            "files": 1,
                            "total_size": 3832,
                        },
                        {
                            "cf_id": "fff7eb26-a372-4eb4-aeaa-72f224cf6b4c",
                            "files": 1,
                            "total_size": 3832,
                        },
                    ],
                    "state": stream_state.STREAMING.value,
                    "receiving_files": [
                        {
                            "key": "file1",
                            "value": {
                                "peer": "127.0.0.1",
                                "session_index": 1,
                                "file_name": "me-3ge2_0ly2_3e65c2erzwxcn7tws3-big-Data.db",
                                "direction": direction.IN.value,
                                "current_bytes": 134,
                                # Deliberately above INT32_MAX: reproducer for nodetool
                                # crashing on 64-bit values (scylladb/scylladb#23394).
                                "total_bytes": 96387798789,
                            },
                        },
                        {
                            "key": "file2",
                            "value": {
                                "peer": "127.0.0.1",
                                "session_index": 1,
                                "file_name": "me-3ge2_0ly2_3e65c2erzwxcn7tws3-big-Index.db",
                                "direction": direction.IN.value,
                                "current_bytes": 963,
                                "total_bytes": 963,
                            },
                        },
                    ],
                    "sending_files": [
                        {
                            "key": "file3",
                            "value": {
                                "peer": "127.0.0.1",
                                "session_index": 1,
                                "file_name": "me-3ge2_0ly2_3we0g2vlf404p2fbjn-big-Index.db",
                                "direction": direction.OUT.value,
                                "current_bytes": 34,
                                "total_bytes": 603,
                            },
                        },
                        {
                            "key": "file4",
                            "value": {
                                "peer": "127.0.0.1",
                                "session_index": 1,
                                "file_name": "me-3ge2_0ly2_3we0g2vlf404p2fbjn-big-Filter.db",
                                "direction": direction.OUT.value,
                                "current_bytes": 63,
                                "total_bytes": 63,
                            },
                        },
                    ],
                },
            ],
        },
        {
            "plan_id": "2e87eb26-a372-4af4-aeaa-72f224cf6b4c",
            "description": "description2",
            "sessions": [
                {
                    "peer": "127.0.0.3",
                    "session_index": 2,
                    "connecting": "127.1.0.3",
                    "receiving_summaries": [],
                    "sending_summaries": [],
                    "state": stream_state.STREAMING.value,
                    "receiving_files": [],
                    "sending_files": [],
                },
                {
                    "peer": "127.0.0.3",
                    "session_index": 3,
                    "connecting": "127.1.0.3",
                    "receiving_summaries": [],
                    "sending_summaries": [],
                    "state": stream_state.STREAMING.value,
                    "receiving_files": [],
                    "sending_files": [],
                },
            ],
        }
    ]

    if flag == test_flag.private_ip:
        streams[0]["sessions"][0]["connecting"] = "52.65.9.1"
    elif flag == test_flag.one_stream:
        del streams[1]
    elif flag == test_flag.no_streams or flag == test_flag.starting_mode:
        streams.clear()
    elif flag == test_flag.sender_only:
        for session in streams[0]["sessions"]:
            del session["receiving_summaries"]
            del session["receiving_files"]
    elif flag == test_flag.receiver_only:
        for session in streams[0]["sessions"]:
            del session["sending_summaries"]
            del session["sending_files"]

    read_repair_attempted = 78680
    read_repair_repaired_blocking = 78678
    read_repair_repaired_background = 2
    # Per-node counters, in the [{"key": <node>, "value": <count>}, ...] shape.
    messages_pending = [
        {
            "key": "127.0.0.1",
            "value": 4,
        },
        {
            "key": "127.0.0.3",
            "value": 2,
        },
    ]
    messages_sent = [
        {
            "key": "127.0.0.1",
            "value": 48796,
        },
        {
            "key": "127.0.0.3",
            "value": 987,
        },
    ]
    messages_dropped = [
        {
            "key": "127.0.0.1",
            "value": 0,
        },
        {
            "key": "127.0.0.3",
            "value": 0,
        },
    ]
    messages_respond_pending = [
        {
            "key": "127.0.0.1",
            "value": 3,
        },
        {
            "key": "127.0.0.3",
            "value": 1,
        },
    ]
    messages_respond_completed = [
        {
            "key": "127.0.0.1",
            "value": 97870,
        },
        {
            "key": "127.0.0.3",
            "value": 980,
        },
    ]

    expected_requests = [
        expected_request("GET", "/storage_service/operation_mode", response=mode.value),
        expected_request("GET", "/stream_manager/", response=streams),
        expected_request("GET", "/storage_service/is_starting", response=_is_starting(mode)),
    ]

    # Statistics are only queried once the node is past the starting phase.
    if not _is_starting(mode):
        expected_requests += [
            expected_request("GET", "/storage_proxy/read_repair_attempted", response=read_repair_attempted),
            expected_request("GET", "/storage_proxy/read_repair_repaired_blocking",
                             response=read_repair_repaired_blocking),
            expected_request("GET", "/storage_proxy/read_repair_repaired_background",
                             response=read_repair_repaired_background),
            expected_request("GET", "/messaging_service/messages/pending", response=messages_pending),
            expected_request("GET", "/messaging_service/messages/sent", response=messages_sent),
            expected_request("GET", "/messaging_service/messages/dropped", response=messages_dropped,
                             multiple=expected_request.ANY),
            expected_request("GET", "/messaging_service/messages/respond_pending", response=messages_respond_pending),
            expected_request("GET", "/messaging_service/messages/respond_completed",
                             response=messages_respond_completed),
        ]

    args = ["netstats"]
    if flag == test_flag.human_readable_short:
        args.append("-H")
        human_readable = True
    elif flag == test_flag.human_readable_long:
        args.append("--human-readable")
        human_readable = True
    else:
        human_readable = False

    res = nodetool(*args, expected_requests=expected_requests)

    _check_output(
        res.stdout,
        human_readable,
        mode,
        streams,
        read_repair_attempted,
        read_repair_repaired_blocking,
        read_repair_repaired_background,
        messages_pending,
        messages_sent,
        messages_dropped,
        messages_respond_pending,
        messages_respond_completed,
    )
|