#!/usr/bin/env python3
"""
Integration Validator

Validates that new components, pages, and APIs are properly integrated
with the existing project structure.

Usage:
    python3 validate_integration.py [--design-doc PATH] [--project-dir PATH] [--json]

Exit codes:
    0 = All integrations valid
    1 = Integration issues found (warnings)
    2 = Critical integration failures
"""

import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Set, Tuple, Optional

try:
    import yaml
except ImportError:
    yaml = None


@dataclass
class IntegrationIssue:
    """A single finding produced by one of the integration checks."""

    # Severity level: 'error', 'warning' or 'info'.
    severity: str
    # Check that raised the finding: 'navigation', 'import', 'api_wiring', 'export'.
    category: str
    # Identifier of the page / component / API the finding refers to.
    entity_id: str
    # Human-readable description of the problem.
    message: str
    # Optional hint on how to resolve the problem.
    suggestion: str = ""
    # Optional path of the file where the fix most likely belongs.
    file_path: str = ""


@dataclass
class IntegrationReport:
    """Aggregated result of an integration validation run."""

    # Every finding raised by the checks, in the order it was recorded.
    issues: List["IntegrationIssue"] = field(default_factory=list)
    # page id -> whether the page path was found in navigation/layout files.
    navigation_status: Dict[str, bool] = field(default_factory=dict)
    # component id -> project-relative files that import or render it.
    component_usage: Dict[str, List[str]] = field(default_factory=dict)
    # API id -> project-relative files that reference the endpoint.
    api_wiring: Dict[str, List[str]] = field(default_factory=dict)
    # component id -> whether an index (barrel) file exports it.
    barrel_exports: Dict[str, bool] = field(default_factory=dict)

    def _count(self, severity: str) -> int:
        """Return how many recorded issues carry the given severity."""
        return sum(1 for issue in self.issues if issue.severity == severity)

    @property
    def error_count(self) -> int:
        """Number of issues with severity 'error'."""
        return self._count('error')

    @property
    def warning_count(self) -> int:
        """Number of issues with severity 'warning'."""
        return self._count('warning')


