expaneed tests

This commit is contained in:
Dominik Krenn 2025-11-05 10:11:46 +01:00
parent 8986a515e2
commit 5336eb2c16
4 changed files with 241 additions and 3 deletions

211
collatz.py Normal file
View File

@ -0,0 +1,211 @@
#!/usr/bin/env python3
import curses
import time
import os
from pathlib import Path
from mathstream import StreamNumber, add, mul, div, is_even, clear_logs
LOG_DIR = Path("instance/log")
def collatz_step(n, three, two, one):
return div(n, two) if is_even(n) else add(mul(n, three), one)
def draw_header(win, step, elapsed, avg_step, digits_len):
"""Render header above graph and panels."""
win.erase()
cols = curses.COLS - 1
bar = "" * cols
lines = [
f" Collatz (3n + 1) Streamed Viewer ",
f" Step: {step}",
f" Elapsed: {elapsed:8.2f}s | Avg/Step: {avg_step:8.5f}s",
f" Digits: {digits_len:,} | ↑↓ scroll number | PgUp/PgDn scroll log | q quit",
]
for i, line in enumerate(lines):
win.addstr(i, 0, line[:cols], curses.color_pair(1))
win.addstr(len(lines), 0, bar, curses.color_pair(2))
win.noutrefresh()
def draw_graph(win, graph_buf, direction_up, width):
"""Render a single-line graph: grey ░ for empty, colored █ for data."""
win.erase()
# Derive the actual width of the window so we never draw past its edge.
_, max_x = win.getmaxyx()
effective_width = max_x or width
padding = 3 # leave space for arrow and a small gap
cols = max(0, effective_width - padding)
# Clamp buffer to visible width
visible = graph_buf[-cols:] if len(graph_buf) > cols else graph_buf
fill_len = len(visible)
# arrow first
if effective_width > 0:
arrow = "" if direction_up else ""
arrow_color = curses.color_pair(6 if direction_up else 5)
try:
win.addstr(0, 0, arrow, arrow_color)
except curses.error:
pass
# draw filled section
for i, val in enumerate(visible):
col = padding + i
if col >= effective_width:
break
color = curses.color_pair(6 if val > 0 else 5)
try:
win.addstr(0, col, "", color)
except curses.error:
break
# fill remaining with ░
remaining = max(0, cols - fill_len)
fill_start = padding + fill_len
if remaining > 0 and fill_start < effective_width:
run = min(remaining, effective_width - fill_start)
if run > 0:
try:
win.addstr(0, fill_start, "" * run, curses.color_pair(7))
except curses.error:
pass
win.noutrefresh()
def draw_number(win, digits, scroll):
win.erase()
cols = curses.COLS
lines = [digits[i:i + cols - 1] for i in range(0, len(digits), cols - 1)]
max_lines = win.getmaxyx()[0]
scroll = max(0, min(scroll, max(0, len(lines) - max_lines)))
for i, chunk in enumerate(lines[scroll:scroll + max_lines]):
try:
win.addstr(i, 0, chunk, curses.color_pair(3))
except curses.error:
pass
win.noutrefresh()
return scroll
def draw_log_list(win, scroll):
win.erase()
if not LOG_DIR.exists():
LOG_DIR.mkdir(parents=True, exist_ok=True)
files = sorted(LOG_DIR.iterdir(), key=os.path.getmtime, reverse=True)
names = [f"{f.name}" for f in files]
max_lines = win.getmaxyx()[0]
scroll = max(0, min(scroll, max(0, len(names) - max_lines)))
for i, name in enumerate(names[scroll:scroll + max_lines]):
try:
win.addstr(i, 0, name[: curses.COLS - 1], curses.color_pair(4))
except curses.error:
pass
win.noutrefresh()
return scroll
def run_collatz(stdscr):
curses.curs_set(0)
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_CYAN, -1) # header text
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN) # separator
curses.init_pair(3, curses.COLOR_WHITE, -1) # number
curses.init_pair(4, curses.COLOR_YELLOW, -1) # log list
curses.init_pair(5, curses.COLOR_RED, -1)
curses.init_pair(6, curses.COLOR_GREEN, -1)
curses.init_pair(7, curses.COLOR_WHITE, -1) # grey for empty
stdscr.nodelay(True)
stdscr.timeout(100)
start_file = Path("start.txt")
if not start_file.exists():
stdscr.addstr(0, 0, "Missing start.txt — please create one with your starting number.")
stdscr.refresh()
stdscr.getch()
return
clear_logs()
n = StreamNumber(start_file)
one, two, three = (StreamNumber(literal=s) for s in ("1", "2", "3"))
start_time = time.time()
step = 0
num_scroll = 0
log_scroll = 0
header_h = 5
graph_h = 1
num_h = (curses.LINES - header_h - graph_h) * 3 // 4
log_h = curses.LINES - header_h - graph_h - num_h - 1
num_win = curses.newwin(num_h, curses.COLS, header_h + graph_h + 1, 0)
graph_win = curses.newwin(graph_h, curses.COLS, header_h + 1, 0)
log_win = curses.newwin(log_h, curses.COLS, header_h + graph_h + num_h + 2, 0)
last_len = 0
graph_buf = []
while True:
step += 1
n = collatz_step(n, three, two, one)
digits = "".join(n.stream()) or "0"
cur_len = len(digits)
diff = cur_len - last_len
last_len = cur_len
graph_width = curses.COLS - 4
# Add new value to graph buffer
if diff != 0:
graph_buf.append(1 if diff > 0 else -1)
else:
graph_buf.append(0)
# Shift left if full
if len(graph_buf) > graph_width:
graph_buf = graph_buf[-graph_width:]
direction_up = diff >= 0
elapsed = time.time() - start_time
avg_step = elapsed / step if step else 0.0
draw_header(stdscr, step, elapsed, avg_step, len(digits))
draw_graph(graph_win, graph_buf, direction_up, curses.COLS)
num_scroll = draw_number(num_win, digits, num_scroll)
log_scroll = draw_log_list(log_win, log_scroll)
curses.doupdate()
ch = stdscr.getch()
if ch == ord("q"):
break
elif ch == curses.KEY_UP:
num_scroll = max(0, num_scroll - 1)
elif ch == curses.KEY_DOWN:
num_scroll += 1
elif ch == curses.KEY_PPAGE:
log_scroll = max(0, log_scroll - 3)
elif ch == curses.KEY_NPAGE:
log_scroll += 3
if digits == "1":
stdscr.nodelay(False)
stdscr.addstr(curses.LINES - 1, 0, "Reached 1 — press any key to exit.")
stdscr.refresh()
stdscr.getch()
break
def main():
curses.wrapper(run_collatz)
if __name__ == "__main__":
main()

