smith.domain.connection
Connection aggregate — core domain logic for connect/disconnect/update/status.
"""Connection aggregate — core domain logic for connect/disconnect/update/status."""

from pathlib import Path

from smith.domain.ports import (
    FileSystemPort,
    GitignorePort,
    MetadataPort,
    TemplateSourcePort,
)
from smith.domain.value_objects import (
    ConnectionState,
    ConnectionStatus,
    FileSpec,
    TemplateSource,
)


class Connection:
    """Domain aggregate that manages the lifecycle of agentic file connections.

    The aggregate performs no I/O itself: template resolution, file writes
    and removals, ``.gitignore`` bookkeeping and source metadata are all
    delegated to the ports injected at construction time.
    """

    def __init__(
        self,
        template_source_port: TemplateSourcePort,
        filesystem_port: FileSystemPort,
        gitignore_port: GitignorePort,
        metadata_port: MetadataPort,
    ) -> None:
        """Initialise the Connection with its required ports."""
        self._template_source_port = template_source_port
        self._filesystem_port = filesystem_port
        self._gitignore_port = gitignore_port
        self._metadata_port = metadata_port

    def connect(
        self,
        source: TemplateSource,
        overwrite: bool = False,
    ) -> None:
        """Write agentic files and register the connection in .gitignore.

        Args:
            source: Template source persisted through the metadata port.
            overwrite: Passed through to ``_resolve_specs``.
        """
        specs = self._template_source_port.resolve()
        paths = [spec.relative_path for spec in specs]

        specs = self._resolve_specs(specs, paths, overwrite)
        self._commit(specs, source)

    def _commit(self, specs: list[FileSpec], source: TemplateSource) -> None:
        """Write *specs*, add the .gitignore section, then persist *source*."""
        self._filesystem_port.write_atomic(specs)
        self._gitignore_port.add_section(
            self._template_source_port.gitignore_patterns()
        )
        self._metadata_port.save_source(source)

    def _resolve_specs(
        self,
        specs: list[FileSpec],
        paths: list[Path],
        overwrite: bool = False,
    ) -> list[FileSpec]:
        """Drop specs whose target conflicts with an unmanaged path.

        A conflicting path (as reported by the filesystem port's
        ``check_conflicts``) counts as managed when it matches one of the
        patterns returned by the gitignore port.

        Args:
            specs: Candidate file specs.
            paths: The ``relative_path`` of every spec in *specs*.
            overwrite: Accepted for interface compatibility.

        Returns:
            *specs* itself when nothing has to be dropped, otherwise a new
            list without the specs that target unmanaged conflicting paths.
        """
        conflicting = self._filesystem_port.check_conflicts(paths)
        if not conflicting:
            return specs

        # Fetch the managed patterns once instead of once per conflicting path.
        managed_patterns = self._gitignore_port.get_patterns()
        unmanaged_existing = {
            p for p in conflicting if not self._is_path_managed(p, managed_patterns)
        }
        if not unmanaged_existing:
            return specs

        # NOTE(review): `overwrite` does not change the outcome — unmanaged
        # conflicting paths are skipped either way, exactly as before this
        # cleanup. Confirm whether overwrite=True was meant to keep them.
        return [s for s in specs if s.relative_path not in unmanaged_existing]

    @staticmethod
    def _is_path_managed(path: Path, managed_patterns: list[str]) -> bool:
        """Return True when *path* equals or starts with a managed pattern.

        The path is compared in its forward-slash form so the result does not
        depend on the host OS separator. Blank patterns are ignored because an
        empty prefix would otherwise match every path.
        """
        prefixes = tuple(pattern for pattern in managed_patterns if pattern.strip())
        return path.as_posix().startswith(prefixes)

    def disconnect(self) -> list[Path]:
        """Remove managed agentic files that exist on disk; return removed paths.

        Returns an empty list when the gitignore port reports no section or
        no patterns.
        """
        # NOTE(review): only the filesystem port's remove() is called here;
        # the .gitignore section itself is not touched in this method —
        # confirm that it is cleaned up elsewhere.
        if not self._gitignore_port.has_section():
            return []

        managed_patterns = self._gitignore_port.get_patterns()
        if not managed_patterns:
            return []

        all_template_specs = self._template_source_port.resolve()
        all_template_paths = [spec.relative_path for spec in all_template_specs]

        managed_paths = [
            p for p in all_template_paths if self._is_path_managed(p, managed_patterns)
        ]

        existence = self._filesystem_port.exists(managed_paths)
        paths_to_remove = [p for p in managed_paths if existence.get(p, False)]

        self._filesystem_port.remove(paths_to_remove)
        return paths_to_remove

    def update(
        self,
        source: TemplateSource | None = None,
    ) -> None:
        """Refresh agentic files, optionally from a new template source.

        Without a .gitignore section this falls back to ``connect``. Otherwise
        the source is the first truthy value of: *source*, the source loaded
        from the metadata port, the bundled ``agents-smith`` default.
        """
        if not self._gitignore_port.has_section():
            fallback_source = source or TemplateSource(
                kind="bundled", location="agents-smith"
            )
            return self.connect(source=fallback_source)

        resolved_source = (
            source
            or self._metadata_port.load_source()
            or TemplateSource(kind="bundled", location="agents-smith")
        )

        specs = self._template_source_port.resolve()
        paths = [spec.relative_path for spec in specs]

        specs = self._resolve_specs(specs, paths)

        self._commit(specs, resolved_source)
        return None

    def status(self) -> ConnectionStatus:
        """Return the current connection status of the project.

        The state is DISCONNECTED when there is no .gitignore section or no
        template file is present, PARTIAL when some template files are
        missing, and CONNECTED otherwise.
        """
        source = self._metadata_port.load_source()
        all_specs = self._template_source_port.resolve()
        all_paths = [spec.relative_path for spec in all_specs]
        existence = self._filesystem_port.exists(all_paths)
        present_files = [p for p in all_paths if existence.get(p, False)]
        missing_files = [p for p in all_paths if not existence.get(p, False)]

        if not self._gitignore_port.has_section() or not present_files:
            state = ConnectionState.DISCONNECTED
        elif missing_files:
            state = ConnectionState.PARTIAL
        else:
            state = ConnectionState.CONNECTED

        return ConnectionStatus(
            state=state,
            source=source,
            present_files=present_files,
            missing_files=missing_files,
        )
