Coverage for flowr / __main__.py: 100%

340 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""CLI entrypoint for flowr — invoked via `python -m flowr`.""" 

2 

3import argparse 

4import importlib.metadata 

5import sys 

6from pathlib import Path 

7from typing import Any 

8 

9from flowr.cli.output import format_json, format_text 

10from flowr.cli.resolution import DefaultFlowNameResolver, FlowNameNotFoundError 

11from flowr.cli.session_cmd import ( 

12 add_session_parser, 

13 cmd_session_init, 

14 cmd_session_list, 

15 cmd_session_set_state, 

16 cmd_session_show, 

17) 

18from flowr.domain.condition import evaluate_condition, parse_condition 

19from flowr.domain.flow_definition import Flow, State, Transition 

20from flowr.domain.loader import FlowParseError, load_flow_from_file, resolve_subflows 

21from flowr.domain.mermaid import to_mermaid 

22from flowr.domain.session import Session, SessionStackFrame 

23from flowr.domain.validation import validate 

24from flowr.infrastructure.config import ( 

25 FlowrConfig, 

26 resolve_config, 

27 resolve_config_with_sources, 

28) 

29from flowr.infrastructure.session_store import ( 

30 SessionNameNotFoundError, 

31 SessionNotFoundError, 

32 YamlSessionStore, 

33) 

34 

35 

def build_parser() -> argparse.ArgumentParser:
    """Create the top-level ``flowr`` argument parser.

    The program description and the ``--version`` text are read from the
    installed package metadata of ``flowr``.

    Returns:
        The configured ArgumentParser instance, subcommands included.
    """
    package_info = importlib.metadata.metadata("flowr")
    summary = package_info["Summary"]
    version_text = f"flowr {package_info['Version']}"

    cli = argparse.ArgumentParser(prog="flowr", description=summary)
    cli.add_argument("--version", action="version", version=version_text)
    cli.add_argument(
        "--flows-dir",
        dest="flows_dir",
        default=None,
        metavar="DIR",
        help="Override the configured flows directory for this invocation",
    )
    _add_subcommands(cli)
    return cli

61 

62 

63def _add_flow_args(parser: argparse.ArgumentParser) -> None: 

64 """Add common args: flow file path and --json flag.""" 

65 parser.add_argument("flow_file", help="Path to flow YAML file or flow name") 

66 parser.add_argument("--json", action="store_true", dest="json_output") 

67 

68 

69def _add_evidence_args(parser: argparse.ArgumentParser) -> None: 

70 """Add evidence input args.""" 

71 parser.add_argument( 

72 "--evidence", 

73 action="append", 

74 default=[], 

75 metavar="KEY=VALUE", 

76 help="Evidence key=value pairs", 

77 ) 

78 parser.add_argument( 

79 "--evidence-json", 

80 default=None, 

81 metavar="JSON", 

82 help="JSON evidence object", 

83 ) 

84 

85 

86def _parse_evidence(args: argparse.Namespace) -> dict[str, str]: 

87 """Parse evidence from CLI args into a dict. 

88 

89 Returns: 

90 Dictionary mapping evidence keys to their string values. 

