Coverage for smith / infrastructure / template_source.py: 100%
0 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 18:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 18:48 +0000
1"""Template source adapters — resolve files from bundled, local, or URL sources."""
3import importlib.resources
4import os
5import tarfile
6import tempfile
7import zipfile
8from pathlib import Path
10import requests
12from smith.domain.ports import TemplateSourceError
13from smith.domain.value_objects import FileSpec, TemplateSource
15AGENTIC_FILE_PATTERNS = [
16 "AGENTS.md",
17 ".opencode/agents/",
18 ".opencode/knowledge/",
19 ".opencode/skills/",
20 ".opencode/tools/",
21 ".templates/",
22 ".flowr/",
23]
25GITIGNORE_PATTERNS = ["AGENTS.md", ".opencode/", ".templates/", ".flowr/"]
28def _is_agentic_path(path: Path) -> bool:
29 """Return whether *path* belongs to an agentic file pattern."""
30 path_str = str(path)
31 if path_str == "AGENTS.md":
32 return True
33 return any(path_str.startswith(pattern) for pattern in AGENTIC_FILE_PATTERNS[1:])
def _collect_specs_from_directory(directory: Path) -> list[FileSpec]:
    """Build a FileSpec for each file found anywhere below *directory*.

    Every spec carries the file's path relative to *directory* together with
    the file's raw bytes. Files are visited in the order ``os.walk`` yields
    them.
    """
    return [
        FileSpec(
            relative_path=file_path.relative_to(directory),
            content=file_path.read_bytes(),
        )
        for dirpath, _subdirs, filenames in os.walk(directory)
        for file_path in (Path(dirpath) / filename for filename in filenames)
    ]
class BundledTemplateSource:
    """Template source backed by the files packaged under ``smith.data``."""

    def resolve(self) -> list[FileSpec]:
        """Collect the agentic files that ship inside the package data.

        Raises:
            TemplateSourceError: If the package data cannot be located, does
                not resolve to a directory on disk, or holds no agentic files.
        """
        try:
            package_root = importlib.resources.files("smith.data")
        except Exception as exc:
            message = f"Failed to locate bundled template data: {exc}"
            raise TemplateSourceError(message) from exc

        if not hasattr(package_root, "joinpath"):
            raise TemplateSourceError("Bundled template data directory not found")

        # joinpath() is called without segments and the result is stringified
        # to obtain a plain filesystem path for the directory walk below.
        bundle_dir = Path(str(package_root.joinpath()))
        if not bundle_dir.is_dir():
            raise TemplateSourceError(
                f"Bundled template data directory not found: {bundle_dir}"
            )

        agentic_specs = [
            spec
            for spec in _collect_specs_from_directory(bundle_dir)
            if _is_agentic_path(spec.relative_path)
        ]
        if agentic_specs:
            return agentic_specs
        raise TemplateSourceError("Bundled template data contains no agentic files")

    def gitignore_patterns(self) -> list[str]:
        """Give back a fresh copy of the shared ``GITIGNORE_PATTERNS`` list."""
        return [*GITIGNORE_PATTERNS]
class LocalTemplateSource:
    """Resolve templates from a local directory on disk."""

    def __init__(self, path: Path) -> None:
        """Initialise with the local template directory path."""
        self._path = path

    def resolve(self) -> list[FileSpec]:
        """Return FileSpec list for every file in the local directory.

        Raises:
            TemplateSourceError: If the configured path is not a directory.
        """
        if not self._path.is_dir():
            raise TemplateSourceError(f"Template directory not found: {self._path}")
        return _collect_specs_from_directory(self._path)

    def gitignore_patterns(self) -> list[str]:
        """Return gitignore patterns derived from the local directory contents.

        One pattern is produced per top-level entry, in sorted order;
        directories get a trailing ``/``.

        Raises:
            TemplateSourceError: If the configured path is not a directory.
        """
        # Same guard (and message) as resolve(), so a missing directory is
        # reported as TemplateSourceError instead of a raw OSError from iterdir().
        if not self._path.is_dir():
            raise TemplateSourceError(f"Template directory not found: {self._path}")
        return [
            f"{entry.name}/" if entry.is_dir() else entry.name
            for entry in sorted(self._path.iterdir())
        ]
