#!/usr/bin/env tsx /** * Fetches Swap/Mint/Burn events from a Uniswap V3 pool via Infura * and caches them to disk as JSON Lines. * * Usage: * npx tsx fetch-events.ts --pool 0x0d59... --days 7 * npx tsx fetch-events.ts --pool 0x0d59... --start-block 12345678 --end-block 12989078 * npx tsx fetch-events.ts --pool 0x0d59... --days 7 --output /tmp/events.jsonl */ import { createPublicClient, http, parseAbi, decodeEventLog } from "viem"; import type { Address, Hex } from "viem"; import { base } from "viem/chains"; import { appendFileSync, existsSync, readFileSync, mkdirSync } from "fs"; import { resolve } from "path"; import { fileURLToPath } from "url"; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- const DEFAULT_INFURA_URL = "https://base-mainnet.infura.io/v3/409c42ecaa4e405bb5735faac0f7aec2"; /** Base mainnet produces ~1 block every 2 seconds → ~43 200 blocks/day. */ const BASE_BLOCKS_PER_DAY = 43_200; /** Maximum blocks per eth_getLogs call. 2 000 keeps payloads manageable. */ const BATCH_SIZE = 2_000; /** Polite inter-batch delay to avoid 429s. */ const BATCH_DELAY_MS = 100; const SWAP_TOPIC = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67" as Hex; const MINT_TOPIC = "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde" as Hex; const BURN_TOPIC = "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c" as Hex; const POOL_ABI = parseAbi([ "event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)", "event Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)", "event Burn(address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)", ]); // --------------------------------------------------------------------------- // CLI parsing // --------------------------------------------------------------------------- interface Args { pool: Address; days: number; startBlock: number | null; endBlock: number | null; output: string | null; rpcUrl: string; } function parseArgs(): Args { const argv = process.argv.slice(2); function getFlag(flag: string): string | undefined { const i = argv.indexOf(flag); return i !== -1 ? argv[i + 1] : undefined; } const pool = getFlag("--pool"); if (!pool) { console.error( [ "Usage: npx tsx fetch-events.ts --pool
", " [--days ] number of past days to fetch (default: 7)", " [--start-block ] explicit start block (overrides --days)", " [--end-block ] explicit end block (default: latest)", " [--output ] output path (default: cache/{pool}-{start}-{end}.jsonl)", " [--rpc-url ] RPC endpoint (default: Infura Base mainnet)", ].join("\n") ); process.exit(1); } const startBlockRaw = getFlag("--start-block"); const endBlockRaw = getFlag("--end-block"); return { pool: pool as Address, days: parseInt(getFlag("--days") ?? "7", 10), startBlock: startBlockRaw !== undefined ? parseInt(startBlockRaw, 10) : null, endBlock: endBlockRaw !== undefined ? parseInt(endBlockRaw, 10) : null, output: getFlag("--output") ?? null, rpcUrl: getFlag("--rpc-url") ?? DEFAULT_INFURA_URL, }; } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } /** * Returns the last non-empty line of a file, or null if the file does not * exist or is empty. Reads the whole file — acceptable for cache sizes here. */ function readLastLine(filePath: string): string | null { if (!existsSync(filePath)) return null; const content = readFileSync(filePath, "utf8").trimEnd(); if (!content) return null; const lastNewline = content.lastIndexOf("\n"); return lastNewline === -1 ? content : content.slice(lastNewline + 1); } /** Recursively convert BigInt values to decimal strings for JSON serialisation. */ function serializeBigInts(value: unknown): unknown { if (typeof value === "bigint") return value.toString(); if (Array.isArray(value)) return value.map(serializeBigInts); if (value !== null && typeof value === "object") { return Object.fromEntries( Object.entries(value as Record).map(([k, v]) => [ k, serializeBigInts(v), ]) ); } return value; } function defaultCacheDir(): string { const __dirname = fileURLToPath(new URL(".", import.meta.url)); return resolve(__dirname, "cache"); } // --------------------------------------------------------------------------- // Main // --------------------------------------------------------------------------- async function main(): Promise { const args = parseArgs(); const client = createPublicClient({ chain: base, transport: http(args.rpcUrl), }); // ------------------------------------------------------------------ // Resolve block range // ------------------------------------------------------------------ const latestBlock = Number(await client.getBlockNumber()); let startBlock: number; let endBlock: number; if (args.startBlock !== null && args.endBlock !== null) { startBlock = args.startBlock; endBlock = args.endBlock; } else if (args.startBlock !== null) { startBlock = args.startBlock; endBlock = latestBlock; } else { endBlock = args.endBlock ?? latestBlock; startBlock = endBlock - args.days * BASE_BLOCKS_PER_DAY; } console.log( `Pool: ${args.pool}`, `\nRange: ${startBlock} → ${endBlock} (${endBlock - startBlock + 1} blocks)` ); // ------------------------------------------------------------------ // Determine output path and set up cache directory // ------------------------------------------------------------------ mkdirSync(defaultCacheDir(), { recursive: true }); const outputPath = args.output ?? resolve(defaultCacheDir(), `${args.pool}-${startBlock}-${endBlock}.jsonl`); // ------------------------------------------------------------------ // Resume support: read last written block from existing cache file // ------------------------------------------------------------------ let resumeFromBlock = startBlock; const lastLine = readLastLine(outputPath); if (lastLine !== null) { try { const lastEntry = JSON.parse(lastLine) as { block: number }; if (typeof lastEntry.block === "number") { resumeFromBlock = lastEntry.block + 1; console.log( `Resuming from block ${resumeFromBlock} (last cached: ${lastEntry.block})` ); } } catch { console.warn("Could not parse last cache line; starting from beginning"); } } if (resumeFromBlock > endBlock) { console.log( `Cache already complete (${endBlock - startBlock + 1} blocks). Nothing to do.` ); return; } // ------------------------------------------------------------------ // Fetch in batches // ------------------------------------------------------------------ const totalBatches = Math.ceil((endBlock - startBlock + 1) / BATCH_SIZE); // Batch index that corresponds to resumeFromBlock (1-based for display) const startBatchNum = Math.floor((resumeFromBlock - startBlock) / BATCH_SIZE) + 1; let totalEvents = 0; let batchNum = startBatchNum; for (let from = resumeFromBlock; from <= endBlock; from += BATCH_SIZE) { const to = Math.min(from + BATCH_SIZE - 1, endBlock); process.stdout.write(`Fetching blocks ${from}-${to}... `); // Retry loop with exponential back-off on 429 / rate-limit errors let logs: Awaited>; let retries = 0; while (true) { try { logs = await client.getLogs({ address: args.pool, // topic0 OR-filter: match any of the three event signatures topics: [[SWAP_TOPIC, MINT_TOPIC, BURN_TOPIC]], fromBlock: BigInt(from), toBlock: BigInt(to), }); break; } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); const isRateLimit = msg.includes("429") || msg.toLowerCase().includes("rate limit") || msg.toLowerCase().includes("too many requests"); if (isRateLimit && retries < 5) { const delay = 1_000 * 2 ** retries; process.stdout.write(`\nRate limited — retrying in ${delay} ms...\n`); await sleep(delay); retries++; } else { throw err; } } } // Decode and append each log as a JSON Line let batchEvents = 0; for (const log of logs) { if (log.blockNumber === null || log.transactionHash === null) continue; try { const decoded = decodeEventLog({ abi: POOL_ABI, data: log.data as Hex, topics: log.topics as [Hex, ...Hex[]], }); const entry: Record = { block: Number(log.blockNumber), txHash: log.transactionHash, logIndex: log.logIndex, event: decoded.eventName, ...(serializeBigInts(decoded.args) as Record), }; appendFileSync(outputPath, JSON.stringify(entry) + "\n"); batchEvents++; totalEvents++; } catch { // Skip any log we cannot decode (shouldn't happen for our known pool) } } console.log(`${batchEvents} events (${batchNum}/${totalBatches} batches)`); batchNum++; // Polite inter-batch delay (skip after last batch) if (from + BATCH_SIZE <= endBlock) { await sleep(BATCH_DELAY_MS); } } console.log(`\nDone. ${totalEvents} new events written to:\n ${outputPath}`); } main().catch((err: unknown) => { console.error(err instanceof Error ? err.message : err); process.exit(1); });