# scylladb/test/nodetool/test_tablestats.py
#
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import json
import math
import pytest
import random
import re
import statistics
import yaml
from collections import defaultdict
from textwrap import indent
from typing import NamedTuple
from test.nodetool.rest_api_mock import expected_request
class Table(NamedTuple):
    """One entry of the table listing served at ``/column_family/``."""

    # keyspace name
    ks: str
    # table (column family) name
    cf: str
    # table kind as reported by the REST API, e.g. 'ColumnFamilies'
    type: str
def response_from_list(keys, values):
    """Convert each row of ``values`` into a dict keyed by ``keys``.

    Rows are paired with ``keys`` positionally via ``zip()``, so surplus items
    on either side are dropped.  Returns a new list, one dict per row.
    """
    rows = []
    for row in values:
        rows.append(dict(zip(keys, row)))
    return rows
def histogram(count=0, sum_=0, min_=0, max_=0, variance=0, mean=0, sample=None):
    """Build a histogram dict used as a mocked REST API response.

    ``sample`` (default: empty) must hold exactly ``count`` values; it is only
    included in the result when the histogram is non-empty.  The trailing
    underscores on ``sum_``, ``min_`` and ``max_`` avoid shadowing builtins.
    """
    # avoid a shared mutable default argument
    if sample is None:
        sample = []
    assert count == len(sample)
    result = {
        'count': count,
        'sum': sum_,
        'min': min_,
        'max': max_,
        'variance': variance,
        'mean': mean,
    }
    if count:
        result['sample'] = sample
    return result
def make_random_histogram(count, lower=100, upper=100):
    """Return a ``histogram()`` of ``count`` random integer samples in [lower, upper].

    With the default range pinned to a single value (100), every sample and
    therefore the mean are deterministic; a wider range (e.g. ``upper=3000``)
    can be requested explicitly.  NOTE(review): presumably pinned so that the
    latencies derived from the mean are stable -- confirm before widening.

    ``count == 0`` yields an empty histogram; a single sample has variance 0.
    """
    if count == 0:
        # min()/max() of no samples would raise
        return histogram()
    samples = [random.randint(lower, upper) for _ in range(count)]
    # statistics.variance() needs at least two data points
    variance = statistics.variance(samples) if count > 1 else 0
    return histogram(count,
                     sum(samples),
                     min(samples),
                     max(samples),
                     variance,
                     statistics.mean(samples),
                     samples)
def make_moving_avg_and_histogram(count):
    """Build a mocked moving-average-plus-histogram metric for ``count`` events.

    The returned dict has two parts:
    * ``meter``: random 1-, 5- and 15-minute rates, a random mean rate since
      startup, and the event ``count``;
    * ``hist``: a ``make_random_histogram(count)`` result.
    """
    # one random rate per averaging window: 1, 5 and 15 minutes
    windowed_rates = [random.random(), random.random(), random.random()]
    mean_rate_since_startup = random.random()
    meter = {
        'rates': windowed_rates,
        'mean_rate': mean_rate_since_startup,
        'count': count,
    }
    return {'meter': meter, 'hist': make_random_histogram(count)}
