feat: 添加 provider usage 统计与余额查询

- 新增 providerUsage 服务(anthropic/bedrock/openai 适配器)
- 新增余额查询(deepseek/generic poller)
- StatusLine 保留原有 rateLimits 接口不变

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
unraid
2026-04-22 22:38:09 +08:00
parent 1837df5f88
commit 31b2fdd97a
10 changed files with 693 additions and 0 deletions

View File

@@ -0,0 +1,120 @@
import { describe, test, expect, beforeEach } from 'bun:test'
import { anthropicAdapter } from '../adapters/anthropic.js'
import { openaiAdapter } from '../adapters/openai.js'
import { bedrockAdapter } from '../adapters/bedrock.js'
import {
getProviderUsage,
resetProviderUsage,
setProviderBalance,
subscribeProviderUsage,
updateProviderBuckets,
} from '../store.js'
function headers(pairs: Record<string, string>): Headers {
const h = new Headers()
for (const [k, v] of Object.entries(pairs)) h.set(k, v)
return h
}
describe('anthropicAdapter', () => {
test('parses both 5h and 7d buckets', () => {
const h = headers({
'anthropic-ratelimit-unified-5h-utilization': '0.42',
'anthropic-ratelimit-unified-5h-reset': '1800000000',
'anthropic-ratelimit-unified-7d-utilization': '0.1',
'anthropic-ratelimit-unified-7d-reset': '1800100000',
})
const out = anthropicAdapter.parseHeaders(h)
expect(out).toHaveLength(2)
expect(out[0]).toMatchObject({
kind: 'session',
label: 'Session',
utilization: 0.42,
resetsAt: 1800000000,
})
expect(out[1]).toMatchObject({
kind: 'weekly',
label: 'Weekly',
utilization: 0.1,
resetsAt: 1800100000,
})
})
test('returns [] when headers absent (API key user)', () => {
expect(anthropicAdapter.parseHeaders(new Headers())).toEqual([])
})
test('drops bucket with non-numeric utilization', () => {
const h = headers({
'anthropic-ratelimit-unified-5h-utilization': 'xx',
'anthropic-ratelimit-unified-5h-reset': '0',
})
expect(anthropicAdapter.parseHeaders(h)).toEqual([])
})
})
describe('openaiAdapter', () => {
test('computes RPM and TPM utilization from limit+remaining', () => {
const h = headers({
'x-ratelimit-limit-requests': '1000',
'x-ratelimit-remaining-requests': '250',
'x-ratelimit-limit-tokens': '100000',
'x-ratelimit-remaining-tokens': '25000',
'x-ratelimit-reset-requests': '6m',
})
const out = openaiAdapter.parseHeaders(h)
expect(out).toHaveLength(2)
expect(out[0].kind).toBe('requests')
expect(out[0].label).toBe('RPM')
expect(out[0].utilization).toBeCloseTo(0.75, 5)
expect(out[1].kind).toBe('tokens')
expect(out[1].utilization).toBeCloseTo(0.75, 5)
})
test('returns [] when no relevant headers', () => {
expect(openaiAdapter.parseHeaders(new Headers())).toEqual([])
})
})
describe('bedrockAdapter', () => {
test('inverts quota-remaining into utilization', () => {
const h = headers({
'x-amzn-bedrock-quota-remaining': '0.3',
'x-amzn-bedrock-quota-reset': '1800000000',
})
const out = bedrockAdapter.parseHeaders(h)
expect(out).toHaveLength(1)
expect(out[0].kind).toBe('throttle')
expect(out[0].utilization).toBeCloseTo(0.7, 5)
expect(out[0].resetsAt).toBe(1800000000)
})
test('returns [] without header', () => {
expect(bedrockAdapter.parseHeaders(new Headers())).toEqual([])
})
})
describe('providerUsage store', () => {
beforeEach(() => {
resetProviderUsage()
})
test('updateProviderBuckets replaces buckets and notifies', () => {
const seen: string[] = []
const unsub = subscribeProviderUsage(u => seen.push(u.providerId))
updateProviderBuckets('openai', [
{ kind: 'tokens', label: 'TPM', utilization: 0.5 },
])
expect(getProviderUsage().providerId).toBe('openai')
expect(getProviderUsage().buckets).toHaveLength(1)
expect(seen).toEqual(['openai'])
unsub()
})
test('setProviderBalance stores and clears', () => {
setProviderBalance('deepseek', { currency: 'USD', remaining: 3.5 })
expect(getProviderUsage().balance?.remaining).toBe(3.5)
setProviderBalance('deepseek', null)
expect(getProviderUsage().balance).toBeUndefined()
})
})

