project-standalo-sonic-cloud/skills/guardrail-orchestrator/scripts/validate_workflow.py

479 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Workflow enforcement hook for Claude Code.
Validates that operations comply with current workflow phase.
When blocked, instructs AI to run /workflow:spawn to start a proper workflow.
Exit codes:
0 = Operation allowed
1 = Operation blocked (with message)
"""
import argparse
import json
import os
import sys
from pathlib import Path
# Try to import yaml
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
def _unquote(value: str) -> str:
    """Strip one pair of matching single or double quotes around *value*."""
    if (value.startswith('"') and value.endswith('"')) or \
            (value.startswith("'") and value.endswith("'")):
        return value[1:-1]
    return value


def _parse_simple_yaml(content: str) -> dict:
    """Parse a flat ``key: value`` / ``- item`` subset of YAML.

    Every line is stripped before inspection, so indentation carries no
    meaning: all keys land in one top-level dict, and ``- item`` lines are
    appended to the most recent key whose value was empty or ``[]``.
    """
    result = {}
    current_list = None
    for line in content.split('\n'):
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue

        # List items belong to the most recent "key:" that opened a list;
        # items seen while no list is open are ignored.
        if stripped.startswith('- '):
            if current_list is not None:
                current_list.append(_unquote(stripped[2:].strip()))
            continue

        key, sep, value = stripped.partition(':')
        if not sep:
            continue
        key = key.strip()
        value = value.strip()

        if value in ('', '[]'):
            # An empty value opens a list that following "- " lines fill.
            result[key] = []
            current_list = result[key]
            continue

        # Any scalar value closes the currently open list.
        current_list = None
        if value in ('null', '~'):
            result[key] = None
        elif value == 'true':
            result[key] = True
        elif value == 'false':
            result[key] = False
        elif value.isdigit():
            result[key] = int(value)
        else:
            result[key] = _unquote(value)
    return result


def load_yaml(filepath: str) -> dict:
    """Load a YAML mapping from *filepath*.

    Returns an empty dict when the file does not exist, is empty or
    whitespace-only, or (when PyYAML is used) when the document's top level
    is not a mapping, so callers can always use ``.get`` on the result.

    The file is read as UTF-8. When the ``yaml`` module is available
    (``HAS_YAML``), ``yaml.safe_load`` does the parsing; otherwise a minimal
    built-in parser handles flat ``key: value`` pairs and ``- item`` lists.
    """
    if not os.path.exists(filepath):
        return {}
    # Explicit encoding: do not depend on the platform's locale default.
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()
    if not content.strip():
        return {}
    if HAS_YAML:
        data = yaml.safe_load(content)
        # A top-level list or scalar would break every caller's ``.get``.
        return data if isinstance(data, dict) else {}
    return _parse_simple_yaml(content)
def get_current_phase() -> str:
    """Return the phase recorded in the active version's session file.

    Reads ``.workflow/current.yml`` (relative to the working directory) for
    ``active_version`` and then that version's ``session.yml`` for
    ``current_phase``. Returns ``'NO_WORKFLOW'`` when either file is missing
    or no active version is set, and ``'UNKNOWN'`` when the session file has
    no ``current_phase`` key.
    """
    root = Path('.workflow')
    pointer_file = root / 'current.yml'
    if pointer_file.exists():
        version = load_yaml(str(pointer_file)).get('active_version')
        if version:
            session_file = root / 'versions' / version / 'session.yml'
            if session_file.exists():
                session_data = load_yaml(str(session_file))
                return session_data.get('current_phase', 'UNKNOWN')
    # Any missing piece along the way means there is no usable workflow.
    return 'NO_WORKFLOW'
def get_active_version() -> str:
    """Return ``active_version`` from ``.workflow/current.yml``.

    Returns ``None`` when the file does not exist or the key is absent.
    """
    pointer_file = Path('.workflow') / 'current.yml'
    if pointer_file.exists():
        return load_yaml(str(pointer_file)).get('active_version')
    return None
def get_workflow_feature() -> str:
    """Return the ``feature`` value of the active version's session file.

    Returns ``None`` when ``.workflow/current.yml`` is missing, when it has
    no active version, or when the version's ``session.yml`` is missing.
    Returns ``'unknown feature'`` when the session has no ``feature`` key.
    """
    root = Path('.workflow')
    pointer_file = root / 'current.yml'
    if pointer_file.exists():
        version = load_yaml(str(pointer_file)).get('active_version')
        if version:
            session_file = root / 'versions' / version / 'session.yml'
            if session_file.exists():
                session_data = load_yaml(str(session_file))
                return session_data.get('feature', 'unknown feature')
    return None
def count_task_files(version: str) -> int:
    """Return how many ``task_*.yml`` files exist for *version*.

    Looks in ``.workflow/versions/<version>/tasks`` relative to the working
    directory; a missing directory counts as zero.
    """
    task_folder = Path('.workflow', 'versions', version, 'tasks')
    if task_folder.exists():
        return sum(1 for _ in task_folder.glob('task_*.yml'))
    return 0
def extract_feature_from_file(file_path: str) -> str:
    """Build a short feature description from *file_path*.

    Returns ``"update <stem>"`` when at least one path component is neither
    a common source-directory name nor dot-prefixed; otherwise returns
    ``"modify <file_path>"`` with the path exactly as given.
    """
    path = Path(file_path)
    # Common directory names that say nothing about the feature itself.
    ignored = {'src', 'app', 'lib', 'components', 'pages', 'api', 'utils', 'hooks'}
    has_meaningful_part = any(
        part not in ignored and not part.startswith('.')
        for part in path.parts
    )
    if not has_meaningful_part:
        return f"modify {file_path}"
    # File name without its extension.
    return f"update {path.stem}"
def validate_task_spawn(tool_input: dict) -> tuple[bool, str]:
    """
    Validate Task tool spawning for workflow compliance.

    An agent is recognised either by a marker substring in ``subagent_type``
    (case-insensitive) or by an "<ROLE> AGENT" marker in ``prompt``
    (case-insensitive). Each recognised role may only be spawned in specific
    workflow phases. Missing or ``None`` values for either field are treated
    as empty strings.

    Returns ``(True, "")`` when the spawn is allowed, otherwise
    ``(False, message)`` with the explanation to show.
    """
    # ``or ''`` also covers an explicit JSON null, which .get() would pass
    # through as None and crash on .upper()/.lower().
    prompt_upper = str(tool_input.get('prompt') or '').upper()
    agent_type = str(tool_input.get('subagent_type') or '').lower()

    def targets(type_marker: str, prompt_marker: str) -> bool:
        return type_marker in agent_type or prompt_marker in prompt_upper

    is_architect = targets('system-architect', 'ARCHITECT AGENT')
    is_frontend = targets('frontend', 'FRONTEND AGENT')
    is_backend = targets('backend', 'BACKEND AGENT')
    is_reviewer = targets('quality', 'REVIEWER AGENT')
    is_security = targets('security', 'SECURITY AGENT')

    # Nothing guarded is being spawned: skip reading workflow state.
    if not (is_architect or is_frontend or is_backend
            or is_reviewer or is_security):
        return True, ""

    phase = get_current_phase()

    # Check architect agent
    if is_architect and phase not in ['DESIGNING', 'NO_WORKFLOW']:
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot spawn Architect agent
Current Phase: {phase}
Required Phase: DESIGNING
The Architect agent can only be spawned during the DESIGNING phase.
👉 REQUIRED ACTION: Run /workflow:status to check current state.
"""

    # Check frontend agent
    if is_frontend:
        if phase not in ['IMPLEMENTING', 'IMPL_REJECTED']:
            return False, f"""
⛔ WORKFLOW VIOLATION: Cannot spawn Frontend agent
Current Phase: {phase}
Required Phase: IMPLEMENTING
👉 REQUIRED ACTION: Complete the design phase first, then run /workflow:approve
"""
        # NOTE(review): only the frontend branch checks for task files; the
        # backend branch below does not. Confirm whether that is intended.
        version = get_active_version()
        if version and count_task_files(version) == 0:
            return False, f"""
⛔ WORKFLOW VIOLATION: No task files found
Cannot start implementation without design tasks.
👉 REQUIRED ACTION: Ensure Architect agent created task files in:
.workflow/versions/{version}/tasks/
"""

    # Check backend agent
    if is_backend and phase not in ['IMPLEMENTING', 'IMPL_REJECTED']:
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot spawn Backend agent
Current Phase: {phase}
Required Phase: IMPLEMENTING
👉 REQUIRED ACTION: Complete the design phase first, then run /workflow:approve
"""

    # Check reviewer agent
    if is_reviewer and phase not in ['REVIEWING', 'AWAITING_IMPL_APPROVAL']:
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot spawn Reviewer agent
Current Phase: {phase}
Required Phase: REVIEWING
👉 REQUIRED ACTION: Complete implementation first.
"""

    # Check security agent
    if is_security and phase not in ['SECURITY_REVIEW', 'REVIEWING']:
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot spawn Security agent
Current Phase: {phase}
Required Phase: SECURITY_REVIEW
👉 REQUIRED ACTION: Complete code review first, then security review runs.
"""

    return True, ""
def validate_write_operation(tool_input: dict) -> tuple[bool, str]:
    """
    Validate Write/Edit operations for workflow compliance.

    The target path is read from ``file_path`` (or ``path``) in *tool_input*
    and expressed relative to the current working directory when it lies
    inside it. Paths on the always-allowed list pass regardless of phase;
    every other path is blocked in the NO_WORKFLOW, DESIGNING, REVIEWING,
    SECURITY_REVIEW, AWAITING_*_APPROVAL and COMPLETED phases and allowed in
    any other phase.

    Returns ``(True, "")`` when the write is allowed, otherwise
    ``(False, message)`` with the explanation to show.
    """
    file_path = tool_input.get('file_path', tool_input.get('path', ''))
    if not file_path:
        return True, ""

    # Normalize path. relative_to() compares whole path components, so a
    # sibling directory that merely shares the project's name as a string
    # prefix (e.g. "<project>.workflow/") is not mistaken for project content.
    try:
        rel_path = (
            Path(file_path).resolve()
            .relative_to(Path.cwd().resolve())
            .as_posix()
        )
    except (OSError, RuntimeError, TypeError, ValueError):
        # Outside the project (ValueError) or not resolvable: fall back to
        # the path exactly as given.
        # NOTE(review): in this fallback a raw path containing ".." is
        # matched textually against the allow-list below.
        rel_path = file_path
    else:
        if rel_path == '.':
            # The project directory itself.
            rel_path = ''

    # Always allow these
    always_allowed = [
        'project_manifest.json',
        '.workflow/',
        'skills/',
        '.claude/',
        'CLAUDE.md',
        'package.json',
        'package-lock.json',
        'docs/',        # Documentation generation (/eureka:index, /eureka:landing)
        'claudedocs/',  # Claude-specific documentation
        'public/',      # Public assets (landing pages, images)
    ]
    for allowed in always_allowed:
        if rel_path.startswith(allowed) or rel_path == allowed.rstrip('/'):
            return True, ""

    # Workflow state is only consulted for paths that are not always allowed,
    # so files under .workflow/ stay editable even if the state is unreadable.
    phase = get_current_phase()

    # Extract feature suggestion from file path
    suggested_feature = extract_feature_from_file(rel_path)

    # NO_WORKFLOW - Must start a workflow first
    if phase == 'NO_WORKFLOW':
        return False, f"""
