harb/onchain/analysis/scan-final.py
openhands b7260b2eaf chore: analysis tooling, research artifacts, and code quality
- Analysis: parameter sweep scripts, adversarial testing, 2D frontier maps
- Research: KRAIKEN_RESEARCH_REPORT, SECURITY_REVIEW, STORAGE_LAYOUT
- FuzzingBase: consolidated fuzzing helper, BackgroundLP simulation
- Sweep results: CSV data for full 4D sweep (1050 combos), bull-bear,
  AS sweep, VWAP fix validation
- Code quality: .gitignore for fuzz CSVs, gas snapshot, updated docs
- Remove dead analysis helpers (CSVHelper, CSVManager, ScenarioRecorder)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 18:22:03 +00:00

227 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""Final scan of small-cap Uniswap V3 pool LP distributions.
Uses tickBitmap to efficiently find all initialized ticks, then reads liquidity."""
import subprocess
import time
import sys
import math
RPC = "https://ethereum-rpc.publicnode.com"
CAST = "/home/debian/.foundry/bin/cast"
def cast_call(addr, sig, args=None):
cmd = [CAST, "call", addr, sig, "--rpc-url", RPC]
if args:
# Insert args before --rpc-url
cmd = [CAST, "call", addr, sig]
cmd.extend(args)
cmd.extend(["--rpc-url", RPC])
for attempt in range(3):
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode == 0:
return result.stdout.strip()
if "429" in (result.stderr + result.stdout):
time.sleep(2)
except subprocess.TimeoutExpired:
pass
return None
def cast_call_neg(addr, sig, neg_val):
"""Call with a negative integer arg using -- separator."""
cmd = [CAST, "call", addr, sig, "--rpc-url", RPC, "--", str(neg_val)]
for attempt in range(3):
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode == 0:
return result.stdout.strip()
if "429" in (result.stderr + result.stdout):
time.sleep(2)
except subprocess.TimeoutExpired:
pass
return None
def parse_int(s):
if not s:
return None
try:
return int(s.strip().split()[0])
except:
return None
def get_bitmap(addr, word_pos):
time.sleep(0.05)
if word_pos < 0:
raw = cast_call_neg(addr, "tickBitmap(int16)(uint256)", word_pos)
else:
raw = cast_call(addr, "tickBitmap(int16)(uint256)", [str(word_pos)])
if not raw:
return 0
v = parse_int(raw)
return v if v else 0
def get_tick_liq(addr, tick):
time.sleep(0.05)
if tick < 0:
raw = cast_call_neg(addr, "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)", tick)
else:
raw = cast_call(addr, "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)", [str(tick)])
if not raw:
return None
lines = raw.split('\n')
lg = parse_int(lines[0]) if lines else 0
initialized = lines[-1].strip() == "true" if lines else False
if initialized and lg and lg > 0:
return lg
return None
def find_ticks_via_bitmap(addr, spacing, current_tick, range_ticks):
min_c = (current_tick - range_ticks) // spacing
max_c = (current_tick + range_ticks) // spacing
# Python's integer division floors toward negative infinity, which is correct for Uniswap
min_word = min_c >> 8 if min_c >= 0 else -(-min_c >> 8) - (1 if (-min_c) & 0xFF else 0)
max_word = max_c >> 8 if max_c >= 0 else -(-max_c >> 8) - (1 if (-max_c) & 0xFF else 0)
# Simpler: just use Python's // which floors correctly
min_word = min_c // 256
max_word = max_c // 256
n_words = max_word - min_word + 1
print(f" Bitmap: words {min_word} to {max_word} ({n_words} words)", flush=True)
ticks = []
for wp in range(min_word, max_word + 1):
bm = get_bitmap(addr, wp)
if bm == 0:
continue
for bit in range(256):
if bm & (1 << bit):
compressed = wp * 256 + bit
tick = compressed * spacing
if current_tick - range_ticks <= tick <= current_tick + range_ticks:
ticks.append(tick)
return sorted(ticks)
def analyze_pool(name, addr, spacing, scan_range=100000):
print(f"\n{'='*65}")
print(f" {name} (spacing={spacing})")
print(f"{'='*65}")
# Get current tick
raw = cast_call(addr, "slot0()(uint160,int24,uint16,uint16,uint16,uint8,bool)")
if not raw:
print(" ERROR: slot0 failed")
return None
current_tick = parse_int(raw.split('\n')[1])
print(f" Current tick: {current_tick}")
# Find initialized ticks
init_ticks = find_ticks_via_bitmap(addr, spacing, current_tick, scan_range)
print(f" Found {len(init_ticks)} initialized ticks")
if not init_ticks:
return None
# Read liquidity (limit to first 150 to avoid rate limits)
if len(init_ticks) > 150:
print(f" (limiting to 150 closest to current tick)")
init_ticks.sort(key=lambda t: abs(t - current_tick))
init_ticks = sorted(init_ticks[:150])
print(f" Reading liquidity...", flush=True)
positions = []
for tick in init_ticks:
lg = get_tick_liq(addr, tick)
if lg:
positions.append((tick, tick - current_tick, lg))
if not positions:
print(" No ticks with liquidity!")
return None
positions.sort(key=lambda x: x[0])
max_liq = max(p[2] for p in positions)
total = sum(p[2] for p in positions)
# Print distribution
print(f"\n {len(positions)} ticks with liquidity:")
print(f" {'Tick':>8} {'Dist':>8} {'Sp':>6} {'%Max':>5} {'%Tot':>5} Bar")
for tick, dist, lg in positions:
pm = lg / max_liq * 100
pt = lg / total * 100
bar = '' * max(1, int(pm / 4))
marker = "" if abs(dist) < spacing else ""
print(f" {tick:>8} {dist:>+8} {dist//spacing:>+5}sp {pm:>4.0f}% {pt:>4.1f}% {bar}{marker}")
# Concentration analysis
bands = [5, 10, 20, 50, 100, 200]
print(f"\n Concentration:")
for b in bands:
in_band = sum(p[2] for p in positions if abs(p[1]) <= b * spacing)
pct = in_band / total * 100
if pct > 0.1:
print(f" ±{b:>3} spacings (±{b*spacing:>6} ticks): {pct:>5.1f}%")
weighted_dist = sum(abs(p[1]) * p[2] for p in positions) / total
print(f" Weighted avg distance: {weighted_dist:.0f} ticks ({weighted_dist/spacing:.1f} spacings)")
# How much liquidity is "tight" (within ±10 spacings)?
tight = sum(p[2] for p in positions if abs(p[1]) <= 10 * spacing) / total * 100
wide = sum(p[2] for p in positions if abs(p[1]) > 50 * spacing) / total * 100
return {
'name': name, 'spacing': spacing,
'n_ticks': len(positions),
'weighted_dist': weighted_dist,
'tight_pct': tight,
'wide_pct': wide,
'concentration': {b: sum(p[2] for p in positions if abs(p[1]) <= b*spacing)/total*100 for b in bands}
}
POOLS = [
("AZTEC/WETH 1%", "0x7f9267759b90fb79bcb33258cab821a3495793c5", 200),
("wTAO/USDC 1%", "0xf763bb342eb3d23c02ccb86312422fe0c1c17e94", 200),
("wTAO/WETH 0.3%", "0x2982d3295a0e1a99e6e88ece0e93ffdfc5c761ae", 60),
("LQTY/WETH 0.3%", "0xd1d5a4c0ea98971894772dcd6d2f1dc71083c44e", 60),
]
results = []
for name, addr, spacing in POOLS:
try:
r = analyze_pool(name, addr, spacing)
if r:
results.append(r)
except Exception as e:
print(f" ERROR: {e}")
import traceback
traceback.print_exc()
if results:
print(f"\n{'='*70}")
print(f" SUMMARY — Real Small-Cap LP Distributions (Uniswap V3 Mainnet)")
print(f"{'='*70}")
print(f" {'Pool':25s} {'#ticks':>6} {'±10sp':>7} {'±20sp':>7} {'±50sp':>7} {'AvgDist':>8} {'Tight':>6} {'Wide':>6}")
for r in results:
c = r['concentration']
wd = r['weighted_dist'] / r['spacing']
print(f" {r['name']:25s} {r['n_ticks']:>6} {c.get(10,0):>6.1f}% {c.get(20,0):>6.1f}% {c.get(50,0):>6.1f}% {wd:>6.1f}sp {r['tight_pct']:>5.1f}% {r['wide_pct']:>5.1f}%")
print(f"\n KRAIKEN model comparison:")
print(f" Gaussian BG LP: 5 layers at ±10/20/40/80/160 sp → avg ≈62 spacings")
print(f" Equal ETH per layer means outer layers (wide) have less liquidity/tick")
print(f" but still compete across more ticks")
avg_tight = sum(r['tight_pct'] for r in results) / len(results)
avg_wd = sum(r['weighted_dist']/r['spacing'] for r in results) / len(results)
print(f"\n Real LP avg tight (±10sp): {avg_tight:.1f}%")
print(f" Real LP avg weighted dist: {avg_wd:.1f} spacings")
if avg_wd > 30:
print(f" → Real LPs are SPREAD OUT. Our Gaussian model is REALISTIC.")
print(f" → Fee competition results are ACCURATE (not conservative)")
elif avg_wd < 15:
print(f" → Real LPs are CONCENTRATED. Our Gaussian is TOO SPREAD.")
print(f" → Real competition would be WORSE. Our fee estimates are OPTIMISTIC.")
else:
print(f" → Real LPs have MODERATE spread. Our Gaussian is reasonable.")