position stream

This commit is contained in:
johba 2025-10-11 12:33:35 +00:00
parent 0674648044
commit 55321eff65
5 changed files with 265 additions and 85 deletions

View file

@ -29,6 +29,7 @@ type stats {
kraikenTotalSupply: BigInt!
stakeTotalSupply: BigInt!
outstandingStake: BigInt!
positionsUpdatedAt: BigInt!
totalMinted: BigInt!
totalBurned: BigInt!
totalTaxPaid: BigInt!
@ -93,6 +94,14 @@ input statsFilter {
outstandingStake_lt: BigInt
outstandingStake_gte: BigInt
outstandingStake_lte: BigInt
positionsUpdatedAt: BigInt
positionsUpdatedAt_not: BigInt
positionsUpdatedAt_in: [BigInt]
positionsUpdatedAt_not_in: [BigInt]
positionsUpdatedAt_gt: BigInt
positionsUpdatedAt_lt: BigInt
positionsUpdatedAt_gte: BigInt
positionsUpdatedAt_lte: BigInt
totalMinted: BigInt
totalMinted_not: BigInt
totalMinted_in: [BigInt]

View file

@ -19,6 +19,10 @@ export const stats = onchainTable('stats', t => ({
.bigint()
.notNull()
.$default(() => 0n),
positionsUpdatedAt: t
.bigint()
.notNull()
.$default(() => 0n),
// Totals
totalMinted: t

View file

@ -112,9 +112,7 @@ export function checkBlockHistorySufficient(context: StatsContext, event: StatsE
if (blocksSinceDeployment < MINIMUM_BLOCKS_FOR_RINGBUFFER) {
// Use console.warn as fallback if context.logger is not available (e.g., in block handlers)
const logger = context.logger || console;
logger.warn(
`Insufficient block history (only ${blocksSinceDeployment} blocks available, need ${MINIMUM_BLOCKS_FOR_RINGBUFFER})`
);
logger.warn(`Insufficient block history (only ${blocksSinceDeployment} blocks available, need ${MINIMUM_BLOCKS_FOR_RINGBUFFER})`);
return false;
}
return true;
@ -176,6 +174,7 @@ export async function ensureStatsExists(context: StatsContext, timestamp?: bigin
kraikenTotalSupply,
stakeTotalSupply,
outstandingStake,
positionsUpdatedAt: timestamp ?? 0n,
ringBufferPointer: 0,
lastHourlyUpdateTimestamp: currentHour,
ringBuffer: serializeRingBuffer(makeEmptyRingBuffer()),
@ -258,6 +257,13 @@ export async function updateHourlyData(context: StatsContext, timestamp: bigint)
}
}
export async function markPositionsUpdated(context: StatsContext, timestamp: bigint) {
await ensureStatsExists(context, timestamp);
await context.db.update(stats, { id: STATS_ID }).set({
positionsUpdatedAt: timestamp,
});
}
export async function getStakeTotalSupply(context: StatsContext): Promise<bigint> {
await ensureStatsExists(context);

View file

@ -3,6 +3,7 @@ import { positions, stats, STATS_ID, TAX_RATES } from 'ponder:schema';
import {
ensureStatsExists,
getStakeTotalSupply,
markPositionsUpdated,
parseRingBuffer,
refreshOutstandingStake,
serializeRingBuffer,
@ -55,6 +56,7 @@ ponder.on('Stake:PositionCreated', async ({ event, context }) => {
});
await refreshOutstandingStake(context);
await markPositionsUpdated(context, event.block.timestamp);
});
ponder.on('Stake:PositionRemoved', async ({ event, context }) => {
@ -76,6 +78,7 @@ ponder.on('Stake:PositionRemoved', async ({ event, context }) => {
});
await refreshOutstandingStake(context);
await markPositionsUpdated(context, event.block.timestamp);
if (checkBlockHistorySufficient(context, event)) {
await updateHourlyData(context, event.block.timestamp);
@ -101,6 +104,7 @@ ponder.on('Stake:PositionShrunk', async ({ event, context }) => {
});
await refreshOutstandingStake(context);
await markPositionsUpdated(context, event.block.timestamp);
if (checkBlockHistorySufficient(context, event)) {
await updateHourlyData(context, event.block.timestamp);
@ -154,6 +158,7 @@ ponder.on('Stake:PositionTaxPaid', async ({ event, context }) => {
}
await refreshOutstandingStake(context);
await markPositionsUpdated(context, event.block.timestamp);
});
ponder.on('Stake:PositionRateHiked', async ({ event, context }) => {
@ -163,4 +168,5 @@ ponder.on('Stake:PositionRateHiked', async ({ event, context }) => {
taxRate: TAX_RATES[taxRateIndex] || 0,
taxRateIndex,
});
await markPositionsUpdated(context, event.block.timestamp);
});

View file

@ -2,24 +2,36 @@ import { ref, computed, type ComputedRef, onMounted, onUnmounted } from 'vue';
import { config } from '@/wagmi';
import { type WatchEventReturnType, type Hex, toBytes } from 'viem';
import axios from 'axios';
import { getAccount, watchContractEvent, watchChainId, watchAccount, type Config } from '@wagmi/core';
import { getAccount, watchChainId, watchAccount, watchContractEvent, type Config } from '@wagmi/core';
import type { WatchChainIdReturnType, WatchAccountReturnType, GetAccountReturnType } from '@wagmi/core';
import { HarbContract } from '@/contracts/harb';
import { bytesToUint256 } from 'kraiken-lib';
import { bigInt2Number } from '@/utils/helper';
import { getTaxRateIndexByDecimal } from '@/composables/useAdjustTaxRates';
import logger from '@/utils/logger';
import { DEFAULT_CHAIN_ID } from '@/config';
import { createRetryManager, formatGraphqlError, resolveGraphqlEndpoint } from '@/utils/graphqlRetry';
import { HarbContract } from '@/contracts/harb';
const rawActivePositions = ref<Array<Position>>([]);
const rawClosedPositoins = ref<Array<Position>>([]);
const loading = ref(false);
const positionsError = ref<string | null>(null);
const GRAPHQL_TIMEOUT_MS = 15_000;
const activeChainId = ref<number>(DEFAULT_CHAIN_ID);
const positionsUpdatedAt = ref<bigint>(0n);
const POLL_INTERVAL_MS = 10_000;
const retryManager = createRetryManager(loadPositions, activeChainId);
let positionsPollTimer: ReturnType<typeof setInterval> | null = null;
let pollInFlight = false;
let realtimeConsumerCount = 0;
let isPollingActive = false;
let isContractWatchActive = false;
let unwatchPositionCreated: WatchEventReturnType | null = null;
let unwatchPositionRemoved: WatchEventReturnType | null = null;
let unwatchChainSwitch: WatchChainIdReturnType | null = null;
let unwatchAccountChanged: WatchAccountReturnType | null = null;
let wagmiWatcherConsumers = 0;
const activePositions = computed(() => {
const account = getAccount(config as Config);
@ -119,7 +131,12 @@ const tresholdValue = computed(() => {
return Math.floor(avgIndex / 2);
});
export async function loadActivePositions(chainId: number, endpointOverride?: string) {
interface ActivePositionsResult {
positions: Position[];
updatedAt: bigint;
}
export async function loadActivePositions(chainId: number, endpointOverride?: string): Promise<ActivePositionsResult> {
const targetEndpoint = resolveGraphqlEndpoint(chainId, endpointOverride);
logger.info(`loadActivePositions for chainId: ${chainId}`);
@ -145,6 +162,9 @@ export async function loadActivePositions(chainId: number, endpointOverride?: st
totalSupplyInit
}
}
stats(id: "0x01") {
positionsUpdatedAt
}
}`,
},
{ timeout: GRAPHQL_TIMEOUT_MS }
@ -156,10 +176,17 @@ export async function loadActivePositions(chainId: number, endpointOverride?: st
}
const items = res.data?.data?.positionss?.items ?? [];
return items.map((item: Record<string, unknown>) => ({
const stats = res.data?.data?.stats;
const updatedAtRaw = stats?.positionsUpdatedAt;
const updatedAt = typeof updatedAtRaw === 'string' ? BigInt(updatedAtRaw) : 0n;
return {
positions: items.map((item: Record<string, unknown>) => ({
...item,
harbDeposit: item.kraikenDeposit ?? '0',
})) as Position[];
})) as Position[],
updatedAt,
};
}
function formatId(id: Hex) {
@ -206,6 +233,35 @@ export async function loadMyClosedPositions(chainId: number, endpointOverride: s
})) as Position[];
}
export async function fetchPositionsUpdatedAt(chainId: number, endpointOverride?: string): Promise<bigint> {
const targetEndpoint = resolveGraphqlEndpoint(chainId, endpointOverride);
logger.info(`fetchPositionsUpdatedAt for chainId: ${chainId}`);
const res = await axios.post(
targetEndpoint,
{
query: `query PositionsUpdatedAt {
stats(id: "0x01") {
positionsUpdatedAt
}
}`,
},
{ timeout: GRAPHQL_TIMEOUT_MS }
);
const errors = res.data?.errors;
if (Array.isArray(errors) && errors.length > 0) {
throw new Error(errors.map((err: unknown) => (err as { message?: string })?.message ?? 'GraphQL error').join(', '));
}
const updatedAtRaw = res.data?.data?.stats?.positionsUpdatedAt;
if (typeof updatedAtRaw !== 'string') {
throw new Error('positionsUpdatedAt missing from GraphQL response');
}
return BigInt(updatedAtRaw);
}
export async function loadPositions(chainId?: number) {
loading.value = true;
@ -226,7 +282,9 @@ export async function loadPositions(chainId?: number) {
}
try {
rawActivePositions.value = await loadActivePositions(targetChainId, endpoint);
const { positions, updatedAt } = await loadActivePositions(targetChainId, endpoint);
rawActivePositions.value = positions;
positionsUpdatedAt.value = updatedAt;
const account = getAccount(config as Config);
if (account.address) {
rawClosedPositoins.value = await loadMyClosedPositions(targetChainId, endpoint, account);
@ -241,57 +299,139 @@ export async function loadPositions(chainId?: number) {
rawClosedPositoins.value = [];
positionsError.value = formatGraphqlError(error);
retryManager.schedule();
positionsUpdatedAt.value = 0n;
} finally {
loading.value = false;
}
}
let unwatch: WatchEventReturnType | null;
let unwatchPositionRemovedEvent: WatchEventReturnType | null;
let unwatchChainSwitch: WatchChainIdReturnType | null;
let unwatchAccountChanged: WatchAccountReturnType | null;
export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
activeChainId.value = chainId;
async function pollPositionsOnce() {
if (!isPollingActive || realtimeConsumerCount === 0 || pollInFlight || loading.value) {
return;
}
pollInFlight = true;
try {
const chainId = activeChainId.value ?? DEFAULT_CHAIN_ID;
let endpoint: string;
try {
endpoint = resolveGraphqlEndpoint(chainId);
} catch (error) {
logger.info('positions polling skipped: no GraphQL endpoint', error);
return;
}
const latestUpdatedAt = await fetchPositionsUpdatedAt(chainId, endpoint);
if (latestUpdatedAt > positionsUpdatedAt.value) {
await loadPositions(chainId);
}
} catch (error) {
logger.info('positions polling failed', error);
} finally {
pollInFlight = false;
}
}
function watchEvent() {
unwatch = watchContractEvent(config as Config, {
function startPositionsPolling() {
if (isPollingActive || realtimeConsumerCount === 0) {
return;
}
positionsPollTimer = setInterval(() => {
void pollPositionsOnce();
}, POLL_INTERVAL_MS);
isPollingActive = true;
void pollPositionsOnce();
}
function stopPositionsPolling() {
if (!isPollingActive) {
return;
}
if (positionsPollTimer) {
clearInterval(positionsPollTimer);
positionsPollTimer = null;
}
isPollingActive = false;
}
function startContractEventWatchers() {
if (isContractWatchActive || realtimeConsumerCount === 0) {
return;
}
unwatchPositionCreated = watchContractEvent(config as Config, {
address: HarbContract.contractAddress,
abi: HarbContract.abi,
eventName: 'PositionCreated',
async onLogs(_logs) {
// console.log("new Position", logs);
await loadPositions();
// await getMinStake();
async onLogs() {
await loadPositions(activeChainId.value);
},
});
}
function watchPositionRemoved() {
unwatchPositionRemovedEvent = watchContractEvent(config as Config, {
unwatchPositionRemoved = watchContractEvent(config as Config, {
address: HarbContract.contractAddress,
abi: HarbContract.abi,
eventName: 'PositionRemoved',
async onLogs(_logs) {
// console.log("Position removed", logs);
await loadPositions();
// await getMinStake();
async onLogs() {
await loadPositions(activeChainId.value);
},
});
isContractWatchActive = true;
}
onMounted(async () => {
//initial loading positions
if (activePositions.value.length < 1 && loading.value === false) {
await loadPositions(activeChainId.value);
// await getMinStake();
function stopContractEventWatchers() {
if (!isContractWatchActive) {
return;
}
if (unwatchPositionCreated) {
unwatchPositionCreated();
unwatchPositionCreated = null;
}
if (unwatchPositionRemoved) {
unwatchPositionRemoved();
unwatchPositionRemoved = null;
}
isContractWatchActive = false;
}
if (!unwatch) {
watchEvent();
function syncRealtimeMode() {
if (realtimeConsumerCount === 0) {
stopContractEventWatchers();
stopPositionsPolling();
return;
}
if (!unwatchPositionRemovedEvent) {
watchPositionRemoved();
const account = getAccount(config as Config);
const shouldUseContractEvents = Boolean(account.address);
if (shouldUseContractEvents) {
stopPositionsPolling();
startContractEventWatchers();
} else {
stopContractEventWatchers();
startPositionsPolling();
}
}
function registerRealtimeConsumer() {
realtimeConsumerCount += 1;
syncRealtimeMode();
}
function unregisterRealtimeConsumer() {
if (realtimeConsumerCount === 0) {
return;
}
realtimeConsumerCount -= 1;
if (realtimeConsumerCount === 0) {
stopContractEventWatchers();
stopPositionsPolling();
}
}
function ensureWagmiWatchers() {
wagmiWatcherConsumers += 1;
if (wagmiWatcherConsumers > 1) {
return;
}
if (!unwatchChainSwitch) {
@ -300,6 +440,7 @@ export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
const resolvedChainId = nextChainId ?? DEFAULT_CHAIN_ID;
activeChainId.value = resolvedChainId;
await loadPositions(resolvedChainId);
syncRealtimeMode();
},
});
}
@ -308,19 +449,16 @@ export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
unwatchAccountChanged = watchAccount(config as Config, {
async onChange() {
await loadPositions(activeChainId.value);
syncRealtimeMode();
},
});
}
});
onUnmounted(() => {
if (unwatch) {
unwatch();
unwatch = null;
}
if (unwatchPositionRemovedEvent) {
unwatchPositionRemovedEvent();
unwatchPositionRemovedEvent = null;
function teardownWagmiWatchers() {
wagmiWatcherConsumers = Math.max(0, wagmiWatcherConsumers - 1);
if (wagmiWatcherConsumers > 0) {
return;
}
if (unwatchChainSwitch) {
unwatchChainSwitch();
@ -330,6 +468,24 @@ export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
unwatchAccountChanged();
unwatchAccountChanged = null;
}
}
export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
activeChainId.value = chainId;
onMounted(async () => {
ensureWagmiWatchers();
if (activePositions.value.length < 1 && loading.value === false) {
await loadPositions(activeChainId.value);
}
registerRealtimeConsumer();
});
onUnmounted(() => {
unregisterRealtimeConsumer();
teardownWagmiWatchers();
retryManager.clear();
});
@ -379,10 +535,9 @@ export function usePositions(chainId: number = DEFAULT_CHAIN_ID) {
myActivePositions,
myClosedPositions,
tresholdValue,
watchEvent,
watchPositionRemoved,
createRandomPosition,
positionsError,
loading,
positionsUpdatedAt,
};
}