View File

@@ -0,0 +1,40 @@
import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js'
export const anthropicAdapter: ProviderUsageAdapter = {
providerId: 'anthropic',
/**
* Parse Anthropic's unified rate-limit headers.
*
* anthropic-ratelimit-unified-5h-utilization (0..1)
* anthropic-ratelimit-unified-5h-reset (unix seconds)
* anthropic-ratelimit-unified-7d-utilization
* anthropic-ratelimit-unified-7d-reset
*
* Only present for OAuth (Claude AI Pro/Max) subscribers. For raw API keys
* these headers are absent and this adapter returns [].
*/
parseHeaders(headers): ProviderUsageBucket[] {
const buckets: ProviderUsageBucket[] = []
for (const [abbrev, kind, label] of [
['5h', 'session', 'Session'],
['7d', 'weekly', 'Weekly'],
] as const) {
const util = headers.get(
`anthropic-ratelimit-unified-${abbrev}-utilization`,
)
const reset = headers.get(`anthropic-ratelimit-unified-${abbrev}-reset`)
if (util === null || reset === null) continue
const utilization = Number(util)
const resetsAt = Number(reset)
if (!Number.isFinite(utilization)) continue
buckets.push({
kind,
label,
utilization,
...(Number.isFinite(resetsAt) && resetsAt > 0 ? { resetsAt } : {}),
})
}
return buckets
},
}

View File

@@ -0,0 +1,38 @@
import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js'
/**
* AWS Bedrock rate-limit / throttling headers.
*
* Bedrock does not expose a precise per-minute quota the way OpenAI or
* Anthropic do — the only reliably-present signal is `x-amzn-bedrock-*`
* metadata on the response. We surface *throttle pressure* as a bucket
* only when we can derive a meaningful 0..1 signal; otherwise return [].
*
* x-amzn-bedrock-quota-remaining (0..1 fraction, when present on some models)
* x-amzn-bedrock-quota-reset (unix seconds)
* retry-after (seconds, present on 429)
*/
export const bedrockAdapter: ProviderUsageAdapter = {
providerId: 'bedrock',
parseHeaders(headers): ProviderUsageBucket[] {
const buckets: ProviderUsageBucket[] = []
const remainingRaw = headers.get('x-amzn-bedrock-quota-remaining')
const resetRaw = headers.get('x-amzn-bedrock-quota-reset')
if (remainingRaw !== null) {
const remaining = Number(remainingRaw)
if (Number.isFinite(remaining) && remaining >= 0 && remaining <= 1) {
const resetsAt = resetRaw !== null ? Number(resetRaw) : 0
buckets.push({
kind: 'throttle',
label: 'Throttle',
utilization: 1 - remaining,
...(Number.isFinite(resetsAt) && resetsAt > 0 ? { resetsAt } : {}),
})
}
}
return buckets
},
}

View File