class table_stats:
    """Mocked statistics of one table, plus the expectations derived from them.

    Besides holding the values the REST API mock serves for the table, an
    instance builds the REST requests nodetool is expected to issue for it
    (``expected_summary_requests()``, ``expected_details_requests()``) and the
    output nodetool is expected to print (``format()`` for plain text,
    ``to_map()`` for JSON/YAML).
    """

    def __init__(self, ks, cf):
        self.ks = ks
        self.cf = cf
        # number of reads; also the sample count of read_latency_hist
        self.read = 351
        self.read_latency_hist = make_moving_avg_and_histogram(self.read)
        # number of writes; also the sample count of write_latency_hist
        self.write = 278
        self.write_latency_hist = make_moving_avg_and_histogram(self.write)
        self.live_ss_table_count = 16
        self.sst_per_level = [16]
        self.live_disk_space_used = 1146924
        self.total_disk_space_used = 1146924
        self.snapshots_size = 0
        self.memtable_off_heap_size = 0
        self.bloom_filter_off_heap_memory_used = 304
        self.index_summary_off_heap_memory_used = 37248
        self.compression_metadata_off_heap_memory_used = 0
        self.compression_ratio = 0
        self.estimated_row_count = 100
        self.memtable_columns_count = 0
        self.memtable_live_data_size = 0
        self.memtable_switch_count = 16
        # total read/write time; keyspace_stats sums these and divides by the op count
        self.read_latency = 198
        self.write_latency = 152
        self.pending_flushes = 0
        # it's but a dummy value, scylla does not support it.
        self.percent_repaired = 0.0
        self.bloom_filter_false_positives = 0
        self.recent_bloom_filter_false_ratio = 0.0
        self.bloom_filter_disk_space_used = 304
        self.min_row_size = 216
        self.max_row_size = 258
        self.mean_row_size = 258
        self.live_scanned_histogram = histogram()
        self.tombstone_scanned_histogram = histogram()
        # it's but a dummy value, scylla does not support it.
        self.dropped_mutations = 0

    @property
    def table_name(self):
        """``ks:cf`` as used in the REST API paths."""
        return f'{self.ks}:{self.cf}'

    @property
    def local_read_count(self):
        return self.read_latency_hist['hist']['count']

    @property
    def local_read_latency(self):
        return self.read_latency_hist['hist']['mean']

    @property
    def local_write_count(self):
        return self.write_latency_hist['hist']['count']

    @property
    def local_write_latency(self):
        return self.write_latency_hist['hist']['mean']

    @property
    def total_off_heap_memory_used(self):
        """Sum of the memtable, bloom filter, index summary and compression
        metadata off-heap usage."""
        return (self.memtable_off_heap_size +
                self.bloom_filter_off_heap_memory_used +
                self.index_summary_off_heap_memory_used +
                self.compression_metadata_off_heap_memory_used)

    def sstable_count_in_each_level(self):
        """Yield the SSTable count of each level as nodetool prints it.

        A level holding more SSTables than its capacity is printed as
        ``count/capacity``, otherwise just as ``count``.  Level 0 holds at most
        4 SSTables, level N > 0 at most ``10 ** N``.
        """
        # scylla hardwires FANOUT_SIZE to 10
        sstable_fanout_size = 10
        for level, count in enumerate(self.sst_per_level):
            if level == 0:
                max_count = 4
            else:
                # exact integer power; no float rounding involved
                max_count = sstable_fanout_size ** level
            if count > max_count:
                yield f'{count}/{max_count}'
            else:
                yield f'{count}'

    @property
    def sstables_in_each_level(self):
        return ', '.join(self.sstable_count_in_each_level())

    @property
    def avg_live_cells_per_slice(self):
        return self.live_scanned_histogram['mean']

    @property
    def max_live_cells_per_slice(self):
        return self.live_scanned_histogram['max']

    @property
    def avg_tombstones_per_slice(self):
        return self.tombstone_scanned_histogram['mean']

    @property
    def max_tombstones_per_slice(self):
        return self.tombstone_scanned_histogram['max']

    def req(self, name, response, **kwargs):
        """Expect ``GET /column_family/metrics/<name>/<ks:cf>`` answered with ``response``."""
        return expected_request('GET', f'/column_family/metrics/{name}/{self.table_name}',
                                response=response, **kwargs)

    def hist(self, name, response):
        """Expect the moving-average histogram of metric ``name``.

        Uses ``multiple=expected_request.ANY`` -- presumably the request may be
        issued any number of times; see rest_api_mock for the exact semantics.
        """
        return expected_request('GET', f'/column_family/metrics/{name}/moving_average_histogram/{self.table_name}',
                                multiple=expected_request.ANY,
                                response=response)

    def ssttables_per_level(self):
        """Expect the per-level SSTable count request (note: method name has a typo)."""
        return expected_request('GET', f'/column_family/sstables/per_level/{self.table_name}',
                                response=self.sst_per_level)

    def expected_summary_requests(self, is_scylla):
        """Return the requests used for the keyspace summary, in the order the
        selected nodetool flavor issues them."""
        if is_scylla:
            return [
                self.hist('write_latency', self.write_latency_hist),
                self.req('write_latency', self.write_latency),
                self.hist('read_latency', self.read_latency_hist),
                self.req('read_latency', self.read_latency),
                self.req('pending_flushes', self.pending_flushes)]
        return [
            self.hist('write_latency', self.write_latency_hist),
            self.hist('read_latency', self.read_latency_hist),
            self.req('read_latency', self.read_latency),
            self.req('write_latency', self.write_latency),
            self.req('pending_flushes', self.pending_flushes)]

    def expected_details_requests(self, is_scylla):
        """Return the requests used for this table's details.

        Scylla and cassandra nodetool fetch the first metrics in a different
        order; from ``compression_ratio`` on, both use the same order.
        """
        if is_scylla:
            head = [
                # scylla only requests for this metric for plain text output
                self.req('live_ss_table_count', self.live_ss_table_count,
                         multiple=expected_request.ANY),
                self.ssttables_per_level(),
                self.req('live_disk_space_used', self.live_disk_space_used),
                self.req('total_disk_space_used', self.total_disk_space_used),
                self.req('snapshots_size', self.snapshots_size),
                self.req('memtable_off_heap_size', self.memtable_off_heap_size),
                self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
                self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
                self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
            ]
        else:
            head = [
                self.req('live_ss_table_count', self.live_ss_table_count),
                self.ssttables_per_level(),
                self.req('memtable_off_heap_size', self.memtable_off_heap_size),
                self.req('bloom_filter_off_heap_memory_used', self.bloom_filter_off_heap_memory_used),
                self.req('index_summary_off_heap_memory_used', self.index_summary_off_heap_memory_used),
                self.req('compression_metadata_off_heap_memory_used', self.compression_metadata_off_heap_memory_used),
                self.req('live_disk_space_used', self.live_disk_space_used),
                self.req('total_disk_space_used', self.total_disk_space_used),
                self.req('snapshots_size', self.snapshots_size),
            ]
        tail = [
            self.req('compression_ratio', self.compression_ratio),
            self.req('estimated_row_count', self.estimated_row_count),
            self.req('memtable_columns_count', self.memtable_columns_count),
            self.req('memtable_live_data_size', self.memtable_live_data_size),
            self.req('memtable_switch_count', self.memtable_switch_count),
            self.hist('read_latency', self.read_latency_hist),
            self.hist('write_latency', self.write_latency_hist),
            self.req('pending_flushes', self.pending_flushes),
            self.req('bloom_filter_false_positives', self.bloom_filter_false_positives),
            self.req('recent_bloom_filter_false_ratio', self.recent_bloom_filter_false_ratio),
            self.req('bloom_filter_disk_space_used', self.bloom_filter_disk_space_used),
            self.req('min_row_size', self.min_row_size),
            self.req('max_row_size', self.max_row_size),
            self.req('mean_row_size', self.mean_row_size),
            self.req('live_scanned_histogram', self.live_scanned_histogram),
            self.req('tombstone_scanned_histogram', self.tombstone_scanned_histogram),
        ]
        return head + tail

    def format(self):
        """Return the expected plain-text details of this table (unindented)."""
        return f'''\
Table: {self.cf}
SSTable count: {self.live_ss_table_count}
SSTables in each level: [{self.sstables_in_each_level}]
Space used (live): {self.live_disk_space_used}
Space used (total): {self.total_disk_space_used}
Space used by snapshots (total): {self.snapshots_size}
Off heap memory used (total): {self.total_off_heap_memory_used}
SSTable Compression Ratio: {self.compression_ratio:.2f}
Number of partitions (estimate): {self.estimated_row_count}
Memtable cell count: {self.memtable_columns_count}
Memtable data size: {self.memtable_live_data_size}
Memtable off heap memory used: {self.memtable_off_heap_size}
Memtable switch count: {self.memtable_switch_count}
Local read count: {self.read}
Local read latency: {self.local_read_latency / 1000:.3f} ms
Local write count: {self.write}
Local write latency: {self.local_write_latency / 1000:.3f} ms
Pending flushes: {self.pending_flushes}
Percent repaired: {self.percent_repaired}
Bloom filter false positives: {self.bloom_filter_false_positives}
Bloom filter false ratio: {self.recent_bloom_filter_false_ratio:.5f}
Bloom filter space used: {self.bloom_filter_disk_space_used}
Bloom filter off heap memory used: {self.bloom_filter_off_heap_memory_used}
Index summary off heap memory used: {self.index_summary_off_heap_memory_used}
Compression metadata off heap memory used: {self.compression_metadata_off_heap_memory_used}
Compacted partition minimum bytes: {self.min_row_size}
Compacted partition maximum bytes: {self.max_row_size}
Compacted partition mean bytes: {self.mean_row_size}
Average live cells per slice (last five minutes): {self.avg_live_cells_per_slice:.1f}
Maximum live cells per slice (last five minutes): {self.max_live_cells_per_slice}
Average tombstones per slice (last five minutes): {self.avg_tombstones_per_slice:.1f}
Maximum tombstones per slice (last five minutes): {self.max_tombstones_per_slice}
Dropped Mutations: {self.dropped_mutations}
'''

    def to_map(self):
        """Return the expected JSON/YAML representation of this table's details.

        Some values are strings and others numbers, mirroring what nodetool emits.
        """
        return {
            'sstables_in_each_level': list(self.sstable_count_in_each_level()),
            'space_used_live': f'{self.live_disk_space_used}',
            'space_used_total': f'{self.total_disk_space_used}',
            'space_used_by_snapshots_total': f'{self.snapshots_size}',
            'off_heap_memory_used_total': f'{self.total_off_heap_memory_used}',
            'sstable_compression_ratio': self.compression_ratio,
            'number_of_partitions_estimate': self.estimated_row_count,
            'memtable_cell_count': self.memtable_columns_count,
            'memtable_data_size': f'{self.memtable_live_data_size}',
            'memtable_off_heap_memory_used': f'{self.memtable_off_heap_size}',
            'memtable_switch_count': self.memtable_switch_count,
            'local_read_count': self.read,
            'local_read_latency_ms': f'{self.local_read_latency / 1000:.3f}',
            'local_write_count': self.write,
            'local_write_latency_ms': f'{self.local_write_latency / 1000:.3f}',
            'pending_flushes': self.pending_flushes,
            'percent_repaired': self.percent_repaired,
            'bloom_filter_false_positives': self.bloom_filter_false_positives,
            'bloom_filter_false_ratio': f'{self.recent_bloom_filter_false_ratio:01.5f}',
            'bloom_filter_space_used': f'{self.bloom_filter_disk_space_used}',
            'bloom_filter_off_heap_memory_used': f'{self.bloom_filter_off_heap_memory_used}',
            'index_summary_off_heap_memory_used': f'{self.index_summary_off_heap_memory_used}',
            'compression_metadata_off_heap_memory_used': f'{self.compression_metadata_off_heap_memory_used}',
            'compacted_partition_minimum_bytes': self.min_row_size,
            'compacted_partition_maximum_bytes': self.max_row_size,
            'compacted_partition_mean_bytes': self.mean_row_size,
            'average_live_cells_per_slice_last_five_minutes': self.avg_live_cells_per_slice,
            'maximum_live_cells_per_slice_last_five_minutes': self.max_live_cells_per_slice,
            'average_tombstones_per_slice_last_five_minutes': self.avg_tombstones_per_slice,
            'maximum_tombstones_per_slice_last_five_minutes': self.max_tombstones_per_slice,
            'dropped_mutations': f'{self.dropped_mutations}',
        }
class scientific_notation:
    """Render a float in scientific notation the way the nodetool under test does.

    Python and {fmt} print a float like "1.234E-04", while Java prints like
    "1.23E-4": Java's exponent has no leading zeros and no '+' sign for
    positive exponents.  With ``is_scylla`` the Python/{fmt} form with 15
    fractional digits is produced (``nan`` for NaN); otherwise the same
    mantissa is combined with a Java-style exponent (``NaN`` for NaN).

    The format spec given to ``format()`` is ignored.
    """

    # optionally signed mantissa, exponent sign, exponent digits
    _PATTERN = re.compile(r'(-?\d\.\d+)E([+-])(\d+)')

    def __init__(self, value, is_scylla):
        self.value = value
        self.is_scylla = is_scylla

    def __format__(self, _):
        if self.is_scylla:
            if math.isnan(self.value):
                return 'nan'
            return f'{self.value:.15E}'
        if math.isnan(self.value):
            return 'NaN'
        matched = self._PATTERN.fullmatch(f'{self.value:.15E}')
        assert matched, f'unexpected scientific notation for {self.value!r}'
        mantissa, sign, exponent = matched.groups()
        # Java only prints the sign of a negative exponent, without zero padding
        sign = '-' if sign == '-' else ''
        return f'{mantissa}E{sign}{int(exponent)}'
