Coverage for flowr / __main__.py: 96%
536 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
1"""CLI entrypoint for flowr — invoked via `python -m flowr`."""
3import argparse
4import importlib.metadata
5import sys
6from collections.abc import Callable
7from pathlib import Path
8from typing import Any
10import yaml
12from flowr.cli.output import format_json, format_text
13from flowr.cli.resolution import DefaultFlowNameResolver, FlowNameNotFoundError
14from flowr.cli.session_cmd import (
15 add_session_parser,
16 cmd_session_init,
17 cmd_session_list,
18 cmd_session_set_state,
19 cmd_session_show,
20)
21from flowr.domain.condition import evaluate_condition, parse_condition
22from flowr.domain.flow_definition import Flow, State, Transition
23from flowr.domain.loader import FlowParseError, load_flow_from_file, resolve_subflows
24from flowr.domain.session import Session, SessionStackFrame
25from flowr.domain.validation import validate
26from flowr.infrastructure.config import (
27 FlowrConfig,
28 resolve_config,
29 resolve_config_with_sources,
30)
31from flowr.infrastructure.session_store import (
32 SessionNameNotFoundError,
33 SessionNotFoundError,
34 YamlSessionStore,
35)
37add_serve_parser: Callable[..., None] | None = None
38cmd_serve: Callable[..., int] | None = None
39try:
40 from flowr.cli.serve import add_serve_parser, cmd_serve
42 _SERVE_AVAILABLE = True
43except ImportError:
44 _SERVE_AVAILABLE = False
def build_parser() -> argparse.ArgumentParser:
    """Create the top-level ``flowr`` argument parser.

    The parser description and the ``--version`` text are read from the
    metadata of the installed ``flowr`` distribution. Subcommands are
    registered by ``_add_subcommands``.

    Returns:
        The configured ArgumentParser instance.
    """
    dist_meta = importlib.metadata.metadata("flowr")
    cli = argparse.ArgumentParser(prog="flowr", description=dist_meta["Summary"])
    cli.add_argument(
        "--version", action="version", version=f"flowr {dist_meta['Version']}"
    )
    cli.add_argument(
        "--flows-dir",
        dest="flows_dir",
        default=None,
        metavar="DIR",
        help="Override the configured flows directory for this invocation",
    )
    _add_subcommands(cli)
    return cli
