fix: Backtesting: replay red-team attack sequences against optimizer candidates (#536)
- Add AttackRunner.s.sol: structured forge script that reads attack ops from a JSONL file (ATTACK_FILE env), executes them against the local Anvil deployment, and emits full state snapshots (tick, positions, VWAP, optimizer output, adversary balances) as JSON lines after every recenter and at start/end. - Add 5 canonical attack files in onchain/script/backtesting/attacks/: * il-crystallization-15.jsonl — 15 buy-recenter cycles + sell (extraction) * il-crystallization-80.jsonl — 80 buy-recenter cycles + sell (extraction) * fee-drain-oscillation.jsonl — buy-recenter-sell-recenter oscillation * round-trip-safe.jsonl — 20 full round-trips (regression: safe) * staking-safe.jsonl — staking manipulation (regression: safe) - Add scripts/harb-evaluator/export-attacks.py: parses red-team-stream.jsonl for tool_use Bash blocks containing cast send commands and converts them to AttackRunner-compatible JSONL (buy/sell/recenter/stake/unstake/mint_lp/burn_lp). - Update scripts/harb-evaluator/red-team.sh: after each agent run, automatically exports the attack sequence via export-attacks.py and replays it with AttackRunner to capture structured snapshots in tmp/red-team-snapshots.jsonl. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
08b9a3df30
commit
c8453f6a33
8 changed files with 1261 additions and 2 deletions
298
scripts/harb-evaluator/export-attacks.py
Executable file
298
scripts/harb-evaluator/export-attacks.py
Executable file
|
|
@ -0,0 +1,298 @@
|
|||
#!/usr/bin/env python3
|
||||
"""export-attacks.py — Convert red-team stream JSONL to attack JSONL format.
|
||||
|
||||
Parses a red-team-stream.jsonl file (produced by red-team.sh --output-format stream-json)
|
||||
for tool_use blocks containing cast send commands, extracts the operation type and
|
||||
parameters, and writes them in AttackRunner-compatible JSONL format.
|
||||
|
||||
Usage:
|
||||
python3 export-attacks.py [STREAM_FILE] [OUTPUT_FILE]
|
||||
|
||||
STREAM_FILE Path to red-team-stream.jsonl (default: tmp/red-team-stream.jsonl)
|
||||
OUTPUT_FILE Path to write attack JSONL (default: stdout)
|
||||
|
||||
Supported cast send patterns:
|
||||
WETH.deposit() → ignored (setup)
|
||||
WETH/KRK.approve() → ignored (setup)
|
||||
SwapRouter.exactInputSingle(...) → buy / sell
|
||||
LM.recenter() → recenter
|
||||
Stake.snatch(...) → stake
|
||||
Stake.exitPosition(...) → unstake
|
||||
NPM.mint(...) → mint_lp
|
||||
NPM.decreaseLiquidity(...) → burn_lp (paired with collect)
|
||||
evm_mine / anvil_snapshot etc. → mine (cast rpc evm_mine)
|
||||
|
||||
Only operations with recognisable function signatures are emitted.
|
||||
Unrecognised calls are silently skipped.
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# ── Constants (must match red-team.sh and AttackRunner.s.sol) ──────────────────
|
||||
WETH_ADDR = "0x4200000000000000000000000000000000000006"
|
||||
SWAP_ROUTER_ADDR = "0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4"
|
||||
NPM_ADDR = "0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2"
|
||||
|
||||
|
||||
def _normalise_addr(addr: str) -> str:
|
||||
return addr.lower().strip()
|
||||
|
||||
|
||||
def _extract_cast_commands(stream_file: str) -> list[dict]:
|
||||
"""Parse stream-json and return a list of parsed cast send invocations."""
|
||||
commands = []
|
||||
try:
|
||||
with open(stream_file) as fh:
|
||||
for line in fh:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# Look for tool_use blocks with bash commands.
|
||||
msg = obj.get("message", {})
|
||||
for block in msg.get("content", []):
|
||||
if block.get("type") != "tool_use":
|
||||
continue
|
||||
if block.get("name") not in ("Bash", "bash"):
|
||||
continue
|
||||
cmd_input = block.get("input", {})
|
||||
cmd = cmd_input.get("command", "")
|
||||
if not cmd:
|
||||
continue
|
||||
parsed = _parse_cast_command(cmd)
|
||||
if parsed:
|
||||
commands.append(parsed)
|
||||
except FileNotFoundError:
|
||||
print(f"Error: stream file not found: {stream_file}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return commands
|
||||
|
||||
|
||||
def _parse_cast_command(cmd: str) -> dict | None:
|
||||
"""
|
||||
Parse a single shell command string and extract an attack operation dict.
|
||||
Returns None if the command is not a recognised attack operation.
|
||||
"""
|
||||
# Normalise whitespace / line continuations.
|
||||
cmd = re.sub(r"\\\n\s*", " ", cmd).strip()
|
||||
|
||||
# Must be a cast send (not cast call / cast rpc / etc).
|
||||
if "cast rpc" in cmd and "evm_mine" in cmd:
|
||||
return {"op": "mine", "blocks": 1}
|
||||
|
||||
if "cast send" not in cmd:
|
||||
return None
|
||||
|
||||
# Extract destination address (first non-flag positional after "cast send").
|
||||
dest_match = re.search(r"cast send\s+(?:\S+\s+)*?(0x[0-9a-fA-F]{40})", cmd)
|
||||
if not dest_match:
|
||||
return None
|
||||
dest = _normalise_addr(dest_match.group(1))
|
||||
|
||||
# Extract function signature (the quoted sig after the address).
|
||||
sig_match = re.search(r'"([a-zA-Z_]\w*\([^"]*\))"', cmd)
|
||||
if not sig_match:
|
||||
return None
|
||||
sig = sig_match.group(1)
|
||||
func_name = sig.split("(")[0].strip()
|
||||
|
||||
# Extract positional arguments (the tuple or bare args after the signature).
|
||||
args_text = cmd[sig_match.end():].strip()
|
||||
|
||||
# ── dispatch by function ──────────────────────────────────────────────────
|
||||
|
||||
if func_name == "deposit":
|
||||
return None # WETH setup, skip
|
||||
|
||||
if func_name == "approve":
|
||||
return None # token approval, skip
|
||||
|
||||
if func_name == "exactInputSingle":
|
||||
return _parse_swap(args_text)
|
||||
|
||||
if func_name == "recenter":
|
||||
return {"op": "recenter"}
|
||||
|
||||
if func_name == "snatch":
|
||||
return _parse_snatch(args_text)
|
||||
|
||||
if func_name == "exitPosition":
|
||||
return _parse_exit_position(args_text)
|
||||
|
||||
if func_name == "mint" and _normalise_addr(dest_match.group(1)) == _normalise_addr(NPM_ADDR):
|
||||
return _parse_mint_lp(args_text)
|
||||
|
||||
if func_name == "decreaseLiquidity":
|
||||
return _parse_burn_lp(args_text)
|
||||
|
||||
if func_name == "collect":
|
||||
return None # paired with decreaseLiquidity, handled there
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _extract_tuple_args(args_text: str) -> list[str]:
|
||||
"""
|
||||
Extract the positional elements from a Solidity tuple literal: (a,b,c,...).
|
||||
Handles nested parentheses.
|
||||
"""
|
||||
args_text = args_text.strip()
|
||||
if args_text.startswith('"') or args_text.startswith("'"):
|
||||
args_text = args_text[1:]
|
||||
if args_text.startswith("("):
|
||||
# Find matching closing paren.
|
||||
depth = 0
|
||||
for i, ch in enumerate(args_text):
|
||||
if ch == "(":
|
||||
depth += 1
|
||||
elif ch == ")":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
args_text = args_text[1:i]
|
||||
break
|
||||
|
||||
# Split on commas, respecting nested parens.
|
||||
parts = []
|
||||
depth = 0
|
||||
current = []
|
||||
for ch in args_text:
|
||||
if ch == "(":
|
||||
depth += 1
|
||||
current.append(ch)
|
||||
elif ch == ")":
|
||||
depth -= 1
|
||||
current.append(ch)
|
||||
elif ch == "," and depth == 0:
|
||||
parts.append("".join(current).strip())
|
||||
current = []
|
||||
else:
|
||||
current.append(ch)
|
||||
if current:
|
||||
parts.append("".join(current).strip())
|
||||
|
||||
return parts
|
||||
|
||||
|
||||
def _clean_value(v: str) -> str:
|
||||
return v.strip().strip('"').strip("'")
|
||||
|
||||
|
||||
def _parse_swap(args_text: str) -> dict | None:
|
||||
"""
|
||||
Parse exactInputSingle((tokenIn,tokenOut,fee,recipient,amountIn,amountOutMin,sqrtLimit)).
|
||||
SwapRouter02 struct order: tokenIn, tokenOut, fee, recipient, amountIn, amountOutMinimum, sqrtPriceLimitX96
|
||||
"""
|
||||
parts = _extract_tuple_args(args_text)
|
||||
if len(parts) < 7:
|
||||
return None
|
||||
|
||||
token_in = _clean_value(parts[0]).lower()
|
||||
token_out = _clean_value(parts[1]).lower()
|
||||
amount_in = _clean_value(parts[4])
|
||||
|
||||
weth = _normalise_addr(WETH_ADDR)
|
||||
|
||||
if token_in == weth:
|
||||
# WETH → KRK: buy
|
||||
return {"op": "buy", "amount": amount_in, "token": "WETH"}
|
||||
else:
|
||||
# KRK → WETH: sell
|
||||
return {"op": "sell", "amount": amount_in, "token": "KRK"}
|
||||
|
||||
|
||||
def _parse_snatch(args_text: str) -> dict | None:
|
||||
"""
|
||||
Parse snatch(assets, receiver, taxRateIndex, positionsToSnatch[]).
|
||||
"""
|
||||
# Strip outer quotes if present, then split bare args.
|
||||
parts = args_text.strip().split()
|
||||
if len(parts) < 3:
|
||||
return None
|
||||
amount = _clean_value(parts[0])
|
||||
# parts[1] is receiver address, parts[2] is taxRateIndex
|
||||
try:
|
||||
tax_rate_index = int(_clean_value(parts[2]))
|
||||
except ValueError:
|
||||
tax_rate_index = 0
|
||||
return {"op": "stake", "amount": amount, "taxRateIndex": tax_rate_index}
|
||||
|
||||
|
||||
def _parse_exit_position(args_text: str) -> dict | None:
|
||||
"""Parse exitPosition(positionId)."""
|
||||
parts = args_text.strip().split()
|
||||
if not parts:
|
||||
return None
|
||||
try:
|
||||
position_id = int(_clean_value(parts[0]))
|
||||
except ValueError:
|
||||
return None
|
||||
return {"op": "unstake", "positionId": position_id}
|
||||
|
||||
|
||||
def _parse_mint_lp(args_text: str) -> dict | None:
|
||||
"""
|
||||
Parse NPM.mint((token0,token1,fee,tickLower,tickUpper,amount0,amount1,min0,min1,recipient,deadline)).
|
||||
"""
|
||||
parts = _extract_tuple_args(args_text)
|
||||
if len(parts) < 7:
|
||||
return None
|
||||
try:
|
||||
tick_lower = int(_clean_value(parts[3]))
|
||||
tick_upper = int(_clean_value(parts[4]))
|
||||
amount0 = _clean_value(parts[5])
|
||||
amount1 = _clean_value(parts[6])
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
return {
|
||||
"op": "mint_lp",
|
||||
"tickLower": tick_lower,
|
||||
"tickUpper": tick_upper,
|
||||
"amount0": amount0,
|
||||
"amount1": amount1,
|
||||
}
|
||||
|
||||
|
||||
def _parse_burn_lp(args_text: str) -> dict | None:
|
||||
"""
|
||||
Parse NPM.decreaseLiquidity((tokenId,liquidity,min0,min1,deadline)).
|
||||
"""
|
||||
parts = _extract_tuple_args(args_text)
|
||||
if not parts:
|
||||
return None
|
||||
try:
|
||||
token_id = int(_clean_value(parts[0]))
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
return {"op": "burn_lp", "tokenId": token_id}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
stream_file = sys.argv[1] if len(sys.argv) > 1 else "tmp/red-team-stream.jsonl"
|
||||
output_file = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
|
||||
ops = _extract_cast_commands(stream_file)
|
||||
|
||||
if not ops:
|
||||
print("Warning: no attack operations found in stream file.", file=sys.stderr)
|
||||
|
||||
lines = [json.dumps(op, separators=(",", ":")) for op in ops]
|
||||
output = "\n".join(lines)
|
||||
if output:
|
||||
output += "\n"
|
||||
|
||||
if output_file:
|
||||
Path(output_file).write_text(output)
|
||||
print(f"Wrote {len(ops)} operations to {output_file}", file=sys.stderr)
|
||||
else:
|
||||
print(output, end="")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -18,13 +18,16 @@
|
|||
set -euo pipefail
|
||||
|
||||
CAST=/home/debian/.foundry/bin/cast
|
||||
FORGE=/home/debian/.foundry/bin/forge
|
||||
RPC_URL="${RPC_URL:-http://localhost:8545}"
|
||||
CLAUDE_TIMEOUT="${CLAUDE_TIMEOUT:-7200}"
|
||||
REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
|
||||
REPORT_DIR="$REPO_ROOT/tmp"
|
||||
REPORT="$REPORT_DIR/red-team-report.txt"
|
||||
STREAM_LOG="$REPORT_DIR/red-team-stream.jsonl"
|
||||
MEMORY_FILE="$REPORT_DIR/red-team-memory.jsonl"
|
||||
MEMORY_FILE="$REPO_ROOT/tmp/red-team-memory.jsonl"
|
||||
ATTACK_EXPORT="$REPORT_DIR/red-team-attacks.jsonl"
|
||||
ATTACK_SNAPSHOTS="$REPORT_DIR/red-team-snapshots.jsonl"
|
||||
DEPLOYMENTS="$REPO_ROOT/onchain/deployments-local.json"
|
||||
|
||||
# ── Anvil accounts ─────────────────────────────────────────────────────────────
|
||||
|
|
@ -45,7 +48,8 @@ log() { echo "[red-team] $*"; }
|
|||
die() { echo "[red-team] ERROR: $*" >&2; exit 2; }
|
||||
|
||||
# ── Prerequisites ──────────────────────────────────────────────────────────────
|
||||
command -v "$CAST" &>/dev/null || die "cast not found at $CAST"
|
||||
command -v "$CAST" &>/dev/null || die "cast not found at $CAST"
|
||||
command -v "$FORGE" &>/dev/null || die "forge not found at $FORGE"
|
||||
command -v claude &>/dev/null || die "claude CLI not found (install: npm i -g @anthropic-ai/claude-code)"
|
||||
command -v python3 &>/dev/null || die "python3 not found"
|
||||
command -v jq &>/dev/null || die "jq not found"
|
||||
|
|
@ -615,6 +619,42 @@ log "Extracting strategy findings from agent output..."
|
|||
extract_memory "$STREAM_LOG"
|
||||
log " floor_after = $FLOOR_AFTER wei/token"
|
||||
|
||||
# ── 8b. Export attack sequence and replay with AttackRunner ────────────────────
|
||||
# Converts the agent's cast send commands to structured JSONL and replays them
|
||||
# via AttackRunner.s.sol to capture full state snapshots for optimizer training.
|
||||
log "Exporting attack sequence from stream log..."
|
||||
set +e
|
||||
python3 "$REPO_ROOT/scripts/harb-evaluator/export-attacks.py" \
|
||||
"$STREAM_LOG" "$ATTACK_EXPORT" 2>&1 | while IFS= read -r line; do log " $line"; done
|
||||
EXPORT_EXIT=${PIPESTATUS[0]}
|
||||
set -e
|
||||
|
||||
if [[ $EXPORT_EXIT -eq 0 && -f "$ATTACK_EXPORT" && -s "$ATTACK_EXPORT" ]]; then
|
||||
log " Attack export: $ATTACK_EXPORT"
|
||||
log " Replaying attack sequence with AttackRunner for state snapshots..."
|
||||
set +e
|
||||
(cd "$REPO_ROOT/onchain" && \
|
||||
ATTACK_FILE="$ATTACK_EXPORT" \
|
||||
DEPLOYMENTS_FILE="deployments-local.json" \
|
||||
"$FORGE" script script/backtesting/AttackRunner.s.sol \
|
||||
--rpc-url "$RPC_URL" --broadcast 2>&1 \
|
||||
| grep '^{' >"$ATTACK_SNAPSHOTS")
|
||||
REPLAY_EXIT=$?
|
||||
set -e
|
||||
if [[ $REPLAY_EXIT -eq 0 && -s "$ATTACK_SNAPSHOTS" ]]; then
|
||||
SNAPSHOT_COUNT=$(wc -l <"$ATTACK_SNAPSHOTS")
|
||||
log " AttackRunner replay complete: $SNAPSHOT_COUNT snapshots → $ATTACK_SNAPSHOTS"
|
||||
else
|
||||
log " WARNING: AttackRunner replay produced no snapshots (exit $REPLAY_EXIT) — non-fatal"
|
||||
fi
|
||||
# Revert to the clean baseline after replay so the floor check below is unaffected.
|
||||
"$CAST" rpc anvil_revert "$SNAP" --rpc-url "$RPC_URL" >/dev/null 2>&1 || true
|
||||
# Re-take the snapshot so cleanup trap still has a valid ID to revert.
|
||||
SNAP=$("$CAST" rpc anvil_snapshot --rpc-url "$RPC_URL" | tr -d '"')
|
||||
else
|
||||
log " WARNING: No attack operations exported from stream — skipping AttackRunner replay"
|
||||
fi
|
||||
|
||||
# ── 9. Summarise results ───────────────────────────────────────────────────────
|
||||
log ""
|
||||
log "=== RED-TEAM SUMMARY ==="
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue