#!/usr/bin/env python3 """ Phase Gate Enforcement System Provides strict enforcement of workflow phases with: - Blocking checkpoints that MUST pass before proceeding - Script-based validation gates - State persistence that survives interruptions - Fix loops that return to IMPLEMENTING when issues found Exit codes: 0 = PASS - operation allowed 1 = BLOCKED - conditions not met (with detailed reason) 2 = ERROR - system error Usage: phase_gate.py can-enter PHASE # Check if ready to enter phase phase_gate.py complete PHASE # Mark phase as complete (validates all checkpoints) phase_gate.py checkpoint NAME # Save a checkpoint phase_gate.py verify PHASE # Verify all checkpoints for phase phase_gate.py blockers # Show current blocking conditions phase_gate.py fix-loop PHASE # Handle fix loop logic phase_gate.py status # Show full gate state """ import argparse import json import os import subprocess import sys from datetime import datetime from pathlib import Path from typing import Optional try: import yaml HAS_YAML = True except ImportError: HAS_YAML = False # ============================================================================ # Configuration # ============================================================================ PHASES_ORDER = [ 'INITIALIZING', 'DESIGNING', 'AWAITING_DESIGN_APPROVAL', 'IMPLEMENTING', 'REVIEWING', 'SECURITY_REVIEW', 'AWAITING_IMPL_APPROVAL', 'COMPLETING', 'COMPLETED' ] # Phases that can trigger a fix loop back to IMPLEMENTING FIX_LOOP_PHASES = ['REVIEWING', 'SECURITY_REVIEW'] # Phase entry requirements - what MUST be true to enter ENTRY_REQUIREMENTS = { 'INITIALIZING': [], 'DESIGNING': [ {'type': 'phase_completed', 'phase': 'INITIALIZING'}, ], 'AWAITING_DESIGN_APPROVAL': [ {'type': 'phase_completed', 'phase': 'DESIGNING'}, {'type': 'file_exists', 'path': '.workflow/versions/{version}/design/design_document.yml'}, {'type': 'min_file_count', 'pattern': '.workflow/versions/{version}/tasks/*.yml', 'minimum': 1}, ], 'IMPLEMENTING': [ {'type': 'phase_completed', 'phase': 'AWAITING_DESIGN_APPROVAL'}, {'type': 'approval_granted', 'gate': 'design'}, ], 'REVIEWING': [ {'type': 'phase_completed', 'phase': 'IMPLEMENTING'}, {'type': 'script_passes', 'script': 'npm run build', 'name': 'build'}, {'type': 'script_passes', 'script': 'npx tsc --noEmit', 'name': 'type-check'}, {'type': 'script_passes', 'script': 'npm run lint', 'name': 'lint'}, ], 'SECURITY_REVIEW': [ {'type': 'phase_completed', 'phase': 'REVIEWING'}, {'type': 'checkpoint_passed', 'checkpoint': 'review_passed'}, ], 'AWAITING_IMPL_APPROVAL': [ {'type': 'phase_completed', 'phase': 'SECURITY_REVIEW'}, {'type': 'checkpoint_passed', 'checkpoint': 'security_passed'}, ], 'COMPLETING': [ {'type': 'phase_completed', 'phase': 'AWAITING_IMPL_APPROVAL'}, {'type': 'approval_granted', 'gate': 'implementation'}, ], 'COMPLETED': [ {'type': 'phase_completed', 'phase': 'COMPLETING'}, ], } # Phase checkpoints - what MUST be completed within each phase PHASE_CHECKPOINTS = { 'INITIALIZING': [ 'manifest_exists', 'version_created', ], 'DESIGNING': [ 'design_document_created', 'design_validated', 'tasks_generated', ], 'AWAITING_DESIGN_APPROVAL': [ 'design_approved', ], 'IMPLEMENTING': [ 'all_layers_complete', 'build_passes', 'type_check_passes', 'lint_passes', ], 'REVIEWING': [ 'review_script_run', 'all_files_verified', 'code_review_passed', # Code review agent must pass 'review_passed', # Critical - umbrella checkpoint, must pass or trigger fix loop ], 'SECURITY_REVIEW': [ 'security_scan_run', 'api_contract_validated', 'security_passed', # Critical - must pass or trigger fix loop ], 'AWAITING_IMPL_APPROVAL': [ 'implementation_approved', ], 'COMPLETING': [ 'tasks_marked_complete', 'version_finalized', ], } # ============================================================================ # YAML Helpers # ============================================================================ def load_yaml(filepath: str) -> dict: """Load YAML file (or JSON fallback).""" if not os.path.exists(filepath): return {} with open(filepath, 'r') as f: content = f.read() if not content.strip(): return {} if HAS_YAML: return yaml.safe_load(content) or {} # Fallback: try JSON parsing (many .yml files are actually JSON) try: return json.loads(content) or {} except json.JSONDecodeError: return {} def save_yaml(filepath: str, data: dict): """Save data to YAML file.""" os.makedirs(os.path.dirname(filepath), exist_ok=True) if HAS_YAML: with open(filepath, 'w') as f: yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) else: with open(filepath, 'w') as f: json.dump(data, f, indent=2) # ============================================================================ # State Management # ============================================================================ def get_workflow_dir() -> Path: return Path('.workflow') def get_active_version() -> Optional[str]: """Get active workflow version.""" current_path = get_workflow_dir() / 'current.yml' if not current_path.exists(): return None current = load_yaml(str(current_path)) return current.get('active_version') def get_gate_state_path(version: str) -> Path: """Get path to gate state file.""" return get_workflow_dir() / 'versions' / version / 'gate_state.yml' def get_session_path(version: str) -> Path: """Get path to session file.""" return get_workflow_dir() / 'versions' / version / 'session.yml' def load_gate_state(version: str) -> dict: """Load gate state for version.""" path = get_gate_state_path(version) if path.exists(): return load_yaml(str(path)) # Initialize new gate state return { 'version': version, 'current_phase': 'INITIALIZING', 'created_at': datetime.now().isoformat(), 'updated_at': datetime.now().isoformat(), 'phases': {phase: { 'status': 'not_started', 'entered_at': None, 'completed_at': None, 'checkpoints': {cp: {'status': 'pending', 'at': None, 'data': {}} for cp in PHASE_CHECKPOINTS.get(phase, [])} } for phase in PHASES_ORDER}, 'fix_loops': [], # Track fix loop iterations } def save_gate_state(version: str, state: dict): """Save gate state.""" state['updated_at'] = datetime.now().isoformat() save_yaml(str(get_gate_state_path(version)), state) def get_current_phase(version: str) -> str: """Get current phase from session.""" session_path = get_session_path(version) if not session_path.exists(): return 'INITIALIZING' session = load_yaml(str(session_path)) return session.get('current_phase', 'INITIALIZING') # ============================================================================ # Validation Functions # ============================================================================ def check_file_exists(path: str, version: str) -> tuple[bool, str]: """Check if file exists.""" resolved_path = path.format(version=version) if os.path.exists(resolved_path): return True, f"File exists: {resolved_path}" return False, f"File missing: {resolved_path}" def check_min_file_count(pattern: str, minimum: int, version: str) -> tuple[bool, str]: """Check minimum file count matching pattern.""" import glob resolved_pattern = pattern.format(version=version) files = glob.glob(resolved_pattern) count = len(files) if count >= minimum: return True, f"Found {count} files (minimum: {minimum})" return False, f"Found only {count} files, need at least {minimum}" def check_phase_completed(phase: str, version: str) -> tuple[bool, str]: """Check if a phase has been completed.""" state = load_gate_state(version) phase_state = state['phases'].get(phase, {}) if phase_state.get('status') == 'completed': return True, f"Phase {phase} is completed" return False, f"Phase {phase} not completed (status: {phase_state.get('status', 'unknown')})" def check_approval_granted(gate: str, version: str) -> tuple[bool, str]: """Check if approval gate has been granted.""" session_path = get_session_path(version) if not session_path.exists(): return False, f"No session file found" session = load_yaml(str(session_path)) approvals = session.get('approvals', {}) gate_approval = approvals.get(gate, {}) if gate_approval.get('status') == 'approved': return True, f"{gate} approval granted" return False, f"{gate} approval not granted (status: {gate_approval.get('status', 'pending')})" def check_checkpoint_passed(checkpoint: str, version: str) -> tuple[bool, str]: """Check if a specific checkpoint has passed.""" state = load_gate_state(version) # Search all phases for this checkpoint for phase, phase_data in state['phases'].items(): checkpoints = phase_data.get('checkpoints', {}) if checkpoint in checkpoints: cp_state = checkpoints[checkpoint] if cp_state.get('status') == 'passed': return True, f"Checkpoint {checkpoint} passed" return False, f"Checkpoint {checkpoint} not passed (status: {cp_state.get('status', 'pending')})" return False, f"Checkpoint {checkpoint} not found" def check_script_passes(script: str, name: str = None) -> tuple[bool, str]: """Run a script and check if it passes (exit code 0).""" script_name = name or script.split()[0] try: result = subprocess.run( script, shell=True, capture_output=True, text=True, timeout=300 # 5 minute timeout ) if result.returncode == 0: return True, f"Script '{script_name}' passed" return False, f"Script '{script_name}' failed (exit code: {result.returncode})\n{result.stderr[:500]}" except subprocess.TimeoutExpired: return False, f"Script '{script_name}' timed out" except Exception as e: return False, f"Script '{script_name}' error: {str(e)}" def validate_requirement(req: dict, version: str) -> tuple[bool, str]: """Validate a single requirement.""" req_type = req.get('type') if req_type == 'file_exists': return check_file_exists(req['path'], version) elif req_type == 'min_file_count': return check_min_file_count(req['pattern'], req['minimum'], version) elif req_type == 'phase_completed': return check_phase_completed(req['phase'], version) elif req_type == 'approval_granted': return check_approval_granted(req['gate'], version) elif req_type == 'checkpoint_passed': return check_checkpoint_passed(req['checkpoint'], version) elif req_type == 'script_passes': return check_script_passes(req['script'], req.get('name')) else: return False, f"Unknown requirement type: {req_type}" # ============================================================================ # Gate Operations # ============================================================================ def can_enter_phase(phase: str, version: str) -> tuple[bool, list[str]]: """Check if all entry requirements are met for a phase.""" requirements = ENTRY_REQUIREMENTS.get(phase, []) failures = [] for req in requirements: passed, message = validate_requirement(req, version) if not passed: failures.append(message) return len(failures) == 0, failures def save_checkpoint(phase: str, checkpoint: str, status: str, version: str, data: dict = None): """Save a checkpoint status.""" state = load_gate_state(version) if phase not in state['phases']: state['phases'][phase] = { 'status': 'in_progress', 'entered_at': datetime.now().isoformat(), 'completed_at': None, 'checkpoints': {} } state['phases'][phase]['checkpoints'][checkpoint] = { 'status': status, 'at': datetime.now().isoformat(), 'data': data or {} } save_gate_state(version, state) return True def verify_phase_checkpoints(phase: str, version: str) -> tuple[bool, list[str]]: """Verify all checkpoints for a phase are passed.""" state = load_gate_state(version) required_checkpoints = PHASE_CHECKPOINTS.get(phase, []) phase_state = state['phases'].get(phase, {}) checkpoints = phase_state.get('checkpoints', {}) failures = [] for cp in required_checkpoints: cp_state = checkpoints.get(cp, {}) if cp_state.get('status') != 'passed': failures.append(f"Checkpoint '{cp}' not passed (status: {cp_state.get('status', 'missing')})") return len(failures) == 0, failures def complete_phase(phase: str, version: str) -> tuple[bool, list[str]]: """Mark a phase as complete after verifying all checkpoints.""" # First verify all checkpoints passed, failures = verify_phase_checkpoints(phase, version) if not passed: return False, failures # Update state state = load_gate_state(version) state['phases'][phase]['status'] = 'completed' state['phases'][phase]['completed_at'] = datetime.now().isoformat() save_gate_state(version, state) return True, [] def enter_phase(phase: str, version: str) -> tuple[bool, list[str]]: """Enter a new phase after verifying entry requirements.""" # Check entry requirements can_enter, failures = can_enter_phase(phase, version) if not can_enter: return False, failures # Update state state = load_gate_state(version) state['current_phase'] = phase state['phases'][phase]['status'] = 'in_progress' state['phases'][phase]['entered_at'] = datetime.now().isoformat() save_gate_state(version, state) return True, [] def handle_fix_loop(phase: str, version: str, issues: list[str]) -> dict: """Handle fix loop - return to IMPLEMENTING to fix issues.""" state = load_gate_state(version) # Record fix loop fix_loop_entry = { 'from_phase': phase, 'issues': issues, 'timestamp': datetime.now().isoformat(), 'iteration': len([fl for fl in state.get('fix_loops', []) if fl['from_phase'] == phase]) + 1 } if 'fix_loops' not in state: state['fix_loops'] = [] state['fix_loops'].append(fix_loop_entry) # Reset phase states for re-run state['phases'][phase]['status'] = 'needs_fix' state['phases'][phase]['completed_at'] = None # Reset checkpoints that need re-verification if phase == 'REVIEWING': state['phases'][phase]['checkpoints']['review_passed'] = {'status': 'pending', 'at': None, 'data': {}} elif phase == 'SECURITY_REVIEW': state['phases'][phase]['checkpoints']['security_passed'] = {'status': 'pending', 'at': None, 'data': {}} # Set IMPLEMENTING as needing re-entry state['phases']['IMPLEMENTING']['status'] = 'needs_reentry' state['current_phase'] = 'IMPLEMENTING' save_gate_state(version, state) return { 'action': 'FIX_REQUIRED', 'return_to': 'IMPLEMENTING', 'issues': issues, 'iteration': fix_loop_entry['iteration'] } def get_blockers(version: str) -> list[dict]: """Get all current blocking conditions.""" state = load_gate_state(version) current_phase = state.get('current_phase', 'INITIALIZING') blockers = [] # Check current phase checkpoints phase_state = state['phases'].get(current_phase, {}) checkpoints = phase_state.get('checkpoints', {}) for cp_name, cp_state in checkpoints.items(): if cp_state.get('status') != 'passed': blockers.append({ 'type': 'checkpoint', 'phase': current_phase, 'name': cp_name, 'status': cp_state.get('status', 'pending') }) # Check if in fix loop fix_loops = state.get('fix_loops', []) if fix_loops: latest = fix_loops[-1] if state['phases'].get(latest['from_phase'], {}).get('status') == 'needs_fix': blockers.append({ 'type': 'fix_loop', 'phase': latest['from_phase'], 'issues': latest['issues'], 'iteration': latest['iteration'] }) return blockers def show_status(version: str): """Display full gate state status.""" state = load_gate_state(version) print() print("=" * 70) print(" PHASE GATE STATUS".center(70)) print("=" * 70) print(f" Version: {version}") print(f" Current Phase: {state.get('current_phase', 'UNKNOWN')}") print(f" Updated: {state.get('updated_at', 'N/A')}") print("=" * 70) for phase in PHASES_ORDER: phase_state = state['phases'].get(phase, {}) status = phase_state.get('status', 'not_started') # Status icon if status == 'completed': icon = "✅" elif status == 'in_progress': icon = "🔄" elif status == 'needs_fix': icon = "🔧" elif status == 'needs_reentry': icon = "â†Šī¸ " else: icon = "âŗ" print(f"\n {icon} {phase}: {status}") # Show checkpoints checkpoints = phase_state.get('checkpoints', {}) if checkpoints: for cp_name, cp_state in checkpoints.items(): cp_status = cp_state.get('status', 'pending') cp_icon = "✓" if cp_status == 'passed' else "○" if cp_status == 'pending' else "✗" print(f" {cp_icon} {cp_name}: {cp_status}") # Show fix loops fix_loops = state.get('fix_loops', []) if fix_loops: print("\n" + "-" * 70) print(" FIX LOOP HISTORY") print("-" * 70) for fl in fix_loops[-5:]: # Show last 5 print(f" [{fl['timestamp'][:19]}] {fl['from_phase']} → IMPLEMENTING (iteration {fl['iteration']})") for issue in fl['issues'][:3]: print(f" - {issue[:60]}") print("\n" + "=" * 70) # ============================================================================ # CLI Interface # ============================================================================ def main(): parser = argparse.ArgumentParser(description="Phase gate enforcement system") subparsers = parser.add_subparsers(dest='command', help='Commands') # can-enter command enter_parser = subparsers.add_parser('can-enter', help='Check if can enter a phase') enter_parser.add_argument('phase', choices=PHASES_ORDER, help='Target phase') # complete command complete_parser = subparsers.add_parser('complete', help='Complete a phase') complete_parser.add_argument('phase', choices=PHASES_ORDER, help='Phase to complete') # checkpoint command cp_parser = subparsers.add_parser('checkpoint', help='Save a checkpoint') cp_parser.add_argument('name', help='Checkpoint name') cp_parser.add_argument('--phase', required=True, help='Phase for checkpoint') cp_parser.add_argument('--status', default='passed', choices=['passed', 'failed', 'pending']) cp_parser.add_argument('--data', help='JSON data to store') # verify command verify_parser = subparsers.add_parser('verify', help='Verify phase checkpoints') verify_parser.add_argument('phase', choices=PHASES_ORDER, help='Phase to verify') # blockers command subparsers.add_parser('blockers', help='Show current blockers') # fix-loop command fix_parser = subparsers.add_parser('fix-loop', help='Trigger fix loop') fix_parser.add_argument('phase', choices=FIX_LOOP_PHASES, help='Phase triggering fix') fix_parser.add_argument('--issues', nargs='+', help='Issues that need fixing') # status command subparsers.add_parser('status', help='Show full gate state') # enter command (actually enter a phase) do_enter_parser = subparsers.add_parser('enter', help='Enter a phase') do_enter_parser.add_argument('phase', choices=PHASES_ORDER, help='Phase to enter') args = parser.parse_args() # Get active version version = get_active_version() if not version and args.command not in ['status', 'blockers']: print("Error: No active workflow version") print("Run /workflow:spawn to start a new workflow") sys.exit(2) if args.command == 'can-enter': can_enter, failures = can_enter_phase(args.phase, version) if can_enter: print(f"✅ CAN ENTER: {args.phase}") print("All entry requirements met") sys.exit(0) else: print(f"❌ BLOCKED: Cannot enter {args.phase}") print("\nBlocking conditions:") for f in failures: print(f" - {f}") sys.exit(1) elif args.command == 'enter': success, failures = enter_phase(args.phase, version) if success: print(f"✅ ENTERED: {args.phase}") sys.exit(0) else: print(f"❌ BLOCKED: Cannot enter {args.phase}") for f in failures: print(f" - {f}") sys.exit(1) elif args.command == 'complete': success, failures = complete_phase(args.phase, version) if success: print(f"✅ COMPLETED: {args.phase}") sys.exit(0) else: print(f"❌ BLOCKED: Cannot complete {args.phase}") print("\nMissing checkpoints:") for f in failures: print(f" - {f}") sys.exit(1) elif args.command == 'checkpoint': data = None if args.data: try: data = json.loads(args.data) except json.JSONDecodeError: data = {'raw': args.data} save_checkpoint(args.phase, args.name, args.status, version, data) icon = "✅" if args.status == 'passed' else "❌" if args.status == 'failed' else "âŗ" print(f"{icon} Checkpoint saved: {args.name} = {args.status}") sys.exit(0) elif args.command == 'verify': passed, failures = verify_phase_checkpoints(args.phase, version) if passed: print(f"✅ VERIFIED: All checkpoints for {args.phase} passed") sys.exit(0) else: print(f"❌ FAILED: {args.phase} has incomplete checkpoints") for f in failures: print(f" - {f}") sys.exit(1) elif args.command == 'blockers': if not version: print("No active workflow") sys.exit(0) blockers = get_blockers(version) if not blockers: print("✅ No blockers - workflow can proceed") sys.exit(0) print("❌ CURRENT BLOCKERS:") print("-" * 50) for b in blockers: if b['type'] == 'checkpoint': print(f" [{b['phase']}] Checkpoint '{b['name']}': {b['status']}") elif b['type'] == 'fix_loop': print(f" [FIX REQUIRED] From {b['phase']} (iteration {b['iteration']})") for issue in b.get('issues', [])[:3]: print(f" - {issue[:60]}") sys.exit(1) elif args.command == 'fix-loop': issues = args.issues or ['Unspecified issues found'] result = handle_fix_loop(args.phase, version, issues) print(f"🔧 FIX LOOP TRIGGERED") print(f" From: {args.phase}") print(f" Return to: {result['return_to']}") print(f" Iteration: {result['iteration']}") print(f"\n Issues to fix:") for issue in result['issues']: print(f" - {issue}") print(f"\n 👉 Run /workflow:resume to continue fixing") sys.exit(1) # Exit 1 to indicate fix needed elif args.command == 'status': if version: show_status(version) else: print("No active workflow") sys.exit(0) else: parser.print_help() sys.exit(0) if __name__ == "__main__": main()