class keyspace_stats:
    """Per-keyspace summary printed before the keyspace's tables.

    Tables are registered with ``add_table()``.  Their read/write counts,
    total read/write times and pending flushes are summed up, and the average
    latencies are derived from those sums.
    """

    def __init__(self, is_scylla):
        self.tables = []
        self.read_count = 0
        self.total_read_time = 0
        self.write_count = 0
        self.total_write_time = 0
        self.pending_flushes = 0
        self.is_scylla = is_scylla

    def add_table(self, table):
        """Register ``table`` (a ``table_stats``) and fold its numbers into the totals.

        A table contributes to the read (write) count and total read (write)
        time only if it served any reads (writes).
        """
        self.tables.append(table)
        if table.local_read_count > 0:
            self.read_count += table.local_read_count
            self.total_read_time += table.read_latency
        if table.local_write_count > 0:
            # accumulate across tables, just like the read count
            self.write_count += table.local_write_count
            self.total_write_time += table.write_latency
        self.pending_flushes += table.pending_flushes

    def _mean_latency_ms(self, total_time, count):
        # mean time per operation divided by 1000 (presumably us -> ms); NaN without ops
        value = math.nan if count == 0 else total_time / count / 1000
        return scientific_notation(value, self.is_scylla)

    @property
    def read_latency(self):
        """Average read latency in ms, wrapped in ``scientific_notation``."""
        return self._mean_latency_ms(self.total_read_time, self.read_count)

    @property
    def write_latency(self):
        """Average write latency in ms, wrapped in ``scientific_notation``."""
        return self._mean_latency_ms(self.total_write_time, self.write_count)

    def to_map(self, is_scylla=None):
        """Return the keyspace summary as expected in JSON/YAML output.

        ``is_scylla`` defaults to the flavor passed to the constructor.  The
        per-table entries are not included.
        """
        if is_scylla is None:
            is_scylla = self.is_scylla
        m = {
            'read_count': self.read_count,
            'read_latency_ms': self.read_latency.value,
            'write_count': self.write_count,
            'write_latency_ms': self.write_latency.value,
            'pending_flushes': self.pending_flushes,
        }
        if not is_scylla:
            # cassandra nodetool has a duplicated item
            m['read_latency'] = self.read_latency.value
        return m
