#!/usr/bin/env bash
# red-team.sh — Adversarial floor-attack agent runner.
#
# Spawns a Claude sub-agent with tools and a goal: make ethPerToken() decrease.
# The agent iterates freely — snapshot → strategy → check floor → revert → repeat.
#
# Usage: red-team.sh
#
# Exit codes:
#   0  floor held (no confirmed decrease)
#   1  floor broken (agent found a strategy that decreased ethPerToken)
#   2  infra error (stack not running, missing dependency, etc.)
#
# Environment overrides:
#   CLAUDE_TIMEOUT  seconds for the agent run (default: 7200)
#   RPC_URL         Anvil RPC endpoint (default: http://localhost:8545)

set -euo pipefail

CAST=/home/debian/.foundry/bin/cast
FORGE=/home/debian/.foundry/bin/forge
RPC_URL="${RPC_URL:-http://localhost:8545}"
CLAUDE_TIMEOUT="${CLAUDE_TIMEOUT:-7200}"

REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
REPORT_DIR="$REPO_ROOT/tmp"
REPORT="$REPORT_DIR/red-team-report.txt"
STREAM_LOG="$REPORT_DIR/red-team-stream.jsonl"
MEMORY_FILE="$REPO_ROOT/tmp/red-team-memory.jsonl"
CROSS_PATTERNS_FILE="$REPO_ROOT/tools/red-team/cross-patterns.jsonl"
ATTACK_EXPORT="$REPORT_DIR/red-team-attacks.jsonl"
ATTACK_SNAPSHOTS="$REPORT_DIR/red-team-snapshots.jsonl"
DEPLOYMENTS="$REPO_ROOT/onchain/deployments-local.json"

# ── Candidate metadata (set by red-team-sweep.sh; defaults to unknown for standalone runs) ─
CANDIDATE_NAME="${CANDIDATE_NAME:-unknown}"
OPTIMIZER_PROFILE="${OPTIMIZER_PROFILE:-unknown}"
CANDIDATE_COMMIT="$(git -C "$REPO_ROOT" rev-parse HEAD 2>/dev/null || echo "unknown")"

# ── Anvil accounts ────────────────────────────────────────────────────────────
# Account 8 — adversary (10k ETH, 0 KRK)
ADV_PK=0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97
# Account 2 — recenter caller (recenter is public, any account can call)
RECENTER_PK=0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a

# ── Infrastructure constants ──────────────────────────────────────────────────
WETH=0x4200000000000000000000000000000000000006
# SwapRouter02 and NonfungiblePositionManager — resolved by detect_periphery()
# after Anvil is verified.
SWAP_ROUTER_SEPOLIA=0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4
SWAP_ROUTER_MAINNET=0x2626664c2603336E57B271c5C0b26F421741e481
NPM_SEPOLIA=0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2
NPM_MAINNET=0x03a520B32c04bf3beef7BEb72E919cF822Ed34F3
SWAP_ROUTER=""
NPM=""
POOL_FEE=10000

# Detect chain ID and select the correct periphery addresses (mirrors bootstrap-common.sh).
# Must be called after Anvil is verified to be accessible.
detect_periphery() {
  local chain_id
  chain_id=$("$CAST" chain-id --rpc-url "$RPC_URL" 2>/dev/null || echo "")
  if [[ "$chain_id" == "8453" ]]; then
    SWAP_ROUTER="$SWAP_ROUTER_MAINNET"
    NPM="$NPM_MAINNET"
    log "Detected Base mainnet (chain ID 8453) — using mainnet periphery addresses"
  else
    SWAP_ROUTER="$SWAP_ROUTER_SEPOLIA"
    NPM="$NPM_SEPOLIA"
    log "Using Base Sepolia periphery addresses (chain ID: ${chain_id:-unknown})"
  fi
}

# ── Logging helpers ───────────────────────────────────────────────────────────
log() { echo "[red-team] $*"; }
die() { echo "[red-team] ERROR: $*" >&2; exit 2; }

# ── Prerequisites ─────────────────────────────────────────────────────────────
command -v "$CAST" &>/dev/null || die "cast not found at $CAST"
command -v "$FORGE" &>/dev/null || die "forge not found at $FORGE"
command -v claude &>/dev/null || die "claude CLI not found (install: npm i -g @anthropic-ai/claude-code)"
command -v python3 &>/dev/null || die "python3 not found"
command -v jq &>/dev/null || die "jq not found"

# ── 1. Fresh stack via bootstrap-light ────────────────────────────────────────
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
log "Running bootstrap-light ..."
bash "$SCRIPT_DIR/bootstrap-light.sh" || die "bootstrap-light failed"

# Verify Anvil responds
"$CAST" chain-id --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Anvil not accessible at $RPC_URL after bootstrap-light"

# Select network-appropriate periphery addresses
detect_periphery

# ── 2. Read contract addresses ────────────────────────────────────────────────
[[ -f "$DEPLOYMENTS" ]] || die "deployments-local.json not found at $DEPLOYMENTS (bootstrap not complete)"

KRK=$(jq -r '.contracts.Kraiken' "$DEPLOYMENTS")
STAKE=$(jq -r '.contracts.Stake' "$DEPLOYMENTS")
LM=$(jq -r '.contracts.LiquidityManager' "$DEPLOYMENTS")
OPT=$(jq -r '.contracts.OptimizerProxy' "$DEPLOYMENTS")
V3_FACTORY=$(jq -r '.contracts.V3Factory' "$DEPLOYMENTS")
POOL=$(jq -r '.contracts.Pool' "$DEPLOYMENTS")

