Coverage for flowr / __main__.py: 96%

536 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 18:42 +0000

1"""CLI entrypoint for flowr — invoked via `python -m flowr`.""" 

2 

3import argparse 

4import importlib.metadata 

5import sys 

6from collections.abc import Callable 

7from pathlib import Path 

8from typing import Any 

9 

10import yaml 

11 

12from flowr.cli.output import format_json, format_text 

13from flowr.cli.resolution import DefaultFlowNameResolver, FlowNameNotFoundError 

14from flowr.cli.session_cmd import ( 

15 add_session_parser, 

16 cmd_session_init, 

17 cmd_session_list, 

18 cmd_session_set_state, 

19 cmd_session_show, 

20) 

21from flowr.domain.condition import evaluate_condition, parse_condition 

22from flowr.domain.flow_definition import Flow, State, Transition 

23from flowr.domain.loader import FlowParseError, load_flow_from_file, resolve_subflows 

24from flowr.domain.session import Session, SessionStackFrame 

25from flowr.domain.validation import validate 

26from flowr.infrastructure.config import ( 

27 FlowrConfig, 

28 resolve_config, 

29 resolve_config_with_sources, 

30) 

31from flowr.infrastructure.session_store import ( 

32 SessionNameNotFoundError, 

33 SessionNotFoundError, 

34 YamlSessionStore, 

35) 

36 

37add_serve_parser: Callable[..., None] | None = None 

38cmd_serve: Callable[..., int] | None = None 

39try: 

40 from flowr.cli.serve import add_serve_parser, cmd_serve 

41 

42 _SERVE_AVAILABLE = True 

43except ImportError: 

44 _SERVE_AVAILABLE = False 

45 

46 

def build_parser() -> argparse.ArgumentParser:
    """Create the top-level ``flowr`` argument parser.

    The description and the ``--version`` string are read from the installed
    ``flowr`` distribution metadata.

    Returns:
        The parser with the global options and all subcommands registered.
    """
    dist_meta = importlib.metadata.metadata("flowr")
    parser = argparse.ArgumentParser(prog="flowr", description=dist_meta["Summary"])
    version_text = f"flowr {dist_meta['Version']}"
    parser.add_argument("--version", action="version", version=version_text)
    parser.add_argument(
        "--flows-dir",
        dest="flows_dir",
        default=None,
        metavar="DIR",
        help="Override the configured flows directory for this invocation",
    )
    _add_subcommands(parser)
    return parser

72 

73 

74def _add_evidence_args(parser: argparse.ArgumentParser) -> None: 

75 """Add evidence input args.""" 

76 parser.add_argument( 

77 "--evidence", 

78 action="append", 

79 default=[], 

80 metavar="KEY=VALUE", 

81 help="Evidence key=value pairs", 

82 ) 

83 parser.add_argument( 

84 "--evidence-json", 

85 default=None, 

86 metavar="JSON", 

87 help="JSON evidence object", 

88 ) 

89 

90 

91def _parse_evidence(args: argparse.Namespace) -> dict[str, str]: 

92 """Parse evidence from CLI args into a dict. 

93 

94 Returns: 

95 Dictionary mapping evidence keys to their string values. 

96 """ 

97 evidence: dict[str, str] = {} 

98 for pair in args.evidence: 

99 key, _, value = pair.partition("=") 

100 evidence[key] = value 

101 if args.evidence_json is not None: 

102 import json 

103 

104 data = json.loads(args.evidence_json) 

105 for k, v in data.items(): 

106 evidence[k] = str(v) 

107 return evidence 

108 

109 

