# project-standalo-note-to-app/skills/guardrail-orchestrator/scripts/build_relations.py
#
# 746 lines
# 25 KiB
# Python

#!/usr/bin/env python3
"""
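Relationship Builder

Analyzes design documents and code to build a comprehensive relationship graph
between database models, APIs, components, and pages.

Usage:
    python3 build_relations.py [--design-doc PATH] [--project-dir PATH] [--output PATH] [--json]

Output:
    relations.yml - Entity relationship graph with dependency chains.
    The graph is written to the --output path, or printed to stdout when
    --output is omitted. It is serialized as YAML, or as JSON when --json is
    given or PyYAML is not installed.

Exit codes:
    0 = Success
    1 = Error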
"""
import os
import sys
import re
import json
from pathlib import Path
from typing import Dict, List, Any, Set, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
# PyYAML is optional: HAS_YAML tells the rest of the module whether the
# ``yaml`` name is available or a fallback must be used instead.
try:
    import yaml
except ImportError:
    HAS_YAML = False
else:
    HAS_YAML = True
@dataclass
class Entity:
    """A single node of the relationship graph."""

    id: str
    type: str  # one of 'database', 'api', 'component', 'page'
    name: str
    file_path: str = ""
    layer: int = 0
    # Relationships (lists of entity ids)
    depends_on: List[str] = field(default_factory=list)
    used_by: List[str] = field(default_factory=list)
    # Metadata
    status: str = "pending"

    def to_dict(self) -> Dict[str, Any]:
        """Return the entity as a plain dict, keys in serialization order.

        The ``depends_on`` and ``used_by`` values are the entity's own list
        objects, not copies.
        """
        exported = (
            'id',
            'type',
            'name',
            'file_path',
            'layer',
            'depends_on',
            'used_by',
            'status',
        )
        return {key: getattr(self, key) for key in exported}
@dataclass
class Relation:
    """A directed edge of the relationship graph, from source to target."""

    source_id: str
    target_id: str
    # Relation types emitted in this file:
    # 'references', 'queries', 'calls', 'imports', 'renders'
    relation_type: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the relation under its serialized keys: source, target, type."""
        serialized: Dict[str, Any] = {}
        serialized['source'] = self.source_id
        serialized['target'] = self.target_id
        serialized['type'] = self.relation_type
        return serialized
class RelationshipBuilder:
    """Builds entity relationship graph from design documents and code analysis.

    ``build()`` is the main entry point: it loads the design document,
    extracts entities and relations from it, scans the project's ``.tsx``
    sources for additional API calls, and returns a plain dict that can be
    serialized as YAML or JSON.
    """

    # Layer definitions
    LAYER_DATABASE = 1
    LAYER_API = 2
    LAYER_COMPONENT = 3
    LAYER_PAGE = 4

    # Response statuses whose schema is recorded for type validation. The
    # string forms are accepted because a quoted YAML scalar loads as str.
    _SUCCESS_STATUSES = (200, 201, '200', '201')

    # Call sites whose first argument is a URL literal. Backticks are accepted
    # so template literals such as `/api/users/${id}` are discovered too.
    _API_CALL_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'fetch\(\s*["\'`]([^"\'`]+)["\'`]',
            r'axios\.\w+\(\s*["\'`]([^"\'`]+)["\'`]',
            r'useSWR\(\s*["\'`]([^"\'`]+)["\'`]',
        )
    )

    _COMPONENT_IMPORT_PATTERN = re.compile(
        r'import\s+(?:{[^}]+}|\w+)\s+from\s+["\']([^"\']+)["\']'
    )

    def __init__(self, project_dir: Path, design_doc_path: Optional[Path] = None):
        """Create a builder.

        Args:
            project_dir: Root directory of the project to analyze.
            design_doc_path: Design document to load. When omitted, the
                document is searched under ``.workflow/versions``.
        """
        self.project_dir = project_dir
        self.design_doc_path = design_doc_path
        self.design: Dict[str, Any] = {}
        # Entity storage
        self.entities: Dict[str, Entity] = {}
        self.relations: List[Relation] = []
        # Type mappings for API response -> Component prop validation
        self.type_mappings: Dict[str, Dict[str, str]] = {}  # entity_id -> {field: type}

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Coerce a design-document value into a list.

        ``None`` (an empty YAML key) becomes ``[]`` instead of crashing the
        caller's ``for`` loop, and a lone string is treated as a single item
        rather than being iterated character by character.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        try:
            return list(value)
        except TypeError:
            return []

    def _section_items(self, section: str, nested_key: str) -> List[Any]:
        """Return the entries of a design-document section.

        Both layouts are accepted: ``section: [...]`` and
        ``section: {nested_key: [...]}``.
        """
        items = self.design.get(section)
        if isinstance(items, dict):
            items = items.get(nested_key)
        return self._as_list(items)

    def _collect_types(self, items: Any, default_type: str) -> Dict[str, str]:
        """Build a ``{name: type}`` mapping from field/prop declarations.

        ``items`` is normally a list of ``{'name': ..., 'type': ...}`` dicts.
        A mapping of ``{name: {'type': ...}}`` is accepted as well.
        """
        types: Dict[str, str] = {}
        if isinstance(items, dict):
            for name, spec in items.items():
                if name and isinstance(spec, dict):
                    types[name] = spec.get('type', default_type)
            return types
        for item in self._as_list(items):
            if isinstance(item, dict):
                name = item.get('name', '')
                if name:
                    types[name] = item.get('type', default_type)
        return types

    def _link(self, entity: "Entity", target_id: str, relation_type: str) -> None:
        """Record that ``entity`` depends on ``target_id`` via ``relation_type``."""
        entity.depends_on.append(target_id)
        self.relations.append(Relation(
            source_id=entity.id,
            target_id=target_id,
            relation_type=relation_type,
        ))

    @staticmethod
    def _version_sort_key(name: str) -> List[Any]:
        """Natural-sort key so that e.g. ``v10`` orders after ``v9``."""
        # re.split with a capture group alternates text / digits, so every odd
        # index holds a run of digits; keys therefore compare type-consistently.
        return [
            int(token) if index % 2 else token
            for index, token in enumerate(re.split(r'(\d+)', name))
        ]

    # ------------------------------------------------------------------
    # Design document loading
    # ------------------------------------------------------------------

    def load_design_document(self) -> bool:
        """Load design document if provided.

        Returns:
            True when a document was found, parsed and has a mapping at its
            root; False otherwise (a warning is printed to stderr on parse
            errors).
        """
        if not self.design_doc_path:
            # Try to find design document
            self.design_doc_path = self._find_design_document()
        if not self.design_doc_path or not self.design_doc_path.exists():
            return False
        try:
            content = self.design_doc_path.read_text(encoding='utf-8')
            if HAS_YAML:
                loaded = yaml.safe_load(content) or {}
            else:
                # Minimal built-in parser used when PyYAML is not installed
                loaded = self._parse_yaml_fallback(content)
        except Exception as e:
            print(f"Warning: Could not load design document: {e}", file=sys.stderr)
            return False
        if not isinstance(loaded, dict):
            # Every extractor calls .get() on the design, so a list or scalar
            # root would crash later; reject it here with a clear message.
            print(
                "Warning: Could not load design document: "
                f"expected a mapping at the top level, got {type(loaded).__name__}",
                file=sys.stderr,
            )
            return False
        self.design = loaded
        return True

    def _find_design_document(self) -> Optional[Path]:
        """Find design document in workflow versions.

        Version directories (names starting with ``v``) are tried from the
        highest version down; the first one containing
        ``design/design_document.yml`` wins.
        """
        workflow_dir = self.project_dir / ".workflow" / "versions"
        if not workflow_dir.is_dir():
            return None
        # Natural sort: a plain string sort would rank "v9" above "v10".
        versions = sorted(
            (d for d in workflow_dir.iterdir() if d.is_dir() and d.name.startswith('v')),
            key=lambda d: self._version_sort_key(d.name),
            reverse=True,
        )
        for version_dir in versions:
            design_doc = version_dir / "design" / "design_document.yml"
            if design_doc.exists():
                return design_doc
        return None

    def _parse_yaml_fallback(self, content: str) -> Dict[str, Any]:
        """Simple YAML parser fallback.

        This is a basic fallback - it won't handle complex YAML. Only
        ``key: value`` lines are understood; list items (lines starting with
        ``-``) are ignored and all values stay strings. A key without a value
        opens a section, and indented keys that follow are stored inside that
        section instead of overwriting top-level keys.
        """
        result: Dict[str, Any] = {}
        current_section: Optional[str] = None
        for line in content.split('\n'):
            stripped = line.strip()
            if not stripped or stripped.startswith('#') or stripped.startswith('-'):
                continue
            if ':' not in stripped:
                continue
            key, _, value = stripped.partition(':')
            key = key.strip()
            value = value.strip()
            if line[0].isspace() and current_section is not None:
                # Nested key: keep it inside the section that is open.
                result[current_section][key] = value if value else {}
                continue
            if value:
                result[key] = value
                current_section = None
            else:
                result[key] = {}
                current_section = key
        return result

    # ------------------------------------------------------------------
    # Extraction from the design document
    # ------------------------------------------------------------------

    def extract_from_design(self):
        """Extract entities and relationships from design document."""
        if not self.design:
            return
        # Extract database models
        self._extract_models()
        # Extract API endpoints
        self._extract_apis()
        # Extract components
        self._extract_components()
        # Extract pages
        self._extract_pages()
        # Build reverse relationships (used_by)
        self._build_reverse_relations()

    def _extract_models(self):
        """Extract database models from design document."""
        for model in self._section_items('data_models', 'models'):
            if not isinstance(model, dict):
                continue
            model_id = model.get('id', '')
            if not model_id:
                continue
            entity = Entity(
                id=model_id,
                type='database',
                name=model.get('name', model_id),
                file_path=model.get('file_path', "prisma/schema.prisma"),
                layer=self.LAYER_DATABASE,
                status=model.get('status', 'pending'),
            )
            # Extract field types for later validation
            self.type_mappings[model_id] = self._collect_types(
                model.get('fields'), default_type='string'
            )
            # Database models don't depend on other layers (Layer 1),
            # but they may have relations to other models
            for rel in self._as_list(model.get('relations')):
                if isinstance(rel, dict):
                    target = rel.get('target', '')
                    if target:
                        self._link(entity, target, 'references')
            self.entities[model_id] = entity

    def _extract_apis(self):
        """Extract API endpoints from design document."""
        for api in self._section_items('api_endpoints', 'endpoints'):
            if not isinstance(api, dict):
                continue
            api_id = api.get('id', '')
            if not api_id:
                continue
            entity = Entity(
                id=api_id,
                type='api',
                name=api.get('summary', api_id),
                file_path=api.get('file_path', self._infer_api_path(api)),
                layer=self.LAYER_API,
                status=api.get('status', 'pending'),
            )
            # APIs depend on database models
            for model_id in self._as_list(api.get('depends_on_models')):
                self._link(entity, model_id, 'queries')
            # APIs may depend on other APIs
            for dep_api_id in self._as_list(api.get('depends_on_apis')):
                self._link(entity, dep_api_id, 'calls')
            # Store response schema for type validation
            for response in self._as_list(api.get('responses')):
                if not isinstance(response, dict):
                    continue
                if response.get('status') not in self._SUCCESS_STATUSES:
                    continue
                schema = response.get('schema')
                props = schema.get('properties') if isinstance(schema, dict) else None
                self.type_mappings[api_id] = self._collect_types(props, default_type='any')
            self.entities[api_id] = entity

    def _infer_api_path(self, api: Dict[str, Any]) -> str:
        """Infer file path for API endpoint.

        Converts ``/api/users/:id`` to ``app/api/users/[id]/route.ts``.
        Returns ``''`` when the endpoint declares no path.
        """
        path = api.get('path', '')
        if not path:
            return ''
        # Dropping empty segments keeps "/api" and "/api//x" from producing
        # a double slash in the result.
        parts = [p for p in str(path).strip('/').split('/') if p]
        if parts and parts[0] == 'api':
            parts = parts[1:]
        # Replace :param with [param]
        parts = [f"[{p[1:]}]" if p.startswith(':') else p for p in parts]
        return '/'.join(['app', 'api', *parts, 'route.ts'])

    def _extract_components(self):
        """Extract UI components from design document."""
        for comp in self._section_items('components', 'components'):
            if not isinstance(comp, dict):
                continue
            comp_id = comp.get('id', '')
            if not comp_id:
                continue
            entity = Entity(
                id=comp_id,
                type='component',
                name=comp.get('name', comp_id),
                file_path=comp.get('file_path', ''),
                layer=self.LAYER_COMPONENT,
                status=comp.get('status', 'pending'),
            )
            # Components may use APIs directly
            for api_id in self._as_list(comp.get('uses_apis')):
                self._link(entity, api_id, 'calls')
            # Components may use other components
            for child_comp_id in self._as_list(comp.get('uses_components')):
                self._link(entity, child_comp_id, 'imports')
            # Store prop types for validation
            self.type_mappings[comp_id] = self._collect_types(
                comp.get('props'), default_type='any'
            )
            self.entities[comp_id] = entity

    def _extract_pages(self):
        """Extract pages from design document."""
        for page in self._section_items('pages', 'pages'):
            if not isinstance(page, dict):
                continue
            page_id = page.get('id', '')
            if not page_id:
                continue
            entity = Entity(
                id=page_id,
                type='page',
                name=page.get('name', page_id),
                file_path=page.get('file_path', self._infer_page_path(page)),
                layer=self.LAYER_PAGE,
                status=page.get('status', 'pending'),
            )
            # Pages need data from APIs; a need is either {'api_id': ...} or
            # the API id itself.
            for need in self._as_list(page.get('data_needs')):
                if isinstance(need, dict):
                    api_id = need.get('api_id', '')
                elif isinstance(need, str):
                    api_id = need
                else:
                    continue
                if api_id:
                    self._link(entity, api_id, 'calls')
            # Pages use components
            for comp_id in self._as_list(page.get('components')):
                self._link(entity, comp_id, 'renders')
            self.entities[page_id] = entity

    def _infer_page_path(self, page: Dict[str, Any]) -> str:
        """Infer file path for page.

        Converts ``/users/[id]`` to ``app/users/[id]/page.tsx`` and the root
        path ``/`` to ``app/page.tsx``. ``:param`` segments are rewritten to
        ``[param]``, matching ``_infer_api_path``. Returns ``''`` when the
        page declares no path.
        """
        path = page.get('path', '')
        if not path:
            return ''
        parts = [
            f"[{p[1:]}]" if p.startswith(':') else p
            for p in str(path).strip('/').split('/')
            if p  # "/" yields one empty segment; skip it to avoid "app//page.tsx"
        ]
        return '/'.join(['app', *parts, 'page.tsx'])

    def _build_reverse_relations(self):
        """Build used_by relationships from depends_on."""
        for entity_id, entity in self.entities.items():
            for dep_id in entity.depends_on:
                target = self.entities.get(dep_id)
                if target is not None and entity_id not in target.used_by:
                    target.used_by.append(entity_id)

    # ------------------------------------------------------------------
    # Code analysis
    # ------------------------------------------------------------------

    def analyze_code(self):
        """Analyze actual code to discover additional relationships.

        Scans ``app/`` and ``components/`` under the project directory; each
        directory is scanned independently when it exists.
        """
        app_dir = self.project_dir / "app"
        components_dir = self.project_dir / "components"
        # Scan for API calls in components/pages
        for directory in (app_dir, components_dir):
            if directory.is_dir():
                self._scan_for_api_calls(directory)
        # Scan for component imports
        if app_dir.is_dir():
            self._scan_for_component_imports(app_dir)

    def _scan_for_api_calls(self, directory: Path):
        """Scan directory for API calls.

        Every ``.tsx`` file below ``directory`` is searched for ``fetch``,
        ``axios.<method>`` and ``useSWR`` calls whose URL literal starts with
        ``/api/``. Files that live in an ``api`` sub-directory (route
        handlers) are skipped.
        """
        for file_path in directory.glob("**/*.tsx"):
            # Skip API route files. Only the part of the path below the
            # scanned directory is inspected, so a project that happens to be
            # stored under some ".../api/..." folder is still scanned.
            if 'api' in file_path.relative_to(directory).parts[:-1]:
                continue
            try:
                content = file_path.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError) as exc:
                # Best-effort scan: report the unreadable file and move on.
                print(f"Warning: Could not read {file_path}: {exc}", file=sys.stderr)
                continue
            # Find API calls
            for pattern in self._API_CALL_PATTERNS:
                for match in pattern.finditer(content):
                    api_path = match.group(1)
                    if api_path.startswith('/api/'):
                        self._record_discovered_api_call(file_path, api_path)

    def _record_discovered_api_call(self, file_path: Path, api_path: str):
        """Record a discovered API call from code analysis.

        Links the entity that owns ``file_path`` to every API entity whose
        design path matches ``api_path``. Both directions are updated
        (``depends_on`` on the caller, ``used_by`` on the API) because reverse
        relations were already built before code analysis runs.
        """
        # Find the caller entity once; it does not depend on the API examined.
        caller_id = self._find_entity_by_file(file_path)
        if not caller_id:
            return
        caller = self.entities[caller_id]
        # Find matching API entities
        for entity_id, entity in self.entities.items():
            if entity.type != 'api' or entity_id == caller_id:
                continue
            # Compare paths
            design_path = self._get_api_url_path(entity_id)
            if not design_path or not self._paths_match(design_path, api_path):
                continue
            if entity_id not in caller.depends_on:
                caller.depends_on.append(entity_id)
                self.relations.append(Relation(
                    source_id=caller_id,
                    target_id=entity_id,
                    relation_type='calls',
                ))
            if caller_id not in entity.used_by:
                entity.used_by.append(caller_id)

    def _get_api_url_path(self, api_id: str) -> str:
        """Get URL path for an API entity, or ``''`` when it has none."""
        for api in self._section_items('api_endpoints', 'endpoints'):
            if isinstance(api, dict) and api.get('id') == api_id:
                return api.get('path') or ''
        return ''

    def _paths_match(self, design_path: str, code_path: str) -> bool:
        """Check if design path matches code path (handles params).

        A ``:param`` segment in the design path matches any code segment,
        including a ``${var}`` template placeholder. Static design segments
        must match exactly. A query string or fragment on the code path is
        ignored.
        """
        code_path = code_path.split('?', 1)[0].split('#', 1)[0]
        # Normalize paths
        design_parts = design_path.strip('/').split('/')
        code_parts = code_path.strip('/').split('/')
        if len(design_parts) != len(code_parts):
            return False
        for d, c in zip(design_parts, code_parts):
            # :id in design matches anything in code (literal or ${var})
            if d.startswith(':'):
                continue
            if d != c:
                return False
        return True

    def _find_entity_by_file(self, file_path: Path) -> Optional[str]:
        """Find entity ID by file path.

        An entity matches when its ``file_path`` equals the project-relative
        path of ``file_path`` or is a suffix of it on a directory boundary.
        Returns None when nothing matches or the file is outside the project.
        """
        try:
            rel_path = file_path.relative_to(self.project_dir).as_posix()
        except ValueError:
            return None
        for entity_id, entity in self.entities.items():
            if not entity.file_path:
                continue
            wanted = str(entity.file_path).replace('\\', '/')
            # Drop a leading "./" prefix. str.lstrip('./') would strip any run
            # of dots and slashes, e.g. turning ".hidden/x" into "hidden/x".
            while wanted.startswith('./'):
                wanted = wanted[2:]
            wanted = wanted.lstrip('/')
            if not wanted:
                continue
            # Require a "/" boundary so "Card.tsx" does not match "BigCard.tsx".
            if rel_path == wanted or rel_path.endswith('/' + wanted):
                return entity_id
        return None

    def _scan_for_component_imports(self, directory: Path):
        """Scan for component import statements.

        Placeholder: imports from paths containing ``components`` are
        detected but nothing is recorded, because component relations are
        taken from the design document.
        """
        for file_path in directory.glob("**/*.tsx"):
            try:
                content = file_path.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError):
                continue
            # Find component imports
            for match in self._COMPONENT_IMPORT_PATTERN.finditer(content):
                import_path = match.group(1)
                if 'components' in import_path.lower():
                    # This file imports from components;
                    # already handled by design document
                    continue

    # ------------------------------------------------------------------
    # Graph queries
    # ------------------------------------------------------------------

    def get_dependency_chain(self, entity_id: str) -> List[str]:
        """Get full dependency chain for an entity (what it depends on, transitively).

        Dependencies are listed deepest-first, each id once. If the entity is
        part of a cycle its own id appears in the chain.
        """
        visited: Set[str] = set()
        listed: Set[str] = set()
        chain: List[str] = []

        def traverse(eid: str):
            if eid in visited:
                return
            visited.add(eid)
            entity = self.entities.get(eid)
            if entity is None:
                return
            for dep_id in entity.depends_on:
                traverse(dep_id)
                if dep_id not in listed:
                    listed.add(dep_id)
                    chain.append(dep_id)

        traverse(entity_id)
        return chain

    def get_impact_chain(self, entity_id: str) -> List[str]:
        """Get impact chain for an entity (what depends on it, transitively).

        Dependents are listed nearest-first, each id once.
        """
        visited: Set[str] = set()
        listed: Set[str] = set()
        chain: List[str] = []

        def traverse(eid: str):
            if eid in visited:
                return
            visited.add(eid)
            entity = self.entities.get(eid)
            if entity is None:
                return
            for user_id in entity.used_by:
                if user_id not in listed:
                    listed.add(user_id)
                    chain.append(user_id)
                traverse(user_id)

        traverse(entity_id)
        return chain

    def detect_circular_dependencies(self) -> List[List[str]]:
        """Detect circular dependencies in the graph.

        Returns:
            One list per detected cycle, starting and ending with the same
            entity id (e.g. ``['a', 'b', 'a']``).
        """
        cycles: List[List[str]] = []
        visited: Set[str] = set()
        path: List[str] = []

        def dfs(entity_id: str):
            if entity_id in path:
                # Found cycle: the slice of the current path that loops back
                cycle_start = path.index(entity_id)
                cycles.append(path[cycle_start:] + [entity_id])
                return
            if entity_id in visited:
                return
            visited.add(entity_id)
            path.append(entity_id)
            entity = self.entities.get(entity_id)
            if entity is not None:
                for dep_id in entity.depends_on:
                    dfs(dep_id)
            path.pop()

        for entity_id in self.entities:
            path.clear()
            dfs(entity_id)
        return cycles

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def build(self) -> Dict[str, Any]:
        """Build complete relationship graph.

        Safe to call more than once: entities, relations and type mappings
        collected by a previous call are discarded first, so relations are
        never duplicated.

        Returns:
            A dict with the keys ``version``, ``generated_at``, ``source``,
            ``entities``, ``relations``, ``dependency_chains``,
            ``impact_chains``, ``type_mappings``, ``issues`` and
            ``statistics``.
        """
        # Start from a clean slate; to_yaml() calls build() again and would
        # otherwise append every relation a second time.
        self.entities = {}
        self.relations = []
        self.type_mappings = {}

        self.load_design_document()
        self.extract_from_design()
        self.analyze_code()
        # Detect issues
        cycles = self.detect_circular_dependencies()
        # Build output structure
        output = {
            'version': '1.0',
            'generated_at': datetime.now().isoformat(),
            'source': str(self.design_doc_path) if self.design_doc_path else 'code_analysis',
            'entities': {
                'database': [],
                'api': [],
                'component': [],
                'page': [],
            },
            'relations': [r.to_dict() for r in self.relations],
            'dependency_chains': {},
            'impact_chains': {},
            'type_mappings': self.type_mappings,
            'issues': {
                'circular_dependencies': cycles,
            },
            'statistics': {
                'total_entities': len(self.entities),
                'total_relations': len(self.relations),
                'by_type': {
                    'database': 0,
                    'api': 0,
                    'component': 0,
                    'page': 0,
                },
                'by_layer': {
                    '1_database': 0,
                    '2_api': 0,
                    '3_component': 0,
                    '4_page': 0,
                },
            },
        }
        # Populate entities by type
        for entity_id, entity in self.entities.items():
            output['entities'][entity.type].append(entity.to_dict())
            output['statistics']['by_type'][entity.type] += 1
            output['statistics']['by_layer'][f"{entity.layer}_{entity.type}"] += 1
            # Build dependency and impact chains
            output['dependency_chains'][entity_id] = self.get_dependency_chain(entity_id)
            output['impact_chains'][entity_id] = self.get_impact_chain(entity_id)
        return output

    def to_yaml(self) -> str:
        """Build the graph and convert it to a YAML string.

        Falls back to indented JSON when PyYAML is not installed.
        """
        output = self.build()
        if not HAS_YAML:
            return json.dumps(output, indent=2)
        return yaml.dump(output, default_flow_style=False, sort_keys=False)
def main():
    """Command-line entry point: build the relationship graph and emit it.

    The graph is built exactly once and that same result is both serialized
    and summarized. It is written to ``--output`` when given, otherwise
    printed to stdout; a summary always goes to stderr. Exits with status 0
    on success and 1 when the output file cannot be written.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Build entity relationship graph")
    parser.add_argument('--design-doc', '-d', type=Path, help='Path to design document')
    parser.add_argument('--project-dir', '-p', type=Path, default=Path('.'), help='Project directory')
    parser.add_argument('--output', '-o', type=Path, help='Output file path')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    args = parser.parse_args()
    project_dir = args.project_dir.resolve()
    builder = RelationshipBuilder(project_dir, args.design_doc)
    output = builder.build()
    # Serialize the graph that was just built. Calling builder.to_yaml() here
    # would run the whole build a second time.
    if args.json or not HAS_YAML:
        content = json.dumps(output, indent=2)
    else:
        content = yaml.dump(output, default_flow_style=False, sort_keys=False)
    if args.output:
        try:
            args.output.parent.mkdir(parents=True, exist_ok=True)
            args.output.write_text(content, encoding='utf-8')
        except OSError as exc:
            print(f"Error: could not write {args.output}: {exc}", file=sys.stderr)
            sys.exit(1)
        print(f"Relations written to: {args.output}")
    else:
        print(content)
    # Print summary
    stats = output['statistics']
    print("\n" + "=" * 60, file=sys.stderr)
    print("RELATIONSHIP GRAPH BUILT", file=sys.stderr)
    print("=" * 60, file=sys.stderr)
    print(f" Entities: {stats['total_entities']}", file=sys.stderr)
    print(f" Relations: {stats['total_relations']}", file=sys.stderr)
    print(f" Database models: {stats['by_type']['database']}", file=sys.stderr)
    print(f" API endpoints: {stats['by_type']['api']}", file=sys.stderr)
    print(f" Components: {stats['by_type']['component']}", file=sys.stderr)
    print(f" Pages: {stats['by_type']['page']}", file=sys.stderr)
    if output['issues']['circular_dependencies']:
        print(f"\n ⚠️ Circular dependencies detected: {len(output['issues']['circular_dependencies'])}", file=sys.stderr)
    print("", file=sys.stderr)
    sys.exit(0)


if __name__ == '__main__':
    main()