106class UrlTemplateSource:
107 """Resolve templates from a remote tar.gz or zip archive."""
109 def __init__(self, url: str) -> None:
110 """Initialise with the URL of the template archive."""
111 self._url = url
113 def resolve(self) -> list[FileSpec]:
114 """Download, extract, and return FileSpec list for agentic files."""
115 try:
116 response = requests.get(self._url, timeout=30)
117 response.raise_for_status()
118 except requests.RequestException as exc:
119 raise TemplateSourceError(
120 f"Failed to download template from {self._url}: {exc}"
121 ) from exc
123 tmp_dir = tempfile.mkdtemp(prefix="smith_url_")
124 try:
125 try:
126 if self._url.endswith(".zip"):
127 self._extract_zip(response.content, tmp_dir)
128 else:
129 self._extract_tar(response.content, tmp_dir)
130 except (tarfile.TarError, zipfile.BadZipFile, OSError) as exc:
131 raise TemplateSourceError(
132 f"Failed to extract template archive from {self._url}: {exc}"
133 ) from exc
135 all_specs = _collect_specs_from_directory(Path(tmp_dir))
136 finally:
137 import shutil
139 shutil.rmtree(tmp_dir, ignore_errors=True)
141 specs = [s for s in all_specs if _is_agentic_path(s.relative_path)]
143 if not specs:
144 raise TemplateSourceError(
145 f"Template archive from {self._url} contains no agentic files"
146 )
148 return specs
150 def gitignore_patterns(self) -> list[str]:
151 """Return the standard gitignore patterns for URL-sourced templates."""
152 return list(GITIGNORE_PATTERNS)
154 @staticmethod
155 def _extract_tar(content: bytes, target_dir: str) -> None:
156 """Extract a tar.gz archive, stripping the top-level directory."""
157 import io
159 with tarfile.open(fileobj=io.BytesIO(content), mode="r:gz") as tar:
160 members = tar.getmembers()
161 if not members:
162 raise TemplateSourceError("Tar archive is empty")
164 root_dir = members[0].name.split("/")[0]
165 for member in members:
166 member_path = Path(member.name)
167 if member_path.parts[0] == root_dir:
168 member.name = str(Path(*member_path.parts[1:]))
169 if member.name:
170 tar.extract(member, target_dir, filter="data")
172 @staticmethod
173 def _extract_zip(content: bytes, target_dir: str) -> None:
174 """Extract a zip archive, stripping the top-level directory."""
175 import io
177 with zipfile.ZipFile(io.BytesIO(content)) as zf:
178 names = zf.namelist()
179 if not names:
180 raise TemplateSourceError("Zip archive is empty")
182 root_dir = names[0].split("/")[0]
183 for name in names:
184 if name == root_dir + "/":
185 continue
186 if name.startswith(root_dir + "/"):
187 zf.extract(name, target_dir)
class TemplateSourceAdapter:
    """Route template operations to the source implementation chosen by kind."""

    def __init__(self, source: TemplateSource) -> None:
        """Keep the TemplateSource value object that selects the strategy."""
        self._source = source

    def _strategy(self):
        """Instantiate the concrete source matching ``self._source.kind``.

        Raises:
            TemplateSourceError: If the kind is not "bundled", "local" or "url".
        """
        kind = self._source.kind
        if kind == "bundled":
            return BundledTemplateSource()
        if kind == "local":
            return LocalTemplateSource(Path(self._source.location))
        if kind == "url":
            return UrlTemplateSource(self._source.location)
        raise TemplateSourceError(f"Unknown template source kind: {self._source.kind}")

    def resolve(self) -> list[FileSpec]:
        """Return the template files produced by the selected source."""
        return self._strategy().resolve()

    def gitignore_patterns(self) -> list[str]:
        """Return the gitignore patterns produced by the selected source."""
        return self._strategy().gitignore_patterns()