91 """ 

92 evidence: dict[str, str] = {} 

93 for pair in args.evidence: 

94 key, _, value = pair.partition("=") 

95 evidence[key] = value 

96 if args.evidence_json is not None: 

97 import json 

98 

99 data = json.loads(args.evidence_json) 

100 for k, v in data.items(): 

101 evidence[k] = str(v) 

102 return evidence 

103 

104 

def _add_subcommands(parser: argparse.ArgumentParser) -> None:
    """Attach every subcommand parser to ``parser``.

    The chosen subcommand is stored as ``command`` on the namespace.
    Subcommands are registered in this order: validate, states, check, next,
    transition, config, mermaid, then the session group.
    """
    flow_or_session_help = (
        "Path to flow YAML file or flow name (required unless --session)"
    )
    read_only_session_help = (
        "Use session name or file path to resolve flow/state (read-only)"
    )

    def optional_positional(
        cmd: argparse.ArgumentParser, name: str, help_text: str
    ) -> None:
        # Positional that may be omitted; the attribute is None when absent.
        cmd.add_argument(name, nargs="?", default=None, help=help_text)

    def json_switch(cmd: argparse.ArgumentParser) -> None:
        cmd.add_argument("--json", action="store_true", dest="json_output")

    def session_option(cmd: argparse.ArgumentParser, help_text: str) -> None:
        # A bare ``--session`` stores the "__default__" sentinel.
        cmd.add_argument(
            "--session",
            nargs="?",
            const="__default__",
            default=None,
            dest="session",
            metavar="NAME",
            help=help_text,
        )

    commands = parser.add_subparsers(dest="command")

    # validate
    _add_flow_args(commands.add_parser("validate", help="Validate a flow definition"))

    # states
    _add_flow_args(commands.add_parser("states", help="List all states in a flow"))

    # check
    check = commands.add_parser(
        "check", help="Check a state or transition conditions"
    )
    optional_positional(check, "flow_file", flow_or_session_help)
    json_switch(check)
    optional_positional(check, "state_id", "State id to inspect")
    optional_positional(check, "target", "Target to check conditions for")
    session_option(check, read_only_session_help)

    # next
    following = commands.add_parser("next", help="Show valid next transitions")
    optional_positional(following, "flow_file", flow_or_session_help)
    json_switch(following)
    optional_positional(following, "state_id", "Current state id")
    _add_evidence_args(following)
    session_option(following, read_only_session_help)

    # transition
    transition = commands.add_parser("transition", help="Compute next state")
    transition.add_argument(
        "positional",
        nargs="*",
        help="Args: <flow> <state> <trigger> or <trigger> with --session",
    )
    json_switch(transition)
    _add_evidence_args(transition)
    session_option(
        transition,
        "Use session to resolve flow/state; auto-update after transition",
    )

    # config
    config_cmd = commands.add_parser("config", help="Show resolved configuration")
    config_cmd.add_argument(
        "--json",
        action="store_true",
        dest="json_output",
        help="Output as JSON",
    )

    # mermaid
    _add_flow_args(commands.add_parser("mermaid", help="Export as Mermaid diagram"))

    # session
    add_session_parser(commands)

203 

204 

def _cmd_validate(args: argparse.Namespace) -> int:
    """Validate the flow in ``args.flow_file`` and print the outcome.

    The resolved flow list is handed to ``validate`` only when it holds more
    than one flow; otherwise ``None`` is passed in its place.

    Returns:
        Exit code: 0 if valid, 1 if invalid.
    """
    root = load_flow_from_file(args.flow_file)
    resolved = resolve_subflows(root, args.flow_file)
    related = resolved if len(resolved) > 1 else None
    report = validate(root, related)

    payload: dict[str, Any] = {
        "valid": report.is_valid,
        "violations": [
            {
                "severity": issue.severity.value,
                "message": issue.message,
                "location": issue.location,
            }
            for issue in report.violations
        ],
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    return 0 if report.is_valid else 1

231 

232 

def _cmd_states(args: argparse.Namespace) -> int:
    """Print the id of every state in the flow.

    With ``--json`` the ids are printed as one JSON document; otherwise one id
    is printed per line.

    Returns:
        Exit code: 0 on success.
    """
    loaded = load_flow_from_file(args.flow_file)
    ids = [state.id for state in loaded.states]
    if not args.json_output:
        for state_id in ids:
            print(state_id)  # noqa: T201
        return 0
    print(format_json(ids))  # noqa: T201
    return 0

247 

248 

def _cmd_check(
    args: argparse.Namespace,
) -> int:
    """Run the check subcommand without a session.

    Shows the details of one state, or the conditions of one of its
    transitions when ``args.target`` is given.

    Returns:
        Exit code: 0 on success, 1 if the state or target is not found,
        2 if ``flow_file`` or ``state_id`` is missing.
    """
    # Both positionals are optional for argparse, so enforce them here.
    for required in ("flow_file", "state_id"):
        if getattr(args, required) is None:
            _error(f"{required} is required when not using --session")
            return 2

    flow = load_flow_from_file(args.flow_file)
    state = _find_state(flow, args.state_id)
    if state is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    if args.target is None:
        return _cmd_check_state(flow, state, args)
    return _cmd_check_conditions(flow, state, args)

271 

272 

def _cmd_check_state(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print the id, optional attrs/flow, and transition triggers of a state.

    ``attrs`` and ``flow`` are included only when they are truthy.

    Returns:
        Exit code: 0 on success.
    """
    details: dict[str, Any] = {"id": state.id}
    for field in ("attrs", "flow"):
        value = getattr(state, field)
        if value:
            details[field] = value
    details["transitions"] = [*state.next.keys()]

    render = format_json if args.json_output else format_text
    print(render(details))  # noqa: T201
    return 0

