bootstrap-light.sh now extracts the Uniswap V3 pool address from DeployLocal.sol deploy output and writes both Pool and V3Factory (Base Sepolia: 0x4752ba5DBc23f44D87826276BF6Fd6b1C372aD24) into deployments-local.json alongside the existing contract addresses. red-team.sh now reads V3_FACTORY and POOL from deployments-local.json instead of hardcoding the Base mainnet factory address (0x33128a8fC17869897dcE68Ed026d694621f6FDfD), and removes the getPool() RPC call that always failed with "contract does not have any code" on the Sepolia fork. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
717 lines
29 KiB
Bash
Executable file
717 lines
29 KiB
Bash
Executable file
#!/usr/bin/env bash
|
|
# red-team.sh — Adversarial floor-attack agent runner.
|
|
#
|
|
# Spawns a Claude sub-agent with tools and a goal: make ethPerToken() decrease.
|
|
# The agent iterates freely — snapshot → strategy → check floor → revert → repeat.
|
|
#
|
|
# Usage: red-team.sh
|
|
#
|
|
# Exit codes:
|
|
# 0 floor held (no confirmed decrease)
|
|
# 1 floor broken (agent found a strategy that decreased ethPerToken)
|
|
# 2 infra error (stack not running, missing dependency, etc.)
|
|
#
|
|
# Environment overrides:
|
|
# CLAUDE_TIMEOUT seconds for the agent run (default: 7200)
|
|
# RPC_URL Anvil RPC endpoint (default: http://localhost:8545)
|
|
|
|
set -euo pipefail
|
|
|
|
CAST=/home/debian/.foundry/bin/cast
|
|
FORGE=/home/debian/.foundry/bin/forge
|
|
RPC_URL="${RPC_URL:-http://localhost:8545}"
|
|
CLAUDE_TIMEOUT="${CLAUDE_TIMEOUT:-7200}"
|
|
REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
|
|
REPORT_DIR="$REPO_ROOT/tmp"
|
|
REPORT="$REPORT_DIR/red-team-report.txt"
|
|
STREAM_LOG="$REPORT_DIR/red-team-stream.jsonl"
|
|
MEMORY_FILE="$REPO_ROOT/tmp/red-team-memory.jsonl"
|
|
CROSS_PATTERNS_FILE="/tmp/red-team-cross-patterns.jsonl"
|
|
ATTACK_EXPORT="$REPORT_DIR/red-team-attacks.jsonl"
|
|
ATTACK_SNAPSHOTS="$REPORT_DIR/red-team-snapshots.jsonl"
|
|
DEPLOYMENTS="$REPO_ROOT/onchain/deployments-local.json"
|
|
|
|
# ── Candidate metadata (set by red-team-sweep.sh; defaults to unknown for standalone runs) ─
|
|
CANDIDATE_NAME="${CANDIDATE_NAME:-unknown}"
|
|
OPTIMIZER_PROFILE="${OPTIMIZER_PROFILE:-unknown}"
|
|
|
|
# ── Anvil accounts ─────────────────────────────────────────────────────────────
|
|
# Account 8 — adversary (10k ETH, 0 KRK)
|
|
ADV_PK=0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97
|
|
# Account 2 — recenter caller (recenter is public, any account can call)
|
|
RECENTER_PK=0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a
|
|
|
|
# ── Infrastructure constants ───────────────────────────────────────────────────
|
|
WETH=0x4200000000000000000000000000000000000006
|
|
# Base mainnet SwapRouter02 — https://basescan.org/address/0x2626664c2603336E57B271c5C0b26F421741e481
|
|
SWAP_ROUTER=0x2626664c2603336E57B271c5C0b26F421741e481
|
|
# Base mainnet NonfungiblePositionManager — https://basescan.org/address/0x03a520B32c04bf3beef7BEb72E919cF822Ed34F3
|
|
NPM=0x03a520B32c04bf3beef7BEb72E919cF822Ed34F3
|
|
POOL_FEE=10000
|
|
|
|
# ── Logging helpers ────────────────────────────────────────────────────────────
|
|
log() { echo "[red-team] $*"; }
|
|
die() { echo "[red-team] ERROR: $*" >&2; exit 2; }
|
|
|
|
# ── Prerequisites ──────────────────────────────────────────────────────────────
|
|
command -v "$CAST" &>/dev/null || die "cast not found at $CAST"
|
|
command -v "$FORGE" &>/dev/null || die "forge not found at $FORGE"
|
|
command -v claude &>/dev/null || die "claude CLI not found (install: npm i -g @anthropic-ai/claude-code)"
|
|
command -v python3 &>/dev/null || die "python3 not found"
|
|
command -v jq &>/dev/null || die "jq not found"
|
|
|
|
# ── 1. Fresh stack via bootstrap-light ─────────────────────────────────────────
|
|
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
|
log "Running bootstrap-light ..."
|
|
bash "$SCRIPT_DIR/bootstrap-light.sh" || die "bootstrap-light failed"
|
|
|
|
# Verify Anvil responds
|
|
"$CAST" chain-id --rpc-url "$RPC_URL" >/dev/null 2>&1 \
|
|
|| die "Anvil not accessible at $RPC_URL after bootstrap-light"
|
|
|
|
# ── 2. Read contract addresses ─────────────────────────────────────────────────
|
|
[[ -f "$DEPLOYMENTS" ]] || die "deployments-local.json not found at $DEPLOYMENTS (bootstrap not complete)"
|
|
|
|
KRK=$(jq -r '.contracts.Kraiken' "$DEPLOYMENTS")
|
|
STAKE=$(jq -r '.contracts.Stake' "$DEPLOYMENTS")
|
|
LM=$(jq -r '.contracts.LiquidityManager' "$DEPLOYMENTS")
|
|
OPT=$(jq -r '.contracts.OptimizerProxy' "$DEPLOYMENTS")
|
|
V3_FACTORY=$(jq -r '.contracts.V3Factory' "$DEPLOYMENTS")
|
|
POOL=$(jq -r '.contracts.Pool' "$DEPLOYMENTS")
|
|
|
|
for var in KRK STAKE LM OPT V3_FACTORY POOL; do
|
|
val="${!var}"
|
|
[[ -n "$val" && "$val" != "null" ]] \
|
|
|| die "$var address missing from deployments-local.json — was bootstrap successful?"
|
|
done
|
|
|
|
log " KRK: $KRK"
|
|
log " STAKE: $STAKE"
|
|
log " LM: $LM"
|
|
log " OPT: $OPT"
|
|
log " V3_FACTORY: $V3_FACTORY"
|
|
log " Pool: $POOL"
|
|
|
|
# Derive Anvil account addresses from their private keys
|
|
ADV_ADDR=$("$CAST" wallet address --private-key "$ADV_PK")
|
|
RECENTER_ADDR=$("$CAST" wallet address --private-key "$RECENTER_PK")
|
|
log " Adversary: $ADV_ADDR (account 8)"
|
|
log " Recenter: $RECENTER_ADDR (account 2)"
|
|
|
|
# ── 3a. recenter() is now public (no recenterAccess needed) ──
|
|
# Any address can call recenter() — TWAP oracle enforces safety.
|
|
log "recenter() is public — no access grant needed"
|
|
|
|
# ── 3b. Set feeDestination to LM itself (fees accrue as liquidity) ─────────────
|
|
# setFeeDestination allows repeated EOA sets; setting to a contract locks it permanently.
|
|
# The deployer (Anvil account 0) deployed LiquidityManager and may call setFeeDestination again.
|
|
# DEPLOYER_PK is Anvil's deterministic account-0 key — valid ONLY against a local ephemeral
|
|
# Anvil instance. Never run this script against a non-ephemeral or shared-state chain.
|
|
DEPLOYER_PK=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
|
|
log "Setting feeDestination to LM ($LM) ..."
|
|
"$CAST" send --rpc-url "$RPC_URL" --private-key "$DEPLOYER_PK" \
|
|
"$LM" "setFeeDestination(address)" "$LM" >/dev/null 2>&1 \
|
|
|| die "setFeeDestination($LM) failed"
|
|
VERIFY=$("$CAST" call "$LM" "feeDestination()(address)" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
|
|
log " feeDestination set to: $VERIFY"
|
|
[[ "${VERIFY,,}" == "${LM,,}" ]] || die "feeDestination verification failed: expected $LM, got $VERIFY"
|
|
|
|
# ── 3c. Fund LM with 1000 ETH and deploy into positions via recenter ───────────
|
|
# Send ETH as WETH (LM uses WETH internally), then recenter to deploy into positions.
|
|
# Without recenter, the ETH sits idle and the first recenter mints massive KRK.
|
|
log "Funding LM with 1000 ETH ..."
|
|
# Wrap to WETH and transfer to LM
|
|
"$CAST" send "$WETH" "deposit()" --value 1000ether \
|
|
--private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
|
|
|| die "Failed to wrap ETH"
|
|
"$CAST" send "$WETH" "transfer(address,uint256)" "$LM" 1000000000000000000000 \
|
|
--private-key "$ADV_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
|
|
|| die "Failed to transfer WETH to LM"
|
|
|
|
# Recenter to deploy the new WETH into positions (establishes realistic baseline)
|
|
log "Recentering to deploy funded WETH into positions ..."
|
|
"$CAST" send "$LM" "recenter()" \
|
|
--private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 \
|
|
|| log " WARNING: initial recenter failed (may need amplitude — mining blocks)"
|
|
# Advance time and mine blocks, then retry recenter
|
|
for _i in $(seq 1 3); do
|
|
"$CAST" rpc evm_increaseTime 600 --rpc-url "$RPC_URL" >/dev/null 2>&1
|
|
for _b in $(seq 1 50); do
|
|
"$CAST" rpc evm_mine --rpc-url "$RPC_URL" >/dev/null 2>&1
|
|
done
|
|
"$CAST" send "$LM" "recenter()" \
|
|
--private-key "$RECENTER_PK" --rpc-url "$RPC_URL" >/dev/null 2>&1 && break
|
|
done
|
|
|
|
LM_ETH=$("$CAST" balance "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
|
|
LM_WETH=$("$CAST" call "$WETH" "balanceOf(address)(uint256)" "$LM" --rpc-url "$RPC_URL" | sed 's/\[.*//;s/[[:space:]]//g')
|
|
log " LM after recenter: ETH=$LM_ETH WETH=$LM_WETH"
|
|
|
|
# ── 4. Take Anvil snapshot (clean baseline) ─────
|
|
log "Taking Anvil snapshot..."
|
|
SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
|
|
log " Snapshot ID: $SNAP"
|
|
|
|
# Revert to the baseline snapshot on exit so subsequent runs start clean.
|
|
CLAUDE_PID=""
|
|
cleanup() {
|
|
local rc=$?
|
|
if [[ -n "${CLAUDE_PID:-}" ]]; then
|
|
kill "$CLAUDE_PID" 2>/dev/null || true
|
|
fi
|
|
if [[ -n "${SNAP:-}" ]]; then
|
|
"$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
|
|
fi
|
|
exit $rc
|
|
}
|
|
trap cleanup EXIT INT TERM
|
|
|
|
# ── Helper: compute total ETH controlled by LM ────────────────────────────────
|
|
# Total = free ETH + free WETH + ETH locked in all 3 Uni V3 positions
|
|
# This is the real metric: "can the adversary extract ETH from the protocol?"
|
|
# Uses a forge script with exact Uni V3 integer math (LiquidityAmounts + TickMath)
|
|
# instead of multiple cast calls + Python float approximation.
|
|
compute_lm_total_eth() {
|
|
local output result
|
|
output=$(cd "$REPO_ROOT" && LM="$LM" WETH="$WETH" POOL="$POOL" \
|
|
"$FORGE" script onchain/script/LmTotalEth.s.sol \
|
|
--rpc-url "$RPC_URL" --root onchain 2>&1)
|
|
# forge script prints "== Logs ==" then " <value>" — extract the number
|
|
result=$(echo "$output" | awk '/^== Logs ==/{getline; gsub(/^[[:space:]]+/,""); print; exit}')
|
|
[[ -n "$result" && "$result" =~ ^[0-9]+$ ]] || die "Failed to read LM total ETH (forge output: $output)"
|
|
echo "$result"
|
|
}
|
|
|
|
# ── Helper: extract strategy findings from stream-json and append to memory ────
|
|
extract_memory() {
|
|
local stream_file="$1"
|
|
local run_num memory_file="$MEMORY_FILE"
|
|
|
|
# Determine run number: use max run in file + 1 so it stays monotonic after trim
|
|
if [[ -f "$memory_file" ]]; then
|
|
run_num=$(python3 - "$memory_file" <<'EOF'
|
|
import json, sys
|
|
entries = [json.loads(l) for l in open(sys.argv[1]) if l.strip()]
|
|
print(max((e.get('run', 0) for e in entries), default=0) + 1)
|
|
EOF
|
|
)
|
|
[[ "$run_num" =~ ^[0-9]+$ ]] || run_num=1
|
|
else
|
|
run_num=1
|
|
fi
|
|
|
|
python3 - "$stream_file" "$memory_file" "$run_num" "$LM_ETH_BEFORE" "$CANDIDATE_NAME" "$OPTIMIZER_PROFILE" <<'PYEOF'
|
|
import json, sys, re
|
|
from datetime import datetime, timezone
|
|
|
|
stream_file = sys.argv[1]
|
|
memory_file = sys.argv[2]
|
|
run_num = int(sys.argv[3])
|
|
try:
|
|
lm_eth_before = int(sys.argv[4])
|
|
except (ValueError, IndexError):
|
|
print(" extract_memory: invalid lm_eth_before value, skipping", file=sys.stderr)
|
|
sys.exit(0)
|
|
candidate = sys.argv[5] if len(sys.argv) > 5 else "unknown"
|
|
optimizer_profile = sys.argv[6] if len(sys.argv) > 6 else "unknown"
|
|
|
|
def make_pattern(strategy_name, steps_text):
|
|
"""Extract abstract op sequence preserving execution order."""
|
|
text = (strategy_name + " " + steps_text).lower()
|
|
op_positions = []
|
|
|
|
for kw, label in [("wrap", "wrap"), ("buy", "buy"), ("sell", "sell")]:
|
|
m = re.search(r'\b' + kw + r'\b', text)
|
|
if m:
|
|
op_positions.append((m.start(), label))
|
|
|
|
# Use word boundaries so 'stake' never matches inside 'unstake'
|
|
m_stake = re.search(r'\bstake\b', text)
|
|
if m_stake:
|
|
ctx = text[max(0, m_stake.start() - 10):m_stake.start() + 20]
|
|
op_positions.append((m_stake.start(), "stake_all" if "all" in ctx else "stake"))
|
|
|
|
m_unstake = re.search(r'\bunstake\b', text)
|
|
if m_unstake:
|
|
op_positions.append((m_unstake.start(), "unstake"))
|
|
|
|
recenter_matches = list(re.finditer(r'\brecenter\b', text))
|
|
if recenter_matches:
|
|
label = "recenter" if len(recenter_matches) == 1 else "recenter_multi"
|
|
op_positions.append((recenter_matches[0].start(), label))
|
|
|
|
# add_lp: keyword or mint + LP context
|
|
m = re.search(r'\badd_lp\b', text)
|
|
if m:
|
|
op_positions.append((m.start(), "add_lp"))
|
|
elif re.search(r'\bmint\b', text) and ("lp" in text or "liquidity" in text):
|
|
m = re.search(r'\bmint\b', text)
|
|
op_positions.append((m.start(), "add_lp"))
|
|
|
|
# remove_lp: keyword or decreaseliquidity
|
|
for pat in [r'\bremove_lp\b', r'\bdecreaseliquidity\b']:
|
|
m = re.search(pat, text)
|
|
if m:
|
|
op_positions.append((m.start(), "remove_lp"))
|
|
break
|
|
|
|
# Sort by first occurrence position to reflect actual execution order
|
|
op_positions.sort(key=lambda x: x[0])
|
|
seen = set()
|
|
ops = []
|
|
for _, label in op_positions:
|
|
if label not in seen:
|
|
seen.add(label)
|
|
ops.append(label)
|
|
return " → ".join(ops) if ops else strategy_name[:60]
|
|
|
|
texts = []
|
|
with open(stream_file) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
obj = json.loads(line)
|
|
if obj.get("type") == "assistant":
|
|
for block in obj.get("message", {}).get("content", []):
|
|
if block.get("type") == "text":
|
|
texts.append(block["text"])
|
|
except:
|
|
pass
|
|
|
|
# Parse strategies from agent text
|
|
strategies = []
|
|
current = None
|
|
for text in texts:
|
|
# Detect strategy headers: matches "## Strategy 1: name" and "STRATEGY 1: name"
|
|
strat_match = re.search(r"(?:##\s*)?[Ss][Tt][Rr][Aa][Tt][Ee][Gg][Yy]\s*\d+[^:]*:\s*(.+)", text)
|
|
if strat_match:
|
|
if current:
|
|
strategies.append(current)
|
|
current = {
|
|
"strategy": strat_match.group(1).strip(),
|
|
"steps": "",
|
|
"lm_eth_after": None,
|
|
"insight": "",
|
|
"insight_pri": 999 # tracks priority of stored insight; lower index wins
|
|
}
|
|
|
|
if current:
|
|
# Capture floor readings — take the last match in the block (most recent value)
|
|
floor_matches = list(re.finditer(r"(?:floor|ethPerToken|lm.?eth)[^\d]*?(\d{4,})\s*(?:wei)?", text, re.IGNORECASE))
|
|
if floor_matches:
|
|
current["lm_eth_after"] = int(floor_matches[-1].group(1))
|
|
|
|
# Capture insights — prefer explicit labels; only overwrite if new match is higher priority
|
|
for pri, ins_pat in enumerate([
|
|
r"[Kk]ey [Ii]nsight:\s*(.+)",
|
|
r"[Ii]nsight:\s*(.+)",
|
|
r"[Ww][Hh][Yy][^:]*:\s*(.{30,})",
|
|
r"(?:because|since|due to)\s+(.{30,})",
|
|
r"(?:discovered|learned|realized)\s+(?:that\s+)?(.+)"
|
|
]):
|
|
if pri >= current["insight_pri"]:
|
|
break # already have a higher-priority insight stored
|
|
insight_match = re.search(ins_pat, text)
|
|
if insight_match and len(insight_match.group(1)) > 20:
|
|
current["insight"] = insight_match.group(1).strip()[:300]
|
|
current["insight_pri"] = pri
|
|
break
|
|
|
|
# Capture step summaries
|
|
if any(word in text.lower() for word in ["wrap", "buy", "sell", "stake", "recenter", "mint", "approve"]):
|
|
if len(text) < 200:
|
|
current["steps"] += text.strip() + "; "
|
|
|
|
if current:
|
|
strategies.append(current)
|
|
|
|
# Write to memory file
|
|
ts = datetime.now(timezone.utc).isoformat()
|
|
with open(memory_file, "a") as f:
|
|
for s in strategies:
|
|
fa = s["lm_eth_after"] if s.get("lm_eth_after") is not None else lm_eth_before
|
|
delta_bps = round((fa - lm_eth_before) * 10000 / lm_eth_before) if lm_eth_before else 0
|
|
if fa < lm_eth_before:
|
|
result = "DECREASED"
|
|
elif fa > lm_eth_before:
|
|
result = "INCREASED"
|
|
else:
|
|
result = "HELD"
|
|
|
|
pattern = make_pattern(s["strategy"], s["steps"])
|
|
entry = {
|
|
"run": run_num,
|
|
"ts": ts,
|
|
"candidate": candidate,
|
|
"optimizer_profile": optimizer_profile,
|
|
"strategy": s["strategy"][:100],
|
|
"pattern": pattern[:150],
|
|
"steps": s["steps"][:300].rstrip("; "),
|
|
"lm_eth_before": lm_eth_before,
|
|
"lm_eth_after": fa,
|
|
"delta_bps": delta_bps,
|
|
"result": result,
|
|
"insight": s["insight"][:300]
|
|
}
|
|
f.write(json.dumps(entry) + "\n")
|
|
print(f" Recorded: {entry['strategy']} [{entry['candidate']}] → {result} ({delta_bps:+d} bps)")
|
|
|
|
if not strategies:
|
|
print(" No strategies detected in stream output")
|
|
|
|
# Trim memory file: keep 10 most recent + all DECREASED entries (cap at 50)
|
|
with open(memory_file) as f:
|
|
all_entries = [json.loads(l) for l in f if l.strip()]
|
|
|
|
if len(all_entries) > 50:
|
|
# Keep all DECREASED entries + 10 most recent; deduplicate preserving order
|
|
trimmed = [e for e in all_entries if e.get("result") == "DECREASED"] + all_entries[-10:]
|
|
seen = set()
|
|
deduped = []
|
|
for e in trimmed:
|
|
key = (e.get("run"), e.get("ts"), e.get("strategy"))
|
|
if key not in seen:
|
|
seen.add(key)
|
|
deduped.append(e)
|
|
with open(memory_file, "w") as f:
|
|
for e in deduped:
|
|
f.write(json.dumps(e) + "\n")
|
|
print(f" Trimmed memory to {len(deduped)} entries")
|
|
PYEOF
|
|
}
|
|
|
|
# ── 5. Read lm_eth_before ───────────────────────────────────────────────────────
|
|
log "Reading floor before agent run..."
|
|
LM_ETH_BEFORE=$(compute_lm_total_eth)
|
|
log " lm_eth_before = $LM_ETH_BEFORE wei"
|
|
|
|
# ── 6. Build agent prompt ──────────────────────────────────────────────────────
|
|
|
|
# ── 6a. Read Solidity source files (reflect the current candidate after inject) ─
|
|
ONCHAIN_SRC="$REPO_ROOT/onchain/src"
|
|
SOL_LM=$(< "$ONCHAIN_SRC/LiquidityManager.sol")
|
|
SOL_THREE_POS=$(< "$ONCHAIN_SRC/abstracts/ThreePositionStrategy.sol")
|
|
SOL_OPTIMIZER=$(< "$ONCHAIN_SRC/Optimizer.sol")
|
|
SOL_OPTIMIZERV3=$(< "$ONCHAIN_SRC/OptimizerV3.sol")
|
|
SOL_VWAP=$(< "$ONCHAIN_SRC/VWAPTracker.sol")
|
|
SOL_PRICE_ORACLE=$(< "$ONCHAIN_SRC/abstracts/PriceOracle.sol")
|
|
SOL_KRAIKEN=$(< "$ONCHAIN_SRC/Kraiken.sol")
|
|
SOL_STAKE=$(< "$ONCHAIN_SRC/Stake.sol")
|
|
|
|
# Build Previous Findings section from memory file
|
|
MEMORY_SECTION=""
|
|
if [[ -f "$MEMORY_FILE" && -s "$MEMORY_FILE" ]]; then
|
|
MEMORY_SECTION=$(python3 - "$MEMORY_FILE" <<'PYEOF'
|
|
import json, sys
|
|
from collections import defaultdict
|
|
entries = []
|
|
with open(sys.argv[1]) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
entries.append(json.loads(line))
|
|
if not entries:
|
|
sys.exit(0)
|
|
print('## Previous Findings (from earlier runs)')
|
|
print()
|
|
print('DO NOT repeat strategies marked HELD or INCREASED. Build on the insights.')
|
|
print('Distinguish optimizer-specific vulnerabilities from universal patterns.')
|
|
print('Try NEW combinations not yet attempted. Combine tools creatively.')
|
|
print()
|
|
|
|
# Cross-candidate: patterns that DECREASED in multiple distinct candidates
|
|
decreased = [e for e in entries if e.get('result') == 'DECREASED']
|
|
cross = defaultdict(set)
|
|
for e in decreased:
|
|
key = e.get('pattern') or e.get('strategy', '')
|
|
cross[key].add(e.get('candidate', 'unknown'))
|
|
universal = [(p, cands) for p, cands in cross.items() if len(cands) > 1]
|
|
if universal:
|
|
print('### Universal Patterns (succeeded across multiple candidates)')
|
|
for pat, cands in universal:
|
|
print(f"- **{pat}** — worked on: {', '.join(sorted(cands))}")
|
|
print()
|
|
|
|
# Group remaining entries by candidate
|
|
by_candidate = defaultdict(list)
|
|
for e in entries:
|
|
by_candidate[e.get('candidate', 'unknown')].append(e)
|
|
|
|
for cand, cand_entries in sorted(by_candidate.items()):
|
|
prof = next((e.get('optimizer_profile', '') for e in cand_entries
|
|
if e.get('optimizer_profile', '') not in ('', 'unknown')), '')
|
|
print(f"### Candidate: {cand}")
|
|
if prof:
|
|
print(f"Profile: {prof}")
|
|
print()
|
|
for e in cand_entries:
|
|
r = e.get('result', '?')
|
|
emoji = '❌' if r == 'DECREASED' else '⬆️' if r == 'INCREASED' else '➡️'
|
|
pat = e.get('pattern', '')
|
|
print(f"#### Run {e.get('run','?')}: {e.get('strategy','?')} {emoji} {r}")
|
|
if pat:
|
|
print(f"Pattern: `{pat}`")
|
|
print(f"Steps: {e.get('steps','?')}")
|
|
print(f"Delta: {e.get('delta_bps',0)} bps")
|
|
if e.get('insight'):
|
|
print(f"**Insight:** {e['insight']}")
|
|
print()
|
|
PYEOF
|
|
)
|
|
fi
|
|
|
|
# Build Cross-Candidate Intelligence section from the cross-patterns file
|
|
CROSS_CANDIDATE_SECTION=""
|
|
if [[ -f "$CROSS_PATTERNS_FILE" && -s "$CROSS_PATTERNS_FILE" ]]; then
|
|
CROSS_CANDIDATE_SECTION=$(python3 - "$CROSS_PATTERNS_FILE" "$CANDIDATE_NAME" <<'PYEOF'
|
|
import json, sys
|
|
from collections import defaultdict
|
|
|
|
cross_file = sys.argv[1]
|
|
current_candidate = sys.argv[2] if len(sys.argv) > 2 else ""
|
|
|
|
entries = []
|
|
with open(cross_file) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
try:
|
|
entries.append(json.loads(line))
|
|
except Exception:
|
|
pass
|
|
|
|
if not entries:
|
|
sys.exit(0)
|
|
|
|
# Exclude entries from the current candidate (they are cross-candidate evidence, not self-evidence)
|
|
entries = [e for e in entries if e.get("candidate", "unknown") != current_candidate]
|
|
|
|
# Group by abstract pattern; track worked/failed per candidate
|
|
by_pattern = defaultdict(lambda: {"worked": {}, "failed": {}, "insight": ""})
|
|
for e in entries:
|
|
pat = e.get("pattern", "") or e.get("strategy", "")[:80]
|
|
if not pat:
|
|
continue # skip entries with no identifiable pattern
|
|
cand = e.get("candidate", "unknown")
|
|
prof = e.get("optimizer_profile", "unknown")
|
|
result = e.get("result", "HELD")
|
|
insight = e.get("insight", "")
|
|
if result == "DECREASED":
|
|
by_pattern[pat]["worked"][cand] = prof
|
|
else:
|
|
by_pattern[pat]["failed"][cand] = prof
|
|
if insight and not by_pattern[pat]["insight"]:
|
|
by_pattern[pat]["insight"] = insight
|
|
|
|
universal = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) > 1]
|
|
candidate_specific = [(p, d) for p, d in by_pattern.items() if len(d["worked"]) == 1]
|
|
failed_all = [(p, d) for p, d in by_pattern.items() if not d["worked"] and d["failed"]]
|
|
|
|
print("## Cross-Candidate Intelligence")
|
|
print()
|
|
print("Attack patterns learned across all previously tested candidates.")
|
|
print("Exploit successes. Avoid repeating patterns that universally failed.")
|
|
print()
|
|
|
|
def fmt_cand(cand, prof):
|
|
return f"{cand} ({prof})" if prof and prof not in ("", "unknown") else cand
|
|
|
|
if universal:
|
|
print("### Universal Patterns (succeeded on 2+ candidates)")
|
|
for pat, d in sorted(universal, key=lambda x: -len(x[1]["worked"])):
|
|
worked_str = ", ".join(fmt_cand(c, p) for c, p in sorted(d["worked"].items()))
|
|
print(f"- `{pat}` — **BROKE** on: {worked_str}")
|
|
if d["failed"]:
|
|
failed_str = ", ".join(d["failed"])
|
|
print(f" Held on: {failed_str}")
|
|
if d["insight"]:
|
|
print(f" Insight: {d['insight']}")
|
|
print()
|
|
|
|
if candidate_specific:
|
|
print("### Candidate-Specific Patterns (broke exactly one candidate)")
|
|
for pat, d in candidate_specific:
|
|
worked_cand, worked_prof = next(iter(d["worked"].items()))
|
|
print(f"- `{pat}` — **BROKE** on: {fmt_cand(worked_cand, worked_prof)}")
|
|
if d["failed"]:
|
|
print(f" Held on: {', '.join(d['failed'])}")
|
|
if d["insight"]:
|
|
print(f" Insight: {d['insight']}")
|
|
print()
|
|
|
|
if failed_all:
|
|
print("### Patterns That Held Across All Candidates Tried")
|
|
for pat, d in failed_all:
|
|
print(f"- `{pat}` — held on: {', '.join(d['failed'])}")
|
|
print()
|
|
PYEOF
|
|
)
|
|
fi
|
|
|
|
PROMPT=$(cat "$SCRIPT_DIR/red-team-program.md")
|
|
PROMPT=${PROMPT//\{\{LM_ETH_BEFORE\}\}/$LM_ETH_BEFORE}
|
|
PROMPT=${PROMPT//\{\{CANDIDATE_NAME\}\}/$CANDIDATE_NAME}
|
|
PROMPT=${PROMPT//\{\{OPTIMIZER_PROFILE\}\}/$OPTIMIZER_PROFILE}
|
|
PROMPT=${PROMPT//\{\{KRK\}\}/$KRK}
|
|
PROMPT=${PROMPT//\{\{STAKE\}\}/$STAKE}
|
|
PROMPT=${PROMPT//\{\{LM\}\}/$LM}
|
|
PROMPT=${PROMPT//\{\{OPT\}\}/$OPT}
|
|
PROMPT=${PROMPT//\{\{POOL\}\}/$POOL}
|
|
PROMPT=${PROMPT//\{\{NPM\}\}/$NPM}
|
|
PROMPT=${PROMPT//\{\{WETH\}\}/$WETH}
|
|
PROMPT=${PROMPT//\{\{SWAP_ROUTER\}\}/$SWAP_ROUTER}
|
|
PROMPT=${PROMPT//\{\{ADV_ADDR\}\}/$ADV_ADDR}
|
|
PROMPT=${PROMPT//\{\{ADV_PK\}\}/$ADV_PK}
|
|
PROMPT=${PROMPT//\{\{RECENTER_ADDR\}\}/$RECENTER_ADDR}
|
|
PROMPT=${PROMPT//\{\{RECENTER_PK\}\}/$RECENTER_PK}
|
|
PROMPT=${PROMPT//\{\{POOL_FEE\}\}/$POOL_FEE}
|
|
PROMPT=${PROMPT//\{\{SOL_LM\}\}/$SOL_LM}
|
|
PROMPT=${PROMPT//\{\{SOL_THREE_POS\}\}/$SOL_THREE_POS}
|
|
PROMPT=${PROMPT//\{\{SOL_OPTIMIZER\}\}/$SOL_OPTIMIZER}
|
|
PROMPT=${PROMPT//\{\{SOL_OPTIMIZERV3\}\}/$SOL_OPTIMIZERV3}
|
|
PROMPT=${PROMPT//\{\{SOL_VWAP\}\}/$SOL_VWAP}
|
|
PROMPT=${PROMPT//\{\{SOL_PRICE_ORACLE\}\}/$SOL_PRICE_ORACLE}
|
|
PROMPT=${PROMPT//\{\{SOL_KRAIKEN\}\}/$SOL_KRAIKEN}
|
|
PROMPT=${PROMPT//\{\{SOL_STAKE\}\}/$SOL_STAKE}
|
|
PROMPT=${PROMPT//\{\{CROSS_CANDIDATE_SECTION\}\}/$CROSS_CANDIDATE_SECTION}
|
|
PROMPT=${PROMPT//\{\{MEMORY_SECTION\}\}/$MEMORY_SECTION}
|
|
|
|
# ── 7. Create output directory and run the agent ───────────────────────────────
|
|
mkdir -p "$REPORT_DIR"
|
|
mkdir -p "$(dirname "$MEMORY_FILE")"
|
|
|
|
log "Spawning Claude red-team agent (timeout: ${CLAUDE_TIMEOUT}s)..."
|
|
log " Report will be written to: $REPORT"
|
|
|
|
set +e
|
|
# Note: --verbose is required by the claude CLI when --output-format stream-json is used;
|
|
# omitting it causes the CLI to exit with an error, producing an empty stream log.
|
|
timeout "$CLAUDE_TIMEOUT" claude -p --dangerously-skip-permissions \
|
|
--verbose --output-format stream-json \
|
|
"$PROMPT" >"$STREAM_LOG" 2>&1 &
|
|
CLAUDE_PID=$!
|
|
wait "$CLAUDE_PID"
|
|
AGENT_EXIT=$?
|
|
CLAUDE_PID=""
|
|
set -e
|
|
|
|
if [[ $AGENT_EXIT -ne 0 ]]; then
|
|
log "WARNING: claude exited with code $AGENT_EXIT — see $STREAM_LOG for details"
|
|
fi
|
|
|
|
# Extract readable text from stream-json for the report
|
|
python3 - "$STREAM_LOG" >"$REPORT" <<'PYEOF'
|
|
import json, sys
|
|
with open(sys.argv[1]) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
obj = json.loads(line)
|
|
if obj.get("type") == "assistant":
|
|
for block in obj.get("message", {}).get("content", []):
|
|
if block.get("type") == "text":
|
|
print(block["text"], end="")
|
|
except:
|
|
pass
|
|
PYEOF
|
|
|
|
# If the agent crashed and produced no readable output, treat as an infra error
|
|
# rather than silently reporting ETH SAFE (a false pass).
|
|
if [[ $AGENT_EXIT -ne 0 && ! -s "$REPORT" ]]; then
|
|
die "claude agent failed (exit $AGENT_EXIT) with no readable output — see $STREAM_LOG"
|
|
fi
|
|
|
|
# ── 8. Read lm_eth_after ────────────────────────────────────────────────────────
|
|
log "Reading floor after agent run..."
|
|
LM_ETH_AFTER=$(compute_lm_total_eth)
|
|
|
|
# ── 8a. Extract and persist strategy findings ──────────────────────────────────
|
|
log "Extracting strategy findings from agent output..."
|
|
extract_memory "$STREAM_LOG"
|
|
log " lm_eth_after = $LM_ETH_AFTER wei"
|
|
|
|
# ── 8b. Export attack sequence and replay with AttackRunner ────────────────────
|
|
# Converts the agent's cast send commands to structured JSONL and replays them
|
|
# via AttackRunner.s.sol to capture full state snapshots for optimizer training.
|
|
log "Exporting attack sequence from stream log..."
|
|
set +e
|
|
python3 "$REPO_ROOT/scripts/harb-evaluator/export-attacks.py" \
|
|
"$STREAM_LOG" "$ATTACK_EXPORT" 2>&1 | while IFS= read -r line; do log " $line"; done
|
|
EXPORT_EXIT=${PIPESTATUS[0]}
|
|
set -e
|
|
|
|
if [[ $EXPORT_EXIT -eq 0 && -f "$ATTACK_EXPORT" && -s "$ATTACK_EXPORT" ]]; then
|
|
log " Attack export: $ATTACK_EXPORT"
|
|
log " Replaying attack sequence with AttackRunner for state snapshots..."
|
|
set +e
|
|
(cd "$REPO_ROOT/onchain" && \
|
|
ATTACK_FILE="$ATTACK_EXPORT" \
|
|
DEPLOYMENTS_FILE="deployments-local.json" \
|
|
"$FORGE" script script/backtesting/AttackRunner.s.sol \
|
|
--rpc-url "$RPC_URL" --broadcast 2>&1 \
|
|
| grep '^{' >"$ATTACK_SNAPSHOTS")
|
|
REPLAY_EXIT=$?
|
|
set -e
|
|
if [[ $REPLAY_EXIT -eq 0 && -s "$ATTACK_SNAPSHOTS" ]]; then
|
|
SNAPSHOT_COUNT=$(wc -l <"$ATTACK_SNAPSHOTS")
|
|
log " AttackRunner replay complete: $SNAPSHOT_COUNT snapshots → $ATTACK_SNAPSHOTS"
|
|
else
|
|
log " WARNING: AttackRunner replay produced no snapshots (exit $REPLAY_EXIT) — non-fatal"
|
|
fi
|
|
# Revert to the clean baseline after replay so the floor check below is unaffected.
|
|
"$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
|
|
# Re-take the snapshot so cleanup trap still has a valid ID to revert.
|
|
SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
|
|
else
|
|
log " WARNING: No attack operations exported from stream — skipping AttackRunner replay"
|
|
fi
|
|
|
|
# ── 9. Summarise results ───────────────────────────────────────────────────────
|
|
log ""
|
|
log "=== RED-TEAM SUMMARY ==="
|
|
log ""
|
|
log " lm_eth_before : $LM_ETH_BEFORE wei"
|
|
log " lm_eth_after : $LM_ETH_AFTER wei"
|
|
log ""
|
|
|
|
BROKE=false
|
|
if python3 -c "import sys; sys.exit(0 if int('${LM_ETH_AFTER:-0}') < int('${LM_ETH_BEFORE:-0}') else 1)"; then
|
|
BROKE=true
|
|
fi
|
|
|
|
if [[ "$BROKE" == "true" ]]; then
|
|
DELTA=$(python3 -c "print(int('${LM_ETH_BEFORE:-0}') - int('${LM_ETH_AFTER:-0}'))")
|
|
log " RESULT: ETH EXTRACTED ❌"
|
|
log " Decrease: $DELTA wei"
|
|
log ""
|
|
log " See $REPORT for the winning strategy."
|
|
log ""
|
|
# Append a machine-readable summary to the report
|
|
cat >>"$REPORT" <<SUMMARY_EOF
|
|
|
|
=== RUNNER SUMMARY ===
|
|
lm_eth_before : $LM_ETH_BEFORE
|
|
lm_eth_after : $LM_ETH_AFTER
|
|
delta : -$DELTA
|
|
verdict : ETH_EXTRACTED
|
|
SUMMARY_EOF
|
|
exit 1
|
|
else
|
|
log " RESULT: ETH SAFE ✅"
|
|
log ""
|
|
log " See $REPORT for strategies attempted."
|
|
log ""
|
|
cat >>"$REPORT" <<SUMMARY_EOF
|
|
|
|
=== RUNNER SUMMARY ===
|
|
lm_eth_before : $LM_ETH_BEFORE
|
|
lm_eth_after : $LM_ETH_AFTER
|
|
delta : 0 (or increase)
|
|
verdict : ETH_SAFE
|
|
SUMMARY_EOF
|
|
exit 0
|
|
fi
|