harb/scripts/harb-evaluator/red-team.sh
johba 52ba6b2f38 fix: run-attack-suite is spec-only — no implementation in red-team.sh (#1000)
Implement the attack catalogue loop (step 5a) in red-team.sh that was
previously a forward spec in the formula. The loop replays every *.jsonl
attack file through AttackRunner.s.sol with snapshot revert between files,
records LM total ETH before/after each attack, and injects results into
the adversarial agent prompt so it knows which strategies are already
catalogued.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 15:30:46 +00:00

958 lines
39 KiB
Bash
Executable file

#!/usr/bin/env bash
# red-team.sh — Adversarial floor-attack agent runner.
#
# Spawns a Claude sub-agent with tools and a goal: make ethPerToken() decrease.
# The agent iterates freely — snapshot → strategy → check floor → revert → repeat.
#
# Usage: red-team.sh
#
# Exit codes:
# 0 floor held (no confirmed decrease)
# 1 floor broken (agent found a strategy that decreased ethPerToken)
# 2 infra error (stack not running, missing dependency, etc.)
#
# Environment overrides:
# CLAUDE_TIMEOUT seconds for the agent run (default: 7200)
# RPC_URL Anvil RPC endpoint (default: http://localhost:8545)
set -euo pipefail

# Pinned Foundry binaries — absolute paths so the run does not depend on PATH.
CAST=/home/debian/.foundry/bin/cast
FORGE=/home/debian/.foundry/bin/forge
# Environment-overridable knobs (documented in the header above).
RPC_URL="${RPC_URL:-http://localhost:8545}"
CLAUDE_TIMEOUT="${CLAUDE_TIMEOUT:-7200}"
# Repo root is two directories above this script (harb/scripts/harb-evaluator/..).
REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
# All run artifacts live under tmp/ inside the repo.
REPORT_DIR="$REPO_ROOT/tmp"
REPORT="$REPORT_DIR/red-team-report.txt"            # human-readable agent transcript
STREAM_LOG="$REPORT_DIR/red-team-stream.jsonl"      # raw claude stream-json output
MEMORY_FILE="$REPO_ROOT/tmp/red-team-memory.jsonl"  # persistent strategy memory across runs
CROSS_PATTERNS_FILE="$REPO_ROOT/tools/red-team/cross-patterns.jsonl"  # cross-candidate intel
ATTACK_EXPORT="$REPORT_DIR/red-team-attacks.jsonl"      # structured attack ops for replay
ATTACK_SNAPSHOTS="$REPORT_DIR/red-team-snapshots.jsonl" # state snapshots from AttackRunner
DEPLOYMENTS="$REPO_ROOT/onchain/deployments-local.json"
# ── Candidate metadata (set by red-team-sweep.sh; defaults to unknown for standalone runs) ─
CANDIDATE_NAME="${CANDIDATE_NAME:-unknown}"
OPTIMIZER_PROFILE="${OPTIMIZER_PROFILE:-unknown}"
CANDIDATE_COMMIT="$(git -C "$REPO_ROOT" rev-parse HEAD 2>/dev/null || echo "unknown")"
# ── Anvil accounts ─────────────────────────────────────────────────────────────
# Deterministic Anvil dev keys — safe only against the local ephemeral chain.
# Account 8 — adversary (10k ETH, 0 KRK)
ADV_PK=0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97
# Account 2 — recenter caller (recenter is public, any account can call)
RECENTER_PK=0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a
# ── Infrastructure constants ───────────────────────────────────────────────────
WETH=0x4200000000000000000000000000000000000006
# SwapRouter02 and NonfungiblePositionManager — resolved by detect_periphery() after Anvil is verified
SWAP_ROUTER_SEPOLIA=0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4
SWAP_ROUTER_MAINNET=0x2626664c2603336E57B271c5C0b26F421741e481
NPM_SEPOLIA=0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2
NPM_MAINNET=0x03a520B32c04bf3beef7BEb72E919cF822Ed34F3
SWAP_ROUTER=""
NPM=""
POOL_FEE=10000
# Detect the chain ID and pick the matching Uniswap periphery addresses
# (mirrors bootstrap-common.sh). Writes the SWAP_ROUTER / NPM globals.
# Must be called only after Anvil is known to be reachable.
detect_periphery() {
  local cid
  cid=$("$CAST" chain-id --rpc-url "$RPC_URL" 2>/dev/null || echo "")
  case "$cid" in
    8453)
      # Base mainnet fork
      SWAP_ROUTER="$SWAP_ROUTER_MAINNET"
      NPM="$NPM_MAINNET"
      log "Detected Base mainnet (chain ID 8453) — using mainnet periphery addresses"
      ;;
    *)
      # Default: Base Sepolia (also covers an unreadable chain ID)
      SWAP_ROUTER="$SWAP_ROUTER_SEPOLIA"
      NPM="$NPM_SEPOLIA"
      log "Using Base Sepolia periphery addresses (chain ID: ${cid:-unknown})"
      ;;
  esac
}
# ── Logging helpers ────────────────────────────────────────────────────────────
# log: prefixed status line to stdout. die: error to stderr, exit 2 (infra error).
log() { printf '[red-team] %s\n' "$*"; }
die() { printf '[red-team] ERROR: %s\n' "$*" >&2; exit 2; }
# ── Prerequisites ──────────────────────────────────────────────────────────────
# Fail fast (exit 2 via die) before touching the chain if any tool is missing.
if ! command -v "$CAST" &>/dev/null; then die "cast not found at $CAST"; fi
if ! command -v "$FORGE" &>/dev/null; then die "forge not found at $FORGE"; fi
if ! command -v claude &>/dev/null; then die "claude CLI not found (install: npm i -g @anthropic-ai/claude-code)"; fi
if ! command -v python3 &>/dev/null; then die "python3 not found"; fi
if ! command -v jq &>/dev/null; then die "jq not found"; fi
# ── 1. Fresh stack via bootstrap-light ─────────────────────────────────────────
# bootstrap-light.sh lives next to this script; it (re)creates the local stack.
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
log "Running bootstrap-light ..."
bash "$SCRIPT_DIR/bootstrap-light.sh" || die "bootstrap-light failed"
# Verify Anvil responds
"$CAST" chain-id --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Anvil not accessible at $RPC_URL after bootstrap-light"
# Select network-appropriate periphery addresses
detect_periphery
# ── 2. Read contract addresses ─────────────────────────────────────────────────
# deployments-local.json is produced by bootstrap; jq -r yields the literal
# string "null" for missing keys, which the validation loop below rejects.
[[ -f "$DEPLOYMENTS" ]] || die "deployments-local.json not found at $DEPLOYMENTS (bootstrap not complete)"
KRK=$(jq -r '.contracts.Kraiken' "$DEPLOYMENTS")
STAKE=$(jq -r '.contracts.Stake' "$DEPLOYMENTS")
LM=$(jq -r '.contracts.LiquidityManager' "$DEPLOYMENTS")
OPT=$(jq -r '.contracts.OptimizerProxy' "$DEPLOYMENTS")
V3_FACTORY=$(jq -r '.contracts.V3Factory' "$DEPLOYMENTS")
POOL=$(jq -r '.contracts.Pool' "$DEPLOYMENTS")
# Validate every address via ${!var} indirection: non-empty and not jq's "null".
for var in KRK STAKE LM OPT V3_FACTORY POOL; do
  val="${!var}"
  [[ -n "$val" && "$val" != "null" ]] \
    || die "$var address missing from deployments-local.json — was bootstrap successful?"
