nut/chipnut/cli.py
2026-04-04 12:02:15 +02:00

574 lines
18 KiB
Python

import argparse
import difflib
import sys
import traceback
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple
# colorama is optional: when it cannot be imported, inert stand-ins with the
# same attribute names are installed so the rest of the module can use
# ``Fore``/``Style`` unconditionally.
try:
    from colorama import Fore, Style, init as _colorama_init
except Exception:  # pragma: no cover - fallback for missing optional dependency
    _COLORAMA_AVAILABLE = False

    class _NoColor:  # pragma: no cover - simple value holder for fallback
        """Placeholder for colorama's ``Fore``/``Style``; every code is empty."""

        BLACK = BLUE = CYAN = GREEN = MAGENTA = RED = ""
        RESET_ALL = WHITE = YELLOW = BRIGHT = ""

    Fore = _NoColor()  # type: ignore[assignment]
    Style = _NoColor()  # type: ignore[assignment]

    def _colorama_init(**_: Any) -> None:
        """Accept and ignore any keyword arguments; there is nothing to set up."""
else:
    _COLORAMA_AVAILABLE = True
    # Only initialise colorama when the real package was imported.
    _colorama_init(autoreset=True)
# Maps each ``%token%`` placeholder accepted in banner/error text to the
# escape sequence substituted for it when colour output is enabled.
_COLOR_TOKEN_MAP = {
    "%red%": Fore.RED,
    "%green%": Fore.GREEN,
    "%yellow%": Fore.YELLOW,
    "%blue%": Fore.BLUE,
    "%magenta%": Fore.MAGENTA,
    "%cyan%": Fore.CYAN,
    "%white%": Fore.WHITE,
    "%reset%": Style.RESET_ALL,
    "%bold%": Style.BRIGHT,
}
# The placeholders alone, in the map's insertion order; these are the
# strings removed from text when colour output is disabled.
_COLOR_TOKENS = tuple(_COLOR_TOKEN_MAP)
# argparse action classes that never consume a value from the command line.
_NON_VALUE_ACTIONS = (
    argparse._StoreTrueAction,
    argparse._StoreFalseAction,
    argparse._CountAction,
    argparse._StoreConstAction,
    argparse._AppendConstAction,
    argparse._HelpAction,
)
# Namespace destinations owned by the framework (the per-command -h/--help
# switch); these are not reported as user-declared arguments.
_INTERNAL_NAMESPACE_FIELDS = frozenset(("command_help",))
# Command names the framework provides itself and that may not be registered.
_RESERVED_COMMANDS = frozenset(("help",))
@dataclass
class ParsedCommandInput:
    """Everything a ``"parsed"``-mode handler receives about one invocation.

    Attributes:
        command: Canonical name of the command that was dispatched.
        invoked_as: The command token as typed (may be an alias).
        args: Values of the declared positional arguments, flattened.
        flags: Mapping of option destination name to its parsed value.
        passthrough: Tokens the command's parser did not recognise
            (only populated for passthrough commands).
        namespace: The raw ``argparse.Namespace`` produced by the parser.
        argv: The command's own tokens as given, before cluster expansion.
    """

    command: str
    invoked_as: str
    args: List[Any]
    flags: Dict[str, Any]
    passthrough: List[str]
    namespace: argparse.Namespace
    argv: List[str]
class CLIError(Exception):
    """Base class for every error the CLI framework raises on purpose.

    Attributes:
        exit_code: Exit status reported for the error (2 for the whole
            hierarchy unless a subclass overrides it).
        message: Human-readable description of what went wrong.
        usage_parser: Parser whose usage line should be shown with the
            message, or ``None`` when no usage line applies.
    """

    exit_code = 2

    def __init__(self, message: str, usage_parser: Optional[argparse.ArgumentParser] = None):
        super().__init__(message)
        self.usage_parser = usage_parser
        self.message = message


class CLIUsageError(CLIError):
    """The command line supplied by the user could not be accepted."""


class CLIConfigError(CLIError):
    """The CLI was set up incorrectly by the program that builds it."""


