diff --git a/nut/__main__.py b/nut/__main__.py index ee4f07c..477d81f 100644 --- a/nut/__main__.py +++ b/nut/__main__.py @@ -1,27 +1,46 @@ -from .cli import CLI +from .cli import CLI, ParsedCommandInput def help_text(): return [ - "%bold%%cyan%nut CLI ... you expected a help menu? cute :3%reset%" + "%bold%%cyan%nut CLI: tiny shell, big squirrel energy :3%reset%", + "%magenta%Tip:%reset% try %bold%nut help run%reset% or %bold%nut run -d --target dev foo bar%reset%", ] -def main(): +def main() -> int: cli = CLI(help_callback=help_text) - cli.command("run") \ - .flag("debug", "d") \ - .handle(cmd_run) + cli.command( + "run", + help="Run with passthrough args", + description="Run accepts known flags and also forwards unknown trailing tokens.", + aliases=["r"], + passthrough=True, + ).flag("debug", "d", help="Enable command debug mode").option( + "target", + "t", + default="dev", + help="Execution target", + ).handle(cmd_run) - cli.command("build") \ - .handle(cmd_build) + cli.command( + "build", + help="Build output", + description="Build artifacts with optional release mode and output directory.", + aliases=["b"], + ).flag("release", "r", help="Enable release build").option( + "output", + "o", + default="dist", + help="Output directory", + ).arg("inputs", nargs="*", help="Optional input names").handle(cmd_build) return cli.run() -def cmd_run(args, flags): +def cmd_run(args, flags) -> int: print("RUN", args, flags) return 0 -def cmd_build(args, flags): +def cmd_build(args, flags) -> int: print("BUILD", args, flags) return 0 diff --git a/nut/cli.py b/nut/cli.py index df7d4ec..8d172ef 100644 --- a/nut/cli.py +++ b/nut/cli.py @@ -1,11 +1,48 @@ import argparse -from typing import Any, Callable, Iterable, Optional +import difflib +import sys +import traceback +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple -from colorama import Fore, Style, init +try: + from colorama import Fore, Style, init as _colorama_init +except Exception: # pragma: no cover - fallback for missing optional dependency + _COLORAMA_AVAILABLE = False -init(autoreset=True) + class _NoColor: # pragma: no cover - simple value holder for fallback + BLACK = "" + BLUE = "" + CYAN = "" + GREEN = "" + MAGENTA = "" + RED = "" + RESET_ALL = "" + WHITE = "" + YELLOW = "" + BRIGHT = "" + + Fore = _NoColor() # type: ignore[assignment] + Style = _NoColor() # type: ignore[assignment] + + def _colorama_init(**_: Any) -> None: + return None +else: + _COLORAMA_AVAILABLE = True + _colorama_init(autoreset=True) + +_COLOR_TOKENS = ( + "%red%", + "%green%", + "%yellow%", + "%blue%", + "%magenta%", + "%cyan%", + "%white%", + "%reset%", + "%bold%", +) -_INTERNAL_ARG_FIELDS = frozenset({"command", "func", "_args", "help"}) _COLOR_TOKEN_MAP = { "%red%": Fore.RED, "%green%": Fore.GREEN, @@ -18,6 +55,64 @@ _COLOR_TOKEN_MAP = { "%bold%": Style.BRIGHT, } +_NON_VALUE_ACTIONS = ( + argparse._StoreTrueAction, + argparse._StoreFalseAction, + argparse._CountAction, + argparse._StoreConstAction, + argparse._AppendConstAction, + argparse._HelpAction, +) + +_INTERNAL_NAMESPACE_FIELDS = frozenset({"command_help"}) +_RESERVED_COMMANDS = frozenset({"help"}) + +@dataclass +class ParsedCommandInput: + """Structured payload passed to parsed-mode handlers.""" + + command: str + invoked_as: str + args: List[Any] + flags: Dict[str, Any] + passthrough: List[str] + namespace: argparse.Namespace + argv: List[str] + +class CLIError(Exception): + exit_code = 2 + + def __init__(self, message: str, usage_parser: Optional[argparse.ArgumentParser] = None): + super().__init__(message) + self.message = message + self.usage_parser = usage_parser + +class CLIUsageError(CLIError): + pass + +class CLIConfigError(CLIError): + pass + +class CLIContractError(CLIError): + pass + +class _CLIArgumentParser(argparse.ArgumentParser): + def error(self, message: str) -> None: + raise CLIUsageError(message, usage_parser=self) + +@dataclass +class _CommandSpec: + name: str + parser: _CLIArgumentParser + help: Optional[str] + description: Optional[str] + aliases: List[str] = field(default_factory=list) + passthrough: bool = False + handler: Optional[Callable[..., Any]] = None + handler_mode: str = "simple" + positional_dests: List[str] = field(default_factory=list) + option_dests: List[str] = field(default_factory=list) + short_options: Dict[str, bool] = field(default_factory=dict) # True => takes value class CLI: def __init__( @@ -25,81 +120,454 @@ class CLI: prog: str = "nut", help_callback: Optional[Callable[[], Iterable[str]]] = None, ): - self.parser = argparse.ArgumentParser(prog=prog, add_help=False) - self.subparsers = self.parser.add_subparsers(dest="command") - + self.prog = prog self.help_callback = help_callback + self._color_enabled = True + self._cli_debug = False - self.parser.add_argument("-h", "--help", action="store_true") - - def command(self, name: str) -> "_Command": - command_parser = self.subparsers.add_parser(name, add_help=False) - return _Command(command_parser) - - def run(self) -> int: - parsed_args = self.parser.parse_args() - - if self._should_show_help(parsed_args): - self._print_help() - return 0 - - handler = getattr(parsed_args, "func", None) - if handler is None: - raise RuntimeError("No handler defined") - - args_list = list(getattr(parsed_args, "_args", [])) - - flags = { - k: v for k, v in vars(parsed_args).items() - if k not in _INTERNAL_ARG_FIELDS - } - - return handler(args_list, flags) - - def _should_show_help(self, parsed_args: argparse.Namespace) -> bool: - return bool( - getattr(parsed_args, "help", False) - or not getattr(parsed_args, "command", None) + self.parser = _CLIArgumentParser(prog=prog, add_help=False, allow_abbrev=False) + self.subparsers = self.parser.add_subparsers( + dest="command", + metavar="command", + parser_class=_CLIArgumentParser, ) - def _print_help(self) -> None: + self.parser.add_argument("-h", "--help", dest="global_help", action="store_true", help="Show help") + self.parser.add_argument( + "--cli-debug", + action="store_true", + help="Show tracebacks for CLI internals", + ) + self.parser.add_argument( + "--no-color", + action="store_true", + help="Disable colored output", + ) + + self._global_short_options = {"h": False} + + self._commands: Dict[str, _CommandSpec] = {} + self._command_lookup: Dict[str, str] = {} + + self._help_parser = self._create_help_command_parser() + + def command( + self, + name: str, + help: Optional[str] = None, + description: Optional[str] = None, + aliases: Optional[Sequence[str]] = None, + passthrough: bool = False, + ) -> "_Command": + normalized_aliases = list(aliases or []) + self._validate_command_registration(name, normalized_aliases) + + command_parser = self.subparsers.add_parser( + name, + add_help=False, + allow_abbrev=False, + help=help, + description=description, + aliases=normalized_aliases, + ) + command_parser.add_argument( + "-h", + "--help", + dest="command_help", + action="store_true", + help="Show help for this command", + ) + + spec = _CommandSpec( + name=name, + parser=command_parser, + help=help, + description=description, + aliases=normalized_aliases, + passthrough=bool(passthrough), + ) + + self._commands[name] = spec + self._register_command_lookup(name, name) + for alias in normalized_aliases: + self._register_command_lookup(alias, name) + + return _Command(spec) + + def run(self, argv: Optional[Sequence[str]] = None) -> int: + raw_argv = list(sys.argv[1:] if argv is None else argv) + self._cli_debug = "--cli-debug" in raw_argv + + try: + return self._run_internal(raw_argv) + except CLIError as exc: + return self._handle_framework_error(exc) + except Exception as exc: # pragma: no cover - defensive safety net + return self._handle_unexpected_error(exc) + + def _run_internal(self, raw_argv: List[str]) -> int: + global_tokens, command_token, command_tokens = self._split_global_and_command_tokens(raw_argv) + global_tokens = self._expand_short_flag_clusters( + global_tokens, + self._global_short_options, + context="global options", + ) + + global_args = self.parser.parse_args(global_tokens) + self._cli_debug = bool(global_args.cli_debug) + self._color_enabled = _COLORAMA_AVAILABLE and (not bool(global_args.no_color)) + + if bool(global_args.global_help): + self._print_top_help() + return 0 + + if command_token is None: + self._print_top_help() + return 0 + + if command_token == "help": + return self._run_help_command(command_tokens) + + canonical_name = self._resolve_command_name(command_token) + command_spec = self._commands[canonical_name] + return self._run_command(command_spec, command_token, command_tokens) + + def _run_help_command(self, command_tokens: List[str]) -> int: + help_args = self._help_parser.parse_args(command_tokens) + + if bool(help_args.command_help): + self._print_help_for_help_command() + return 0 + + topic = getattr(help_args, "topic", None) + if topic is None: + self._print_top_help() + return 0 + + canonical_name = self._resolve_command_name(topic) + spec = self._commands[canonical_name] + self._print_command_help(spec) + return 0 + + def _run_command(self, spec: _CommandSpec, invoked_as: str, command_tokens: List[str]) -> int: + expanded_tokens = self._expand_short_flag_clusters( + command_tokens, + spec.short_options, + context="command '{}'".format(spec.name), + ) + + if spec.passthrough: + namespace, passthrough = spec.parser.parse_known_args(expanded_tokens) + else: + namespace = spec.parser.parse_args(expanded_tokens) + passthrough = [] + + if bool(getattr(namespace, "command_help", False)): + self._print_command_help(spec) + return 0 + + if spec.handler is None: + raise CLIConfigError("No handler defined for command '{}'".format(spec.name)) + + positional_args = self._collect_positional_args(namespace, spec) + flag_values = {dest: getattr(namespace, dest) for dest in spec.option_dests} + + if spec.handler_mode == "simple": + runtime_args = list(positional_args) + runtime_args.extend(passthrough) + result = spec.handler(runtime_args, flag_values) + else: + parsed_payload = ParsedCommandInput( + command=spec.name, + invoked_as=invoked_as, + args=positional_args, + flags=flag_values, + passthrough=list(passthrough), + namespace=namespace, + argv=list(command_tokens), + ) + result = spec.handler(parsed_payload) + + return self._normalize_exit_code(result, spec.name) + + def _normalize_exit_code(self, value: Any, command_name: str) -> int: + if not isinstance(value, int): + raise CLIContractError( + "Command '{}' must return an int exit code, got {}".format( + command_name, + type(value).__name__, + ) + ) + return value + + def _split_global_and_command_tokens( + self, argv: List[str] + ) -> Tuple[List[str], Optional[str], List[str]]: + global_tokens: List[str] = [] + command_token: Optional[str] = None + command_tokens: List[str] = [] + + index = 0 + while index < len(argv): + token = argv[index] + if token == "--": + global_tokens.append(token) + index += 1 + break + if token.startswith("-") and token != "-": + global_tokens.append(token) + index += 1 + continue + break + + if index < len(argv): + command_token = argv[index] + command_tokens = argv[index + 1 :] + + return global_tokens, command_token, command_tokens + + def _resolve_command_name(self, token: str) -> str: + canonical = self._command_lookup.get(token) + if canonical is not None: + return canonical + + available = sorted(self._command_lookup.keys()) + suggestions = difflib.get_close_matches(token, available, n=3, cutoff=0.45) + + lines = ["Unknown command '{}'".format(token)] + if suggestions: + lines.append("Did you mean: {}".format(", ".join(suggestions))) + lines.append("Run '{} -h' to see available commands.".format(self.prog)) + + raise CLIUsageError("\n".join(lines), usage_parser=self.parser) + + def _expand_short_flag_clusters( + self, + tokens: List[str], + short_options: Dict[str, bool], + context: str, + ) -> List[str]: + expanded: List[str] = [] + + for token in tokens: + if token == "--": + expanded.append(token) + continue + + if not token.startswith("-") or token.startswith("--") or len(token) <= 2: + expanded.append(token) + continue + + cluster = token[1:] + + if any(ch.isdigit() for ch in cluster): + expanded.append(token) + continue + + known = [ch for ch in cluster if ch in short_options] + if not known: + expanded.append(token) + continue + + if any(short_options.get(ch, False) for ch in cluster if ch in short_options): + bad = [ch for ch in cluster if short_options.get(ch, False)] + raise CLIUsageError( + "Cannot group short option(s) {} in {} because they require values: '{}'".format( + ", ".join("-{}".format(ch) for ch in bad), + context, + token, + ) + ) + + unknown = [ch for ch in cluster if ch not in short_options] + if unknown: + expanded.append(token) + continue + + expanded.extend("-{}".format(ch) for ch in cluster) + + return expanded + + def _collect_positional_args(self, namespace: argparse.Namespace, spec: _CommandSpec) -> List[Any]: + values: List[Any] = [] + for dest in spec.positional_dests: + value = getattr(namespace, dest) + if value is None: + continue + if isinstance(value, list): + values.extend(value) + continue + values.append(value) + return values + + def _handle_framework_error(self, exc: CLIError) -> int: + if self._cli_debug: + traceback.print_exc() + else: + self._print_error_message(exc.message) + if exc.usage_parser is not None: + usage = exc.usage_parser.format_usage().strip() + if usage: + self._print_plain(usage) + return exc.exit_code + + def _handle_unexpected_error(self, exc: Exception) -> int: + if self._cli_debug: + traceback.print_exc() + else: + self._print_error_message(str(exc)) + return 1 + + def _validate_command_registration(self, name: str, aliases: Sequence[str]) -> None: + self._validate_command_name(name) + + if name in _RESERVED_COMMANDS: + raise CLIConfigError("'{}' is reserved by the CLI".format(name)) + + for alias in aliases: + self._validate_command_name(alias) + if alias in _RESERVED_COMMANDS: + raise CLIConfigError("Alias '{}' is reserved by the CLI".format(alias)) + + tokens = [name] + list(aliases) + for token in tokens: + if token in self._command_lookup: + raise CLIConfigError("Duplicate command or alias '{}'".format(token)) + + def _validate_command_name(self, name: str) -> None: + if not name: + raise CLIConfigError("Command name cannot be empty") + if name.startswith("-"): + raise CLIConfigError("Command name cannot start with '-': '{}'".format(name)) + + def _register_command_lookup(self, token: str, canonical_name: str) -> None: + self._command_lookup[token] = canonical_name + + def _create_help_command_parser(self) -> _CLIArgumentParser: + help_parser = self.subparsers.add_parser( + "help", + add_help=False, + allow_abbrev=False, + help="Show help for commands", + description="Show help for commands", + ) + help_parser.add_argument("topic", nargs="?", help="Command to show help for") + help_parser.add_argument( + "-h", + "--help", + dest="command_help", + action="store_true", + help="Show help for the help command", + ) + return help_parser + + def _print_banner(self) -> None: if self.help_callback is None: - self.parser.print_help() return for line in self.help_callback(): - print(self._color(str(line))) + self._print_colored(str(line)) - def _color(self, text: str) -> str: - for key, value in _COLOR_TOKEN_MAP.items(): - text = text.replace(key, value) + def _print_top_help(self) -> None: + self._print_banner() + self._print_plain(self.parser.format_help().rstrip()) + def _print_help_for_help_command(self) -> None: + self._print_banner() + self._print_plain(self._help_parser.format_help().rstrip()) + + def _print_command_help(self, spec: _CommandSpec) -> None: + self._print_banner() + self._print_plain(spec.parser.format_help().rstrip()) + + def _print_error_message(self, message: str) -> None: + self._print_colored("%red%error:%reset% {}".format(message)) + + def _print_colored(self, text: str) -> None: + self._print_plain(self._render_colors(text)) + + def _print_plain(self, text: str) -> None: + if text: + print(text) + + def _render_colors(self, text: str) -> str: + if not self._color_enabled: + for token in _COLOR_TOKENS: + text = text.replace(token, "") + return text + + for token, ansi_code in _COLOR_TOKEN_MAP.items(): + text = text.replace(token, ansi_code) return text - class _Command: - def __init__(self, parser: argparse.ArgumentParser): - self.parser = parser - self.parser.set_defaults(func=self._missing_handler) - - self.parser.add_argument("_args", nargs="*") - self.parser.add_argument("-h", "--help", action="store_true") + def __init__(self, spec: _CommandSpec): + self.spec = spec + self.parser = spec.parser def arg(self, name: str, **kwargs: Any) -> "_Command": - self.parser.add_argument(name, **kwargs) + action = self.parser.add_argument(name, **kwargs) + self._track_action(action) return self - def flag(self, name: str, short: Optional[str] = None) -> "_Command": - opts = [f"--{name}"] - if short: - opts.append(f"-{short.lstrip('-')}") + def flag(self, name: str, short: Optional[str] = None, **kwargs: Any) -> "_Command": + options = [self._normalize_long_option(name)] + if short is not None: + short_char = self._validate_short_option(short) + options.append("-{}".format(short_char)) - self.parser.add_argument(*opts, action="store_true") + action = self.parser.add_argument(*options, action="store_true", **kwargs) + self._track_action(action) return self - def handle(self, func: Callable[..., int]) -> "_Command": - self.parser.set_defaults(func=func) + def option(self, name: str, short: Optional[str] = None, **kwargs: Any) -> "_Command": + options = [self._normalize_long_option(name)] + if short is not None: + short_char = self._validate_short_option(short) + options.append("-{}".format(short_char)) + + action = self.parser.add_argument(*options, **kwargs) + self._track_action(action) return self - def _missing_handler(self, *_: Any) -> int: - raise RuntimeError("No handler defined") + def handle(self, func: Callable[..., Any], mode: str = "simple") -> "_Command": + if mode not in {"simple", "parsed"}: + raise CLIConfigError("Unknown handler mode '{}'".format(mode)) + + self.spec.handler = func + self.spec.handler_mode = mode + return self + + def _track_action(self, action: argparse.Action) -> None: + if action.dest in _INTERNAL_NAMESPACE_FIELDS: + return + + if action.option_strings: + if action.dest not in self.spec.option_dests: + self.spec.option_dests.append(action.dest) + self._track_short_options(action) + return + + if action.dest not in self.spec.positional_dests: + self.spec.positional_dests.append(action.dest) + + def _track_short_options(self, action: argparse.Action) -> None: + takes_value = self._action_takes_value(action) + + for option in action.option_strings: + if option.startswith("--"): + continue + if option.startswith("-") and len(option) == 2: + self.spec.short_options[option[1]] = takes_value + + def _action_takes_value(self, action: argparse.Action) -> bool: + return not isinstance(action, _NON_VALUE_ACTIONS) + + def _normalize_long_option(self, name: str) -> str: + return name if name.startswith("--") else "--{}".format(name) + + def _validate_short_option(self, short: str) -> str: + value = short.lstrip("-") + if len(value) != 1: + raise CLIConfigError("Short option must be exactly one character, got '{}'".format(short)) + return value