@@ -0,0 +1,97 @@
import type { ProviderUsageAdapter, ProviderUsageBucket } from '../types.js'
/**
* Parse a Retry-After-style duration string (e.g. "6m0s", "1h30m", "500ms")
* into unix epoch seconds *from now*. Returns 0 if unparseable.
*/
function parseResetAt(value: string | null): number {
if (!value) return 0
let seconds = 0
const re = /(\d+(?:\.\d+)?)(ms|s|m|h|d)/g
let match: RegExpExecArray | null
while ((match = re.exec(value)) !== null) {
const n = Number(match[1])
const unit = match[2]
switch (unit) {
case 'ms':
seconds += n / 1000
break
case 's':
seconds += n
break
case 'm':
seconds += n * 60
break
case 'h':
seconds += n * 3600
break
case 'd':
seconds += n * 86400
break
}
}
if (seconds === 0) {
const n = Number(value)
if (Number.isFinite(n)) seconds = n
}
if (seconds <= 0) return 0
return Math.floor(Date.now() / 1000) + seconds
}
function computeUtilization(
remaining: string | null,
limit: string | null,
): number | null {
if (remaining === null || limit === null) return null
const r = Number(remaining)
const l = Number(limit)
if (!Number.isFinite(r) || !Number.isFinite(l) || l <= 0) return null
const used = Math.max(0, l - r)
return Math.min(1, Math.max(0, used / l))
}
/**
* OpenAI-compatible rate-limit headers.
*
* x-ratelimit-limit-requests / x-ratelimit-remaining-requests / x-ratelimit-reset-requests
* x-ratelimit-limit-tokens / x-ratelimit-remaining-tokens / x-ratelimit-reset-tokens
*
* Works for OpenAI, DeepSeek, Moonshot, Grok (xAI) and many self-hosted
* OpenAI-compatible gateways.
*/
export const openaiAdapter: ProviderUsageAdapter = {
providerId: 'openai',
parseHeaders(headers): ProviderUsageBucket[] {
const buckets: ProviderUsageBucket[] = []
const reqUtil = computeUtilization(
headers.get('x-ratelimit-remaining-requests'),
headers.get('x-ratelimit-limit-requests'),
)
if (reqUtil !== null) {
buckets.push({
kind: 'requests',
label: 'RPM',
utilization: reqUtil,
resetsAt:
parseResetAt(headers.get('x-ratelimit-reset-requests')) || undefined,
})
}
const tokUtil = computeUtilization(
headers.get('x-ratelimit-remaining-tokens'),
headers.get('x-ratelimit-limit-tokens'),
)
if (tokUtil !== null) {
buckets.push({
kind: 'tokens',
label: 'TPM',
utilization: tokUtil,
resetsAt:
parseResetAt(headers.get('x-ratelimit-reset-tokens')) || undefined,
})
}
return buckets
},
}

View File

@@ -0,0 +1,85 @@
import type { ProviderBalance } from '../types.js'
import type { BalanceProvider } from './types.js'
/**
* DeepSeek exposes balance at `GET /user/balance`.
*
* Enabled when:
* - OPENAI_BASE_URL points at api.deepseek.com, OR
* - DEEPSEEK_API_KEY is set (explicit opt-in).
*
* Response shape:
* { is_available: true, balance_infos: [{ currency:"USD", total_balance:"5.00", ... }, ...] }
*/
function getBaseUrl(): string | null {
const url = process.env.OPENAI_BASE_URL
if (url && /\bapi\.deepseek\.com\b/i.test(url)) return url.replace(/\/+$/, '')
if (process.env.DEEPSEEK_API_KEY) return 'https://api.deepseek.com'
return null
}
function getApiKey(): string | null {
return process.env.DEEPSEEK_API_KEY || process.env.OPENAI_API_KEY || null
}
export const deepseekBalanceProvider: BalanceProvider = {
providerId: 'deepseek',
isEnabled(): boolean {
return getBaseUrl() !== null && getApiKey() !== null
},
async fetchBalance(signal?: AbortSignal): Promise<ProviderBalance | null> {
const base = getBaseUrl()
const key = getApiKey()
if (!base || !key) return null
let res: Response
try {
res = await fetch(`${base}/user/balance`, {
method: 'GET',
headers: {
Authorization: `Bearer ${key}`,
Accept: 'application/json',
},
signal,
})
} catch {
return null
}
if (!res.ok) return null
let data: unknown
try {
data = await res.json()
} catch {
return null
}
const infos = (data as { balance_infos?: unknown })?.balance_infos
if (!Array.isArray(infos)) return null
// Prefer USD; fall back to the first entry.
const usd = infos.find(
(e: unknown) =>
typeof e === 'object' &&
e !== null &&
(e as { currency?: unknown }).currency === 'USD',
) as Record<string, unknown> | undefined
const pick = usd ?? (infos[0] as Record<string, unknown>) ?? null
if (!pick) return null
const currency = typeof pick.currency === 'string' ? pick.currency : 'USD'
const remainingRaw = pick.total_balance
const remaining =
typeof remainingRaw === 'number' ? remainingRaw : Number(remainingRaw)
if (!Number.isFinite(remaining)) return null
return {
currency,
remaining,
updatedAt: Math.floor(Date.now() / 1000),
}
},
}

View File

