chore: analysis tooling, research artifacts, and code quality
- Analysis: parameter sweep scripts, adversarial testing, 2D frontier maps - Research: KRAIKEN_RESEARCH_REPORT, SECURITY_REVIEW, STORAGE_LAYOUT - FuzzingBase: consolidated fuzzing helper, BackgroundLP simulation - Sweep results: CSV data for full 4D sweep (1050 combos), bull-bear, AS sweep, VWAP fix validation - Code quality: .gitignore for fuzz CSVs, gas snapshot, updated docs - Remove dead analysis helpers (CSVHelper, CSVManager, ScenarioRecorder) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d64b63aff4
commit
b7260b2eaf
256 changed files with 30276 additions and 1579 deletions
227
onchain/analysis/scan-final.py
Normal file
227
onchain/analysis/scan-final.py
Normal file
|
|
@ -0,0 +1,227 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Final scan of small-cap Uniswap V3 pool LP distributions.
|
||||
Uses tickBitmap to efficiently find all initialized ticks, then reads liquidity."""
|
||||
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import math
|
||||
|
||||
RPC = "https://ethereum-rpc.publicnode.com"
|
||||
CAST = "/home/debian/.foundry/bin/cast"
|
||||
|
||||
def cast_call(addr, sig, args=None):
|
||||
cmd = [CAST, "call", addr, sig, "--rpc-url", RPC]
|
||||
if args:
|
||||
# Insert args before --rpc-url
|
||||
cmd = [CAST, "call", addr, sig]
|
||||
cmd.extend(args)
|
||||
cmd.extend(["--rpc-url", RPC])
|
||||
for attempt in range(3):
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip()
|
||||
if "429" in (result.stderr + result.stdout):
|
||||
time.sleep(2)
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return None
|
||||
|
||||
def cast_call_neg(addr, sig, neg_val):
|
||||
"""Call with a negative integer arg using -- separator."""
|
||||
cmd = [CAST, "call", addr, sig, "--rpc-url", RPC, "--", str(neg_val)]
|
||||
for attempt in range(3):
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip()
|
||||
if "429" in (result.stderr + result.stdout):
|
||||
time.sleep(2)
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return None
|
||||
|
||||
def parse_int(s):
|
||||
if not s:
|
||||
return None
|
||||
try:
|
||||
return int(s.strip().split()[0])
|
||||
except:
|
||||
return None
|
||||
|
||||
def get_bitmap(addr, word_pos):
|
||||
time.sleep(0.05)
|
||||
if word_pos < 0:
|
||||
raw = cast_call_neg(addr, "tickBitmap(int16)(uint256)", word_pos)
|
||||
else:
|
||||
raw = cast_call(addr, "tickBitmap(int16)(uint256)", [str(word_pos)])
|
||||
if not raw:
|
||||
return 0
|
||||
v = parse_int(raw)
|
||||
return v if v else 0
|
||||
|
||||
def get_tick_liq(addr, tick):
|
||||
time.sleep(0.05)
|
||||
if tick < 0:
|
||||
raw = cast_call_neg(addr, "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)", tick)
|
||||
else:
|
||||
raw = cast_call(addr, "ticks(int24)(uint128,int128,uint256,uint256,int56,uint160,uint32,bool)", [str(tick)])
|
||||
if not raw:
|
||||
return None
|
||||
lines = raw.split('\n')
|
||||
lg = parse_int(lines[0]) if lines else 0
|
||||
initialized = lines[-1].strip() == "true" if lines else False
|
||||
if initialized and lg and lg > 0:
|
||||
return lg
|
||||
return None
|
||||
|
||||
def find_ticks_via_bitmap(addr, spacing, current_tick, range_ticks):
|
||||
min_c = (current_tick - range_ticks) // spacing
|
||||
max_c = (current_tick + range_ticks) // spacing
|
||||
|
||||
# Python's integer division floors toward negative infinity, which is correct for Uniswap
|
||||
min_word = min_c >> 8 if min_c >= 0 else -(-min_c >> 8) - (1 if (-min_c) & 0xFF else 0)
|
||||
max_word = max_c >> 8 if max_c >= 0 else -(-max_c >> 8) - (1 if (-max_c) & 0xFF else 0)
|
||||
|
||||
# Simpler: just use Python's // which floors correctly
|
||||
min_word = min_c // 256
|
||||
max_word = max_c // 256
|
||||
|
||||
n_words = max_word - min_word + 1
|
||||
print(f" Bitmap: words {min_word} to {max_word} ({n_words} words)", flush=True)
|
||||
|
||||
ticks = []
|
||||
for wp in range(min_word, max_word + 1):
|
||||
bm = get_bitmap(addr, wp)
|
||||
if bm == 0:
|
||||
continue
|
||||
for bit in range(256):
|
||||
if bm & (1 << bit):
|
||||
compressed = wp * 256 + bit
|
||||
tick = compressed * spacing
|
||||
if current_tick - range_ticks <= tick <= current_tick + range_ticks:
|
||||
ticks.append(tick)
|
||||
return sorted(ticks)
|
||||
|
||||
def analyze_pool(name, addr, spacing, scan_range=100000):
|
||||
print(f"\n{'='*65}")
|
||||
print(f" {name} (spacing={spacing})")
|
||||
print(f"{'='*65}")
|
||||
|
||||
# Get current tick
|
||||
raw = cast_call(addr, "slot0()(uint160,int24,uint16,uint16,uint16,uint8,bool)")
|
||||
if not raw:
|
||||
print(" ERROR: slot0 failed")
|
||||
return None
|
||||
current_tick = parse_int(raw.split('\n')[1])
|
||||
print(f" Current tick: {current_tick}")
|
||||
|
||||
# Find initialized ticks
|
||||
init_ticks = find_ticks_via_bitmap(addr, spacing, current_tick, scan_range)
|
||||
print(f" Found {len(init_ticks)} initialized ticks")
|
||||
|
||||
if not init_ticks:
|
||||
return None
|
||||
|
||||
# Read liquidity (limit to first 150 to avoid rate limits)
|
||||
if len(init_ticks) > 150:
|
||||
print(f" (limiting to 150 closest to current tick)")
|
||||
init_ticks.sort(key=lambda t: abs(t - current_tick))
|
||||
init_ticks = sorted(init_ticks[:150])
|
||||
|
||||
print(f" Reading liquidity...", flush=True)
|
||||
positions = []
|
||||
for tick in init_ticks:
|
||||
lg = get_tick_liq(addr, tick)
|
||||
if lg:
|
||||
positions.append((tick, tick - current_tick, lg))
|
||||
|
||||
if not positions:
|
||||
print(" No ticks with liquidity!")
|
||||
return None
|
||||
|
||||
positions.sort(key=lambda x: x[0])
|
||||
max_liq = max(p[2] for p in positions)
|
||||
total = sum(p[2] for p in positions)
|
||||
|
||||
# Print distribution
|
||||
print(f"\n {len(positions)} ticks with liquidity:")
|
||||
print(f" {'Tick':>8} {'Dist':>8} {'Sp':>6} {'%Max':>5} {'%Tot':>5} Bar")
|
||||
|
||||
for tick, dist, lg in positions:
|
||||
pm = lg / max_liq * 100
|
||||
pt = lg / total * 100
|
||||
bar = '█' * max(1, int(pm / 4))
|
||||
marker = " ◄" if abs(dist) < spacing else ""
|
||||
print(f" {tick:>8} {dist:>+8} {dist//spacing:>+5}sp {pm:>4.0f}% {pt:>4.1f}% {bar}{marker}")
|
||||
|
||||
# Concentration analysis
|
||||
bands = [5, 10, 20, 50, 100, 200]
|
||||
print(f"\n Concentration:")
|
||||
for b in bands:
|
||||
in_band = sum(p[2] for p in positions if abs(p[1]) <= b * spacing)
|
||||
pct = in_band / total * 100
|
||||
if pct > 0.1:
|
||||
print(f" ±{b:>3} spacings (±{b*spacing:>6} ticks): {pct:>5.1f}%")
|
||||
|
||||
weighted_dist = sum(abs(p[1]) * p[2] for p in positions) / total
|
||||
print(f" Weighted avg distance: {weighted_dist:.0f} ticks ({weighted_dist/spacing:.1f} spacings)")
|
||||
|
||||
# How much liquidity is "tight" (within ±10 spacings)?
|
||||
tight = sum(p[2] for p in positions if abs(p[1]) <= 10 * spacing) / total * 100
|
||||
wide = sum(p[2] for p in positions if abs(p[1]) > 50 * spacing) / total * 100
|
||||
|
||||
return {
|
||||
'name': name, 'spacing': spacing,
|
||||
'n_ticks': len(positions),
|
||||
'weighted_dist': weighted_dist,
|
||||
'tight_pct': tight,
|
||||
'wide_pct': wide,
|
||||
'concentration': {b: sum(p[2] for p in positions if abs(p[1]) <= b*spacing)/total*100 for b in bands}
|
||||
}
|
||||
|
||||
POOLS = [
|
||||
("AZTEC/WETH 1%", "0x7f9267759b90fb79bcb33258cab821a3495793c5", 200),
|
||||
("wTAO/USDC 1%", "0xf763bb342eb3d23c02ccb86312422fe0c1c17e94", 200),
|
||||
("wTAO/WETH 0.3%", "0x2982d3295a0e1a99e6e88ece0e93ffdfc5c761ae", 60),
|
||||
("LQTY/WETH 0.3%", "0xd1d5a4c0ea98971894772dcd6d2f1dc71083c44e", 60),
|
||||
]
|
||||
|
||||
results = []
|
||||
for name, addr, spacing in POOLS:
|
||||
try:
|
||||
r = analyze_pool(name, addr, spacing)
|
||||
if r:
|
||||
results.append(r)
|
||||
except Exception as e:
|
||||
print(f" ERROR: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
if results:
|
||||
print(f"\n{'='*70}")
|
||||
print(f" SUMMARY — Real Small-Cap LP Distributions (Uniswap V3 Mainnet)")
|
||||
print(f"{'='*70}")
|
||||
print(f" {'Pool':25s} {'#ticks':>6} {'±10sp':>7} {'±20sp':>7} {'±50sp':>7} {'AvgDist':>8} {'Tight':>6} {'Wide':>6}")
|
||||
for r in results:
|
||||
c = r['concentration']
|
||||
wd = r['weighted_dist'] / r['spacing']
|
||||
print(f" {r['name']:25s} {r['n_ticks']:>6} {c.get(10,0):>6.1f}% {c.get(20,0):>6.1f}% {c.get(50,0):>6.1f}% {wd:>6.1f}sp {r['tight_pct']:>5.1f}% {r['wide_pct']:>5.1f}%")
|
||||
|
||||
print(f"\n KRAIKEN model comparison:")
|
||||
print(f" Gaussian BG LP: 5 layers at ±10/20/40/80/160 sp → avg ≈62 spacings")
|
||||
print(f" Equal ETH per layer means outer layers (wide) have less liquidity/tick")
|
||||
print(f" but still compete across more ticks")
|
||||
avg_tight = sum(r['tight_pct'] for r in results) / len(results)
|
||||
avg_wd = sum(r['weighted_dist']/r['spacing'] for r in results) / len(results)
|
||||
print(f"\n Real LP avg tight (±10sp): {avg_tight:.1f}%")
|
||||
print(f" Real LP avg weighted dist: {avg_wd:.1f} spacings")
|
||||
if avg_wd > 30:
|
||||
print(f" → Real LPs are SPREAD OUT. Our Gaussian model is REALISTIC.")
|
||||
print(f" → Fee competition results are ACCURATE (not conservative)")
|
||||
elif avg_wd < 15:
|
||||
print(f" → Real LPs are CONCENTRATED. Our Gaussian is TOO SPREAD.")
|
||||
print(f" → Real competition would be WORSE. Our fee estimates are OPTIMISTIC.")
|
||||
else:
|
||||
print(f" → Real LPs have MODERATE spread. Our Gaussian is reasonable.")
|
||||
Loading…
Add table
Add a link
Reference in a new issue