class CLIContractError(CLIError):
    """A command handler broke the contract expected by the framework."""
class _CLIArgumentParser(argparse.ArgumentParser):
    """``ArgumentParser`` that raises on bad input instead of handling it itself."""

    def error(self, message: str) -> None:
        """Report a parse failure by raising ``CLIUsageError``.

        The exception carries *message* and this parser, so whoever catches
        it can print the matching usage line.

        Raises:
            CLIUsageError: Always.
        """
        failure = CLIUsageError(message, usage_parser=self)
        raise failure
@dataclass
class _CommandSpec:
    """Registration record holding everything known about one command."""

    # Canonical command name.
    name: str
    # Sub-parser that parses this command's own tokens.
    parser: "_CLIArgumentParser"
    # Short help text and longer description given at registration.
    help: Optional[str]
    description: Optional[str]
    # Alternative names that resolve to this command.
    aliases: List[str] = field(default_factory=list)
    # When true, tokens the parser does not recognise are kept, not rejected.
    passthrough: bool = False
    # Callable run for the command and the calling convention it expects.
    handler: Optional[Callable[..., Any]] = None
    handler_mode: str = "simple"
    # Namespace destinations of declared positionals / options, in order.
    positional_dests: List[str] = field(default_factory=list)
    option_dests: List[str] = field(default_factory=list)
    # Single-letter options declared for the command.
    short_options: Dict[str, bool] = field(default_factory=dict)  # True => takes value
