harb/services/ponder/src/helpers/stats.ts
johba e20b4517fd fix: fix: protocol activity statistics stay zero — ponder watches wrong contract addresses (#4)
1. Add LM_ADDRESS and POOL_ADDRESS to ponder .env.local (bootstrap.sh)
2. Discover pool address from Uniswap factory during bootstrap (bootstrap-common.sh)
3. Make ring buffer block threshold configurable via MINIMUM_BLOCKS_FOR_RINGBUFFER env var,
   set to 0 for local dev so early events populate the ring buffer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:21:41 +00:00

417 lines
15 KiB
TypeScript

import { getLogger } from './logger';
import { stats, STATS_ID, HOURS_IN_RING_BUFFER, SECONDS_IN_HOUR } from 'ponder:schema';
type Handler = Parameters<(typeof import('ponder:registry'))['ponder']['on']>[1];
type HandlerArgs = Handler extends (...args: infer Args) => unknown ? Args[0] : never;
export type StatsContext = HandlerArgs extends { context: infer C } ? C : never;
type StatsEvent = HandlerArgs extends { event: infer E } ? E : never;
export const RING_BUFFER_SEGMENTS = 4; // ethReserve, minted, burned, holderCount
export const MINIMUM_BLOCKS_FOR_RINGBUFFER = parseInt(process.env.MINIMUM_BLOCKS_FOR_RINGBUFFER || '100');
// Get deploy block from environment (set by bootstrap)
const DEPLOY_BLOCK = BigInt(process.env.START_BLOCK || '0');
let cachedStakeTotalSupply: bigint | null = null;
export function makeEmptyRingBuffer(): bigint[] {
return Array(HOURS_IN_RING_BUFFER * RING_BUFFER_SEGMENTS).fill(0n);
}
export function parseRingBuffer(raw?: string[] | null): bigint[] {
if (!raw || raw.length === 0) {
return makeEmptyRingBuffer();
}
return raw.map(value => BigInt(value));
}
export function serializeRingBuffer(values: bigint[]): string[] {
return values.map(value => value.toString());
}
function computeMetrics(ringBuffer: bigint[], pointer: number) {
let mintedDay = 0n;
let mintedWeek = 0n;
let burnedDay = 0n;
let burnedWeek = 0n;
// Slot 0: ETH reserve snapshots per hour (latest value, not cumulative)
let ethReserveLatest = 0n;
let ethReserve24hAgo = 0n;
let ethReserve7dAgo = 0n;
// Slot 3: holderCount snapshots per hour
let holderCountLatest = 0n;
let holderCount24hAgo = 0n;
let holderCount7dAgo = 0n;
for (let i = 0; i < HOURS_IN_RING_BUFFER; i++) {
const baseIndex = ((pointer - i + HOURS_IN_RING_BUFFER) % HOURS_IN_RING_BUFFER) * RING_BUFFER_SEGMENTS;
const ethReserve = ringBuffer[baseIndex + 0];
const minted = ringBuffer[baseIndex + 1];
const burned = ringBuffer[baseIndex + 2];
const holderCount = ringBuffer[baseIndex + 3];
// Track ETH reserve at key points
if (i === 0 && ethReserve > 0n) ethReserveLatest = ethReserve;
if (i === 23 && ethReserve > 0n) ethReserve24hAgo = ethReserve;
if (ethReserve > 0n) ethReserve7dAgo = ethReserve; // Last non-zero = oldest
// Track holder count at key points
if (i === 0 && holderCount > 0n) holderCountLatest = holderCount;
if (i === 23 && holderCount > 0n) holderCount24hAgo = holderCount;
if (holderCount > 0n) holderCount7dAgo = holderCount; // Last non-zero = oldest
if (i < 24) {
mintedDay += minted;
burnedDay += burned;
}
mintedWeek += minted;
burnedWeek += burned;
}
return {
ethReserveLatest,
ethReserve24hAgo,
ethReserve7dAgo,
holderCountLatest,
holderCount24hAgo,
holderCount7dAgo,
mintedDay,
mintedWeek,
burnedDay,
burnedWeek,
};
}
function computeProjections(ringBuffer: bigint[], pointer: number, timestamp: bigint, metrics: ReturnType<typeof computeMetrics>) {
const startOfHour = (timestamp / BigInt(SECONDS_IN_HOUR)) * BigInt(SECONDS_IN_HOUR);
const elapsedSeconds = timestamp - startOfHour;
const currentBase = pointer * RING_BUFFER_SEGMENTS;
const previousBase = ((pointer - 1 + HOURS_IN_RING_BUFFER) % HOURS_IN_RING_BUFFER) * RING_BUFFER_SEGMENTS;
const project = (current: bigint, previous: bigint, weekly: bigint) => {
if (elapsedSeconds === 0n) {
return weekly / 168n;
}
const projectedTotal = (current * BigInt(SECONDS_IN_HOUR)) / elapsedSeconds;
const medium = (previous + projectedTotal) / 2n;
return medium > 0n ? medium : weekly / 168n;
};
const mintProjection = project(ringBuffer[currentBase + 1], ringBuffer[previousBase + 1], metrics.mintedWeek);
const burnProjection = project(ringBuffer[currentBase + 2], ringBuffer[previousBase + 2], metrics.burnedWeek);
return {
mintProjection,
burnProjection,
};
}
export function checkBlockHistorySufficient(context: StatsContext, event: StatsEvent): boolean {
// Guard against incomplete context during initialization
if (!context?.network?.contracts?.Kraiken) {
return false;
}
const currentBlock = event.block.number;
const deployBlock = DEPLOY_BLOCK;
const blocksSinceDeployment = Number(currentBlock - deployBlock);
if (blocksSinceDeployment < MINIMUM_BLOCKS_FOR_RINGBUFFER) {
// Use console.warn as fallback if context.logger is not available (e.g., in block handlers)
const logger = getLogger(context);
logger.warn(`Insufficient block history (only ${blocksSinceDeployment} blocks available, need ${MINIMUM_BLOCKS_FOR_RINGBUFFER})`);
return false;
}
return true;
}
export async function ensureStatsExists(context: StatsContext, timestamp?: bigint) {
let statsData = await context.db.find(stats, { id: STATS_ID });
if (!statsData) {
const { client, contracts } = context;
const readWithFallback = async <T>(fn: () => Promise<T>, fallback: T, label: string): Promise<T> => {
try {
return await fn();
} catch (error) {
const logger = getLogger(context);
logger.warn(`[stats.ensureStatsExists] Falling back for ${label}`, error);
return fallback;
}
};
const [kraikenTotalSupply, stakeTotalSupply, outstandingStake, minStake] = await Promise.all([
readWithFallback(
() =>
client.readContract({
abi: contracts.Kraiken.abi,
address: contracts.Kraiken.address,
functionName: 'totalSupply',
}),
0n,
'Kraiken.totalSupply'
),
readWithFallback(
() =>
client.readContract({
abi: contracts.Stake.abi,
address: contracts.Stake.address,
functionName: 'totalSupply',
}),
0n,
'Stake.totalSupply'
),
readWithFallback(
() =>
client.readContract({
abi: contracts.Stake.abi,
address: contracts.Stake.address,
functionName: 'outstandingStake',
}),
0n,
'Stake.outstandingStake'
),
readWithFallback(
() =>
client.readContract({
abi: contracts.Kraiken.abi,
address: contracts.Kraiken.address,
functionName: 'minStake',
}),
0n,
'Kraiken.minStake'
),
]);
cachedStakeTotalSupply = stakeTotalSupply;
const currentHour = timestamp ? (timestamp / BigInt(SECONDS_IN_HOUR)) * BigInt(SECONDS_IN_HOUR) : 0n;
await context.db.insert(stats).values({
id: STATS_ID,
kraikenTotalSupply,
stakeTotalSupply,
outstandingStake,
positionsUpdatedAt: timestamp ?? 0n,
ringBufferPointer: 0,
lastHourlyUpdateTimestamp: currentHour,
ringBuffer: serializeRingBuffer(makeEmptyRingBuffer()),
minStake,
});
statsData = await context.db.find(stats, { id: STATS_ID });
}
return statsData;
}
export async function updateHourlyData(context: StatsContext, timestamp: bigint) {
const statsData = await context.db.find(stats, { id: STATS_ID });
if (!statsData) return;
const ringBuffer = parseRingBuffer(statsData.ringBuffer as string[]);
const currentHour = (timestamp / BigInt(SECONDS_IN_HOUR)) * BigInt(SECONDS_IN_HOUR);
let pointer = statsData.ringBufferPointer ?? 0;
const lastUpdate = statsData.lastHourlyUpdateTimestamp ?? 0n;
// Snapshot current holderCount into ring buffer slot 3
// NOTE: Slot 3 migrated from cumulative tax to holderCount in PR #177.
// Existing ring buffer data will contain stale tax values interpreted as
// holder counts for up to 7 days (168 hours) post-deploy until the buffer
// fully rotates. Data self-heals as new hourly snapshots overwrite old slots.
const currentHolderCount = BigInt(statsData.holderCount ?? 0);
const base = pointer * RING_BUFFER_SEGMENTS;
ringBuffer[base + 3] = currentHolderCount;
if (lastUpdate === 0n) {
await context.db.update(stats, { id: STATS_ID }).set({
lastHourlyUpdateTimestamp: currentHour,
ringBuffer: serializeRingBuffer(ringBuffer),
});
return;
}
if (currentHour > lastUpdate) {
const hoursElapsedBig = (currentHour - lastUpdate) / BigInt(SECONDS_IN_HOUR);
const hoursElapsed = Number(hoursElapsedBig > BigInt(HOURS_IN_RING_BUFFER) ? BigInt(HOURS_IN_RING_BUFFER) : hoursElapsedBig);
for (let h = 0; h < hoursElapsed; h++) {
pointer = (pointer + 1) % HOURS_IN_RING_BUFFER;
const newBase = pointer * RING_BUFFER_SEGMENTS;
ringBuffer[newBase + 0] = 0n;
ringBuffer[newBase + 1] = 0n;
ringBuffer[newBase + 2] = 0n;
ringBuffer[newBase + 3] = currentHolderCount; // Carry forward current holderCount
}
const metrics = computeMetrics(ringBuffer, pointer);
await context.db.update(stats, { id: STATS_ID }).set({
ringBufferPointer: pointer,
lastHourlyUpdateTimestamp: currentHour,
ringBuffer: serializeRingBuffer(ringBuffer),
mintedLastDay: metrics.mintedDay,
mintedLastWeek: metrics.mintedWeek,
burnedLastDay: metrics.burnedDay,
burnedLastWeek: metrics.burnedWeek,
ethReserveLastDay: metrics.ethReserveLatest > 0n ? metrics.ethReserveLatest - metrics.ethReserve24hAgo : 0n,
ethReserveLastWeek: metrics.ethReserveLatest > 0n ? metrics.ethReserveLatest - metrics.ethReserve7dAgo : 0n,
netSupplyChangeDay: metrics.mintedDay - metrics.burnedDay,
netSupplyChangeWeek: metrics.mintedWeek - metrics.burnedWeek,
mintNextHourProjected: metrics.mintedWeek / 168n,
burnNextHourProjected: metrics.burnedWeek / 168n,
});
} else {
const metrics = computeMetrics(ringBuffer, pointer);
const projections = computeProjections(ringBuffer, pointer, timestamp, metrics);
await context.db.update(stats, { id: STATS_ID }).set({
ringBuffer: serializeRingBuffer(ringBuffer),
mintedLastDay: metrics.mintedDay,
mintedLastWeek: metrics.mintedWeek,
burnedLastDay: metrics.burnedDay,
burnedLastWeek: metrics.burnedWeek,
ethReserveLastDay: metrics.ethReserveLatest > 0n ? metrics.ethReserveLatest - metrics.ethReserve24hAgo : 0n,
ethReserveLastWeek: metrics.ethReserveLatest > 0n ? metrics.ethReserveLatest - metrics.ethReserve7dAgo : 0n,
netSupplyChangeDay: metrics.mintedDay - metrics.burnedDay,
netSupplyChangeWeek: metrics.mintedWeek - metrics.burnedWeek,
mintNextHourProjected: projections.mintProjection,
burnNextHourProjected: projections.burnProjection,
});
}
}
export async function markPositionsUpdated(context: StatsContext, timestamp: bigint) {
await ensureStatsExists(context, timestamp);
await context.db.update(stats, { id: STATS_ID }).set({
positionsUpdatedAt: timestamp,
});
}
export async function getStakeTotalSupply(context: StatsContext): Promise<bigint> {
await ensureStatsExists(context);
if (cachedStakeTotalSupply !== null) {
return cachedStakeTotalSupply;
}
const totalSupply = await context.client.readContract({
abi: context.contracts.Stake.abi,
address: context.contracts.Stake.address,
functionName: 'totalSupply',
});
cachedStakeTotalSupply = totalSupply;
await context.db.update(stats, { id: STATS_ID }).set({
stakeTotalSupply: totalSupply,
});
return totalSupply;
}
export async function refreshOutstandingStake(context: StatsContext) {
await ensureStatsExists(context);
const outstandingStake = await context.client.readContract({
abi: context.contracts.Stake.abi,
address: context.contracts.Stake.address,
functionName: 'outstandingStake',
});
await context.db.update(stats, { id: STATS_ID }).set({
outstandingStake,
});
}
/**
* Record ETH reserve snapshot in ring buffer slot 0.
* Called from lm.ts on Recentered events (where we know the pool's ETH balance).
*/
export async function recordEthReserveSnapshot(context: StatsContext, ethBalance: bigint) {
const statsData = await context.db.find(stats, { id: STATS_ID });
if (!statsData) return;
const ringBuffer = parseRingBuffer(statsData.ringBuffer as string[]);
const pointer = statsData.ringBufferPointer ?? 0;
const base = pointer * RING_BUFFER_SEGMENTS;
// Slot 0 = ETH reserve snapshot (overwrite with latest value for this hour)
ringBuffer[base + 0] = ethBalance;
await context.db.update(stats, { id: STATS_ID }).set({
ringBuffer: serializeRingBuffer(ringBuffer),
});
}
// WETH address is identical across Base mainnet, Base Sepolia, and local Anvil fork
const WETH_ADDRESS = (process.env.WETH_ADDRESS || '0x4200000000000000000000000000000000000006') as `0x${string}`;
// Minimal ERC-20 ABI — only balanceOf is needed
const erc20BalanceOfAbi = [
{
name: 'balanceOf',
type: 'function',
stateMutability: 'view',
inputs: [{ name: 'account', type: 'address' }],
outputs: [{ name: '', type: 'uint256' }],
},
] as const;
/**
* Read WETH balance of the Uniswap V3 pool via Ponder's cached client and
* persist it as `lastEthReserve` in the stats row.
*
* Call this from any event handler where a trade or stake changes the pool
* balance (Kraiken:Transfer buys/sells, Stake:PositionCreated/Removed).
* EthScarcity/EthAbundance handlers already receive the balance in event args
* and update `lastEthReserve` directly via `updateReserveStats()`.
*/
export async function updateEthReserve(context: StatsContext, poolAddress: `0x${string}`) {
let wethBalance: bigint;
try {
wethBalance = await context.client.readContract({
abi: erc20BalanceOfAbi,
address: WETH_ADDRESS,
functionName: 'balanceOf',
args: [poolAddress],
});
} catch (error) {
const logger = getLogger(context);
logger.warn('[stats.updateEthReserve] Failed to read WETH balance', error);
return;
}
if (wethBalance === 0n) return; // Pool not yet seeded — don't overwrite a real value with 0
await context.db.update(stats, { id: STATS_ID }).set({
lastEthReserve: wethBalance,
});
}
export async function refreshMinStake(context: StatsContext, statsData?: Awaited<ReturnType<typeof ensureStatsExists>>) {
let currentStats = statsData;
if (!currentStats) {
currentStats = await context.db.find(stats, { id: STATS_ID });
}
if (!currentStats) {
currentStats = await ensureStatsExists(context);
}
if (!currentStats) return;
let minStake: bigint;
try {
minStake = await context.client.readContract({
abi: context.contracts.Kraiken.abi,
address: context.contracts.Kraiken.address,
functionName: 'minStake',
});
} catch (error) {
const logger = getLogger(context);
logger.warn('[stats.refreshMinStake] Failed to read Kraiken.minStake', error);
return;
}
if ((currentStats.minStake ?? 0n) === minStake) return;
await context.db.update(stats, { id: STATS_ID }).set({
minStake,
});
}