Coverage for flowr / infrastructure / session_store.py: 100%
74 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 18:23 +0000
"""Session persistence: load/save session YAML files with atomic writes.

Implements the SessionStore Protocol defined in flowr.domain.session.
"""
import os
import tempfile
from datetime import UTC, datetime
from pathlib import Path

import yaml

from flowr.domain.loader import load_flow_from_file
from flowr.domain.session import Session, SessionStackFrame
class SessionAlreadyExistsError(Exception):
    """Raised when attempting to init a session that already exists."""
class SessionNotFoundError(Exception):
    """Raised when a session file cannot be found."""
class SessionCorruptedError(Exception):
    """Raised when a session YAML file cannot be parsed."""
class SessionNameNotFoundError(Exception):
    """Raised when a session name or path cannot be resolved.

    Attributes:
        session_arg: The name or path that could not be resolved.
        sessions_dir: The directory that was searched for the session.
    """

    def __init__(self, session_arg: str, sessions_dir: Path) -> None:
        """Initialize with the unresolvable session arg and searched directory."""
        self.session_arg = session_arg
        self.sessions_dir = sessions_dir
        super().__init__(
            f"Session '{session_arg}' not found (searched in {sessions_dir})"
        )
class YamlSessionStore:
    """File-based session store using YAML with atomic writes.

    Sessions are stored as <name>.yaml in the configured sessions directory.
    Writes use temp-file-then-rename to prevent partial corruption.

    Session arguments accept both names and file paths, mirroring the
    flow name resolution pattern: if the argument is an existing file path,
    it is used directly; otherwise, it is treated as a session name and
    resolved as <sessions_dir>/<name>.yaml.
    """

    def __init__(self, sessions_dir: Path) -> None:
        """Initialize with the sessions directory."""
        self._sessions_dir = sessions_dir

    def resolve(self, session_arg: str) -> Path:
        """Resolve a session argument to a session file path.

        If session_arg is an existing file path, return it directly
        (backward compatible). Otherwise, treat it as a session name
        and look for <sessions_dir>/<session_arg>.yaml. A ".yaml" suffix
        already present on the name is not duplicated.

        Args:
            session_arg: A file path or short session name.

        Returns:
            The resolved Path to the session YAML file.

        Raises:
            SessionNameNotFoundError: The argument is not an existing file
                and no matching .yaml file exists in the sessions directory.
        """
        # is_file() rather than exists(): "" and "." name the current
        # directory, and any other directory would equally "exist" without
        # being a loadable session file.
        direct = Path(session_arg)
        if direct.is_file():
            return direct

        file_name = (
            session_arg if session_arg.endswith(".yaml") else f"{session_arg}.yaml"
        )
        candidate = self._sessions_dir / file_name
        if candidate.is_file():
            return candidate

        raise SessionNameNotFoundError(session_arg, self._sessions_dir)

    def init(self, flow_path: Path, name: str) -> Session:
        """Create a new session at the flow's initial state.

        The initial state is the first entry of the loaded flow's ``states``.
        The new session is written to disk before it is returned.

        Args:
            flow_path: Path of the flow definition to load.
            name: Session name; the file is <sessions_dir>/<name>.yaml.

        Returns:
            The newly created Session.

        Raises:
            SessionAlreadyExistsError: A session with this name already exists.
        """
        session_path = self._sessions_dir / f"{name}.yaml"
        if session_path.exists():
            msg = f"Session '{name}' already exists"
            raise SessionAlreadyExistsError(msg)

        flow = load_flow_from_file(flow_path)

        # One timestamp for both fields so a fresh session reports
        # created_at == updated_at.
        ts = datetime.now(tz=UTC).isoformat()
        session = Session(
            flow=flow.flow,
            state=flow.states[0].id,
            name=name,
            created_at=ts,
            updated_at=ts,
        )
        self.save(session)
        return session

    def load(self, session_arg: str) -> Session:
        """Load a session by name or file path.

        If session_arg is an existing file path, load it directly.
        Otherwise, resolve it as <sessions_dir>/<session_arg>.yaml.

        Optional fields fall back to defaults: name "default", empty
        timestamps, an empty stack and empty params.

        Returns:
            The loaded Session.

        Raises:
            SessionNotFoundError: No session file exists for this name.
            SessionCorruptedError: The session file is not valid UTF-8 YAML,
                is not a mapping, or lacks the required "flow"/"state"
                fields (at top level or in a stack frame).
        """
        try:
            session_path = self.resolve(session_arg)
        except SessionNameNotFoundError:
            msg = f"Session '{session_arg}' not found"
            raise SessionNotFoundError(msg) from None

        try:
            data = yaml.safe_load(session_path.read_text(encoding="utf-8"))
        except (yaml.YAMLError, UnicodeDecodeError) as exc:
            msg = f"Session '{session_arg}' is corrupted: {exc}"
            raise SessionCorruptedError(msg) from exc

        # safe_load returns None for an empty file and scalars/lists for
        # other well-formed documents; none of those is a session.
        if not isinstance(data, dict):
            msg = (
                f"Session '{session_arg}' is corrupted: "
                f"expected a mapping, got {type(data).__name__}"
            )
            raise SessionCorruptedError(msg)

        # Only the field extraction is guarded, so that errors raised by the
        # Session/SessionStackFrame constructors themselves are not masked.
        try:
            flow = data["flow"]
            state = data["state"]
            frames = [(f["flow"], f["state"]) for f in data.get("stack") or []]
        except (KeyError, TypeError) as exc:
            msg = (
                f"Session '{session_arg}' is corrupted: "
                f"missing or malformed field ({exc!r})"
            )
            raise SessionCorruptedError(msg) from exc

        stack = [
            SessionStackFrame(flow=frame_flow, state=frame_state)
            for frame_flow, frame_state in frames
        ]
        return Session(
            flow=flow,
            state=state,
            name=data.get("name", "default"),
            created_at=data.get("created_at", ""),
            updated_at=data.get("updated_at", ""),
            stack=stack,
            params=data.get("params", {}),
        )

    def save(self, session: Session) -> None:
        """Save a session using atomic write (write temp, then rename).

        The sessions directory is created if missing. The session is
        written to <sessions_dir>/<session.name>.yaml.
        """
        self._sessions_dir.mkdir(parents=True, exist_ok=True)
        session_path = self._sessions_dir / f"{session.name}.yaml"

        data = {
            "flow": session.flow,
            "state": session.state,
            "name": session.name,
            "created_at": session.created_at,
            "updated_at": session.updated_at,
            "stack": [{"flow": f.flow, "state": f.state} for f in session.stack],
            "params": session.params,
        }

        # The temp file lives in the target directory so the final replace
        # stays on one filesystem. Its suffix deliberately does not end in
        # ".yaml", so an in-flight or orphaned temp file is never picked up
        # by the "*.yaml" glob in list_sessions().
        fd, tmp_path = tempfile.mkstemp(
            dir=str(self._sessions_dir), suffix=".yaml.tmp"
        )
        tmp = Path(tmp_path)
        try:
            # Wrap the descriptor mkstemp already opened instead of closing
            # it and reopening the file by path.
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                yaml.dump(data, f, default_flow_style=False)
            tmp.replace(session_path)
        except BaseException:  # pragma: no cover
            tmp.unlink(missing_ok=True)  # pragma: no cover
            raise  # pragma: no cover

    def list_sessions(self) -> list[Session]:
        """List all sessions, sorted by session file name.

        The sessions directory is created if missing. Files that cannot be
        loaded (not found or corrupted) are skipped.
        """
        self._sessions_dir.mkdir(parents=True, exist_ok=True)
        sessions: list[Session] = []
        for path in sorted(self._sessions_dir.glob("*.yaml")):
            # Load the globbed path itself rather than its stem: resolving
            # the bare name again could match an unrelated file of the same
            # name in the current working directory.
            try:
                sessions.append(self.load(str(path)))
            except (SessionNotFoundError, SessionCorruptedError):
                continue
        return sessions