project-standalo-sonic-cloud/skills/guardrail-orchestrator/scripts/generate_api_contract.py

616 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Generate API Contract and Shared Types from Design Document
This script:
1. Reads the design_document.yml
2. Extracts all API endpoints and their types
3. Generates api_contract.yml with strict typing
4. Generates app/types/api.ts with shared TypeScript interfaces
Both frontend and backend agents MUST use these generated files
to ensure contract compliance.
"""
import os
import sys
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional, Set
try:
import yaml
except ImportError:
yaml = None
def load_yaml(path: Path) -> Dict:
"""Load YAML file."""
if yaml:
with open(path) as f:
return yaml.safe_load(f)
else:
# Fallback: simple YAML parser for basic cases
with open(path) as f:
content = f.read()
# Try JSON first (YAML is superset of JSON)
try:
return json.loads(content)
except:
print(f"Warning: yaml module not available, using basic parser", file=sys.stderr)
return {}
def save_yaml(data: Dict, path: Path) -> None:
"""Save data as YAML."""
if yaml:
with open(path, 'w') as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
else:
# Fallback: JSON format
with open(path, 'w') as f:
json.dump(data, f, indent=2)
def ts_type_from_field(field: Dict) -> str:
"""Convert design document field type to TypeScript type."""
type_map = {
'string': 'string',
'text': 'string',
'integer': 'number',
'float': 'number',
'decimal': 'number',
'boolean': 'boolean',
'datetime': 'Date',
'date': 'Date',
'uuid': 'string',
'json': 'Record<string, unknown>',
'array': 'unknown[]',
}
field_type = field.get('type', 'string')
# Handle enum type
if field_type == 'enum':
enum_values = field.get('enum_values', [])
if enum_values:
return ' | '.join([f"'{v}'" for v in enum_values])
return 'string'
return type_map.get(field_type, 'unknown')
def generate_type_from_model(model: Dict) -> Dict:
"""Generate TypeScript type definition from model."""
type_id = f"type_{model['name']}"
properties = []
for field in model.get('fields', []):
# Skip internal fields like password_hash
if field['name'].endswith('_hash'):
continue
constraints = field.get('constraints', [])
required = 'not_null' in constraints or 'primary_key' in constraints
properties.append({
'name': to_camel_case(field['name']),
'type': ts_type_from_field(field),
'required': required,
'description': field.get('description', ''),
})
return {
'id': type_id,
'name': model['name'],
'definition': {
'type': 'object',
'properties': properties,
},
'used_by': {
'models': [model['id']],
'responses': [],
'requests': [],
}
}
def generate_request_type(endpoint: Dict) -> Optional[Dict]:
"""Generate request body type from endpoint definition."""
request_body = endpoint.get('request_body', {})
if not request_body:
return None
schema = request_body.get('schema', {})
if not schema:
return None
# Generate type name from endpoint
parts = endpoint['id'].replace('api_', '').split('_')
type_name = ''.join([p.capitalize() for p in parts]) + 'Request'
type_id = f"type_{type_name}"
properties = []
for prop in schema.get('properties', []):
properties.append({
'name': to_camel_case(prop['name']),
'type': ts_type_from_field(prop),
'required': prop.get('required', False),
'description': prop.get('description', ''),
'validation': ','.join(prop.get('validations', [])) if prop.get('validations') else None,
})
return {
'id': type_id,
'name': type_name,
'definition': {
'type': 'object',
'properties': properties,
},
'used_by': {
'models': [],
'responses': [],
'requests': [endpoint['id']],
}
}
def generate_response_type(endpoint: Dict, models: Dict[str, Dict]) -> Optional[Dict]:
"""Generate response type from endpoint definition - may reference model types."""
responses = endpoint.get('responses', [])
success_response = None
for resp in responses:
status = resp.get('status', 0)
if 200 <= status < 300:
success_response = resp
break
if not success_response:
return None
# Check if this response references a model
depends_on = endpoint.get('depends_on_models', [])
if depends_on:
# Response likely uses model type
primary_model = depends_on[0]
if primary_model in models:
return None # Will use model type directly
# Generate custom response type
schema = success_response.get('schema', {})
if not schema or schema.get('type') != 'object':
return None
parts = endpoint['id'].replace('api_', '').split('_')
type_name = ''.join([p.capitalize() for p in parts]) + 'Response'
type_id = f"type_{type_name}"
properties = []
for prop in schema.get('properties', []):
properties.append({
'name': to_camel_case(prop['name']),
'type': ts_type_from_field(prop),
'required': True,
'description': '',
})
return {
'id': type_id,
'name': type_name,
'definition': {
'type': 'object',
'properties': properties,
},
'used_by': {
'models': [],
'responses': [endpoint['id']],
'requests': [],
}
}
def to_camel_case(snake_str: str) -> str:
"""Convert snake_case to camelCase."""
components = snake_str.split('_')
return components[0] + ''.join(x.capitalize() for x in components[1:])
def generate_endpoint_contract(endpoint: Dict, types: Dict[str, Dict], models: Dict[str, Dict]) -> Dict:
"""Generate endpoint contract from design document endpoint."""
# Determine request body type
request_body = None
if endpoint.get('request_body'):
# Generate request type name
parts = endpoint['id'].replace('api_', '').split('_')
type_name = ''.join([p.capitalize() for p in parts]) + 'Request'
request_body = {
'type_id': f"type_{type_name}",
'content_type': 'application/json',
}
# Determine response type
response_type_id = None
is_array = False
responses = endpoint.get('responses', [])
success_response = None
for resp in responses:
if 200 <= resp.get('status', 0) < 300:
success_response = resp
break
if success_response:
# Check if referencing a model
depends_on = endpoint.get('depends_on_models', [])
if depends_on:
model_id = depends_on[0]
if model_id in models:
model_name = models[model_id].get('name', model_id.replace('model_', '').capitalize())
response_type_id = f"type_{model_name}"
# Check if response is array
schema = success_response.get('schema', {})
if schema.get('type') == 'array':
is_array = True
if not response_type_id:
parts = endpoint['id'].replace('api_', '').split('_')
type_name = ''.join([p.capitalize() for p in parts]) + 'Response'
response_type_id = f"type_{type_name}"
# Extract path params
path_params = []
for param in endpoint.get('path_params', []):
path_params.append({
'name': param['name'],
'type': ts_type_from_field(param),
'description': param.get('description', ''),
})
# Extract query params
query_params = []
for param in endpoint.get('query_params', []):
query_params.append({
'name': param['name'],
'type': ts_type_from_field(param),
'required': param.get('required', False),
'default': param.get('default'),
'description': param.get('description', ''),
})
# Build error responses
error_responses = []
for resp in responses:
status = resp.get('status', 0)
if status >= 400:
error_responses.append({
'status': status,
'type_id': 'type_ApiError',
'description': resp.get('description', ''),
})
return {
'id': endpoint['id'],
'method': endpoint['method'],
'path': endpoint['path'],
'path_params': path_params,
'query_params': query_params,
'request_body': request_body,
'response': {
'success': {
'status': success_response.get('status', 200) if success_response else 200,
'type_id': response_type_id,
'is_array': is_array,
},
'errors': error_responses,
},
'auth': endpoint.get('auth', {'required': False, 'roles': []}),
'version': '1.0.0',
}
def generate_frontend_calls(pages: List[Dict], components: List[Dict], endpoints: Dict[str, Dict]) -> List[Dict]:
"""Generate frontend call contracts from pages and components."""
calls = []
# From pages
for page in pages:
for data_need in page.get('data_needs', []):
api_id = data_need.get('api_id')
if api_id and api_id in endpoints:
calls.append({
'id': f"call_{page['id']}_{api_id}",
'source': {
'entity_id': page['id'],
'file_path': f"app{page['path']}/page.tsx",
},
'endpoint_id': api_id,
'purpose': data_need.get('purpose', 'Load data'),
'trigger': 'onLoad' if data_need.get('on_load') else 'onDemand',
'request_mapping': {
'from_props': [],
'from_state': [],
'from_form': [],
},
'response_handling': {
'success_action': 'Update state',
'error_action': 'Show error',
},
})
# From components
for component in components:
for api_id in component.get('uses_apis', []):
if api_id in endpoints:
endpoint = endpoints[api_id]
method = endpoint.get('method', 'GET')
trigger = 'onSubmit' if method in ['POST', 'PUT', 'PATCH'] else 'onDemand'
calls.append({
'id': f"call_{component['id']}_{api_id}",
'source': {
'entity_id': component['id'],
'file_path': f"app/components/{component['name']}.tsx",
},
'endpoint_id': api_id,
'purpose': f"Call {api_id}",
'trigger': trigger,
'request_mapping': {
'from_props': [],
'from_state': [],
'from_form': [],
},
'response_handling': {
'success_action': 'Handle response',
'error_action': 'Show error',
},
})
return calls
def generate_backend_routes(endpoints: List[Dict]) -> List[Dict]:
"""Generate backend route contracts from endpoints."""
routes = []
for endpoint in endpoints:
# Determine file path from endpoint path
api_path = endpoint['path'].replace('/api/', '')
# Handle dynamic segments like /users/:id
parts = api_path.split('/')
file_parts = []
for part in parts:
if part.startswith(':'):
file_parts.append(f"[{part[1:]}]")
else:
file_parts.append(part)
file_path = f"app/api/{'/'.join(file_parts)}/route.ts"
routes.append({
'id': f"route_{endpoint['method'].lower()}_{api_path.replace('/', '_')}",
'endpoint_id': endpoint['id'],
'file_path': file_path,
'export_name': endpoint['method'],
'uses_models': endpoint.get('depends_on_models', []),
'uses_services': [],
'must_validate': [],
'must_authenticate': endpoint.get('auth', {}).get('required', False),
'must_authorize': endpoint.get('auth', {}).get('roles', []),
})
return routes
def generate_typescript_types(types: List[Dict]) -> str:
"""Generate TypeScript type definitions."""
lines = [
"// AUTO-GENERATED - DO NOT EDIT",
"// Source: .workflow/versions/vXXX/contracts/api_contract.yml",
f"// Generated: {datetime.now().isoformat()}",
"",
"// ============================================================================",
"// Shared API Types",
"// Both frontend and backend MUST import from this file",
"// ============================================================================",
"",
]
# Add standard error types
lines.extend([
"// === Error Types ===",
"",
"export interface ApiError {",
" error: string;",
" message?: string;",
" code?: string;",
"}",
"",
"export interface ValidationError {",
" error: string;",
" details: string[];",
"}",
"",
])
# Generate interfaces for each type
lines.append("// === Domain Types ===")
lines.append("")
for type_def in types:
name = type_def['name']
definition = type_def['definition']
if definition['type'] == 'object':
lines.append(f"export interface {name} {{")
for prop in definition.get('properties', []):
optional = '' if prop.get('required') else '?'
desc = prop.get('description', '')
if desc:
lines.append(f" /** {desc} */")
lines.append(f" {prop['name']}{optional}: {prop['type']};")
lines.append("}")
lines.append("")
elif definition['type'] == 'enum':
values = definition.get('enum_values', [])
quoted_values = [f"'{v}'" for v in values]
lines.append(f"export type {name} = {' | '.join(quoted_values)};")
lines.append("")
elif definition['type'] == 'union':
members = definition.get('union_members', [])
lines.append(f"export type {name} = {' | '.join(members)};")
lines.append("")
return '\n'.join(lines)
def generate_api_paths(endpoints: List[Dict]) -> str:
"""Generate API path constants for type-safe calls."""
lines = [
"",
"// === API Paths ===",
"",
"export const API_PATHS = {",
]
for endpoint in endpoints:
# Generate constant name: api_get_users -> GET_USERS
const_name = endpoint['id'].replace('api_', '').upper()
lines.append(f" {const_name}: '{endpoint['path']}' as const,")
lines.append("} as const;")
lines.append("")
return '\n'.join(lines)
def main():
"""Main entry point."""
if len(sys.argv) < 2:
print("Usage: generate_api_contract.py <design_document.yml> [--output-dir <dir>]", file=sys.stderr)
sys.exit(1)
design_doc_path = Path(sys.argv[1])
# Parse output directory
output_dir = design_doc_path.parent.parent # .workflow/versions/vXXX/
if '--output-dir' in sys.argv:
idx = sys.argv.index('--output-dir')
output_dir = Path(sys.argv[idx + 1])
if not design_doc_path.exists():
print(f"Error: Design document not found: {design_doc_path}", file=sys.stderr)
sys.exit(1)
# Load design document
design_doc = load_yaml(design_doc_path)
if not design_doc:
print("Error: Failed to load design document", file=sys.stderr)
sys.exit(1)
# Extract entities
models = {m['id']: m for m in design_doc.get('data_models', [])}
endpoints = design_doc.get('api_endpoints', [])
pages = design_doc.get('pages', [])
components = design_doc.get('components', [])
workflow_version = design_doc.get('workflow_version', 'unknown')
revision = design_doc.get('revision', 1)
# Generate types from models
types = []
for model in models.values():
type_def = generate_type_from_model(model)
types.append(type_def)
# Generate request/response types from endpoints
endpoints_dict = {e['id']: e for e in endpoints}
for endpoint in endpoints:
req_type = generate_request_type(endpoint)
if req_type:
types.append(req_type)
resp_type = generate_response_type(endpoint, models)
if resp_type:
types.append(resp_type)
# Generate types dictionary for lookup
types_dict = {t['id']: t for t in types}
# Generate endpoint contracts
endpoint_contracts = []
for endpoint in endpoints:
contract = generate_endpoint_contract(endpoint, types_dict, models)
endpoint_contracts.append(contract)
# Generate frontend calls
frontend_calls = generate_frontend_calls(pages, components, endpoints_dict)
# Generate backend routes
backend_routes = generate_backend_routes(endpoints)
# Build API contract
api_contract = {
'api_contract': {
'workflow_version': workflow_version,
'design_document_revision': revision,
'generated_at': datetime.now().isoformat(),
'validated_at': None,
'status': 'draft',
},
'types': types,
'endpoints': endpoint_contracts,
'frontend_calls': frontend_calls,
'backend_routes': backend_routes,
}
# Create output directories
contracts_dir = output_dir / 'contracts'
contracts_dir.mkdir(parents=True, exist_ok=True)
# Save API contract
contract_path = contracts_dir / 'api_contract.yml'
save_yaml(api_contract, contract_path)
print(f"Generated: {contract_path}")
# Generate TypeScript types
ts_types = generate_typescript_types(types)
ts_paths = generate_api_paths(endpoint_contracts)
# Find project root (look for package.json)
project_root = output_dir
while project_root != project_root.parent:
if (project_root / 'package.json').exists():
break
project_root = project_root.parent
if not (project_root / 'package.json').exists():
project_root = output_dir.parent.parent.parent # Assume .workflow is in project root
# Create types directory and file
types_dir = project_root / 'app' / 'types'
types_dir.mkdir(parents=True, exist_ok=True)
types_file = types_dir / 'api.ts'
types_file.write_text(ts_types + ts_paths)
print(f"Generated: {types_file}")
# Summary
print("\n=== API CONTRACT GENERATED ===")
print(f"Types: {len(types)}")
print(f"Endpoints: {len(endpoint_contracts)}")
print(f"Frontend calls: {len(frontend_calls)}")
print(f"Backend routes: {len(backend_routes)}")
print(f"\nContract file: {contract_path}")
print(f"Types file: {types_file}")
print("\nBoth agents MUST import from app/types/api.ts")
if __name__ == '__main__':
main()