class IntegrationValidator:
|
||
"""Validates integration of new features with existing project."""
|
||
|
||
def __init__(self, project_dir: Path, design_doc_path: Optional[Path] = None):
|
||
self.project_dir = project_dir
|
||
self.design_doc_path = design_doc_path
|
||
self.design: Dict[str, Any] = {}
|
||
self.report = IntegrationReport()
|
||
|
||
# Discovered existing structure
|
||
self.nav_files: List[Path] = []
|
||
self.layout_files: List[Path] = []
|
||
self.index_files: List[Path] = []
|
||
|
||
def load_design_document(self) -> bool:
|
||
"""Load design document if provided."""
|
||
if not self.design_doc_path or not self.design_doc_path.exists():
|
||
return False
|
||
|
||
if yaml is None:
|
||
print("Warning: PyYAML not installed, using JSON fallback", file=sys.stderr)
|
||
return False
|
||
|
||
try:
|
||
with open(self.design_doc_path) as f:
|
||
self.design = yaml.safe_load(f)
|
||
return True
|
||
except Exception as e:
|
||
print(f"Warning: Could not load design document: {e}", file=sys.stderr)
|
||
return False
|
||
|
||
def discover_project_structure(self):
|
||
"""Find navigation, layout, and index files."""
|
||
app_dir = self.project_dir / "app"
|
||
components_dir = self.project_dir / "components"
|
||
|
||
# Find navigation files
|
||
nav_patterns = [
|
||
"**/nav*.tsx", "**/Nav*.tsx",
|
||
"**/sidebar*.tsx", "**/Sidebar*.tsx",
|
||
"**/header*.tsx", "**/Header*.tsx",
|
||
"**/menu*.tsx", "**/Menu*.tsx",
|
||
"**/navigation*.tsx", "**/Navigation*.tsx",
|
||
]
|
||
|
||
for pattern in nav_patterns:
|
||
for d in [app_dir, components_dir]:
|
||
if d.exists():
|
||
self.nav_files.extend(d.glob(pattern))
|
||
|
||
# Find layout files
|
||
if app_dir.exists():
|
||
self.layout_files = list(app_dir.glob("**/layout.tsx"))
|
||
|
||
# Find index/barrel export files
|
||
for d in [app_dir, components_dir, self.project_dir / "lib"]:
|
||
if d.exists():
|
||
self.index_files.extend(d.glob("**/index.ts"))
|
||
self.index_files.extend(d.glob("**/index.tsx"))
|
||
|
||
def get_new_pages(self) -> List[Dict[str, Any]]:
|
||
"""Get new pages from design document."""
|
||
return self.design.get('pages', [])
|
||
|
||
def get_new_components(self) -> List[Dict[str, Any]]:
|
||
"""Get new components from design document."""
|
||
return self.design.get('components', [])
|
||
|
||
def get_new_apis(self) -> List[Dict[str, Any]]:
|
||
"""Get new API endpoints from design document."""
|
||
return self.design.get('api_endpoints', [])
|
||
|
||
def check_navigation_integration(self):
|
||
"""Check if new pages are added to navigation."""
|
||
new_pages = self.get_new_pages()
|
||
if not new_pages:
|
||
return
|
||
|
||
# Read all navigation files
|
||
nav_content = ""
|
||
for nav_file in self.nav_files:
|
||
try:
|
||
nav_content += nav_file.read_text()
|
||
except:
|
||
pass
|
||
|
||
# Also check layout files for navigation
|
||
for layout_file in self.layout_files:
|
||
try:
|
||
nav_content += layout_file.read_text()
|
||
except:
|
||
pass
|
||
|
||
for page in new_pages:
|
||
page_id = page.get('id', '')
|
||
page_path = page.get('path', '')
|
||
page_name = page.get('name', page_id)
|
||
|
||
if not page_path:
|
||
continue
|
||
|
||
# Check if page path appears in navigation
|
||
# Look for href="<path>" or to="<path>" or path: "<path>"
|
||
patterns = [
|
||
rf'href=["\']({re.escape(page_path)})["\']',
|
||
rf'to=["\']({re.escape(page_path)})["\']',
|
||
rf'path:\s*["\']({re.escape(page_path)})["\']',
|
||
rf'["\']({re.escape(page_path)})["\']',
|
||
]
|
||
|
||
found = False
|
||
for pattern in patterns:
|
||
if re.search(pattern, nav_content):
|
||
found = True
|
||
break
|
||
|
||
self.report.navigation_status[page_id] = found
|
||
|
||
if not found:
|
||
self.report.issues.append(IntegrationIssue(
|
||
severity='warning',
|
||
category='navigation',
|
||
entity_id=page_id,
|
||
message=f"Page '{page_name}' ({page_path}) not found in navigation",
|
||
suggestion=f"Add a link to '{page_path}' in your navigation component",
|
||
file_path=str(self.nav_files[0]) if self.nav_files else "app/layout.tsx"
|
||
))
|
||
|
||
def check_component_usage(self):
|
||
"""Check if new components are imported and used somewhere."""
|
||
new_components = self.get_new_components()
|
||
if not new_components:
|
||
return
|
||
|
||
# Scan all tsx/jsx files for imports
|
||
app_dir = self.project_dir / "app"
|
||
all_files: List[Path] = []
|
||
|
||
if app_dir.exists():
|
||
all_files.extend(app_dir.glob("**/*.tsx"))
|
||
all_files.extend(app_dir.glob("**/*.jsx"))
|
||
|
||
for comp in new_components:
|
||
comp_id = comp.get('id', '')
|
||
comp_name = comp.get('name', '')
|
||
|
||
if not comp_name:
|
||
continue
|
||
|
||
# Find files that import this component
|
||
usage_files: List[str] = []
|
||
|
||
for file_path in all_files:
|
||
# Skip the component's own file
|
||
if comp_name.lower() in file_path.name.lower():
|
||
continue
|
||
|
||
try:
|
||
content = file_path.read_text()
|
||
|
||
# Look for import patterns
|
||
import_patterns = [
|
||
rf'import\s+.*{comp_name}.*from',
|
||
rf'import\s+{{{comp_name}',
|
||
rf'<{comp_name}[\s/>]',
|
||
]
|
||
|
||
for pattern in import_patterns:
|
||
if re.search(pattern, content):
|
||
usage_files.append(str(file_path.relative_to(self.project_dir)))
|
||
break
|
||
except:
|
||
pass
|
||
|
||
self.report.component_usage[comp_id] = usage_files
|
||
|
||
if not usage_files:
|
||
self.report.issues.append(IntegrationIssue(
|
||
severity='warning',
|
||
category='import',
|
||
entity_id=comp_id,
|
||
message=f"Component '{comp_name}' is not imported/used anywhere",
|
||
suggestion=f"Import and use <{comp_name} /> in a page or parent component"
|
||
))
|
||
|
||
def check_api_wiring(self):
|
||
"""Check if new APIs are called from frontend."""
|
||
new_apis = self.get_new_apis()
|
||
if not new_apis:
|
||
return
|
||
|
||
# Scan all tsx/jsx/ts files for API calls
|
||
app_dir = self.project_dir / "app"
|
||
all_files: List[Path] = []
|
||
|
||
if app_dir.exists():
|
||
all_files.extend(app_dir.glob("**/*.tsx"))
|
||
all_files.extend(app_dir.glob("**/*.ts"))
|
||
|
||
# Exclude api routes themselves
|
||
all_files = [f for f in all_files if '/api/' not in str(f)]
|
||
|
||
for api in new_apis:
|
||
api_id = api.get('id', '')
|
||
api_path = api.get('path', '')
|
||
|
||
if not api_path:
|
||
continue
|
||
|
||
# Normalize path for matching
|
||
# /api/songs → /api/songs
|
||
# /api/songs/:id → /api/songs/
|
||
normalized_path = re.sub(r':\w+', '', api_path).rstrip('/')
|
||
|
||
# Find files that call this API
|
||
usage_files: List[str] = []
|
||
|
||
for file_path in all_files:
|
||
try:
|
||
content = file_path.read_text()
|
||
|
||
# Look for API call patterns
|
||
patterns = [
|
||
rf'fetch\(["\']({re.escape(normalized_path)})',
|
||
rf'axios\.\w+\(["\']({re.escape(normalized_path)})',
|
||
rf'useSWR\(["\']({re.escape(normalized_path)})',
|
||
rf'useQuery.*["\']({re.escape(normalized_path)})',
|
||
rf'["\']({re.escape(normalized_path)})["\']',
|
||
]
|
||
|
||
for pattern in patterns:
|
||
if re.search(pattern, content):
|
||
usage_files.append(str(file_path.relative_to(self.project_dir)))
|
||
break
|
||
except:
|
||
pass
|
||
|
||
self.report.api_wiring[api_id] = usage_files
|
||
|
||
if not usage_files:
|
||
method = api.get('method', 'GET')
|
||
self.report.issues.append(IntegrationIssue(
|
||
severity='info',
|
||
category='api_wiring',
|
||
entity_id=api_id,
|
||
message=f"API '{method} {api_path}' is not called from frontend",
|
||
suggestion=f"Add a fetch/useSWR call to '{api_path}' in relevant components"
|
||
))
|
||
|
||
def check_barrel_exports(self):
|
||
"""Check if new components are exported from index files."""
|
||
new_components = self.get_new_components()
|
||
if not new_components:
|
||
return
|
||
|
||
# Read all index files
|
||
index_content = ""
|
||
for index_file in self.index_files:
|
||
try:
|
||
index_content += index_file.read_text()
|
||
except:
|
||
pass
|
||
|
||
for comp in new_components:
|
||
comp_id = comp.get('id', '')
|
||
comp_name = comp.get('name', '')
|
||
|
||
if not comp_name:
|
||
continue
|
||
|
||
# Check if component is exported
|
||
export_patterns = [
|
||
rf'export\s*{{\s*{comp_name}',
|
||
rf'export\s+{{\s*{comp_name}',
|
||
rf'export\s+\*\s+from\s+["\'].*{comp_name}',
|
||
]
|
||
|
||
found = False
|
||
for pattern in export_patterns:
|
||
if re.search(pattern, index_content):
|
||
found = True
|
||
break
|
||
|
||
self.report.barrel_exports[comp_id] = found
|
||
|
||
# This is just informational, not a warning
|
||
|
||
def validate(self) -> IntegrationReport:
|
||
"""Run all integration validations."""
|
||
self.load_design_document()
|
||
self.discover_project_structure()
|
||
|
||
self.check_navigation_integration()
|
||
self.check_component_usage()
|
||
self.check_api_wiring()
|
||
self.check_barrel_exports()
|
||
|
||
return self.report
|
||
|
||
def print_report(self):
|
||
"""Print human-readable report."""
|
||
print("\n" + "=" * 70)
|
||
print("INTEGRATION VALIDATION REPORT")
|
||
print("=" * 70)
|
||
|
||
# Navigation Status
|
||
print("\n📍 NAVIGATION INTEGRATION")
|
||
print("-" * 40)
|
||
if self.report.navigation_status:
|
||
for page_id, integrated in self.report.navigation_status.items():
|
||
status = "✅" if integrated else "❌"
|
||
print(f" {status} {page_id}")
|
||
else:
|
||
print(" No new pages to integrate")
|
||
|
||
# Component Usage
|
||
print("\n🧩 COMPONENT USAGE")
|
||
print("-" * 40)
|
||
if self.report.component_usage:
|
||
for comp_id, usage in self.report.component_usage.items():
|
||
if usage:
|
||
print(f" ✅ {comp_id}")
|
||
for file in usage[:3]:
|
||
print(f" └─ {file}")
|
||
if len(usage) > 3:
|
||
print(f" └─ ... and {len(usage) - 3} more")
|
||
else:
|
||
print(f" ⚠️ {comp_id} - NOT USED ANYWHERE")
|
||
else:
|
||
print(" No new components to integrate")
|
||
|
||
# API Wiring
|
||
print("\n🔌 API WIRING")
|
||
print("-" * 40)
|
||
if self.report.api_wiring:
|
||
for api_id, usage in self.report.api_wiring.items():
|
||
if usage:
|
||
print(f" ✅ {api_id}")
|
||
for file in usage[:3]:
|
||
print(f" └─ {file}")
|
||
else:
|
||
print(f" ℹ️ {api_id} - not called from frontend")
|
||
else:
|
||
print(" No new APIs to integrate")
|
||
|
||
# Issues Summary
|
||
if self.report.issues:
|
||
print("\n" + "=" * 70)
|
||
print("INTEGRATION ISSUES")
|
||
print("=" * 70)
|
||
|
||
for issue in self.report.issues:
|
||
if issue.severity == 'error':
|
||
icon = "❌"
|
||
elif issue.severity == 'warning':
|
||
icon = "⚠️ "
|
||
else:
|
||
icon = "ℹ️ "
|
||
|
||
print(f"\n{icon} [{issue.category.upper()}] {issue.entity_id}")
|
||
print(f" {issue.message}")
|
||
if issue.suggestion:
|
||
print(f" 💡 {issue.suggestion}")
|
||
if issue.file_path:
|
||
print(f" 📁 {issue.file_path}")
|
||
|
||
# Summary
|
||
print("\n" + "=" * 70)
|
||
print("SUMMARY")
|
||
print("=" * 70)
|
||
print(f" Errors: {self.report.error_count}")
|
||
print(f" Warnings: {self.report.warning_count}")
|
||
|
||
if self.report.error_count > 0:
|
||
print("\n❌ INTEGRATION FAILED - Fix errors before proceeding")
|
||
elif self.report.warning_count > 0:
|
||
print("\n⚠️ INTEGRATION INCOMPLETE - Review warnings")
|
||
else:
|
||
print("\n✅ INTEGRATION COMPLETE")
|
||
|
||
print("")
|
||
|
||
|
||
def find_latest_design_doc(project_dir: Path) -> Optional[Path]:
    """Find the latest design document from workflow versions.

    Looks below ``<project_dir>/.workflow/versions`` for directories whose
    name starts with ``v`` and returns ``design/design_document.yml`` from
    the highest version that contains one, or None when nothing is found.
    Versions are ordered numerically, so ``v10`` ranks above ``v9``.
    """
    workflow_dir = project_dir / ".workflow" / "versions"

    # is_dir() also rejects a stray *file* named "versions", on which
    # iterdir() would raise.
    if not workflow_dir.is_dir():
        return None

    def version_key(version_dir: Path):
        # Compare the numeric components first ("v1.10" > "v1.9"); the raw
        # name only breaks ties and keeps the ordering deterministic.
        numbers = tuple(int(num) for num in re.findall(r'\d+', version_dir.name))
        return numbers, version_dir.name

    versions = sorted(
        (d for d in workflow_dir.iterdir() if d.is_dir() and d.name.startswith('v')),
        key=version_key,
        reverse=True,
    )

    for version_dir in versions:
        design_doc = version_dir / "design" / "design_document.yml"
        if design_doc.exists():
            return design_doc

    return None


