diff --git a/src/services/providerUsage/__tests__/providerUsage.test.ts b/src/services/providerUsage/__tests__/providerUsage.test.ts new file mode 100644 index 000000000..6d56459db --- /dev/null +++ b/src/services/providerUsage/__tests__/providerUsage.test.ts @@ -0,0 +1,120 @@ +import { describe, test, expect, beforeEach } from 'bun:test' +import { anthropicAdapter } from '../adapters/anthropic.js' +import { openaiAdapter } from '../adapters/openai.js' +import { bedrockAdapter } from '../adapters/bedrock.js' +import { + getProviderUsage, + resetProviderUsage, + setProviderBalance, + subscribeProviderUsage, + updateProviderBuckets, +} from '../store.js' + +function headers(pairs: Record): Headers { + const h = new Headers() + for (const [k, v] of Object.entries(pairs)) h.set(k, v) + return h +} + +describe('anthropicAdapter', () => { + test('parses both 5h and 7d buckets', () => { + const h = headers({ + 'anthropic-ratelimit-unified-5h-utilization': '0.42', + 'anthropic-ratelimit-unified-5h-reset': '1800000000', + 'anthropic-ratelimit-unified-7d-utilization': '0.1', + 'anthropic-ratelimit-unified-7d-reset': '1800100000', + }) + const out = anthropicAdapter.parseHeaders(h) + expect(out).toHaveLength(2) + expect(out[0]).toMatchObject({ + kind: 'session', + label: 'Session', + utilization: 0.42, + resetsAt: 1800000000, + }) + expect(out[1]).toMatchObject({ + kind: 'weekly', + label: 'Weekly', + utilization: 0.1, + resetsAt: 1800100000, + }) + }) + + test('returns [] when headers absent (API key user)', () => { + expect(anthropicAdapter.parseHeaders(new Headers())).toEqual([]) + }) + + test('drops bucket with non-numeric utilization', () => { + const h = headers({ + 'anthropic-ratelimit-unified-5h-utilization': 'xx', + 'anthropic-ratelimit-unified-5h-reset': '0', + }) + expect(anthropicAdapter.parseHeaders(h)).toEqual([]) + }) +}) + +describe('openaiAdapter', () => { + test('computes RPM and TPM utilization from limit+remaining', () => { + const h = headers({ + 'x-ratelimit-limit-requests': '1000', + 'x-ratelimit-remaining-requests': '250', + 'x-ratelimit-limit-tokens': '100000', + 'x-ratelimit-remaining-tokens': '25000', + 'x-ratelimit-reset-requests': '6m', + }) + const out = openaiAdapter.parseHeaders(h) + expect(out).toHaveLength(2) + expect(out[0].kind).toBe('requests') + expect(out[0].label).toBe('RPM') + expect(out[0].utilization).toBeCloseTo(0.75, 5) + expect(out[1].kind).toBe('tokens') + expect(out[1].utilization).toBeCloseTo(0.75, 5) + }) + + test('returns [] when no relevant headers', () => { + expect(openaiAdapter.parseHeaders(new Headers())).toEqual([]) + }) +}) + +describe('bedrockAdapter', () => { + test('inverts quota-remaining into utilization', () => { + const h = headers({ + 'x-amzn-bedrock-quota-remaining': '0.3', + 'x-amzn-bedrock-quota-reset': '1800000000', + }) + const out = bedrockAdapter.parseHeaders(h) + expect(out).toHaveLength(1) + expect(out[0].kind).toBe('throttle') + expect(out[0].utilization).toBeCloseTo(0.7, 5) + expect(out[0].resetsAt).toBe(1800000000) + }) + + test('returns [] without header', () => { + expect(bedrockAdapter.parseHeaders(new Headers())).toEqual([]) + }) +}) + +describe('providerUsage store', () => { + beforeEach(() => { + resetProviderUsage() + }) + + test('updateProviderBuckets replaces buckets and notifies', () => { + const seen: string[] = [] + const unsub = subscribeProviderUsage(u => seen.push(u.providerId)) + updateProviderBuckets('openai', [ + { kind: 'tokens', label: 'TPM', utilization: 0.5 }, + ]) + expect(getProviderUsage().providerId).toBe('openai') + expect(getProviderUsage().buckets).toHaveLength(1) + expect(seen).toEqual(['openai']) + unsub() + }) + + test('setProviderBalance stores and clears', () => { + setProviderBalance('deepseek', { currency: 'USD', remaining: 3.5 }) + expect(getProviderUsage().balance?.remaining).toBe(3.5) + setProviderBalance('deepseek', null) + expect(getProviderUsage().balance).toBeUndefined() + }) +}) diff --git a/src/services/providerUsage/adapters/anthropic.ts b/src/services/providerUsage/adapters/anthropic.ts new file mode 100644 index 000000000..226985751 --- /dev/null +++ b/src/services/providerUsage/adapters/anthropic.ts @@ -0,0 +1,40 @@ +import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js' + +export const anthropicAdapter: ProviderUsageAdapter = { + providerId: 'anthropic', + + /** + * Parse Anthropic's unified rate-limit headers. + * + * anthropic-ratelimit-unified-5h-utilization (0..1) + * anthropic-ratelimit-unified-5h-reset (unix seconds) + * anthropic-ratelimit-unified-7d-utilization + * anthropic-ratelimit-unified-7d-reset + * + * Only present for OAuth (Claude AI Pro/Max) subscribers. For raw API keys + * these headers are absent and this adapter returns []. + */ + parseHeaders(headers): ProviderUsageBucket[] { + const buckets: ProviderUsageBucket[] = [] + for (const [abbrev, kind, label] of [ + ['5h', 'session', 'Session'], + ['7d', 'weekly', 'Weekly'], + ] as const) { + const util = headers.get( + `anthropic-ratelimit-unified-${abbrev}-utilization`, + ) + const reset = headers.get(`anthropic-ratelimit-unified-${abbrev}-reset`) + if (util === null || reset === null) continue + const utilization = Number(util) + const resetsAt = Number(reset) + if (!Number.isFinite(utilization)) continue + buckets.push({ + kind, + label, + utilization, + ...(Number.isFinite(resetsAt) && resetsAt > 0 ? { resetsAt } : {}), + }) + } + return buckets + }, +} diff --git a/src/services/providerUsage/adapters/bedrock.ts b/src/services/providerUsage/adapters/bedrock.ts new file mode 100644 index 000000000..1dba007c2 --- /dev/null +++ b/src/services/providerUsage/adapters/bedrock.ts @@ -0,0 +1,38 @@ +import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js' + +/** + * AWS Bedrock rate-limit / throttling headers. + * + * Bedrock does not expose a precise per-minute quota the way OpenAI or + * Anthropic do — the only reliably-present signal is `x-amzn-bedrock-*` + * metadata on the response. We surface *throttle pressure* as a bucket + * only when we can derive a meaningful 0..1 signal; otherwise return []. + * + * x-amzn-bedrock-quota-remaining (0..1 fraction, when present on some models) + * x-amzn-bedrock-quota-reset (unix seconds) + * retry-after (seconds, present on 429) + */ +export const bedrockAdapter: ProviderUsageAdapter = { + providerId: 'bedrock', + parseHeaders(headers): ProviderUsageBucket[] { + const buckets: ProviderUsageBucket[] = [] + + const remainingRaw = headers.get('x-amzn-bedrock-quota-remaining') + const resetRaw = headers.get('x-amzn-bedrock-quota-reset') + + if (remainingRaw !== null) { + const remaining = Number(remainingRaw) + if (Number.isFinite(remaining) && remaining >= 0 && remaining <= 1) { + const resetsAt = resetRaw !== null ? Number(resetRaw) : 0 + buckets.push({ + kind: 'throttle', + label: 'Throttle', + utilization: 1 - remaining, + ...(Number.isFinite(resetsAt) && resetsAt > 0 ? { resetsAt } : {}), + }) + } + } + + return buckets + }, +} diff --git a/src/services/providerUsage/adapters/openai.ts b/src/services/providerUsage/adapters/openai.ts new file mode 100644 index 000000000..57fdb1b8f --- /dev/null +++ b/src/services/providerUsage/adapters/openai.ts @@ -0,0 +1,97 @@ +import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js' + +/** + * Parse a Retry-After-style duration string (e.g. "6m0s", "1h30m", "500ms") + * into unix epoch seconds *from now*. Returns 0 if unparseable. + */ +function parseResetAt(value: string | null): number { + if (!value) return 0 + let seconds = 0 + const re = /(\d+(?:\.\d+)?)(ms|s|m|h|d)/g + let match: RegExpExecArray | null + while ((match = re.exec(value)) !== null) { + const n = Number(match[1]) + const unit = match[2] + switch (unit) { + case 'ms': + seconds += n / 1000 + break + case 's': + seconds += n + break + case 'm': + seconds += n * 60 + break + case 'h': + seconds += n * 3600 + break + case 'd': + seconds += n * 86400 + break + } + } + if (seconds === 0) { + const n = Number(value) + if (Number.isFinite(n)) seconds = n + } + if (seconds <= 0) return 0 + return Math.floor(Date.now() / 1000) + seconds +} + +function computeUtilization( + remaining: string | null, + limit: string | null, +): number | null { + if (remaining === null || limit === null) return null + const r = Number(remaining) + const l = Number(limit) + if (!Number.isFinite(r) || !Number.isFinite(l) || l <= 0) return null + const used = Math.max(0, l - r) + return Math.min(1, Math.max(0, used / l)) +} + +/** + * OpenAI-compatible rate-limit headers. + * + * x-ratelimit-limit-requests / x-ratelimit-remaining-requests / x-ratelimit-reset-requests + * x-ratelimit-limit-tokens / x-ratelimit-remaining-tokens / x-ratelimit-reset-tokens + * + * Works for OpenAI, DeepSeek, Moonshot, Grok (xAI) and many self-hosted + * OpenAI-compatible gateways. + */ +export const openaiAdapter: ProviderUsageAdapter = { + providerId: 'openai', + parseHeaders(headers): ProviderUsageBucket[] { + const buckets: ProviderUsageBucket[] = [] + + const reqUtil = computeUtilization( + headers.get('x-ratelimit-remaining-requests'), + headers.get('x-ratelimit-limit-requests'), + ) + if (reqUtil !== null) { + buckets.push({ + kind: 'requests', + label: 'RPM', + utilization: reqUtil, + resetsAt: + parseResetAt(headers.get('x-ratelimit-reset-requests')) || undefined, + }) + } + + const tokUtil = computeUtilization( + headers.get('x-ratelimit-remaining-tokens'), + headers.get('x-ratelimit-limit-tokens'), + ) + if (tokUtil !== null) { + buckets.push({ + kind: 'tokens', + label: 'TPM', + utilization: tokUtil, + resetsAt: + parseResetAt(headers.get('x-ratelimit-reset-tokens')) || undefined, + }) + } + + return buckets + }, +} diff --git a/src/services/providerUsage/balance/deepseek.ts b/src/services/providerUsage/balance/deepseek.ts new file mode 100644 index 000000000..92db9f62a --- /dev/null +++ b/src/services/providerUsage/balance/deepseek.ts @@ -0,0 +1,85 @@ +import type { ProviderBalance } from '../types.js' +import type { BalanceProvider } from './types.js' + +/** + * DeepSeek exposes balance at `GET /user/balance`. + * + * Enabled when: + * - OPENAI_BASE_URL points at api.deepseek.com, OR + * - DEEPSEEK_API_KEY is set (explicit opt-in). + * + * Response shape: + * { is_available: true, balance_infos: [{ currency:"USD", total_balance:"5.00", ... }, ...] } + */ + +function getBaseUrl(): string | null { + const url = process.env.OPENAI_BASE_URL + if (url && /\bapi\.deepseek\.com\b/i.test(url)) return url.replace(/\/+$/, '') + if (process.env.DEEPSEEK_API_KEY) return 'https://api.deepseek.com' + return null +} + +function getApiKey(): string | null { + return process.env.DEEPSEEK_API_KEY || process.env.OPENAI_API_KEY || null +} + +export const deepseekBalanceProvider: BalanceProvider = { + providerId: 'deepseek', + + isEnabled(): boolean { + return getBaseUrl() !== null && getApiKey() !== null + }, + + async fetchBalance(signal?: AbortSignal): Promise { + const base = getBaseUrl() + const key = getApiKey() + if (!base || !key) return null + + let res: Response + try { + res = await fetch(`${base}/user/balance`, { + method: 'GET', + headers: { + Authorization: `Bearer ${key}`, + Accept: 'application/json', + }, + signal, + }) + } catch { + return null + } + if (!res.ok) return null + + let data: unknown + try { + data = await res.json() + } catch { + return null + } + + const infos = (data as { balance_infos?: unknown })?.balance_infos + if (!Array.isArray(infos)) return null + + // Prefer USD; fall back to the first entry. + const usd = infos.find( + (e: unknown) => + typeof e === 'object' && + e !== null && + (e as { currency?: unknown }).currency === 'USD', + ) as Record | undefined + const pick = usd ?? (infos[0] as Record) ?? null + if (!pick) return null + + const currency = typeof pick.currency === 'string' ? pick.currency : 'USD' + const remainingRaw = pick.total_balance + const remaining = + typeof remainingRaw === 'number' ? remainingRaw : Number(remainingRaw) + if (!Number.isFinite(remaining)) return null + + return { + currency, + remaining, + updatedAt: Math.floor(Date.now() / 1000), + } + }, +} diff --git a/src/services/providerUsage/balance/generic.ts b/src/services/providerUsage/balance/generic.ts new file mode 100644 index 000000000..d1e1c06ce --- /dev/null +++ b/src/services/providerUsage/balance/generic.ts @@ -0,0 +1,118 @@ +import type { ProviderBalance } from '../types.js' +import type { BalanceProvider } from './types.js' + +/** + * Generic URL+key balance provider. + * + * Environment: + * CLAUDE_CODE_BALANCE_URL — GET endpoint returning JSON (required) + * CLAUDE_CODE_BALANCE_KEY — optional Bearer token (falls back to OPENAI_API_KEY / ANTHROPIC_API_KEY) + * CLAUDE_CODE_BALANCE_JSON_PATH — dot path into the JSON for the remaining number (default: "balance") + * array indices allowed, e.g. "data.0.credit" + * CLAUDE_CODE_BALANCE_CURRENCY — display currency label (default: "USD") + * + * Kept intentionally permissive so any OpenAI-compatible "my balance" endpoint + * can be wired up without writing new code. + */ + +function pickAtPath(obj: unknown, path: string): unknown { + if (!path) return obj + const parts = path.split('.').filter(Boolean) + let cur: unknown = obj + for (const part of parts) { + if (cur === null || cur === undefined) return undefined + if (Array.isArray(cur)) { + const idx = Number(part) + if (!Number.isFinite(idx)) return undefined + cur = cur[idx] + } else if (typeof cur === 'object') { + cur = (cur as Record)[part] + } else { + return undefined + } + } + return cur +} + +const PRIVATE_IP_RE = + /^(10\.|192\.168\.|172\.(1[6-9]|2\d|3[01])\.|169\.254\.|127\.|0\.0\.0\.0|fc|fd|\[::1\]|\[fe80:)/ + +function assertSafeBalanceUrl(raw: string): URL { + const parsed = new URL(raw) + if (parsed.protocol !== 'https:' && parsed.protocol !== 'http:') { + throw new Error(`unsupported protocol: ${parsed.protocol}`) + } + if ( + parsed.protocol === 'http:' && + !['localhost', '127.0.0.1', '[::1]'].includes(parsed.hostname) + ) { + throw new Error(`http only allowed for localhost, got ${parsed.hostname}`) + } + if (PRIVATE_IP_RE.test(parsed.hostname)) { + throw new Error(`private/reserved IP not allowed: ${parsed.hostname}`) + } + return parsed +} + +export const genericBalanceProvider: BalanceProvider = { + providerId: 'generic', + + isEnabled(): boolean { + return Boolean(process.env.CLAUDE_CODE_BALANCE_URL) + }, + + async fetchBalance(signal?: AbortSignal): Promise { + const rawUrl = process.env.CLAUDE_CODE_BALANCE_URL + if (!rawUrl) return null + + let url: URL + try { + url = assertSafeBalanceUrl(rawUrl) + } catch { + return null + } + + // Fallback chain: BALANCE_KEY → OPENAI_API_KEY → ANTHROPIC_API_KEY. + // WARNING: fallback keys are sent to CLAUDE_CODE_BALANCE_URL as Bearer token. + // If that URL is untrusted, your provider key leaks. Prefer CLAUDE_CODE_BALANCE_KEY. + const key = + process.env.CLAUDE_CODE_BALANCE_KEY || + process.env.OPENAI_API_KEY || + process.env.ANTHROPIC_API_KEY || + '' + const path = process.env.CLAUDE_CODE_BALANCE_JSON_PATH || 'balance' + const currency = process.env.CLAUDE_CODE_BALANCE_CURRENCY || 'USD' + + let res: Response + try { + res = await fetch(url.href, { + method: 'GET', + headers: { + Accept: 'application/json', + ...(key ? { Authorization: `Bearer ${key}` } : {}), + }, + signal, + }) + } catch { + return null + } + if (!res.ok) return null + + let data: unknown + try { + data = await res.json() + } catch { + return null + } + + const raw = pickAtPath(data, path) + const remaining = typeof raw === 'number' ? raw : Number(raw) + if (!Number.isFinite(remaining)) return null + + return { + currency, + remaining, + updatedAt: Math.floor(Date.now() / 1000), + } + }, +} diff --git a/src/services/providerUsage/balance/poller.ts b/src/services/providerUsage/balance/poller.ts new file mode 100644 index 000000000..325d767a2 --- /dev/null +++ b/src/services/providerUsage/balance/poller.ts @@ -0,0 +1,78 @@ +import { setProviderBalance } from '../store.js' +import { deepseekBalanceProvider } from './deepseek.js' +import { genericBalanceProvider } from './generic.js' +import type { BalanceProvider } from './types.js' + +const DEFAULT_INTERVAL_MIN = 10 + +// Registration order = priority. First enabled wins. Generic (user-supplied +// URL) comes first so operators can override the built-in DeepSeek detection. +const PROVIDERS: BalanceProvider[] = [ + genericBalanceProvider, + deepseekBalanceProvider, +] + +function selectProvider(): BalanceProvider | null { + if (process.env.CLAUDE_CODE_BALANCE_PROVIDER === 'none') return null + return PROVIDERS.find(p => p.isEnabled()) ?? null +} + +function intervalMs(): number { + const raw = process.env.CLAUDE_CODE_BALANCE_POLL_INTERVAL_MINUTES + const n = raw ? Number(raw) : DEFAULT_INTERVAL_MIN + if (!Number.isFinite(n) || n <= 0) return DEFAULT_INTERVAL_MIN * 60_000 + return Math.floor(n * 60_000) +} + +let timer: ReturnType | null = null +let inflight: AbortController | null = null +let active: BalanceProvider | null = null + +const FETCH_TIMEOUT_MS = 10_000 + +async function tick(): Promise { + if (!active) return + inflight?.abort() + inflight = new AbortController() + const timeout = setTimeout(() => inflight?.abort(), FETCH_TIMEOUT_MS) + try { + const balance = await active.fetchBalance(inflight.signal) + setProviderBalance(active.providerId, balance) + } catch { + // Never bubble into the host process. + } finally { + clearTimeout(timeout) + } +} + +/** Start polling if a provider is configured. Idempotent. */ +export function startBalancePolling(): void { + if (timer !== null) return + active = selectProvider() + if (!active) return + // Kick off immediately, then on interval. + void tick() + timer = setInterval(() => { + void tick() + }, intervalMs()) + // Don't keep the event loop alive just for the poller. + if ( + typeof (timer as unknown as { unref?: () => void }).unref === 'function' + ) { + ;(timer as unknown as { unref: () => void }).unref() + } +} + +export function stopBalancePolling(): void { + if (timer !== null) { + clearInterval(timer) + timer = null + } + inflight?.abort() + inflight = null + active = null +} + +export function getActiveBalanceProviderId(): string | null { + return active?.providerId ?? null +} diff --git a/src/services/providerUsage/balance/types.ts b/src/services/providerUsage/balance/types.ts new file mode 100644 index 000000000..fbf938c9d --- /dev/null +++ b/src/services/providerUsage/balance/types.ts @@ -0,0 +1,9 @@ +import type { ProviderBalance } from '../types.js' + +export interface BalanceProvider { + readonly providerId: string + /** Whether the user has configured this provider (env vars etc.). */ + isEnabled(): boolean + /** Fetch a fresh snapshot; return null on any soft failure. */ + fetchBalance(signal?: AbortSignal): Promise +} diff --git a/src/services/providerUsage/store.ts b/src/services/providerUsage/store.ts new file mode 100644 index 000000000..526170006 --- /dev/null +++ b/src/services/providerUsage/store.ts @@ -0,0 +1,68 @@ +import type { + ProviderBalance, + ProviderUsage, + ProviderUsageBucket, +} from './types.js' + +type Listener = (snapshot: ProviderUsage) => void + +let current: ProviderUsage = { + providerId: 'unknown', + buckets: [], +} + +const listeners: Set = new Set() + +export function getProviderUsage(): ProviderUsage { + return current +} + +/** + * Replace buckets for a provider. Passing an empty array is valid — it records + * that the latest response carried no usable quota header. + */ +export function updateProviderBuckets( + providerId: string, + buckets: ProviderUsageBucket[], +): void { + current = { + ...current, + providerId, + buckets, + } + emit() +} + +export function setProviderBalance( + providerId: string, + balance: ProviderBalance | null, +): void { + current = { + ...current, + providerId, + ...(balance === null ? { balance: undefined } : { balance }), + } + emit() +} + +export function subscribeProviderUsage(listener: Listener): () => void { + listeners.add(listener) + return () => { + listeners.delete(listener) + } +} + +export function resetProviderUsage(): void { + current = { providerId: 'unknown', buckets: [] } + emit() +} + +function emit(): void { + for (const listener of listeners) { + try { + listener(current) + } catch { + // Listener errors must not break the publish loop. + } + } +} diff --git a/src/services/providerUsage/types.ts b/src/services/providerUsage/types.ts new file mode 100644 index 000000000..8163b0a9b --- /dev/null +++ b/src/services/providerUsage/types.ts @@ -0,0 +1,40 @@ +/** + * Unified provider usage model. + * + * Each API client (Anthropic, OpenAI, Bedrock, ...) parses its own response + * headers through a `ProviderUsageAdapter` and pushes buckets into the store. + * A balance poller may additionally populate `ProviderBalance`. + */ + +export type BucketKind = + | 'session' // Anthropic 5-hour window + | 'weekly' // Anthropic 7-day window + | 'requests' // OpenAI-style RPM bucket + | 'tokens' // OpenAI-style TPM bucket + | 'throttle' // Bedrock / generic throttle + | 'custom' + +export interface ProviderUsageBucket { + kind: BucketKind + label: string + utilization: number + resetsAt?: number +} + +export interface ProviderBalance { + currency: string + remaining: number + total?: number + updatedAt?: number +} + +export interface ProviderUsage { + providerId: string + buckets: ProviderUsageBucket[] + balance?: ProviderBalance +} + +export interface ProviderUsageAdapter { + providerId: string + parseHeaders(headers: globalThis.Headers): ProviderUsageBucket[] +}