harb/scripts/harb-evaluator/export-attacks.py
openhands c8453f6a33 fix: Backtesting: replay red-team attack sequences against optimizer candidates (#536)
- Add AttackRunner.s.sol: structured forge script that reads attack ops from a
  JSONL file (ATTACK_FILE env), executes them against the local Anvil deployment,
  and emits full state snapshots (tick, positions, VWAP, optimizer output,
  adversary balances) as JSON lines after every recenter and at start/end.

- Add 5 canonical attack files in onchain/script/backtesting/attacks/:
  * il-crystallization-15.jsonl  — 15 buy-recenter cycles + sell (extraction)
  * il-crystallization-80.jsonl  — 80 buy-recenter cycles + sell (extraction)
  * fee-drain-oscillation.jsonl  — buy-recenter-sell-recenter oscillation
  * round-trip-safe.jsonl        — 20 full round-trips (regression: safe)
  * staking-safe.jsonl           — staking manipulation (regression: safe)

- Add scripts/harb-evaluator/export-attacks.py: parses red-team-stream.jsonl
  for tool_use Bash blocks containing cast send commands and converts them to
  AttackRunner-compatible JSONL (buy/sell/recenter/stake/unstake/mint_lp/burn_lp).

- Update scripts/harb-evaluator/red-team.sh: after each agent run, automatically
  exports the attack sequence via export-attacks.py and replays it with
  AttackRunner to capture structured snapshots in tmp/red-team-snapshots.jsonl.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-11 02:08:06 +00:00

298 lines
9.5 KiB
Python
Executable file

#!/usr/bin/env python3
"""export-attacks.py — Convert red-team stream JSONL to attack JSONL format.
Parses a red-team-stream.jsonl file (produced by red-team.sh --output-format stream-json)
for tool_use blocks containing cast send commands, extracts the operation type and
parameters, and writes them in AttackRunner-compatible JSONL format.
Usage:
python3 export-attacks.py [STREAM_FILE] [OUTPUT_FILE]
STREAM_FILE Path to red-team-stream.jsonl (default: tmp/red-team-stream.jsonl)
OUTPUT_FILE Path to write attack JSONL (default: stdout)
Supported cast send patterns:
WETH.deposit() → ignored (setup)
WETH/KRK.approve() → ignored (setup)
SwapRouter.exactInputSingle(...) → buy / sell
LM.recenter() → recenter
Stake.snatch(...) → stake
Stake.exitPosition(...) → unstake
NPM.mint(...) → mint_lp
NPM.decreaseLiquidity(...) → burn_lp (paired with collect)
cast rpc evm_mine → mine (one block per call; other cast rpc methods such as anvil_snapshot are skipped)
Only operations with recognisable function signatures are emitted.
Unrecognised calls are silently skipped.
"""
import json
import re
import sys
from pathlib import Path
# ── Constants (must match red-team.sh and AttackRunner.s.sol) ──────────────────
# WETH token address. _parse_swap compares a swap's tokenIn against it to
# decide between a "buy" and a "sell".
WETH_ADDR = "0x4200000000000000000000000000000000000006"
# Swap router address. NOTE(review): not referenced by any parsing code visible
# in this file — confirm whether it is still needed.
SWAP_ROUTER_ADDR = "0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4"
# NPM address (presumably the Uniswap V3 NonfungiblePositionManager — confirm).
# _parse_cast_command only exports mint(...) calls whose destination equals it.
NPM_ADDR = "0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2"
def _normalise_addr(addr: str) -> str:
return addr.lower().strip()
def _extract_cast_commands(stream_file: str) -> list[dict]:
"""Parse stream-json and return a list of parsed cast send invocations."""
commands = []
try:
with open(stream_file) as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
# Look for tool_use blocks with bash commands.
msg = obj.get("message", {})
for block in msg.get("content", []):
if block.get("type") != "tool_use":
continue
if block.get("name") not in ("Bash", "bash"):
continue
cmd_input = block.get("input", {})
cmd = cmd_input.get("command", "")
if not cmd:
continue
parsed = _parse_cast_command(cmd)
if parsed:
commands.append(parsed)
except FileNotFoundError:
print(f"Error: stream file not found: {stream_file}", file=sys.stderr)
sys.exit(1)
return commands
def _parse_cast_command(cmd: str) -> dict | None:
"""
Parse a single shell command string and extract an attack operation dict.
Returns None if the command is not a recognised attack operation.
"""
# Normalise whitespace / line continuations.
cmd = re.sub(r"\\\n\s*", " ", cmd).strip()
# Must be a cast send (not cast call / cast rpc / etc).
if "cast rpc" in cmd and "evm_mine" in cmd:
return {"op": "mine", "blocks": 1}
if "cast send" not in cmd:
return None
# Extract destination address (first non-flag positional after "cast send").
dest_match = re.search(r"cast send\s+(?:\S+\s+)*?(0x[0-9a-fA-F]{40})", cmd)
if not dest_match:
return None
dest = _normalise_addr(dest_match.group(1))
# Extract function signature (the quoted sig after the address).
sig_match = re.search(r'"([a-zA-Z_]\w*\([^"]*\))"', cmd)
if not sig_match:
return None
sig = sig_match.group(1)
func_name = sig.split("(")[0].strip()
# Extract positional arguments (the tuple or bare args after the signature).
args_text = cmd[sig_match.end():].strip()
# ── dispatch by function ──────────────────────────────────────────────────
if func_name == "deposit":
return None # WETH setup, skip
if func_name == "approve":
return None # token approval, skip
if func_name == "exactInputSingle":
return _parse_swap(args_text)
if func_name == "recenter":
return {"op": "recenter"}
if func_name == "snatch":
return _parse_snatch(args_text)
if func_name == "exitPosition":
return _parse_exit_position(args_text)
if func_name == "mint" and _normalise_addr(dest_match.group(1)) == _normalise_addr(NPM_ADDR):
return _parse_mint_lp(args_text)
if func_name == "decreaseLiquidity":
return _parse_burn_lp(args_text)
if func_name == "collect":
return None # paired with decreaseLiquidity, handled there
return None
def _extract_tuple_args(args_text: str) -> list[str]:
"""
Extract the positional elements from a Solidity tuple literal: (a,b,c,...).
Handles nested parentheses.
"""
args_text = args_text.strip()
if args_text.startswith('"') or args_text.startswith("'"):
args_text = args_text[1:]
if args_text.startswith("("):
# Find matching closing paren.
depth = 0
for i, ch in enumerate(args_text):
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
args_text = args_text[1:i]
break
# Split on commas, respecting nested parens.
parts = []
depth = 0
current = []
for ch in args_text:
if ch == "(":
depth += 1
current.append(ch)
elif ch == ")":
depth -= 1
current.append(ch)
elif ch == "," and depth == 0:
parts.append("".join(current).strip())
current = []
else:
current.append(ch)
if current:
parts.append("".join(current).strip())
return parts
def _clean_value(v: str) -> str:
return v.strip().strip('"').strip("'")
def _parse_swap(args_text: str) -> dict | None:
    """Convert the argument tuple of ``exactInputSingle`` into a buy or sell op.

    The tuple is read positionally as
    (tokenIn, tokenOut, fee, recipient, amountIn, amountOutMinimum, sqrtPriceLimitX96).
    A swap whose tokenIn is WETH is reported as a buy; every other tokenIn is
    reported as a KRK sell (tokenOut does not influence the result). The amount
    is passed through as the cleaned string from the command.

    Returns ``None`` when fewer than seven tuple elements are present.
    """
    fields = _extract_tuple_args(args_text)
    if len(fields) < 7:
        return None

    amount_in = _clean_value(fields[4])
    spends_weth = _clean_value(fields[0]).lower() == _normalise_addr(WETH_ADDR)
    if spends_weth:
        # WETH → KRK
        op, token = "buy", "WETH"
    else:
        # KRK → WETH
        op, token = "sell", "KRK"
    return {"op": op, "amount": amount_in, "token": token}
def _parse_snatch(args_text: str) -> dict | None:
"""
Parse snatch(assets, receiver, taxRateIndex, positionsToSnatch[]).
"""
# Strip outer quotes if present, then split bare args.
parts = args_text.strip().split()
if len(parts) < 3:
return None
amount = _clean_value(parts[0])
# parts[1] is receiver address, parts[2] is taxRateIndex
try:
tax_rate_index = int(_clean_value(parts[2]))
except ValueError:
tax_rate_index = 0
return {"op": "stake", "amount": amount, "taxRateIndex": tax_rate_index}
def _parse_exit_position(args_text: str) -> dict | None:
"""Parse exitPosition(positionId)."""
parts = args_text.strip().split()
if not parts:
return None
try:
position_id = int(_clean_value(parts[0]))
except ValueError:
return None
return {"op": "unstake", "positionId": position_id}
def _parse_mint_lp(args_text: str) -> dict | None:
    """Convert the argument tuple of ``NPM.mint`` into a mint_lp op.

    The tuple is read positionally as
    (token0, token1, fee, tickLower, tickUpper, amount0, amount1, ...); only
    elements 3 to 6 are used. Ticks are converted to integers, amounts are
    kept as cleaned strings.

    Returns ``None`` when fewer than seven elements are present or a tick is
    not an integer.
    """
    fields = _extract_tuple_args(args_text)
    if len(fields) < 7:
        return None

    raw_lower, raw_upper, amount0, amount1 = (
        _clean_value(field) for field in fields[3:7]
    )
    try:
        ticks = {"tickLower": int(raw_lower), "tickUpper": int(raw_upper)}
    except ValueError:
        return None
    return {"op": "mint_lp", **ticks, "amount0": amount0, "amount1": amount1}
def _parse_burn_lp(args_text: str) -> dict | None:
    """Convert the argument tuple of ``NPM.decreaseLiquidity`` into a burn_lp op.

    Only the first tuple element (the token id) is used. Returns ``None`` when
    the tuple is empty or the token id is not an integer.
    """
    fields = _extract_tuple_args(args_text)
    if len(fields) == 0:
        return None

    raw_token_id = _clean_value(fields[0])
    try:
        return {"op": "burn_lp", "tokenId": int(raw_token_id)}
    except ValueError:
        return None
def main() -> None:
    """Command-line entry point: export attack operations as compact JSONL.

    ``argv[1]`` is the stream file (default ``tmp/red-team-stream.jsonl``) and
    ``argv[2]`` the output file. Without an output file the JSONL is written
    to stdout. Warnings and the summary line go to stderr.
    """
    cli_args = sys.argv[1:]
    stream_file = cli_args[0] if cli_args else "tmp/red-team-stream.jsonl"
    output_file = cli_args[1] if len(cli_args) > 1 else None

    ops = _extract_cast_commands(stream_file)
    if not ops:
        print("Warning: no attack operations found in stream file.", file=sys.stderr)

    # One compact JSON object per line; empty string when there are no ops.
    output = "".join(json.dumps(op, separators=(",", ":")) + "\n" for op in ops)

    if not output_file:
        sys.stdout.write(output)
        return
    Path(output_file).write_text(output)
    print(f"Wrote {len(ops)} operations to {output_file}", file=sys.stderr)


if __name__ == "__main__":
    main()