class
Connection:
class Connection:
    """Domain aggregate that manages the lifecycle of agentic file connections.

    The aggregate performs no I/O itself: template resolution, file writes
    and removals, ``.gitignore`` bookkeeping and source metadata are all
    delegated to the ports injected at construction time.
    """

    def __init__(
        self,
        template_source_port: TemplateSourcePort,
        filesystem_port: FileSystemPort,
        gitignore_port: GitignorePort,
        metadata_port: MetadataPort,
    ) -> None:
        """Initialise the Connection with its required ports."""
        self._template_source_port = template_source_port
        self._filesystem_port = filesystem_port
        self._gitignore_port = gitignore_port
        self._metadata_port = metadata_port

    def connect(
        self,
        source: TemplateSource,
        overwrite: bool = False,
    ) -> None:
        """Write agentic files and register the connection in .gitignore.

        Args:
            source: Template source persisted through the metadata port.
            overwrite: Passed through to ``_resolve_specs``.
        """
        specs = self._template_source_port.resolve()
        paths = [spec.relative_path for spec in specs]

        specs = self._resolve_specs(specs, paths, overwrite)
        self._commit(specs, source)

    def _commit(self, specs: list[FileSpec], source: TemplateSource) -> None:
        """Write *specs*, add the .gitignore section, then persist *source*."""
        self._filesystem_port.write_atomic(specs)
        self._gitignore_port.add_section(
            self._template_source_port.gitignore_patterns()
        )
        self._metadata_port.save_source(source)

    def _resolve_specs(
        self,
        specs: list[FileSpec],
        paths: list[Path],
        overwrite: bool = False,
    ) -> list[FileSpec]:
        """Drop specs whose target conflicts with an unmanaged path.

        A conflicting path (as reported by the filesystem port's
        ``check_conflicts``) counts as managed when it matches one of the
        patterns returned by the gitignore port.

        Args:
            specs: Candidate file specs.
            paths: The ``relative_path`` of every spec in *specs*.
            overwrite: Accepted for interface compatibility.

        Returns:
            *specs* itself when nothing has to be dropped, otherwise a new
            list without the specs that target unmanaged conflicting paths.
        """
        conflicting = self._filesystem_port.check_conflicts(paths)
        if not conflicting:
            return specs

        # Fetch the managed patterns once instead of once per conflicting path.
        managed_patterns = self._gitignore_port.get_patterns()
        unmanaged_existing = {
            p for p in conflicting if not self._is_path_managed(p, managed_patterns)
        }
        if not unmanaged_existing:
            return specs

        # NOTE(review): `overwrite` does not change the outcome — unmanaged
        # conflicting paths are skipped either way, exactly as before this
        # cleanup. Confirm whether overwrite=True was meant to keep them.
        return [s for s in specs if s.relative_path not in unmanaged_existing]

    @staticmethod
    def _is_path_managed(path: Path, managed_patterns: list[str]) -> bool:
        """Return True when *path* equals or starts with a managed pattern.

        The path is compared in its forward-slash form so the result does not
        depend on the host OS separator. Blank patterns are ignored because an
        empty prefix would otherwise match every path.
        """
        prefixes = tuple(pattern for pattern in managed_patterns if pattern.strip())
        return path.as_posix().startswith(prefixes)

    def disconnect(self) -> list[Path]:
        """Remove managed agentic files that exist on disk; return removed paths.

        Returns an empty list when the gitignore port reports no section or
        no patterns.
        """
        # NOTE(review): only the filesystem port's remove() is called here;
        # the .gitignore section itself is not touched in this method —
        # confirm that it is cleaned up elsewhere.
        if not self._gitignore_port.has_section():
            return []

        managed_patterns = self._gitignore_port.get_patterns()
        if not managed_patterns:
            return []

        all_template_specs = self._template_source_port.resolve()
        all_template_paths = [spec.relative_path for spec in all_template_specs]

        managed_paths = [
            p for p in all_template_paths if self._is_path_managed(p, managed_patterns)
        ]

        existence = self._filesystem_port.exists(managed_paths)
        paths_to_remove = [p for p in managed_paths if existence.get(p, False)]

        self._filesystem_port.remove(paths_to_remove)
        return paths_to_remove

    def update(
        self,
        source: TemplateSource | None = None,
    ) -> None:
        """Refresh agentic files, optionally from a new template source.

        Without a .gitignore section this falls back to ``connect``. Otherwise
        the source is the first truthy value of: *source*, the source loaded
        from the metadata port, the bundled ``agents-smith`` default.
        """
        if not self._gitignore_port.has_section():
            fallback_source = source or TemplateSource(
                kind="bundled", location="agents-smith"
            )
            return self.connect(source=fallback_source)

        resolved_source = (
            source
            or self._metadata_port.load_source()
            or TemplateSource(kind="bundled", location="agents-smith")
        )

        specs = self._template_source_port.resolve()
        paths = [spec.relative_path for spec in specs]

        specs = self._resolve_specs(specs, paths)

        self._commit(specs, resolved_source)
        return None

    def status(self) -> ConnectionStatus:
        """Return the current connection status of the project.

        The state is DISCONNECTED when there is no .gitignore section or no
        template file is present, PARTIAL when some template files are
        missing, and CONNECTED otherwise.
        """
        source = self._metadata_port.load_source()
        all_specs = self._template_source_port.resolve()
        all_paths = [spec.relative_path for spec in all_specs]
        existence = self._filesystem_port.exists(all_paths)
        present_files = [p for p in all_paths if existence.get(p, False)]
        missing_files = [p for p in all_paths if not existence.get(p, False)]

        if not self._gitignore_port.has_section() or not present_files:
            state = ConnectionState.DISCONNECTED
        elif missing_files:
            state = ConnectionState.PARTIAL
        else:
            state = ConnectionState.CONNECTED

        return ConnectionStatus(
            state=state,
            source=source,
            present_files=present_files,
            missing_files=missing_files,
        )
