Tests for the tablestats (cfstats) nodetool command.
Fixes: https://github.com/scylladb/scylladb/issues/27962
Closes scylladb/scylladb#27965
#
|
|
# Copyright 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import json
|
|
import math
|
|
import pytest
|
|
import random
|
|
import re
|
|
import statistics
|
|
import yaml
|
|
from collections import defaultdict
|
|
from textwrap import indent
|
|
from typing import NamedTuple
|
|
from test.nodetool.rest_api_mock import expected_request
|
|
|
|
|
|
class Table(NamedTuple):
    """One row of the /column_family/ listing, identifying a single table."""
    ks: str    # keyspace name
    cf: str    # column family (table) name
    type: str  # column family type as reported by the API, e.g. 'ColumnFamilies'
|
|
|
|
|
|
def response_from_list(keys, values):
    """Convert rows of values into the list-of-dicts shape used by REST
    API responses, pairing each row's fields with *keys*."""
    response = []
    for row in values:
        response.append(dict(zip(keys, row)))
    return response
|
|
|
|
|
|
def histogram(count=0, sum_=0, min_=0, max_=0, variance=0, mean=0, sample=None):
    """Build a dict shaped like the REST API's histogram payload.

    `sample` is the list of raw samples; `count` must equal its length.
    The 'sample' key is omitted entirely when the histogram is empty,
    mirroring the server's behavior.
    """
    # fixed: the default used to be a mutable `sample=[]`, which is shared
    # across calls; use None as the sentinel instead
    if sample is None:
        sample = []
    assert count == len(sample)
    result = {
        'count': count,
        'sum': sum_,
        'min': min_,
        'max': max_,
        'variance': variance,
        'mean': mean
    }
    if count == 0:
        return result
    result['sample'] = sample
    return result
|
|
|
|
|
|
def make_random_histogram(count):
    """Return a histogram dict holding `count` random samples.

    Note: `count` must be at least 2 because statistics.variance()
    requires two or more data points.
    """
    # identical bounds make every sample 100; raise the upper bound
    # (e.g. to 3000) to get varied samples
    lower = 100
    upper = 100
    samples = [random.randint(lower, upper) for _ in range(count)]
    return histogram(count,
                     sum(samples),
                     min(samples),
                     max(samples),
                     statistics.variance(samples),
                     statistics.mean(samples),
                     samples)
|
|
|
|
|
|
def make_moving_avg_and_histogram(count):
    """Return a payload shaped like the REST API's moving-average +
    histogram response for a latency metric with `count` events."""
    # 1, 5 and 15 minute rates, then the mean rate since startup
    rates = [random.random() for _ in range(3)]
    mean_rate = random.random()
    return {
        'meter': {
            'rates': rates,
            'mean_rate': mean_rate,
            'count': count,
        },
        'hist': make_random_histogram(count),
    }
