Coverage for flowr / __main__.py: 100%
340 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
1"""CLI entrypoint for flowr — invoked via `python -m flowr`."""
3import argparse
4import importlib.metadata
5import sys
6from pathlib import Path
7from typing import Any
9from flowr.cli.output import format_json, format_text
10from flowr.cli.resolution import DefaultFlowNameResolver, FlowNameNotFoundError
11from flowr.cli.session_cmd import (
12 add_session_parser,
13 cmd_session_init,
14 cmd_session_list,
15 cmd_session_set_state,
16 cmd_session_show,
17)
18from flowr.domain.condition import evaluate_condition, parse_condition
19from flowr.domain.flow_definition import Flow, State, Transition
20from flowr.domain.loader import FlowParseError, load_flow_from_file, resolve_subflows
21from flowr.domain.mermaid import to_mermaid
22from flowr.domain.session import Session, SessionStackFrame
23from flowr.domain.validation import validate
24from flowr.infrastructure.config import (
25 FlowrConfig,
26 resolve_config,
27 resolve_config_with_sources,
28)
29from flowr.infrastructure.session_store import (
30 SessionNameNotFoundError,
31 SessionNotFoundError,
32 YamlSessionStore,
33)
36def build_parser() -> argparse.ArgumentParser:
37 """Build and return the argument parser.
39 Returns:
40 The configured ArgumentParser instance.
41 """
42 meta = importlib.metadata.metadata("flowr")
43 parser = argparse.ArgumentParser(
44 prog="flowr",
45 description=meta["Summary"],
46 )
47 parser.add_argument(
48 "--version",
49 action="version",
50 version=f"flowr {meta['Version']}",
51 )
52 parser.add_argument(
53 "--flows-dir",
54 dest="flows_dir",
55 default=None,
56 metavar="DIR",
57 help="Override the configured flows directory for this invocation",
58 )
59 _add_subcommands(parser)
60 return parser
63def _add_flow_args(parser: argparse.ArgumentParser) -> None:
64 """Add common args: flow file path and --json flag."""
65 parser.add_argument("flow_file", help="Path to flow YAML file or flow name")
66 parser.add_argument("--json", action="store_true", dest="json_output")
69def _add_evidence_args(parser: argparse.ArgumentParser) -> None:
70 """Add evidence input args."""
71 parser.add_argument(
72 "--evidence",
73 action="append",
74 default=[],
75 metavar="KEY=VALUE",
76 help="Evidence key=value pairs",
77 )
78 parser.add_argument(
79 "--evidence-json",
80 default=None,
81 metavar="JSON",
82 help="JSON evidence object",
83 )
86def _parse_evidence(args: argparse.Namespace) -> dict[str, str]:
87 """Parse evidence from CLI args into a dict.
89 Returns:
90 Dictionary mapping evidence keys to their string values.
91 """
92 evidence: dict[str, str] = {}
93 for pair in args.evidence:
94 key, _, value = pair.partition("=")
95 evidence[key] = value
96 if args.evidence_json is not None:
97 import json
99 data = json.loads(args.evidence_json)
100 for k, v in data.items():
101 evidence[k] = str(v)
102 return evidence
105def _add_subcommands(parser: argparse.ArgumentParser) -> None:
106 """Add all CLI subcommands."""
107 sub = parser.add_subparsers(dest="command")
109 # validate
110 p_validate = sub.add_parser("validate", help="Validate a flow definition")
111 _add_flow_args(p_validate)
113 # states
114 p_states = sub.add_parser("states", help="List all states in a flow")
115 _add_flow_args(p_states)
117 # check
118 p_check = sub.add_parser("check", help="Check a state or transition conditions")
119 p_check.add_argument(
120 "flow_file",
121 nargs="?",
122 default=None,
123 help="Path to flow YAML file or flow name (required unless --session)",
124 )
125 p_check.add_argument("--json", action="store_true", dest="json_output")
126 p_check.add_argument(
127 "state_id",
128 nargs="?",
129 default=None,
130 help="State id to inspect",
131 )
132 p_check.add_argument(
133 "target",
134 nargs="?",
135 default=None,
136 help="Target to check conditions for",
137 )
138 p_check.add_argument(
139 "--session",
140 nargs="?",
141 const="__default__",
142 default=None,
143 dest="session",
144 metavar="NAME",
145 help="Use session name or file path to resolve flow/state (read-only)",
146 )
148 # next
149 p_next = sub.add_parser("next", help="Show valid next transitions")
150 p_next.add_argument(
151 "flow_file",
152 nargs="?",
153 default=None,
154 help="Path to flow YAML file or flow name (required unless --session)",
155 )
156 p_next.add_argument("--json", action="store_true", dest="json_output")
157 p_next.add_argument("state_id", nargs="?", default=None, help="Current state id")
158 _add_evidence_args(p_next)
159 p_next.add_argument(
160 "--session",
161 nargs="?",
162 const="__default__",
163 default=None,
164 dest="session",
165 metavar="NAME",
166 help="Use session name or file path to resolve flow/state (read-only)",
167 )
169 # transition
170 p_transition = sub.add_parser("transition", help="Compute next state")
171 p_transition.add_argument(
172 "positional",
173 nargs="*",
174 help="Args: <flow> <state> <trigger> or <trigger> with --session",
175 )
176 p_transition.add_argument("--json", action="store_true", dest="json_output")
177 _add_evidence_args(p_transition)
178 p_transition.add_argument(
179 "--session",
180 nargs="?",
181 const="__default__",
182 default=None,
183 dest="session",
184 metavar="NAME",
185 help="Use session to resolve flow/state; auto-update after transition",
186 )
188 # config
189 p_config = sub.add_parser("config", help="Show resolved configuration")
190 p_config.add_argument(
191 "--json",
192 action="store_true",
193 dest="json_output",
194 help="Output as JSON",
195 )
197 # mermaid
198 p_mermaid = sub.add_parser("mermaid", help="Export as Mermaid diagram")
199 _add_flow_args(p_mermaid)
201 # session
202 add_session_parser(sub)
205def _cmd_validate(args: argparse.Namespace) -> int:
206 """Run validate subcommand.
208 Returns:
209 Exit code: 0 if valid, 1 if invalid.
210 """
211 flow = load_flow_from_file(args.flow_file)
212 all_flows = resolve_subflows(flow, args.flow_file)
213 result = validate(flow, all_flows if len(all_flows) > 1 else None)
214 output: dict[str, Any] = {
215 "valid": result.is_valid,
216 "violations": [],
217 }
218 for v in result.violations:
219 output["violations"].append(
220 {
221 "severity": v.severity.value,
222 "message": v.message,
223 "location": v.location,
224 }
225 )
226 if args.json_output:
227 print(format_json(output)) # noqa: T201
228 else:
229 print(format_text(output)) # noqa: T201
230 return 0 if result.is_valid else 1
233def _cmd_states(args: argparse.Namespace) -> int:
234 """Run states subcommand.
236 Returns:
237 Exit code: 0 on success.
238 """
239 flow = load_flow_from_file(args.flow_file)
240 state_ids = [s.id for s in flow.states]
241 if args.json_output:
242 print(format_json(state_ids)) # noqa: T201
243 else:
244 for sid in state_ids:
245 print(sid) # noqa: T201
246 return 0
249def _cmd_check(
250 args: argparse.Namespace,
251) -> int:
252 """Run check subcommand.
254 Returns:
255 Exit code: 0 on success, 1 on error.
256 """
257 if args.flow_file is None:
258 _error("flow_file is required when not using --session")
259 return 2
260 if args.state_id is None:
261 _error("state_id is required when not using --session")
262 return 2
263 flow = load_flow_from_file(args.flow_file)
264 state = _find_state(flow, args.state_id)
265 if state is None:
266 _error(f"State '{args.state_id}' not found")
267 return 1
268 if args.target is not None:
269 return _cmd_check_conditions(flow, state, args)
270 return _cmd_check_state(flow, state, args)
273def _cmd_check_state(_flow: Flow, state: State, args: argparse.Namespace) -> int:
274 """Show state details.
276 Returns:
277 Exit code: 0 on success.
278 """
279 output: dict[str, Any] = {"id": state.id}
280 if state.attrs:
281 output["attrs"] = state.attrs
282 if state.flow:
283 output["flow"] = state.flow
284 transitions = list(state.next.keys())
285 output["transitions"] = transitions
286 if args.json_output:
287 print(format_json(output)) # noqa: T201
288 else:
289 print(format_text(output)) # noqa: T201
290 return 0
293def _cmd_check_conditions(_flow: Flow, state: State, args: argparse.Namespace) -> int:
294 """Show conditions for a specific transition target.
296 Returns:
297 Exit code: 0 on success, 1 if target not found.
298 """
299 transition = state.next.get(args.target)
300 if transition is None:
301 _error(f"Transition target '{args.target}' not found in state '{state.id}'")
302 return 1
303 output: dict[str, Any] = {
304 "from": state.id,
305 "target": args.target,
306 }
307 if transition.conditions:
308 output["conditions"] = transition.conditions.conditions
309 else:
310 output["conditions"] = "(none)"
311 if args.json_output:
312 print(format_json(output)) # noqa: T201
313 else:
314 print(format_text(output)) # noqa: T201
315 return 0
318def _cmd_next(args: argparse.Namespace) -> int:
319 """Run next subcommand.
321 Returns:
322 Exit code: 0 on success, 1 if state not found.
323 """
324 if args.flow_file is None:
325 _error("flow_file is required when not using --session")
326 return 2
327 if args.state_id is None:
328 _error("state_id is required when not using --session")
329 return 2
330 flow = load_flow_from_file(args.flow_file)
331 state = _find_state(flow, args.state_id)
332 if state is None:
333 _error(f"State '{args.state_id}' not found")
334 return 1
335 evidence = _parse_evidence(args)
336 passing = _find_passing_transitions(state, evidence)
337 output: dict[str, Any] = {
338 "state": state.id,
339 "next": [t.target for t in passing],
340 }
341 if args.json_output:
342 print(format_json(output)) # noqa: T201
343 else:
344 print(format_text(output)) # noqa: T201
345 return 0
348def _cmd_transition(args: argparse.Namespace) -> int:
349 """Run transition subcommand.
351 Returns:
352 Exit code: 0 on success, 1 on error.
353 """
354 if hasattr(args, "positional") and args.positional:
355 flow_file = args.flow_file
356 state_id = args.positional[1]
357 trigger = args.positional[2]
358 else:
359 flow_file = args.flow_file
360 state_id = args.state_id
361 trigger = args.trigger
362 flow = load_flow_from_file(flow_file)
363 all_flows = resolve_subflows(flow, flow_file)
364 state = _find_state(flow, state_id)
365 if state is None:
366 _error(f"State '{state_id}' not found")
367 return 1
368 transition = state.next.get(trigger)
369 if transition is None:
370 _error(f"Trigger '{trigger}' not found in state '{state_id}'")
371 return 1
372 evidence = _parse_evidence(args)
373 if transition.conditions and not _conditions_met(
374 transition.conditions.conditions, evidence
375 ):
376 _error(f"Conditions not met for trigger '{trigger}'")
377 return 1
378 target = transition.target
379 target_state = _find_state(flow, target)
380 if target_state is not None and target_state.flow is not None:
381 child = _find_subflow(all_flows, target_state.flow, Path(flow_file))
382 if child and child.states:
383 target = f"{child.flow}/{child.states[0].id}"
384 output: dict[str, Any] = {
385 "from": state_id,
386 "trigger": trigger,
387 "to": target,
388 }
389 if args.json_output:
390 print(format_json(output)) # noqa: T201
391 else:
392 print(format_text(output)) # noqa: T201
393 return 0
396def _cmd_config(args: argparse.Namespace) -> int:
397 """Run config subcommand — show resolved configuration with sources.
399 Returns:
400 Exit code: 0 on success.
401 """
402 config, sources = resolve_config_with_sources(
403 cli_overrides={"flows_dir": args.flows_dir} if args.flows_dir else None,
404 )
405 rows = [
406 {
407 "key": "project_root",
408 "value": str(config.project_root),
409 "source": sources["project_root"],
410 },
411 {
412 "key": "flows_dir",
413 "value": str(config.flows_dir),
414 "source": sources["flows_dir"],
415 },
416 {
417 "key": "sessions_dir",
418 "value": str(config.sessions_dir),
419 "source": sources["sessions_dir"],
420 },
421 {
422 "key": "default_flow",
423 "value": config.default_flow,
424 "source": sources["default_flow"],
425 },
426 {
427 "key": "default_session",
428 "value": config.default_session,
429 "source": sources["default_session"],
430 },
431 ]
432 if args.json_output:
433 print(format_json(rows)) # noqa: T201
434 else:
435 for row in rows:
436 print(f"{row['key']} = {row['value']} ({row['source']})") # noqa: T201
437 return 0
440def _cmd_mermaid(args: argparse.Namespace) -> int:
441 """Run mermaid subcommand.
443 Returns:
444 Exit code: 0 on success.
445 """
446 flow = load_flow_from_file(args.flow_file)
447 diagram = to_mermaid(flow)
448 if args.json_output:
449 print(format_json({"mermaid": diagram})) # noqa: T201
450 else:
451 print(diagram) # noqa: T201
452 return 0
455def _find_state(flow: Flow, state_id: str) -> State | None:
456 """Find a state by id in a flow.
458 Returns:
459 The matching State, or None if not found.
460 """
461 for s in flow.states:
462 if s.id == state_id:
463 return s
464 return None
467def _find_passing_transitions(
468 state: State, evidence: dict[str, str]
469) -> list[Transition]:
470 """Find transitions whose conditions pass given evidence.
472 Returns:
473 List of transitions whose conditions are satisfied.
474 """
475 passing: list[Transition] = []
476 for _trigger, transition in state.next.items():
477 if transition.conditions is None or _conditions_met(
478 transition.conditions.conditions, evidence
479 ):
480 passing.append(transition)
481 return passing
484def _conditions_met(conditions: dict[str, str], evidence: dict[str, str]) -> bool:
485 """Check if all conditions are satisfied by evidence.
487 Returns:
488 True if all conditions pass, False otherwise.
489 """
490 for key, cond_str in conditions.items():
491 ev = evidence.get(key, "")
492 op, value = parse_condition(cond_str)
493 if not evaluate_condition(op, value, ev):
494 return False
495 return True
498def _find_subflow(
499 all_flows: list[Flow], flow_ref: str, _root_path: Path
500) -> Flow | None:
501 """Find a subflow by its flow name from the resolved list.
503 Returns:
504 The matching Flow, or None if not found.
505 """
506 for f in all_flows:
507 if f.flow == Path(flow_ref).stem:
508 return f
509 return None
512def _error(msg: str) -> None:
513 """Print error to stderr."""
514 print(f"error: {msg}", file=sys.stderr) # noqa: T201
517def _resolve_session(
518 session_name: str, config: FlowrConfig, resolver: DefaultFlowNameResolver
519) -> tuple[Session, Flow, Path]:
520 """Load session and resolve its flow.
522 Returns:
523 Tuple of (session, flow, flow_path).
524 """
525 store = YamlSessionStore(config.sessions_path())
526 try:
527 session = store.load(session_name)
528 except (SessionNotFoundError, SessionNameNotFoundError) as exc:
529 _error(str(exc))
530 sys.exit(1)
532 try:
533 flow_path = resolver.resolve(session.flow, config.flows_path())
534 except FlowNameNotFoundError as exc:
535 _error(str(exc))
536 sys.exit(1)
538 try:
539 flow = load_flow_from_file(flow_path)
540 except FlowParseError as exc: # pragma: no cover
541 _error(f"invalid flow definition: {exc}") # pragma: no cover
542 sys.exit(1) # pragma: no cover
544 return session, flow, flow_path
547def _apply_session_transition(
548 session: Session,
549 flow: Flow,
550 flow_path: Path,
551 trigger: str,
552 evidence: dict[str, str],
553) -> tuple[Session, str]:
554 """Apply a transition to a session, handling subflow push/pop.
556 Returns:
557 Tuple of (updated_session, target_display).
558 """
559 state = _find_state(flow, session.state)
560 if state is None:
561 _error(f"State '{session.state}' not found")
562 sys.exit(1)
564 transition = state.next.get(trigger)
565 if transition is None:
566 _error(f"Trigger '{trigger}' not found in state '{session.state}'")
567 sys.exit(1)
569 if transition.conditions and not _conditions_met(
570 transition.conditions.conditions, evidence
571 ): # pragma: no cover
572 _error(f"Conditions not met for trigger '{trigger}'") # pragma: no cover
573 sys.exit(1) # pragma: no cover
575 target = transition.target
576 all_flows = resolve_subflows(flow, flow_path)
577 target_state = _find_state(flow, target)
579 # Check if transition enters a subflow
580 enters_subflow = target_state is not None and target_state.flow is not None
582 if enters_subflow and target_state is not None and target_state.flow is not None:
583 flow_ref = target_state.flow
584 child = _find_subflow(all_flows, flow_ref, flow_path)
585 if child and child.states:
586 frame = SessionStackFrame(flow=session.flow, state=session.state)
587 subflow_initial = child.states[0].id
588 updated_session = session.push_stack(
589 frame, subflow_initial, new_flow=child.flow
590 )
591 target = f"{child.flow}/{subflow_initial}"
592 else: # pragma: no cover
593 updated_session = session.with_state(target) # pragma: no cover
594 elif session.stack and target in flow.exits:
595 # Transition exits a subflow
596 updated_session = session.pop_stack(target)
597 else:
598 updated_session = session.with_state(target)
600 return updated_session, target
603def _cmd_transition_session(
604 args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
605) -> None:
606 """Run transition with session-aware flow/state resolution and auto-update."""
607 if not args.positional:
608 _error("trigger is required")
609 sys.exit(2)
610 trigger = args.positional[0]
612 session_name = (
613 config.default_session if args.session == "__default__" else args.session
614 )
616 session, flow, flow_path = _resolve_session(session_name, config, resolver)
617 evidence = _parse_evidence(args)
618 updated_session, target = _apply_session_transition(
619 session, flow, flow_path, trigger, evidence
620 )
622 store = YamlSessionStore(config.sessions_path())
623 store.save(updated_session)
625 output: dict[str, Any] = {
626 "from": session.state,
627 "trigger": trigger,
628 "to": target,
629 }
630 if args.json_output:
631 print(format_json(output)) # noqa: T201
632 else:
633 print(format_text(output)) # noqa: T201
634 sys.exit(0)
637def _handle_session(
638 args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
639) -> None:
640 """Dispatch session subcommands."""
641 if args.session_command is None: # pragma: no cover
642 build_parser().parse_args(["session", "--help"])
643 sys.exit(2) # pragma: no cover
645 handlers = {
646 "init": cmd_session_init,
647 "show": cmd_session_show,
648 "set-state": cmd_session_set_state,
649 "list": cmd_session_list,
650 }
651 handler = handlers.get(args.session_command)
652 if handler is None:
653 _error(f"Unknown session command: {args.session_command}")
654 sys.exit(2)
656 try:
657 rc = handler(args, config, resolver)
658 except FlowParseError as exc: # pragma: no cover
659 _error(f"invalid flow definition: {exc}") # pragma: no cover
660 sys.exit(1) # pragma: no cover
661 sys.exit(rc)
664def _cmd_check_session(
665 args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
666) -> None:
667 """Run check with session-aware flow/state resolution (read-only)."""
668 session_name = (
669 config.default_session if args.session == "__default__" else args.session
670 )
672 session, flow, _flow_path = _resolve_session(session_name, config, resolver)
673 state = _find_state(flow, session.state)
674 if state is None:
675 _error(f"State '{session.state}' not found")
676 sys.exit(1)
678 if args.target is not None:
679 rc = _cmd_check_conditions(flow, state, args)
680 else:
681 rc = _cmd_check_state(flow, state, args)
682 sys.exit(rc)
685def _cmd_next_session(
686 args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
687) -> None:
688 """Run next with session-aware flow/state resolution (read-only)."""
689 session_name = (
690 config.default_session if args.session == "__default__" else args.session
691 )
693 session, flow, _flow_path = _resolve_session(session_name, config, resolver)
694 state = _find_state(flow, session.state)
695 if state is None:
696 _error(f"State '{session.state}' not found")
697 sys.exit(1)
699 evidence = _parse_evidence(args)
700 passing = _find_passing_transitions(state, evidence)
701 output: dict[str, Any] = {
702 "state": state.id,
703 "next": [t.target for t in passing],
704 }
705 if args.json_output:
706 print(format_json(output)) # noqa: T201
707 else:
708 print(format_text(output)) # noqa: T201
709 sys.exit(0)
712def _resolve_flow_for_command(
713 args: argparse.Namespace,
714 config: FlowrConfig,
715 resolver: DefaultFlowNameResolver,
716) -> None:
717 """Resolve flow_file for non-session, non-transition-session commands."""
718 flows_dir = config.flows_path()
719 if args.command == "transition":
720 if len(args.positional) < 3:
721 _error("transition requires <flow> <state> <trigger>")
722 sys.exit(2)
723 flow_file_arg = args.positional[0]
724 try:
725 args.flow_file = resolver.resolve(flow_file_arg, flows_dir)
726 except FlowNameNotFoundError as exc:
727 _error(str(exc))
728 sys.exit(1)
729 else:
730 try:
731 args.flow_file = resolver.resolve(args.flow_file, flows_dir)
732 except FlowNameNotFoundError as exc:
733 _error(str(exc))
734 sys.exit(1)
737def main() -> None:
738 """Run the application."""
739 args = build_parser().parse_args()
740 if args.command is None: # pragma: no cover
741 build_parser().print_help()
742 sys.exit(2)
744 resolver = DefaultFlowNameResolver()
745 config = resolve_config()
746 if args.flows_dir is not None:
747 config = resolve_config(cli_overrides={"flows_dir": args.flows_dir})
749 if args.command == "session": # pragma: no cover
750 _handle_session(args, config, resolver)
751 return # pragma: no cover
753 if args.command == "config": # pragma: no cover
754 rc = _cmd_config(args)
755 sys.exit(rc) # pragma: no cover
757 if args.command == "transition" and args.session is not None: # pragma: no cover
758 _cmd_transition_session(args, config, resolver)
759 return # pragma: no cover
761 if args.command == "check" and args.session is not None: # pragma: no cover
762 _cmd_check_session(args, config, resolver)
763 return # pragma: no cover
765 if args.command == "next" and args.session is not None: # pragma: no cover
766 _cmd_next_session(args, config, resolver)
767 return # pragma: no cover
769 _resolve_flow_for_command(args, config, resolver)
771 cmd_map = {
772 "validate": _cmd_validate,
773 "states": _cmd_states,
774 "check": _cmd_check,
775 "next": _cmd_next,
776 "transition": _cmd_transition,
777 "mermaid": _cmd_mermaid,
778 "config": _cmd_config,
779 }
780 handler = cmd_map.get(args.command)
781 if handler is None: # pragma: no cover
782 _error(f"Unknown command: {args.command}")
783 sys.exit(2)
784 try:
785 sys.exit(handler(args))
786 except FlowParseError as exc:
787 _error(f"invalid flow definition: {exc}")
788 sys.exit(1)
791if __name__ == "__main__":
792 main()