def _add_subcommands(parser: argparse.ArgumentParser) -> None:
    """Register every CLI subcommand on *parser*.

    Registers ``validate``, ``states``, ``check``, ``next``, ``transition``,
    ``config`` and ``export``, then the session subcommands, and finally the
    serve subcommand when its module could be imported. The argument
    definitions shared by several subcommands live in the local helpers
    below; arguments are registered in the same order as before, so
    positional parsing and help output are unchanged.

    Args:
        parser: The top-level parser that receives the subparsers.
    """
    session_help_flow = "Use session name to resolve flow (read-only)"
    session_help_state = (
        "Use session name or file path to resolve flow/state (read-only)"
    )
    session_help_update = (
        "Use session to resolve flow/state; auto-update after transition"
    )

    def add_flow_file(cmd: argparse.ArgumentParser) -> None:
        """Add the optional ``flow_file`` positional."""
        cmd.add_argument(
            "flow_file",
            nargs="?",
            default=None,
            help="Path to flow YAML file or flow name (required unless --session)",
        )

    def add_text_flag(cmd: argparse.ArgumentParser, help_text: str | None = None) -> None:
        """Add the ``--text`` output switch."""
        cmd.add_argument(
            "--text", action="store_true", dest="text_output", help=help_text
        )

    def add_session_flag(cmd: argparse.ArgumentParser, help_text: str) -> None:
        """Add ``--session [NAME]``; a bare flag yields ``"__default__"``."""
        cmd.add_argument(
            "--session",
            nargs="?",
            const="__default__",
            default=None,
            dest="session",
            metavar="NAME",
            help=help_text,
        )

    sub = parser.add_subparsers(dest="command")

    # validate
    p_validate = sub.add_parser("validate", help="Validate a flow definition")
    add_flow_file(p_validate)
    add_text_flag(p_validate)
    add_session_flag(p_validate, session_help_flow)

    # states
    p_states = sub.add_parser("states", help="List all states in a flow")
    add_flow_file(p_states)
    add_text_flag(p_states)
    add_session_flag(p_states, session_help_flow)

    # check
    p_check = sub.add_parser("check", help="Check a state or transition conditions")
    add_flow_file(p_check)
    add_text_flag(p_check)
    p_check.add_argument(
        "state_id", nargs="?", default=None, help="State id to inspect"
    )
    p_check.add_argument(
        "target", nargs="?", default=None, help="Target to check conditions for"
    )
    add_session_flag(p_check, session_help_state)

    # next
    p_next = sub.add_parser("next", help="Show valid next transitions")
    add_flow_file(p_next)
    add_text_flag(p_next)
    p_next.add_argument("state_id", nargs="?", default=None, help="Current state id")
    _add_evidence_args(p_next)
    add_session_flag(p_next, session_help_state)

    # transition
    p_transition = sub.add_parser("transition", help="Compute next state")
    p_transition.add_argument(
        "positional",
        nargs="*",
        help="Args: <flow> <state> <trigger> or <trigger> with --session",
    )
    add_text_flag(p_transition)
    _add_evidence_args(p_transition)
    add_session_flag(p_transition, session_help_update)

    # config
    p_config = sub.add_parser("config", help="Show resolved configuration")
    add_text_flag(p_config, "Output as human-readable text")

    # export
    p_export = sub.add_parser(
        "export", help="Export a flow definition to various formats"
    )
    p_export.add_argument("input_path", help="Path to flow YAML file or directory")
    p_export.add_argument(
        "--format",
        required=True,
        dest="export_format",
        help="Export format",
    )
    p_export.add_argument(
        "--output",
        "-o",
        dest="output_path",
        help="Write output to file instead of stdout",
    )
    from flowr.exporters.registry import EXPORTERS as EXPORTERS_FOR_ARGS

    # Every registered exporter contributes its own flags to ``export``.
    for adapter in EXPORTERS_FOR_ARGS.values():
        adapter.add_arguments(p_export)

    # session
    add_session_parser(sub)

    if _SERVE_AVAILABLE and add_serve_parser is not None:
        add_serve_parser(sub)

259 

260 

def _cmd_validate(args: argparse.Namespace) -> int:
    """Validate the flow named by ``args.flow_file`` and print the report.

    The report lists every violation with its severity, message and
    location, rendered as text when ``args.text_output`` is set and as JSON
    otherwise.

    Returns:
        Exit code: 0 if valid, 1 if invalid, 2 if no flow file was given.
    """
    flow_file = args.flow_file
    if flow_file is None:
        _error("flow_file is required when not using --session")
        return 2

    flow = load_flow_from_file(flow_file)
    related = resolve_subflows(flow, flow_file)
    # Subflows are only passed along when resolution found more than one flow.
    result = validate(flow, related if len(related) > 1 else None)

    report: dict[str, Any] = {
        "valid": result.is_valid,
        "violations": [
            {
                "severity": item.severity.value,
                "message": item.message,
                "location": item.location,
            }
            for item in result.violations
        ],
    }
    render = format_text if args.text_output else format_json
    print(render(report))  # noqa: T201
    return 0 if result.is_valid else 1

290 

291 

def _cmd_states(args: argparse.Namespace) -> int:
    """Print the ids of all states in the flow named by ``args.flow_file``.

    With ``args.text_output`` one id is printed per line; otherwise the id
    list is printed as JSON.

    Returns:
        Exit code: 0 on success, 2 if no flow file was given.
    """
    if args.flow_file is None:
        _error("flow_file is required when not using --session")
        return 2

    ids = [state.id for state in load_flow_from_file(args.flow_file).states]
    if not args.text_output:
        print(format_json(ids))  # noqa: T201
        return 0
    for state_id in ids:
        print(state_id)  # noqa: T201
    return 0

309 

310 