|
|
|
|
|
|
class table_stats:
    """Canned statistics for a single table, plus the REST requests nodetool
    is expected to issue to collect them and the expected rendering in the
    plain-text (format) and structured (to_map) outputs."""

    def __init__(self, ks, cf):
        self.ks = ks
        self.cf = cf
        # read count and its latency moving-average histogram; the histogram
        # sample count must match self.read
        self.read = 351
        self.read_latency_hist = make_moving_avg_and_histogram(self.read)
        # write count and its latency moving-average histogram
        self.write = 278
        self.write_latency_hist = make_moving_avg_and_histogram(self.write)
        self.live_ss_table_count = 16
        # SSTable count per compaction level, level 0 first
        self.sst_per_level = [16]
        self.live_disk_space_used = 1146924
        self.total_disk_space_used = 1146924
        self.snapshots_size = 0
        self.memtable_off_heap_size = 0
        self.bloom_filter_off_heap_memory_used = 304
        self.index_summary_off_heap_memory_used = 37248
        self.compression_metadata_off_heap_memory_used = 0
        self.compression_ratio = 0
        self.estimated_row_count = 100
        self.memtable_columns_count = 0
        self.memtable_live_data_size = 0
        self.memtable_switch_count = 16
        # total accumulated latencies (summed over requests)
        self.read_latency = 198
        self.write_latency = 152
        self.pending_flushes = 0
        # it's but a dummy value, scylla does not support it.
        self.percent_repaired = 0.0
        self.bloom_filter_false_positives = 0
        self.recent_bloom_filter_false_ratio = 0.0
        self.bloom_filter_disk_space_used = 304
        self.min_row_size = 216
        self.max_row_size = 258
        self.mean_row_size = 258
        self.live_scanned_histogram = histogram()
        self.tombstone_scanned_histogram = histogram()
        # it's but a dummy value, scylla does not support it.
        self.dropped_mutations = 0

    @property
    def table_name(self):
        """Path component used by the /column_family REST endpoints."""
        return f'{self.ks}:{self.cf}'

    @property
    def local_read_count(self):
        return self.read_latency_hist['hist']['count']

    @property
    def local_read_latency(self):
        return self.read_latency_hist['hist']['mean']

    @property
    def local_write_count(self):
        return self.write_latency_hist['hist']['count']

    @property
    def local_write_latency(self):
        return self.write_latency_hist['hist']['mean']

    @property
    def total_off_heap_memory_used(self):
        return (self.memtable_off_heap_size +
                self.bloom_filter_off_heap_memory_used +
                self.index_summary_off_heap_memory_used +
                self.compression_metadata_off_heap_memory_used)

    def sstable_count_in_each_level(self):
        """Yield the per-level SSTable counts the way nodetool prints them:
        when a level holds more SSTables than allowed, yield "count/max"."""
        # scylla hardwire FANOUT_SIZE to 10
        sstable_fanout_size = 10
        for level, count in enumerate(self.sst_per_level):
            if level == 0:
                max_count = 4
            else:
                # fixed: this used to call the non-existent max.pow();
                # integer exponentiation matches int(math.pow(...)) here
                max_count = sstable_fanout_size ** level
            if count > max_count:
                yield f'{count}/{max_count}'
            else:
                yield f'{count}'

    @property
    def sstables_in_each_level(self):
        return ', '.join(self.sstable_count_in_each_level())

    @property
    def avg_live_cells_per_slice(self):
        return self.live_scanned_histogram['mean']

    @property
    def max_live_cells_per_slice(self):
        return self.live_scanned_histogram['max']

    @property
    def avg_tombstones_per_slice(self):
        return self.tombstone_scanned_histogram['mean']

    @property
    def max_tombstones_per_slice(self):
        return self.tombstone_scanned_histogram['max']

    def req(self, name, response, **kwargs):
        """Expected GET of a single column_family metric for this table."""
        return expected_request('GET', f'/column_family/metrics/{name}/{self.table_name}',
                                response=response, **kwargs)

    def hist(self, name, response):
        """Expected GET of a moving-average histogram metric for this table."""
        return expected_request('GET', f'/column_family/metrics/{name}/moving_average_histogram/{self.table_name}',
                                multiple=expected_request.ANY,
                                response=response)

    def ssttables_per_level(self):
        """Expected GET of the per-compaction-level SSTable counts."""
        return expected_request('GET', f'/column_family/sstables/per_level/{self.table_name}',
                                response=self.sst_per_level)

    def expected_summary_requests(self, is_scylla):
        """Requests issued while composing the keyspace summary; scylla and
        cassandra nodetool issue them in a slightly different order."""
        if is_scylla:
            return [
                self.hist('write_latency', self.write_latency_hist),
                self.req('write_latency', self.write_latency),
                self.hist('read_latency', self.read_latency_hist),
                self.req('read_latency', self.read_latency),
                self.req('pending_flushes', self.pending_flushes)]
        else:
            return [
                self.hist('write_latency', self.write_latency_hist),
                self.hist('read_latency', self.read_latency_hist),
                self.req('read_latency', self.read_latency),
                self.req('write_latency', self.write_latency),
                self.req('pending_flushes', self.pending_flushes)]

    def expected_details_requests(self, is_scylla):
        """Requests issued while composing the per-table details; the two
        implementations fetch the same metrics but in a different order."""
        if is_scylla:
            return [
                # scylla only requests for this metric for plain text output
                self.req('live_ss_table_count', self.live_ss_table_count,
                         multiple=expected_request.ANY),
                self.ssttables_per_level(),
                self.req('live_disk_space_used', self.live_disk_space_used),
                self.req('total_disk_space_used', self.total_disk_space_used),
                self.req('snapshots_size', self.snapshots_size),
                self.req('memtable_off_heap_size', self.memtable_off_heap_size),
                self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
                self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
                self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
                self.req('compression_ratio', self.compression_ratio),
                self.req('estimated_row_count', self.estimated_row_count),
                self.req('memtable_columns_count', self.memtable_columns_count),
                self.req('memtable_live_data_size', self.memtable_live_data_size),
                self.req('memtable_switch_count', self.memtable_switch_count),
                self.hist('read_latency', self.read_latency_hist),
                self.hist('write_latency', self.write_latency_hist),
                self.req('pending_flushes', self.pending_flushes),
                self.req('bloom_filter_false_positives', self.bloom_filter_false_positives),
                self.req('recent_bloom_filter_false_ratio', self.recent_bloom_filter_false_ratio),
                self.req('bloom_filter_disk_space_used', self.bloom_filter_disk_space_used),
                self.req('min_row_size', self.min_row_size),
                self.req('max_row_size', self.max_row_size),
                self.req('mean_row_size', self.mean_row_size),
                self.req('live_scanned_histogram', self.live_scanned_histogram),
                self.req('tombstone_scanned_histogram', self.tombstone_scanned_histogram),
            ]
        else:
            return [
                self.req('live_ss_table_count', self.live_ss_table_count),
                self.ssttables_per_level(),
                self.req('memtable_off_heap_size', self.memtable_off_heap_size),
                self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
                self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
                self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
                self.req('live_disk_space_used', self.live_disk_space_used),
                self.req('total_disk_space_used', self.total_disk_space_used),
                self.req('snapshots_size', self.snapshots_size),
                self.req('compression_ratio', self.compression_ratio),
                self.req('estimated_row_count', self.estimated_row_count),
                self.req('memtable_columns_count', self.memtable_columns_count),
                self.req('memtable_live_data_size', self.memtable_live_data_size),
                self.req('memtable_switch_count', self.memtable_switch_count),
                self.hist('read_latency', self.read_latency_hist),
                self.hist('write_latency', self.write_latency_hist),
                self.req('pending_flushes', self.pending_flushes),
                self.req('bloom_filter_false_positives', self.bloom_filter_false_positives),
                self.req('recent_bloom_filter_false_ratio', self.recent_bloom_filter_false_ratio),
                self.req('bloom_filter_disk_space_used', self.bloom_filter_disk_space_used),
                self.req('min_row_size', self.min_row_size),
                self.req('max_row_size', self.max_row_size),
                self.req('mean_row_size', self.mean_row_size),
                self.req('live_scanned_histogram', self.live_scanned_histogram),
                self.req('tombstone_scanned_histogram', self.tombstone_scanned_histogram),
            ]

    def format(self):
        """Expected plain-text rendering of this table's details; the caller
        indents the whole section. Note the trailing blank line."""
        return f'''\
Table: {self.cf}
SSTable count: {self.live_ss_table_count}
SSTables in each level: [{self.sstables_in_each_level}]
Space used (live): {self.live_disk_space_used}
Space used (total): {self.total_disk_space_used}
Space used by snapshots (total): {self.snapshots_size}
Off heap memory used (total): {self.total_off_heap_memory_used}
SSTable Compression Ratio: {self.compression_ratio:.2f}
Number of partitions (estimate): {self.estimated_row_count}
Memtable cell count: {self.memtable_columns_count}
Memtable data size: {self.memtable_live_data_size}
Memtable off heap memory used: {self.memtable_off_heap_size}
Memtable switch count: {self.memtable_switch_count}
Local read count: {self.read}
Local read latency: {self.local_read_latency / 1000:.3f} ms
Local write count: {self.write}
Local write latency: {self.local_write_latency / 1000:.3f} ms
Pending flushes: {self.pending_flushes}
Percent repaired: {self.percent_repaired}
Bloom filter false positives: {self.bloom_filter_false_positives}
Bloom filter false ratio: {self.recent_bloom_filter_false_ratio:.5f}
Bloom filter space used: {self.bloom_filter_disk_space_used}
Bloom filter off heap memory used: {self.bloom_filter_off_heap_memory_used}
Index summary off heap memory used: {self.index_summary_off_heap_memory_used}
Compression metadata off heap memory used: {self.compression_metadata_off_heap_memory_used}
Compacted partition minimum bytes: {self.min_row_size}
Compacted partition maximum bytes: {self.max_row_size}
Compacted partition mean bytes: {self.mean_row_size}
Average live cells per slice (last five minutes): {self.avg_live_cells_per_slice:.1f}
Maximum live cells per slice (last five minutes): {self.max_live_cells_per_slice}
Average tombstones per slice (last five minutes): {self.avg_tombstones_per_slice:.1f}
Maximum tombstones per slice (last five minutes): {self.max_tombstones_per_slice}
Dropped Mutations: {self.dropped_mutations}

'''

    def to_map(self):
        """Expected structured (json/yaml) rendering of this table's details."""
        return {
            'sstables_in_each_level': list(self.sstable_count_in_each_level()),
            'space_used_live': f'{self.live_disk_space_used}',
            'space_used_total': f'{self.total_disk_space_used}',
            'space_used_by_snapshots_total': f'{self.snapshots_size}',
            'off_heap_memory_used_total': f'{self.total_off_heap_memory_used}',
            'sstable_compression_ratio': self.compression_ratio,
            'number_of_partitions_estimate': self.estimated_row_count,
            'memtable_cell_count': self.memtable_columns_count,
            'memtable_data_size': f'{self.memtable_live_data_size}',
            'memtable_off_heap_memory_used': f'{self.memtable_off_heap_size}',
            'memtable_switch_count': self.memtable_switch_count,
            'local_read_count': self.read,
            'local_read_latency_ms': f'{self.local_read_latency / 1000:.3f}',
            'local_write_count': self.write,
            'local_write_latency_ms': f'{self.local_write_latency / 1000:.3f}',
            'pending_flushes': self.pending_flushes,
            'percent_repaired': self.percent_repaired,
            'bloom_filter_false_positives': self.bloom_filter_false_positives,
            'bloom_filter_false_ratio': f'{self.recent_bloom_filter_false_ratio:01.5f}',
            'bloom_filter_space_used': f'{self.bloom_filter_disk_space_used}',
            'bloom_filter_off_heap_memory_used': f'{self.bloom_filter_off_heap_memory_used}',
            'index_summary_off_heap_memory_used': f'{self.index_summary_off_heap_memory_used}',
            'compression_metadata_off_heap_memory_used': f'{self.compression_metadata_off_heap_memory_used}',
            'compacted_partition_minimum_bytes': self.min_row_size,
            'compacted_partition_maximum_bytes': self.max_row_size,
            'compacted_partition_mean_bytes': self.mean_row_size,
            'average_live_cells_per_slice_last_five_minutes': self.avg_live_cells_per_slice,
            'maximum_live_cells_per_slice_last_five_minutes': self.max_live_cells_per_slice,
            'average_tombstones_per_slice_last_five_minutes': self.avg_tombstones_per_slice,
            # fixed: this used to report max_live_cells_per_slice (copy-paste)
            'maximum_tombstones_per_slice_last_five_minutes': self.max_tombstones_per_slice,
            'dropped_mutations': f'{self.dropped_mutations}',
        }