done
log " KRK: $KRK"
log " STAKE: $STAKE"
log " LM: $LM"
log " OPT: $OPT"
log " V3_FACTORY: $V3_FACTORY"
log " Pool: $POOL"
# Derive Anvil account addresses from their private keys
ADV_ADDR=$("$CAST" wallet address --private-key "$ADV_PK")
RECENTER_ADDR=$("$CAST" wallet address --private-key "$RECENTER_PK")
log " Adversary: $ADV_ADDR (account 8)"
log " Recenter: $RECENTER_ADDR (account 2)"
# ── 3a. recenter() is now public (no recenterAccess needed) ──
# Any address can call recenter() — TWAP oracle enforces safety.
log "recenter() is public — no access grant needed"
# ── 3b. Set feeDestination to LM itself (fees accrue as liquidity) ─────────────
# setFeeDestination allows repeated EOA sets; setting to a contract locks it permanently.
# The deployer (Anvil account 0) deployed LiquidityManager and may call setFeeDestination again.
# DEPLOYER_PK is Anvil's deterministic account-0 key — valid ONLY against a local ephemeral
# Anvil instance. Never run this script against a non-ephemeral or shared-state chain.
DEPLOYER_PK=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
log "Setting feeDestination to LM ($LM) ..."
"$CAST" send --rpc-url "$RPC_URL" --private-key "$DEPLOYER_PK" \
  "$LM" "setFeeDestination(address)" "$LM" >/dev/null 2>&1 \
  || die "setFeeDestination($LM) failed"
