Coverage for flowr / server / app.py: 93%
142 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 18:42 +0000
1"""FastAPI application factory for the viz server."""
3from __future__ import annotations
5import asyncio
6import threading
7import time
8from pathlib import Path
9from typing import Any
11from fastapi import FastAPI, Request
12from fastapi.responses import HTMLResponse, JSONResponse
13from fastapi.staticfiles import StaticFiles
15from flowr.server.config import ServerConfig
16from flowr.server.scanner import FlowRegistry
def _build_scope(environ: dict[str, Any]) -> tuple[dict[str, Any], bytes]:
    """Translate a WSGI ``environ`` into an ASGI HTTP scope plus request body.

    Args:
        environ: WSGI environment. ``REQUEST_METHOD`` and ``wsgi.input`` are
            required; every other key read here falls back to a default.

    Returns:
        ``(scope, body)`` where ``scope`` is an ASGI ``http`` scope dict and
        ``body`` is the payload read from ``wsgi.input`` (at most
        ``CONTENT_LENGTH`` bytes).

    Raises:
        KeyError: If ``REQUEST_METHOD`` or ``wsgi.input`` is missing.
    """
    # CONTENT_LENGTH may be absent or an empty string; both mean "no body".
    content_length = int(environ.get("CONTENT_LENGTH", 0) or 0)
    body = environ["wsgi.input"].read(content_length)

    # PEP 3333 environ strings are bytes decoded as ISO-8859-1, so encoding
    # with latin-1 restores the exact bytes that were sent on the wire.
    raw_headers: list[tuple[bytes, bytes]] = [
        (
            key[5:].replace("_", "-").lower().encode("latin-1"),
            value.encode("latin-1"),
        )
        for key, value in environ.items()
        if key.startswith("HTTP_")
    ]

    # These two headers are not HTTP_-prefixed in WSGI. PEP 3333 allows them
    # to be empty or absent; only forward a header that has a value.
    for env_key, header_name in (
        ("CONTENT_TYPE", b"content-type"),
        ("CONTENT_LENGTH", b"content-length"),
    ):
        header_value = environ.get(env_key)
        if header_value:
            raw_headers.append((header_name, str(header_value).encode("latin-1")))

    scheme = environ.get("wsgi.url_scheme", "http")
    server_name = environ.get("SERVER_NAME", "testserver")
    server_port = int(environ.get("SERVER_PORT", "80"))
    path = environ.get("PATH_INFO", "/")
    query_string = environ.get("QUERY_STRING", "").encode()

    scope: dict[str, Any] = {
        "type": "http",
        "http_version": "1.1",
        "method": environ["REQUEST_METHOD"],
        "path": path,
        "raw_path": path.encode(),
        "root_path": environ.get("SCRIPT_NAME", ""),
        "scheme": scheme,
        "query_string": query_string,
        "headers": raw_headers,
        # The client port is not read from environ; 0 is a placeholder.
        "client": (environ.get("REMOTE_ADDR", "localhost"), 0),
        "server": (server_name, server_port),
        "state": {},
    }
    return scope, body
async def _wsgi_to_asgi(
    asgi_app: Any,  # noqa: ANN401
    scope: dict[str, Any],
    body: bytes,
) -> dict[str, Any]:
    """Run one request through *asgi_app* and collect its buffered response.

    Args:
        asgi_app: ASGI callable invoked as ``asgi_app(scope, receive, send)``.
        scope: ASGI connection scope passed through unchanged.
        body: Complete request body, delivered as a single ``http.request``.

    Returns:
        Dict with ``status`` (500 if the app never starts a response),
        ``headers`` (names lower-cased) and ``body`` (all chunks joined).
    """
    captured: dict[str, Any] = {"status": 500, "headers": []}
    chunks: list[bytes] = []
    # The single request event; once consumed, the client looks disconnected.
    pending: list[dict[str, Any]] = [
        {"type": "http.request", "body": body, "more_body": False},
    ]

    async def receive() -> dict[str, Any]:  # noqa: RUF029
        if pending:
            return pending.pop()
        return {"type": "http.disconnect"}

    async def send(message: dict[str, Any]) -> None:  # noqa: RUF029
        kind = message["type"]
        if kind == "http.response.start":
            captured["status"] = message["status"]
            captured["headers"] = [
                (name.lower(), value) for name, value in message["headers"]
            ]
        elif kind == "http.response.body":
            chunks.append(message.get("body", b""))

    await asgi_app(scope, receive, send)

    return {
        "status": captured["status"],
        "headers": captured["headers"],
        "body": b"".join(chunks),
    }