|
|
|
|
|
|
class scientific_notation:
    """Wraps a float so it formats the way the tool under test prints it.

    Python and {fmt} print a float like "1.234E-04", while Java prints
    like "1.23E-4" (no zero-padding of the exponent).
    """

    def __init__(self, value, is_scylla):
        self.value = value
        self.is_scylla = is_scylla

    def __format__(self, _):
        if self.is_scylla:
            if math.isnan(self.value):
                return 'nan'
            return f'{self.value:.15E}'
        else:
            if math.isnan(self.value):
                return 'NaN'
            # fixed: the mantissa's decimal point was an unescaped '.'
            # (any-character wildcard); strip the exponent's zero-padding
            # to match Java's formatting
            matched = re.match(r'(\d\.\d+)E([+-])(\d+)', f'{self.value:.15E}')
            assert matched
            m, sign, e = matched.group(1), matched.group(2), int(matched.group(3))
            return f'{m}E{sign}{e}'
|
|
|
|
|
|
class keyspace_stats:
    """Accumulates per-keyspace totals from the stats of its tables and
    renders them the way nodetool's keyspace summary does."""

    def __init__(self, is_scylla):
        self.tables = []
        self.read_count = 0
        self.total_read_time = 0
        self.write_count = 0
        self.total_write_time = 0
        self.pending_flushes = 0
        self.is_scylla = is_scylla

    def add_table(self, table):
        """Add a table's stats and fold its counters into the totals."""
        self.tables.append(table)

        if table.local_read_count > 0:
            self.read_count += table.local_read_count
            self.total_read_time += table.read_latency

        if table.local_write_count > 0:
            # fixed: this used to overwrite write_count with '=' instead of
            # accumulating it like the read path above
            self.write_count += table.local_write_count
            self.total_write_time += table.write_latency

        self.pending_flushes += table.pending_flushes

    @property
    def read_latency(self):
        """Average read latency in ms; NaN when there were no reads."""
        if self.read_count == 0:
            v = math.nan
        else:
            v = self.total_read_time / self.read_count / 1000
        return scientific_notation(v, self.is_scylla)

    @property
    def write_latency(self):
        """Average write latency in ms; NaN when there were no writes."""
        if self.write_count == 0:
            v = math.nan
        else:
            v = self.total_write_time / self.write_count / 1000
        return scientific_notation(v, self.is_scylla)

    def to_map(self, is_scylla):
        """Expected structured (json/yaml) keyspace summary."""
        m = {
            'read_count': self.read_count,
            'read_latency_ms': self.read_latency.value,
            'write_count': self.write_count,
            'write_latency_ms': self.write_latency.value,
            'pending_flushes': self.pending_flushes,
        }
        if not is_scylla:
            # cassandra nodetool has a duplicated item
            m['read_latency'] = self.read_latency.value
        return m
