#!/usr/bin/env python3
"""export-attacks.py — Convert red-team stream JSONL to attack JSONL format.

Parses a red-team-stream.jsonl file (produced by red-team.sh --output-format stream-json)
for tool_use blocks containing cast send commands, extracts the operation type and
parameters, and writes them in AttackRunner-compatible JSONL format.

Usage:
    python3 export-attacks.py [STREAM_FILE] [OUTPUT_FILE]

    STREAM_FILE  Path to red-team-stream.jsonl (default: tmp/red-team-stream.jsonl)
    OUTPUT_FILE  Path to write attack JSONL (default: stdout)

Supported cast send patterns:
    WETH.deposit()                    → ignored (setup)
    WETH/KRK.approve()                → ignored (setup)
    SwapRouter.exactInputSingle(...)  → buy / sell
    LM.recenter()                     → recenter
    Stake.snatch(...)                 → stake
    Stake.exitPosition(...)           → unstake
    NPM.mint(...)                     → mint_lp
    NPM.decreaseLiquidity(...)        → burn_lp (paired with collect)

Supported cast rpc patterns:
    cast rpc evm_mine                 → mine (other RPC methods such as
                                        anvil_snapshot are skipped)

Only operations with recognisable function signatures are emitted.
Unrecognised calls are silently skipped.
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
# ── Constants (must match red-team.sh and AttackRunner.s.sol) ──────────────────

# WETH token address. A swap whose tokenIn equals this address is exported as a
# "buy"; any other tokenIn is exported as a "sell".
WETH_ADDR = "0x4200000000000000000000000000000000000006"

# Swap router address. Kept alongside the other deployment addresses.
SWAP_ROUTER_ADDR = "0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4"

# NPM contract address. A mint(...) call is only exported as "mint_lp" when it
# is sent to this address.
NPM_ADDR = "0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2"
|
||
|
|
|
||
|
|
|
||
|
|
def _normalise_addr(addr: str) -> str:
    """Return *addr* lower-cased with leading/trailing whitespace removed.

    Used so that addresses can be compared without regard to checksum casing.
    """
    lowered = addr.lower()
    return lowered.strip()
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_cast_commands(stream_file: str) -> list[dict]:
    """Parse a stream-json file and return the attack operations found in it.

    Each line of *stream_file* is decoded as JSON on its own. Blank lines,
    lines that are not valid JSON, and events that do not carry a Bash
    ``tool_use`` block are skipped. Every Bash command string found is handed
    to ``_parse_cast_command``; truthy results are collected in file order.

    Args:
        stream_file: Path of the JSONL stream to read (decoded as UTF-8).

    Returns:
        The operation dicts produced by ``_parse_cast_command``, in the order
        the commands appear in the file.

    If *stream_file* does not exist, an error is printed to stderr and the
    process exits with status 1.
    """
    commands: list[dict] = []
    try:
        with open(stream_file, encoding="utf-8") as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    # Tolerate partial or garbled lines in the stream.
                    continue

                for cmd in _bash_commands(event):
                    parsed = _parse_cast_command(cmd)
                    if parsed:
                        commands.append(parsed)
    except FileNotFoundError:
        print(f"Error: stream file not found: {stream_file}", file=sys.stderr)
        sys.exit(1)
    return commands


def _bash_commands(event: object) -> list[str]:
    """Return the non-empty Bash command strings carried by one stream event.

    The stream mixes several event shapes, so every level is type-checked
    instead of assumed: the event may be any JSON value, ``message`` may be
    missing or null, ``content`` may be a plain string rather than a list of
    blocks, and a block's ``input`` may be null. Anything that does not match
    the expected shape yields no commands rather than raising.
    """
    if not isinstance(event, dict):
        return []
    message = event.get("message")
    if not isinstance(message, dict):
        return []
    content = message.get("content")
    if not isinstance(content, list):
        return []

    found: list[str] = []
    for block in content:
        if not isinstance(block, dict):
            continue
        if block.get("type") != "tool_use":
            continue
        if block.get("name") not in ("Bash", "bash"):
            continue
        tool_input = block.get("input")
        if not isinstance(tool_input, dict):
            continue
        command = tool_input.get("command")
        if isinstance(command, str) and command:
            found.append(command)
    return found
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_cast_command(cmd: str) -> dict | None:
|
||
|
|
"""
|
||
|
|
Parse a single shell command string and extract an attack operation dict.
|
||
|
|
Returns None if the command is not a recognised attack operation.
|
||
|
|
"""
|
||
|
|
# Normalise whitespace / line continuations.
|
||
|
|
cmd = re.sub(r"\\\n\s*", " ", cmd).strip()
|
||
|
|
|
||
|
|
# Must be a cast send (not cast call / cast rpc / etc).
|
||
|
|
if "cast rpc" in cmd and "evm_mine" in cmd:
|
||
|
|
return {"op": "mine", "blocks": 1}
|
||
|
|
|
||
|
|
if "cast send" not in cmd:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Extract destination address (first non-flag positional after "cast send").
|
||
|
|
dest_match = re.search(r"cast send\s+(?:\S+\s+)*?(0x[0-9a-fA-F]{40})", cmd)
|
||
|
|
if not dest_match:
|
||
|
|
return None
|
||
|
|
dest = _normalise_addr(dest_match.group(1))
|
||
|
|
|
||
|
|
# Extract function signature (the quoted sig after the address).
|
||
|
|
sig_match = re.search(r'"([a-zA-Z_]\w*\([^"]*\))"', cmd)
|
||
|
|
if not sig_match:
|
||
|
|
return None
|
||
|
|
sig = sig_match.group(1)
|
||
|
|
func_name = sig.split("(")[0].strip()
|
||
|
|
|
||
|
|
# Extract positional arguments (the tuple or bare args after the signature).
|
||
|
|
args_text = cmd[sig_match.end():].strip()
|
||
|
|
|
||
|
|
# ── dispatch by function ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
if func_name == "deposit":
|
||
|
|
return None # WETH setup, skip
|
||
|
|
|
||
|
|
if func_name == "approve":
|
||
|
|
return None # token approval, skip
|
||
|
|
|
||
|
|
if func_name == "exactInputSingle":
|
||
|
|
return _parse_swap(args_text)
|
||
|
|
|
||
|
|
if func_name == "recenter":
|
||
|
|
return {"op": "recenter"}
|
||
|
|
|
||
|
|
if func_name == "snatch":
|
||
|
|
return _parse_snatch(args_text)
|
||
|
|
|
||
|
|
if func_name == "exitPosition":
|
||
|
|
return _parse_exit_position(args_text)
|
||
|
|
|
||
|
|
if func_name == "mint" and _normalise_addr(dest_match.group(1)) == _normalise_addr(NPM_ADDR):
|
||
|
|
return _parse_mint_lp(args_text)
|
||
|
|
|
||
|
|
if func_name == "decreaseLiquidity":
|
||
|
|
return _parse_burn_lp(args_text)
|
||
|
|
|
||
|
|
if func_name == "collect":
|
||
|
|
return None # paired with decreaseLiquidity, handled there
|
||
|
|
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_tuple_args(args_text: str) -> list[str]:
|
||
|
|
"""
|
||
|
|
Extract the positional elements from a Solidity tuple literal: (a,b,c,...).
|
||
|
|
Handles nested parentheses.
|
||
|
|
"""
|
||
|
|
args_text = args_text.strip()
|
||
|
|
if args_text.startswith('"') or args_text.startswith("'"):
|
||
|
|
args_text = args_text[1:]
|
||
|
|
if args_text.startswith("("):
|
||
|
|
# Find matching closing paren.
|
||
|
|
depth = 0
|
||
|
|
for i, ch in enumerate(args_text):
|
||
|
|
if ch == "(":
|
||
|
|
depth += 1
|
||
|
|
elif ch == ")":
|
||
|
|
depth -= 1
|
||
|
|
if depth == 0:
|
||
|
|
args_text = args_text[1:i]
|
||
|
|
break
|
||
|
|
|
||
|
|
# Split on commas, respecting nested parens.
|
||
|
|
parts = []
|
||
|
|
depth = 0
|
||
|
|
current = []
|
||
|
|
for ch in args_text:
|
||
|
|
if ch == "(":
|
||
|
|
depth += 1
|
||
|
|
current.append(ch)
|
||
|
|
elif ch == ")":
|
||
|
|
depth -= 1
|
||
|
|
current.append(ch)
|
||
|
|
elif ch == "," and depth == 0:
|
||
|
|
parts.append("".join(current).strip())
|
||
|
|
current = []
|
||
|
|
else:
|
||
|
|
current.append(ch)
|
||
|
|
if current:
|
||
|
|
parts.append("".join(current).strip())
|
||
|
|
|
||
|
|
return parts
|
||
|
|
|
||
|
|
|
||
|
|
def _clean_value(v: str) -> str:
    """Trim whitespace from *v*, then strip surrounding double and single quotes.

    Double quotes are stripped before single quotes, so a single-quoted value
    nested inside double quotes is fully unwrapped, but not the reverse.
    """
    cleaned = v.strip()
    for quote in ('"', "'"):
        cleaned = cleaned.strip(quote)
    return cleaned
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_swap(args_text: str) -> dict | None:
    """Turn the argument tuple of ``exactInputSingle`` into a buy or sell op.

    Expected SwapRouter02 struct order:
    tokenIn, tokenOut, fee, recipient, amountIn, amountOutMinimum, sqrtPriceLimitX96.

    The direction is decided by tokenIn alone: if it equals ``WETH_ADDR``
    (compared case-insensitively) the swap is a buy, otherwise it is a sell.
    tokenOut is not inspected.

    Returns:
        ``{"op": "buy" | "sell", "amount": <amountIn as text>, "token": ...}``,
        or ``None`` when fewer than seven tuple elements are present.
    """
    fields = _extract_tuple_args(args_text)
    if len(fields) < 7:
        return None

    token_in = _clean_value(fields[0]).lower()
    amount_in = _clean_value(fields[4])

    # WETH in → buying KRK; anything else in → selling KRK.
    is_buy = token_in == _normalise_addr(WETH_ADDR)
    op, token = ("buy", "WETH") if is_buy else ("sell", "KRK")
    return {"op": op, "amount": amount_in, "token": token}
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_snatch(args_text: str) -> dict | None:
    """Turn the arguments of ``snatch(assets, receiver, taxRateIndex, positionsToSnatch[])`` into a stake op.

    The arguments are split on whitespace. The first is the amount, the second
    (receiver) is ignored, and the third is the tax-rate index. An index that
    is not a decimal integer is exported as 0.

    Returns:
        ``{"op": "stake", "amount": ..., "taxRateIndex": ...}``, or ``None``
        when fewer than three whitespace-separated arguments are present.
    """
    tokens = args_text.split()
    if len(tokens) < 3:
        return None

    raw_amount, _receiver, raw_tax_index = tokens[:3]
    try:
        tax_rate_index = int(_clean_value(raw_tax_index))
    except ValueError:
        tax_rate_index = 0

    return {
        "op": "stake",
        "amount": _clean_value(raw_amount),
        "taxRateIndex": tax_rate_index,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_exit_position(args_text: str) -> dict | None:
    """Turn the argument of ``exitPosition(positionId)`` into an unstake op.

    Only the first whitespace-separated argument is read.

    Returns:
        ``{"op": "unstake", "positionId": <int>}``, or ``None`` when there is
        no argument or it is not a decimal integer.
    """
    tokens = args_text.split()
    if not tokens:
        return None
    try:
        return {"op": "unstake", "positionId": int(_clean_value(tokens[0]))}
    except ValueError:
        return None
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_mint_lp(args_text: str) -> dict | None:
    """Turn the argument tuple of ``NPM.mint`` into a mint_lp op.

    Expected struct order:
    token0, token1, fee, tickLower, tickUpper, amount0, amount1, min0, min1,
    recipient, deadline. Only tickLower, tickUpper, amount0 and amount1 are
    exported; ticks are converted to int, amounts are kept as text.

    Returns:
        The mint_lp dict, or ``None`` when fewer than seven tuple elements are
        present or a tick is not a decimal integer.
    """
    fields = _extract_tuple_args(args_text)
    if len(fields) < 7:
        return None

    try:
        ticks = [int(_clean_value(raw)) for raw in fields[3:5]]
        amounts = [_clean_value(raw) for raw in fields[5:7]]
    except (ValueError, IndexError):
        return None

    return {
        "op": "mint_lp",
        "tickLower": ticks[0],
        "tickUpper": ticks[1],
        "amount0": amounts[0],
        "amount1": amounts[1],
    }
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_burn_lp(args_text: str) -> dict | None:
    """Turn the argument tuple of ``NPM.decreaseLiquidity`` into a burn_lp op.

    Expected struct order: tokenId, liquidity, min0, min1, deadline. Only the
    tokenId (first element) is exported.

    Returns:
        ``{"op": "burn_lp", "tokenId": <int>}``, or ``None`` when the tuple is
        empty or the tokenId is not a decimal integer.
    """
    fields = _extract_tuple_args(args_text)
    if not fields:
        return None
    try:
        token_id = int(_clean_value(fields[0]))
    except (ValueError, IndexError):
        return None
    else:
        return {"op": "burn_lp", "tokenId": token_id}
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """Command-line entry point: export attack operations as compact JSONL.

    Reads the stream path from the first CLI argument (default
    ``tmp/red-team-stream.jsonl``) and the optional output path from the
    second. Each operation is written as one compact JSON object per line,
    either to the output file or, when none is given, to stdout. A warning is
    printed to stderr when no operations were found; the (empty) output is
    still written in that case.
    """
    cli_args = sys.argv[1:]
    stream_file = cli_args[0] if cli_args else "tmp/red-team-stream.jsonl"
    output_file = cli_args[1] if len(cli_args) > 1 else None

    ops = _extract_cast_commands(stream_file)
    if not ops:
        print("Warning: no attack operations found in stream file.", file=sys.stderr)

    # One newline-terminated JSON document per operation; empty when no ops.
    output = "".join(json.dumps(op, separators=(",", ":")) + "\n" for op in ops)

    if not output_file:
        print(output, end="")
        return

    Path(output_file).write_text(output)
    print(f"Wrote {len(ops)} operations to {output_file}", file=sys.stderr)
|
||
|
|
|
||
|
|
|
||
|
|
# Run the exporter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|