class _ASGIWSGIBridge:
    """WSGI callable that serves each request through a wrapped ASGI app."""

    def __init__(self, asgi_app: Any) -> None:  # noqa: ANN401
        """Store the ASGI application that will handle translated requests."""
        self._asgi_app = asgi_app

    def __call__(self, environ: dict[str, Any], start_response: Any) -> list[bytes]:  # noqa: ANN401
        """Handle one WSGI request by delegating to the ASGI application.

        Args:
            environ: WSGI environment for the request.
            start_response: WSGI callback, called once with the status line
                and the response headers.

        Returns:
            A single-element list holding the complete response body.
        """
        from http import HTTPStatus

        scope, body = _build_scope(environ)
        # asyncio.run drives the ASGI app to completion on a fresh event loop.
        response = asyncio.run(_wsgi_to_asgi(self._asgi_app, scope, body))

        status_code = response["status"]
        # A WSGI status line is "<code> <reason>"; use the standard phrase
        # instead of labelling every response (including errors) "OK".
        try:
            reason = HTTPStatus(status_code).phrase
        except ValueError:
            reason = "Unknown"

        # WSGI header strings are native strings restricted to ISO-8859-1.
        wsgi_headers: list[tuple[str, str]] = [
            (k.decode("latin-1"), v.decode("latin-1")) for k, v in response["headers"]
        ]

        start_response(f"{status_code} {reason}", wsgi_headers)
        return [response["body"]]
def _make_app(config: ServerConfig) -> FastAPI:  # noqa: C901
    """Build the bare FastAPI application, without the WSGI bridge.

    Serves ``index.html`` at ``/``, mounts ``/static`` when the static
    directory exists, and exposes the ``/api/flows`` endpoints backed by a
    ``FlowRegistry`` created from ``config.path``. The PUT, POST and DELETE
    endpoints answer 405 unless ``config.edit_mode`` is truthy.
    """
    app = FastAPI()
    registry = FlowRegistry(config.path)

    static_root = Path(__file__).resolve().parents[1] / "static"
    if static_root.is_dir():
        app.mount("/static", StaticFiles(directory=str(static_root)), name="static")

    # Read once while building the app; "/" serves this in-memory copy.
    index_html = (static_root / "index.html").read_text()

    def read_only_response() -> JSONResponse:
        # Shared reply for mutating endpoints when editing is disabled.
        return JSONResponse(status_code=405, content={"error": "method not allowed"})

    def describe_transitions(state: Any) -> dict[str, Any]:  # noqa: ANN401
        # Map each trigger to its target and optional conditions.
        described: dict[str, Any] = {}
        for trigger, transition in state.next.items():
            guard = transition.conditions
            described[trigger] = {
                "target": transition.target,
                "conditions": guard.conditions if guard else None,
            }
        return described

    @app.get("/")
    async def root() -> HTMLResponse:
        return HTMLResponse(content=index_html)

    @app.get("/api/flows")
    async def list_flows(refresh: str = "false") -> dict[str, Any]:
        if refresh == "true":
            registry.refresh()
        entries = [
            {
                "name": entry.name,
                "relativePath": entry.relative_path,
                "status": "ok",
                "error": None,
            }
            for entry in registry.list_flows()
        ]
        return {
            "flows": entries,
            "directory": str(config.path),
            "edit_mode": config.edit_mode,
        }

    @app.get("/api/flows/{flow_id:path}")
    async def get_flow(flow_id: str) -> Any:  # noqa: ANN401
        outcome = registry.read_flow_model(flow_id)
        if not outcome["success"]:
            return JSONResponse(status_code=404, content={"error": outcome["error"]})
        model = outcome["data"]
        return {
            "flow": model.flow,
            "version": model.version,
            "exits": list(model.exits),
            "params": [
                {"name": param.name, "default": param.default}
                for param in model.params
            ],
            "attrs": model.attrs or {},
            "states": [
                {
                    "id": state.id,
                    "attrs": state.attrs or {},
                    "conditions": state.conditions or {},
                    "flow": state.flow,
                    "flow_version": state.flow_version,
                    "next": describe_transitions(state),
                }
                for state in model.states
            ],
        }

    @app.put("/api/flows/{flow_id:path}")
    async def put_flow(flow_id: str, request: Request) -> Any:  # noqa: ANN401
        if not config.edit_mode:
            return read_only_response()
        payload = (await request.body()).decode()
        outcome = registry.write_flow(flow_id, payload)
        if outcome["success"]:
            code = 200
        elif outcome.get("error"):
            # A failure that carries an error message is reported as 404.
            code = 404
        else:
            code = 422
        return JSONResponse(status_code=code, content=outcome)

    @app.post("/api/flows")
    async def post_flow(request: Request) -> Any:  # noqa: ANN401
        if not config.edit_mode:
            return read_only_response()
        body = await request.json()
        filename = body.get("filename")
        content = body.get("content", "")
        if not filename:
            return JSONResponse(status_code=422, content={"error": "filename required"})
        outcome = registry.create_flow(filename, content)
        # A "path traversal" error is always 422, whatever "success" says.
        created = outcome.get("error") != "path traversal" and outcome["success"]
        return JSONResponse(status_code=201 if created else 422, content=outcome)

    @app.delete("/api/flows/{flow_id:path}")
    async def delete_flow(flow_id: str) -> Any:  # noqa: ANN401
        if not config.edit_mode:
            return read_only_response()
        if not registry.delete_flow(flow_id):
            return JSONResponse(status_code=404, content={"error": "not found"})
        return JSONResponse(status_code=200, content={"status": "ok"})

    return app
