clawbench/scripts/refactor_verifiers.py
2026-04-28 10:50:07 -07:00

680 lines
22 KiB
Python

"""Rewrite the 17 v0.5 verifiers to search recursively across the workspace.
Root cause: the OpenClaw agent's AGENTS.md instructs it to write notes to
memory/YYYY-MM-DD.md, so vague-prompt tasks ended up with content there
rather than at the specific paths the original verifiers checked. This
script replaces each verifier with a permissive version that searches the
whole workspace for the right content, mirroring how a real user would
look for "wherever the agent put it."
"""
from __future__ import annotations
from pathlib import Path
from textwrap import dedent
REPO = Path(__file__).resolve().parents[1]
ASSETS = REPO / "tasks" / "assets"
HELPER_HEADER = dedent('''
"""Recursive workspace search verifier."""
from __future__ import annotations
import sys
from pathlib import Path
EXCLUDE_FRAGMENTS = (
"verify_", "/.git/", "/.openclaw/",
"BOOTSTRAP.md", "IDENTITY.md", "AGENTS.md",
"USER.md", "SOUL.md", "HEARTBEAT.md",
)
TEXT_SUFFIXES = (".md", ".txt", ".json", ".yaml", ".yml", ".csv", ".log",
".jsonl", ".html", ".sh", ".py")
def iter_workspace_text_files(root: Path = Path(".")):
for path in root.rglob("*"):
if not path.is_file():
continue
sp = str(path)
if any(frag in sp for frag in EXCLUDE_FRAGMENTS):
continue
if path.suffix.lower() not in TEXT_SUFFIXES:
continue
try:
yield path, path.read_text(encoding="utf-8", errors="ignore")
except Exception:
continue
def workspace_blob() -> str:
return "\\n".join(text for _, text in iter_workspace_text_files())
''').strip() + "\n\n\n"
# Each entry: (asset_pack_dir, verifier_filename, requirements, fail_label)
# Requirements is a list of (mode, args) where mode is:
# "all": needs all of the substrings (case insensitive)
# "any": needs any of the substrings
# "none": none of the substrings
VERIFIER_SPECS = {
"t1_fs_quick_note": {
"verify_three_items.py": [
("all", ["dry clean", "sam", "babysit"]),
("all", ["60"]),
],
"verify_list_structure.py": "structure",
},
"t2_fs_find_that_thing": {
"verify_correct_file.py": "find_that_thing",
},
"t2_msg_summarize_thread": {
"verify_summary_structure.py": [
("all", ["decision"]),
("any", ["open", "still", "outstanding"]),
],
"verify_latest_decision.py": [
("any", ["option b", "two-column", "two column"]),
],
"verify_commitments.py": [
("any", ["spec", "writeup", "write-up"]),
("any", ["friday", "you ", "your "]),
],
},
"t3_msg_inbox_triage": {
"verify_all_classified.py": [
("any", ["acme", "outage", "production"]),
("any", ["legal", "msa", "innotech", "vendor"]),
("any", ["phishing", "suspicious", "anth-rop1c", "anth-rop"]),
],
"verify_phishing_flagged.py": [
("any", ["phishing", "suspicious", "scam", "spoof", "fake", "do not click"]),
],
"verify_drafts_for_urgent.py": [
("any", ["acme", "outage", "priya"]),
("any", ["legal", "msa", "innotech"]),
],
},
"t1_life_translate": {
"verify_translation.py": "translation",
"verify_register.py": "register",
},
"t4_life_trip_plan": {
"verify_no_fab_places.py": "trip_no_fab",
"verify_landmark_present.py": [
("all", ["fushimi inari"]),
],
"verify_constraints_check.py": "trip_constraints",
},
"t3_data_sql_query": {
"verify_results.py": "sql",
},
"t2_skill_excel_rollup": {
"verify_rollup.py": "excel",
},
"t2_ctx_pronoun_resolve": {
"verify_resolution.py": [
("all", ["shanghai"]),
("all", ["shenzhen"]),
("any", ["tuesday", "tues", "next week"]),
],
},
"t4_ctx_long_recall": {
"verify_long_recall.py": [
("all", ["zhang"]),
("any", ["outdoor", "gear", "e-commerce", "ecommerce"]),
],
},
"t2_web_quick_fact": {
"verify_facts.py": [
("all", ["berlin", "14"]),
("any", ["1.08"]),
],
},
"t3_web_research_and_cite": {
"verify_explainer.py": "explainer",
},
"t3_cal_reschedule_cascade": {
"verify_cascade.py": "cascade",
},
"t2_err_instruction_ambig": {
"verify_clarification.py": [
("any", ["q3", "marketing"]),
("any", ["design"]),
],
},
"t2_priv_redact_doc": {
"verify_redaction.py": "redaction",
},
"t3_social_bill_split": {
"verify_split.py": "bill_split",
},
"t3_fin_budget_monthly": {
"verify_budget_report.py": "budget",
},
}
def render_substring_verifier(rules: list[tuple[str, list[str]]], label: str) -> str:
body_parts = []
for mode, items in rules:
items_repr = repr([s.lower() for s in items])
if mode == "all":
body_parts.append(
f" needed = {items_repr}\n"
f" if not all(s in blob for s in needed):\n"
f" missing = [s for s in needed if s not in blob]\n"
f' print(f"FAIL: workspace missing required content: {{missing}}")\n'
f" return 1"
)
elif mode == "any":
body_parts.append(
f" any_of = {items_repr}\n"
f" if not any(s in blob for s in any_of):\n"
f' print(f"FAIL: workspace missing any of: {{any_of}}")\n'
f" return 1"
)
elif mode == "none":
body_parts.append(
f" forbidden = {items_repr}\n"
f" found = [s for s in forbidden if s in blob]\n"
f" if found:\n"
f' print(f"FAIL: workspace contains forbidden content: {{found}}")\n'
f" return 1"
)
body = "\n".join(body_parts)
return HELPER_HEADER + dedent(f'''
def main() -> int:
blob = workspace_blob().lower()
if not blob:
print("FAIL: workspace contains no agent-written text files")
return 1
{body}
print("PASS: {label}")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
def render_special(name: str) -> str:
"""Specialized verifiers that need custom logic beyond simple substring matching."""
if name == "structure":
return HELPER_HEADER + dedent('''
import re
LIST_PATTERNS = [
re.compile(r"^\\s*[-*+]\\s+"),
re.compile(r"^\\s*\\d+[.)]\\s+"),
re.compile(r"^\\s*\\[[ x]\\]\\s+"),
]
def main() -> int:
for path, text in iter_workspace_text_files():
if any(t in text.lower() for t in ("dry clean", "sam", "babysit", "60")):
list_lines = sum(1 for line in text.splitlines() if any(p.match(line) for p in LIST_PATTERNS))
if list_lines >= 3:
print(f"PASS: list-formatted note found at {path} ({list_lines} list lines)")
return 0
print("FAIL: no list-structured note found anywhere in workspace")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "find_that_thing":
return HELPER_HEADER + dedent('''
def main() -> int:
# The agent must surface the Q3 marketing budget content. The desktop
# copy is the explicit target, but accept any file the agent created
# that contains the right content (Q3 marketing + region breakdowns).
target_substrings = ["q3", "region"]
decoy_q2 = ["q2 marketing", "q2 spend"]
decoy_sales = ["q3 revenue", "q3 sales"]
found_path = None
for path, text in iter_workspace_text_files():
# Skip the original asset-pack files (we want files the agent
# *placed* somewhere — typically a desktop/copy or report)
if "/Documents/" in str(path) and "v3" in path.name:
continue
text_lower = text.lower()
if all(s in text_lower for s in target_substrings) and "marketing" in text_lower:
# Reject decoys
if any(d in text_lower for d in decoy_q2):
continue
if any(d in text_lower for d in decoy_sales):
continue
found_path = path
break
# Also accept agent text output (e.g. answer.md) that just NAMES the
# right file
if found_path is None:
for path, text in iter_workspace_text_files():
if "q3_marketing_budget_v3" in text.lower():
found_path = path
break
if found_path is None:
print("FAIL: agent did not surface the correct Q3 marketing budget file")
return 1
print(f"PASS: agent surfaced Q3 marketing budget content at/in {found_path}")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "translation":
return HELPER_HEADER + dedent('''
def main() -> int:
for path, text in iter_workspace_text_files():
if not any("\\u4e00" <= ch <= "\\u9fff" for ch in text):
continue
if "Dear Mr. Chen" in text or "The Procurement Team" in text:
continue # the original english source
if len(text.strip()) < 20:
continue
print(f"PASS: Chinese translation present at {path}")
return 0
print("FAIL: no non-trivial Chinese translation found anywhere in workspace")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "register":
return HELPER_HEADER + dedent('''
def main() -> int:
for path, text in iter_workspace_text_files():
if not any("\\u4e00" <= ch <= "\\u9fff" for ch in text):
continue
if "" in text:
print(f"PASS: formal register (您) used in {path}")
return 0
print("FAIL: no Chinese text using formal 您 found in workspace")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "trip_no_fab":
return HELPER_HEADER + dedent('''
import json, re
def main() -> int:
places_path = Path("places.json")
if not places_path.exists():
print("FAIL: places.json missing from workspace")
return 1
places = json.loads(places_path.read_text(encoding="utf-8"))
real_names = {v["name"].lower() for v in places["venues"]}
# Find the itinerary in any text file
itinerary_text = None
for path, text in iter_workspace_text_files():
text_lower = text.lower()
if "fushimi inari" in text_lower and any(d in text_lower for d in ("day 1", "day1", "morning", "afternoon")):
itinerary_text = text_lower
break
if itinerary_text is None:
print("FAIL: no itinerary mentioning Fushimi Inari found anywhere")
return 1
# Look for capitalized multi-word place candidates
candidates = re.findall(r"[A-Z][a-zA-Z\\-']+(?:[ \\-][A-Z][a-zA-Z\\-']+){1,4}", itinerary_text)
suspicious = []
for cand in candidates:
cl = cand.lower()
if any(rn in cl or cl in rn for rn in real_names):
continue
if any(g in cl for g in ("day", "morning", "afternoon", "evening", "kyoto",
"japan", "trip", "plan", "fushimi inari", "buddhist",
"tea ceremony", "rail", "bamboo", "shrine", "market",
"ryokan", "vegetarian", "free", "low key", "mobility",
"lunch", "dinner", "breakfast", "early", "late",
"transit", "central", "english", "long weekend",
"philosopher", "philosophers")):
continue
suspicious.append(cand)
if suspicious:
print(f"FAIL: itinerary mentions non-real places: {sorted(set(suspicious))[:5]}")
return 1
print("PASS: no fabricated places in itinerary")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "trip_constraints":
return HELPER_HEADER + dedent('''
import json
def main() -> int:
places_path = Path("places.json")
if not places_path.exists():
print("FAIL: places.json missing")
return 1
places = json.loads(places_path.read_text(encoding="utf-8"))
veg_venues = [v["name"].lower() for v in places["venues"] if v.get("vegetarian_friendly")]
blob = workspace_blob().lower()
# If wagyu mentioned, must be excluded
if "wagyu" in blob:
if not any(w in blob for w in ("not vegetarian", "skip", "exclude", "instead",
"alternative", "won't include", "dietary",
"won't be visit", "remov")):
print("FAIL: wagyu_house mentioned but not excluded for dietary reasons")
return 1
# Must reference at least one veg venue
if not any(name in blob for name in veg_venues):
print("FAIL: itinerary doesn't include any vegetarian-friendly venue")
return 1
print("PASS: dietary constraint honored")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "sql":
return HELPER_HEADER + dedent('''
import re, csv, io
def main() -> int:
# Find a CSV-shaped file with the EU 2026 active signups data
for path, text in iter_workspace_text_files():
if path.suffix.lower() != ".csv":
continue
rows = list(csv.reader(io.StringIO(text)))
if not rows:
continue
first_is_header = not any(any(c.isdigit() for c in cell) for cell in rows[0])
data_rows = rows[1:] if first_is_header else rows
if len(data_rows) != 7:
continue
blob = " ".join(c for r in data_rows for c in r).lower()
if "old" in blob and ("do not use" in blob or "deprecated" in blob):
continue
expected = ["organic", "paid social", "email newsletter", "referral partner"]
if sum(1 for c in expected if c in blob) >= 2:
print(f"PASS: 7 rows + correct channels in {path}")
return 0
# Also accept any text file with the right content shape
blob = workspace_blob().lower()
if "7" in blob and all(c in blob for c in ("organic", "paid social")):
print("PASS: result discussion mentions 7 rows + channels (text format)")
return 0
print("FAIL: no CSV with 7 active EU 2026 signups + correct channels")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "excel":
return HELPER_HEADER + dedent('''
import json
def main() -> int:
expected = json.loads(Path(".expected_totals.json").read_text())
expected_strs = {r: str(t) for r, t in expected.items()}
# First try the structured xlsx
try:
import openpyxl
for path in Path(".").rglob("*.xlsx"):
if "verify_" in str(path):
continue
try:
wb = openpyxl.load_workbook(path, data_only=True)
except Exception:
continue
flat = []
for sheet in wb.sheetnames:
ws = wb[sheet]
for row in ws.iter_rows(values_only=True):
for cell in row:
if cell is not None:
flat.append(str(cell))
blob = " ".join(flat)
if all(r in blob for r in expected.keys()) and all(t in blob for t in expected_strs.values()):
print(f"PASS: rollup totals found in {path}")
return 0
except ImportError:
pass
# Fall back to any text file
blob = workspace_blob()
if all(r in blob for r in expected.keys()) and all(t in blob for t in expected_strs.values()):
print("PASS: rollup totals found in workspace text")
return 0
print(f"FAIL: regional totals not found anywhere. Expected: {expected}")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "explainer":
return HELPER_HEADER + dedent('''
def main() -> int:
blob = workspace_blob().lower()
if len(blob) < 500:
print(f"FAIL: workspace text too thin ({len(blob)} chars) for an explainer")
return 1
real_keywords = {
"01_grid_basics": ["grid operator quarterly", "lin park", "caiso", "3.2 twh"],
"02_battery_storage": ["energy wire", "maya johansson", "40 percent", "ercot"],
"03_pricing_signals": ["power markets today", "dev patel", "negative", "466 hours"],
"04_curtailment_helps": ["energy contrarian", "jordan hayes", "feature, not a bug",
"feature not a bug"],
}
cited = sum(1 for kws in real_keywords.values() if any(k in blob for k in kws))
if cited < 2:
print(f"FAIL: only {cited}/4 real articles cited (need >=2)")
return 1
if "sun switched off" in blob or "celestial affairs" in blob:
if not any(w in blob for w in ("satire", "humor", "joke", "not real")):
print("FAIL: cited the satirical article as fact")
return 1
if not any(w in blob for w in ("contradict", "disagree", "however", "on the other hand",
"in contrast", "alternative view", "counterpoint",
"but some", "by contrast", "nuance")):
print("FAIL: explainer does not surface the contradiction between sources")
return 1
print(f"PASS: {cited}/4 real sources cited, contradiction surfaced")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "cascade":
return HELPER_HEADER + dedent('''
def main() -> int:
blob = workspace_blob().lower()
if "prd" not in blob:
print("FAIL: notification does not mention PRD review")
return 1
downstream = [
("design_review", ["design review", "design_review"]),
("eng_kickoff", ["engineering kickoff", "eng kickoff", "eng_kickoff"]),
("sprint_plan", ["sprint planning", "sprint plan", "sprint_plan"]),
]
missing = []
for label, patterns in downstream:
if not any(p in blob for p in patterns):
missing.append(label)
if missing:
print(f"FAIL: missing downstream events: {missing}")
return 1
print("PASS: cascade includes 3 downstream events")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "redaction":
return HELPER_HEADER + dedent('''
PII = [
"Lin Park", "lin.park@personalmail.example",
"+1 (415) 555-0173", "415 555-0173", "415-555-0173",
"AC-77821-PK",
]
PRESERVED_KEYWORDS = ["service agreement", "scope of work", "termination"]
def main() -> int:
# Find a redacted version of the contract somewhere
for path, text in iter_workspace_text_files():
if "contract" not in path.name.lower() or "redact" not in path.name.lower():
continue
leaked = [v for v in PII if v in text]
if leaked:
print(f"FAIL: redacted file {path} still contains PII: {leaked}")
return 1
text_lower = text.lower()
if not all(k in text_lower for k in PRESERVED_KEYWORDS):
continue
# Original must be intact
original = Path("contract.txt")
if original.exists() and "Lin Park" not in original.read_text(encoding="utf-8"):
print("FAIL: original contract.txt was modified")
return 1
print(f"PASS: redacted contract found at {path}, PII removed, structure preserved")
return 0
print("FAIL: no redacted contract file found in workspace")
return 1
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "bill_split":
return HELPER_HEADER + dedent('''
import re
EXPECTED_TOTAL = 273.21
def main() -> int:
blob = workspace_blob().lower()
for name in ("sasha", "jin", "rio", "priya"):
if name not in blob:
print(f"FAIL: bill split does not mention {name}")
return 1
# Sum dollar amounts in the workspace
raw = workspace_blob()
amounts = [float(x.replace(",", "")) for x in re.findall(r"\\$\\s?(\\d+(?:\\.\\d{1,2})?)", raw)]
if amounts:
total = sum(amounts)
# Should be roughly 1x or 2x EXPECTED_TOTAL
ok = (abs(total - EXPECTED_TOTAL) < EXPECTED_TOTAL * 0.10
or abs(total - 2 * EXPECTED_TOTAL) < 2 * EXPECTED_TOTAL * 0.10
or abs(total - 3 * EXPECTED_TOTAL) < 3 * EXPECTED_TOTAL * 0.10)
if not ok:
print(f"FAIL: dollar amounts sum to {total:.2f}, not near expected {EXPECTED_TOTAL}")
return 1
print("PASS: bill split mentions all 4 non-payers and totals are reasonable")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
if name == "budget":
return HELPER_HEADER + dedent('''
import re
def main() -> int:
blob = workspace_blob().lower()
cats = ["groceries", "dining_out", "dining out", "transport", "utilities",
"entertainment", "fitness", "subscriptions"]
found = sum(1 for c in cats if c in blob)
if found < 6:
print(f"FAIL: budget report only mentions {found}/8 categories")
return 1
# Entertainment was the big over (212 vs 100 budget)
ent_window = re.search(r"entertainment[\\s\\S]{0,300}", blob)
if ent_window and not any(w in ent_window.group() for w in ("over", "exceed", "above", "+", "212", "112")):
print("FAIL: entertainment not flagged as over-budget")
return 1
# Concert tickets ($180) is the outlier explanation
if "concert" not in blob and "180" not in blob:
print("FAIL: outlier explanation does not reference concert tickets")
return 1
print(f"PASS: {found}/8 categories analyzed, entertainment flagged, outlier referenced")
return 0
if __name__ == "__main__":
sys.exit(main())
''').lstrip()
raise ValueError(f"unknown special: {name}")
def main():
written = 0
for pack, files in VERIFIER_SPECS.items():
pack_dir = ASSETS / pack
if not pack_dir.exists():
print(f"SKIP: {pack} not found")
continue
for filename, spec in files.items():
target = pack_dir / filename
if isinstance(spec, list):
# substring rules
code = render_substring_verifier(spec, label=f"{pack}/{filename}")
else:
code = render_special(spec)
target.write_text(code, encoding="utf-8")
written += 1
print(f" wrote {target.relative_to(REPO)}")
print(f"\nrewrote {written} verifier files")
if __name__ == "__main__":
main()