# sed strips cast's bracketed annotation suffix and whitespace from the output.
VERIFY=$("$CAST" call "$LM" "feeDestination()(address)" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
log " feeDestination set to: $VERIFY"
# Case-insensitive comparison (${var,,}): address checksum casing may differ.
[[ "${VERIFY,,}" == "${LM,,}" ]] || die "feeDestination verification failed: expected $LM, got $VERIFY"
# ── 3c. Fund LM with 1000 ETH and deploy into positions via recenter ───────────
# Send ETH as WETH (LM uses WETH internally), then recenter to deploy into positions.
# Without recenter, the ETH sits idle and the first recenter mints massive KRK.
log "Funding LM with 1000 ETH ..."
# Wrap to WETH and transfer to LM
"$CAST" send "$WETH" "deposit()" --value 1000ether \
  --private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Failed to wrap ETH"
"$CAST" send "$WETH" "transfer(address,uint256)" "$LM" 1000000000000000000000 \
  --private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || die "Failed to transfer WETH to LM"
# Recenter to deploy the new WETH into positions (establishes realistic baseline)
log "Recentering to deploy funded WETH into positions ..."
"$CAST" send "$LM" "recenter()" \
  --private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
  || log " WARNING: initial recenter failed (may need amplitude — mining blocks)"
# Advance time and mine blocks, then retry recenter. Brace expansion instead of
# seq (no external process per loop), and track success explicitly so exhausting
# all retries is surfaced instead of silently leaving the WETH undeployed.
recenter_ok=false
for _i in {1..3}; do
  "$CAST" rpc evm_increaseTime 600 --rpc-url "$RPC_URL" >/dev/null 2>&1
  for _b in {1..50}; do
    "$CAST" rpc evm_mine --rpc-url "$RPC_URL" >/dev/null 2>&1
  done
  if "$CAST" send "$LM" "recenter()" \
    --private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1; then
    recenter_ok=true
    break
  fi
done
[[ "$recenter_ok" == true ]] \
  || log " WARNING: recenter still failing after 3 retries — LM WETH may remain undeployed"
# Report the resulting balances (sed strips cast's bracketed suffix + whitespace).
LM_ETH=$("$CAST" balance "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
LM_WETH=$("$CAST" call "$WETH" "balanceOf(address)(uint256)" "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
log " LM after recenter: ETH=$LM_ETH WETH=$LM_WETH"
# ── 4. Take Anvil snapshot (clean baseline) ─────
log "Taking Anvil snapshot..."
# anvil_snapshot returns a quoted hex ID; strip the quotes.
SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
log " Snapshot ID: $SNAP"
# Revert to the baseline snapshot on exit so subsequent runs start clean.
# NOTE(review): CLAUDE_PID appears vestigial — the agent is run synchronously
# via `timeout` below and this variable is never assigned a non-empty PID.
CLAUDE_PID=""
cleanup() {
  # Preserve the script's exit code across the cleanup work.
  local rc=$?
  if [[ -n "${CLAUDE_PID:-}" ]]; then
    kill "$CLAUDE_PID" 2>/dev/null || true
  fi
  if [[ -n "${SNAP:-}" ]]; then
    "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
  fi
  # PROMPT_FILE is created later (step 7); ${:-} keeps this safe if we die earlier.
  rm -f "${PROMPT_FILE:-}" 2>/dev/null || true
  exit $rc
}
trap cleanup EXIT INT TERM
# ── Helper: compute total ETH controlled by LM ────────────────────────────────
# Total = free ETH + free WETH + ETH locked in all 3 Uni V3 positions.
# This is the real metric: "can the adversary extract ETH from the protocol?"
# Delegates to a forge script using exact Uni V3 integer math (LiquidityAmounts
# + TickMath) rather than several cast calls plus Python float approximation.
# Prints the wei total (decimal) on stdout; dies on unparseable forge output.
compute_lm_total_eth() {
  local raw value
  raw=$(cd "$REPO_ROOT" && LM="$LM" WETH="$WETH" POOL="$POOL" \
    "$FORGE" script onchain/script/LmTotalEth.s.sol \
    --rpc-url "$RPC_URL" --root onchain 2>&1)
  # forge prints "== Logs ==" then " <value>"; scan only lines after the marker
  # and take the first purely-numeric one so blanks/warnings cannot corrupt it.
  value=$(awk '/^== Logs ==/{found=1; next} found && /^[[:space:]]*[0-9]+[[:space:]]*$/{gsub(/[[:space:]]/, ""); print; exit}' <<<"$raw")
  if [[ -z "$value" || ! "$value" =~ ^[0-9]+$ ]]; then
    die "Failed to read LM total ETH (forge output: $raw)"
  fi
  echo "$value"
}
# ── Helper: extract strategy findings from stream-json and append to memory ────
# Parses the claude stream log ($1), detects "Strategy N: ..." sections in the
# agent's text, and appends one JSONL record per strategy (result, delta, pattern,
# insight) to $MEMORY_FILE. Reads globals: MEMORY_FILE, LM_ETH_BEFORE,
# CANDIDATE_NAME, OPTIMIZER_PROFILE, CROSS_PATTERNS_FILE, SWEEP_ID (env).
extract_memory() {
  local stream_file="$1"
  local run_num memory_file="$MEMORY_FILE"
  # Determine run number: use max run in file + 1 so it stays monotonic after trim
  if [[ -f "$memory_file" ]]; then
    run_num=$(python3 - "$memory_file" <<'EOF'
import json, sys
entries = [json.loads(l) for l in open(sys.argv[1]) if l.strip()]
print(max((e.get('run', 0) for e in entries), default=0) + 1)
EOF
)
    # A corrupt memory file yields non-numeric output; fall back to run 1.
    [[ "$run_num" =~ ^[0-9]+$ ]] || run_num=1
  else
    run_num=1
  fi
  python3 - "$stream_file" "$memory_file" "$run_num" "$LM_ETH_BEFORE" "$CANDIDATE_NAME" "$OPTIMIZER_PROFILE" "$CROSS_PATTERNS_FILE" <<'PYEOF'
import json, os, sys, re
from datetime import datetime, timezone
stream_file = sys.argv[1]
memory_file = sys.argv[2]
run_num = int(sys.argv[3])
try:
    lm_eth_before = int(sys.argv[4])
except (ValueError, IndexError):
    # Without a numeric baseline the delta math is meaningless — skip quietly.
    print(" extract_memory: invalid lm_eth_before value, skipping", file=sys.stderr)
    sys.exit(0)
candidate = sys.argv[5] if len(sys.argv) > 5 else "unknown"
optimizer_profile = sys.argv[6] if len(sys.argv) > 6 else "unknown"
cross_file = sys.argv[7] if len(sys.argv) > 7 else None
sweep_id = os.environ.get("SWEEP_ID", "unknown")

def make_pattern(strategy_name, steps_text):
    """Extract abstract op sequence preserving execution order."""
    text = (strategy_name + " " + steps_text).lower()
    # Collect (first-occurrence-position, op-label) pairs, then sort by position.
    op_positions = []
    for kw, label in [("wrap", "wrap"), ("buy", "buy"), ("sell", "sell")]:
        m = re.search(r'\b' + kw + r'\b', text)
        if m:
            op_positions.append((m.start(), label))
    # Use word boundaries so 'stake' never matches inside 'unstake'
    m_stake = re.search(r'\bstake\b', text)
    if m_stake:
        # Peek at nearby text to distinguish "stake all" from a partial stake.
        ctx = text[max(0, m_stake.start() - 10):m_stake.start() + 20]
        op_positions.append((m_stake.start(), "stake_all" if "all" in ctx else "stake"))
    m_unstake = re.search(r'\bunstake\b', text)
    if m_unstake:
        op_positions.append((m_unstake.start(), "unstake"))
    recenter_matches = list(re.finditer(r'\brecenter\b', text))
    if recenter_matches:
        label = "recenter" if len(recenter_matches) == 1 else "recenter_multi"
        op_positions.append((recenter_matches[0].start(), label))
    # add_lp: keyword or mint + LP context
    m = re.search(r'\badd_lp\b', text)
    if m:
        op_positions.append((m.start(), "add_lp"))
    elif re.search(r'\bmint\b', text) and ("lp" in text or "liquidity" in text):
        m = re.search(r'\bmint\b', text)
        op_positions.append((m.start(), "add_lp"))
    # remove_lp: keyword or decreaseliquidity
    for pat in [r'\bremove_lp\b', r'\bdecreaseliquidity\b']:
        m = re.search(pat, text)
        if m:
            op_positions.append((m.start(), "remove_lp"))
            break
    # Sort by first occurrence position to reflect actual execution order
    op_positions.sort(key=lambda x: x[0])
    # Deduplicate while preserving order.
    seen = set()
    ops = []
    for _, label in op_positions:
        if label not in seen:
            seen.add(label)
            ops.append(label)
    return " → ".join(ops) if ops else strategy_name[:60]

# Collect all assistant text blocks from the stream-json log.
texts = []
with open(stream_file) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            if obj.get("type") == "assistant":
                for block in obj.get("message", {}).get("content", []):
                    if block.get("type") == "text":
                        texts.append(block["text"])
        except:
            pass
# Parse strategies from agent text
strategies = []
current = None
for text in texts:
    # Detect strategy headers: matches "## Strategy 1: name" and "STRATEGY 1: name"
    strat_match = re.search(r"(?:##\s*)?[Ss][Tt][Rr][Aa][Tt][Ee][Gg][Yy]\s*\d+[^:]*:\s*(.+)", text)
    if strat_match:
        if current:
            strategies.append(current)
        current = {
            "strategy": strat_match.group(1).strip(),
            "steps": "",
            "lm_eth_after": None,
            "insight": "",
            "insight_pri": 999  # tracks priority of stored insight; lower index wins
        }
    if current:
        # Capture lm_eth_after only from the structured final-report label
        # ("lm_eth_after: <value> wei"). Mid-execution "Total LM ETH: X wei"
        # lines are deliberately excluded: they reflect intermediate chain state
        # (e.g. after staking before revert) and must not be recorded as the
        # confirmed post-strategy ETH balance.
        floor_matches = list(re.finditer(r"lm_eth_after\s*:\s*(\d+)", text, re.IGNORECASE))
        if floor_matches:
            current["lm_eth_after"] = int(floor_matches[-1].group(1))
        # Capture insights — prefer explicit labels; only overwrite if new match is higher priority
        for pri, ins_pat in enumerate([
            r"[Kk]ey [Ii]nsight:\s*(.+)",
            r"[Ii]nsight:\s*(.+)",
            r"[Ww][Hh][Yy][^:]*:\s*(.{30,})",
            r"(?:because|since|due to)\s+(.{30,})",
            r"(?:discovered|learned|realized)\s+(?:that\s+)?(.+)"
        ]):
            if pri >= current["insight_pri"]:
                break  # already have a higher-priority insight stored
            insight_match = re.search(ins_pat, text)
            if insight_match and len(insight_match.group(1)) > 20:
                current["insight"] = insight_match.group(1).strip()[:300]
                current["insight_pri"] = pri
                break
        # Capture step summaries (short texts mentioning an op keyword)
        if any(word in text.lower() for word in ["wrap", "buy", "sell", "stake", "recenter", "mint", "approve"]):
            if len(text) < 200:
                current["steps"] += text.strip() + "; "
if current:
    strategies.append(current)
# Write to memory file
ts = datetime.now(timezone.utc).isoformat()
with open(memory_file, "a") as f:
    for s in strategies:
        # Missing lm_eth_after is treated as "unchanged" (= HELD).
        fa = s["lm_eth_after"] if s.get("lm_eth_after") is not None else lm_eth_before
        delta_bps = round((fa - lm_eth_before) * 10000 / lm_eth_before) if lm_eth_before else 0
        if fa < lm_eth_before:
            result = "DECREASED"
        elif fa > lm_eth_before:
            result = "INCREASED"
        else:
            result = "HELD"
        pattern = make_pattern(s["strategy"], s["steps"])
        entry = {
            "run": run_num,
            "ts": ts,
            "candidate": candidate,
            "optimizer_profile": optimizer_profile,
            "strategy": s["strategy"][:100],
            "pattern": pattern[:150],
            "steps": s["steps"][:300].rstrip("; "),
            "lm_eth_before": lm_eth_before,
            "lm_eth_after": fa,
            "delta_bps": delta_bps,
            "result": result,
            "insight": s["insight"][:300]
        }
        f.write(json.dumps(entry) + "\n")
        print(f" Recorded: {entry['strategy']} [{entry['candidate']}] → {result} ({delta_bps:+d} bps)")
if not strategies:
    print(" No strategies detected in stream output")
# Trim memory file: keep 10 most recent + all DECREASED entries (cap at 50)
with open(memory_file) as f:
    all_entries = [json.loads(l) for l in f if l.strip()]
if len(all_entries) > 50:
    # Keep all DECREASED entries + 10 most recent; deduplicate preserving order
    trimmed = [e for e in all_entries if e.get("result") == "DECREASED"] + all_entries[-10:]
    seen = set()
    deduped = []
    for e in trimmed:
        # 3-tuple key: run+ts uniquely identifies the extract_memory call; strategy
        # distinguishes entries within the same call. Matches step-4c's identity check.
        key = (e.get("run"), e.get("ts"), e.get("strategy"))
        if key not in seen:
            seen.add(key)
            deduped.append(e)
    # Export entries that would be dropped to cross-patterns before discarding them
    if cross_file:
        kept_keys = {(e.get("run"), e.get("ts"), e.get("strategy")) for e in deduped}
        dropped = [e for e in all_entries if (e.get("run"), e.get("ts"), e.get("strategy")) not in kept_keys]
        if dropped:
            # Load existing cross-pattern keys so we never write duplicates.
            existing_cross_keys = set()
            try:
                with open(cross_file) as cf:
                    for line in cf:
                        line = line.strip()
                        if line:
                            try:
                                ce = json.loads(line)
                                existing_cross_keys.add((ce.get("pattern", ""), ce.get("candidate", ""), ce.get("result", "")))
                            except Exception:
                                pass
            except FileNotFoundError:
                pass
            try:
                exported = 0
                with open(cross_file, "a") as cf:
                    for e in dropped:
                        key = (e.get("pattern", ""), e.get("candidate", ""), e.get("result", ""))
                        if key not in existing_cross_keys:
                            existing_cross_keys.add(key)
                            e.setdefault("sweep_id", sweep_id)
                            cf.write(json.dumps(e) + "\n")
                            exported += 1
                if exported:
                    print(f" Pre-trim export: {exported} dropped entr{'y' if exported == 1 else 'ies'} saved to cross-patterns")
            except Exception as ex:
                # Best-effort: losing the export must not abort the memory trim.
                print(f" WARNING: pre-trim export failed: {ex}", file=sys.stderr)
    with open(memory_file, "w") as f:
        for e in deduped:
            f.write(json.dumps(e) + "\n")
    print(f" Trimmed memory to {len(deduped)} entries")
PYEOF
}
# ── 5. Read lm_eth_before ───────────────────────────────────────────────────────
log "Reading floor before agent run..."
LM_ETH_BEFORE=$(compute_lm_total_eth)
log " lm_eth_before = $LM_ETH_BEFORE wei"
# ── 5a. Run attack catalogue (structured suite) ──────────────────────────────
# Loop through every existing .jsonl attack file in the attacks directory,
# replay each through AttackRunner.s.sol, record LM total ETH before/after,
# and revert to the baseline snapshot between files so attacks are independent.
ATTACK_DIR="${ATTACK_DIR:-$REPO_ROOT/onchain/script/backtesting/attacks}"
ATTACK_SUITE_RESULTS=""
ATTACK_SUITE_COUNT=0
if [[ -d "$ATTACK_DIR" ]]; then
  mapfile -t ATTACK_FILES < <(find "$ATTACK_DIR" -maxdepth 1 -name '*.jsonl' -type f | sort)
  if [[ ${#ATTACK_FILES[@]} -gt 0 ]]; then
    log "Running attack catalogue (${#ATTACK_FILES[@]} files in $ATTACK_DIR)..."
    ATTACK_SUITE_RESULTS="## Attack Catalogue Results (pre-run structured suite)
These attacks were replayed from the known catalogue before your session.
Do NOT repeat these strategies. Focus on novel approaches instead.
"
    for attack_file in "${ATTACK_FILES[@]}"; do
      attack_name=$(basename "$attack_file" .jsonl)
      log " Running attack: $attack_name ..."
      # Record LM ETH before this attack
      suite_eth_before=$(compute_lm_total_eth)
      # Run AttackRunner; set +e so a failed replay is recorded, not fatal.
      # suite_output is captured for post-mortem debugging only.
      set +e
      suite_output=$(cd "$REPO_ROOT/onchain" && \
        ATTACK_FILE="$attack_file" \
        DEPLOYMENTS_FILE="deployments-local.json" \
        SWAP_ROUTER="$SWAP_ROUTER" \
        NPM_ADDR="$NPM" \
        "$FORGE" script script/backtesting/AttackRunner.s.sol \
        --rpc-url "$RPC_URL" --broadcast 2>&1)
      suite_exit=$?
      set -e
      # Record LM ETH after this attack
      if [[ $suite_exit -eq 0 ]]; then
        suite_eth_after=$(compute_lm_total_eth)
        # Wei totals exceed bash's 64-bit arithmetic, so do the math in Python.
        # Values are passed via argv — never interpolated into the code string —
        # so unexpected compute_lm_total_eth output cannot break or inject code.
        suite_delta_bps=$(python3 -c \
          'import sys; b, a = int(sys.argv[1]), int(sys.argv[2]); print(round((a - b) * 10000 / b) if b else 0)' \
          "$suite_eth_before" "$suite_eth_after")
        if python3 -c 'import sys; sys.exit(0 if int(sys.argv[1]) < int(sys.argv[2]) else 1)' \
          "$suite_eth_after" "$suite_eth_before"; then
          suite_verdict="FLOOR_BROKEN"
        else
          suite_verdict="FLOOR_HELD"
        fi
        log " $attack_name: $suite_verdict (${suite_delta_bps} bps)"
        ATTACK_SUITE_RESULTS+="- **$attack_name**: $suite_verdict (delta: ${suite_delta_bps} bps, before: $suite_eth_before, after: $suite_eth_after)
"
      else
        log " $attack_name: REPLAY_ERROR (exit $suite_exit)"
        ATTACK_SUITE_RESULTS+="- **$attack_name**: REPLAY_ERROR (forge exit $suite_exit)
"
      fi
      ATTACK_SUITE_COUNT=$((ATTACK_SUITE_COUNT + 1))
      # Revert to baseline snapshot so next attack starts from clean state
      "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
      # Re-take snapshot (anvil_revert is one-shot)
      SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
    done
    log "Attack catalogue complete: $ATTACK_SUITE_COUNT files processed"
  else
    log "No .jsonl files found in $ATTACK_DIR — skipping attack catalogue"
  fi
else
  log "Attack directory not found ($ATTACK_DIR) — skipping attack catalogue"
fi
# ── 6. Build agent prompt ──────────────────────────────────────────────────────
# ── 6a. Read Solidity source files (reflect the current candidate after inject) ─
# The full contract sources are embedded in the prompt so the agent reasons
# about the exact code under test. $(< file) reads without spawning cat.
ONCHAIN_SRC="$REPO_ROOT/onchain/src"
SOL_LM=$(< "$ONCHAIN_SRC/LiquidityManager.sol")
SOL_THREE_POS=$(< "$ONCHAIN_SRC/abstracts/ThreePositionStrategy.sol")
SOL_OPTIMIZER=$(< "$ONCHAIN_SRC/Optimizer.sol")
SOL_OPTIMIZERV3=$(< "$ONCHAIN_SRC/OptimizerV3.sol")
SOL_VWAP=$(< "$ONCHAIN_SRC/VWAPTracker.sol")
SOL_PRICE_ORACLE=$(< "$ONCHAIN_SRC/abstracts/PriceOracle.sol")
SOL_KRAIKEN=$(< "$ONCHAIN_SRC/Kraiken.sol")
SOL_STAKE=$(< "$ONCHAIN_SRC/Stake.sol")
# Build Previous Findings section from memory file
# Renders $MEMORY_FILE (JSONL) as markdown for the prompt: universal patterns
# first, then per-candidate run history. Empty/missing file → empty section.
MEMORY_SECTION=""
if [[ -f "$MEMORY_FILE" && -s "$MEMORY_FILE" ]]; then
  MEMORY_SECTION=$(python3 - "$MEMORY_FILE" <<'PYEOF'
import json, sys
from collections import defaultdict
entries = []
with open(sys.argv[1]) as f:
    for line in f:
        line = line.strip()
        if line:
            entries.append(json.loads(line))
if not entries:
    sys.exit(0)
print('## Previous Findings (from earlier runs)')
print()
print('DO NOT repeat strategies marked HELD or INCREASED. Build on the insights.')
print('Distinguish optimizer-specific vulnerabilities from universal patterns.')
print('Try NEW combinations not yet attempted. Combine tools creatively.')
print()
# Cross-candidate: patterns that DECREASED in multiple distinct candidates
decreased = [e for e in entries if e.get('result') == 'DECREASED']
cross = defaultdict(set)
for e in decreased:
    key = e.get('pattern') or e.get('strategy', '')
    cross[key].add(e.get('candidate', 'unknown'))
universal = [(p, cands) for p, cands in cross.items() if len(cands) > 1]
if universal:
    print('### Universal Patterns (succeeded across multiple candidates)')
    for pat, cands in universal:
        print(f"- **{pat}** — worked on: {', '.join(sorted(cands))}")
    print()
# Group remaining entries by candidate
by_candidate = defaultdict(list)
for e in entries:
    by_candidate[e.get('candidate', 'unknown')].append(e)
for cand, cand_entries in sorted(by_candidate.items()):
    # First non-placeholder optimizer profile seen for this candidate.
    prof = next((e.get('optimizer_profile', '') for e in cand_entries
                 if e.get('optimizer_profile', '') not in ('', 'unknown')), '')
    print(f"### Candidate: {cand}")
    if prof:
        print(f"Profile: {prof}")
    print()
    for e in cand_entries:
        r = e.get('result', '?')
        emoji = '❌' if r == 'DECREASED' else '⬆️' if r == 'INCREASED' else '➡️'
        pat = e.get('pattern', '')
        print(f"#### Run {e.get('run','?')}: {e.get('strategy','?')} {emoji} {r}")
        if pat:
            print(f"Pattern: `{pat}`")
        print(f"Steps: {e.get('steps','?')}")
        print(f"Delta: {e.get('delta_bps',0)} bps")
        if e.get('insight'):
            print(f"**Insight:** {e['insight']}")
        print()
PYEOF
)
fi
# Build Cross-Candidate Intelligence section from the cross-patterns file
# Renders $CROSS_PATTERNS_FILE (JSONL) as markdown, excluding the current
# candidate's own entries, grouped by abstract pattern: universal breaks,
# single-candidate breaks, and patterns that held everywhere.
CROSS_CANDIDATE_SECTION=""
if [[ -f "$CROSS_PATTERNS_FILE" && -s "$CROSS_PATTERNS_FILE" ]]; then
  CROSS_CANDIDATE_SECTION=$(python3 - "$CROSS_PATTERNS_FILE" "$CANDIDATE_NAME" <<'PYEOF'
import json, sys
from collections import defaultdict
cross_file = sys.argv[1]
current_candidate = sys.argv[2] if len(sys.argv) > 2 else ""
entries = []
with open(cross_file) as f:
    for line in f:
        line = line.strip()
        if line:
            try:
                entries.append(json.loads(line))
            except Exception:
                pass
if not entries:
    sys.exit(0)
# Exclude entries from the current candidate (they are cross-candidate evidence, not self-evidence)
entries = [e for e in entries if e.get("candidate", "unknown") != current_candidate]
# Group by abstract pattern; track worked/failed per candidate
by_pattern = defaultdict(lambda: {"worked": {}, "failed": {}, "insight": ""})
for e in entries:
    pat = e.get("pattern", "") or e.get("strategy", "")[:80]
    if not pat:
        continue  # skip entries with no identifiable pattern
    cand = e.get("candidate", "unknown")
    prof = e.get("optimizer_profile", "unknown")
    result = e.get("result", "HELD")
    insight = e.get("insight", "")
    if result == "DECREASED":
        by_pattern[pat]["worked"][cand] = prof
    else:
        by_pattern[pat]["failed"][cand] = prof
    # Keep the first insight seen for a pattern.
    if insight and not by_pattern[pat]["insight"]:
        by_pattern[pat]["insight"] = insight
universal = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) > 1]
candidate_specific = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) == 1]
failed_all = [(p, d) for p, d in by_pattern.items() if not d["worked"] and d["failed"]]
print("## Cross-Candidate Intelligence")
print()
print("Attack patterns learned across all previously tested candidates.")
print("Exploit successes. Avoid repeating patterns that universally failed.")
print()
def fmt_cand(cand, prof):
    return f"{cand} ({prof})" if prof and prof not in ("", "unknown") else cand
if universal:
    print("### Universal Patterns (succeeded on 2+ candidates)")
    # Most broadly successful patterns first.
    for pat, d in sorted(universal, key=lambda x: -len(x[1]["worked"])):
        worked_str = ", ".join(fmt_cand(c, p) for c, p in sorted(d["worked"].items()))
        print(f"- `{pat}` — **BROKE** on: {worked_str}")
        if d["failed"]:
            failed_str = ", ".join(d["failed"])
            print(f" Held on: {failed_str}")
        if d["insight"]:
            print(f" Insight: {d['insight']}")
    print()
if candidate_specific:
    print("### Candidate-Specific Patterns (broke exactly one candidate)")
    for pat, d in candidate_specific:
        worked_cand, worked_prof = next(iter(d["worked"].items()))
        print(f"- `{pat}` — **BROKE** on: {fmt_cand(worked_cand, worked_prof)}")
        if d["failed"]:
            print(f" Held on: {', '.join(d['failed'])}")
        if d["insight"]:
            print(f" Insight: {d['insight']}")
    print()
if failed_all:
    print("### Patterns That Held Across All Candidates Tried")
    for pat, d in failed_all:
        print(f"- `{pat}` — held on: {', '.join(d['failed'])}")
    print()
PYEOF
)
fi
# ── 6b. Assemble the prompt: substitute every {{PLACEHOLDER}} in the template ──
# Placeholder names match the shell variable names exactly, so a single ordered
# loop with ${!key} indirection replaces the 27 individual substitutions.
# Order matters and mirrors the original sequence: scalar addresses/keys are
# substituted before the large SOL_* sources and generated sections.
PROMPT=$(cat "$SCRIPT_DIR/red-team-program.md")
for key in \
  LM_ETH_BEFORE CANDIDATE_NAME OPTIMIZER_PROFILE \
  KRK STAKE LM OPT POOL NPM WETH SWAP_ROUTER \
  ADV_ADDR ADV_PK RECENTER_ADDR RECENTER_PK POOL_FEE \
  SOL_LM SOL_THREE_POS SOL_OPTIMIZER SOL_OPTIMIZERV3 \
  SOL_VWAP SOL_PRICE_ORACLE SOL_KRAIKEN SOL_STAKE \
  ATTACK_SUITE_RESULTS CROSS_CANDIDATE_SECTION MEMORY_SECTION; do
  PROMPT=${PROMPT//"{{$key}}"/${!key}}
done
# ── 7. Create output directory and run the agent ───────────────────────────────
mkdir -p "$REPORT_DIR"
mkdir -p "$(dirname "$MEMORY_FILE")"
mkdir -p "$(dirname "$CROSS_PATTERNS_FILE")"
log "Spawning Claude red-team agent (timeout: ${CLAUDE_TIMEOUT}s)..."
log " Report will be written to: $REPORT"
set +e
# Write prompt to temp file to avoid "Argument list too long" (prompt can be 50KB+)
PROMPT_FILE=$(mktemp /tmp/red-team-prompt-XXXXXX.md)
printf '%s' "$PROMPT" > "$PROMPT_FILE"
# Note: --verbose is required by the claude CLI when --output-format stream-json is used;
# omitting it causes the CLI to exit with an error, producing an empty stream log.
# Run synchronously — timeout handles kill, no need to background
# The inner bash -c receives the prompt/log paths as "$1"/"$2" (after the "_"
# placeholder for $0); AGENT_EXIT is 124 if the timeout fired.
timeout "$CLAUDE_TIMEOUT" bash -c 'claude -p --dangerously-skip-permissions \
  --verbose --output-format stream-json \
  <"$1" >"$2" 2>&1' _ "$PROMPT_FILE" "$STREAM_LOG"
AGENT_EXIT=$?
CLAUDE_PID=""
set -e
if [[ $AGENT_EXIT -ne 0 ]]; then
  log "WARNING: claude exited with code $AGENT_EXIT — see $STREAM_LOG for details"
fi
# Extract readable text from stream-json for the report
# (concatenates every assistant text block, skipping unparseable lines).
python3 - "$STREAM_LOG" >"$REPORT" <<'PYEOF'
import json, sys
with open(sys.argv[1]) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            if obj.get("type") == "assistant":
                for block in obj.get("message", {}).get("content", []):
                    if block.get("type") == "text":
                        print(block["text"], end="")
        except:
            pass
PYEOF
# If the agent crashed and produced no readable output, treat as an infra error
# rather than silently reporting ETH SAFE (a false pass).
if [[ $AGENT_EXIT -ne 0 && ! -s "$REPORT" ]]; then
  die "claude agent failed (exit $AGENT_EXIT) with no readable output — see $STREAM_LOG"
fi
# ── 8. Read lm_eth_after ────────────────────────────────────────────────────────
log "Reading floor after agent run..."
LM_ETH_AFTER=$(compute_lm_total_eth)
# ── 8a. Extract and persist strategy findings ──────────────────────────────────
log "Extracting strategy findings from agent output..."
extract_memory "$STREAM_LOG"
log " lm_eth_after = $LM_ETH_AFTER wei"
# ── 8b. Export attack sequence and replay with AttackRunner ────────────────────
# Converts the agent's cast send commands to structured JSONL and replays them
# via AttackRunner.s.sol to capture full state snapshots for optimizer training.
log "Exporting attack sequence from stream log..."
set +e
# Pipe exporter output through log for prefixing; PIPESTATUS[0] keeps the
# exporter's own exit code (the while loop runs in a subshell).
python3 "$REPO_ROOT/scripts/harb-evaluator/export-attacks.py" \
  "$STREAM_LOG" "$ATTACK_EXPORT" 2>&1 | while IFS= read -r line; do log " $line"; done
EXPORT_EXIT=${PIPESTATUS[0]}
set -e
if [[ $EXPORT_EXIT -eq 0 && -f "$ATTACK_EXPORT" && -s "$ATTACK_EXPORT" ]]; then
  log " Attack export: $ATTACK_EXPORT"
  log " Replaying attack sequence with AttackRunner for state snapshots..."
  set +e
  # AttackRunner emits snapshot JSON objects; keep only lines starting with '{'.
  (cd "$REPO_ROOT/onchain" && \
    ATTACK_FILE="$ATTACK_EXPORT" \
    DEPLOYMENTS_FILE="deployments-local.json" \
    SWAP_ROUTER="$SWAP_ROUTER" \
    NPM_ADDR="$NPM" \
    "$FORGE" script script/backtesting/AttackRunner.s.sol \
    --rpc-url "$RPC_URL" --broadcast 2>&1 \
    | grep '^{' >"$ATTACK_SNAPSHOTS")
  REPLAY_EXIT=$?
  set -e
  if [[ $REPLAY_EXIT -eq 0 && -s "$ATTACK_SNAPSHOTS" ]]; then
    SNAPSHOT_COUNT=$(wc -l <"$ATTACK_SNAPSHOTS")
    log " AttackRunner replay complete: $SNAPSHOT_COUNT snapshots → $ATTACK_SNAPSHOTS"
  else
    log " WARNING: AttackRunner replay produced no snapshots (exit $REPLAY_EXIT) — non-fatal"
  fi
  # Revert to the clean baseline after replay so the floor check below is unaffected.
  "$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
  # Re-take the snapshot so cleanup trap still has a valid ID to revert.
  SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
else
  log " WARNING: No attack operations exported from stream — skipping AttackRunner replay"
fi
# ── 9. Summarise results ───────────────────────────────────────────────────────
log ""
log "=== RED-TEAM SUMMARY ==="
log ""
log " lm_eth_before : $LM_ETH_BEFORE wei"
log " lm_eth_after : $LM_ETH_AFTER wei"
log ""
# BROKE=true iff LM's total ETH decreased over the session. Wei totals exceed
# bash's 64-bit arithmetic, so compare in Python; values are passed via argv
# (not interpolated into the code string) so malformed values cannot break or
# inject into the generated Python.
BROKE=false
if python3 -c 'import sys; sys.exit(0 if int(sys.argv[1]) < int(sys.argv[2]) else 1)' \
  "${LM_ETH_AFTER:-0}" "${LM_ETH_BEFORE:-0}"; then
  BROKE=true
fi
# ── 9a-pre. Write structured evidence JSON ──────────────────────────────────
# One evidence file per UTC day summarising the run verdict plus this
# candidate's memory entries, for downstream tooling.
EVIDENCE_DIR="$REPO_ROOT/evidence/red-team"
EVIDENCE_DATE=$(date -u +%Y-%m-%d)
EVIDENCE_FILE="$EVIDENCE_DIR/$EVIDENCE_DATE.json"
mkdir -p "$EVIDENCE_DIR"
if [[ "$BROKE" == "true" ]]; then
  _verdict="floor_broken"
  _floor_held="false"
  # Python handles the >64-bit wei subtraction.
  _eth_extracted=$(python3 -c "print(int('${LM_ETH_BEFORE:-0}') - int('${LM_ETH_AFTER:-0}'))")
else
  _verdict="floor_held"
  _floor_held="true"
  _eth_extracted=0
fi
# NOTE(review): memory path is hard-coded here instead of using $MEMORY_FILE —
# presumably intentional, but verify they stay in sync.
python3 - "$EVIDENCE_FILE" "$REPO_ROOT/tmp/red-team-memory.jsonl" \
  "$EVIDENCE_DATE" "$CANDIDATE_NAME" "$CANDIDATE_COMMIT" "$OPTIMIZER_PROFILE" \
  "$LM_ETH_BEFORE" "$LM_ETH_AFTER" "$_eth_extracted" "$_floor_held" "$_verdict" \
  "$ATTACK_SUITE_COUNT" <<'PYEOF'
import json, sys, os
evidence_file = sys.argv[1]
memory_file = sys.argv[2]
date = sys.argv[3]
candidate = sys.argv[4]
candidate_commit = sys.argv[5]
optimizer_profile = sys.argv[6]
# isdigit() guards: fall back to 0 on any non-numeric input.
lm_eth_before = int(sys.argv[7]) if sys.argv[7].isdigit() else 0
lm_eth_after = int(sys.argv[8]) if sys.argv[8].isdigit() else 0
eth_extracted = int(sys.argv[9]) if sys.argv[9].isdigit() else 0
floor_held = sys.argv[10].lower() == "true"
verdict = sys.argv[11]
attack_suite_count = int(sys.argv[12]) if len(sys.argv) > 12 and sys.argv[12].isdigit() else 0
# Build attacks list from memory entries for this candidate
attacks = []
if os.path.isfile(memory_file) and os.path.getsize(memory_file) > 0:
    with open(memory_file) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                e = json.loads(line)
                if e.get("candidate") != candidate:
                    continue
                attacks.append({
                    "strategy": e.get("strategy", ""),
                    "pattern": e.get("pattern", ""),
                    "result": e.get("result", "HELD"),
                    "delta_bps": e.get("delta_bps", 0),
                    "insight": e.get("insight", ""),
                })
            except Exception:
                pass
evidence = {
    "date": date,
    "candidate": candidate,
    "candidate_commit": candidate_commit,
    "optimizer_profile": optimizer_profile,
    "lm_eth_before": lm_eth_before,
    "lm_eth_after": lm_eth_after,
    "eth_extracted": eth_extracted,
    "floor_held": floor_held,
    "verdict": verdict,
    "attacks": attacks,
    "attack_suite_count": attack_suite_count,
}
with open(evidence_file, "w") as f:
    json.dump(evidence, f, indent=2)
    f.write("\n")
print(f" Evidence written to {evidence_file}")
PYEOF
log "Evidence file: $EVIDENCE_FILE"
# Final verdict: exit 1 if the floor broke, 0 if it held (see header contract).
if [[ "$BROKE" == "true" ]]; then
  # Wei deltas exceed bash's 64-bit arithmetic, so subtract in Python; values
  # are passed via argv rather than interpolated into the code string
  # (consistent with steps 5a and 9).
  DELTA=$(python3 -c 'import sys; print(int(sys.argv[1]) - int(sys.argv[2]))' \
    "${LM_ETH_BEFORE:-0}" "${LM_ETH_AFTER:-0}")
  log " RESULT: ETH EXTRACTED ❌"
  log " Decrease: $DELTA wei"
  log ""
  log " See $REPORT for the winning strategy."
  log ""
  # Append a machine-readable summary to the report
  cat >>"$REPORT" <<SUMMARY_EOF
=== RUNNER SUMMARY ===
lm_eth_before : $LM_ETH_BEFORE
lm_eth_after : $LM_ETH_AFTER
delta : -$DELTA
verdict : ETH_EXTRACTED
SUMMARY_EOF
  # ── 9a. Promote attack vector to git via Codeberg PR (non-fatal) ──────────
  if [[ -f "$ATTACK_EXPORT" && -s "$ATTACK_EXPORT" ]]; then
    log "Promoting attack vector to git via PR ..."
    set +e
    # PIPESTATUS[0] keeps promote-attacks.sh's exit code (the log loop is a subshell).
    bash "$SCRIPT_DIR/promote-attacks.sh" \
      --attacks "$ATTACK_EXPORT" \
      --candidate "$CANDIDATE_NAME" \
      --profile "$OPTIMIZER_PROFILE" \
      --eth-extracted "$DELTA" \
      --eth-before "$LM_ETH_BEFORE" 2>&1 | while IFS= read -r line; do log " $line"; done
    PROMOTE_EXIT="${PIPESTATUS[0]}"
    set -e
    if [[ "$PROMOTE_EXIT" -ne 0 ]]; then
      log " WARNING: promote-attacks.sh exited with code $PROMOTE_EXIT — PR was not created"
    fi
  fi
  exit 1
else
  log " RESULT: ETH SAFE ✅"
  log ""
  log " See $REPORT for strategies attempted."
  log ""
  cat >>"$REPORT" <<SUMMARY_EOF
=== RUNNER SUMMARY ===
lm_eth_before : $LM_ETH_BEFORE
lm_eth_after : $LM_ETH_AFTER
delta : 0 (or increase)
verdict : ETH_SAFE
SUMMARY_EOF
  exit 0
fi