Coverage for flowr / domain / session.py: 100%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 18:23 +0000

1"""Session format for tracking workflow progress.""" 

2 

3from dataclasses import dataclass, field 

4from datetime import UTC, datetime 

5from pathlib import Path 

6from typing import Any, Protocol 

7 

8 

9@dataclass(frozen=True, slots=True) 

10class SessionStackFrame: 

11 """A frame in the session call stack tracking subflow nesting.""" 

12 

13 flow: str 

14 state: str 

15 

16 

17@dataclass(frozen=True, slots=True) 

18class Session: 

19 """Minimal session tracking: current flow, state, and call stack.""" 

20 

21 flow: str 

22 state: str 

23 name: str = "default" 

24 created_at: str = "" 

25 updated_at: str = "" 

26 stack: list[SessionStackFrame] = field(default_factory=list) 

27 params: dict[str, dict[str, Any]] = field(default_factory=dict) 

28 

29 def with_state(self, state: str, updated_at: str | None = None) -> "Session": 

30 """Return a new Session with an updated state and optional timestamp. 

31 

32 Returns: 

33 New Session with the state and updated_at fields changed. 

34 """ 

35 ts = updated_at or datetime.now(tz=UTC).isoformat() 

36 return Session( 

37 flow=self.flow, 

38 state=state, 

39 name=self.name, 

40 created_at=self.created_at, 

41 updated_at=ts, 

42 stack=self.stack, 

43 params=self.params, 

44 ) 

45 

46 def push_stack( 

47 self, frame: SessionStackFrame, new_state: str, new_flow: str | None = None 

48 ) -> "Session": 

49 """Push a stack frame and update state (entering a subflow). 

50 

51 Returns: 

52 New Session with the frame pushed and state/flow updated. 

53 """ 

54 ts = datetime.now(tz=UTC).isoformat() 

55 return Session( 

56 flow=new_flow if new_flow is not None else self.flow, 

57 state=new_state, 

58 name=self.name, 

59 created_at=self.created_at, 

60 updated_at=ts, 

61 stack=[*self.stack, frame], 

62 params=self.params, 

63 ) 

64 

65 def pop_stack(self, new_state: str) -> "Session": 

66 """Pop a stack frame and update state (exiting a subflow). 

67 

68 Returns: 

69 New Session with the frame popped and state/flow restored. 

70 """ 

71 ts = datetime.now(tz=UTC).isoformat() 

72 parent_flow = self.stack[-1].flow if self.stack else self.flow 

73 return Session( 

74 flow=parent_flow, 

75 state=new_state, 

76 name=self.name, 

77 created_at=self.created_at, 

78 updated_at=ts, 

79 stack=self.stack[:-1], 

80 params=self.params, 

81 ) 

82 

83 

class SessionStore(Protocol):
    """Structural interface for anything that persists sessions.

    Contract for implementers: writes must be atomic (write to a temporary
    file, then rename) so that a partially written session is never left
    behind.
    """

    def init(self, flow_path: Path, name: str) -> Session:  # pragma: no cover
        """Start a fresh session positioned at the flow's initial state.

        Args:
            flow_path: Resolved path of the flow YAML file.
            name: Session name; doubles as the filename stem.

        Returns:
            The Session that was created.

        Raises:
            SessionAlreadyExistsError: The name is already taken by an
                existing session.
        """
        ...

    def load(self, name: str) -> Session:  # pragma: no cover
        """Fetch the session stored under ``name``.

        Args:
            name: Session name to look up.

        Returns:
            The Session that was loaded.

        Raises:
            SessionNotFoundError: There is no session file for this name.
        """
        ...

    def save(self, session: Session) -> None:  # pragma: no cover
        """Write ``session`` to the store with an atomic write.

        Args:
            session: The session to persist.
        """
        ...

    def list_sessions(self) -> list[Session]:  # pragma: no cover
        """Enumerate every session held by the store.

        Returns:
            All sessions, ordered by updated_at from newest to oldest.
        """
        ...