⛔ WORKFLOW REQUIRED: No active workflow
You are trying to modify: {rel_path}
This project uses guardrail workflows. You cannot directly edit files.
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Start a workflow first! ║
║ ║
║ Run this command: ║
║ /workflow:spawn {suggested_feature}
║ ║
║ This will: ║
║ 1. Create a design for your changes ║
║ 2. Get approval ║
║ 3. Then allow you to implement ║
╚══════════════════════════════════════════════════════════════════╝
"""

    # DESIGNING phase - can't write implementation files
    if phase == 'DESIGNING':
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot write implementation files during DESIGNING
Current Phase: DESIGNING
File: {rel_path}
During DESIGNING phase, only these files can be modified:
- project_manifest.json
- .workflow/versions/*/tasks/*.yml
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Complete design and get approval ║
║ ║
║ 1. Finish adding entities to project_manifest.json ║
║ 2. Create task files in .workflow/versions/*/tasks/ ║
║ 3. Run: /workflow:approve ║
╚══════════════════════════════════════════════════════════════════╝
"""

    # REVIEWING phase - read only
    if phase == 'REVIEWING':
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot write files during REVIEWING
Current Phase: REVIEWING
File: {rel_path}
During REVIEWING phase, files are READ-ONLY.
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Complete the review ║
║ ║
║ If changes are needed: ║
║ - Run: /workflow:reject "reason for changes"
║ - This returns to IMPLEMENTING phase ║
║ ║
║ If review passes: ║
║ - Run: /workflow:approve ║
╚══════════════════════════════════════════════════════════════════╝
"""

    # SECURITY_REVIEW phase - read only
    if phase == 'SECURITY_REVIEW':
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot write files during SECURITY_REVIEW
Current Phase: SECURITY_REVIEW
File: {rel_path}
During SECURITY_REVIEW phase, files are READ-ONLY.
Security scan is running to check for vulnerabilities.
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Wait for security scan to complete ║
║ ║
║ If security issues found: ║
║ - Workflow returns to IMPLEMENTING phase to fix issues ║
║ ║
║ If security passes: ║
║ - Workflow proceeds to AWAITING_IMPL_APPROVAL ║
║ ║
║ For full audit: /workflow:security --full ║
╚══════════════════════════════════════════════════════════════════╝
"""

    # AWAITING approval phases
    if phase in ['AWAITING_DESIGN_APPROVAL', 'AWAITING_IMPL_APPROVAL']:
        gate_type = "design" if "DESIGN" in phase else "implementation"
        return False, f"""
⛔ WORKFLOW VIOLATION: Cannot write files while awaiting approval
Current Phase: {phase}
File: {rel_path}
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Get user approval ║
║ ║
║ Waiting for {gate_type} approval. Ask the user to run: ║
║ - /workflow:approve (to proceed) ║
║ - /workflow:reject (to revise) ║
╚══════════════════════════════════════════════════════════════════╝
"""

    # COMPLETED - need new workflow
    if phase == 'COMPLETED':
        return False, f"""
⛔ WORKFLOW VIOLATION: Workflow already completed
Current Phase: COMPLETED
File: {rel_path}
This workflow version is complete.
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Start a new workflow ║
║ ║
║ Run: /workflow:spawn {suggested_feature}
╚══════════════════════════════════════════════════════════════════╝
"""

    return True, ""
