UltrafastSecp256k1/scripts/audit_gate.py
shrec e9b4f2cdb3
audit: fix reporting bugs, add audit manifest + gate
Bug fixes (bug-bounty pass):
- export_assurance.py: test_coverage queried wrong table (edges instead
  of function_test_map) — all 135 CPU ABI functions now show coverage
- export_assurance.py: add crash guard for missing v_symbol_reasoning view
- build_project_graph.py: scan ufsecp_gpu.h for ABI functions (was missing
  18 GPU functions from graph)
- build_project_graph.py: add 'gpu' prefix to categorize_abi_func
- preflight.py: include ufsecp_gpu.h in ABI surface check

New:
- docs/AUDIT_MANIFEST.md: 10 audit principles with severity levels,
  automation rules, and extension guide
- scripts/audit_gate.py: automated audit gate implementing all 10
  principles (P1-P10), CLI with per-check flags, JSON output for CI

Graph rebuilt: 153 ABI functions (was 135), 0 coverage gaps.
2026-03-23 17:43:52 +00:00

607 lines
21 KiB
Python

#!/usr/bin/env python3
"""
audit_gate.py -- Automated audit gate for UltrafastSecp256k1
Implements the 10 audit principles defined in docs/AUDIT_MANIFEST.md.
Every principle maps to a check that produces FAIL/WARN/INFO findings.
Exit code:
0 — all checks passed (may have WARN/INFO)
1 — at least one FAIL
Usage:
python3 scripts/audit_gate.py # full gate
python3 scripts/audit_gate.py --abi-completeness # single check
python3 scripts/audit_gate.py --json # JSON output
python3 scripts/audit_gate.py --json -o report.json # write JSON to file
"""
import json
import os
import re
import sqlite3
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
LIB_ROOT = SCRIPT_DIR.parent
DB_PATH = LIB_ROOT / ".project_graph.db"
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
BOLD = '\033[1m'
RESET = '\033[0m'
def get_conn():
if not DB_PATH.exists():
print(f"{RED}ERROR: Graph DB not found at {DB_PATH}{RESET}", file=sys.stderr)
print(f"Run: python3 scripts/build_project_graph.py --rebuild", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def scan_header_functions():
"""Extract all ufsecp_* from public headers."""
headers = [
LIB_ROOT / 'include' / 'ufsecp' / 'ufsecp.h',
LIB_ROOT / 'include' / 'ufsecp' / 'ufsecp_gpu.h',
LIB_ROOT / 'include' / 'ufsecp' / 'ufsecp_version.h',
]
api_re = re.compile(r'UFSECP_API\s+.*?(ufsecp_\w+)\s*\(')
fns = set()
for h in headers:
if not h.exists():
continue
with open(h, 'r', errors='replace') as f:
for line in f:
m = api_re.search(line)
if m:
fns.add(m.group(1))
return fns
def scan_ledger_functions():
"""Extract function names from FEATURE_ASSURANCE_LEDGER.md."""
ledger = LIB_ROOT / 'docs' / 'FEATURE_ASSURANCE_LEDGER.md'
if not ledger.exists():
return set()
fn_re = re.compile(r'^\|\s*`?(ufsecp_\w+)`?\s*\|')
fns = set()
with open(ledger, 'r', errors='replace') as f:
for line in f:
m = fn_re.search(line)
if m:
fns.add(m.group(1))
return fns
# ---------------------------------------------------------------------------
# P1 — ABI Completeness
# ---------------------------------------------------------------------------
def check_abi_completeness(conn):
findings = []
header_fns = scan_header_functions()
graph_fns = {r['name'] for r in conn.execute("SELECT name FROM c_abi_functions").fetchall()}
ledger_fns = scan_ledger_functions()
# Header vs graph
missing_graph = sorted(header_fns - graph_fns)
extra_graph = sorted(graph_fns - header_fns)
if missing_graph:
findings.append(('FAIL', f'{len(missing_graph)} header functions not in graph: {", ".join(missing_graph[:5])}'))
if extra_graph:
findings.append(('WARN', f'{len(extra_graph)} graph functions not in headers: {", ".join(extra_graph[:5])}'))
# Header vs ledger
missing_ledger = sorted(header_fns - ledger_fns)
extra_ledger = sorted(ledger_fns - header_fns)
if missing_ledger:
findings.append(('WARN', f'{len(missing_ledger)} header functions not in FEATURE_ASSURANCE_LEDGER'))
if extra_ledger:
findings.append(('WARN', f'{len(extra_ledger)} ledger functions removed from headers'))
if not findings:
findings.append(('PASS', f'All {len(header_fns)} header functions in graph and ledger'))
return 'P1: ABI Completeness', findings
# ---------------------------------------------------------------------------
# P2 — Test Coverage Mapping
# ---------------------------------------------------------------------------
def check_test_coverage(conn):
findings = []
# Coverage gaps view
gaps = conn.execute("SELECT * FROM v_coverage_gaps").fetchall()
if gaps:
names = [r['name'] if 'name' in r.keys() else str(dict(r)) for r in gaps[:5]]
findings.append(('FAIL', f'{len(gaps)} coverage gaps: {", ".join(names)}'))
# ABI functions without test mapping
all_abi = {r['name'] for r in conn.execute("SELECT name FROM c_abi_functions").fetchall()}
mapped = {r[0] for r in conn.execute(
"SELECT DISTINCT function_name FROM function_test_map WHERE function_name LIKE 'ufsecp_%'"
).fetchall()}
unmapped = sorted(all_abi - mapped)
if unmapped:
# GPU functions may not have direct test mappings
gpu_unmapped = [f for f in unmapped if 'gpu' in f]
non_gpu_unmapped = [f for f in unmapped if 'gpu' not in f]
if non_gpu_unmapped:
findings.append(('FAIL', f'{len(non_gpu_unmapped)} non-GPU ABI functions without test mapping: {", ".join(non_gpu_unmapped[:5])}'))
if gpu_unmapped:
findings.append(('WARN', f'{len(gpu_unmapped)} GPU ABI functions without test mapping (GPU tests map differently)'))
if not findings:
findings.append(('PASS', f'All {len(all_abi)} ABI functions have test coverage'))
return 'P2: Test Coverage', findings
# ---------------------------------------------------------------------------
# P3 — Security Pattern Preservation
# ---------------------------------------------------------------------------
def check_security_patterns(conn):
findings = []
lost_count = 0
new_count = 0
expected = conn.execute("""
SELECT source_file, pattern, COUNT(*) as cnt
FROM security_patterns GROUP BY source_file, pattern
""").fetchall()
patterns_re = {
'secure_erase': re.compile(r'secure_erase\s*\('),
'value_barrier': re.compile(r'value_barrier\s*\('),
'CLASSIFY': re.compile(r'SECP256K1_CLASSIFY\s*\('),
'DECLASSIFY': re.compile(r'SECP256K1_DECLASSIFY\s*\('),
}
for r in expected:
src_file, pat_name, exp_cnt = r['source_file'], r['pattern'], r['cnt']
filepath = LIB_ROOT / src_file
if not filepath.exists():
findings.append(('FAIL', f'MISSING {src_file} (expected {exp_cnt} {pat_name})'))
lost_count += 1
continue
pat_re = patterns_re.get(pat_name)
if not pat_re:
continue
count = 0
try:
with open(filepath, 'r', errors='replace') as f:
for line in f:
stripped = line.strip()
if pat_name in ('secure_erase', 'value_barrier'):
if stripped.startswith('//') or stripped.startswith('#include'):
continue
if pat_re.search(line):
count += 1
except Exception:
findings.append(('FAIL', f'UNREADABLE {src_file}'))
continue
if count < exp_cnt:
findings.append(('FAIL', f'LOST {src_file}: {pat_name} {exp_cnt}->{count} ({exp_cnt - count} removed)'))
lost_count += 1
elif count > exp_cnt:
findings.append(('INFO', f'NEW {src_file}: {pat_name} {exp_cnt}->{count} (+{count - exp_cnt}, rebuild graph)'))
new_count += 1
if lost_count == 0 and not any(f[0] == 'FAIL' for f in findings):
total = sum(r['cnt'] for r in expected)
findings.insert(0, ('PASS', f'All {total} security patterns preserved'))
return 'P3: Security Patterns', findings
# ---------------------------------------------------------------------------
# P4 — CT Layer Integrity
# ---------------------------------------------------------------------------
def check_ct_integrity(conn):
findings = []
ct_funcs = conn.execute("""
SELECT abi_function FROM abi_routing WHERE layer='ct'
""").fetchall()
ct_names = [r['abi_function'] for r in ct_funcs]
ct_doc = LIB_ROOT / 'docs' / 'CT_VERIFICATION.md'
if ct_doc.exists():
content = ct_doc.read_text(errors='replace')
missing_docs = [fn for fn in ct_names if fn not in content]
if missing_docs:
findings.append(('WARN', f'{len(missing_docs)} CT functions not in CT_VERIFICATION.md: {", ".join(missing_docs[:5])}'))
else:
findings.append(('WARN', 'CT_VERIFICATION.md not found'))
if not findings:
findings.append(('PASS', f'All {len(ct_names)} CT-routed functions documented'))
return 'P4: CT Integrity', findings
# ---------------------------------------------------------------------------
# P5 — Narrative Consistency
# ---------------------------------------------------------------------------
STALE_PHRASES = [
(r'(?i)\bno\s+formal\s+(ct\s+)?verification\b',
'Claims no formal CT verification'),
(r'(?i)\btool\s+integration\s+not\s+yet\s+done\b',
'Claims tool integration not done'),
(r'(?i)\bno\s+formal\s+verification\s+applied\b',
'Claims no formal verification applied'),
]
NARRATIVE_FILES = [
'docs/AUDIT_READINESS_REPORT_v1.md',
'audit/AUDIT_TEST_PLAN.md',
'docs/TEST_MATRIX.md',
'audit/run_full_audit.sh',
'audit/run_full_audit.ps1',
]
HISTORICAL_MARKER = re.compile(r'(?i)(historical\s+report|superseded\s+by|snapshot\s+from\s+v\d)')
def check_narrative(conn):
findings = []
for pat_str, description in STALE_PHRASES:
pat = re.compile(pat_str)
for rel_path in NARRATIVE_FILES:
filepath = LIB_ROOT / rel_path
if not filepath.exists():
continue
try:
content = filepath.read_text(errors='replace')
except Exception:
continue
if HISTORICAL_MARKER.search(content[:500]):
continue
for i, line in enumerate(content.splitlines(), 1):
if pat.search(line):
findings.append(('WARN', f'DRIFT {rel_path}:{i}{description}'))
if not findings:
findings.append(('PASS', 'No stale narrative detected'))
return 'P5: Narrative Consistency', findings
# ---------------------------------------------------------------------------
# P6 — Graph Freshness
# ---------------------------------------------------------------------------
def check_freshness(conn):
findings = []
built_str = conn.execute("SELECT value FROM meta WHERE key='built_at'").fetchone()['value']
built_dt = datetime.fromisoformat(built_str)
stale = []
rows = conn.execute("""
SELECT path FROM source_files WHERE layer IN ('fast','ct','abi')
""").fetchall()
for r in rows:
filepath = LIB_ROOT / r['path']
if not filepath.exists():
stale.append(('DELETED', r['path']))
continue
mtime = datetime.fromtimestamp(filepath.stat().st_mtime, tz=timezone.utc)
if mtime > built_dt:
stale.append(('MODIFIED', r['path']))
if stale:
findings.append(('WARN', f'{len(stale)} files modified/deleted since graph build ({built_str[:19]})'))
for kind, path in stale[:5]:
findings.append(('INFO', f'{kind}: {path}'))
else:
findings.append(('PASS', f'Graph is fresh (built: {built_str[:19]})'))
return 'P6: Graph Freshness', findings
# ---------------------------------------------------------------------------
# P7 — GPU Backend Parity
# ---------------------------------------------------------------------------
def check_gpu_parity(conn):
findings = []
# Check all GPU ABI functions are in graph
header_gpu = {fn for fn in scan_header_functions() if 'gpu' in fn}
graph_gpu = {r['name'] for r in conn.execute(
"SELECT name FROM c_abi_functions WHERE category='gpu'"
).fetchall()}
missing = sorted(header_gpu - graph_gpu)
if missing:
findings.append(('FAIL', f'{len(missing)} GPU header functions not in graph: {", ".join(missing)}'))
# Scan for undocumented Unsupported returns
unsupported_dirs = [
LIB_ROOT / 'gpu' / 'src',
LIB_ROOT / 'opencl',
LIB_ROOT / 'metal',
]
unsup_re = re.compile(r'Unsupported')
todo_re = re.compile(r'TODO\(parity\)|PARITY-EXCEPTION')
undocumented = []
for scan_dir in unsupported_dirs:
if not scan_dir.exists():
continue
for root, dirs, files in os.walk(scan_dir):
for fname in files:
fpath = Path(root) / fname
try:
lines = fpath.read_text(errors='replace').splitlines()
except Exception:
continue
for i, line in enumerate(lines):
if unsup_re.search(line):
# Check preceding 3 lines for TODO or PARITY-EXCEPTION
context = '\n'.join(lines[max(0, i-3):i+1])
if not todo_re.search(context):
rel = str(fpath.relative_to(LIB_ROOT))
undocumented.append(f'{rel}:{i+1}')
if undocumented:
findings.append(('WARN', f'{len(undocumented)} undocumented Unsupported returns'))
for loc in undocumented[:5]:
findings.append(('INFO', f' {loc}'))
if not findings:
findings.append(('PASS', f'GPU parity OK ({len(header_gpu)} GPU ABI functions)'))
return 'P7: GPU Parity', findings
# ---------------------------------------------------------------------------
# P8 — Test Target Documentation
# ---------------------------------------------------------------------------
def check_test_docs(conn):
findings = []
# Actual CTest targets
actual = set()
test_re = re.compile(r'add_test\s*\(\s*NAME\s+(\S+)')
for cmake_file in LIB_ROOT.rglob('CMakeLists.txt'):
rel = str(cmake_file.relative_to(LIB_ROOT))
if rel.startswith('build') or '_build' in rel:
continue
try:
for line in open(cmake_file, 'r', errors='replace'):
m = test_re.search(line)
if m:
actual.add(m.group(1))
except Exception:
continue
# Documented in TEST_MATRIX.md
matrix = LIB_ROOT / 'docs' / 'TEST_MATRIX.md'
documented = set()
if matrix.exists():
content = matrix.read_text(errors='replace')
for target in actual:
if target in content:
documented.add(target)
missing = sorted(actual - documented)
if missing:
findings.append(('WARN', f'{len(missing)} CTest targets not in TEST_MATRIX.md: {", ".join(missing[:5])}'))
if not findings:
findings.append(('PASS', f'All {len(actual)} CTest targets documented'))
return 'P8: Test Documentation', findings
# ---------------------------------------------------------------------------
# P9 — ABI Routing Consistency
# ---------------------------------------------------------------------------
def check_routing(conn):
findings = []
ct_funcs = conn.execute("""
SELECT abi_function, internal_call FROM abi_routing WHERE layer='ct'
""").fetchall()
for r in ct_funcs:
call = r['internal_call'] or ''
if call and not any(kw in call.lower() for kw in ['ct_', 'constant_time', 'ct::']):
findings.append(('INFO', f'{r["abi_function"]} routed CT but calls {call}'))
if not any(f[0] in ('FAIL', 'WARN') for f in findings):
findings.insert(0, ('PASS', f'ABI routing consistent ({len(ct_funcs)} CT functions)'))
return 'P9: Routing Consistency', findings
# ---------------------------------------------------------------------------
# P10 — Doc-Code Pairing
# ---------------------------------------------------------------------------
DOC_PAIRS = {
'include/ufsecp/ufsecp.h': ['docs/API_REFERENCE.md', 'docs/USER_GUIDE.md'],
'include/ufsecp/ufsecp_impl.cpp': ['docs/API_REFERENCE.md'],
'CMakeLists.txt': ['docs/BUILDING.md'],
'cpu/src/musig2.cpp': ['docs/API_REFERENCE.md'],
'cpu/src/frost.cpp': ['docs/API_REFERENCE.md'],
'cpu/src/ct_sign.cpp': ['docs/CT_VERIFICATION.md', 'docs/SECURITY_CLAIMS.md'],
'cpu/src/ct_field.cpp': ['docs/CT_VERIFICATION.md'],
'cpu/src/ct_scalar.cpp': ['docs/CT_VERIFICATION.md'],
'cpu/src/ct_point.cpp': ['docs/CT_VERIFICATION.md'],
}
def check_doc_pairing(conn):
findings = []
# Get changed files
try:
result = subprocess.run(
['git', 'diff', '--name-only', 'HEAD'],
capture_output=True, text=True, cwd=str(LIB_ROOT)
)
changed = {f.strip() for f in result.stdout.strip().split('\n') if f.strip()}
result2 = subprocess.run(
['git', 'diff', '--cached', '--name-only'],
capture_output=True, text=True, cwd=str(LIB_ROOT)
)
changed |= {f.strip() for f in result2.stdout.strip().split('\n') if f.strip()}
except Exception:
changed = set()
if not changed:
findings.append(('PASS', 'No uncommitted changes'))
return 'P10: Doc-Code Pairing', findings
for code_file, expected_docs in DOC_PAIRS.items():
if any(code_file in cf for cf in changed):
for doc in expected_docs:
if not any(doc in cf for cf in changed):
findings.append(('WARN', f'{code_file} changed but {doc} not updated'))
if not findings:
findings.append(('PASS', f'{len(changed)} changed files have matching docs'))
return 'P10: Doc-Code Pairing', findings
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
CHECK_MAP = {
'--abi-completeness': check_abi_completeness,
'--test-coverage': check_test_coverage,
'--security-patterns': check_security_patterns,
'--ct-integrity': check_ct_integrity,
'--narrative': check_narrative,
'--freshness': check_freshness,
'--gpu-parity': check_gpu_parity,
'--test-docs': check_test_docs,
'--routing': check_routing,
'--doc-pairing': check_doc_pairing,
}
ALL_CHECKS = [
check_abi_completeness,
check_test_coverage,
check_security_patterns,
check_ct_integrity,
check_narrative,
check_freshness,
check_gpu_parity,
check_test_docs,
check_routing,
check_doc_pairing,
]
def main():
args = sys.argv[1:]
json_mode = '--json' in args
out_file = None
if '-o' in args:
idx = args.index('-o')
if idx + 1 < len(args):
out_file = args[idx + 1]
# Select checks
selected = []
for arg in args:
if arg in CHECK_MAP:
selected.append(CHECK_MAP[arg])
if not selected:
selected = ALL_CHECKS
conn = get_conn()
results = []
has_fail = False
for check_fn in selected:
title, findings = check_fn(conn)
results.append({'check': title, 'findings': findings})
if any(f[0] == 'FAIL' for f in findings):
has_fail = True
conn.close()
if json_mode:
report = {
'generated_at': datetime.now(timezone.utc).isoformat(),
'status': 'FAIL' if has_fail else 'PASS',
'checks': [
{
'name': r['check'],
'status': 'FAIL' if any(f[0] == 'FAIL' for f in r['findings']) else 'PASS',
'findings': [{'severity': f[0], 'message': f[1]} for f in r['findings']],
}
for r in results
],
}
output = json.dumps(report, indent=2)
if out_file:
Path(out_file).write_text(output)
print(f"Report written to {out_file}", file=sys.stderr)
else:
print(output)
else:
print(f"\n{BOLD}{'='*60}{RESET}")
print(f"{BOLD} UltrafastSecp256k1 Audit Gate{RESET}")
print(f"{BOLD}{'='*60}{RESET}\n")
for r in results:
title = r['check']
findings = r['findings']
check_fail = any(f[0] == 'FAIL' for f in findings)
check_warn = any(f[0] == 'WARN' for f in findings)
if check_fail:
icon = f"{RED}[FAIL]{RESET}"
elif check_warn:
icon = f"{YELLOW}[WARN]{RESET}"
else:
icon = f"{GREEN}[PASS]{RESET}"
print(f"{BOLD}{title}{RESET} {icon}")
for sev, msg in findings:
if sev == 'FAIL':
print(f" {RED}{sev}{RESET} {msg}")
elif sev == 'WARN':
print(f" {YELLOW}{sev}{RESET} {msg}")
elif sev == 'INFO':
print(f" {CYAN}{sev}{RESET} {msg}")
elif sev == 'PASS':
print(f" {GREEN}{sev}{RESET} {msg}")
print()
print(f"{BOLD}{'='*60}{RESET}")
if has_fail:
fail_count = sum(1 for r in results for f in r['findings'] if f[0] == 'FAIL')
print(f"{RED}{BOLD} AUDIT GATE: FAILED ({fail_count} blocking findings){RESET}")
else:
warn_count = sum(1 for r in results for f in r['findings'] if f[0] == 'WARN')
if warn_count:
print(f"{YELLOW}{BOLD} AUDIT GATE: PASSED with {warn_count} warnings{RESET}")
else:
print(f"{GREEN}{BOLD} AUDIT GATE: PASSED{RESET}")
print(f"{BOLD}{'='*60}{RESET}\n")
return 1 if has_fail else 0
if __name__ == '__main__':
sys.exit(main())