def _cmd_check(
    args: argparse.Namespace,
) -> int:
    """Inspect a state, or one of its transitions when a target is given.

    Returns:
        Exit code: 0 on success, 1 if the state (or target) is not found,
        2 if ``flow_file`` or ``state_id`` is missing.
    """
    # Report the first missing required argument, flow_file before state_id.
    for attr in ("flow_file", "state_id"):
        if getattr(args, attr) is None:
            _error(f"{attr} is required when not using --session")
            return 2

    flow = load_flow_from_file(args.flow_file)
    state = _find_state(flow, args.state_id)
    if state is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    handler = _cmd_check_state if args.target is None else _cmd_check_conditions
    return handler(flow, state, args)

333 

334 

def _cmd_check_state(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print a state's id, optional attrs/flow, and its transition triggers.

    ``attrs`` and ``flow`` are only included when they are truthy.

    Returns:
        Exit code: 0 on success.
    """
    details: dict[str, Any] = {"id": state.id}
    for field_name in ("attrs", "flow"):
        if getattr(state, field_name):
            details[field_name] = getattr(state, field_name)
    details["transitions"] = [*state.next.keys()]

    render = format_text if args.text_output else format_json
    print(render(details))  # noqa: T201
    return 0

353 

354 

def _cmd_check_conditions(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print the conditions guarding the transition keyed by ``args.target``.

    The transition is looked up in ``state.next``; a transition without
    conditions is reported with the placeholder ``"(none)"``.

    Returns:
        Exit code: 0 on success, 1 if target not found.
    """
    transition = state.next.get(args.target)
    if transition is None:
        _error(f"Transition target '{args.target}' not found in state '{state.id}'")
        return 1

    conditions = (
        transition.conditions.conditions if transition.conditions else "(none)"
    )
    summary: dict[str, Any] = {
        "from": state.id,
        "target": args.target,
        "conditions": conditions,
    }
    render = format_text if args.text_output else format_json
    print(render(summary))  # noqa: T201
    return 0

378 

379 

def _cmd_next(args: argparse.Namespace) -> int:
    """List the transitions leaving ``args.state_id`` with their status.

    Each transition is marked open or blocked according to the evidence
    supplied on the command line.

    Returns:
        Exit code: 0 on success, 1 if state not found, 2 if ``flow_file``
        or ``state_id`` is missing.
    """
    for attr in ("flow_file", "state_id"):
        if getattr(args, attr) is None:
            _error(f"{attr} is required when not using --session")
            return 2

    flow = load_flow_from_file(args.flow_file)
    state = _find_state(flow, args.state_id)
    if state is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    rows = _build_transition_list(state, _parse_evidence(args))
    if args.text_output:
        rendered = _format_transitions_text(state.id, rows)
    else:
        rendered = format_json({"state": state.id, "transitions": rows})
    print(rendered)  # noqa: T201
    return 0

404 

405 

def _cmd_transition(args: argparse.Namespace) -> int:
    """Compute and print the state reached by firing a trigger.

    The flow path is always read from ``args.flow_file``. State and trigger
    come from ``args.positional`` (``<flow> <state> <trigger>``) when that
    list is non-empty, otherwise from ``args.state_id`` / ``args.trigger``.
    When the target state wraps a subflow that can be found and has states,
    the result is shown as ``<subflow>/<first state>``.

    Returns:
        Exit code: 0 on success, 1 if the state or trigger is unknown or the
        conditions are not met, 2 if fewer than three positionals are given.
    """
    positional = getattr(args, "positional", None)
    flow_file = args.flow_file
    if positional:
        # Guard the indexing below; a short list used to raise IndexError.
        if len(positional) < 3:
            _error("transition requires <flow> <state> <trigger>")
            return 2
        state_id, trigger = positional[1], positional[2]
    else:
        state_id = args.state_id
        trigger = args.trigger

    flow = load_flow_from_file(flow_file)
    all_flows = resolve_subflows(flow, flow_file)
    state = _find_state(flow, state_id)
    if state is None:
        _error(f"State '{state_id}' not found")
        return 1
    transition = state.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{state_id}'")
        return 1

    evidence = _parse_evidence(args)
    if transition.conditions and not _conditions_met(
        transition.conditions.conditions, evidence
    ):
        _error(f"Conditions not met for trigger '{trigger}'")
        return 1

    target = transition.target
    target_state = _find_state(flow, target)
    if target_state is not None and target_state.flow is not None:
        child = _find_subflow(all_flows, target_state.flow, Path(flow_file))
        if child and child.states:
            target = f"{child.flow}/{child.states[0].id}"

    output: dict[str, Any] = {
        "from": state_id,
        "trigger": trigger,
        "to": target,
    }
    render = format_text if args.text_output else format_json
    print(render(output))  # noqa: T201
    return 0

452 

453 

def _cmd_config(args: argparse.Namespace) -> int:
    """Print each resolved configuration value together with its source.

    ``--flows-dir`` is forwarded as a CLI override when it was given. Text
    output prints ``key = value (source)`` lines; otherwise the rows are
    printed as JSON.

    Returns:
        Exit code: 0 on success.
    """
    overrides = {"flows_dir": args.flows_dir} if args.flows_dir else None
    config, sources = resolve_config_with_sources(cli_overrides=overrides)

    values = {
        "project_root": _display_path(config.project_root),
        "flows_dir": str(config.flows_dir),
        "sessions_dir": str(config.sessions_dir),
        "default_flow": config.default_flow,
        "default_session": config.default_session,
    }
    rows = [
        {"key": key, "value": value, "source": sources[key]}
        for key, value in values.items()
    ]

    if not args.text_output:
        print(format_json(rows))  # noqa: T201
        return 0
    for row in rows:
        print(f"{row['key']} = {row['value']} ({row['source']})")  # noqa: T201
    return 0

496 

497 

498def _extract_adapter_options(args: argparse.Namespace) -> dict: 

499 options: dict = {} 

500 for key, value in vars(args).items(): 

501 if key.startswith("adapter_"): 

502 options[key[len("adapter_") :]] = value 

503 return options 

504 

505 

506def _load_flows_from_directory(dir_path: Path) -> list[tuple[str, Flow]]: 

507 yaml_files = sorted(dir_path.glob("*.yaml")) + sorted(dir_path.glob("*.yml")) 

508 for subdir in sorted(dir_path.iterdir()): 

509 if subdir.is_dir(): 

510 yaml_files.extend( 

511 sorted(subdir.glob("*.yaml")) + sorted(subdir.glob("*.yml")) 

512 ) 

513 flows: list[tuple[str, Flow]] = [] 

514 for yaml_file in yaml_files: 

515 flow = load_flow_from_file(yaml_file) 

516 flows.append((yaml_file.stem, flow)) 

517 return flows 

518 

519 

520def _load_subflows(flow: Flow, search_dir: Path) -> dict[str, Flow]: 

521 subflows: dict[str, Flow] = {} 

522 for state in flow.states: 

523 if state.flow and state.flow not in subflows: 

524 for pattern in ( 

525 f"{state.flow}.yaml", 

526 f"{state.flow}.yml", 

527 f"{state.flow}-flow.yaml", 

528 f"{state.flow}-flow.yml", 

529 ): 

530 candidate = search_dir / pattern 

531 if candidate.exists(): 

532 subflows[state.flow] = load_flow_from_file(candidate) 

533 break 

534 return subflows 

535 

536 

def _cmd_export(args: argparse.Namespace) -> int:
    """Export a flow file or a directory of flows with the chosen exporter.

    Adapter flags that were set but are not accepted by the chosen format
    only produce a warning on stderr. The result goes to ``--output`` when
    given (parent directories are created), otherwise to stdout. A ``.js``
    output path combined with the ``json`` format wraps the payload in a
    ``window.FLOWVIZ_DATA`` assignment.

    Returns:
        Exit code: 0 on success, 1 if the input path is missing, the format
        is unknown, or the directory contains no flow files.
    """
    from flowr.exporters.registry import EXPORTERS as EXPORTERS_REGISTRY

    source = Path(args.input_path)
    if not source.exists():
        _error(f"path does not exist: {args.input_path}")
        return 1

    fmt = args.export_format
    adapter = EXPORTERS_REGISTRY.get(fmt)
    if adapter is None:
        available = ", ".join(sorted(EXPORTERS_REGISTRY.keys()))
        _error(f"unknown format '{fmt}'. available: {available}")
        return 1

    options = _extract_adapter_options(args)
    accepted = adapter.accepted_options()
    ignored = [name for name, value in options.items() if value and name not in accepted]
    if ignored:
        flag_names = ", ".join(f"--{name.replace('_', '-')}" for name in ignored)
        print(  # noqa: T201
            f"warning: unused flags for format '{fmt}': {flag_names}",
            file=sys.stderr,
        )

    if source.is_dir():
        flows = _load_flows_from_directory(source)
        if not flows:
            _error(f"no flow files found in directory: {args.input_path}")
            return 1
        rendered = adapter.export_directory(flows, options)
    else:
        flow = load_flow_from_file(source)
        rendered = adapter.export(
            flow, options, subflows=_load_subflows(flow, source.parent)
        )

    destination = getattr(args, "output_path", None)
    if not destination:
        print(rendered)  # noqa: T201
        return 0

    out_file = Path(destination)
    if out_file.suffix == ".js" and fmt == "json":
        var_name = "window.FLOWVIZ_DATA"
        rendered = f"{var_name} = {rendered};\n"
    out_file.parent.mkdir(parents=True, exist_ok=True)
    out_file.write_text(rendered, encoding="utf-8")
    return 0

579 

580 

581def _find_state(flow: Flow, state_id: str) -> State | None: 

582 """Find a state by id in a flow. 

583 

584 Returns: 

585 The matching State, or None if not found. 

586 """ 

587 for s in flow.states: 

588 if s.id == state_id: 

589 return s 

590 return None 

591 

592 

593def _find_passing_transitions( 

594 state: State, evidence: dict[str, str] 

595) -> list[Transition]: 

596 """Find transitions whose conditions pass given evidence. 

597 

598 Returns: 

599 List of transitions whose conditions are satisfied. 

600 """ 

601 passing: list[Transition] = [] 

602 for _trigger, transition in state.next.items(): 

603 if transition.conditions is None or _conditions_met( 

604 transition.conditions.conditions, evidence 

605 ): 

606 passing.append(transition) 

607 return passing 

608 

609 

610def _build_transition_list( 

611 state: State, evidence: dict[str, str] 

612) -> list[dict[str, Any]]: 

613 """Build rich transition info for all transitions from a state. 

614 

615 Returns: 

616 List of dicts with trigger, target, status, and conditions. 

617 """ 

618 transitions: list[dict[str, Any]] = [] 

619 for trigger, transition in state.next.items(): 

620 if transition.conditions is None or _conditions_met( 

621 transition.conditions.conditions, evidence 

622 ): 

623 status = "open" 

624 else: 

625 status = "blocked" 

626 transitions.append( 

627 { 

628 "trigger": trigger, 

629 "target": transition.target, 

630 "status": status, 

631 "conditions": transition.conditions.conditions 

632 if transition.conditions 

633 else None, 

634 } 

635 ) 

636 return transitions 

637 

638 

639def _format_transitions_text(state_id: str, transitions: list[dict[str, Any]]) -> str: 

640 """Format transitions as human-readable text.""" 

641 lines = [f"state: {state_id}"] 

642 for t in transitions: 

643 marker = " [blocked]" if t["status"] == "blocked" else "" 

644 cond = "" 

645 if t["status"] == "blocked" and t["conditions"]: 

646 pairs = ", ".join(f"{k}={v}" for k, v in t["conditions"].items()) 

647 cond = f" need: {pairs}" 

648 lines.append(f" {t['trigger']}{t['target']}{marker}{cond}") 

649 return "\n".join(lines) 

650 

651 

652def _conditions_met(conditions: dict[str, str], evidence: dict[str, str]) -> bool: 

653 """Check if all conditions are satisfied by evidence. 

654 

655 Returns: 

656 True if all conditions pass, False otherwise. 

657 """ 

658 for key, cond_str in conditions.items(): 

659 ev = evidence.get(key, "") 

660 op, value = parse_condition(cond_str) 

661 if not evaluate_condition(op, value, ev): 

662 return False 

663 return True 

664 

665 

666def _find_subflow( 

667 all_flows: list[Flow], flow_ref: str, _root_path: Path 

668) -> Flow | None: 

669 """Find a subflow by its flow name from the resolved list. 

670 

671 Returns: 

672 The matching Flow, or None if not found. 

673 """ 

674 for f in all_flows: 

675 if f.flow == Path(flow_ref).stem: 

676 return f 

677 return None 

678 

679 

680def _display_path(path: Path) -> str: 

681 """Display a path relative to cwd, or '.' if same.""" 

682 try: 

683 rel = path.relative_to(Path.cwd()) 

684 return "." if rel == Path() else str(rel) 

685 except ValueError: 

686 return str(path) 

687 

688 

689def _error(msg: str) -> None: 

690 """Print error to stderr.""" 

691 print(f"error: {msg}", file=sys.stderr) # noqa: T201 

692 

693 

def _resolve_session(
    session_name: str, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> tuple[Session, Flow, Path]:
    """Load a stored session and the flow it refers to.

    Any failure (unknown session, unresolvable flow name, unparsable flow)
    is reported on stderr and terminates the process with exit code 1.

    Returns:
        Tuple of (session, flow, flow_path).
    """
    store = YamlSessionStore(config.sessions_path())

    def fail(message: str) -> None:
        _error(message)
        sys.exit(1)

    try:
        session = store.load(session_name)
    except (SessionNotFoundError, SessionNameNotFoundError) as exc:
        fail(str(exc))

    try:
        flow_path = resolver.resolve(session.flow, config.flows_path())
    except FlowNameNotFoundError as exc:
        fail(str(exc))

    try:
        flow = load_flow_from_file(flow_path)
    except FlowParseError as exc:  # pragma: no cover
        fail(f"invalid flow definition: {exc}")  # pragma: no cover

    return session, flow, flow_path

722 

723 

724def _find_flow_file(flow_name: str, flows_dir: Path) -> Path | None: 

725 """Find a flow file by name in the flows directory.""" 

726 path = flows_dir / (flow_name + ".yaml") 

727 if path.exists(): 

728 return path 

729 path = flows_dir / flow_name 

730 if path.exists(): 

731 return path 

732 return None 

733 

734 

def _enter_subflow(
    session: Session,
    parent_flow: Flow,
    parent_flow_path: Path,
    target_state_id: str,
) -> tuple[Session, str] | None:
    """Enter the subflow wrapped by *target_state_id*, if there is one.

    A stack frame for the parent flow is pushed and the session moves to the
    first state of the child flow. When that first state is itself a subflow
    wrapper (has a ``flow:`` field) whose flow can be found and has states,
    one further level is entered in the same way; deeper nesting is not
    followed here.

    Returns:
        ``(updated_session, display_target)`` with the display target formed
        as ``<flow>/<state>``, or None when the target state does not exist,
        is not a subflow wrapper, or its subflow is missing or has no states.
    """
    wrapper = _find_state(parent_flow, target_state_id)
    if wrapper is None or wrapper.flow is None:
        return None

    resolved = resolve_subflows(parent_flow, parent_flow_path)
    child = _find_subflow(resolved, wrapper.flow, parent_flow_path)
    if child is None or not child.states:
        return None

    entry_state = child.states[0]
    entered = session.push_stack(
        SessionStackFrame(flow=parent_flow.flow, state=target_state_id),
        entry_state.id,
        new_flow=child.flow,
    )
    shallow_result = (entered, f"{child.flow}/{entry_state.id}")
    if entry_state.flow is None:
        return shallow_result

    nested_flows = resolve_subflows(child, parent_flow_path)
    grandchild = _find_subflow(nested_flows, entry_state.flow, parent_flow_path)
    if grandchild is None or not grandchild.states:
        return shallow_result

    inner_initial = grandchild.states[0].id
    nested = entered.push_stack(
        SessionStackFrame(flow=child.flow, state=entry_state.id),
        inner_initial,
        new_flow=grandchild.flow,
    )
    return nested, f"{grandchild.flow}/{inner_initial}"

776 

777 

def _resolve_subflow_exit(
    session: Session,
    trigger: str,
    exit_name: str,
    flows_dir: Path,
) -> tuple[Session, str]:
    """Leave the current subflow and continue in the parent flow.

    The top stack frame identifies the parent flow and the state that
    wrapped the subflow. The parent state's transition keyed by *exit_name*
    gives the real target; if that target wraps another subflow, it is
    entered immediately. When the parent flow file, the parent state or that
    transition cannot be found, the stack is simply popped to *exit_name*.
    ``trigger`` is accepted but not used.

    Returns:
        Tuple of (updated_session, target_display).
    """
    frame = session.stack[-1]
    parent_path = _find_flow_file(frame.flow, flows_dir)

    parent_flow = None
    onward = None
    if parent_path is not None:
        parent_flow = load_flow_from_file(parent_path)
        parent_state = _find_state(parent_flow, frame.state)
        if parent_state is not None:
            onward = parent_state.next.get(exit_name)

    if onward is None:
        # No way to map the exit through the parent: fall back to a plain pop.
        return session.pop_stack(exit_name), exit_name

    resolved_target = onward.target
    resumed = session.pop_stack(resolved_target)
    chained = _enter_subflow(resumed, parent_flow, parent_path, resolved_target)
    if chained is None:
        return resumed, resolved_target
    return chained

813 

814 

def _apply_session_transition(
    session: Session,
    flow: Flow,
    flow_path: Path,
    trigger: str,
    evidence: dict[str, str],
    flows_dir: Path | None = None,
) -> tuple[Session, str]:
    """Fire *trigger* from the session's current state.

    An unknown state, an unknown trigger or unmet conditions are reported on
    stderr and terminate the process with exit code 1. Otherwise the target
    is handled in this order: entering a subflow, exiting a subflow (only
    when the session has a stack and the target is one of the flow's exits),
    or a plain state change.

    Returns:
        Tuple of (updated_session, target_display).
    """
    current = _find_state(flow, session.state)
    if current is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    transition = current.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{session.state}'")
        sys.exit(1)

    guard = transition.conditions
    if guard and not _conditions_met(guard.conditions, evidence):  # pragma: no cover
        _error(f"Conditions not met for trigger '{trigger}'")  # pragma: no cover
        sys.exit(1)  # pragma: no cover

    target = transition.target

    # Entering a subflow takes precedence over every other outcome.
    entered = _enter_subflow(session, flow, flow_path, target)
    if entered is not None:
        return entered

    leaving_subflow = bool(session.stack) and target in flow.exits
    if not leaving_subflow:
        return session.with_state(target), target

    # Without a flows directory the parent cannot be consulted: plain pop.
    if flows_dir is None:
        return session.pop_stack(target), target
    return _resolve_subflow_exit(session, trigger, target, flows_dir)

860 

861 

def _cmd_transition_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Fire a trigger on a stored session and save the updated session.

    The trigger is the first positional argument. A bare ``--session``
    selects the configured default session. This function always ends the
    process: exit code 2 when the trigger is missing, 0 after the result has
    been printed.
    """
    if not args.positional:
        _error("trigger is required")
        sys.exit(2)
    trigger = args.positional[0]

    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, flow_path = _resolve_session(session_name, config, resolver)
    updated_session, target = _apply_session_transition(
        session,
        flow,
        flow_path,
        trigger,
        _parse_evidence(args),
        config.flows_path(),
    )
    YamlSessionStore(config.sessions_path()).save(updated_session)

    summary: dict[str, Any] = {
        "from": session.state,
        "trigger": trigger,
        "to": target,
    }
    render = format_text if args.text_output else format_json
    print(render(summary))  # noqa: T201
    sys.exit(0)

894 

895 

def _handle_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run the chosen ``session`` subcommand and exit with its return code.

    A missing or unknown subcommand exits with code 2; a flow that fails to
    parse exits with code 1.
    """
    if args.session_command is None:  # pragma: no cover
        build_parser().parse_args(["session", "--help"])
        sys.exit(2)  # pragma: no cover

    dispatch = {
        "init": cmd_session_init,
        "show": cmd_session_show,
        "set-state": cmd_session_set_state,
        "list": cmd_session_list,
    }
    if args.session_command not in dispatch:
        _error(f"Unknown session command: {args.session_command}")
        sys.exit(2)
    handler = dispatch[args.session_command]

    try:
        rc = handler(args, config, resolver)
    except FlowParseError as exc:  # pragma: no cover
        _error(f"invalid flow definition: {exc}")  # pragma: no cover
        sys.exit(1)  # pragma: no cover
    sys.exit(rc)

921 

922 

def _cmd_check_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run ``check`` against the session's current flow and state (read-only).

    The target is taken from ``args.target`` or, failing that, from
    ``args.flow_file``; when one is found it is written back to
    ``args.target`` and its conditions are shown, otherwise the state
    details are shown. Always exits the process with the resulting code
    (1 when the session's state is not in the flow).
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    state = _find_state(flow, session.state)
    if state is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    target = args.target or getattr(args, "flow_file", None)
    if target is None:
        rc = _cmd_check_state(flow, state, args)
    else:
        args.target = target
        rc = _cmd_check_conditions(flow, state, args)
    sys.exit(rc)

944 

945 

def _cmd_next_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run ``next`` for the session's current state (read-only).

    Always exits the process: code 1 when the session's state is not in the
    flow, code 0 after the transitions have been printed.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    state = _find_state(flow, session.state)
    if state is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    rows = _build_transition_list(state, _parse_evidence(args))
    if args.text_output:
        rendered = _format_transitions_text(state.id, rows)
    else:
        rendered = format_json({"state": state.id, "transitions": rows})
    print(rendered)  # noqa: T201
    sys.exit(0)

967 

968 

def _cmd_states_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run ``states`` for the flow of a stored session (read-only).

    Prints one state id per line in text mode, or the id list as JSON, then
    exits the process with code 0.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    _session, flow, _flow_path = _resolve_session(session_name, config, resolver)
    ids = [state.id for state in flow.states]
    if not args.text_output:
        print(format_json(ids))  # noqa: T201
        sys.exit(0)
    for state_id in ids:
        print(state_id)  # noqa: T201
    sys.exit(0)

985 

986 

def _cmd_validate_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run ``validate`` for the flow of a stored session (read-only).

    Prints the validation report and exits the process with code 0 when the
    flow is valid and 1 otherwise.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    _session, flow, flow_path = _resolve_session(session_name, config, resolver)
    related = resolve_subflows(flow, flow_path)
    # Subflows are only passed along when resolution found more than one flow.
    result = validate(flow, related if len(related) > 1 else None)

    report: dict[str, Any] = {
        "valid": result.is_valid,
        "violations": [
            {
                "severity": item.severity.value,
                "message": item.message,
                "location": item.location,
            }
            for item in result.violations
        ],
    }
    render = format_text if args.text_output else format_json
    print(render(report))  # noqa: T201
    sys.exit(0 if result.is_valid else 1)

1015 

1016 

def _resolve_flow_for_command(
    args: argparse.Namespace,
    config: FlowrConfig,
    resolver: DefaultFlowNameResolver,
) -> None:
    """Resolve the flow reference of a command and store it on ``args``.

    For ``transition`` the flow reference is the first positional argument;
    for every other command it is the existing ``args.flow_file`` value. The
    resolved result is written back to ``args.flow_file``.

    Args:
        args: Parsed CLI arguments; ``flow_file`` is overwritten in place.
        config: Active configuration; supplies the flows directory.
        resolver: Resolver used to turn the reference into a flow file.

    Exits with status 2 when ``transition`` has fewer than three positional
    arguments, and with status 1 when the flow name cannot be resolved.
    """
    flows_dir = config.flows_path()

    if args.command == "transition":
        if len(args.positional) < 3:
            _error("transition requires <flow> <state> <trigger>")
            sys.exit(2)
        flow_ref = args.positional[0]
    else:
        flow_ref = args.flow_file

    try:
        args.flow_file = resolver.resolve(flow_ref, flows_dir)
    except FlowNameNotFoundError as exc:
        _error(str(exc))
        sys.exit(1)

1040 

1041 

# Command name -> name of the module-level function that handles the command
# when a session is supplied. The function is looked up by name at call time.
_SESSION_COMMANDS = {
    "transition": "_cmd_transition_session",
    "check": "_cmd_check_session",
    "next": "_cmd_next_session",
    "states": "_cmd_states_session",
    "validate": "_cmd_validate_session",
}


def _dispatch_session_command(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> bool:
    """Run the session-aware variant of a command when one applies.

    Args:
        args: Parsed CLI arguments; ``session`` (optional) and ``command``.
        config: Active configuration, forwarded to the handler.
        resolver: Flow-name resolver, forwarded to the handler.

    Returns:
        True when a session-aware handler was called, False when no session
        was given or the command has no entry in ``_SESSION_COMMANDS``.
    """
    if getattr(args, "session", None) is not None:
        handler_name = _SESSION_COMMANDS.get(args.command)
        if handler_name is not None:
            # Resolve the handler from this module's globals by its name.
            globals()[handler_name](args, config, resolver)
            return True
    return False

1063 

1064 

def _run_command(
    handler: Callable[[argparse.Namespace], int],
    args: argparse.Namespace,
) -> None:
    """Run a command handler with unified error handling.

    The handler's return value is used as the process exit status. A YAML
    error or a flow-definition parse error raised by the handler is passed
    to ``_error`` and the process exits with status 1.

    Args:
        handler: Command function called with ``args``; returns the exit status.
        args: Parsed command-line arguments handed to ``handler``.
    """
    try:
        sys.exit(handler(args))
    except yaml.YAMLError as exc:
        # Only the first line of the message is shown. An exception with an
        # empty message has no lines at all ("".splitlines() == []), so fall
        # back to the exception class name instead of indexing blindly.
        message_lines = str(exc).splitlines()
        summary = message_lines[0] if message_lines else type(exc).__name__
        _error(f"malformed YAML: {summary}")
        sys.exit(1)
    except FlowParseError as exc:
        _error(f"invalid flow definition: {exc}")
        sys.exit(1)

1078 

1079 

def _run_export(args: argparse.Namespace) -> None:
    """Run the export command with unified error handling.

    Delegates to ``_run_command`` so that export shares one error-handling
    path with the other commands instead of keeping a duplicate copy of it.

    Args:
        args: Parsed command-line arguments handed to ``_cmd_export``.
    """
    _run_command(_cmd_export, args)

1090 

1091 

def main() -> None:
    """Run the application.

    Parses the command line, resolves the configuration and dispatches to
    the handler of the selected command. When no command is given, the help
    text is printed and the process exits with status 2.
    """
    # Build the parser once and reuse it for the help output.
    parser = build_parser()
    args = parser.parse_args()
    if args.command is None:  # pragma: no cover
        parser.print_help()
        sys.exit(2)

    resolver = DefaultFlowNameResolver()
    # Resolve the configuration a single time, applying the --flows-dir
    # override directly instead of resolving once and then resolving again.
    if args.flows_dir is not None:
        config = resolve_config(cli_overrides={"flows_dir": args.flows_dir})
    else:
        config = resolve_config()

    if args.command == "session":  # pragma: no cover
        _handle_session(args, config, resolver)
        return  # pragma: no cover

    if args.command == "config":  # pragma: no cover
        rc = _cmd_config(args)
        sys.exit(rc)  # pragma: no cover

    if args.command == "export":
        _run_export(args)
        return  # pragma: no cover

    # Commands given a session are handled by their session-aware variants.
    if _dispatch_session_command(args, config, resolver):
        return

    if args.command == "serve" and cmd_serve is not None:
        rc = cmd_serve(args)
        sys.exit(rc)

    # The remaining commands operate on a flow file, so resolve it first.
    _resolve_flow_for_command(args, config, resolver)

    cmd_map = {
        "validate": _cmd_validate,
        "states": _cmd_states,
        "check": _cmd_check,
        "next": _cmd_next,
        "transition": _cmd_transition,
        # "config" already exits in its own branch above; entry kept as-is.
        "config": _cmd_config,
    }
    handler = cmd_map.get(args.command)
    if handler is None:  # pragma: no cover
        _error(f"Unknown command: {args.command}")
        sys.exit(2)
    _run_command(handler, args)


if __name__ == "__main__":
    main()