629 lines
25 KiB
Python
629 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
preflight.py -- Pre-commit quality gate for UltrafastSecp256k1
|
|
|
|
Validates that changes follow the project's non-negotiable rules:
|
|
1. Security invariants: CT files retain all secure_erase/value_barrier calls
|
|
2. Narrative drift: audit docs don't claim CT layers are missing when active
|
|
3. Test coverage gaps: source files with no test coverage
|
|
4. Graph freshness: DB vs filesystem consistency
|
|
5. Doc-code pairing: code changes have matching doc updates
|
|
6. ABI surface check: new/removed ufsecp_* functions detected
|
|
|
|
Usage:
|
|
python3 scripts/preflight.py # full check
|
|
python3 scripts/preflight.py --security # security only
|
|
python3 scripts/preflight.py --drift # narrative drift only
|
|
python3 scripts/preflight.py --coverage # coverage gaps only
|
|
python3 scripts/preflight.py --freshness # graph freshness only
|
|
python3 scripts/preflight.py --claims # graph-backed assurance claim checks
|
|
python3 scripts/preflight.py --ai-review # AI review-event log checks
|
|
python3 scripts/preflight.py --gpu-evidence # GPU backend evidence / publishability checks
|
|
python3 scripts/preflight.py --changed # check git-changed files
|
|
python3 scripts/preflight.py --abi # ABI surface check
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
LIB_ROOT = SCRIPT_DIR.parent
|
|
DB_PATH = LIB_ROOT / ".project_graph.db"
|
|
|
|
# ANSI colors
|
|
RED = '\033[91m'
|
|
GREEN = '\033[92m'
|
|
YELLOW = '\033[93m'
|
|
CYAN = '\033[96m'
|
|
BOLD = '\033[1m'
|
|
RESET = '\033[0m'
|
|
|
|
_VALIDATE_ASSURANCE_CACHE = None
|
|
|
|
def get_conn():
|
|
if not DB_PATH.exists():
|
|
print(f"{RED}ERROR: Graph DB not found at {DB_PATH}{RESET}")
|
|
print(f"Run: python3 {SCRIPT_DIR}/build_project_graph.py --rebuild")
|
|
sys.exit(1)
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def get_validate_assurance_payload():
|
|
"""Run validate_assurance once and cache the parsed JSON payload."""
|
|
global _VALIDATE_ASSURANCE_CACHE
|
|
if _VALIDATE_ASSURANCE_CACHE is not None:
|
|
return _VALIDATE_ASSURANCE_CACHE
|
|
|
|
validator = SCRIPT_DIR / 'validate_assurance.py'
|
|
try:
|
|
result = subprocess.run(
|
|
['python3', str(validator), '--json'],
|
|
capture_output=True, text=True, cwd=str(LIB_ROOT), check=False,
|
|
)
|
|
except Exception as exc:
|
|
_VALIDATE_ASSURANCE_CACHE = {'error': f"could not execute validate_assurance.py: {exc}"}
|
|
return _VALIDATE_ASSURANCE_CACHE
|
|
|
|
if result.returncode not in (0, 1):
|
|
_VALIDATE_ASSURANCE_CACHE = {
|
|
'error': result.stderr.strip() or result.stdout.strip() or 'validate_assurance.py failed',
|
|
}
|
|
return _VALIDATE_ASSURANCE_CACHE
|
|
|
|
try:
|
|
_VALIDATE_ASSURANCE_CACHE = {'payload': json.loads(result.stdout)}
|
|
except json.JSONDecodeError as exc:
|
|
_VALIDATE_ASSURANCE_CACHE = {'error': f'invalid JSON from validate_assurance.py: {exc}'}
|
|
return _VALIDATE_ASSURANCE_CACHE
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. Security Invariant Check
|
|
# ---------------------------------------------------------------------------
|
|
def check_security_invariants():
|
|
"""Verify CT files retain expected security patterns."""
|
|
conn = get_conn()
|
|
issues = []
|
|
|
|
# Get expected patterns from graph
|
|
expected = {}
|
|
rows = conn.execute("""SELECT source_file, pattern, COUNT(*) as cnt
|
|
FROM security_patterns GROUP BY source_file, pattern""").fetchall()
|
|
for r in rows:
|
|
key = (r['source_file'], r['pattern'])
|
|
expected[key] = r['cnt']
|
|
|
|
# Scan actual files
|
|
actual = {}
|
|
patterns_re = {
|
|
'secure_erase': re.compile(r'secure_erase\s*\('),
|
|
'value_barrier': re.compile(r'value_barrier\s*\('),
|
|
'CLASSIFY': re.compile(r'SECP256K1_CLASSIFY\s*\('),
|
|
'DECLASSIFY': re.compile(r'SECP256K1_DECLASSIFY\s*\('),
|
|
}
|
|
|
|
for (src_file, pat_name), exp_cnt in expected.items():
|
|
filepath = LIB_ROOT / src_file
|
|
if not filepath.exists():
|
|
issues.append(f" {RED}MISSING{RESET} {src_file} (expected {exp_cnt} {pat_name})")
|
|
continue
|
|
pat_re = patterns_re.get(pat_name)
|
|
if not pat_re:
|
|
continue
|
|
count = 0
|
|
try:
|
|
with open(filepath, 'r', errors='replace') as f:
|
|
for line in f:
|
|
stripped = line.strip()
|
|
# Skip comment-only lines for erase/barrier patterns,
|
|
# matching build_project_graph.py scanning logic.
|
|
# CLASSIFY/DECLASSIFY are exempt — the graph builder keeps
|
|
# those even in comment lines (macro definition context).
|
|
if pat_name in ('secure_erase', 'value_barrier'):
|
|
if stripped.startswith('//') or stripped.startswith('#include'):
|
|
continue
|
|
if pat_re.search(line):
|
|
count += 1
|
|
except Exception:
|
|
issues.append(f" {RED}UNREADABLE{RESET} {src_file}")
|
|
continue
|
|
actual[(src_file, pat_name)] = count
|
|
if count < exp_cnt:
|
|
issues.append(f" {RED}LOST{RESET} {src_file}: {pat_name} {exp_cnt} -> {count} ({exp_cnt - count} removed)")
|
|
elif count > exp_cnt:
|
|
issues.append(f" {CYAN}NEW{RESET} {src_file}: {pat_name} {exp_cnt} -> {count} (+{count - exp_cnt}, rebuild graph)")
|
|
|
|
conn.close()
|
|
return issues
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1b. Narrative Drift Detection
|
|
# ---------------------------------------------------------------------------
|
|
STALE_PHRASES = [
|
|
# (regex_pattern, description, files_to_check)
|
|
(r'(?i)\bno\s+formal\s+(ct\s+)?verification\b',
|
|
'Claims no formal CT verification -- ct-verif and valgrind-ct are active in CI',
|
|
['docs/AUDIT_READINESS_REPORT_v1.md', 'audit/AUDIT_TEST_PLAN.md']),
|
|
(r'(?i)\btool\s+integration\s+not\s+yet\s+done\b',
|
|
'Claims tool integration not done -- tools are integrated',
|
|
['docs/AUDIT_READINESS_REPORT_v1.md', 'audit/AUDIT_TEST_PLAN.md',
|
|
'docs/TEST_MATRIX.md']),
|
|
(r'(?i)\bno\s+formal\s+verification\s+applied\b',
|
|
'Claims no formal verification applied -- ct-verif is running and blocking',
|
|
['audit/run_full_audit.sh', 'audit/run_full_audit.ps1']),
|
|
(r'(?i)\bno\s+multi-uarch\b',
|
|
'Claims no multi-uarch support -- cross-platform KAT and CI exist',
|
|
['docs/AUDIT_READINESS_REPORT_v1.md']),
|
|
(r'(?i)\bgpu\s+equivalence\s+planned\b',
|
|
'Claims GPU equivalence only planned -- GPU audit runners exist',
|
|
['docs/AUDIT_READINESS_REPORT_v1.md']),
|
|
]
|
|
|
|
# Files that are marked historical are exempt from drift checks
|
|
HISTORICAL_EXEMPT_MARKER = re.compile(
|
|
r'(?i)(historical\s+report|superseded\s+by|snapshot\s+from\s+v\d)',
|
|
)
|
|
|
|
def check_narrative_drift():
|
|
"""Detect stale CT/audit phrases in narrative docs."""
|
|
issues = []
|
|
for pattern_str, description, target_files in STALE_PHRASES:
|
|
pat = re.compile(pattern_str)
|
|
for rel_path in target_files:
|
|
filepath = LIB_ROOT / rel_path
|
|
if not filepath.exists():
|
|
continue
|
|
try:
|
|
with open(filepath, 'r', errors='replace') as f:
|
|
content = f.read()
|
|
except Exception:
|
|
continue
|
|
# Skip files explicitly marked as historical
|
|
if HISTORICAL_EXEMPT_MARKER.search(content[:500]):
|
|
continue
|
|
for i, line in enumerate(content.splitlines(), 1):
|
|
if pat.search(line):
|
|
issues.append(
|
|
f" {YELLOW}DRIFT{RESET} {rel_path}:{i} -- {description}"
|
|
)
|
|
return issues
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Test Coverage Gap Analysis
|
|
# ---------------------------------------------------------------------------
|
|
def check_coverage_gaps():
|
|
"""Find source files with no test coverage."""
|
|
conn = get_conn()
|
|
|
|
# Core source files (cpu_core layer, not headers/tests/tools)
|
|
core_files = conn.execute("""SELECT path FROM source_files
|
|
WHERE layer IN ('fast', 'ct', 'abi')
|
|
AND category = 'cpu_core'
|
|
AND file_type IN ('cpp', 'source')
|
|
ORDER BY path""").fetchall()
|
|
|
|
# Files that have at least one 'covers' edge
|
|
covered = set()
|
|
rows = conn.execute("""SELECT DISTINCT dst_id FROM edges
|
|
WHERE relation='covers' AND dst_type='source_file'""").fetchall()
|
|
for r in rows:
|
|
covered.add(r['dst_id'])
|
|
|
|
gaps = []
|
|
for f in core_files:
|
|
if f['path'] not in covered:
|
|
# Check if it's a significant file (>50 lines)
|
|
info = conn.execute("SELECT lines FROM source_files WHERE path=?",
|
|
(f['path'],)).fetchone()
|
|
if info and info['lines'] > 50:
|
|
gaps.append((f['path'], info['lines']))
|
|
|
|
conn.close()
|
|
return gaps
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Graph Freshness Check
|
|
# ---------------------------------------------------------------------------
|
|
def check_freshness():
|
|
"""Compare graph build time vs file modification times."""
|
|
conn = get_conn()
|
|
stale = []
|
|
|
|
built_str = conn.execute("SELECT value FROM meta WHERE key='built_at'").fetchone()['value']
|
|
built_dt = datetime.fromisoformat(built_str)
|
|
|
|
rows = conn.execute("SELECT path, lines FROM source_files WHERE layer IN ('fast','ct','abi') ORDER BY lines DESC").fetchall()
|
|
for r in rows:
|
|
filepath = LIB_ROOT / r['path']
|
|
if not filepath.exists():
|
|
stale.append(('DELETED', r['path'], 0))
|
|
continue
|
|
mtime = datetime.fromtimestamp(filepath.stat().st_mtime, tz=timezone.utc)
|
|
if mtime > built_dt:
|
|
stale.append(('MODIFIED', r['path'], r['lines']))
|
|
|
|
# Check for new files not in graph
|
|
scan_dirs = ['cpu/src', 'cpu/include', 'include/ufsecp']
|
|
known_paths = {r['path'] for r in rows}
|
|
for scan_dir in scan_dirs:
|
|
dirpath = LIB_ROOT / scan_dir
|
|
if not dirpath.exists():
|
|
continue
|
|
for root, dirs, files in os.walk(dirpath):
|
|
dirs[:] = [d for d in dirs if not d.startswith('.')]
|
|
for fname in files:
|
|
ext = os.path.splitext(fname)[1].lower()
|
|
if ext in ('.cpp', '.hpp', '.h'):
|
|
rel = str(Path(root, fname).relative_to(LIB_ROOT))
|
|
if rel not in known_paths:
|
|
stale.append(('NEW', rel, 0))
|
|
|
|
conn.close()
|
|
return stale, built_str
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Doc-Code Pairing Check (for git-changed files)
|
|
# ---------------------------------------------------------------------------
|
|
DOC_PAIRS = {
|
|
# Public API / C ABI
|
|
'include/ufsecp/ufsecp.h': ['docs/API_REFERENCE.md', 'docs/USER_GUIDE.md'],
|
|
'include/ufsecp/ufsecp_impl.cpp': ['docs/API_REFERENCE.md'],
|
|
# Build system
|
|
'CMakeLists.txt': ['docs/BUILDING.md', 'README.md'],
|
|
# Benchmark
|
|
'cpu/bench/bench_unified.cpp': ['docs/BENCHMARKS.md', 'docs/BENCHMARK_METHODOLOGY.md'],
|
|
# Audit
|
|
'audit/unified_audit_runner.cpp': ['docs/TEST_MATRIX.md', 'docs/AUDIT_GUIDE.md'],
|
|
# Protocol implementations
|
|
'cpu/src/musig2.cpp': ['docs/API_REFERENCE.md'],
|
|
'cpu/src/frost.cpp': ['docs/API_REFERENCE.md'],
|
|
'cpu/src/adaptor.cpp': ['docs/API_REFERENCE.md'],
|
|
'cpu/src/silent_payments.cpp': ['docs/API_REFERENCE.md'],
|
|
'cpu/src/ecies.cpp': ['docs/API_REFERENCE.md'],
|
|
# CT layer
|
|
'cpu/src/ct_sign.cpp': ['docs/CT_VERIFICATION.md', 'docs/SECURITY_CLAIMS.md'],
|
|
'cpu/src/ct_field.cpp': ['docs/CT_VERIFICATION.md'],
|
|
'cpu/src/ct_scalar.cpp': ['docs/CT_VERIFICATION.md'],
|
|
'cpu/src/ct_point.cpp': ['docs/CT_VERIFICATION.md'],
|
|
# GPU backends
|
|
'cuda/secp256k1_cuda.cu': ['docs/COMPATIBILITY.md'],
|
|
'opencl/secp256k1_opencl.cpp': ['docs/COMPATIBILITY.md'],
|
|
'metal/secp256k1_metal.mm': ['docs/COMPATIBILITY.md'],
|
|
# Core headers
|
|
'cpu/include/secp256k1/field.hpp': ['docs/API_REFERENCE.md'],
|
|
'cpu/include/secp256k1/scalar.hpp':['docs/API_REFERENCE.md'],
|
|
'cpu/include/secp256k1/point.hpp': ['docs/API_REFERENCE.md'],
|
|
# Release workflow
|
|
'.github/workflows/release.yml': ['docs/LOCAL_CI.md'],
|
|
'.github/workflows/auditor-prep.yml': ['docs/EXTERNAL_AUDIT_AUTOMATION.md', 'AUDIT_GUIDE.md'],
|
|
'scripts/external_audit_prep.sh': ['docs/EXTERNAL_AUDIT_AUTOMATION.md', 'AUDIT_GUIDE.md'],
|
|
}
|
|
|
|
def check_doc_pairing(changed_files):
|
|
"""Check if code changes have matching doc updates."""
|
|
missing = []
|
|
changed_set = set(changed_files)
|
|
|
|
for code_file, expected_docs in DOC_PAIRS.items():
|
|
if any(code_file in cf for cf in changed_set):
|
|
for doc in expected_docs:
|
|
if not any(doc in cf for cf in changed_set):
|
|
missing.append((code_file, doc))
|
|
|
|
# Check CT layer changes
|
|
# Match only actual CT source files (filename starts with ct_), not paths
|
|
# that happen to contain the substring "ct_" (e.g. "project_graph.py")
|
|
ct_changed = [f for f in changed_files
|
|
if (Path(f).name.startswith('ct_') and Path(f).suffix in ('.cpp', '.hpp', '.h'))
|
|
or '/ct/' in f]
|
|
if ct_changed:
|
|
ct_docs = ['docs/CT_VERIFICATION.md', 'docs/SECURITY_CLAIMS.md']
|
|
for doc in ct_docs:
|
|
if not any(doc in cf for cf in changed_set):
|
|
for ct_f in ct_changed:
|
|
missing.append((ct_f, doc))
|
|
|
|
return missing
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. ABI Surface Check
|
|
# ---------------------------------------------------------------------------
|
|
def check_abi_surface():
|
|
"""Detect new/removed ufsecp_* functions vs graph."""
|
|
conn = get_conn()
|
|
|
|
# Known from graph
|
|
known = set()
|
|
rows = conn.execute("SELECT name FROM c_abi_functions").fetchall()
|
|
for r in rows:
|
|
known.add(r['name'])
|
|
|
|
# Scan actual headers (ufsecp.h + ufsecp_version.h)
|
|
actual = set()
|
|
fn_re = re.compile(r'UFSECP_API\s+.*?(ufsecp_\w+)\s*\(')
|
|
for hdr_name in ('ufsecp.h', 'ufsecp_gpu.h', 'ufsecp_version.h'):
|
|
header = LIB_ROOT / 'include' / 'ufsecp' / hdr_name
|
|
if header.exists():
|
|
with open(header, 'r', errors='replace') as f:
|
|
for line in f:
|
|
m = fn_re.search(line)
|
|
if m:
|
|
actual.add(m.group(1))
|
|
|
|
added = actual - known
|
|
removed = known - actual
|
|
conn.close()
|
|
return added, removed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5b. Graph-Driven Assurance Claim Check
|
|
# ---------------------------------------------------------------------------
|
|
def check_claim_surfaces():
|
|
"""Run validate_assurance and extract graph-backed claim-surface issues."""
|
|
data = get_validate_assurance_payload()
|
|
if 'error' in data:
|
|
return [f" {RED}ERROR{RESET} {data['error']}"]
|
|
|
|
payload = data['payload']
|
|
claim_surface = payload.get('claim_surface', {})
|
|
issues = []
|
|
for label in claim_surface.get('missing_surfaces', []):
|
|
issues.append(f" {RED}STALE{RESET} {label}")
|
|
for label in claim_surface.get('unindexed_surfaces', []):
|
|
issues.append(f" {YELLOW}UNINDEXED{RESET} {label}")
|
|
return issues
|
|
|
|
|
|
def check_ai_review_log():
|
|
"""Run validate_assurance and extract AI review-event log issues."""
|
|
data = get_validate_assurance_payload()
|
|
if 'error' in data:
|
|
return [f" {RED}ERROR{RESET} {data['error']}"]
|
|
|
|
payload = data['payload']
|
|
ai_review = payload.get('ai_review_events', {})
|
|
return [f" {RED}INVALID{RESET} {label}" for label in ai_review.get('invalid_entries', [])]
|
|
|
|
|
|
def check_gpu_backend_evidence():
|
|
"""Run validate_assurance and extract GPU backend evidence issues."""
|
|
data = get_validate_assurance_payload()
|
|
if 'error' in data:
|
|
return [f" {RED}ERROR{RESET} {data['error']}"]
|
|
|
|
payload = data['payload']
|
|
gpu = payload.get('gpu_backend_evidence', {})
|
|
return [f" {RED}INVALID{RESET} {label}" for label in gpu.get('invalid_entries', [])]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. Changed Files Analysis
|
|
# ---------------------------------------------------------------------------
|
|
def get_changed_files():
|
|
"""Get files changed vs HEAD (staged + unstaged)."""
|
|
try:
|
|
result = subprocess.run(
|
|
['git', 'diff', '--name-only', 'HEAD'],
|
|
capture_output=True, text=True, cwd=str(LIB_ROOT)
|
|
)
|
|
files = [f.strip() for f in result.stdout.strip().split('\n') if f.strip()]
|
|
# Also staged
|
|
result2 = subprocess.run(
|
|
['git', 'diff', '--cached', '--name-only'],
|
|
capture_output=True, text=True, cwd=str(LIB_ROOT)
|
|
)
|
|
files2 = [f.strip() for f in result2.stdout.strip().split('\n') if f.strip()]
|
|
return list(set(files + files2))
|
|
except Exception:
|
|
return []
|
|
|
|
def analyze_changed_files(changed):
|
|
"""For changed files, show impact via graph."""
|
|
if not changed:
|
|
return []
|
|
conn = get_conn()
|
|
impacts = []
|
|
for cf in changed:
|
|
row = conn.execute("SELECT * FROM source_files WHERE path LIKE ?",
|
|
(f'%{cf}%',)).fetchone()
|
|
if not row:
|
|
continue
|
|
fpath = row['path']
|
|
# Tests
|
|
tests = conn.execute("""SELECT src_id FROM edges
|
|
WHERE dst_type='source_file' AND dst_id=? AND relation='covers'""",
|
|
(fpath,)).fetchall()
|
|
test_names = [t['src_id'] for t in tests]
|
|
# Security
|
|
sec = conn.execute("SELECT COUNT(*) as cnt FROM security_patterns WHERE source_file=?",
|
|
(fpath,)).fetchone()
|
|
sec_cnt = sec['cnt'] if sec else 0
|
|
# Routing
|
|
fname = Path(fpath).stem
|
|
routing = conn.execute("""SELECT abi_function, layer FROM abi_routing
|
|
WHERE internal_call LIKE ? OR abi_function LIKE ?""",
|
|
(f'%{fname}%', f'%{fname}%')).fetchall()
|
|
rt_list = [(r['abi_function'], r['layer']) for r in routing]
|
|
|
|
impacts.append({
|
|
'file': fpath,
|
|
'layer': row['layer'],
|
|
'lines': row['lines'],
|
|
'tests': test_names,
|
|
'security_patterns': sec_cnt,
|
|
'abi_routing': rt_list,
|
|
})
|
|
conn.close()
|
|
return impacts
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MAIN
|
|
# ---------------------------------------------------------------------------
|
|
def run_all(args):
|
|
mode = args[0] if args else '--all'
|
|
exit_code = 0
|
|
total_issues = 0
|
|
|
|
print(f"\n{BOLD}{'='*60}{RESET}")
|
|
print(f"{BOLD} UltrafastSecp256k1 Preflight Check{RESET}")
|
|
print(f"{BOLD}{'='*60}{RESET}\n")
|
|
|
|
# Security
|
|
if mode in ('--all', '--security'):
|
|
print(f"{BOLD}[1/6] Security Invariants{RESET}")
|
|
issues = check_security_invariants()
|
|
if issues:
|
|
for i in issues:
|
|
print(i)
|
|
lost = sum(1 for i in issues if 'LOST' in i)
|
|
if lost:
|
|
exit_code = 1
|
|
total_issues += lost
|
|
print(f" {RED}{lost} lost, {len(issues) - lost} info{RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] All security patterns preserved{RESET}\n")
|
|
|
|
# Narrative drift
|
|
if mode in ('--all', '--drift'):
|
|
print(f"{BOLD}[2/6] Narrative Drift Detection{RESET}")
|
|
drift_issues = check_narrative_drift()
|
|
if drift_issues:
|
|
for i in drift_issues:
|
|
print(i)
|
|
total_issues += len(drift_issues)
|
|
print(f" {YELLOW}{len(drift_issues)} stale narrative phrase(s){RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] No stale CT/audit narrative detected{RESET}\n")
|
|
|
|
# Coverage
|
|
if mode in ('--all', '--coverage'):
|
|
print(f"{BOLD}[3/6] Test Coverage Gaps{RESET}")
|
|
gaps = check_coverage_gaps()
|
|
if gaps:
|
|
for path, lines in sorted(gaps, key=lambda x: -x[1])[:20]:
|
|
print(f" {YELLOW}UNTESTED{RESET} {path} ({lines} lines)")
|
|
total_issues += len(gaps)
|
|
print(f" {YELLOW}{len(gaps)} core files without test coverage{RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] All core files have test coverage{RESET}\n")
|
|
|
|
# Freshness
|
|
if mode in ('--all', '--freshness'):
|
|
print(f"{BOLD}[4/6] Graph Freshness{RESET}")
|
|
stale, built = check_freshness()
|
|
if stale:
|
|
for kind, path, lines in stale[:15]:
|
|
print(f" {YELLOW}{kind:8s}{RESET} {path}")
|
|
if len(stale) > 15:
|
|
print(f" ... and {len(stale) - 15} more")
|
|
print(f" {YELLOW}{len(stale)} stale entries (built: {built[:19]}){RESET}")
|
|
print(f" Run: python3 scripts/build_project_graph.py --rebuild\n")
|
|
else:
|
|
print(f" {GREEN}[OK] Graph is fresh (built: {built[:19]}){RESET}\n")
|
|
|
|
# Changed files
|
|
if mode in ('--all', '--claims'):
|
|
print(f"{BOLD}[5/9] Assurance Claim Surfaces{RESET}")
|
|
claim_issues = check_claim_surfaces()
|
|
if claim_issues:
|
|
for issue in claim_issues:
|
|
print(issue)
|
|
total_issues += len(claim_issues)
|
|
exit_code = 1
|
|
print(f" {YELLOW}{len(claim_issues)} graph-backed claim surface issue(s){RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] Claim surfaces resolve and are graph-indexed{RESET}\n")
|
|
|
|
if mode in ('--all', '--ai-review'):
|
|
print(f"{BOLD}[6/9] AI Review Event Log{RESET}")
|
|
ai_review_issues = check_ai_review_log()
|
|
if ai_review_issues:
|
|
for issue in ai_review_issues:
|
|
print(issue)
|
|
total_issues += len(ai_review_issues)
|
|
exit_code = 1
|
|
print(f" {YELLOW}{len(ai_review_issues)} AI review-event issue(s){RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] AI review-event log is schema-valid{RESET}\n")
|
|
|
|
if mode in ('--all', '--gpu-evidence'):
|
|
print(f"{BOLD}[7/9] GPU Backend Evidence{RESET}")
|
|
gpu_issues = check_gpu_backend_evidence()
|
|
if gpu_issues:
|
|
for issue in gpu_issues:
|
|
print(issue)
|
|
total_issues += len(gpu_issues)
|
|
exit_code = 1
|
|
print(f" {YELLOW}{len(gpu_issues)} GPU backend evidence issue(s){RESET}\n")
|
|
else:
|
|
print(f" {GREEN}[OK] GPU backend evidence is fail-closed for publishability and ROCm/HIP promotion{RESET}\n")
|
|
|
|
# Changed files
|
|
if mode in ('--all', '--changed'):
|
|
print(f"{BOLD}[8/9] Changed Files Impact{RESET}")
|
|
changed = get_changed_files()
|
|
if changed:
|
|
print(f" {len(changed)} files changed vs HEAD:")
|
|
impacts = analyze_changed_files(changed)
|
|
for imp in impacts:
|
|
layer_color = RED if imp['layer'] == 'ct' else CYAN
|
|
print(f" {layer_color}[{imp['layer']:4s}]{RESET} {imp['file']} ({imp['lines']} lines)")
|
|
if imp['tests']:
|
|
print(f" Tests: {', '.join(imp['tests'])}")
|
|
else:
|
|
print(f" {YELLOW}Tests: NONE{RESET}")
|
|
if imp['security_patterns'] > 0:
|
|
print(f" Security patterns: {imp['security_patterns']}")
|
|
if imp['abi_routing']:
|
|
for fn, layer in imp['abi_routing'][:5]:
|
|
print(f" ABI: [{layer}] {fn}")
|
|
|
|
# Doc pairing
|
|
doc_missing = check_doc_pairing(changed)
|
|
if doc_missing:
|
|
print(f"\n {YELLOW}Doc-code pairing violations:{RESET}")
|
|
for code, doc in doc_missing:
|
|
print(f" {code} changed but {doc} not updated")
|
|
total_issues += len(doc_missing)
|
|
print()
|
|
else:
|
|
print(f" {GREEN}[OK] No uncommitted changes{RESET}\n")
|
|
|
|
# ABI
|
|
if mode in ('--all', '--abi'):
|
|
print(f"{BOLD}[9/9] ABI Surface{RESET}")
|
|
added, removed = check_abi_surface()
|
|
if added:
|
|
print(f" {CYAN}NEW functions (not in graph):{RESET}")
|
|
for fn in sorted(added):
|
|
print(f" + {fn}")
|
|
if removed:
|
|
print(f" {RED}REMOVED functions (in graph but not in header):{RESET}")
|
|
for fn in sorted(removed):
|
|
print(f" - {fn}")
|
|
exit_code = 1
|
|
total_issues += len(removed)
|
|
if not added and not removed:
|
|
print(f" {GREEN}[OK] ABI surface matches graph{RESET}")
|
|
print()
|
|
|
|
# Summary
|
|
print(f"{BOLD}{'='*60}{RESET}")
|
|
if total_issues == 0:
|
|
print(f"{GREEN}{BOLD} PREFLIGHT PASSED{RESET}")
|
|
else:
|
|
print(f"{RED}{BOLD} PREFLIGHT: {total_issues} issues found{RESET}")
|
|
print(f"{BOLD}{'='*60}{RESET}\n")
|
|
|
|
return exit_code
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(run_all(sys.argv[1:]))
|