#!/usr/bin/env python3
"""Final scan of small-cap Uniswap V3 pool LP distributions.

Uses tickBitmap to efficiently find all initialized ticks, then reads liquidity.

All on-chain reads go through the Foundry ``cast call`` binary at ``CAST``
against the JSON-RPC endpoint ``RPC``.
"""
import subprocess
import time
import sys
import math
import traceback

RPC = "https://ethereum-rpc.publicnode.com"
CAST = "/home/debian/.foundry/bin/cast"

# Retry policy shared by every `cast call` invocation.
_MAX_ATTEMPTS = 3
_CALL_TIMEOUT_S = 15
_RATE_LIMIT_BACKOFF_S = 2

# Pause before each bitmap/tick read to stay under the RPC rate limit.
_CALL_SPACING_S = 0.05

_BITMAP_SIG = "tickBitmap(int16)(uint256)"
_TICKS_SIG = "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)"
_SLOT0_SIG = "slot0()(uint160,int24,uint16,uint16,uint16,uint8,bool)"

# At most this many ticks (closest to the current tick) get a liquidity read.
_MAX_TICK_READS = 150

# Band half-widths, in tick spacings, used for the concentration report.
_BANDS = [5, 10, 20, 50, 100, 200]


def _run_cast(cmd):
    """Run *cmd* up to ``_MAX_ATTEMPTS`` times and return its stripped stdout.

    A timed-out attempt is retried immediately.  A failed attempt whose output
    mentions "429" waits ``_RATE_LIMIT_BACKOFF_S`` seconds before the next try.
    Returns ``None`` when no attempt exits with status 0.
    """
    for _ in range(_MAX_ATTEMPTS):
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=_CALL_TIMEOUT_S
            )
        except subprocess.TimeoutExpired:
            continue
        if result.returncode == 0:
            return result.stdout.strip()
        if "429" in (result.stderr + result.stdout):
            time.sleep(_RATE_LIMIT_BACKOFF_S)
    return None


def cast_call(addr, sig, args=None):
    """Invoke ``cast call addr sig [args...]`` and return stdout, or ``None``.

    *args* is an optional list of already-stringified call arguments; they are
    placed before ``--rpc-url``.
    """
    cmd = [CAST, "call", addr, sig, *(args or []), "--rpc-url", RPC]
    return _run_cast(cmd)


def cast_call_neg(addr, sig, neg_val):
    """Call with a negative integer arg using -- separator."""
    # Without `--` the leading minus sign would be parsed as a CLI flag.
    cmd = [CAST, "call", addr, sig, "--rpc-url", RPC, "--", str(neg_val)]
    return _run_cast(cmd)


def parse_int(s):
    """Parse the first whitespace-separated token of *s* as a base-10 int.

    Returns ``None`` for ``None``, empty or whitespace-only input, and for a
    first token that is not a decimal integer.  Anything after the first token
    (for example a trailing ``[1.2e4]`` annotation) is ignored.
    """
    if not s:
        return None
    try:
        return int(s.strip().split()[0])
    except (ValueError, IndexError):
        return None


def _call_with_int(addr, sig, value):
    """Dispatch to the right cast wrapper depending on the sign of *value*."""
    if value < 0:
        return cast_call_neg(addr, sig, value)
    return cast_call(addr, sig, [str(value)])


def get_bitmap(addr, word_pos):
    """Return the pool's ``tickBitmap`` word at *word_pos* as an int.

    A failed or unparsable call yields 0, i.e. it is indistinguishable from an
    empty word.
    """
    time.sleep(_CALL_SPACING_S)
    raw = _call_with_int(addr, _BITMAP_SIG, word_pos)
    if not raw:
        return 0
    return parse_int(raw) or 0


def get_tick_liq(addr, tick):
    """Return the first field of ``ticks(tick)`` when the tick is usable.

    The value is returned only if the last output line is ``true`` (the
    ``initialized`` flag) and the first field parses to a positive integer;
    otherwise ``None``.
    """
    time.sleep(_CALL_SPACING_S)
    raw = _call_with_int(addr, _TICKS_SIG, tick)
    if not raw:
        return None
    lines = raw.split('\n')
    liquidity = parse_int(lines[0])
    initialized = lines[-1].strip() == "true"
    if initialized and liquidity and liquidity > 0:
        return liquidity
    return None