|
|
|
|
|
|
@pytest.mark.parametrize('command,args,tables_to_print',
                         [
                             ('tablestats', [], ['keyspace1.standard1', 'system.local']),
                             ('tablestats', ['keyspace1'], ['keyspace1.standard1']),
                             ('tablestats', ['keyspace1.standard1'], ['keyspace1.standard1']),
                             ('cfstats', [], ['keyspace1.standard1', 'system.local']),
                         ])
def test_plain_text_output(request, nodetool, command, args, tables_to_print):
    """Verify tablestats/cfstats plain-text output, with and without a
    keyspace or keyspace.table filter.

    Builds the exact sequence of REST requests nodetool is expected to make
    (the order differs between the scylla and cassandra implementations) and
    the exact text it is expected to print, then runs the command against the
    mock server and compares.
    """
    is_scylla = request.config.getoption("nodetool") == 'scylla'

    tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
              Table('system', 'local', 'ColumnFamilies')]
    # the command always starts by listing every column family
    expected_requests = [expected_request(
        'GET', '/column_family/',
        multiple=expected_request.MULTIPLE,
        response=response_from_list(['ks', 'cf', 'type'], tables))]

    total_nr_tables = 0
    keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
    for table in tables:
        # the printed total counts all tables, even filtered-out ones
        total_nr_tables += 1
        stats = table_stats(table.ks, table.cf)
        included = f'{stats.ks}.{stats.cf}' in tables_to_print
        if included:
            if not is_scylla:
                # scylla tallies the table stats afterwards
                expected_requests += stats.expected_summary_requests(is_scylla)
            ks = keyspaces[table.ks]
            ks.add_table(stats)

    if not is_scylla:
        # cassandra nodetool additionally lists the keyspaces
        expected_requests.append(expected_request(
            'GET', '/storage_service/keyspaces',
            response=sorted(keyspaces.keys())))
    expected_output = f'''\
Total number of tables: {total_nr_tables}
----------------
'''
    # keyspaces are printed in sorted order: summary first, then each
    # table's details indented underneath
    for ks_name in sorted(keyspaces.keys()):
        keyspace = keyspaces[ks_name]
        if is_scylla:
            for table in keyspace.tables:
                # cassandra nodetool tallies the table stats in the loop above
                expected_requests += table.expected_summary_requests(is_scylla)
        expected_output += f'''\
Keyspace : {ks_name}
\tRead Count: {keyspace.read_count}
\tRead Latency: {keyspace.read_latency:.15E} ms
\tWrite Count: {keyspace.write_count}
\tWrite Latency: {keyspace.write_latency:.15E} ms
\tPending Flushes: {keyspace.pending_flushes}
'''
        for table in keyspace.tables:
            expected_requests.extend(table.expected_details_requests(is_scylla))
            expected_output += indent(table.format(), '\t\t')
        expected_output += '----------------\n'

    res = nodetool(command, *args, expected_requests=expected_requests)
    actual_output = res.stdout
    assert actual_output == expected_output