def validate_transition(tool_input: dict) -> tuple[bool, str]:
    """Validate a phase transition.

    Currently a placeholder: *tool_input* is not inspected and every
    transition is reported as allowed with an empty message.
    """
    return (True, "")
def main():
    """Command-line entry point for the workflow enforcement hook.

    Parses ``--operation`` (required), ``--input`` (JSON from the tool call)
    and ``--file``, dispatches to the matching validator, and exits with
    status 0 when the operation is allowed or 1 (message on stderr) when it
    is blocked.
    """
    # The status messages contain non-ASCII glyphs. On a stream whose
    # encoding cannot represent them, print() would raise and turn an
    # allowed operation into a non-zero exit; degrade to '?' instead.
    for stream in (sys.stdout, sys.stderr):
        reconfigure = getattr(stream, 'reconfigure', None)
        if callable(reconfigure):
            try:
                reconfigure(errors='replace')
            except (OSError, ValueError):
                pass  # best effort only; keep the stream as it is

    parser = argparse.ArgumentParser(description="Workflow enforcement hook")
    parser.add_argument('--operation', required=True,
                        choices=['task', 'write', 'edit', 'transition', 'build'],
                        help='Operation type being validated')
    parser.add_argument('--input', help='JSON input from tool call')
    parser.add_argument('--file', help='File path (for write/edit operations)')
    args = parser.parse_args()

    # Parse input. Anything that is not a JSON object (invalid JSON, or a
    # valid list/string/number/null) is kept under 'raw' so the validators
    # always receive a dict.
    tool_input = {}
    if args.input:
        try:
            parsed = json.loads(args.input)
        except json.JSONDecodeError:
            parsed = None
        tool_input = parsed if isinstance(parsed, dict) else {'raw': args.input}
    if args.file:
        tool_input['file_path'] = args.file

    # Route to appropriate validator
    allowed = True
    message = ""
    if args.operation == 'task':
        allowed, message = validate_task_spawn(tool_input)
    elif args.operation in ['write', 'edit']:
        allowed, message = validate_write_operation(tool_input)
    elif args.operation == 'transition':
        allowed, message = validate_transition(tool_input)
    elif args.operation == 'build':
        phase = get_current_phase()
        print(f"BUILD: Running in phase {phase}")
        allowed = True

    # Output result
    if not allowed:
        print(message, file=sys.stderr)
        sys.exit(1)
    else:
        phase = get_current_phase()
        version = get_active_version() or 'N/A'
        print(f"✓ WORKFLOW: {args.operation.upper()} allowed in {phase} (v{version})")
        sys.exit(0)
# Run the hook only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()