def decode_bitmap_word(word_pos, bitmap, spacing):
    """Return the ticks flagged in one 256-bit bitmap word, in ascending order.

    Bit ``b`` of word ``word_pos`` stands for compressed tick
    ``word_pos * 256 + b``; multiplying by *spacing* gives the real tick.
    """
    base = word_pos * 256
    return [(base + bit) * spacing for bit in range(256) if bitmap & (1 << bit)]


def find_ticks_via_bitmap(addr, spacing, current_tick, range_ticks):
    """Return the sorted initialized ticks within ±*range_ticks* of *current_tick*.

    Reads every bitmap word overlapping the window and keeps the flagged ticks
    that fall inside it (bounds inclusive).
    """
    lo = current_tick - range_ticks
    hi = current_tick + range_ticks
    # Python's // floors toward negative infinity, which is the rounding the
    # compressed-tick / word-position mapping needs for negative ticks.
    min_word = (lo // spacing) // 256
    max_word = (hi // spacing) // 256
    n_words = max_word - min_word + 1
    print(f" Bitmap: words {min_word} to {max_word} ({n_words} words)", flush=True)

    ticks = []
    for wp in range(min_word, max_word + 1):
        bm = get_bitmap(addr, wp)
        if bm == 0:
            continue
        ticks.extend(
            t for t in decode_bitmap_word(wp, bm, spacing) if lo <= t <= hi
        )
    return sorted(ticks)


def _read_current_tick(addr):
    """Return the second field of ``slot0()`` as an int, or ``None`` on failure.

    Prints the reason when the call fails or the reply cannot be parsed.
    """
    raw = cast_call(addr, _SLOT0_SIG)
    if not raw:
        print(" ERROR: slot0 failed")
        return None
    fields = raw.split('\n')
    current_tick = parse_int(fields[1]) if len(fields) > 1 else None
    if current_tick is None:
        print(" ERROR: could not parse current tick from slot0")
    return current_tick


def _band_total(positions, width):
    """Sum the liquidity of positions at most *width* ticks from the current tick."""
    return sum(p[2] for p in positions if abs(p[1]) <= width)


def _print_distribution(positions, spacing, max_liq, total):
    """Print one row per tick with its share of the max and of the total."""
    print(f"\n {len(positions)} ticks with liquidity:")
    print(f" {'Tick':>8} {'Dist':>8} {'Sp':>6} {'%Max':>5} {'%Tot':>5} Bar")
    for tick, dist, lg in positions:
        pm = lg / max_liq * 100
        pt = lg / total * 100
        bar = '█' * max(1, int(pm / 4))
        marker = " ◄" if abs(dist) < spacing else ""
        print(f" {tick:>8} {dist:>+8} {dist//spacing:>+5}sp {pm:>4.0f}% {pt:>4.1f}% {bar}{marker}")


def analyze_pool(name, addr, spacing, scan_range=100000):
    """Scan one pool and print its liquidity distribution around the current tick.

    Args:
        name: Label used in the printed report.
        addr: Pool contract address.
        spacing: The pool's tick spacing.
        scan_range: Half-width, in ticks, of the window scanned around the
            current tick.

    Returns:
        A dict with keys ``name``, ``spacing``, ``n_ticks``, ``weighted_dist``,
        ``tight_pct``, ``wide_pct`` and ``concentration`` (band -> percent), or
        ``None`` when slot0 cannot be read or no tick with liquidity is found.
    """
    print(f"\n{'='*65}")
    print(f" {name} (spacing={spacing})")
    print(f"{'='*65}")

    current_tick = _read_current_tick(addr)
    if current_tick is None:
        return None
    print(f" Current tick: {current_tick}")

    init_ticks = find_ticks_via_bitmap(addr, spacing, current_tick, scan_range)
    print(f" Found {len(init_ticks)} initialized ticks")
    if not init_ticks:
        return None

    # Cap the number of per-tick reads to avoid rate limits, keeping the ticks
    # nearest the current price.
    if len(init_ticks) > _MAX_TICK_READS:
        print(" (limiting to 150 closest to current tick)")
        init_ticks.sort(key=lambda t: abs(t - current_tick))
        init_ticks = sorted(init_ticks[:_MAX_TICK_READS])

    print(" Reading liquidity...", flush=True)
    positions = []
    for tick in init_ticks:
        lg = get_tick_liq(addr, tick)
        if lg:
            positions.append((tick, tick - current_tick, lg))

    if not positions:
        print(" No ticks with liquidity!")
        return None

    positions.sort(key=lambda x: x[0])
    max_liq = max(p[2] for p in positions)
    total = sum(p[2] for p in positions)

    _print_distribution(positions, spacing, max_liq, total)

    # Share of total liquidity inside each ±band, computed once and reused for
    # both the printout and the returned summary.
    concentration = {
        b: _band_total(positions, b * spacing) / total * 100 for b in _BANDS
    }
    print("\n Concentration:")
    for b, pct in concentration.items():
        if pct > 0.1:
            print(f" ±{b:>3} spacings (±{b*spacing:>6} ticks): {pct:>5.1f}%")

    weighted_dist = sum(abs(p[1]) * p[2] for p in positions) / total
    print(f" Weighted avg distance: {weighted_dist:.0f} ticks ({weighted_dist/spacing:.1f} spacings)")

    # "Tight" is within ±10 spacings; "wide" is beyond ±50 spacings.
    tight = concentration[10]
    wide = sum(p[2] for p in positions if abs(p[1]) > 50 * spacing) / total * 100

    return {
        'name': name,
        'spacing': spacing,
        'n_ticks': len(positions),
        'weighted_dist': weighted_dist,
        'tight_pct': tight,
        'wide_pct': wide,
        'concentration': concentration,
    }


POOLS = [
    ("AZTEC/WETH 1%", "0x7f9267759b90fb79bcb33258cab821a3495793c5", 200),
    ("wTAO/USDC 1%", "0xf763bb342eb3d23c02ccb86312422fe0c1c17e94", 200),
    ("wTAO/WETH 0.3%", "0x2982d3295a0e1a99e6e88ece0e93ffdfc5c761ae", 60),
    ("LQTY/WETH 0.3%", "0xd1d5a4c0ea98971894772dcd6d2f1dc71083c44e", 60),
]


def _print_summary(results):
    """Print the cross-pool table and the comparison against the model."""
    print(f"\n{'='*70}")
    print(" SUMMARY — Real Small-Cap LP Distributions (Uniswap V3 Mainnet)")
    print(f"{'='*70}")
    print(f" {'Pool':25s} {'#ticks':>6} {'±10sp':>7} {'±20sp':>7} {'±50sp':>7} {'AvgDist':>8} {'Tight':>6} {'Wide':>6}")
    for r in results:
        c = r['concentration']
        wd = r['weighted_dist'] / r['spacing']
        print(f" {r['name']:25s} {r['n_ticks']:>6} {c.get(10,0):>6.1f}% {c.get(20,0):>6.1f}% {c.get(50,0):>6.1f}% {wd:>6.1f}sp {r['tight_pct']:>5.1f}% {r['wide_pct']:>5.1f}%")

    print("\n KRAIKEN model comparison:")
    print(" Gaussian BG LP: 5 layers at ±10/20/40/80/160 sp → avg ≈62 spacings")
    print(" Equal ETH per layer means outer layers (wide) have less liquidity/tick")
    print(" but still compete across more ticks")

    avg_tight = sum(r['tight_pct'] for r in results) / len(results)
    avg_wd = sum(r['weighted_dist'] / r['spacing'] for r in results) / len(results)
    print(f"\n Real LP avg tight (±10sp): {avg_tight:.1f}%")
    print(f" Real LP avg weighted dist: {avg_wd:.1f} spacings")

    if avg_wd > 30:
        print(" → Real LPs are SPREAD OUT. Our Gaussian model is REALISTIC.")
        print(" → Fee competition results are ACCURATE (not conservative)")
    elif avg_wd < 15:
        print(" → Real LPs are CONCENTRATED. Our Gaussian is TOO SPREAD.")
        print(" → Real competition would be WORSE. Our fee estimates are OPTIMISTIC.")
    else:
        print(" → Real LPs have MODERATE spread. Our Gaussian is reasonable.")


def main():
    """Analyze every pool in ``POOLS`` and print the summary.

    A failure on one pool is reported with its traceback and does not stop the
    remaining pools.  Returns the list of per-pool result dicts.
    """
    results = []
    for name, addr, spacing in POOLS:
        try:
            r = analyze_pool(name, addr, spacing)
        except Exception as e:  # top-level boundary: report and keep scanning
            print(f" ERROR: {e}")
            traceback.print_exc()
            continue
        if r:
            results.append(r)

    if results:
        _print_summary(results)
    return results


if __name__ == "__main__":
    main()