project-standalo-sonic-cloud/skills/guardrail-orchestrator/scripts/validate_write.py

283 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
Pre-write validation hook for guardrail enforcement.
Validates that file writes are allowed based on:
1. Current workflow phase
2. Manifest-defined allowed paths
3. Always-allowed system paths
Exit codes:
0 = Write allowed
1 = Write blocked (with error message)
"""
import argparse
import json
import os
import sys
from pathlib import Path
# Always allowed paths (relative to project root)
ALWAYS_ALLOWED_PATTERNS = [
"project_manifest.json",
".workflow/",
".claude/",
"skills/",
"CLAUDE.md",
"package.json",
"package-lock.json",
"tsconfig.json",
".gitignore",
".env.local",
".env.example",
"docs/", # Documentation generation (/eureka:index, /eureka:landing)
"claudedocs/", # Claude-specific documentation
"public/", # Public assets (landing pages, images)
]
def load_manifest(manifest_path: str) -> dict | None:
"""Load manifest if it exists."""
if not os.path.exists(manifest_path):
return None
try:
with open(manifest_path) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return None
def normalize_path(file_path: str, project_dir: str) -> str:
"""Normalize file path to relative path from project root."""
try:
abs_path = Path(file_path).resolve()
proj_path = Path(project_dir).resolve()
# Make relative if under project
if str(abs_path).startswith(str(proj_path)):
return str(abs_path.relative_to(proj_path))
return str(abs_path)
except (ValueError, OSError):
return file_path
def is_always_allowed(rel_path: str) -> bool:
"""Check if path is in always-allowed list."""
for pattern in ALWAYS_ALLOWED_PATTERNS:
if pattern.endswith('/'):
# Directory pattern
if rel_path.startswith(pattern) or rel_path == pattern.rstrip('/'):
return True
else:
# Exact file match
if rel_path == pattern:
return True
return False
def get_allowed_paths_from_manifest(manifest: dict) -> set:
"""Extract all allowed file paths from manifest entities."""
allowed = set()
entities = manifest.get("entities", {})
entity_types = ["pages", "components", "api_endpoints", "database_tables", "services", "utils", "hooks", "types"]
for entity_type in entity_types:
for entity in entities.get(entity_type, []):
status = entity.get("status", "")
# Allow APPROVED, IMPLEMENTED, or PENDING (for design phase updates)
if status in ["APPROVED", "IMPLEMENTED", "PENDING", "IN_PROGRESS"]:
if "file_path" in entity:
allowed.add(entity["file_path"])
# Also check for multiple file paths
if "file_paths" in entity:
for fp in entity.get("file_paths", []):
allowed.add(fp)
return allowed
def get_allowed_paths_from_tasks(project_dir: str) -> set:
"""Extract allowed file paths from task files in active workflow version."""
allowed = set()
# Try to import yaml
try:
import yaml
has_yaml = True
except ImportError:
has_yaml = False
# Find active version
current_path = Path(project_dir) / ".workflow" / "current.yml"
if not current_path.exists():
return allowed
try:
with open(current_path) as f:
content = f.read()
if has_yaml:
current = yaml.safe_load(content) or {}
else:
# Simple fallback parser
current = {}
for line in content.split('\n'):
if ':' in line and not line.startswith(' '):
key, _, value = line.partition(':')
current[key.strip()] = value.strip()
active_version = current.get('active_version')
if not active_version:
return allowed
# Read task files
tasks_dir = Path(project_dir) / ".workflow" / "versions" / active_version / "tasks"
if not tasks_dir.exists():
return allowed
for task_file in tasks_dir.glob("*.yml"):
try:
with open(task_file) as f:
task_content = f.read()
if has_yaml:
task = yaml.safe_load(task_content) or {}
file_paths = task.get('file_paths', [])
for fp in file_paths:
allowed.add(fp)
else:
# Simple extraction for file_paths
in_file_paths = False
for line in task_content.split('\n'):
if line.strip().startswith('file_paths:'):
in_file_paths = True
continue
if in_file_paths:
if line.strip().startswith('- '):
fp = line.strip()[2:].strip()
allowed.add(fp)
elif not line.startswith(' '):
in_file_paths = False
except (IOError, Exception):
continue
except (IOError, Exception):
pass
return allowed
def validate_write(file_path: str, manifest_path: str) -> tuple[bool, str]:
"""
Validate if a write operation is allowed.
Returns:
(allowed: bool, message: str)
"""
project_dir = os.path.dirname(manifest_path) or os.getcwd()
rel_path = normalize_path(file_path, project_dir)
# Check always-allowed paths first
if is_always_allowed(rel_path):
return True, f"✓ GUARDRAIL: Always-allowed path: {rel_path}"
# Load manifest
manifest = load_manifest(manifest_path)
# If no manifest exists, guardrails not active
if manifest is None:
return True, "✓ GUARDRAIL: No manifest found, allowing write"
# Get current phase
phase = manifest.get("state", {}).get("current_phase", "UNKNOWN")
# Collect all allowed paths
allowed_from_manifest = get_allowed_paths_from_manifest(manifest)
allowed_from_tasks = get_allowed_paths_from_tasks(project_dir)
all_allowed = allowed_from_manifest | allowed_from_tasks
# Check if file is in allowed paths
if rel_path in all_allowed:
return True, f"✓ GUARDRAIL: Allowed in manifest/tasks: {rel_path}"
# Also check with leading ./ removed
clean_path = rel_path.lstrip('./')
if clean_path in all_allowed:
return True, f"✓ GUARDRAIL: Allowed in manifest/tasks: {clean_path}"
# Check if any allowed path matches (handle path variations)
for allowed in all_allowed:
allowed_clean = allowed.lstrip('./')
if clean_path == allowed_clean:
return True, f"✓ GUARDRAIL: Allowed (path match): {rel_path}"
# Extract suggested feature from file path
name = Path(rel_path).stem
suggested_feature = f"update {name}"
# Not allowed - generate helpful error message with actionable instructions
error_msg = f"""
⛔ GUARDRAIL VIOLATION: Unauthorized file write
File: {rel_path}
Phase: {phase}
This file is not in the approved manifest or task files.
Allowed paths from manifest: {len(allowed_from_manifest)}
Allowed paths from tasks: {len(allowed_from_tasks)}
╔══════════════════════════════════════════════════════════════════╗
║ 👉 REQUIRED ACTION: Start a workflow to modify this file ║
║ ║
║ Run this command: ║
║ /workflow:spawn {suggested_feature}
║ ║
║ This will: ║
║ 1. Design what changes are needed ║
║ 2. Add this file to approved paths ║
║ 3. Get approval, then implement ║
╚══════════════════════════════════════════════════════════════════╝
Alternative: If workflow exists, add this file to:
- project_manifest.json (entities.*.file_path)
- .workflow/versions/*/tasks/*.yml (file_paths list)
"""
return False, error_msg
def main():
parser = argparse.ArgumentParser(description="Validate write operation against guardrails")
parser.add_argument("--manifest", required=True, help="Path to project_manifest.json")
parser.add_argument("--file", help="File path being written")
args = parser.parse_args()
# Get file path from argument or environment
file_path = args.file or os.environ.get('TOOL_INPUT_FILE_PATH', '')
if not file_path:
# Try reading from stdin
if not sys.stdin.isatty():
file_path = sys.stdin.read().strip()
if not file_path:
print("✓ GUARDRAIL: No file path provided, allowing (hook misconfiguration?)")
return 0
allowed, message = validate_write(file_path, args.manifest)
if allowed:
print(message)
return 0
else:
print(message, file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())