@@ -0,0 +1,118 @@
import type { ProviderBalance } from '../types.js'
import type { BalanceProvider } from './types.js'
/**
* Generic URL+key balance provider.
*
* Environment:
* CLAUDE_CODE_BALANCE_URL — GET endpoint returning JSON (required)
* CLAUDE_CODE_BALANCE_KEY — optional Bearer token (falls back to OPENAI_API_KEY / ANTHROPIC_API_KEY)
* CLAUDE_CODE_BALANCE_JSON_PATH — dot path into the JSON for the remaining number (default: "balance")
* array indices allowed, e.g. "data.0.credit"
* CLAUDE_CODE_BALANCE_CURRENCY — display currency label (default: "USD")
*
* Kept intentionally permissive so any OpenAI-compatible "my balance" endpoint
* can be wired up without writing new code.
*/
function pickAtPath(obj: unknown, path: string): unknown {
if (!path) return obj
const parts = path.split('.').filter(Boolean)
let cur: unknown = obj
for (const part of parts) {
if (cur === null || cur === undefined) return undefined
if (Array.isArray(cur)) {
const idx = Number(part)
if (!Number.isFinite(idx)) return undefined
cur = cur[idx]
} else if (typeof cur === 'object') {
cur = (cur as Record<string, unknown>)[part]
} else {
return undefined
}
}
return cur
}
const PRIVATE_IP_RE =
/^(10\.|192\.168\.|172\.(1[6-9]|2\d|3[01])\.|169\.254\.|127\.|0\.0\.0\.0|fc|fd|\[::1\]|\[fe80:)/
function assertSafeBalanceUrl(raw: string): URL {
const parsed = new URL(raw)
if (parsed.protocol !== 'https:' && parsed.protocol !== 'http:') {
throw new Error(`unsupported protocol: ${parsed.protocol}`)
}
if (
parsed.protocol === 'http:' &&
!['localhost', '127.0.0.1', '[::1]'].includes(parsed.hostname)
) {
throw new Error(`http only allowed for localhost, got ${parsed.hostname}`)
}
if (PRIVATE_IP_RE.test(parsed.hostname)) {
throw new Error(`private/reserved IP not allowed: ${parsed.hostname}`)
}
return parsed
}
export const genericBalanceProvider: BalanceProvider = {
providerId: 'generic',
isEnabled(): boolean {
return Boolean(process.env.CLAUDE_CODE_BALANCE_URL)
},
async fetchBalance(signal?: AbortSignal): Promise<ProviderBalance | null> {
const rawUrl = process.env.CLAUDE_CODE_BALANCE_URL
if (!rawUrl) return null
let url: URL
try {
url = assertSafeBalanceUrl(rawUrl)
} catch {
return null
}
// Fallback chain: BALANCE_KEY → OPENAI_API_KEY → ANTHROPIC_API_KEY.
// WARNING: fallback keys are sent to CLAUDE_CODE_BALANCE_URL as Bearer token.
// If that URL is untrusted, your provider key leaks. Prefer CLAUDE_CODE_BALANCE_KEY.
const key =
process.env.CLAUDE_CODE_BALANCE_KEY ||
process.env.OPENAI_API_KEY ||
process.env.ANTHROPIC_API_KEY ||
''
const path = process.env.CLAUDE_CODE_BALANCE_JSON_PATH || 'balance'
const currency = process.env.CLAUDE_CODE_BALANCE_CURRENCY || 'USD'
let res: Response
try {
res = await fetch(url.href, {
method: 'GET',
headers: {
Accept: 'application/json',
...(key ? { Authorization: `Bearer ${key}` } : {}),
},
signal,
})
} catch {
return null
}
if (!res.ok) return null
let data: unknown
try {
data = await res.json()
} catch {
return null
}
const raw = pickAtPath(data, path)
const remaining = typeof raw === 'number' ? raw : Number(raw)
if (!Number.isFinite(remaining)) return null
return {
currency,
remaining,
updatedAt: Math.floor(Date.now() / 1000),
}
},
}

View File

@@ -0,0 +1,78 @@
import { setProviderBalance } from '../store.js'
import { deepseekBalanceProvider } from './deepseek.js'
import { genericBalanceProvider } from './generic.js'
import type { BalanceProvider } from './types.js'
const DEFAULT_INTERVAL_MIN = 10
// Registration order = priority. First enabled wins. Generic (user-supplied
// URL) comes first so operators can override the built-in DeepSeek detection.
const PROVIDERS: BalanceProvider[] = [
genericBalanceProvider,
deepseekBalanceProvider,
]
function selectProvider(): BalanceProvider | null {
if (process.env.CLAUDE_CODE_BALANCE_PROVIDER === 'none') return null
return PROVIDERS.find(p => p.isEnabled()) ?? null
}
function intervalMs(): number {
const raw = process.env.CLAUDE_CODE_BALANCE_POLL_INTERVAL_MINUTES
const n = raw ? Number(raw) : DEFAULT_INTERVAL_MIN
if (!Number.isFinite(n) || n <= 0) return DEFAULT_INTERVAL_MIN * 60_000
return Math.floor(n * 60_000)
}
let timer: ReturnType<typeof setInterval> | null = null
let inflight: AbortController | null = null
let active: BalanceProvider | null = null
const FETCH_TIMEOUT_MS = 10_000
async function tick(): Promise<void> {
if (!active) return
inflight?.abort()
inflight = new AbortController()
const timeout = setTimeout(() => inflight?.abort(), FETCH_TIMEOUT_MS)
try {
const balance = await active.fetchBalance(inflight.signal)
setProviderBalance(active.providerId, balance)
} catch {
// Never bubble into the host process.
} finally {
clearTimeout(timeout)
}
}
/** Start polling if a provider is configured. Idempotent. */
export function startBalancePolling(): void {
if (timer !== null) return
active = selectProvider()
if (!active) return
// Kick off immediately, then on interval.
void tick()
timer = setInterval(() => {
void tick()
}, intervalMs())
// Don't keep the event loop alive just for the poller.
if (
typeof (timer as unknown as { unref?: () => void }).unref === 'function'
) {
;(timer as unknown as { unref: () => void }).unref()
}
}
export function stopBalancePolling(): void {
if (timer !== null) {
clearInterval(timer)
timer = null
}
inflight?.abort()
inflight = null
active = null
}
export function getActiveBalanceProviderId(): string | null {
return active?.providerId ?? null
}

View File

@@ -0,0 +1,9 @@
import type { ProviderBalance } from '../types.js'
export interface BalanceProvider {
readonly providerId: string
/** Whether the user has configured this provider (env vars etc.). */
isEnabled(): boolean
/** Fetch a fresh snapshot; return null on any soft failure. */
fetchBalance(signal?: AbortSignal): Promise<ProviderBalance | null>
}

View File

@@ -0,0 +1,68 @@
import type {
ProviderBalance,
ProviderUsage,
ProviderUsageBucket,
} from './types.js'
type Listener = (snapshot: ProviderUsage) => void
let current: ProviderUsage = {
providerId: 'unknown',
buckets: [],
}
const listeners: Set<Listener> = new Set()
export function getProviderUsage(): ProviderUsage {
return current
}
/**
* Replace buckets for a provider. Passing an empty array is valid — it records
* that the latest response carried no usable quota header.
*/
export function updateProviderBuckets(
providerId: string,
buckets: ProviderUsageBucket[],
): void {
current = {
...current,
providerId,
buckets,
}
emit()
}
export function setProviderBalance(
providerId: string,
balance: ProviderBalance | null,
): void {
current = {
...current,
providerId,
...(balance === null ? { balance: undefined } : { balance }),
}
emit()
}
export function subscribeProviderUsage(listener: Listener): () => void {
listeners.add(listener)
return () => {
listeners.delete(listener)
}
}
export function resetProviderUsage(): void {
current = { providerId: 'unknown', buckets: [] }
emit()
}
function emit(): void {
for (const listener of listeners) {
try {
listener(current)
} catch {
// Listener errors must not break the publish loop.
}
}
}

View File

@@ -0,0 +1,40 @@
/**
* Unified provider usage model.
*
* Each API client (Anthropic, OpenAI, Bedrock, ...) parses its own response
* headers through a `ProviderUsageAdapter` and pushes buckets into the store.
* A balance poller may additionally populate `ProviderBalance`.
*/
export type BucketKind =
| 'session' // Anthropic 5-hour window
| 'weekly' // Anthropic 7-day window
| 'requests' // OpenAI-style RPM bucket
| 'tokens' // OpenAI-style TPM bucket
| 'throttle' // Bedrock / generic throttle
| 'custom'
export interface ProviderUsageBucket {
kind: BucketKind
label: string
utilization: number
resetsAt?: number
}
export interface ProviderBalance {
currency: string
remaining: number
total?: number
updatedAt?: number
}
export interface ProviderUsage {
providerId: string
buckets: ProviderUsageBucket[]
balance?: ProviderBalance
}
export interface ProviderUsageAdapter {
providerId: string
parseHeaders(headers: globalThis.Headers): ProviderUsageBucket[]
}