class CLI:
def __init__(
self,
prog: str = "chipnut",
help_callback: Optional[Callable[[], Iterable[str]]] = None,
):
self.prog = prog
self.help_callback = help_callback
self._color_enabled = True
self._cli_debug = False
self.parser = _CLIArgumentParser(prog=prog, add_help=False, allow_abbrev=False)
self.subparsers = self.parser.add_subparsers(
dest="command",
metavar="command",
parser_class=_CLIArgumentParser,
)
self.parser.add_argument("-h", "--help", dest="global_help", action="store_true", help="Show help")
self.parser.add_argument(
"--cli-debug",
action="store_true",
help="Show tracebacks for CLI internals",
)
self.parser.add_argument(
"--no-color",
action="store_true",
help="Disable colored output",
)
self._global_short_options = {"h": False}
self._commands: Dict[str, _CommandSpec] = {}
self._command_lookup: Dict[str, str] = {}
self._help_parser = self._create_help_command_parser()
def command(
self,
name: str,
help: Optional[str] = None,
description: Optional[str] = None,
aliases: Optional[Sequence[str]] = None,
passthrough: bool = False,
) -> "_Command":
normalized_aliases = list(aliases or [])
self._validate_command_registration(name, normalized_aliases)
command_parser = self.subparsers.add_parser(
name,
add_help=False,
allow_abbrev=False,
help=help,
description=description,
aliases=normalized_aliases,
)
command_parser.add_argument(
"-h",
"--help",
dest="command_help",
action="store_true",
help="Show help for this command",
)
spec = _CommandSpec(
name=name,
parser=command_parser,
help=help,
description=description,
aliases=normalized_aliases,
passthrough=bool(passthrough),
)
self._commands[name] = spec
self._register_command_lookup(name, name)
for alias in normalized_aliases:
self._register_command_lookup(alias, name)
return _Command(spec)
def run(self, argv: Optional[Sequence[str]] = None) -> int:
raw_argv = list(sys.argv[1:] if argv is None else argv)
self._cli_debug = "--cli-debug" in raw_argv
try:
return self._run_internal(raw_argv)
except CLIError as exc:
return self._handle_framework_error(exc)
except Exception as exc: # pragma: no cover - defensive safety net
return self._handle_unexpected_error(exc)
def _run_internal(self, raw_argv: List[str]) -> int:
global_tokens, command_token, command_tokens = self._split_global_and_command_tokens(raw_argv)
global_tokens = self._expand_short_flag_clusters(
global_tokens,
self._global_short_options,
context="global options",
)
global_args = self.parser.parse_args(global_tokens)
self._cli_debug = bool(global_args.cli_debug)
self._color_enabled = _COLORAMA_AVAILABLE and (not bool(global_args.no_color))
if bool(global_args.global_help):
self._print_top_help()
return 0
if command_token is None:
self._print_top_help()
return 0
if command_token == "help":
return self._run_help_command(command_tokens)
canonical_name = self._resolve_command_name(command_token)
command_spec = self._commands[canonical_name]
return self._run_command(command_spec, command_token, command_tokens)
def _run_help_command(self, command_tokens: List[str]) -> int:
help_args = self._help_parser.parse_args(command_tokens)
if bool(help_args.command_help):
self._print_help_for_help_command()
return 0
topic = getattr(help_args, "topic", None)
if topic is None:
self._print_top_help()
return 0
canonical_name = self._resolve_command_name(topic)
spec = self._commands[canonical_name]
self._print_command_help(spec)
return 0
def _run_command(self, spec: _CommandSpec, invoked_as: str, command_tokens: List[str]) -> int:
expanded_tokens = self._expand_short_flag_clusters(
command_tokens,
spec.short_options,
context="command '{}'".format(spec.name),
)
if spec.passthrough:
namespace, passthrough = spec.parser.parse_known_args(expanded_tokens)
else:
namespace = spec.parser.parse_args(expanded_tokens)
passthrough = []
if bool(getattr(namespace, "command_help", False)):
self._print_command_help(spec)
return 0
if spec.handler is None:
raise CLIConfigError("No handler defined for command '{}'".format(spec.name))
positional_args = self._collect_positional_args(namespace, spec)
flag_values = {dest: getattr(namespace, dest) for dest in spec.option_dests}
if spec.handler_mode == "simple":
runtime_args = list(positional_args)
runtime_args.extend(passthrough)
result = spec.handler(runtime_args, flag_values)
else:
parsed_payload = ParsedCommandInput(
command=spec.name,
invoked_as=invoked_as,
args=positional_args,
flags=flag_values,
passthrough=list(passthrough),
namespace=namespace,
argv=list(command_tokens),
)
result = spec.handler(parsed_payload)
return self._normalize_exit_code(result, spec.name)
def _normalize_exit_code(self, value: Any, command_name: str) -> int:
if not isinstance(value, int):
raise CLIContractError(
"Command '{}' must return an int exit code, got {}".format(
command_name,
type(value).__name__,
)
)
return value
def _split_global_and_command_tokens(
self, argv: List[str]
) -> Tuple[List[str], Optional[str], List[str]]:
global_tokens: List[str] = []
command_token: Optional[str] = None
command_tokens: List[str] = []
index = 0
while index < len(argv):
token = argv[index]
if token == "--":
global_tokens.append(token)
index += 1
break
if token.startswith("-") and token != "-":
global_tokens.append(token)
index += 1
continue
break
if index < len(argv):
command_token = argv[index]
command_tokens = argv[index + 1 :]
return global_tokens, command_token, command_tokens
def _resolve_command_name(self, token: str) -> str:
canonical = self._command_lookup.get(token)
if canonical is not None:
return canonical
available = sorted(self._command_lookup.keys())
suggestions = difflib.get_close_matches(token, available, n=3, cutoff=0.45)
lines = ["Unknown command '{}'".format(token)]
if suggestions:
lines.append("Did you mean: {}".format(", ".join(suggestions)))
lines.append("Run '{} -h' to see available commands.".format(self.prog))
raise CLIUsageError("\n".join(lines), usage_parser=self.parser)
def _expand_short_flag_clusters(
self,
tokens: List[str],
short_options: Dict[str, bool],
context: str,
) -> List[str]:
expanded: List[str] = []
for token in tokens:
if token == "--":
expanded.append(token)
continue
if not token.startswith("-") or token.startswith("--") or len(token) <= 2:
expanded.append(token)
continue
cluster = token[1:]
if any(ch.isdigit() for ch in cluster):
expanded.append(token)
continue
known = [ch for ch in cluster if ch in short_options]
if not known:
expanded.append(token)
continue
if any(short_options.get(ch, False) for ch in cluster if ch in short_options):
bad = [ch for ch in cluster if short_options.get(ch, False)]
raise CLIUsageError(
"Cannot group short option(s) {} in {} because they require values: '{}'".format(
", ".join("-{}".format(ch) for ch in bad),
context,
token,
)
)
unknown = [ch for ch in cluster if ch not in short_options]
if unknown:
expanded.append(token)
continue
expanded.extend("-{}".format(ch) for ch in cluster)
return expanded
def _collect_positional_args(self, namespace: argparse.Namespace, spec: _CommandSpec) -> List[Any]:
values: List[Any] = []
for dest in spec.positional_dests:
value = getattr(namespace, dest)
if value is None:
continue
if isinstance(value, list):
values.extend(value)
continue
values.append(value)
return values
def _handle_framework_error(self, exc: CLIError) -> int:
if self._cli_debug:
traceback.print_exc()
else:
self._print_error_message(exc.message)
if exc.usage_parser is not None:
usage = exc.usage_parser.format_usage().strip()
if usage:
self._print_plain(usage)
return exc.exit_code
def _handle_unexpected_error(self, exc: Exception) -> int:
if self._cli_debug:
traceback.print_exc()
else:
self._print_error_message(str(exc))
return 1
def _validate_command_registration(self, name: str, aliases: Sequence[str]) -> None:
self._validate_command_name(name)
if name in _RESERVED_COMMANDS:
raise CLIConfigError("'{}' is reserved by the CLI".format(name))
for alias in aliases:
self._validate_command_name(alias)
if alias in _RESERVED_COMMANDS:
raise CLIConfigError("Alias '{}' is reserved by the CLI".format(alias))
tokens = [name] + list(aliases)
for token in tokens:
if token in self._command_lookup:
raise CLIConfigError("Duplicate command or alias '{}'".format(token))
def _validate_command_name(self, name: str) -> None:
if not name:
raise CLIConfigError("Command name cannot be empty")
if name.startswith("-"):
raise CLIConfigError("Command name cannot start with '-': '{}'".format(name))
def _register_command_lookup(self, token: str, canonical_name: str) -> None:
self._command_lookup[token] = canonical_name
def _create_help_command_parser(self) -> _CLIArgumentParser:
help_parser = self.subparsers.add_parser(
"help",
add_help=False,
allow_abbrev=False,
help="Show help for commands",
description="Show help for commands",
)
help_parser.add_argument("topic", nargs="?", help="Command to show help for")
help_parser.add_argument(
"-h",
"--help",
dest="command_help",
action="store_true",
help="Show help for the help command",
)
return help_parser
def _print_banner(self) -> None:
if self.help_callback is None:
return
for line in self.help_callback():
self._print_colored(str(line))
def _print_top_help(self) -> None:
self._print_banner()
self._print_plain(self.parser.format_help().rstrip())
def _print_help_for_help_command(self) -> None:
self._print_banner()
self._print_plain(self._help_parser.format_help().rstrip())
def _print_command_help(self, spec: _CommandSpec) -> None:
self._print_banner()
self._print_plain(spec.parser.format_help().rstrip())
def _print_error_message(self, message: str) -> None:
self._print_colored("%red%error:%reset% {}".format(message))
def _print_colored(self, text: str) -> None:
self._print_plain(self._render_colors(text))
def _print_plain(self, text: str) -> None:
if text:
print(text)
def _render_colors(self, text: str) -> str:
if not self._color_enabled:
for token in _COLOR_TOKENS:
text = text.replace(token, "")
return text
for token, ansi_code in _COLOR_TOKEN_MAP.items():
text = text.replace(token, ansi_code)
return text
class _Command:
    """Fluent builder for one command's arguments, options and handler.

    Every declaration is added to the command's parser and recorded on the
    command spec. The public methods return ``self`` so calls can be
    chained.
    """

    def __init__(self, spec: "_CommandSpec"):
        """Bind the builder to *spec* and to the parser the spec holds."""
        self.spec = spec
        self.parser = spec.parser

    def arg(self, name: str, **kwargs: Any) -> "_Command":
        """Declare an argument; *kwargs* go straight to ``add_argument``."""
        action = self.parser.add_argument(name, **kwargs)
        self._track_action(action)
        return self

    def flag(self, name: str, short: Optional[str] = None, **kwargs: Any) -> "_Command":
        """Declare a boolean ``store_true`` switch.

        Args:
            name: Long option name, with or without the leading ``--``.
            short: Optional one-character short form, with or without ``-``.
            **kwargs: Extra keyword arguments for ``add_argument``.

        Raises:
            CLIConfigError: If *short* is not exactly one character.
        """
        options = self._option_strings(name, short)
        action = self.parser.add_argument(*options, action="store_true", **kwargs)
        self._track_action(action)
        return self

    def option(self, name: str, short: Optional[str] = None, **kwargs: Any) -> "_Command":
        """Declare an option whose behaviour is defined by *kwargs*.

        Args:
            name: Long option name, with or without the leading ``--``.
            short: Optional one-character short form, with or without ``-``.
            **kwargs: Keyword arguments for ``add_argument``.

        Raises:
            CLIConfigError: If *short* is not exactly one character.
        """
        options = self._option_strings(name, short)
        action = self.parser.add_argument(*options, **kwargs)
        self._track_action(action)
        return self

    def handle(self, func: Callable[..., Any], mode: str = "simple") -> "_Command":
        """Attach the handler for the command.

        Args:
            func: Callable to run for the command.
            mode: Calling convention, ``"simple"`` or ``"parsed"``.

        Raises:
            CLIConfigError: If *mode* is not one of the two known modes.
        """
        if mode not in {"simple", "parsed"}:
            raise CLIConfigError("Unknown handler mode '{}'".format(mode))
        self.spec.handler = func
        self.spec.handler_mode = mode
        return self

    def _option_strings(self, name: str, short: Optional[str]) -> List[str]:
        """Return the option strings for *name* plus the optional short form."""
        options = [self._normalize_long_option(name)]
        if short is not None:
            options.append("-{}".format(self._validate_short_option(short)))
        return options

    def _track_action(self, action: argparse.Action) -> None:
        """Record the destination of *action* on the spec.

        Framework-internal destinations are ignored. Actions that have
        option strings are recorded as options (and their short forms
        registered); all others are recorded as positionals. A destination
        is recorded only once.
        """
        if action.dest in _INTERNAL_NAMESPACE_FIELDS:
            return
        if action.option_strings:
            if action.dest not in self.spec.option_dests:
                self.spec.option_dests.append(action.dest)
            self._track_short_options(action)
            return
        if action.dest not in self.spec.positional_dests:
            self.spec.positional_dests.append(action.dest)

    def _track_short_options(self, action: argparse.Action) -> None:
        """Register each one-letter option string of *action* on the spec."""
        takes_value = self._action_takes_value(action)
        for option in action.option_strings:
            if option.startswith("--"):
                continue
            if option.startswith("-") and len(option) == 2:
                self.spec.short_options[option[1]] = takes_value

    def _action_takes_value(self, action: argparse.Action) -> bool:
        """Return whether *action* consumes a value from the command line.

        argparse gives every value-less action ``nargs == 0``. Checking that
        attribute also covers ``version``, ``BooleanOptionalAction`` and
        custom actions, which a fixed list of action classes would miss.
        """
        return action.nargs != 0

    def _normalize_long_option(self, name: str) -> str:
        """Return *name* with a leading ``--``, adding it when missing."""
        return name if name.startswith("--") else "--{}".format(name)

    def _validate_short_option(self, short: str) -> str:
        """Strip leading dashes from *short* and return the single letter.

        Raises:
            CLIConfigError: If what remains is not exactly one character.
        """
        value = short.lstrip("-")
        if len(value) != 1:
            raise CLIConfigError("Short option must be exactly one character, got '{}'".format(short))
        return value