Domain aggregate that manages the lifecycle of agentic file connections.
Connection( template_source_port: smith.domain.ports.TemplateSourcePort, filesystem_port: smith.domain.ports.FileSystemPort, gitignore_port: smith.domain.ports.GitignorePort, metadata_port: smith.domain.ports.MetadataPort)
def __init__(
    self,
    template_source_port: TemplateSourcePort,
    filesystem_port: FileSystemPort,
    gitignore_port: GitignorePort,
    metadata_port: MetadataPort,
) -> None:
    """Store the four ports that the aggregate delegates its work to."""
    # The assignments are independent of one another; each port is kept on a
    # private attribute of the same name.
    self._metadata_port = metadata_port
    self._gitignore_port = gitignore_port
    self._filesystem_port = filesystem_port
    self._template_source_port = template_source_port
Initialise the Connection with its required ports.
def
connect( self, source: smith.domain.value_objects.TemplateSource, overwrite: bool = False) -> None:
def connect(
    self,
    source: TemplateSource,
    overwrite: bool = False,
) -> None:
    """Resolve the template files, filter them, and commit the result.

    The specs come from the template source port, are narrowed by
    ``_resolve_specs`` (which receives *overwrite*), and are then handed to
    ``_commit`` together with *source*.
    """
    templates = self._template_source_port.resolve()
    targets = [template.relative_path for template in templates]
    writable = self._resolve_specs(templates, targets, overwrite)
    self._commit(writable, source)
