diff --git a/services/ponder/generated/schema.graphql b/services/ponder/generated/schema.graphql index a3a3945..2db44d8 100644 --- a/services/ponder/generated/schema.graphql +++ b/services/ponder/generated/schema.graphql @@ -29,6 +29,7 @@ type stats { kraikenTotalSupply: BigInt! stakeTotalSupply: BigInt! outstandingStake: BigInt! + positionsUpdatedAt: BigInt! totalMinted: BigInt! totalBurned: BigInt! totalTaxPaid: BigInt! @@ -93,6 +94,14 @@ input statsFilter { outstandingStake_lt: BigInt outstandingStake_gte: BigInt outstandingStake_lte: BigInt + positionsUpdatedAt: BigInt + positionsUpdatedAt_not: BigInt + positionsUpdatedAt_in: [BigInt] + positionsUpdatedAt_not_in: [BigInt] + positionsUpdatedAt_gt: BigInt + positionsUpdatedAt_lt: BigInt + positionsUpdatedAt_gte: BigInt + positionsUpdatedAt_lte: BigInt totalMinted: BigInt totalMinted_not: BigInt totalMinted_in: [BigInt] diff --git a/services/ponder/ponder.schema.ts b/services/ponder/ponder.schema.ts index 96734e6..80886b8 100644 --- a/services/ponder/ponder.schema.ts +++ b/services/ponder/ponder.schema.ts @@ -19,6 +19,10 @@ export const stats = onchainTable('stats', t => ({ .bigint() .notNull() .$default(() => 0n), + positionsUpdatedAt: t + .bigint() + .notNull() + .$default(() => 0n), // Totals totalMinted: t diff --git a/services/ponder/src/helpers/stats.ts b/services/ponder/src/helpers/stats.ts index a19da29..528dd6e 100644 --- a/services/ponder/src/helpers/stats.ts +++ b/services/ponder/src/helpers/stats.ts @@ -112,9 +112,7 @@ export function checkBlockHistorySufficient(context: StatsContext, event: StatsE if (blocksSinceDeployment < MINIMUM_BLOCKS_FOR_RINGBUFFER) { // Use console.warn as fallback if context.logger is not available (e.g., in block handlers) const logger = context.logger || console; - logger.warn( - `Insufficient block history (only ${blocksSinceDeployment} blocks available, need ${MINIMUM_BLOCKS_FOR_RINGBUFFER})` - ); + logger.warn(`Insufficient block history (only ${blocksSinceDeployment} blocks available, need ${MINIMUM_BLOCKS_FOR_RINGBUFFER})`); return false; } return true; @@ -176,6 +174,7 @@ export async function ensureStatsExists(context: StatsContext, timestamp?: bigin kraikenTotalSupply, stakeTotalSupply, outstandingStake, + positionsUpdatedAt: timestamp ?? 0n, ringBufferPointer: 0, lastHourlyUpdateTimestamp: currentHour, ringBuffer: serializeRingBuffer(makeEmptyRingBuffer()), @@ -258,6 +257,13 @@ export async function updateHourlyData(context: StatsContext, timestamp: bigint) } } +export async function markPositionsUpdated(context: StatsContext, timestamp: bigint) { + await ensureStatsExists(context, timestamp); + await context.db.update(stats, { id: STATS_ID }).set({ + positionsUpdatedAt: timestamp, + }); +} + export async function getStakeTotalSupply(context: StatsContext): Promise { await ensureStatsExists(context); diff --git a/services/ponder/src/stake.ts b/services/ponder/src/stake.ts index 11dd664..2f912ca 100644 --- a/services/ponder/src/stake.ts +++ b/services/ponder/src/stake.ts @@ -3,6 +3,7 @@ import { positions, stats, STATS_ID, TAX_RATES } from 'ponder:schema'; import { ensureStatsExists, getStakeTotalSupply, + markPositionsUpdated, parseRingBuffer, refreshOutstandingStake, serializeRingBuffer, @@ -55,6 +56,7 @@ ponder.on('Stake:PositionCreated', async ({ event, context }) => { }); await refreshOutstandingStake(context); + await markPositionsUpdated(context, event.block.timestamp); }); ponder.on('Stake:PositionRemoved', async ({ event, context }) => { @@ -76,6 +78,7 @@ ponder.on('Stake:PositionRemoved', async ({ event, context }) => { }); await refreshOutstandingStake(context); + await markPositionsUpdated(context, event.block.timestamp); if (checkBlockHistorySufficient(context, event)) { await updateHourlyData(context, event.block.timestamp); @@ -101,6 +104,7 @@ ponder.on('Stake:PositionShrunk', async ({ event, context }) => { }); await refreshOutstandingStake(context); + await markPositionsUpdated(context, event.block.timestamp); if (checkBlockHistorySufficient(context, event)) { await updateHourlyData(context, event.block.timestamp); @@ -154,6 +158,7 @@ ponder.on('Stake:PositionTaxPaid', async ({ event, context }) => { } await refreshOutstandingStake(context); + await markPositionsUpdated(context, event.block.timestamp); }); ponder.on('Stake:PositionRateHiked', async ({ event, context }) => { @@ -163,4 +168,5 @@ ponder.on('Stake:PositionRateHiked', async ({ event, context }) => { taxRate: TAX_RATES[taxRateIndex] || 0, taxRateIndex, }); + await markPositionsUpdated(context, event.block.timestamp); }); diff --git a/web-app/src/composables/usePositions.ts b/web-app/src/composables/usePositions.ts index 9a95323..2303c54 100644 --- a/web-app/src/composables/usePositions.ts +++ b/web-app/src/composables/usePositions.ts @@ -2,24 +2,36 @@ import { ref, computed, type ComputedRef, onMounted, onUnmounted } from 'vue'; import { config } from '@/wagmi'; import { type WatchEventReturnType, type Hex, toBytes } from 'viem'; import axios from 'axios'; -import { getAccount, watchContractEvent, watchChainId, watchAccount, type Config } from '@wagmi/core'; +import { getAccount, watchChainId, watchAccount, watchContractEvent, type Config } from '@wagmi/core'; import type { WatchChainIdReturnType, WatchAccountReturnType, GetAccountReturnType } from '@wagmi/core'; -import { HarbContract } from '@/contracts/harb'; import { bytesToUint256 } from 'kraiken-lib'; import { bigInt2Number } from '@/utils/helper'; import { getTaxRateIndexByDecimal } from '@/composables/useAdjustTaxRates'; import logger from '@/utils/logger'; import { DEFAULT_CHAIN_ID } from '@/config'; import { createRetryManager, formatGraphqlError, resolveGraphqlEndpoint } from '@/utils/graphqlRetry'; +import { HarbContract } from '@/contracts/harb'; const rawActivePositions = ref>([]); const rawClosedPositoins = ref>([]); const loading = ref(false); const positionsError = ref(null); const GRAPHQL_TIMEOUT_MS = 15_000; const activeChainId = ref(DEFAULT_CHAIN_ID); +const positionsUpdatedAt = ref(0n); +const POLL_INTERVAL_MS = 10_000; const retryManager = createRetryManager(loadPositions, activeChainId); +let positionsPollTimer: ReturnType | null = null; +let pollInFlight = false; +let realtimeConsumerCount = 0; +let isPollingActive = false; +let isContractWatchActive = false; +let unwatchPositionCreated: WatchEventReturnType | null = null; +let unwatchPositionRemoved: WatchEventReturnType | null = null; +let unwatchChainSwitch: WatchChainIdReturnType | null = null; +let unwatchAccountChanged: WatchAccountReturnType | null = null; +let wagmiWatcherConsumers = 0; const activePositions = computed(() => { const account = getAccount(config as Config); @@ -119,7 +131,12 @@ const tresholdValue = computed(() => { return Math.floor(avgIndex / 2); }); -export async function loadActivePositions(chainId: number, endpointOverride?: string) { +interface ActivePositionsResult { + positions: Position[]; + updatedAt: bigint; +} + +export async function loadActivePositions(chainId: number, endpointOverride?: string): Promise { const targetEndpoint = resolveGraphqlEndpoint(chainId, endpointOverride); logger.info(`loadActivePositions for chainId: ${chainId}`); @@ -145,6 +162,9 @@ export async function loadActivePositions(chainId: number, endpointOverride?: st totalSupplyInit } } + stats(id: "0x01") { + positionsUpdatedAt + } }`, }, { timeout: GRAPHQL_TIMEOUT_MS } @@ -156,10 +176,17 @@ export async function loadActivePositions(chainId: number, endpointOverride?: st } const items = res.data?.data?.positionss?.items ?? []; - return items.map((item: Record) => ({ - ...item, - harbDeposit: item.kraikenDeposit ?? '0', - })) as Position[]; + const stats = res.data?.data?.stats; + const updatedAtRaw = stats?.positionsUpdatedAt; + const updatedAt = typeof updatedAtRaw === 'string' ? BigInt(updatedAtRaw) : 0n; + + return { + positions: items.map((item: Record) => ({ + ...item, + harbDeposit: item.kraikenDeposit ?? '0', + })) as Position[], + updatedAt, + }; } function formatId(id: Hex) { @@ -206,6 +233,35 @@ export async function loadMyClosedPositions(chainId: number, endpointOverride: s })) as Position[]; } +export async function fetchPositionsUpdatedAt(chainId: number, endpointOverride?: string): Promise { + const targetEndpoint = resolveGraphqlEndpoint(chainId, endpointOverride); + logger.info(`fetchPositionsUpdatedAt for chainId: ${chainId}`); + + const res = await axios.post( + targetEndpoint, + { + query: `query PositionsUpdatedAt { + stats(id: "0x01") { + positionsUpdatedAt + } + }`, + }, + { timeout: GRAPHQL_TIMEOUT_MS } + ); + + const errors = res.data?.errors; + if (Array.isArray(errors) && errors.length > 0) { + throw new Error(errors.map((err: unknown) => (err as { message?: string })?.message ?? 'GraphQL error').join(', ')); + } + + const updatedAtRaw = res.data?.data?.stats?.positionsUpdatedAt; + if (typeof updatedAtRaw !== 'string') { + throw new Error('positionsUpdatedAt missing from GraphQL response'); + } + + return BigInt(updatedAtRaw); +} + export async function loadPositions(chainId?: number) { loading.value = true; @@ -226,7 +282,9 @@ export async function loadPositions(chainId?: number) { } try { - rawActivePositions.value = await loadActivePositions(targetChainId, endpoint); + const { positions, updatedAt } = await loadActivePositions(targetChainId, endpoint); + rawActivePositions.value = positions; + positionsUpdatedAt.value = updatedAt; const account = getAccount(config as Config); if (account.address) { rawClosedPositoins.value = await loadMyClosedPositions(targetChainId, endpoint, account); @@ -241,95 +299,193 @@ export async function loadPositions(chainId?: number) { rawClosedPositoins.value = []; positionsError.value = formatGraphqlError(error); retryManager.schedule(); + positionsUpdatedAt.value = 0n; } finally { loading.value = false; } } -let unwatch: WatchEventReturnType | null; -let unwatchPositionRemovedEvent: WatchEventReturnType | null; -let unwatchChainSwitch: WatchChainIdReturnType | null; -let unwatchAccountChanged: WatchAccountReturnType | null; +async function pollPositionsOnce() { + if (!isPollingActive || realtimeConsumerCount === 0 || pollInFlight || loading.value) { + return; + } + pollInFlight = true; + try { + const chainId = activeChainId.value ?? DEFAULT_CHAIN_ID; + let endpoint: string; + try { + endpoint = resolveGraphqlEndpoint(chainId); + } catch (error) { + logger.info('positions polling skipped: no GraphQL endpoint', error); + return; + } + const latestUpdatedAt = await fetchPositionsUpdatedAt(chainId, endpoint); + if (latestUpdatedAt > positionsUpdatedAt.value) { + await loadPositions(chainId); + } + } catch (error) { + logger.info('positions polling failed', error); + } finally { + pollInFlight = false; + } +} + +function startPositionsPolling() { + if (isPollingActive || realtimeConsumerCount === 0) { + return; + } + positionsPollTimer = setInterval(() => { + void pollPositionsOnce(); + }, POLL_INTERVAL_MS); + isPollingActive = true; + void pollPositionsOnce(); +} + +function stopPositionsPolling() { + if (!isPollingActive) { + return; + } + if (positionsPollTimer) { + clearInterval(positionsPollTimer); + positionsPollTimer = null; + } + isPollingActive = false; +} + +function startContractEventWatchers() { + if (isContractWatchActive || realtimeConsumerCount === 0) { + return; + } + + unwatchPositionCreated = watchContractEvent(config as Config, { + address: HarbContract.contractAddress, + abi: HarbContract.abi, + eventName: 'PositionCreated', + async onLogs() { + await loadPositions(activeChainId.value); + }, + }); + + unwatchPositionRemoved = watchContractEvent(config as Config, { + address: HarbContract.contractAddress, + abi: HarbContract.abi, + eventName: 'PositionRemoved', + async onLogs() { + await loadPositions(activeChainId.value); + }, + }); + + isContractWatchActive = true; +} + +function stopContractEventWatchers() { + if (!isContractWatchActive) { + return; + } + if (unwatchPositionCreated) { + unwatchPositionCreated(); + unwatchPositionCreated = null; + } + if (unwatchPositionRemoved) { + unwatchPositionRemoved(); + unwatchPositionRemoved = null; + } + isContractWatchActive = false; +} + +function syncRealtimeMode() { + if (realtimeConsumerCount === 0) { + stopContractEventWatchers(); + stopPositionsPolling(); + return; + } + + const account = getAccount(config as Config); + const shouldUseContractEvents = Boolean(account.address); + + if (shouldUseContractEvents) { + stopPositionsPolling(); + startContractEventWatchers(); + } else { + stopContractEventWatchers(); + startPositionsPolling(); + } +} + +function registerRealtimeConsumer() { + realtimeConsumerCount += 1; + syncRealtimeMode(); +} + +function unregisterRealtimeConsumer() { + if (realtimeConsumerCount === 0) { + return; + } + realtimeConsumerCount -= 1; + if (realtimeConsumerCount === 0) { + stopContractEventWatchers(); + stopPositionsPolling(); + } +} + +function ensureWagmiWatchers() { + wagmiWatcherConsumers += 1; + if (wagmiWatcherConsumers > 1) { + return; + } + + if (!unwatchChainSwitch) { + unwatchChainSwitch = watchChainId(config as Config, { + async onChange(nextChainId) { + const resolvedChainId = nextChainId ?? DEFAULT_CHAIN_ID; + activeChainId.value = resolvedChainId; + await loadPositions(resolvedChainId); + syncRealtimeMode(); + }, + }); + } + + if (!unwatchAccountChanged) { + unwatchAccountChanged = watchAccount(config as Config, { + async onChange() { + await loadPositions(activeChainId.value); + syncRealtimeMode(); + }, + }); + } +} + +function teardownWagmiWatchers() { + wagmiWatcherConsumers = Math.max(0, wagmiWatcherConsumers - 1); + if (wagmiWatcherConsumers > 0) { + return; + } + if (unwatchChainSwitch) { + unwatchChainSwitch(); + unwatchChainSwitch = null; + } + if (unwatchAccountChanged) { + unwatchAccountChanged(); + unwatchAccountChanged = null; + } +} + export function usePositions(chainId: number = DEFAULT_CHAIN_ID) { activeChainId.value = chainId; - function watchEvent() { - unwatch = watchContractEvent(config as Config, { - address: HarbContract.contractAddress, - abi: HarbContract.abi, - eventName: 'PositionCreated', - async onLogs(_logs) { - // console.log("new Position", logs); - await loadPositions(); - // await getMinStake(); - }, - }); - } - - function watchPositionRemoved() { - unwatchPositionRemovedEvent = watchContractEvent(config as Config, { - address: HarbContract.contractAddress, - abi: HarbContract.abi, - eventName: 'PositionRemoved', - async onLogs(_logs) { - // console.log("Position removed", logs); - await loadPositions(); - // await getMinStake(); - }, - }); - } - onMounted(async () => { - //initial loading positions + ensureWagmiWatchers(); if (activePositions.value.length < 1 && loading.value === false) { await loadPositions(activeChainId.value); - // await getMinStake(); } - if (!unwatch) { - watchEvent(); - } - if (!unwatchPositionRemovedEvent) { - watchPositionRemoved(); - } - - if (!unwatchChainSwitch) { - unwatchChainSwitch = watchChainId(config as Config, { - async onChange(nextChainId) { - const resolvedChainId = nextChainId ?? DEFAULT_CHAIN_ID; - activeChainId.value = resolvedChainId; - await loadPositions(resolvedChainId); - }, - }); - } - - if (!unwatchAccountChanged) { - unwatchAccountChanged = watchAccount(config as Config, { - async onChange() { - await loadPositions(activeChainId.value); - }, - }); - } + registerRealtimeConsumer(); }); onUnmounted(() => { - if (unwatch) { - unwatch(); - unwatch = null; - } - if (unwatchPositionRemovedEvent) { - unwatchPositionRemovedEvent(); - unwatchPositionRemovedEvent = null; - } - if (unwatchChainSwitch) { - unwatchChainSwitch(); - unwatchChainSwitch = null; - } - if (unwatchAccountChanged) { - unwatchAccountChanged(); - unwatchAccountChanged = null; - } + unregisterRealtimeConsumer(); + teardownWagmiWatchers(); retryManager.clear(); }); @@ -379,10 +535,9 @@ export function usePositions(chainId: number = DEFAULT_CHAIN_ID) { myActivePositions, myClosedPositions, tresholdValue, - watchEvent, - watchPositionRemoved, createRandomPosition, positionsError, loading, + positionsUpdatedAt, }; }