#!/usr/bin/env python3 """export-attacks.py — Convert red-team stream JSONL to attack JSONL format. Parses a red-team-stream.jsonl file (produced by red-team.sh --output-format stream-json) for tool_use blocks containing cast send commands, extracts the operation type and parameters, and writes them in AttackRunner-compatible JSONL format. Usage: python3 export-attacks.py [STREAM_FILE] [OUTPUT_FILE] STREAM_FILE Path to red-team-stream.jsonl (default: tmp/red-team-stream.jsonl) OUTPUT_FILE Path to write attack JSONL (default: stdout) Supported cast send patterns: WETH.deposit() → ignored (setup) WETH/KRK.approve() → ignored (setup) SwapRouter.exactInputSingle(...) → buy / sell LM.recenter() → recenter Stake.snatch(...) → stake Stake.exitPosition(...) → unstake NPM.mint(...) → mint_lp NPM.decreaseLiquidity(...) → burn_lp (paired with collect) evm_mine / anvil_snapshot etc. → mine (cast rpc evm_mine) Only operations with recognisable function signatures are emitted. Unrecognised calls are silently skipped. """ import json import re import sys from pathlib import Path # ── Constants (must match red-team.sh and AttackRunner.s.sol) ────────────────── WETH_ADDR = "0x4200000000000000000000000000000000000006" # Base Sepolia SwapRouter02 — https://sepolia.basescan.org/address/0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4 SWAP_ROUTER_ADDR = "0x94cC0AaC535CCDB3C01d6787D6413C739ae12bc4" # Base Sepolia NonfungiblePositionManager — https://sepolia.basescan.org/address/0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2 NPM_ADDR = "0x27F971cb582BF9E50F397e4d29a5C7A34f11faA2" def _normalise_addr(addr: str) -> str: return addr.lower().strip() def _extract_cast_commands(stream_file: str) -> list[dict]: """Parse stream-json and return a list of parsed cast send invocations.""" commands = [] try: with open(stream_file) as fh: for line in fh: line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue # Look for tool_use blocks with bash commands. msg = obj.get("message", {}) for block in msg.get("content", []): if block.get("type") != "tool_use": continue if block.get("name") not in ("Bash", "bash"): continue cmd_input = block.get("input", {}) cmd = cmd_input.get("command", "") if not cmd: continue parsed = _parse_cast_command(cmd) if parsed: commands.append(parsed) except FileNotFoundError: print(f"Error: stream file not found: {stream_file}", file=sys.stderr) sys.exit(1) return commands def _parse_cast_command(cmd: str) -> dict | None: """ Parse a single shell command string and extract an attack operation dict. Returns None if the command is not a recognised attack operation. """ # Normalise whitespace / line continuations. cmd = re.sub(r"\\\n\s*", " ", cmd).strip() # Must be a cast send (not cast call / cast rpc / etc). if "cast rpc" in cmd and "evm_mine" in cmd: return {"op": "mine", "blocks": 1} if "cast send" not in cmd: return None # Extract destination address (first non-flag positional after "cast send"). dest_match = re.search(r"cast send\s+(?:\S+\s+)*?(0x[0-9a-fA-F]{40})", cmd) if not dest_match: return None dest = _normalise_addr(dest_match.group(1)) # Extract function signature (the quoted sig after the address). sig_match = re.search(r'"([a-zA-Z_]\w*\([^"]*\))"', cmd) if not sig_match: return None sig = sig_match.group(1) func_name = sig.split("(")[0].strip() # Extract positional arguments (the tuple or bare args after the signature). args_text = cmd[sig_match.end():].strip() # ── dispatch by function ────────────────────────────────────────────────── if func_name == "deposit": return None # WETH setup, skip if func_name == "approve": return None # token approval, skip if func_name == "exactInputSingle": return _parse_swap(args_text) if func_name == "recenter": return {"op": "recenter"} if func_name == "snatch": return _parse_snatch(args_text) if func_name == "exitPosition": return _parse_exit_position(args_text) if func_name == "mint" and _normalise_addr(dest_match.group(1)) == _normalise_addr(NPM_ADDR): return _parse_mint_lp(args_text) if func_name == "decreaseLiquidity": return _parse_burn_lp(args_text) if func_name == "collect": return None # paired with decreaseLiquidity, handled there return None def _extract_tuple_args(args_text: str) -> list[str]: """ Extract the positional elements from a Solidity tuple literal: (a,b,c,...). Handles nested parentheses. """ args_text = args_text.strip() if args_text.startswith('"') or args_text.startswith("'"): args_text = args_text[1:] if args_text.startswith("("): # Find matching closing paren. depth = 0 for i, ch in enumerate(args_text): if ch == "(": depth += 1 elif ch == ")": depth -= 1 if depth == 0: args_text = args_text[1:i] break # Split on commas, respecting nested parens. parts = [] depth = 0 current = [] for ch in args_text: if ch == "(": depth += 1 current.append(ch) elif ch == ")": depth -= 1 current.append(ch) elif ch == "," and depth == 0: parts.append("".join(current).strip()) current = [] else: current.append(ch) if current: parts.append("".join(current).strip()) return parts def _clean_value(v: str) -> str: return v.strip().strip('"').strip("'") def _parse_swap(args_text: str) -> dict | None: """ Parse exactInputSingle((tokenIn,tokenOut,fee,recipient,amountIn,amountOutMin,sqrtLimit)). SwapRouter02 struct order: tokenIn, tokenOut, fee, recipient, amountIn, amountOutMinimum, sqrtPriceLimitX96 """ parts = _extract_tuple_args(args_text) if len(parts) < 7: return None token_in = _clean_value(parts[0]).lower() token_out = _clean_value(parts[1]).lower() amount_in = _clean_value(parts[4]) weth = _normalise_addr(WETH_ADDR) if token_in == weth: # WETH → KRK: buy return {"op": "buy", "amount": amount_in, "token": "WETH"} else: # KRK → WETH: sell return {"op": "sell", "amount": amount_in, "token": "KRK"} def _parse_snatch(args_text: str) -> dict | None: """ Parse snatch(assets, receiver, taxRateIndex, positionsToSnatch[]). """ # Strip outer quotes if present, then split bare args. parts = args_text.strip().split() if len(parts) < 3: return None amount = _clean_value(parts[0]) # parts[1] is receiver address, parts[2] is taxRateIndex try: tax_rate_index = int(_clean_value(parts[2])) except ValueError: tax_rate_index = 0 return {"op": "stake", "amount": amount, "taxRateIndex": tax_rate_index} def _parse_exit_position(args_text: str) -> dict | None: """Parse exitPosition(positionId).""" parts = args_text.strip().split() if not parts: return None try: position_id = int(_clean_value(parts[0])) except ValueError: return None return {"op": "unstake", "positionId": position_id} def _parse_mint_lp(args_text: str) -> dict | None: """ Parse NPM.mint((token0,token1,fee,tickLower,tickUpper,amount0,amount1,min0,min1,recipient,deadline)). """ parts = _extract_tuple_args(args_text) if len(parts) < 7: return None try: tick_lower = int(_clean_value(parts[3])) tick_upper = int(_clean_value(parts[4])) amount0 = _clean_value(parts[5]) amount1 = _clean_value(parts[6]) except (ValueError, IndexError): return None return { "op": "mint_lp", "tickLower": tick_lower, "tickUpper": tick_upper, "amount0": amount0, "amount1": amount1, } def _parse_burn_lp(args_text: str) -> dict | None: """ Parse NPM.decreaseLiquidity((tokenId,liquidity,min0,min1,deadline)). """ parts = _extract_tuple_args(args_text) if not parts: return None try: token_id = int(_clean_value(parts[0])) except (ValueError, IndexError): return None return {"op": "burn_lp", "tokenId": token_id} def main() -> None: stream_file = sys.argv[1] if len(sys.argv) > 1 else "tmp/red-team-stream.jsonl" output_file = sys.argv[2] if len(sys.argv) > 2 else None ops = _extract_cast_commands(stream_file) if not ops: print("Warning: no attack operations found in stream file.", file=sys.stderr) lines = [json.dumps(op, separators=(",", ":")) for op in ops] output = "\n".join(lines) if output: output += "\n" if output_file: Path(output_file).write_text(output) print(f"Wrote {len(ops)} operations to {output_file}", file=sys.stderr) else: print(output, end="") if __name__ == "__main__": main()