|
|
|
|
|
|
@pytest.mark.parametrize('output_format', ['json', 'yaml'])
def test_output_format(request, nodetool, output_format):
    """Verify tablestats structured output (--format json/yaml).

    Builds the expected REST request sequence and the expected result as a
    plain dict, then parses nodetool's output with the matching parser and
    compares dicts (so key order and formatting do not matter).
    """
    is_scylla = request.config.getoption("nodetool") == 'scylla'

    tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
              Table('system', 'local', 'ColumnFamilies')]
    # the command always starts by listing every column family
    expected_requests = [expected_request(
        'GET', '/column_family/',
        multiple=expected_request.MULTIPLE,
        response=response_from_list(['ks', 'cf', 'type'], tables))]

    keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
    for table in tables:
        stats = table_stats(table.ks, table.cf)
        if not is_scylla:
            # scylla tallies the table stats afterwards
            expected_requests += stats.expected_summary_requests(is_scylla)
        keyspaces[table.ks].add_table(stats)

    if not is_scylla:
        # cassandra nodetool additionally lists the keyspaces
        expected_requests.append(expected_request(
            'GET', '/storage_service/keyspaces',
            response=sorted(keyspaces.keys())))
    total_nr_tables = sum(len(ks.tables) for ks in keyspaces.values())
    expected_dict = {'total_number_of_tables': total_nr_tables}
    for ks_name in sorted(keyspaces.keys()):
        keyspace = keyspaces[ks_name]
        if is_scylla:
            for table in keyspace.tables:
                # cassandra nodetool tallies the table stats in the loop above
                expected_requests += table.expected_summary_requests(is_scylla)
        expected_dict[ks_name] = keyspace.to_map(is_scylla)

        # each keyspace entry nests its tables under a 'tables' key
        expected_tables = {}
        for table in keyspace.tables:
            expected_requests.extend(table.expected_details_requests(is_scylla))
            expected_tables[table.cf] = table.to_map()
        expected_dict[ks_name]['tables'] = expected_tables

    parsers = {
        'yaml': lambda m: yaml.load(m, Loader=yaml.Loader),
        'json': json.loads
    }

    res = nodetool('tablestats', '--format', output_format,
                   expected_requests=expected_requests)
    actual_output = res.stdout
    actual_dict = parsers[output_format](actual_output)

    assert actual_dict == expected_dict
|