@pytest.mark.parametrize('command,args,tables_to_print',
                         [
                             ('tablestats', [], ['keyspace1.standard1', 'system.local']),
                             ('tablestats', ['keyspace1'], ['keyspace1.standard1']),
                             ('tablestats', ['keyspace1.standard1'], ['keyspace1.standard1']),
                             ('cfstats', [], ['keyspace1.standard1', 'system.local']),
                         ])
def test_plain_text_output(request, nodetool, command, args, tables_to_print):
    """Check the plain-text output of ``tablestats`` and its alias ``cfstats``.

    ``args`` is the keyspace/table filter passed on the command line, and
    ``tables_to_print`` lists the ``ks.cf`` tables expected in the output.
    The expected REST requests are appended in a different order for scylla
    and cassandra nodetool, following when each one fetches the per-table
    summary metrics.
    """
    is_scylla = request.config.getoption("nodetool") == 'scylla'
    tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
              Table('system', 'local', 'ColumnFamilies')]
    # the table listing is expected first
    expected_requests = [expected_request(
        'GET', '/column_family/',
        multiple=expected_request.MULTIPLE,
        response=response_from_list(['ks', 'cf', 'type'], tables))]
    total_nr_tables = 0
    keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
    for table in tables:
        # every table is counted, including the ones filtered out below
        total_nr_tables += 1
        stats = table_stats(table.ks, table.cf)
        included = f'{stats.ks}.{stats.cf}' in tables_to_print
        if included:
            if not is_scylla:
                # scylla tallies the table stats afterwards
                expected_requests += stats.expected_summary_requests(is_scylla)
            ks = keyspaces[table.ks]
            ks.add_table(stats)
    if not is_scylla:
        # cassandra nodetool additionally fetches the keyspace list
        expected_requests.append(expected_request(
            'GET', '/storage_service/keyspaces',
            response=sorted(keyspaces.keys())))
    expected_output = f'''\
Total number of tables: {total_nr_tables}
----------------
'''
    # keyspaces are printed in sorted order, each followed by its tables
    for ks_name in sorted(keyspaces.keys()):
        keyspace = keyspaces[ks_name]
        if is_scylla:
            for table in keyspace.tables:
                # cassandra nodetool tallies the table stats in the loop above
                expected_requests += table.expected_summary_requests(is_scylla)
        expected_output += f'''\
Keyspace : {ks_name}
\tRead Count: {keyspace.read_count}
\tRead Latency: {keyspace.read_latency:.15E} ms
\tWrite Count: {keyspace.write_count}
\tWrite Latency: {keyspace.write_latency:.15E} ms
\tPending Flushes: {keyspace.pending_flushes}
'''
        for table in keyspace.tables:
            expected_requests.extend(table.expected_details_requests(is_scylla))
            # per-table details are indented by two tabs under the keyspace
            expected_output += indent(table.format(), '\t\t')
        expected_output += '----------------\n'
    res = nodetool(command, *args, expected_requests=expected_requests)
    actual_output = res.stdout
    assert actual_output == expected_output