def _add_evidence_args(parser: argparse.ArgumentParser) -> None:
    """Register the evidence options on *parser*.

    ``--evidence KEY=VALUE`` is repeatable and collects into a list
    (empty by default); ``--evidence-json JSON`` takes one string and
    defaults to ``None``.
    """
    # Built inside the function so every parser gets its own default list.
    option_table: tuple[tuple[str, dict[str, Any]], ...] = (
        (
            "--evidence",
            {
                "action": "append",
                "default": [],
                "metavar": "KEY=VALUE",
                "help": "Evidence key=value pairs",
            },
        ),
        (
            "--evidence-json",
            {
                "default": None,
                "metavar": "JSON",
                "help": "JSON evidence object",
            },
        ),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
def _parse_evidence(args: argparse.Namespace) -> dict[str, str]:
    """Parse evidence from CLI args into a dict.

    ``args.evidence`` holds ``KEY=VALUE`` strings; each is split on the first
    ``=`` (a pair without ``=`` maps the whole text to an empty value).
    ``args.evidence_json``, when given, is decoded and merged on top, with
    every value converted via ``str()``; JSON keys override ``--evidence``
    keys of the same name.

    Returns:
        Dictionary mapping evidence keys to their string values.

    Raises:
        ValueError: If ``--evidence-json`` is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``) or does not decode
            to a JSON object.
    """
    evidence: dict[str, str] = {}
    for pair in args.evidence:
        key, _, value = pair.partition("=")
        evidence[key] = value

    if args.evidence_json is not None:
        import json

        data = json.loads(args.evidence_json)
        # A list/number/string would otherwise fail later with an opaque
        # AttributeError on ``.items()``; report the real problem instead.
        if not isinstance(data, dict):
            raise ValueError(
                "--evidence-json must be a JSON object, got "
                f"{type(data).__name__}"
            )
        evidence.update((k, str(v)) for k, v in data.items())
    return evidence
def _add_flow_file_arg(parser: argparse.ArgumentParser) -> None:
    """Add the optional ``flow_file`` positional shared by several commands."""
    parser.add_argument(
        "flow_file",
        nargs="?",
        default=None,
        help="Path to flow YAML file or flow name (required unless --session)",
    )


def _add_text_flag(
    parser: argparse.ArgumentParser, help_text: str | None = None
) -> None:
    """Add the ``--text`` switch, stored as ``text_output``."""
    parser.add_argument(
        "--text", action="store_true", dest="text_output", help=help_text
    )


def _add_session_flag(parser: argparse.ArgumentParser, help_text: str) -> None:
    """Add ``--session [NAME]``; a bare ``--session`` yields ``"__default__"``."""
    parser.add_argument(
        "--session",
        nargs="?",
        const="__default__",
        default=None,
        dest="session",
        metavar="NAME",
        help=help_text,
    )


def _add_subcommands(parser: argparse.ArgumentParser) -> None:
    """Add all CLI subcommands.

    Registers ``validate``, ``states``, ``check``, ``next``, ``transition``,
    ``config`` and ``export`` directly, lets every registered exporter add its
    own options to ``export``, then delegates to ``add_session_parser`` and,
    when the serve module was importable, ``add_serve_parser``.

    The order in which positionals are added is significant to argparse and
    is kept exactly as before.
    """
    sub = parser.add_subparsers(dest="command")

    # validate
    p_validate = sub.add_parser("validate", help="Validate a flow definition")
    _add_flow_file_arg(p_validate)
    _add_text_flag(p_validate)
    _add_session_flag(p_validate, "Use session name to resolve flow (read-only)")

    # states
    p_states = sub.add_parser("states", help="List all states in a flow")
    _add_flow_file_arg(p_states)
    _add_text_flag(p_states)
    _add_session_flag(p_states, "Use session name to resolve flow (read-only)")

    # check
    p_check = sub.add_parser("check", help="Check a state or transition conditions")
    _add_flow_file_arg(p_check)
    _add_text_flag(p_check)
    p_check.add_argument(
        "state_id",
        nargs="?",
        default=None,
        help="State id to inspect",
    )
    p_check.add_argument(
        "target",
        nargs="?",
        default=None,
        help="Target to check conditions for",
    )
    _add_session_flag(
        p_check, "Use session name or file path to resolve flow/state (read-only)"
    )

    # next
    p_next = sub.add_parser("next", help="Show valid next transitions")
    _add_flow_file_arg(p_next)
    _add_text_flag(p_next)
    p_next.add_argument("state_id", nargs="?", default=None, help="Current state id")
    _add_evidence_args(p_next)
    _add_session_flag(
        p_next, "Use session name or file path to resolve flow/state (read-only)"
    )

    # transition
    p_transition = sub.add_parser("transition", help="Compute next state")
    p_transition.add_argument(
        "positional",
        nargs="*",
        help="Args: <flow> <state> <trigger> or <trigger> with --session",
    )
    _add_text_flag(p_transition)
    _add_evidence_args(p_transition)
    _add_session_flag(
        p_transition,
        "Use session to resolve flow/state; auto-update after transition",
    )

    # config
    p_config = sub.add_parser("config", help="Show resolved configuration")
    _add_text_flag(p_config, "Output as human-readable text")

    # export
    p_export = sub.add_parser(
        "export", help="Export a flow definition to various formats"
    )
    p_export.add_argument("input_path", help="Path to flow YAML file or directory")
    p_export.add_argument(
        "--format",
        required=True,
        dest="export_format",
        help="Export format",
    )
    p_export.add_argument(
        "--output",
        "-o",
        dest="output_path",
        help="Write output to file instead of stdout",
    )
    from flowr.exporters.registry import EXPORTERS as EXPORTERS_FOR_ARGS

    # Each exporter contributes its own format-specific options.
    for adapter in EXPORTERS_FOR_ARGS.values():
        adapter.add_arguments(p_export)

    # session
    add_session_parser(sub)

    if _SERVE_AVAILABLE and add_serve_parser is not None:
        add_serve_parser(sub)
def _cmd_validate(args: argparse.Namespace) -> int:
    """Run validate subcommand.

    Loads the flow, resolves its subflows, validates it and prints the
    result (``valid`` flag plus a list of violations) as JSON or text.

    Returns:
        Exit code: 0 if valid, 1 if invalid, 2 if no flow was given.
    """
    flow_ref = args.flow_file
    if flow_ref is None:
        _error("flow_file is required when not using --session")
        return 2

    flow = load_flow_from_file(flow_ref)
    resolved = resolve_subflows(flow, flow_ref)
    # The resolved list is only handed to the validator when it holds more
    # than one flow.
    report = validate(flow, resolved if len(resolved) > 1 else None)

    payload: dict[str, Any] = {
        "valid": report.is_valid,
        "violations": [
            {
                "severity": violation.severity.value,
                "message": violation.message,
                "location": violation.location,
            }
            for violation in report.violations
        ],
    }
    render = format_text if args.text_output else format_json
    print(render(payload))  # noqa: T201
    return 0 if report.is_valid else 1
def _cmd_states(args: argparse.Namespace) -> int:
    """Run states subcommand.

    Prints the ids of all states in the flow: one per line in text mode,
    otherwise as a single JSON document.

    Returns:
        Exit code: 0 on success, 2 if no flow was given.
    """
    if args.flow_file is None:
        _error("flow_file is required when not using --session")
        return 2

    ids = [state.id for state in load_flow_from_file(args.flow_file).states]
    if not args.text_output:
        print(format_json(ids))  # noqa: T201
        return 0
    for state_id in ids:
        print(state_id)  # noqa: T201
    return 0
def _cmd_check(
    args: argparse.Namespace,
) -> int:
    """Run check subcommand.

    Without a ``target`` the state's details are shown; with one, the
    conditions of that transition are shown.

    Returns:
        Exit code: 0 on success, 1 if the state (or target) is not found,
        2 if a required argument is missing.
    """
    # Report the first missing required argument, flow before state.
    for value, label in ((args.flow_file, "flow_file"), (args.state_id, "state_id")):
        if value is None:
            _error(f"{label} is required when not using --session")
            return 2

    flow = load_flow_from_file(args.flow_file)
    state = _find_state(flow, args.state_id)
    if state is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    show = _cmd_check_state if args.target is None else _cmd_check_conditions
    return show(flow, state, args)
def _cmd_check_state(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print a state's id, optional attrs/flow, and its transition triggers.

    ``attrs`` and ``flow`` are only included when they are truthy.

    Returns:
        Exit code: 0 on success.
    """
    payload: dict[str, Any] = {"id": state.id}
    for field in ("attrs", "flow"):
        value = getattr(state, field)
        if value:
            payload[field] = value
    payload["transitions"] = list(state.next)

    render = format_text if args.text_output else format_json
    print(render(payload))  # noqa: T201
    return 0
def _cmd_check_conditions(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print the conditions attached to the transition ``args.target``.

    The transition is looked up in ``state.next`` under ``args.target``.
    When it carries no conditions the literal ``"(none)"`` is reported.

    Returns:
        Exit code: 0 on success, 1 if target not found.
    """
    wanted = args.target
    transition = state.next.get(wanted)
    if transition is None:
        _error(f"Transition target '{wanted}' not found in state '{state.id}'")
        return 1

    if transition.conditions:
        shown = transition.conditions.conditions
    else:
        shown = "(none)"
    payload: dict[str, Any] = {
        "from": state.id,
        "target": wanted,
        "conditions": shown,
    }
    render = format_text if args.text_output else format_json
    print(render(payload))  # noqa: T201
    return 0
def _cmd_next(args: argparse.Namespace) -> int:
    """Run next subcommand.

    Lists every transition leaving the given state, each marked open or
    blocked according to the supplied evidence.

    Returns:
        Exit code: 0 on success, 1 if state not found, 2 if a required
        argument is missing.
    """
    if args.flow_file is None:
        _error("flow_file is required when not using --session")
        return 2
    if args.state_id is None:
        _error("state_id is required when not using --session")
        return 2

    state = _find_state(load_flow_from_file(args.flow_file), args.state_id)
    if state is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    rows = _build_transition_list(state, _parse_evidence(args))
    if args.text_output:
        rendered = _format_transitions_text(state.id, rows)
    else:
        rendered = format_json({"state": state.id, "transitions": rows})
    print(rendered)  # noqa: T201
    return 0
def _cmd_transition(args: argparse.Namespace) -> int:
    """Run transition subcommand.

    The state and trigger come from ``args.positional`` (``<flow> <state>
    <trigger>``) when that list is present and non-empty; otherwise from
    ``args.state_id`` / ``args.trigger``. ``args.flow_file`` must already
    hold the flow to load in both cases.

    If the target state wraps a subflow (its ``flow`` is set) and that
    subflow is found with at least one state, the reported destination is
    ``<child flow>/<first state id>`` instead of the bare target.

    Returns:
        Exit code: 0 on success, 1 on error (unknown state or trigger,
        conditions not met), 2 if fewer than three positionals were given.
    """
    positional = getattr(args, "positional", None)
    if positional:
        # Guard the indexing below so a short list is reported as a usage
        # error rather than surfacing as an IndexError.
        if len(positional) < 3:
            _error("transition requires <flow> <state> <trigger>")
            return 2
        state_id, trigger = positional[1], positional[2]
    else:
        state_id, trigger = args.state_id, args.trigger
    flow_file = args.flow_file

    flow = load_flow_from_file(flow_file)
    all_flows = resolve_subflows(flow, flow_file)
    state = _find_state(flow, state_id)
    if state is None:
        _error(f"State '{state_id}' not found")
        return 1
    transition = state.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{state_id}'")
        return 1

    evidence = _parse_evidence(args)
    if transition.conditions and not _conditions_met(
        transition.conditions.conditions, evidence
    ):
        _error(f"Conditions not met for trigger '{trigger}'")
        return 1

    target = transition.target
    target_state = _find_state(flow, target)
    if target_state is not None and target_state.flow is not None:
        child = _find_subflow(all_flows, target_state.flow, Path(flow_file))
        if child and child.states:
            target = f"{child.flow}/{child.states[0].id}"

    output: dict[str, Any] = {
        "from": state_id,
        "trigger": trigger,
        "to": target,
    }
    if args.text_output:
        print(format_text(output))  # noqa: T201
    else:
        print(format_json(output))  # noqa: T201
    return 0
def _cmd_config(args: argparse.Namespace) -> int:
    """Run config subcommand — show resolved configuration with sources.

    Emits one row per setting (key, value, source). ``--flows-dir`` is
    passed through as a CLI override when it was given.

    Returns:
        Exit code: 0 on success.
    """
    overrides = {"flows_dir": args.flows_dir} if args.flows_dir else None
    config, sources = resolve_config_with_sources(cli_overrides=overrides)

    # Insertion order here is the output order.
    shown_values: dict[str, Any] = {
        "project_root": _display_path(config.project_root),
        "flows_dir": str(config.flows_dir),
        "sessions_dir": str(config.sessions_dir),
        "default_flow": config.default_flow,
        "default_session": config.default_session,
    }
    rows = [
        {"key": key, "value": value, "source": sources[key]}
        for key, value in shown_values.items()
    ]

    if not args.text_output:
        print(format_json(rows))  # noqa: T201
        return 0
    for row in rows:
        print(f"{row['key']} = {row['value']} ({row['source']})")  # noqa: T201
    return 0
def _extract_adapter_options(args: argparse.Namespace) -> dict:
    """Collect every ``adapter_*`` attribute of *args*, prefix stripped.

    Returns:
        Mapping of option name (without the ``adapter_`` prefix) to value.
    """
    prefix = "adapter_"
    return {
        name[len(prefix) :]: value
        for name, value in vars(args).items()
        if name.startswith(prefix)
    }
def _load_flows_from_directory(dir_path: Path) -> list[tuple[str, Flow]]:
    """Load every flow file in *dir_path* and its immediate subdirectories.

    Files directly in *dir_path* come first, then each subdirectory in
    sorted order. Within a directory ``*.yaml`` files precede ``*.yml``
    files, each group sorted.

    Returns:
        List of ``(file stem, loaded flow)`` pairs.
    """
    search_dirs = [dir_path]
    search_dirs.extend(
        entry for entry in sorted(dir_path.iterdir()) if entry.is_dir()
    )
    flow_files = [
        found
        for directory in search_dirs
        for pattern in ("*.yaml", "*.yml")
        for found in sorted(directory.glob(pattern))
    ]
    return [(path.stem, load_flow_from_file(path)) for path in flow_files]
def _load_subflows(flow: Flow, search_dir: Path) -> dict[str, Flow]:
    """Load the subflows referenced by *flow*'s states from *search_dir*.

    For each distinct ``state.flow`` reference the first existing file among
    ``<ref>.yaml``, ``<ref>.yml``, ``<ref>-flow.yaml`` and ``<ref>-flow.yml``
    is loaded. References with no matching file are left out.

    Returns:
        Mapping of subflow reference to the loaded flow.
    """
    suffixes = (".yaml", ".yml", "-flow.yaml", "-flow.yml")
    loaded: dict[str, Flow] = {}
    for state in flow.states:
        ref = state.flow
        if not ref or ref in loaded:
            continue
        existing = next(
            (
                candidate
                for candidate in (search_dir / f"{ref}{sfx}" for sfx in suffixes)
                if candidate.exists()
            ),
            None,
        )
        if existing is not None:
            loaded[ref] = load_flow_from_file(existing)
    return loaded
def _cmd_export(args: argparse.Namespace) -> int:
    """Run export subcommand.

    Exports a single flow file (with its subflows) or a whole directory of
    flows through the adapter registered for ``args.export_format``. The
    result is printed, or written to ``args.output_path`` when given.
    Adapter options that are set but not accepted by the chosen format
    produce a warning on stderr.

    Returns:
        Exit code: 0 on success, 1 if the path does not exist, the format is
        unknown, or a directory contains no flow files.
    """
    from flowr.exporters.registry import EXPORTERS as EXPORTERS_REGISTRY

    source = Path(args.input_path)
    if not source.exists():
        _error(f"path does not exist: {args.input_path}")
        return 1

    adapter = EXPORTERS_REGISTRY.get(args.export_format)
    if adapter is None:
        known = ", ".join(sorted(EXPORTERS_REGISTRY.keys()))
        _error(f"unknown format '{args.export_format}'. available: {known}")
        return 1

    options = _extract_adapter_options(args)
    accepted = adapter.accepted_options()
    ignored = [
        name for name, value in options.items() if value and name not in accepted
    ]
    if ignored:
        flags = ", ".join(f"--{name.replace('_', '-')}" for name in ignored)
        print(  # noqa: T201
            f"warning: unused flags for format '{args.export_format}': {flags}",
            file=sys.stderr,
        )

    if source.is_dir():
        flows = _load_flows_from_directory(source)
        if not flows:
            _error(f"no flow files found in directory: {args.input_path}")
            return 1
        rendered = adapter.export_directory(flows, options)
    else:
        flow = load_flow_from_file(source)
        subflows = _load_subflows(flow, source.parent)
        rendered = adapter.export(flow, options, subflows=subflows)

    destination = getattr(args, "output_path", None)
    if not destination:
        print(rendered)  # noqa: T201
        return 0

    target_file = Path(destination)
    # JSON written to a ``.js`` file is wrapped in a global assignment.
    if target_file.suffix == ".js" and args.export_format == "json":
        rendered = f"window.FLOWVIZ_DATA = {rendered};\n"
    target_file.parent.mkdir(parents=True, exist_ok=True)
    target_file.write_text(rendered, encoding="utf-8")
    return 0
def _find_state(flow: Flow, state_id: str) -> State | None:
    """Look up the first state of *flow* whose id equals *state_id*.

    Returns:
        The matching State, or None if not found.
    """
    return next(
        (candidate for candidate in flow.states if candidate.id == state_id),
        None,
    )
def _find_passing_transitions(
    state: State, evidence: dict[str, str]
) -> list[Transition]:
    """Select the transitions of *state* that *evidence* allows.

    A transition passes when it has no conditions, or when all of its
    conditions are met by the evidence.

    Returns:
        List of transitions whose conditions are satisfied.
    """
    return [
        transition
        for transition in state.next.values()
        if transition.conditions is None
        or _conditions_met(transition.conditions.conditions, evidence)
    ]
def _build_transition_list(
    state: State, evidence: dict[str, str]
) -> list[dict[str, Any]]:
    """Describe every transition leaving *state*.

    A transition is ``"open"`` when it has no conditions or the evidence
    meets them, otherwise ``"blocked"``.

    Returns:
        List of dicts with trigger, target, status, and conditions
        (``None`` when the transition has none).
    """
    rows: list[dict[str, Any]] = []
    for trigger, transition in state.next.items():
        guard = transition.conditions
        is_open = guard is None or _conditions_met(guard.conditions, evidence)
        rows.append(
            {
                "trigger": trigger,
                "target": transition.target,
                "status": "open" if is_open else "blocked",
                "conditions": guard.conditions if guard else None,
            }
        )
    return rows
def _format_transitions_text(state_id: str, transitions: list[dict[str, Any]]) -> str:
    """Render a state and its transitions as plain text.

    The first line names the state; each following line shows one
    transition as ``trigger → target``. Blocked transitions get a
    ``[blocked]`` marker and, when they have conditions, a ``need:`` list
    of ``key=value`` pairs.

    Returns:
        The lines joined with newlines (no trailing newline).
    """

    def _render(entry: dict[str, Any]) -> str:
        blocked = entry["status"] == "blocked"
        suffix = " [blocked]" if blocked else ""
        if blocked and entry["conditions"]:
            needed = ", ".join(
                f"{key}={val}" for key, val in entry["conditions"].items()
            )
            suffix += f" need: {needed}"
        return f" {entry['trigger']} → {entry['target']}{suffix}"

    return "\n".join([f"state: {state_id}", *map(_render, transitions)])
def _conditions_met(conditions: dict[str, str], evidence: dict[str, str]) -> bool:
    """Tell whether *evidence* satisfies every entry of *conditions*.

    Each condition string is parsed into an operator and a value, then
    evaluated against the evidence stored under the same key (an empty
    string when the key is absent). Evaluation stops at the first failure.

    Returns:
        True if all conditions pass, False otherwise.
    """

    def _holds(key: str, spec: str) -> bool:
        op, expected = parse_condition(spec)
        return evaluate_condition(op, expected, evidence.get(key, ""))

    return all(_holds(key, spec) for key, spec in conditions.items())
def _find_subflow(
    all_flows: list[Flow], flow_ref: str, _root_path: Path
) -> Flow | None:
    """Pick the flow in *all_flows* named after the stem of *flow_ref*.

    ``flow_ref`` is treated as a path and only its stem (file name without
    the final suffix) is compared to each flow's ``flow`` attribute.
    ``_root_path`` is not used.

    Returns:
        The matching Flow, or None if not found.
    """
    return next(
        (flow for flow in all_flows if flow.flow == Path(flow_ref).stem),
        None,
    )
def _display_path(path: Path) -> str:
    """Render *path* relative to the current working directory.

    Returns ``"."`` when *path* is the working directory itself, and the
    path unchanged when it cannot be made relative to it.
    """
    try:
        relative = path.relative_to(Path.cwd())
    except ValueError:
        return str(path)
    if relative == Path():
        return "."
    return str(relative)
def _error(msg: str) -> None:
    """Write *msg* to stderr, prefixed with ``error: ``."""
    line = f"error: {msg}"
    print(line, file=sys.stderr)  # noqa: T201
def _resolve_session(
    session_name: str, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> tuple[Session, Flow, Path]:
    """Load a stored session and the flow it refers to.

    Three steps, each of which reports its error and exits with status 1 on
    failure: load the session from the YAML store, resolve the session's
    flow name to a path, load the flow from that path.

    Returns:
        Tuple of (session, flow, flow_path).
    """
    sessions = YamlSessionStore(config.sessions_path())

    # Step 1: the session itself.
    try:
        session = sessions.load(session_name)
    except (SessionNotFoundError, SessionNameNotFoundError) as exc:
        _error(str(exc))
        sys.exit(1)

    # Step 2: where its flow lives.
    try:
        flow_path = resolver.resolve(session.flow, config.flows_path())
    except FlowNameNotFoundError as exc:
        _error(str(exc))
        sys.exit(1)

    # Step 3: the flow definition.
    try:
        flow = load_flow_from_file(flow_path)
    except FlowParseError as exc:  # pragma: no cover
        _error(f"invalid flow definition: {exc}")  # pragma: no cover
        sys.exit(1)  # pragma: no cover

    return session, flow, flow_path
724def _find_flow_file(flow_name: str, flows_dir: Path) -> Path | None:
725 """Find a flow file by name in the flows directory."""
726 path = flows_dir / (flow_name + ".yaml")
727 if path.exists():
728 return path
729 path = flows_dir / flow_name
730 if path.exists():
731 return path
732 return None
def _enter_subflow(
    session: Session,
    parent_flow: Flow,
    parent_flow_path: Path,
    target_state_id: str,
) -> tuple[Session, str] | None:
    """Try to enter a subflow for the given target state.

    Returns (updated_session, display_target) if a subflow was entered,
    or None if the target is not a subflow entry point (the state is
    unknown, has no ``flow:`` field, or the referenced subflow cannot be
    found or has no states).

    On entry a stack frame for the parent flow/state is pushed and the
    session moves to the child's first state. If that first state is itself
    a subflow wrapper (has a ``flow:`` field) and its subflow is found with
    at least one state, one further level is entered the same way. Nesting
    beyond that second level is not followed here.
    """
    wrapper = _find_state(parent_flow, target_state_id)
    if wrapper is None or wrapper.flow is None:
        return None

    child = _find_subflow(
        resolve_subflows(parent_flow, parent_flow_path),
        wrapper.flow,
        parent_flow_path,
    )
    if child is None or not child.states:
        return None

    entry_state = child.states[0]
    in_child = session.push_stack(
        SessionStackFrame(flow=parent_flow.flow, state=target_state_id),
        entry_state.id,
        new_flow=child.flow,
    )
    child_result = (in_child, f"{child.flow}/{entry_state.id}")
    if entry_state.flow is None:
        return child_result

    # The child's entry state wraps another subflow: descend one more level.
    grandchild = _find_subflow(
        resolve_subflows(child, parent_flow_path),
        entry_state.flow,
        parent_flow_path,
    )
    if grandchild is None or not grandchild.states:
        return child_result

    gc_entry_id = grandchild.states[0].id
    in_grandchild = in_child.push_stack(
        SessionStackFrame(flow=child.flow, state=entry_state.id),
        gc_entry_id,
        new_flow=grandchild.flow,
    )
    return in_grandchild, f"{grandchild.flow}/{gc_entry_id}"
def _resolve_subflow_exit(
    session: Session,
    trigger: str,
    exit_name: str,
    flows_dir: Path,
) -> tuple[Session, str]:
    """Handle subflow exit: resolve parent transition, handle chaining.

    The top stack frame names the parent flow and the parent state that
    wraps the subflow. The exit name is looked up as a trigger on that
    parent state; its target becomes the state the session returns to. If
    that target is itself a subflow entry point, the next subflow is
    entered right away.

    Whenever the parent flow file, the parent state or the parent
    transition cannot be found, the stack is simply popped to *exit_name*.
    ``trigger`` is accepted but not used.

    Returns:
        Tuple of (updated_session, target_display).
    """

    def _plain_exit() -> tuple[Session, str]:
        return session.pop_stack(exit_name), exit_name

    frame = session.stack[-1]
    parent_path = _find_flow_file(frame.flow, flows_dir)
    if parent_path is None:
        return _plain_exit()

    parent_flow = load_flow_from_file(parent_path)
    wrapper_state = _find_state(parent_flow, frame.state)
    if wrapper_state is None:
        return _plain_exit()

    onward = wrapper_state.next.get(exit_name)
    if onward is None:
        return _plain_exit()

    destination = onward.target
    popped = session.pop_stack(destination)
    chained = _enter_subflow(popped, parent_flow, parent_path, destination)
    return chained if chained is not None else (popped, destination)
def _apply_session_transition(
    session: Session,
    flow: Flow,
    flow_path: Path,
    trigger: str,
    evidence: dict[str, str],
    flows_dir: Path | None = None,
) -> tuple[Session, str]:
    """Apply a transition to a session, handling subflow push/pop.

    Exits with status 1 when the session's state is unknown, the trigger is
    not defined on it, or its conditions are not met. Otherwise the target
    is handled, in order, as: a subflow entry, a subflow exit (the session
    has a stack and the target is one of the flow's exits), or a plain
    state change.

    Returns:
        Tuple of (updated_session, target_display).
    """
    current = _find_state(flow, session.state)
    if current is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    transition = current.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{session.state}'")
        sys.exit(1)

    if transition.conditions and not _conditions_met(
        transition.conditions.conditions, evidence
    ):  # pragma: no cover
        _error(f"Conditions not met for trigger '{trigger}'")  # pragma: no cover
        sys.exit(1)  # pragma: no cover

    destination = transition.target

    # Case 1: the destination wraps a subflow.
    entered = _enter_subflow(session, flow, flow_path, destination)
    if entered is not None:
        return entered

    # Case 2: the destination leaves the current subflow.
    if session.stack and destination in flow.exits:
        if flows_dir is not None:
            return _resolve_subflow_exit(session, trigger, destination, flows_dir)
        return session.pop_stack(destination), destination

    # Case 3: ordinary move within the same flow.
    return session.with_state(destination), destination
def _cmd_transition_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run transition with session-aware flow/state resolution and auto-update.

    The first positional is the trigger. The updated session is saved to
    the session store before the result is printed. Exits with status 0 on
    success and 2 when no trigger was given.
    """
    if not args.positional:
        _error("trigger is required")
        sys.exit(2)
    trigger = args.positional[0]

    # A bare ``--session`` arrives as the "__default__" placeholder.
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, flow_path = _resolve_session(session_name, config, resolver)
    evidence = _parse_evidence(args)
    moved, destination = _apply_session_transition(
        session, flow, flow_path, trigger, evidence, config.flows_path()
    )
    YamlSessionStore(config.sessions_path()).save(moved)

    payload: dict[str, Any] = {
        "from": session.state,
        "trigger": trigger,
        "to": destination,
    }
    render = format_text if args.text_output else format_json
    print(render(payload))  # noqa: T201
    sys.exit(0)
def _handle_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Dispatch session subcommands.

    Exits with the handler's return code. Exits with status 2 when no
    session command was given (after showing the session help) or the
    command is unknown, and with 1 on an invalid flow definition.
    """
    requested = args.session_command
    if requested is None:  # pragma: no cover
        build_parser().parse_args(["session", "--help"])
        sys.exit(2)  # pragma: no cover

    dispatch = {
        "init": cmd_session_init,
        "show": cmd_session_show,
        "set-state": cmd_session_set_state,
        "list": cmd_session_list,
    }
    handler = dispatch.get(requested)
    if handler is None:
        _error(f"Unknown session command: {args.session_command}")
        sys.exit(2)

    try:
        rc = handler(args, config, resolver)
    except FlowParseError as exc:  # pragma: no cover
        _error(f"invalid flow definition: {exc}")  # pragma: no cover
        sys.exit(1)  # pragma: no cover
    sys.exit(rc)
def _cmd_check_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run check with session-aware flow/state resolution (read-only).

    The state comes from the session. The transition target is
    ``args.target`` or, failing that, whatever landed in ``args.flow_file``.
    With a target its conditions are shown, otherwise the state details.
    Exits with the resulting code, or 1 if the session's state is unknown.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    state = _find_state(flow, session.state)
    if state is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    chosen_target = args.target or getattr(args, "flow_file", None)
    if chosen_target is None:
        sys.exit(_cmd_check_state(flow, state, args))
    args.target = chosen_target
    sys.exit(_cmd_check_conditions(flow, state, args))
def _cmd_next_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run next with session-aware flow/state resolution (read-only).

    Lists the transitions leaving the session's current state. Exits with
    status 0, or 1 if that state is not found in the flow.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    state = _find_state(flow, session.state)
    if state is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    rows = _build_transition_list(state, _parse_evidence(args))
    if args.text_output:
        rendered = _format_transitions_text(state.id, rows)
    else:
        rendered = format_json({"state": state.id, "transitions": rows})
    print(rendered)  # noqa: T201
    sys.exit(0)
def _cmd_states_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run states with session-aware flow resolution (read-only).

    Prints the state ids of the session's flow (one per line in text mode,
    JSON otherwise) and exits with status 0.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    _session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    ids = [state.id for state in flow.states]
    if not args.text_output:
        print(format_json(ids))  # noqa: T201
        sys.exit(0)
    for state_id in ids:
        print(state_id)  # noqa: T201
    sys.exit(0)
def _cmd_validate_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run validate with session-aware flow resolution (read-only).

    Validates the session's flow together with its resolved subflows and
    prints the result. Exits with status 0 if valid, 1 otherwise.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    _session, flow, flow_path = _resolve_session(session_name, config, resolver)
    resolved = resolve_subflows(flow, flow_path)
    report = validate(flow, resolved if len(resolved) > 1 else None)

    payload: dict[str, Any] = {
        "valid": report.is_valid,
        "violations": [
            {
                "severity": violation.severity.value,
                "message": violation.message,
                "location": violation.location,
            }
            for violation in report.violations
        ],
    }
    render = format_text if args.text_output else format_json
    print(render(payload))  # noqa: T201
    sys.exit(0 if report.is_valid else 1)
def _resolve_flow_for_command(
    args: argparse.Namespace,
    config: FlowrConfig,
    resolver: DefaultFlowNameResolver,
) -> None:
    """Resolve flow_file for non-session, non-transition-session commands.

    The flow reference is the first positional for ``transition`` (which
    needs at least three positionals, else exit status 2) and
    ``args.flow_file`` for every other command. The resolved value is
    stored back on ``args.flow_file``; an unknown flow name exits with
    status 1.
    """
    flows_dir = config.flows_path()
    if args.command == "transition":
        if len(args.positional) < 3:
            _error("transition requires <flow> <state> <trigger>")
            sys.exit(2)
        flow_ref = args.positional[0]
    else:
        flow_ref = args.flow_file

    try:
        args.flow_file = resolver.resolve(flow_ref, flows_dir)
    except FlowNameNotFoundError as exc:
        _error(str(exc))
        sys.exit(1)
# Commands that have a session-aware variant, mapped to the *name* of the
# module-level handler; the name is looked up in ``globals()`` at dispatch time.
_SESSION_COMMANDS = {
    command: f"_cmd_{command}_session"
    for command in ("transition", "check", "next", "states", "validate")
}
def _dispatch_session_command(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> bool:
    """Handle session-aware command dispatch. Returns True if handled.

    Nothing is dispatched (False is returned) when ``--session`` was not
    given or the command has no entry in ``_SESSION_COMMANDS``.
    """
    session_requested = getattr(args, "session", None) is not None
    handler_name = _SESSION_COMMANDS.get(args.command) if session_requested else None
    if handler_name is None:
        return False
    # Looked up by name in the module namespace at call time.
    globals()[handler_name](args, config, resolver)
    return True
def _run_command(
    handler: Callable[[argparse.Namespace], int],
    args: argparse.Namespace,
) -> None:
    """Run a command handler with unified error handling.

    Exits with the handler's return code. YAML syntax errors and flow
    parse errors are reported on stderr and turned into exit status 1;
    for YAML errors only the first line of the message is shown.
    """
    try:
        exit_code = handler(args)
        sys.exit(exit_code)
    except yaml.YAMLError as exc:
        first_line = str(exc).splitlines()[0]
        _error(f"malformed YAML: {first_line}")
        sys.exit(1)
    except FlowParseError as exc:
        _error(f"invalid flow definition: {exc}")
        sys.exit(1)
def _run_export(args: argparse.Namespace) -> None:
    """Run the export command with unified error handling.

    Delegates to ``_run_command``, which exits with the export's return
    code and maps YAML / flow-parse errors to exit status 1.
    """
    _run_command(_cmd_export, args)
def main() -> None:
    """Run the application.

    Parses the command line, resolves the configuration (applying
    ``--flows-dir`` when given) and dispatches in this order: ``session``,
    ``config``, ``export``, session-aware variants (``--session``),
    ``serve`` when available, then the plain flow commands after the flow
    reference has been resolved.
    """
    args = build_parser().parse_args()
    if args.command is None:  # pragma: no cover
        build_parser().print_help()
        sys.exit(2)

    resolver = DefaultFlowNameResolver()
    config = resolve_config()
    if args.flows_dir is not None:
        config = resolve_config(cli_overrides={"flows_dir": args.flows_dir})

    command = args.command

    if command == "session":  # pragma: no cover
        _handle_session(args, config, resolver)
        return  # pragma: no cover

    if command == "config":  # pragma: no cover
        rc = _cmd_config(args)
        sys.exit(rc)  # pragma: no cover

    if command == "export":
        _run_export(args)
        return  # pragma: no cover

    if _dispatch_session_command(args, config, resolver):
        return

    if command == "serve" and cmd_serve is not None:
        rc = cmd_serve(args)
        sys.exit(rc)

    # Everything below works on a concrete flow file.
    _resolve_flow_for_command(args, config, resolver)

    flow_commands = {
        "validate": _cmd_validate,
        "states": _cmd_states,
        "check": _cmd_check,
        "next": _cmd_next,
        "transition": _cmd_transition,
        "config": _cmd_config,
    }
    handler = flow_commands.get(args.command)
    if handler is None:  # pragma: no cover
        _error(f"Unknown command: {args.command}")
        sys.exit(2)
    _run_command(handler, args)
# Script entry point: run the CLI when this module is executed directly
# (e.g. via `python -m flowr`).
if __name__ == "__main__":
    main()