def create_app(config: ServerConfig) -> _ASGIWSGIBridge:
    """Build the FastAPI app for *config* and wrap it for WSGI callers."""
    asgi_app = _make_app(config)
    return _ASGIWSGIBridge(asgi_app)
def start_server(
    config: ServerConfig,
    *,
    startup_timeout: float = 2.0,
) -> tuple[threading.Thread, int]:
    """Start the uvicorn server in a daemon thread and wait for it to start.

    Args:
        config: Supplies ``host`` and ``port`` for uvicorn and is passed to
            ``_make_app`` to build the application.
        startup_timeout: Maximum number of seconds to wait for uvicorn to
            report that it has started.

    Returns:
        (thread, actual_port) — the port uvicorn bound to, read from its first
        listening socket; falls back to ``config.port`` when uvicorn exposes
        no servers.

    Raises:
        RuntimeError: If the server has not started when the timeout expires
            or its thread exits during startup.
    """
    import uvicorn

    app = _make_app(config)
    srv = uvicorn.Server(
        uvicorn.Config(app, host=config.host, port=config.port, log_level="error")
    )

    t = threading.Thread(target=srv.run, daemon=True)
    t.start()

    # Poll until uvicorn reports startup. Stop early if the thread has already
    # died, since waiting out the timeout cannot help in that case.
    deadline = time.monotonic() + startup_timeout
    while not srv.started and t.is_alive() and time.monotonic() < deadline:
        time.sleep(0.05)

    if not srv.started:
        thread_exited = not t.is_alive()
        # Ask uvicorn to stop so a failed start does not leave a server
        # running in the background.
        srv.should_exit = True
        message = "server did not start within timeout"
        if thread_exited:
            message += " (server thread exited during startup)"
        raise RuntimeError(message)

    if srv.servers:
        actual_port = srv.servers[0].sockets[0].getsockname()[1]  # type: ignore[union-attr]
    else:
        actual_port = config.port
    return t, actual_port
def run_server(config: ServerConfig) -> None:
    """Print the server URL, then run uvicorn in the foreground until it stops."""
    import uvicorn

    app = _make_app(config)
    url = f"http://{config.host}:{config.port}"
    print(url, flush=True)  # noqa: T201
    uvicorn.run(app, host=config.host, port=config.port, log_level="error")