@pytest.mark.parametrize('output_format', ['json', 'yaml'])
def test_output_format(request, nodetool, output_format):
    """Check ``tablestats --format json|yaml``.

    No keyspace/table filter is passed, so both tables are expected.  The
    parsed output is compared, as a dict, with the one built from the mocked
    stats.
    """
    is_scylla = request.config.getoption("nodetool") == 'scylla'
    tables = [Table('keyspace1', 'standard1', 'ColumnFamilies'),
              Table('system', 'local', 'ColumnFamilies')]
    # the table listing is expected first
    expected_requests = [expected_request(
        'GET', '/column_family/',
        multiple=expected_request.MULTIPLE,
        response=response_from_list(['ks', 'cf', 'type'], tables))]
    keyspaces = defaultdict(lambda: keyspace_stats(is_scylla))
    for table in tables:
        stats = table_stats(table.ks, table.cf)
        if not is_scylla:
            # scylla tallies the table stats afterwards
            expected_requests += stats.expected_summary_requests(is_scylla)
        keyspaces[table.ks].add_table(stats)
    if not is_scylla:
        # cassandra nodetool additionally fetches the keyspace list
        expected_requests.append(expected_request(
            'GET', '/storage_service/keyspaces',
            response=sorted(keyspaces.keys())))
    total_nr_tables = sum(len(ks.tables) for ks in keyspaces.values())
    expected_dict = {'total_number_of_tables': total_nr_tables}
    for ks_name in sorted(keyspaces.keys()):
        keyspace = keyspaces[ks_name]
        if is_scylla:
            for table in keyspace.tables:
                # cassandra nodetool tallies the table stats in the loop above
                expected_requests += table.expected_summary_requests(is_scylla)
        expected_dict[ks_name] = keyspace.to_map(is_scylla)
        # per-table details are nested under the keyspace, keyed by table name
        expected_tables = {}
        for table in keyspace.tables:
            expected_requests.extend(table.expected_details_requests(is_scylla))
            expected_tables[table.cf] = table.to_map()
        expected_dict[ks_name]['tables'] = expected_tables
    parsers = {
        # NOTE(review): yaml.Loader can construct arbitrary Python objects;
        # yaml.safe_load should suffice for nodetool's output -- consider switching.
        'yaml': lambda m: yaml.load(m, Loader=yaml.Loader),
        'json': json.loads
    }
    res = nodetool('tablestats', '--format', output_format,
                   expected_requests=expected_requests)
    actual_output = res.stdout
    actual_dict = parsers[output_format](actual_output)
    assert actual_dict == expected_dict