Coverage for flowr / infrastructure / session_store.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""Session persistence: load/save session YAML files with atomic writes. 

2 

3Implements the SessionStore Protocol defined in flowr.domain.session. 

4""" 

5 

6import os 

7import tempfile 

8from datetime import UTC, datetime 

9from pathlib import Path 

10 

11import yaml 

12 

13from flowr.domain.loader import load_flow_from_file 

14from flowr.domain.session import Session, SessionStackFrame 

15 

16 

17class SessionAlreadyExistsError(Exception): 

18 """Raised when attempting to init a session that already exists.""" 

19 

20 

21class SessionNotFoundError(Exception): 

22 """Raised when a session file cannot be found.""" 

23 

24 

25class SessionCorruptedError(Exception): 

26 """Raised when a session YAML file cannot be parsed.""" 

27 

28 

29class SessionNameNotFoundError(Exception): 

30 """Raised when a session name or path cannot be resolved.""" 

31 

32 def __init__(self, session_arg: str, sessions_dir: Path) -> None: 

33 """Initialize with the unresolvable session arg and searched directory.""" 

34 self.session_arg = session_arg 

35 self.sessions_dir = sessions_dir 

36 super().__init__( 

37 f"Session '{session_arg}' not found (searched in {sessions_dir})" 

38 ) 

39 

40 

41class YamlSessionStore: 

42 """File-based session store using YAML with atomic writes. 

43 

44 Sessions are stored as <name>.yaml in the configured sessions directory. 

45 Writes use temp-file-then-rename to prevent partial corruption. 

46 

47 Session arguments accept both names and file paths, mirroring the 

48 flow name resolution pattern: if the argument is an existing file path, 

49 it is used directly; otherwise, it is treated as a session name and 

50 resolved as <sessions_dir>/<name>.yaml. 

51 """ 

52 

53 def __init__(self, sessions_dir: Path) -> None: 

54 """Initialize with the sessions directory.""" 

55 self._sessions_dir = sessions_dir 

56 

57 def resolve(self, session_arg: str) -> Path: 

58 """Resolve a session argument to a session file path. 

59 

60 If session_arg is an existing file path, return it directly 

61 (backward compatible). Otherwise, treat it as a session name 

62 and look for <sessions_dir>/<session_arg>.yaml. 

63 

64 Args: 

65 session_arg: A file path or short session name. 

66 sessions_dir: The configured sessions directory. 

67 

68 Returns: 

69 The resolved Path to the session YAML file. 

70 

71 Raises: 

72 SessionNameNotFoundError: The argument is not an existing file 

73 and no matching .yaml file exists in sessions_dir. 

74 """ 

75 path = Path(session_arg) 

76 if path.exists(): 

77 return path 

78 

79 name = session_arg 

80 if not name.endswith(".yaml"): 

81 name = f"{name}.yaml" 

82 

83 resolved = self._sessions_dir / name 

84 if resolved.exists(): 

85 return resolved 

86 

87 raise SessionNameNotFoundError(session_arg, self._sessions_dir) 

88 

89 def init(self, flow_path: Path, name: str) -> Session: 

90 """Create a new session at the flow's initial state. 

91 

92 Returns: 

93 The newly created Session. 

94 

95 Raises: 

96 SessionAlreadyExistsError: A session with this name already exists. 

97 """ 

98 session_path = self._sessions_dir / f"{name}.yaml" 

99 if session_path.exists(): 

100 msg = f"Session '{name}' already exists" 

101 raise SessionAlreadyExistsError(msg) 

102 

103 flow = load_flow_from_file(flow_path) 

104 

105 ts = datetime.now(tz=UTC).isoformat() 

106 session = Session( 

107 flow=flow.flow, 

108 state=flow.states[0].id, 

109 name=name, 

110 created_at=ts, 

111 updated_at=ts, 

112 ) 

113 self.save(session) 

114 return session 

115 

116 def load(self, session_arg: str) -> Session: 

117 """Load a session by name or file path. 

118 

119 If session_arg is an existing file path, load it directly. 

120 Otherwise, resolve it as <sessions_dir>/<session_arg>.yaml. 

121 

122 Returns: 

123 The loaded Session. 

124 

125 Raises: 

126 SessionNotFoundError: No session file exists for this name. 

127 SessionCorruptedError: The session file contains invalid YAML. 

128 """ 

129 try: 

130 session_path = self.resolve(session_arg) 

131 except SessionNameNotFoundError: 

132 msg = f"Session '{session_arg}' not found" 

133 raise SessionNotFoundError(msg) from None 

134 

135 try: 

136 data = yaml.safe_load(session_path.read_text(encoding="utf-8")) 

137 except yaml.YAMLError as exc: 

138 msg = f"Session '{session_arg}' is corrupted: {exc}" 

139 raise SessionCorruptedError(msg) from exc 

140 

141 stack = [ 

142 SessionStackFrame(flow=f["flow"], state=f["state"]) 

143 for f in data.get("stack", []) 

144 ] 

145 return Session( 

146 flow=data["flow"], 

147 state=data["state"], 

148 name=data.get("name", "default"), 

149 created_at=data.get("created_at", ""), 

150 updated_at=data.get("updated_at", ""), 

151 stack=stack, 

152 params=data.get("params", {}), 

153 ) 

154 

155 def save(self, session: Session) -> None: 

156 """Save a session using atomic write (write temp, then rename).""" 

157 self._sessions_dir.mkdir(parents=True, exist_ok=True) 

158 session_path = self._sessions_dir / f"{session.name}.yaml" 

159 

160 data = { 

161 "flow": session.flow, 

162 "state": session.state, 

163 "name": session.name, 

164 "created_at": session.created_at, 

165 "updated_at": session.updated_at, 

166 "stack": [{"flow": f.flow, "state": f.state} for f in session.stack], 

167 "params": session.params, 

168 } 

169 

170 fd, tmp_path = tempfile.mkstemp(dir=str(self._sessions_dir), suffix=".yaml") 

171 tmp = Path(tmp_path) 

172 try: 

173 os.close(fd) 

174 with tmp.open("w", encoding="utf-8") as f: 

175 yaml.dump(data, f, default_flow_style=False) 

176 tmp.replace(str(session_path)) 

177 except BaseException: # pragma: no cover 

178 tmp.unlink(missing_ok=True) # pragma: no cover 

179 raise # pragma: no cover 

180 

181 def list_sessions(self) -> list[Session]: 

182 """List all sessions, sorted by name.""" 

183 self._sessions_dir.mkdir(parents=True, exist_ok=True) 

184 sessions: list[Session] = [] 

185 for path in sorted(self._sessions_dir.glob("*.yaml")): 

186 name = path.stem 

187 try: 

188 session = self.load(name) 

189 sessions.append(session) 

190 except (SessionNotFoundError, SessionCorruptedError): 

191 continue 

192 return sessions