291 

292 

def _cmd_check_conditions(_flow: Flow, state: State, args: argparse.Namespace) -> int:
    """Print the conditions attached to one transition of ``state``.

    The transition is looked up in ``state.next`` by ``args.target``. When it
    has no conditions, the literal ``"(none)"`` is reported instead.

    Returns:
        Exit code: 0 on success, 1 if target not found.
    """
    edge = state.next.get(args.target)
    if edge is None:
        _error(f"Transition target '{args.target}' not found in state '{state.id}'")
        return 1

    conditions = edge.conditions.conditions if edge.conditions else "(none)"
    payload: dict[str, Any] = {
        "from": state.id,
        "target": args.target,
        "conditions": conditions,
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    return 0

316 

317 

def _cmd_next(args: argparse.Namespace) -> int:
    """Run the next subcommand without a session.

    Prints the targets of all transitions out of the given state whose
    conditions pass for the supplied evidence.

    Returns:
        Exit code: 0 on success, 1 if the state is not found, 2 if
        ``flow_file`` or ``state_id`` is missing.
    """
    # Both positionals are optional for argparse, so enforce them here.
    for required in ("flow_file", "state_id"):
        if getattr(args, required) is None:
            _error(f"{required} is required when not using --session")
            return 2

    flow = load_flow_from_file(args.flow_file)
    current = _find_state(flow, args.state_id)
    if current is None:
        _error(f"State '{args.state_id}' not found")
        return 1

    evidence = _parse_evidence(args)
    reachable = _find_passing_transitions(current, evidence)
    payload: dict[str, Any] = {
        "state": current.id,
        "next": [edge.target for edge in reachable],
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    return 0

346 

347 

def _cmd_transition(args: argparse.Namespace) -> int:
    """Run the transition subcommand without a session.

    The state id and trigger come from ``args.positional[1]`` and
    ``args.positional[2]`` when ``positional`` is present and non-empty, and
    from ``args.state_id`` / ``args.trigger`` otherwise. The flow is always
    read from ``args.flow_file``. When the target state refers to a subflow
    that is found and has states, the reported destination is
    ``<subflow>/<first state id>``.

    Returns:
        Exit code: 0 on success, 1 if the state or trigger is not found or
        the transition's conditions are not met.
    """
    flow_file = args.flow_file
    positional = getattr(args, "positional", None)
    if positional:
        state_id, trigger = positional[1], positional[2]
    else:
        state_id, trigger = args.state_id, args.trigger

    flow = load_flow_from_file(flow_file)
    all_flows = resolve_subflows(flow, flow_file)

    state = _find_state(flow, state_id)
    if state is None:
        _error(f"State '{state_id}' not found")
        return 1

    transition = state.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{state_id}'")
        return 1

    evidence = _parse_evidence(args)
    guard = transition.conditions
    if guard and not _conditions_met(guard.conditions, evidence):
        _error(f"Conditions not met for trigger '{trigger}'")
        return 1

    destination = transition.target
    landing = _find_state(flow, destination)
    if landing is not None and landing.flow is not None:
        child = _find_subflow(all_flows, landing.flow, Path(flow_file))
        if child and child.states:
            destination = f"{child.flow}/{child.states[0].id}"

    payload: dict[str, Any] = {
        "from": state_id,
        "trigger": trigger,
        "to": destination,
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    return 0

394 

395 

def _cmd_config(args: argparse.Namespace) -> int:
    """Run config subcommand — show each resolved setting and its source.

    A truthy ``args.flows_dir`` is passed on as a CLI override. The three
    directory settings are rendered with ``str()``; ``default_flow`` and
    ``default_session`` are reported as stored.

    Returns:
        Exit code: 0 on success.
    """
    overrides = {"flows_dir": args.flows_dir} if args.flows_dir else None
    config, sources = resolve_config_with_sources(cli_overrides=overrides)

    stringified_keys = ("project_root", "flows_dir", "sessions_dir")
    verbatim_keys = ("default_flow", "default_session")
    rows = [
        {"key": key, "value": str(getattr(config, key)), "source": sources[key]}
        for key in stringified_keys
    ]
    rows += [
        {"key": key, "value": getattr(config, key), "source": sources[key]}
        for key in verbatim_keys
    ]

    if args.json_output:
        print(format_json(rows))  # noqa: T201
        return 0
    for row in rows:
        print(f"{row['key']} = {row['value']} ({row['source']})")  # noqa: T201
    return 0

438 

439 

def _cmd_mermaid(args: argparse.Namespace) -> int:
    """Print the flow as a Mermaid diagram.

    With ``--json`` the diagram text is wrapped in an object under the
    ``mermaid`` key; otherwise the diagram is printed as-is.

    Returns:
        Exit code: 0 on success.
    """
    diagram = to_mermaid(load_flow_from_file(args.flow_file))
    rendered = format_json({"mermaid": diagram}) if args.json_output else diagram
    print(rendered)  # noqa: T201
    return 0

453 

454 

def _find_state(flow: Flow, state_id: str) -> State | None:
    """Look up the first state of ``flow`` whose id equals ``state_id``.

    Returns:
        The matching State, or None if not found.
    """
    matches = (candidate for candidate in flow.states if candidate.id == state_id)
    return next(matches, None)

465 

466 

def _find_passing_transitions(
    state: State, evidence: dict[str, str]
) -> list[Transition]:
    """Select the outgoing transitions of ``state`` that ``evidence`` allows.

    A transition whose ``conditions`` is None always passes; any other
    transition passes when all of its conditions are met.

    Returns:
        List of transitions whose conditions are satisfied, in the order
        they appear in ``state.next``.
    """
    return [
        edge
        for edge in state.next.values()
        if edge.conditions is None
        or _conditions_met(edge.conditions.conditions, evidence)
    ]

482 

483 

484def _conditions_met(conditions: dict[str, str], evidence: dict[str, str]) -> bool: 

485 """Check if all conditions are satisfied by evidence. 

486 

487 Returns: 

488 True if all conditions pass, False otherwise. 

489 """ 

490 for key, cond_str in conditions.items(): 

491 ev = evidence.get(key, "") 

492 op, value = parse_condition(cond_str) 

493 if not evaluate_condition(op, value, ev): 

494 return False 

495 return True 

496 

497 

def _find_subflow(
    all_flows: list[Flow], flow_ref: str, _root_path: Path
) -> Flow | None:
    """Pick the resolved flow that ``flow_ref`` points at.

    The reference is reduced to its path stem (final component without the
    suffix) and compared with each flow's ``flow`` name. ``_root_path`` is
    accepted but not used.

    Returns:
        The matching Flow, or None if not found.
    """
    wanted = Path(flow_ref).stem
    return next((flow for flow in all_flows if flow.flow == wanted), None)

510 

511 

512def _error(msg: str) -> None: 

513 """Print error to stderr.""" 

514 print(f"error: {msg}", file=sys.stderr) # noqa: T201 

515 

516 

def _resolve_session(
    session_name: str, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> tuple[Session, Flow, Path]:
    """Load a stored session together with the flow it refers to.

    Each of the three steps (load the session, resolve the flow name to a
    path, parse the flow file) reports its own failure on stderr and exits
    the process with status 1.

    Returns:
        Tuple of (session, flow, flow_path).
    """
    sessions = YamlSessionStore(config.sessions_path())

    # Step 1: read the session from the store.
    try:
        session = sessions.load(session_name)
    except (SessionNotFoundError, SessionNameNotFoundError) as load_error:
        _error(str(load_error))
        sys.exit(1)

    # Step 2: turn the session's flow name into a file path.
    try:
        flow_path = resolver.resolve(session.flow, config.flows_path())
    except FlowNameNotFoundError as lookup_error:
        _error(str(lookup_error))
        sys.exit(1)

    # Step 3: parse the flow definition.
    try:
        flow = load_flow_from_file(flow_path)
    except FlowParseError as parse_error:  # pragma: no cover
        _error(f"invalid flow definition: {parse_error}")  # pragma: no cover
        sys.exit(1)  # pragma: no cover

    return session, flow, flow_path

545 

546 

def _apply_session_transition(
    session: Session,
    flow: Flow,
    flow_path: Path,
    trigger: str,
    evidence: dict[str, str],
) -> tuple[Session, str]:
    """Fire ``trigger`` from the session's current state.

    Three outcomes are possible once the transition is accepted:

    * the target state refers to a subflow that is found and has states —
      the current flow/state is pushed as a stack frame and the session
      moves to the subflow's first state;
    * the session has a non-empty stack and the target is one of the flow's
      exits — the stack is popped with the target;
    * otherwise the session simply moves to the target state.

    An unknown state, an unknown trigger, or unmet conditions are reported
    on stderr and exit the process with status 1.

    Returns:
        Tuple of (updated_session, target_display).
    """
    current = _find_state(flow, session.state)
    if current is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    transition = current.next.get(trigger)
    if transition is None:
        _error(f"Trigger '{trigger}' not found in state '{session.state}'")
        sys.exit(1)

    guard = transition.conditions
    if guard and not _conditions_met(guard.conditions, evidence):  # pragma: no cover
        _error(f"Conditions not met for trigger '{trigger}'")  # pragma: no cover
        sys.exit(1)  # pragma: no cover

    target = transition.target
    all_flows = resolve_subflows(flow, flow_path)
    target_state = _find_state(flow, target)

    # Entering a subflow: the target state carries a flow reference.
    if target_state is not None and target_state.flow is not None:
        child = _find_subflow(all_flows, target_state.flow, flow_path)
        if not (child and child.states):  # pragma: no cover
            return session.with_state(target), target  # pragma: no cover
        frame = SessionStackFrame(flow=session.flow, state=session.state)
        entry_id = child.states[0].id
        pushed = session.push_stack(frame, entry_id, new_flow=child.flow)
        return pushed, f"{child.flow}/{entry_id}"

    # Leaving a subflow: there is a frame to pop and the target is an exit.
    if session.stack and target in flow.exits:
        return session.pop_stack(target), target

    return session.with_state(target), target

601 

602 

def _cmd_transition_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run transition against a stored session and save the result.

    The trigger is the first positional argument; a missing trigger is
    reported and exits with status 2. A ``--session`` value of
    ``"__default__"`` selects ``config.default_session``. After the updated
    session is saved, the from/trigger/to summary is printed and the process
    exits with status 0.
    """
    if not args.positional:
        _error("trigger is required")
        sys.exit(2)
    trigger = args.positional[0]

    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, flow_path = _resolve_session(session_name, config, resolver)
    updated_session, target = _apply_session_transition(
        session, flow, flow_path, trigger, _parse_evidence(args)
    )

    YamlSessionStore(config.sessions_path()).save(updated_session)

    payload: dict[str, Any] = {
        "from": session.state,
        "trigger": trigger,
        "to": target,
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    sys.exit(0)

635 

636 

def _handle_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Dispatch ``flowr session <subcommand>`` and exit with its return code.

    Exit statuses set here:

    * 2 — no session subcommand was given (the session help is printed
      first) or the subcommand is unknown;
    * 1 — the handler raised ``FlowParseError``;
    * otherwise whatever the handler returned.
    """
    if args.session_command is None:  # pragma: no cover
        # argparse's help action prints the help and then exits with status
        # 0. Intercept that exit so the usage-error status below is the one
        # that is actually reported.
        try:
            build_parser().parse_args(["session", "--help"])
        except SystemExit:
            pass
        sys.exit(2)

    handlers = {
        "init": cmd_session_init,
        "show": cmd_session_show,
        "set-state": cmd_session_set_state,
        "list": cmd_session_list,
    }
    handler = handlers.get(args.session_command)
    if handler is None:
        _error(f"Unknown session command: {args.session_command}")
        sys.exit(2)

    try:
        rc = handler(args, config, resolver)
    except FlowParseError as exc:  # pragma: no cover
        _error(f"invalid flow definition: {exc}")  # pragma: no cover
        sys.exit(1)  # pragma: no cover
    sys.exit(rc)

662 

663 

def _cmd_check_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run check against the state recorded in a session (read-only).

    A ``--session`` value of ``"__default__"`` selects
    ``config.default_session``. The process exits with status 1 when the
    session's state is not part of the flow, otherwise with the return code
    of the state or conditions report.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _unused_path = _resolve_session(session_name, config, resolver)
    current = _find_state(flow, session.state)
    if current is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    report = _cmd_check_state if args.target is None else _cmd_check_conditions
    sys.exit(report(flow, current, args))

683 

684 

def _cmd_next_session(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> None:
    """Run next against the state recorded in a session (read-only).

    A ``--session`` value of ``"__default__"`` selects
    ``config.default_session``. The process exits with status 1 when the
    session's state is not part of the flow, and with status 0 after the
    passing transition targets have been printed.
    """
    if args.session == "__default__":
        session_name = config.default_session
    else:
        session_name = args.session

    session, flow, _unused_path = _resolve_session(session_name, config, resolver)
    current = _find_state(flow, session.state)
    if current is None:
        _error(f"State '{session.state}' not found")
        sys.exit(1)

    reachable = _find_passing_transitions(current, _parse_evidence(args))
    payload: dict[str, Any] = {
        "state": current.id,
        "next": [edge.target for edge in reachable],
    }
    render = format_json if args.json_output else format_text
    print(render(payload))  # noqa: T201
    sys.exit(0)

710 

711 

def _resolve_flow_for_command(
    args: argparse.Namespace,
    config: FlowrConfig,
    resolver: DefaultFlowNameResolver,
) -> None:
    """Resolve the flow reference of a non-session command in place.

    For ``transition`` the reference is the first of at least three
    positional arguments; fewer than three is reported and exits with
    status 2. For every other command the reference is ``args.flow_file``.
    The resolved value is stored back on ``args.flow_file``; a reference the
    resolver cannot find is reported and exits with status 1.

    A ``flow_file`` of None (possible for ``check`` and ``next``, where the
    positional is optional) is left untouched so the command handler can
    report the missing argument itself.
    """
    flows_dir = config.flows_path()

    if args.command == "transition":
        if len(args.positional) < 3:
            _error("transition requires <flow> <state> <trigger>")
            sys.exit(2)
        flow_ref = args.positional[0]
    else:
        flow_ref = args.flow_file
        if flow_ref is None:
            # Nothing to resolve; the handler owns the "required" error.
            return

    try:
        args.flow_file = resolver.resolve(flow_ref, flows_dir)
    except FlowNameNotFoundError as exc:
        _error(str(exc))
        sys.exit(1)

735 

736 

def main() -> None:
    """Parse the command line and run the selected subcommand.

    Dispatch order: ``session`` and ``config`` first, then the session-aware
    variants of ``transition``, ``check`` and ``next`` (taken when
    ``--session`` was given), and finally the plain flow commands after the
    flow reference has been resolved. The process exits with the handler's
    return code, with status 2 when no or an unknown command is given, and
    with status 1 when the flow definition cannot be parsed.
    """
    # Build the parser once and reuse it for the help fallback.
    parser = build_parser()
    args = parser.parse_args()
    if args.command is None:  # pragma: no cover
        parser.print_help()
        sys.exit(2)

    resolver = DefaultFlowNameResolver()
    # Resolve the configuration exactly once, with the override if given.
    if args.flows_dir is not None:
        config = resolve_config(cli_overrides={"flows_dir": args.flows_dir})
    else:
        config = resolve_config()

    if args.command == "session":  # pragma: no cover
        _handle_session(args, config, resolver)
        return  # pragma: no cover

    if args.command == "config":  # pragma: no cover
        rc = _cmd_config(args)
        sys.exit(rc)  # pragma: no cover

    if args.command == "transition" and args.session is not None:  # pragma: no cover
        _cmd_transition_session(args, config, resolver)
        return  # pragma: no cover

    if args.command == "check" and args.session is not None:  # pragma: no cover
        _cmd_check_session(args, config, resolver)
        return  # pragma: no cover

    if args.command == "next" and args.session is not None:  # pragma: no cover
        _cmd_next_session(args, config, resolver)
        return  # pragma: no cover

    _resolve_flow_for_command(args, config, resolver)

    cmd_map = {
        "validate": _cmd_validate,
        "states": _cmd_states,
        "check": _cmd_check,
        "next": _cmd_next,
        "transition": _cmd_transition,
        "mermaid": _cmd_mermaid,
        # ``config`` is normally dispatched above; the entry is kept as-is.
        "config": _cmd_config,
    }
    handler = cmd_map.get(args.command)
    if handler is None:  # pragma: no cover
        _error(f"Unknown command: {args.command}")
        sys.exit(2)
    try:
        sys.exit(handler(args))
    except FlowParseError as exc:
        _error(f"invalid flow definition: {exc}")
        sys.exit(1)

789 

790 

# Script entry point: runs the CLI when the module is executed directly.
if __name__ == "__main__":
    main()