1
start.txt Normal file
View File

@ -0,0 +1 @@
55569392576944383732069997790263232211253447162098935262971634652345115098934212633724484589756741539606575

31
test.py
View File

@ -15,6 +15,8 @@ from mathstream import (
clear_logs,
collect_garbage,
DivideByZeroError,
active_streams,
tracked_files,
)
NUMBERS_DIR = Path(__file__).parent / "tests"
@ -103,11 +105,34 @@ def main() -> None:
else:
raise AssertionError("mod by zero did not raise DivideByZeroError")
# manual frees should immediately drop staged files
staged = [
total,
difference,
product,
quotient,
powered,
modulus,
neg_mod_pos,
pos_mod_neg,
neg_mod_neg,
literal_combo,
]
for stream in staged:
stream.free()
literal_even.free()
literal_odd.free()
zero_literal.free()
check_bool("total freed file gone", total.path.exists(), False)
check_bool("literal_even freed file gone", literal_even.path.exists(), False)
removed = collect_garbage(0)
print(f"collect_garbage removed {len(removed)} files")
check_bool("total exists post GC", total.path.exists(), False)
check_bool("literal_even exists post GC", literal_even.path.exists(), False)
print(f"collect_garbage removed {len(removed)} files after manual free")
check_bool("huge operand persists", big.path.exists(), True)
print("Active streams:", active_streams())
print("Tracked files:", tracked_files())
if __name__ == "__main__":

View File

@ -0,0 +1 @@
993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887993284887