def main():
    """Command-line entry point.

    Parses the arguments, runs the validator and prints either a JSON
    document (``--json``) or the human-readable report.  Always terminates
    the process via ``sys.exit``: 2 when errors were found, 1 when only
    warnings were found, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate integration of new features")
    parser.add_argument('--design-doc', '-d', type=Path, help='Path to design document')
    parser.add_argument('--project-dir', '-p', type=Path, default=Path('.'), help='Project directory')
    parser.add_argument('--json', action='store_true', help='Output JSON format')

    args = parser.parse_args()

    project_dir = args.project_dir.resolve()

    # Auto-detect design document if not provided
    design_doc = args.design_doc
    if design_doc is None:
        design_doc = find_latest_design_doc(project_dir)

    if design_doc and not design_doc.exists():
        print(f"Warning: Design document not found: {design_doc}", file=sys.stderr)
        design_doc = None

    validator = IntegrationValidator(project_dir, design_doc)
    report = validator.validate()

    if args.json:
        output = {
            'navigation_status': report.navigation_status,
            'component_usage': report.component_usage,
            'api_wiring': report.api_wiring,
            'barrel_exports': report.barrel_exports,
            'issues': [
                {
                    'severity': issue.severity,
                    'category': issue.category,
                    'entity_id': issue.entity_id,
                    'message': issue.message,
                    'suggestion': issue.suggestion,
                    # Included so JSON consumers get the same hint the
                    # text report prints.
                    'file_path': issue.file_path,
                }
                for issue in report.issues
            ],
            'summary': {
                'errors': report.error_count,
                'warnings': report.warning_count,
            },
        }
        print(json.dumps(output, indent=2))
    else:
        validator.print_report()

    # Exit codes
    if report.error_count > 0:
        sys.exit(2)
    elif report.warning_count > 0:
        sys.exit(1)
    else:
        sys.exit(0)


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()