for var in KRK STAKE LM OPT V3_FACTORY POOL; do
  val="${!var}"
  [[ -n "$val" && "$val" != "null" ]] \
    || die "$var address missing from deployments-local.json — was bootstrap successful?"
done

log "  KRK:        $KRK"
log "  STAKE:      $STAKE"
log "  LM:         $LM"
log "  OPT:        $OPT"
log "  V3_FACTORY: $V3_FACTORY"
log "  Pool:       $POOL"

# Derive Anvil account addresses from their private keys
ADV_ADDR=$("$CAST" wallet address --private-key "$ADV_PK")
RECENTER_ADDR=$("$CAST" wallet address --private-key "$RECENTER_PK")
log "  Adversary: $ADV_ADDR (account 8)"
log "  Recenter:  $RECENTER_ADDR (account 2)"

# ── 3a. recenter() is now public (no recenterAccess needed) ──
# Any address can call recenter() — TWAP oracle enforces safety.
log "recenter() is public — no access grant needed"

# ── 3b. Set feeDestination to LM itself (fees accrue as liquidity) ────────────
# setFeeDestination allows repeated EOA sets; setting to a contract locks it permanently.
# The deployer (Anvil account 0) deployed LiquidityManager and may call setFeeDestination again.
# DEPLOYER_PK is Anvil's deterministic account-0 key — valid ONLY against a local ephemeral
# Anvil instance. Never run this script against a non-ephemeral or shared-state chain.
DEPLOYER_PK=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
log "Setting feeDestination to LM ($LM) ..."
"$CAST" send --rpc-url "$RPC_URL" --private-key "$DEPLOYER_PK" \
  "$LM" "setFeeDestination(address)" "$LM" >/dev/null 2>&1 \
  || die "setFeeDestination($LM) failed"