Write agentic files and register the connection in .gitignore.
def
disconnect(self) -> list[pathlib.Path]:
def disconnect(self) -> list[Path]:
    """Remove the managed template files that exist and return their paths.

    An empty list is returned when the gitignore port reports no section or
    no patterns.
    """
    # NOTE(review): nothing here removes the .gitignore section itself; only
    # the filesystem port's remove() is invoked — confirm that the section is
    # cleaned up elsewhere.
    gitignore = self._gitignore_port
    if not gitignore.has_section():
        return []

    patterns = gitignore.get_patterns()
    if not patterns:
        return []

    template_paths = [
        spec.relative_path for spec in self._template_source_port.resolve()
    ]
    candidates = [
        path for path in template_paths if self._is_path_managed(path, patterns)
    ]

    on_disk = self._filesystem_port.exists(candidates)
    removed = [path for path in candidates if on_disk.get(path, False)]
    self._filesystem_port.remove(removed)
    return removed
Remove agentic files and the .gitignore section; return removed paths.
def update(
    self,
    source: TemplateSource | None = None,
) -> None:
    """Re-apply the template files, optionally switching the template source.

    With no .gitignore section the call is forwarded to ``connect``. Otherwise
    the source used is the first truthy value of: *source*, the source loaded
    from the metadata port, the bundled ``agents-smith`` default.
    """
    if self._gitignore_port.has_section():
        effective_source = (
            source
            or self._metadata_port.load_source()
            or TemplateSource(kind="bundled", location="agents-smith")
        )
        templates = self._template_source_port.resolve()
        targets = [template.relative_path for template in templates]
        writable = self._resolve_specs(templates, targets)
        self._commit(writable, effective_source)
        return None

    # Not connected yet: behave like a first-time connect.
    initial_source = source or TemplateSource(kind="bundled", location="agents-smith")
    return self.connect(source=initial_source)
Refresh agentic files, optionally from a new template source.
def status(self) -> ConnectionStatus:
    """Report which template files are present and the resulting state.

    The state is DISCONNECTED when there is no .gitignore section or none of
    the template files is present, PARTIAL when at least one template file is
    missing, and CONNECTED when all of them are present.
    """
    source = self._metadata_port.load_source()
    template_paths = [
        spec.relative_path for spec in self._template_source_port.resolve()
    ]
    on_disk = self._filesystem_port.exists(template_paths)

    # Split the template paths into present and missing, keeping their order.
    present_files = []
    missing_files = []
    for path in template_paths:
        bucket = present_files if on_disk.get(path, False) else missing_files
        bucket.append(path)

    if self._gitignore_port.has_section() and present_files:
        state = (
            ConnectionState.PARTIAL if missing_files else ConnectionState.CONNECTED
        )
    else:
        state = ConnectionState.DISCONNECTED

    return ConnectionStatus(
        state=state,
        source=source,
        present_files=present_files,
        missing_files=missing_files,
    )
Return the current connection status of the project.