- Analysis: parameter sweep scripts, adversarial testing, 2D frontier maps
- Research: KRAIKEN_RESEARCH_REPORT, SECURITY_REVIEW, STORAGE_LAYOUT
- FuzzingBase: consolidated fuzzing helper, BackgroundLP simulation
- Sweep results: CSV data for full 4D sweep (1050 combos), bull-bear, AS sweep, VWAP fix validation
- Code quality: .gitignore for fuzz CSVs, gas snapshot, updated docs
- Remove dead analysis helpers (CSVHelper, CSVManager, ScenarioRecorder)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
227 lines
8.5 KiB
Python
227 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Final scan of small-cap Uniswap V3 pool LP distributions.
|
|
Uses tickBitmap to efficiently find all initialized ticks, then reads liquidity."""
|
|
|
|
import subprocess
|
|
import time
|
|
import sys
|
|
import math
|
|
|
|
# JSON-RPC endpoint handed to every `cast call` through --rpc-url.
RPC = "https://ethereum-rpc.publicnode.com"

# Absolute path of the Foundry `cast` executable that the helpers invoke.
CAST = "/home/debian/.foundry/bin/cast"
|
|
|
|
def cast_call(addr, sig, args=None):
    """Run ``cast call`` for ``sig`` on contract ``addr`` and return its stdout.

    Any ``args`` are placed right after the signature, ahead of the
    ``--rpc-url`` option. The command is attempted up to three times with a
    15 second timeout each; a timed-out attempt is retried immediately, and a
    failed attempt whose output mentions "429" (presumably an HTTP rate-limit
    reply) is followed by a 2 second pause.

    Returns the stripped stdout of the first attempt that exits with status 0,
    or ``None`` when no attempt succeeds.
    """
    command = [CAST, "call", addr, sig]
    if args:
        command += list(args)
    command += ["--rpc-url", RPC]

    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        try:
            proc = subprocess.run(command, capture_output=True, text=True, timeout=15)
        except subprocess.TimeoutExpired:
            continue
        if proc.returncode == 0:
            return proc.stdout.strip()
        if "429" in proc.stderr + proc.stdout:
            time.sleep(2)
    return None
|
|
|
|
def cast_call_neg(addr, sig, neg_val):
    """Run ``cast call`` with one argument passed after a ``--`` separator.

    The separator keeps a value such as ``-5`` from being read as a
    command-line option. Retry policy: up to three attempts, 15 second timeout
    each, and a 2 second pause after a failed attempt whose output mentions
    "429".

    Returns the stripped stdout of the first attempt that exits with status 0,
    or ``None`` when no attempt succeeds.
    """
    command = [CAST, "call", addr, sig, "--rpc-url", RPC, "--", str(neg_val)]

    for _ in range(3):
        try:
            proc = subprocess.run(command, capture_output=True, text=True, timeout=15)
        except subprocess.TimeoutExpired:
            # Nothing to inspect on a timeout; go straight to the next attempt.
            continue
        if proc.returncode == 0:
            return proc.stdout.strip()
        combined_output = proc.stderr + proc.stdout
        if "429" in combined_output:
            time.sleep(2)
    return None
|
|
|
|
def parse_int(s):
    """Parse the first whitespace-delimited token of ``s`` as a base-10 int.

    Only the leading token is considered, so trailing annotations after the
    number (for example ``"12345 [1.234e4]"``) are ignored.

    Returns the parsed integer, or ``None`` when ``s`` is empty/``None``,
    contains only whitespace, is not string-like, or its first token is not a
    valid integer literal.
    """
    if not s:
        return None
    try:
        # split() with no arguments already discards surrounding whitespace.
        return int(s.split()[0])
    except (AttributeError, IndexError, ValueError):
        # AttributeError: s has no split(); IndexError: whitespace-only input;
        # ValueError: first token is not an integer. Anything else (such as
        # KeyboardInterrupt) is deliberately allowed to propagate.
        return None
|
|
|
|
def get_bitmap(addr, word_pos):
    """Fetch ``tickBitmap(word_pos)`` from the pool at ``addr`` as an int.

    Sleeps 50 ms before the call to pace requests. Negative word positions go
    through ``cast_call_neg`` so the minus sign is not parsed as an option.

    Returns the bitmap word, or 0 when the call fails, the reply cannot be
    parsed, or the word is genuinely empty (the three cases are not
    distinguished).
    """
    time.sleep(0.05)
    signature = "tickBitmap(int16)(uint256)"
    if word_pos >= 0:
        reply = cast_call(addr, signature, [str(word_pos)])
    else:
        reply = cast_call_neg(addr, signature, word_pos)
    if not reply:
        return 0
    return parse_int(reply) or 0
|
|
|
|
def get_tick_liq(addr, tick):
    """Read ``ticks(tick)`` from the pool at ``addr`` and return its first field.

    Sleeps 50 ms before the call to pace requests. The reply is expected to
    hold one return value per line: the first line is parsed as an integer
    (``liquidityGross`` in the Uniswap V3 ``ticks`` getter) and the last line
    is the ``initialized`` flag.

    Returns the first field when the tick is initialized and the value is
    positive; otherwise ``None`` (also when the call itself fails).
    """
    time.sleep(0.05)
    signature = "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)"
    if tick >= 0:
        reply = cast_call(addr, signature, [str(tick)])
    else:
        reply = cast_call_neg(addr, signature, tick)
    if not reply:
        return None

    # A non-empty reply always splits into at least one field.
    fields = reply.split('\n')
    if fields[-1].strip() != "true":
        return None
    gross = parse_int(fields[0])
    if gross and gross > 0:
        return gross
    return None
|
|
|
|
def find_ticks_via_bitmap(addr, spacing, current_tick, range_ticks):
    """List initialized ticks within ``range_ticks`` of ``current_tick``.

    The window ``[current_tick - range_ticks, current_tick + range_ticks]`` is
    converted to compressed ticks (``tick // spacing``) and then to bitmap
    word positions (``compressed // 256``). Each word is fetched with
    ``get_bitmap`` and every set bit is mapped back to
    ``(word * 256 + bit) * spacing``.

    Args:
        addr: Pool contract address.
        spacing: The pool's tick spacing.
        current_tick: Centre of the scan window.
        range_ticks: Half-width of the scan window, in ticks.

    Returns:
        Ascending list of initialized ticks that fall inside the window.
    """
    # Uniswap V3 cannot initialize ticks outside [MIN_TICK, MAX_TICK], so
    # clamping the window only skips bitmap words that are necessarily empty.
    min_tick, max_tick = -887272, 887272
    lo_tick = max(current_tick - range_ticks, min_tick)
    hi_tick = min(current_tick + range_ticks, max_tick)

    # Floor division rounds toward negative infinity, matching how Uniswap
    # compresses negative ticks and derives the word position.
    min_word = (lo_tick // spacing) // 256
    max_word = (hi_tick // spacing) // 256

    n_words = max_word - min_word + 1
    print(f" Bitmap: words {min_word} to {max_word} ({n_words} words)", flush=True)

    ticks = []
    for wp in range(min_word, max_word + 1):
        bm = get_bitmap(addr, wp)
        if bm == 0:
            continue
        for bit in range(256):
            if not bm & (1 << bit):
                continue
            tick = (wp * 256 + bit) * spacing
            # Boundary words can hold bits that lie just outside the window.
            if lo_tick <= tick <= hi_tick:
                ticks.append(tick)
    return sorted(ticks)
|
|
|
|
def analyze_pool(name, addr, spacing, scan_range=100000):
    """Print and summarize how one pool's tick liquidity is spread around price.

    Reads the current tick from ``slot0``, finds initialized ticks within
    ``scan_range`` via the tick bitmap, reads each tick's liquidity value
    (at most the 150 ticks closest to the current tick), prints a per-tick
    table and concentration bands, and returns summary statistics.

    Args:
        name: Label used in the printed output and the result.
        addr: Pool contract address.
        spacing: The pool's tick spacing.
        scan_range: Half-width, in ticks, of the window scanned around the
            current tick.

    Returns:
        A dict with keys ``name``, ``spacing``, ``n_ticks``, ``weighted_dist``
        (liquidity-weighted mean absolute distance in ticks), ``tight_pct``
        (share within ±10 spacings), ``wide_pct`` (share beyond ±50 spacings)
        and ``concentration`` (band width in spacings -> percentage).
        ``None`` when ``slot0`` fails or cannot be parsed, no initialized
        ticks are found, or none of them reports liquidity.
    """
    print(f"\n{'='*65}")
    print(f" {name} (spacing={spacing})")
    print(f"{'='*65}")

    # Get current tick (second value returned by slot0)
    raw = cast_call(addr, "slot0()(uint160,int24,uint16,uint16,uint16,uint8,bool)")
    if not raw:
        print(" ERROR: slot0 failed")
        return None
    slot0_fields = raw.split('\n')
    current_tick = parse_int(slot0_fields[1]) if len(slot0_fields) > 1 else None
    # Compare against None explicitly: tick 0 is a valid current tick.
    if current_tick is None:
        print(" ERROR: could not parse current tick from slot0")
        return None
    print(f" Current tick: {current_tick}")

    # Find initialized ticks
    init_ticks = find_ticks_via_bitmap(addr, spacing, current_tick, scan_range)
    print(f" Found {len(init_ticks)} initialized ticks")
    if not init_ticks:
        return None

    # Read liquidity (keep only the 150 ticks closest to price to avoid rate limits)
    if len(init_ticks) > 150:
        print(" (limiting to 150 closest to current tick)")
        closest = sorted(init_ticks, key=lambda t: abs(t - current_tick))[:150]
        init_ticks = sorted(closest)

    print(" Reading liquidity...", flush=True)
    positions = []
    for tick in init_ticks:
        lg = get_tick_liq(addr, tick)
        if lg:
            positions.append((tick, tick - current_tick, lg))

    if not positions:
        print(" No ticks with liquidity!")
        return None

    positions.sort(key=lambda x: x[0])
    max_liq = max(p[2] for p in positions)
    # Every stored value is positive, so total is never zero below.
    total = sum(p[2] for p in positions)

    # Print distribution
    print(f"\n {len(positions)} ticks with liquidity:")
    print(f" {'Tick':>8} {'Dist':>8} {'Sp':>6} {'%Max':>5} {'%Tot':>5} Bar")

    for tick, dist, lg in positions:
        pm = lg / max_liq * 100
        pt = lg / total * 100
        bar = '█' * max(1, int(pm / 4))
        # Flag ticks less than one spacing away from the current tick.
        marker = " ◄" if abs(dist) < spacing else ""
        print(f" {tick:>8} {dist:>+8} {dist//spacing:>+5}sp {pm:>4.0f}% {pt:>4.1f}% {bar}{marker}")

    # Concentration analysis: share of liquidity within ±b spacings, computed
    # once and reused for both the printout and the returned summary.
    bands = [5, 10, 20, 50, 100, 200]
    concentration = {
        b: sum(p[2] for p in positions if abs(p[1]) <= b * spacing) / total * 100
        for b in bands
    }
    print("\n Concentration:")
    for b, pct in concentration.items():
        if pct > 0.1:
            print(f" ±{b:>3} spacings (±{b*spacing:>6} ticks): {pct:>5.1f}%")

    weighted_dist = sum(abs(p[1]) * p[2] for p in positions) / total
    print(f" Weighted avg distance: {weighted_dist:.0f} ticks ({weighted_dist/spacing:.1f} spacings)")

    # "Tight" is the ±10-spacing band; "wide" is everything beyond ±50 spacings.
    tight = concentration[10]
    wide = sum(p[2] for p in positions if abs(p[1]) > 50 * spacing) / total * 100

    return {
        'name': name,
        'spacing': spacing,
        'n_ticks': len(positions),
        'weighted_dist': weighted_dist,
        'tight_pct': tight,
        'wide_pct': wide,
        'concentration': concentration,
    }
|
|
|
|
# Pools to scan, one (label, pool address, tick spacing) tuple per entry.
POOLS = [
    # Spacing-200 pools
    ("AZTEC/WETH 1%", "0x7f9267759b90fb79bcb33258cab821a3495793c5", 200),
    ("wTAO/USDC 1%", "0xf763bb342eb3d23c02ccb86312422fe0c1c17e94", 200),
    # Spacing-60 pools
    ("wTAO/WETH 0.3%", "0x2982d3295a0e1a99e6e88ece0e93ffdfc5c761ae", 60),
    ("LQTY/WETH 0.3%", "0xd1d5a4c0ea98971894772dcd6d2f1dc71083c44e", 60),
]
|
|
|
|
# Analyze every configured pool. A failure on one pool is reported with a
# traceback and must not stop the remaining pools from being scanned.
results = []
for name, addr, spacing in POOLS:
    try:
        r = analyze_pool(name, addr, spacing)
    except Exception as e:
        print(f" ERROR: {e}")
        import traceback
        traceback.print_exc()
    else:
        if r:
            results.append(r)

# Cross-pool summary, printed only when at least one pool produced data.
if results:
    print("\n" + "=" * 70)
    print(" SUMMARY — Real Small-Cap LP Distributions (Uniswap V3 Mainnet)")
    print("=" * 70)
    print(f" {'Pool':25s} {'#ticks':>6} {'±10sp':>7} {'±20sp':>7} {'±50sp':>7} {'AvgDist':>8} {'Tight':>6} {'Wide':>6}")
    for r in results:
        c = r['concentration']
        # Weighted distance expressed in spacings rather than raw ticks.
        wd = r['weighted_dist'] / r['spacing']
        print(
            f" {r['name']:25s} {r['n_ticks']:>6}"
            f" {c.get(10,0):>6.1f}% {c.get(20,0):>6.1f}% {c.get(50,0):>6.1f}%"
            f" {wd:>6.1f}sp {r['tight_pct']:>5.1f}% {r['wide_pct']:>5.1f}%"
        )

    print("\n KRAIKEN model comparison:")
    print(" Gaussian BG LP: 5 layers at ±10/20/40/80/160 sp → avg ≈62 spacings")
    print(" Equal ETH per layer means outer layers (wide) have less liquidity/tick")
    print(" but still compete across more ticks")

    # Unweighted means across the pools that were analyzed successfully.
    avg_tight = sum(r['tight_pct'] for r in results) / len(results)
    avg_wd = sum(r['weighted_dist'] / r['spacing'] for r in results) / len(results)
    print(f"\n Real LP avg tight (±10sp): {avg_tight:.1f}%")
    print(f" Real LP avg weighted dist: {avg_wd:.1f} spacings")

    # Verdict thresholds: above 30 spacings is "spread out", below 15 is
    # "concentrated", anything in between is "moderate".
    if avg_wd > 30:
        print(" → Real LPs are SPREAD OUT. Our Gaussian model is REALISTIC.")
        print(" → Fee competition results are ACCURATE (not conservative)")
    elif avg_wd < 15:
        print(" → Real LPs are CONCENTRATED. Our Gaussian is TOO SPREAD.")
        print(" → Real competition would be WORSE. Our fee estimates are OPTIMISTIC.")
    else:
        print(" → Real LPs have MODERATE spread. Our Gaussian is reasonable.")
|