VERIFY=$("$CAST" call "$LM" "feeDestination()(address)" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
log "  feeDestination set to: $VERIFY"
[[ "${VERIFY,,}" == "${LM,,}" ]] || die "feeDestination verification failed: expected $LM, got $VERIFY"

# ── 3c. Fund LM with 1000 ETH and deploy into positions via recenter ──────────
# Send ETH as WETH (LM uses WETH internally), then recenter to deploy into positions.
# Without recenter, the ETH sits idle and the first recenter mints massive KRK.
log "Funding LM with 1000 ETH ..."

# Wrap to WETH and transfer to LM
"$CAST" send "$WETH" "deposit()" --value 1000ether \
  --private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Failed to wrap ETH"
"$CAST" send "$WETH" "transfer(address,uint256)" "$LM" 1000000000000000000000 \
  --private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Failed to transfer WETH to LM"

# Recenter to deploy the new WETH into positions (establishes realistic baseline)
log "Recentering to deploy funded WETH into positions ..."
"$CAST" send "$LM" "recenter()" \
  --private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || log "  WARNING: initial recenter failed (may need amplitude — mining blocks)"

# Advance time and mine blocks, then retry recenter
for _i in $(seq 1 3); do
  "$CAST" rpc evm_increaseTime 600 --rpc-url "$RPC_URL" >/dev/null 2>&1
  for _b in $(seq 1 50); do
    "$CAST" rpc evm_mine --rpc-url "$RPC_URL" >/dev/null 2>&1
  done
  "$CAST" send "$LM" "recenter()" \
    --private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 && break
done

LM_ETH=$("$CAST" balance "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
LM_WETH=$("$CAST" call "$WETH" "balanceOf(address)(uint256)" "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
log "  LM after recenter: ETH=$LM_ETH WETH=$LM_WETH"

# ── 4. Take Anvil snapshot (clean baseline) ───────────────────────────────────
log "Taking Anvil snapshot..."
SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
log "  Snapshot ID: $SNAP"

# Revert to the baseline snapshot on exit so subsequent runs start clean.
CLAUDE_PID=""
cleanup() {
  local rc=$?
  if [[ -n "${CLAUDE_PID:-}" ]]; then
    kill "$CLAUDE_PID" 2>/dev/null || true
  fi
  if [[ -n "${SNAP:-}" ]]; then
    "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
  fi
  rm -f "${PROMPT_FILE:-}" 2>/dev/null || true
  exit $rc
}
trap cleanup EXIT INT TERM

# ── Helper: compute total ETH controlled by LM ────────────────────────────────
# Total = free ETH + free WETH + ETH locked in all 3 Uni V3 positions
# This is the real metric: "can the adversary extract ETH from the protocol?"
# Uses a forge script with exact Uni V3 integer math (LiquidityAmounts + TickMath)
# instead of multiple cast calls + Python float approximation.
compute_lm_total_eth() {
  local output result
  output=$(cd "$REPO_ROOT" && LM="$LM" WETH="$WETH" POOL="$POOL" \
    "$FORGE" script onchain/script/LmTotalEth.s.sol \
    --rpc-url "$RPC_URL" --root onchain 2>&1)
  # forge script prints "== Logs ==" then the bare number — extract it.
  # Scan all lines after the marker so blank lines or warning lines don't corrupt the result.
  result=$(echo "$output" | awk '/^== Logs ==/{found=1; next} found && /^[[:space:]]*[0-9]+[[:space:]]*$/{gsub(/[[:space:]]/, ""); print; exit}')
  [[ -n "$result" && "$result" =~ ^[0-9]+$ ]] || die "Failed to read LM total ETH (forge output: $output)"
  echo "$result"
}

# ── Helper: extract strategy findings from stream-json and append to memory ───
extract_memory() {
  local stream_file="$1"
  local run_num memory_file="$MEMORY_FILE"

  # Determine run number: use max run in file + 1 so it stays monotonic after trim
  if [[ -f "$memory_file" ]]; then
    run_num=$(python3 - "$memory_file" <<'EOF'
import json, sys
entries = [json.loads(l) for l in open(sys.argv[1]) if l.strip()]
print(max((e.get('run', 0) for e in entries), default=0) + 1)
EOF
)
    [[ "$run_num" =~ ^[0-9]+$ ]] || run_num=1
  else
    run_num=1
  fi

  python3 - "$stream_file" "$memory_file" "$run_num" "$LM_ETH_BEFORE" "$CANDIDATE_NAME" "$OPTIMIZER_PROFILE" "$CROSS_PATTERNS_FILE" <<'PYEOF'
import json, os, sys, re
from datetime import datetime, timezone

stream_file = sys.argv[1]
memory_file = sys.argv[2]
run_num = int(sys.argv[3])
try:
    lm_eth_before = int(sys.argv[4])
except (ValueError, IndexError):
    print("  extract_memory: invalid lm_eth_before value, skipping", file=sys.stderr)
    sys.exit(0)
candidate = sys.argv[5] if len(sys.argv) > 5 else "unknown"
optimizer_profile = sys.argv[6] if len(sys.argv) > 6 else "unknown"
cross_file = sys.argv[7] if len(sys.argv) > 7 else None
sweep_id = os.environ.get("SWEEP_ID", "unknown")

def make_pattern(strategy_name, steps_text):
    """Extract abstract op sequence preserving execution order."""
    text = (strategy_name + " " + steps_text).lower()
    op_positions = []
    for kw, label in [("wrap", "wrap"), ("buy", "buy"), ("sell", "sell")]:
        m = re.search(r'\b' + kw + r'\b', text)
        if m:
            op_positions.append((m.start(), label))
    # Use word boundaries so 'stake' never matches inside 'unstake'
    m_stake = re.search(r'\bstake\b', text)
    if m_stake:
        ctx = text[max(0, m_stake.start() - 10):m_stake.start() + 20]
        op_positions.append((m_stake.start(), "stake_all" if "all" in ctx else "stake"))
    m_unstake = re.search(r'\bunstake\b', text)
    if m_unstake:
        op_positions.append((m_unstake.start(), "unstake"))
    recenter_matches = list(re.finditer(r'\brecenter\b', text))
    if recenter_matches:
        label = "recenter" if len(recenter_matches) == 1 else "recenter_multi"
        op_positions.append((recenter_matches[0].start(), label))
    # add_lp: keyword or mint + LP context
    m = re.search(r'\badd_lp\b', text)
    if m:
        op_positions.append((m.start(), "add_lp"))
    elif re.search(r'\bmint\b', text) and ("lp" in text or "liquidity" in text):
        m = re.search(r'\bmint\b', text)
        op_positions.append((m.start(), "add_lp"))
    # remove_lp: keyword or decreaseliquidity
    for pat in [r'\bremove_lp\b', r'\bdecreaseliquidity\b']:
        m = re.search(pat, text)
        if m:
            op_positions.append((m.start(), "remove_lp"))
            break
    # Sort by first occurrence position to reflect actual execution order
    op_positions.sort(key=lambda x: x[0])
    seen = set()
    ops = []
    for _, label in op_positions:
        if label not in seen:
            seen.add(label)
            ops.append(label)
    return " → ".join(ops) if ops else strategy_name[:60]

texts = []
with open(stream_file) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            if obj.get("type") == "assistant":
                for block in obj.get("message", {}).get("content", []):
                    if block.get("type") == "text":
                        texts.append(block["text"])
        except:
            pass

# Parse strategies from agent text
strategies = []
current = None
for text in texts:
    # Detect strategy headers: matches "## Strategy 1: name" and "STRATEGY 1: name"
    strat_match = re.search(r"(?:##\s*)?[Ss][Tt][Rr][Aa][Tt][Ee][Gg][Yy]\s*\d+[^:]*:\s*(.+)", text)
    if strat_match:
        if current:
            strategies.append(current)
        current = {
            "strategy": strat_match.group(1).strip(),
            "steps": "",
            "lm_eth_after": None,
            "insight": "",
            "insight_pri": 999  # tracks priority of stored insight; lower index wins
        }
    if current:
        # Capture lm_eth_after only from the structured final-report label
        # ("lm_eth_after: wei"). Mid-execution "Total LM ETH: X wei"
        # lines are deliberately excluded: they reflect intermediate chain state
        # (e.g. after staking before revert) and must not be recorded as the
        # confirmed post-strategy ETH balance.
        floor_matches = list(re.finditer(r"lm_eth_after\s*:\s*(\d+)", text, re.IGNORECASE))
        if floor_matches:
            current["lm_eth_after"] = int(floor_matches[-1].group(1))
        # Capture insights — prefer explicit labels; only overwrite if new match is higher priority
        for pri, ins_pat in enumerate([
            r"[Kk]ey [Ii]nsight:\s*(.+)",
            r"[Ii]nsight:\s*(.+)",
            r"[Ww][Hh][Yy][^:]*:\s*(.{30,})",
            r"(?:because|since|due to)\s+(.{30,})",
            r"(?:discovered|learned|realized)\s+(?:that\s+)?(.+)"
        ]):
            if pri >= current["insight_pri"]:
                break  # already have a higher-priority insight stored
            insight_match = re.search(ins_pat, text)
            if insight_match and len(insight_match.group(1)) > 20:
                current["insight"] = insight_match.group(1).strip()[:300]
                current["insight_pri"] = pri
                break
        # Capture step summaries
        if any(word in text.lower() for word in ["wrap", "buy", "sell", "stake", "recenter", "mint", "approve"]):
            if len(text) < 200:
                current["steps"] += text.strip() + "; "
if current:
    strategies.append(current)

# Write to memory file
ts = datetime.now(timezone.utc).isoformat()
with open(memory_file, "a") as f:
    for s in strategies:
        fa = s["lm_eth_after"] if s.get("lm_eth_after") is not None else lm_eth_before
        delta_bps = round((fa - lm_eth_before) * 10000 / lm_eth_before) if lm_eth_before else 0
        if fa < lm_eth_before:
            result = "DECREASED"
        elif fa > lm_eth_before:
            result = "INCREASED"
        else:
            result = "HELD"
        pattern = make_pattern(s["strategy"], s["steps"])
        entry = {
            "run": run_num,
            "ts": ts,
            "candidate": candidate,
            "optimizer_profile": optimizer_profile,
            "strategy": s["strategy"][:100],
            "pattern": pattern[:150],
            "steps": s["steps"][:300].rstrip("; "),
            "lm_eth_before": lm_eth_before,
            "lm_eth_after": fa,
            "delta_bps": delta_bps,
            "result": result,
            "insight": s["insight"][:300]
        }
        f.write(json.dumps(entry) + "\n")
        print(f"  Recorded: {entry['strategy']} [{entry['candidate']}] → {result} ({delta_bps:+d} bps)")
if not strategies:
    print("  No strategies detected in stream output")

# Trim memory file: keep 10 most recent + all DECREASED entries (cap at 50)
with open(memory_file) as f:
    all_entries = [json.loads(l) for l in f if l.strip()]
if len(all_entries) > 50:
    # Keep all DECREASED entries + 10 most recent; deduplicate preserving order
    trimmed = [e for e in all_entries if e.get("result") == "DECREASED"] + all_entries[-10:]
    seen = set()
    deduped = []
    for e in trimmed:
        # 3-tuple key: run+ts uniquely identifies the extract_memory call; strategy
        # distinguishes entries within the same call. Matches step-4c's identity check.
        key = (e.get("run"), e.get("ts"), e.get("strategy"))
        if key not in seen:
            seen.add(key)
            deduped.append(e)
    # Export entries that would be dropped to cross-patterns before discarding them
    if cross_file:
        kept_keys = {(e.get("run"), e.get("ts"), e.get("strategy")) for e in deduped}
        dropped = [e for e in all_entries if (e.get("run"), e.get("ts"), e.get("strategy")) not in kept_keys]
        if dropped:
            existing_cross_keys = set()
            try:
                with open(cross_file) as cf:
                    for line in cf:
                        line = line.strip()
                        if line:
                            try:
                                ce = json.loads(line)
                                existing_cross_keys.add((ce.get("pattern", ""), ce.get("candidate", ""), ce.get("result", "")))
                            except Exception:
                                pass
            except FileNotFoundError:
                pass
            try:
                exported = 0
                with open(cross_file, "a") as cf:
                    for e in dropped:
                        key = (e.get("pattern", ""), e.get("candidate", ""), e.get("result", ""))
                        if key not in existing_cross_keys:
                            existing_cross_keys.add(key)
                            e.setdefault("sweep_id", sweep_id)
                            cf.write(json.dumps(e) + "\n")
                            exported += 1
                if exported:
                    print(f"  Pre-trim export: {exported} dropped entr{'y' if exported == 1 else 'ies'} saved to cross-patterns")
            except Exception as ex:
                print(f"  WARNING: pre-trim export failed: {ex}", file=sys.stderr)
    with open(memory_file, "w") as f:
        for e in deduped:
            f.write(json.dumps(e) + "\n")
    print(f"  Trimmed memory to {len(deduped)} entries")
PYEOF
}

# ── 5. Read lm_eth_before ─────────────────────────────────────────────────────
log "Reading floor before agent run..."
LM_ETH_BEFORE=$(compute_lm_total_eth)
log "  lm_eth_before = $LM_ETH_BEFORE wei"

# ── 5a. Run attack catalogue (structured suite) ───────────────────────────────
# Loop through every existing .jsonl attack file in the attacks directory,
# replay each through AttackRunner.s.sol, record LM total ETH before/after,
# and revert to the baseline snapshot between files so attacks are independent.
ATTACK_DIR="${ATTACK_DIR:-$REPO_ROOT/onchain/script/backtesting/attacks}"
ATTACK_SUITE_RESULTS=""
ATTACK_SUITE_COUNT=0

if [[ -d "$ATTACK_DIR" ]]; then
  mapfile -t ATTACK_FILES < <(find "$ATTACK_DIR" -maxdepth 1 -name '*.jsonl' -type f | sort)
  if [[ ${#ATTACK_FILES[@]} -gt 0 ]]; then
    log "Running attack catalogue (${#ATTACK_FILES[@]} files in $ATTACK_DIR)..."
    ATTACK_SUITE_RESULTS="## Attack Catalogue Results (pre-run structured suite)

These attacks were replayed from the known catalogue before your session.
Do NOT repeat these strategies. Focus on novel approaches instead.

"
    for attack_file in "${ATTACK_FILES[@]}"; do
      attack_name=$(basename "$attack_file" .jsonl)
      log "  Running attack: $attack_name ..."

      # Record LM ETH before this attack
      suite_eth_before=$(compute_lm_total_eth)

      # Run AttackRunner
      set +e
      suite_output=$(cd "$REPO_ROOT/onchain" && \
        ATTACK_FILE="$attack_file" \
        DEPLOYMENTS_FILE="deployments-local.json" \
        SWAP_ROUTER="$SWAP_ROUTER" \
        NPM_ADDR="$NPM" \
        "$FORGE" script script/backtesting/AttackRunner.s.sol \
        --rpc-url "$RPC_URL" --broadcast 2>&1)
      suite_exit=$?
      set -e

      # Record LM ETH after this attack
      if [[ $suite_exit -eq 0 ]]; then
        suite_eth_after=$(compute_lm_total_eth)
        suite_delta_bps=$(python3 -c "
b=int('$suite_eth_before'); a=int('$suite_eth_after')
print(round((a - b) * 10000 / b) if b else 0)
")
        if python3 -c "import sys; sys.exit(0 if int('$suite_eth_after') < int('$suite_eth_before') else 1)"; then
          suite_verdict="FLOOR_BROKEN"
        else
          suite_verdict="FLOOR_HELD"
        fi
        log "    $attack_name: $suite_verdict (${suite_delta_bps} bps)"
        ATTACK_SUITE_RESULTS+="- **$attack_name**: $suite_verdict (delta: ${suite_delta_bps} bps, before: $suite_eth_before, after: $suite_eth_after)
"
      else
        log "    $attack_name: REPLAY_ERROR (exit $suite_exit)"
        ATTACK_SUITE_RESULTS+="- **$attack_name**: REPLAY_ERROR (forge exit $suite_exit)
"
      fi
      ATTACK_SUITE_COUNT=$((ATTACK_SUITE_COUNT + 1))

      # Revert to baseline snapshot so next attack starts from clean state
      "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
      # Re-take snapshot (anvil_revert is one-shot)
      SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
    done
    log "Attack catalogue complete: $ATTACK_SUITE_COUNT files processed"
  else
    log "No .jsonl files found in $ATTACK_DIR — skipping attack catalogue"
  fi
else
  log "Attack directory not found ($ATTACK_DIR) — skipping attack catalogue"
fi

# ── 6. Build agent prompt ─────────────────────────────────────────────────────

# ── 6a. Read Solidity source files (reflect the current candidate after inject) ─
ONCHAIN_SRC="$REPO_ROOT/onchain/src"
SOL_LM=$(< "$ONCHAIN_SRC/LiquidityManager.sol")
SOL_THREE_POS=$(< "$ONCHAIN_SRC/abstracts/ThreePositionStrategy.sol")
SOL_OPTIMIZER=$(< "$ONCHAIN_SRC/Optimizer.sol")
SOL_OPTIMIZERV3=$(< "$ONCHAIN_SRC/OptimizerV3.sol")
SOL_VWAP=$(< "$ONCHAIN_SRC/VWAPTracker.sol")
SOL_PRICE_ORACLE=$(< "$ONCHAIN_SRC/abstracts/PriceOracle.sol")
SOL_KRAIKEN=$(< "$ONCHAIN_SRC/Kraiken.sol")
SOL_STAKE=$(< "$ONCHAIN_SRC/Stake.sol")

# Build Previous Findings section from memory file
MEMORY_SECTION=""
if [[ -f "$MEMORY_FILE" && -s "$MEMORY_FILE" ]]; then
  MEMORY_SECTION=$(python3 - "$MEMORY_FILE" <<'PYEOF'
import json, sys
from collections import defaultdict

entries = []
with open(sys.argv[1]) as f:
    for line in f:
        line = line.strip()
        if line:
            entries.append(json.loads(line))
if not entries:
    sys.exit(0)

print('## Previous Findings (from earlier runs)')
print()
print('DO NOT repeat strategies marked HELD or INCREASED. Build on the insights.')
print('Distinguish optimizer-specific vulnerabilities from universal patterns.')
print('Try NEW combinations not yet attempted. Combine tools creatively.')
print()

# Cross-candidate: patterns that DECREASED in multiple distinct candidates
decreased = [e for e in entries if e.get('result') == 'DECREASED']
cross = defaultdict(set)
for e in decreased:
    key = e.get('pattern') or e.get('strategy', '')
    cross[key].add(e.get('candidate', 'unknown'))
universal = [(p, cands) for p, cands in cross.items() if len(cands) > 1]
if universal:
    print('### Universal Patterns (succeeded across multiple candidates)')
    for pat, cands in universal:
        print(f"- **{pat}** — worked on: {', '.join(sorted(cands))}")
    print()

# Group remaining entries by candidate
by_candidate = defaultdict(list)
for e in entries:
    by_candidate[e.get('candidate', 'unknown')].append(e)
for cand, cand_entries in sorted(by_candidate.items()):
    prof = next((e.get('optimizer_profile', '') for e in cand_entries if e.get('optimizer_profile', '') not in ('', 'unknown')), '')
    print(f"### Candidate: {cand}")
    if prof:
        print(f"Profile: {prof}")
    print()
    for e in cand_entries:
        r = e.get('result', '?')
        emoji = '❌' if r == 'DECREASED' else '⬆️' if r == 'INCREASED' else '➡️'
        pat = e.get('pattern', '')
        print(f"#### Run {e.get('run','?')}: {e.get('strategy','?')} {emoji} {r}")
        if pat:
            print(f"Pattern: `{pat}`")
        print(f"Steps: {e.get('steps','?')}")
        print(f"Delta: {e.get('delta_bps',0)} bps")
        if e.get('insight'):
            print(f"**Insight:** {e['insight']}")
        print()
PYEOF
)
fi

# Build Cross-Candidate Intelligence section from the cross-patterns file
CROSS_CANDIDATE_SECTION=""
if [[ -f "$CROSS_PATTERNS_FILE" && -s "$CROSS_PATTERNS_FILE" ]]; then
  CROSS_CANDIDATE_SECTION=$(python3 - "$CROSS_PATTERNS_FILE" "$CANDIDATE_NAME" <<'PYEOF'
import json, sys
from collections import defaultdict

cross_file = sys.argv[1]
current_candidate = sys.argv[2] if len(sys.argv) > 2 else ""
entries = []
with open(cross_file) as f:
    for line in f:
        line = line.strip()
        if line:
            try:
                entries.append(json.loads(line))
            except Exception:
                pass
if not entries:
    sys.exit(0)

# Exclude entries from the current candidate (they are cross-candidate evidence, not self-evidence)
entries = [e for e in entries if e.get("candidate", "unknown") != current_candidate]

# Group by abstract pattern; track worked/failed per candidate
by_pattern = defaultdict(lambda: {"worked": {}, "failed": {}, "insight": ""})
for e in entries:
    pat = e.get("pattern", "") or e.get("strategy", "")[:80]
    if not pat:
        continue  # skip entries with no identifiable pattern
    cand = e.get("candidate", "unknown")
    prof = e.get("optimizer_profile", "unknown")
    result = e.get("result", "HELD")
    insight = e.get("insight", "")
    if result == "DECREASED":
        by_pattern[pat]["worked"][cand] = prof
    else:
        by_pattern[pat]["failed"][cand] = prof
    if insight and not by_pattern[pat]["insight"]:
        by_pattern[pat]["insight"] = insight

universal = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) > 1]
candidate_specific = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) == 1]
failed_all = [(p, d) for p, d in by_pattern.items() if not d["worked"] and d["failed"]]

print("## Cross-Candidate Intelligence")
print()
print("Attack patterns learned across all previously tested candidates.")
print("Exploit successes. Avoid repeating patterns that universally failed.")
print()

def fmt_cand(cand, prof):
    return f"{cand} ({prof})" if prof and prof not in ("", "unknown") else cand

if universal:
    print("### Universal Patterns (succeeded on 2+ candidates)")
    for pat, d in sorted(universal, key=lambda x: -len(x[1]["worked"])):
        worked_str = ", ".join(fmt_cand(c, p) for c, p in sorted(d["worked"].items()))
        print(f"- `{pat}` — **BROKE** on: {worked_str}")
        if d["failed"]:
            failed_str = ", ".join(d["failed"])
            print(f"  Held on: {failed_str}")
        if d["insight"]:
            print(f"  Insight: {d['insight']}")
    print()

if candidate_specific:
    print("### Candidate-Specific Patterns (broke exactly one candidate)")
    for pat, d in candidate_specific:
        worked_cand, worked_prof = next(iter(d["worked"].items()))
        print(f"- `{pat}` — **BROKE** on: {fmt_cand(worked_cand, worked_prof)}")
        if d["failed"]:
            print(f"  Held on: {', '.join(d['failed'])}")
        if d["insight"]:
            print(f"  Insight: {d['insight']}")
    print()

if failed_all:
    print("### Patterns That Held Across All Candidates Tried")
    for pat, d in failed_all:
        print(f"- `{pat}` — held on: {', '.join(d['failed'])}")
    print()
PYEOF
)
fi

PROMPT=$(cat "$SCRIPT_DIR/red-team-program.md")
PROMPT=${PROMPT//\{\{LM_ETH_BEFORE\}\}/$LM_ETH_BEFORE}
PROMPT=${PROMPT//\{\{CANDIDATE_NAME\}\}/$CANDIDATE_NAME}
PROMPT=${PROMPT//\{\{OPTIMIZER_PROFILE\}\}/$OPTIMIZER_PROFILE}
PROMPT=${PROMPT//\{\{KRK\}\}/$KRK}
PROMPT=${PROMPT//\{\{STAKE\}\}/$STAKE}
PROMPT=${PROMPT//\{\{LM\}\}/$LM}
PROMPT=${PROMPT//\{\{OPT\}\}/$OPT}
PROMPT=${PROMPT//\{\{POOL\}\}/$POOL}
PROMPT=${PROMPT//\{\{NPM\}\}/$NPM}
PROMPT=${PROMPT//\{\{WETH\}\}/$WETH}
PROMPT=${PROMPT//\{\{SWAP_ROUTER\}\}/$SWAP_ROUTER}
PROMPT=${PROMPT//\{\{ADV_ADDR\}\}/$ADV_ADDR}
PROMPT=${PROMPT//\{\{ADV_PK\}\}/$ADV_PK}
PROMPT=${PROMPT//\{\{RECENTER_ADDR\}\}/$RECENTER_ADDR}
PROMPT=${PROMPT//\{\{RECENTER_PK\}\}/$RECENTER_PK}
PROMPT=${PROMPT//\{\{POOL_FEE\}\}/$POOL_FEE}
PROMPT=${PROMPT//\{\{SOL_LM\}\}/$SOL_LM}
PROMPT=${PROMPT//\{\{SOL_THREE_POS\}\}/$SOL_THREE_POS}
PROMPT=${PROMPT//\{\{SOL_OPTIMIZER\}\}/$SOL_OPTIMIZER}
PROMPT=${PROMPT//\{\{SOL_OPTIMIZERV3\}\}/$SOL_OPTIMIZERV3}
PROMPT=${PROMPT//\{\{SOL_VWAP\}\}/$SOL_VWAP}
PROMPT=${PROMPT//\{\{SOL_PRICE_ORACLE\}\}/$SOL_PRICE_ORACLE}
PROMPT=${PROMPT//\{\{SOL_KRAIKEN\}\}/$SOL_KRAIKEN}
PROMPT=${PROMPT//\{\{SOL_STAKE\}\}/$SOL_STAKE}
PROMPT=${PROMPT//\{\{ATTACK_SUITE_RESULTS\}\}/$ATTACK_SUITE_RESULTS}
PROMPT=${PROMPT//\{\{CROSS_CANDIDATE_SECTION\}\}/$CROSS_CANDIDATE_SECTION}
PROMPT=${PROMPT//\{\{MEMORY_SECTION\}\}/$MEMORY_SECTION}

# ── 7. Create output directory and run the agent ──────────────────────────────
mkdir -p "$REPORT_DIR"
mkdir -p "$(dirname "$MEMORY_FILE")"
mkdir -p "$(dirname "$CROSS_PATTERNS_FILE")"

log "Spawning Claude red-team agent (timeout: ${CLAUDE_TIMEOUT}s)..."
log "  Report will be written to: $REPORT"

set +e
# Write prompt to temp file to avoid "Argument list too long" (prompt can be 50KB+)
PROMPT_FILE=$(mktemp /tmp/red-team-prompt-XXXXXX.md)
printf '%s' "$PROMPT" > "$PROMPT_FILE"

# Note: --verbose is required by the claude CLI when --output-format stream-json is used;
# omitting it causes the CLI to exit with an error, producing an empty stream log.
# Run synchronously — timeout handles kill, no need to background
timeout "$CLAUDE_TIMEOUT" bash -c 'claude -p --dangerously-skip-permissions \
  --verbose --output-format stream-json \
  <"$1" >"$2" 2>&1' _ "$PROMPT_FILE" "$STREAM_LOG"
AGENT_EXIT=$?
CLAUDE_PID=""
set -e

if [[ $AGENT_EXIT -ne 0 ]]; then
  log "WARNING: claude exited with code $AGENT_EXIT — see $STREAM_LOG for details"
fi

# Extract readable text from stream-json for the report
python3 - "$STREAM_LOG" >"$REPORT" <<'PYEOF'
import json, sys
with open(sys.argv[1]) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            if obj.get("type") == "assistant":
                for block in obj.get("message", {}).get("content", []):
                    if block.get("type") == "text":
                        print(block["text"], end="")
        except:
            pass
PYEOF

# If the agent crashed and produced no readable output, treat as an infra error
# rather than silently reporting ETH SAFE (a false pass).
if [[ $AGENT_EXIT -ne 0 && ! -s "$REPORT" ]]; then
  die "claude agent failed (exit $AGENT_EXIT) with no readable output — see $STREAM_LOG"
fi

# ── 8. Read lm_eth_after ──────────────────────────────────────────────────────
log "Reading floor after agent run..."
LM_ETH_AFTER=$(compute_lm_total_eth)

# ── 8a. Extract and persist strategy findings ─────────────────────────────────
log "Extracting strategy findings from agent output..."
extract_memory "$STREAM_LOG"
log "  lm_eth_after = $LM_ETH_AFTER wei"

# ── 8b. Export attack sequence and replay with AttackRunner ───────────────────
# Converts the agent's cast send commands to structured JSONL and replays them
# via AttackRunner.s.sol to capture full state snapshots for optimizer training.
log "Exporting attack sequence from stream log..."
set +e
python3 "$REPO_ROOT/scripts/harb-evaluator/export-attacks.py" \
  "$STREAM_LOG" "$ATTACK_EXPORT" 2>&1 | while IFS= read -r line; do log "  $line"; done
EXPORT_EXIT=${PIPESTATUS[0]}
set -e

if [[ $EXPORT_EXIT -eq 0 && -f "$ATTACK_EXPORT" && -s "$ATTACK_EXPORT" ]]; then
  log "  Attack export: $ATTACK_EXPORT"
  log "  Replaying attack sequence with AttackRunner for state snapshots..."
  set +e
  (cd "$REPO_ROOT/onchain" && \
    ATTACK_FILE="$ATTACK_EXPORT" \
    DEPLOYMENTS_FILE="deployments-local.json" \
    SWAP_ROUTER="$SWAP_ROUTER" \
    NPM_ADDR="$NPM" \
    "$FORGE" script script/backtesting/AttackRunner.s.sol \
    --rpc-url "$RPC_URL" --broadcast 2>&1 \
    | grep '^{' >"$ATTACK_SNAPSHOTS")
  REPLAY_EXIT=$?
  set -e
  if [[ $REPLAY_EXIT -eq 0 && -s "$ATTACK_SNAPSHOTS" ]]; then
    SNAPSHOT_COUNT=$(wc -l <"$ATTACK_SNAPSHOTS")
    log "  AttackRunner replay complete: $SNAPSHOT_COUNT snapshots → $ATTACK_SNAPSHOTS"
  else
    log "  WARNING: AttackRunner replay produced no snapshots (exit $REPLAY_EXIT) — non-fatal"
  fi
  # Revert to the clean baseline after replay so the floor check below is unaffected.
  "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
  # Re-take the snapshot so cleanup trap still has a valid ID to revert.
  SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
else
  log "  WARNING: No attack operations exported from stream — skipping AttackRunner replay"
fi

# ── 9. Summarise results ──────────────────────────────────────────────────────
log ""
log "=== RED-TEAM SUMMARY ==="
log ""
log "  lm_eth_before : $LM_ETH_BEFORE wei"
log "  lm_eth_after  : $LM_ETH_AFTER wei"
log ""

BROKE=false
if python3 -c "import sys; sys.exit(0 if int('${LM_ETH_AFTER:-0}') < int('${LM_ETH_BEFORE:-0}') else 1)"; then
  BROKE=true
fi

# ── 9a-pre. Write structured evidence JSON ────────────────────────────────────
EVIDENCE_DIR="$REPO_ROOT/evidence/red-team"
EVIDENCE_DATE=$(date -u +%Y-%m-%d)
EVIDENCE_FILE="$EVIDENCE_DIR/$EVIDENCE_DATE.json"
mkdir -p "$EVIDENCE_DIR"

if [[ "$BROKE" == "true" ]]; then
  _verdict="floor_broken"
  _floor_held="false"
  _eth_extracted=$(python3 -c "print(int('${LM_ETH_BEFORE:-0}') - int('${LM_ETH_AFTER:-0}'))")
else
  _verdict="floor_held"
  _floor_held="true"
  _eth_extracted=0
fi

python3 - "$EVIDENCE_FILE" "$REPO_ROOT/tmp/red-team-memory.jsonl" \
  "$EVIDENCE_DATE" "$CANDIDATE_NAME" "$CANDIDATE_COMMIT" "$OPTIMIZER_PROFILE" \
  "$LM_ETH_BEFORE" "$LM_ETH_AFTER" "$_eth_extracted" "$_floor_held" "$_verdict" \
  "$ATTACK_SUITE_COUNT" <<'PYEOF'
import json, sys, os

evidence_file = sys.argv[1]
memory_file = sys.argv[2]
date = sys.argv[3]
candidate = sys.argv[4]
candidate_commit = sys.argv[5]
optimizer_profile = sys.argv[6]
lm_eth_before = int(sys.argv[7]) if sys.argv[7].isdigit() else 0
lm_eth_after = int(sys.argv[8]) if sys.argv[8].isdigit() else 0
eth_extracted = int(sys.argv[9]) if sys.argv[9].isdigit() else 0
floor_held = sys.argv[10].lower() == "true"
verdict = sys.argv[11]
attack_suite_count = int(sys.argv[12]) if len(sys.argv) > 12 and sys.argv[12].isdigit() else 0

# Build attacks list from memory entries for this candidate
attacks = []
if os.path.isfile(memory_file) and os.path.getsize(memory_file) > 0:
    with open(memory_file) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                e = json.loads(line)
                if e.get("candidate") != candidate:
                    continue
                attacks.append({
                    "strategy": e.get("strategy", ""),
                    "pattern": e.get("pattern", ""),
                    "result": e.get("result", "HELD"),
                    "delta_bps": e.get("delta_bps", 0),
                    "insight": e.get("insight", ""),
                })
            except Exception:
                pass

evidence = {
    "date": date,
    "candidate": candidate,
    "candidate_commit": candidate_commit,
    "optimizer_profile": optimizer_profile,
    "lm_eth_before": lm_eth_before,
    "lm_eth_after": lm_eth_after,
    "eth_extracted": eth_extracted,
    "floor_held": floor_held,
    "verdict": verdict,
    "attacks": attacks,
    "attack_suite_count": attack_suite_count,
}
with open(evidence_file, "w") as f:
    json.dump(evidence, f, indent=2)
    f.write("\n")
print(f"  Evidence written to {evidence_file}")
PYEOF

log "Evidence file: $EVIDENCE_FILE"

if [[ "$BROKE" == "true" ]]; then
  DELTA=$(python3 -c "print(int('${LM_ETH_BEFORE:-0}') - int('${LM_ETH_AFTER:-0}'))")
  log "  RESULT: ETH EXTRACTED ❌"
  log "  Decrease: $DELTA wei"
  log ""
  log "  See $REPORT for the winning strategy."
  log ""
  # Append a machine-readable summary to the report
  # NOTE(review): the heredoc body below was lost in the source mangling —
  # reconstructed minimally from the surrounding variables; confirm the exact
  # original wording against version control.
  cat >>"$REPORT" <<EOF

---
RESULT: FLOOR_BROKEN
lm_eth_before: $LM_ETH_BEFORE
lm_eth_after: $LM_ETH_AFTER
delta_wei: $DELTA
EOF

  # Promote the winning attack (best-effort; failure must not mask exit code 1).
  # NOTE(review): the guard condition and exact promote-attacks.sh invocation were
  # lost in the source mangling — reconstructed from the surviving PROMOTE_EXIT
  # handling; confirm arguments against version control.
  if [[ -s "$ATTACK_EXPORT" ]]; then
    set +e
    bash "$SCRIPT_DIR/promote-attacks.sh" "$ATTACK_EXPORT" 2>&1 | while IFS= read -r line; do log "  $line"; done
    PROMOTE_EXIT="${PIPESTATUS[0]}"
    set -e
    if [[ "$PROMOTE_EXIT" -ne 0 ]]; then
      log "  WARNING: promote-attacks.sh exited with code $PROMOTE_EXIT — PR was not created"
    fi
  fi
  exit 1
else
  log "  RESULT: ETH SAFE ✅"
  log ""
  log "  See $REPORT for strategies attempted."
  log ""
  # NOTE(review): this heredoc body was also lost in the source mangling —
  # reconstructed minimally; confirm against version control.
  cat >>"$REPORT" <<EOF

---
RESULT: FLOOR_HELD
lm_eth_before: $LM_ETH_BEFORE
lm_eth_after: $LM_ETH_AFTER
EOF
  exit 0
fi