Coverage for flowr / cli / session_cmd.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""Session subcommand group: init, show, set-state, list.

2 

3Parses CLI args, dispatches to domain/infrastructure, formats output. 

4""" 

5 

6import argparse 

7import sys 

8from typing import NoReturn 

9 

10from flowr.cli.output import format_json, format_text 

11from flowr.cli.resolution import DefaultFlowNameResolver, FlowNameNotFoundError 

12from flowr.domain.loader import load_flow_from_file 

13from flowr.infrastructure.config import FlowrConfig 

14from flowr.infrastructure.session_store import ( 

15 SessionAlreadyExistsError, 

16 SessionNameNotFoundError, 

17 SessionNotFoundError, 

18 YamlSessionStore, 

19) 

20 

21 

def add_session_parser(sub: argparse._SubParsersAction) -> None:
    """Register the ``session`` command and its subcommands on *sub*.

    The subcommands are ``init``, ``show``, ``set-state`` and ``list``; the
    one selected on the command line is stored as ``session_command``.

    Args:
        sub: Subparsers action of the parent parser that receives ``session``.
    """
    # Options shared verbatim by several subcommands.
    name_help = "Session name or file path (default: from config)"
    format_option = {
        "choices": ["yaml", "json"],
        "default": "yaml",
        "dest": "output_format",
        "help": "Output format (default: yaml)",
    }

    group = sub.add_parser("session", help="Manage workflow sessions")
    commands = group.add_subparsers(dest="session_command")

    # session init
    init_cmd = commands.add_parser("init", help="Initialize a new session")
    init_cmd.add_argument("flow", help="Flow name or file path")
    init_cmd.add_argument("--name", default=None, help=name_help)

    # session show
    show_cmd = commands.add_parser("show", help="Show current session state")
    show_cmd.add_argument("--name", default=None, help=name_help)
    show_cmd.add_argument("--format", **format_option)

    # session set-state
    set_cmd = commands.add_parser(
        "set-state", help="Update the current session state"
    )
    set_cmd.add_argument("state", help="New state ID")
    set_cmd.add_argument("--name", default=None, help=name_help)

    # session list
    list_cmd = commands.add_parser("list", help="List all sessions")
    list_cmd.add_argument("--format", **format_option)

63 

64 

def _error(msg: str) -> NoReturn:
    """Report *msg* on stderr with an ``error: `` prefix, then exit with status 1."""
    line = f"error: {msg}"
    print(line, file=sys.stderr)  # noqa: T201
    sys.exit(1)

69 

70 

def cmd_session_init(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> int:
    """Handle ``session init``: create a session for the requested flow.

    Args:
        args: Parsed CLI namespace; ``flow`` and ``name`` are read.
        config: Supplies the flows directory, the sessions directory and the
            fallback session name (``default_session``).
        resolver: Resolves ``args.flow`` against the flows directory.

    Returns:
        0 after the new session's summary has been printed. Failure paths do
        not return: an unresolvable flow or an already existing session is
        reported through ``_error``, which exits the process with status 1.
    """
    flows_root = config.flows_path()
    sessions_root = config.sessions_path()
    session_name = args.name or config.default_session

    try:
        resolved_flow = resolver.resolve(args.flow, flows_root)
    except FlowNameNotFoundError as exc:
        _error(str(exc))

    session_store = YamlSessionStore(sessions_root)

    try:
        created = session_store.init(resolved_flow, session_name)
    except SessionAlreadyExistsError as exc:
        _error(str(exc))

    # Summary fields, in the order they are printed.
    summary = {
        field: getattr(created, field)
        for field in ("flow", "state", "name", "created_at")
    }
    print(format_text(summary))  # noqa: T201
    return 0

103 

104 

def cmd_session_show(
    args: argparse.Namespace, config: FlowrConfig, _resolver: DefaultFlowNameResolver
) -> int:
    """Handle ``session show``: print the stored state of one session.

    Args:
        args: Parsed CLI namespace; ``name`` and ``output_format`` are read.
        config: Supplies the sessions directory and the fallback session name
            (``default_session``).
        _resolver: Unused; accepted so all session handlers share a signature.

    Returns:
        0 after the session has been printed. If the session cannot be
        loaded, the error is reported through ``_error``, which exits the
        process with status 1 instead of returning.
    """
    sessions_root = config.sessions_path()
    session_name = args.name or config.default_session

    session_store = YamlSessionStore(sessions_root)
    try:
        loaded = session_store.load(session_name)
    except (SessionNotFoundError, SessionNameNotFoundError) as exc:
        _error(str(exc))

    view: dict = {
        "flow": loaded.flow,
        "state": loaded.state,
        "name": loaded.name,
        "stack": [
            {"flow": frame.flow, "state": frame.state} for frame in loaded.stack
        ],
        "created_at": loaded.created_at,
        "updated_at": loaded.updated_at,
    }

    # JSON only when explicitly requested; every other value uses text output.
    render = format_json if args.output_format == "json" else format_text
    print(render(view))  # noqa: T201
    return 0

136 

137 

def cmd_session_set_state(
    args: argparse.Namespace, config: FlowrConfig, resolver: DefaultFlowNameResolver
) -> int:
    """Handle ``session set-state``: move a session to another state of its flow.

    Args:
        args: Parsed CLI namespace; ``state`` and ``name`` are read.
        config: Supplies the sessions directory, the flows directory and the
            fallback session name (``default_session``).
        resolver: Resolves the session's flow against the flows directory.

    Returns:
        0 after the updated session has been saved and printed. A session that
        cannot be loaded, a flow that cannot be resolved, or a state ID absent
        from the flow is reported through ``_error``, which exits the process
        with status 1 instead of returning.
    """
    sessions_root = config.sessions_path()
    session_name = args.name or config.default_session

    session_store = YamlSessionStore(sessions_root)
    try:
        current = session_store.load(session_name)
    except (SessionNotFoundError, SessionNameNotFoundError) as exc:
        _error(str(exc))

    # The requested state must be declared by the session's flow before saving.
    flows_root = config.flows_path()
    try:
        flow_file = resolver.resolve(current.flow, flows_root)
    except FlowNameNotFoundError as exc:
        _error(str(exc))

    requested = args.state
    known_ids = {state.id for state in load_flow_from_file(flow_file).states}
    if requested not in known_ids:
        _error(f"State '{requested}' not found in flow '{current.flow}'")

    moved = current.with_state(requested)
    session_store.save(moved)

    summary = {
        "flow": moved.flow,
        "state": moved.state,
        "updated_at": moved.updated_at,
    }
    print(format_text(summary))  # noqa: T201
    return 0

177 

178 

def cmd_session_list(
    args: argparse.Namespace, config: FlowrConfig, _resolver: DefaultFlowNameResolver
) -> int:
    """Handle ``session list``: print one summary row per stored session.

    Args:
        args: Parsed CLI namespace; ``output_format`` is read.
        config: Supplies the sessions directory.
        _resolver: Unused; accepted so all session handlers share a signature.

    Returns:
        Always 0.
    """
    session_store = YamlSessionStore(config.sessions_path())

    rows = []
    for entry in session_store.list_sessions():
        rows.append(
            {
                "name": entry.name,
                "flow": entry.flow,
                "state": entry.state,
                "updated_at": entry.updated_at,
            }
        )

    # JSON only when explicitly requested; every other value uses text output.
    render = format_json if args.output_format == "json" else format_text
    print(render(rows))  # noqa: T201
    return 0