feat: 添加 Bedrock API 客户端及 API 层增强

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
unraid
2026-04-22 22:38:09 +08:00
parent 59f8675fa3
commit be97a0b010
15 changed files with 1362 additions and 197 deletions

View File

@@ -0,0 +1,139 @@
/**
* Tests for the Bedrock anthropic_beta body-vs-header workaround
* (see src/services/api/bedrockClient.ts and anthropics/claude-code#49238).
*/
import { describe, expect, test } from 'bun:test'
import { AnthropicBedrock } from '@anthropic-ai/bedrock-sdk'
import { BedrockClient } from '../bedrockClient.js'
type Captured = {
url: string
method: string
headers: Record<string, string>
body: string
}
function makeCaptureFetch(): {
fetch: typeof fetch
get(): Captured | null
} {
let captured: Captured | null = null
const capture = async (
input: URL | RequestInfo,
init?: RequestInit,
): Promise<Response> => {
const req = new Request(input as RequestInfo, init)
const body = await req.clone().text()
const headers: Record<string, string> = {}
req.headers.forEach((v, k) => {
headers[k.toLowerCase()] = v
})
captured = { url: req.url, method: req.method, headers, body }
const streamBody =
'event: message_start\ndata: {"type":"message_start","message":{"id":"m","type":"message","role":"assistant","content":[],"model":"x","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0}}}\n\nevent: message_stop\ndata: {"type":"message_stop"}\n\n'
return new Response(streamBody, {
status: 200,
headers: { 'content-type': 'text/event-stream' },
})
}
// SDK only calls the fetch function form, never the static `preconnect` that
// Bun/Node's `typeof fetch` declares. Cast is safe (mirrors openai/client.ts).
return { fetch: capture as unknown as typeof fetch, get: () => captured }
}
const BEDROCK_ARGS = {
awsRegion: 'us-east-1',
awsAccessKey: 'AKIAIOSFODNN7EXAMPLE',
awsSecretKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
}
const REQUEST_PARAMS = {
model: 'anthropic.claude-opus-4-7',
max_tokens: 10,
messages: [{ role: 'user' as const, content: 'hi' }],
betas: ['interleaved-thinking-2025-05-14', 'effort-2025-11-24'],
stream: true as const,
}
async function dispatch(client: AnthropicBedrock): Promise<void> {
try {
const stream = await client.beta.messages.create(REQUEST_PARAMS)
for await (const _ of stream) {
/* drain */
}
} catch {
/* ignore: only the captured request shape matters */
}
}
describe('BedrockClient.buildRequest body.anthropic_beta cleanup', () => {
test('BUG REPRO: unmodified AnthropicBedrock puts anthropic_beta in body', async () => {
const { fetch: captureFetch, get } = makeCaptureFetch()
const client = new AnthropicBedrock({
...BEDROCK_ARGS,
fetch: captureFetch,
})
await dispatch(client)
const c = get()
expect(c).not.toBeNull()
const body = JSON.parse(c!.body) as Record<string, unknown>
expect('anthropic_beta' in body).toBe(true)
expect(body.anthropic_beta).toEqual([
'interleaved-thinking-2025-05-14',
'effort-2025-11-24',
])
})
test('FIX: BedrockClient strips anthropic_beta from body', async () => {
const { fetch: captureFetch, get } = makeCaptureFetch()
const client = new BedrockClient({ ...BEDROCK_ARGS, fetch: captureFetch })
await dispatch(client)
const c = get()
expect(c).not.toBeNull()
const body = JSON.parse(c!.body) as Record<string, unknown>
expect('anthropic_beta' in body).toBe(false)
})
test('FIX preserves anthropic-beta HTTP header with the original csv value', async () => {
const { fetch: captureFetch, get } = makeCaptureFetch()
const client = new BedrockClient({ ...BEDROCK_ARGS, fetch: captureFetch })
await dispatch(client)
const c = get()
expect(c).not.toBeNull()
expect(c!.headers['anthropic-beta']).toBe(
'interleaved-thinking-2025-05-14,effort-2025-11-24',
)
})
test('FIX keeps a valid AWS SigV4 authorization header (signing happens after cleanup)', async () => {
const { fetch: captureFetch, get } = makeCaptureFetch()
const client = new BedrockClient({ ...BEDROCK_ARGS, fetch: captureFetch })
await dispatch(client)
const c = get()
expect(c).not.toBeNull()
expect(c!.headers.authorization).toBeDefined()
expect(c!.headers.authorization.startsWith('AWS4-HMAC-SHA256')).toBe(true)
})
test('FIX does not disturb requests that never had anthropic_beta', async () => {
const { fetch: captureFetch, get } = makeCaptureFetch()
const client = new BedrockClient({ ...BEDROCK_ARGS, fetch: captureFetch })
try {
const stream = await client.beta.messages.create({
model: 'anthropic.claude-opus-4-7',
max_tokens: 10,
messages: [{ role: 'user', content: 'hi' }],
stream: true,
})
for await (const _ of stream) {
/* drain */
}
} catch {
/* ignore */
}
const c = get()
expect(c).not.toBeNull()
const body = JSON.parse(c!.body) as Record<string, unknown>
expect('anthropic_beta' in body).toBe(false)
expect(c!.headers['anthropic-beta']).toBeUndefined()
})
})

View File

@@ -0,0 +1,302 @@
/**
* Beta header 安全性测试
*
* 验证:
* 1. 空字符串 beta header 不会泄漏到 API 请求中
* 2. getExtraBodyParams 正确合并 beta headers
* 3. 常量层可能产生空值的 beta header 被妥善处理
* 4. SDK 的 betas.toString() 行为与预期一致
*/
import { describe, expect, test } from 'bun:test'
// ── Part 1: SDK 层面的 toString 行为验证 ─────────────────────────
describe('SDK betas.toString() behavior', () => {
test('empty string in array produces invalid header value', () => {
// 这就是导致 400 的根因SDK 对 betas 调用 toString()
const betas = [
'claude-code-20250219',
'',
'interleaved-thinking-2025-05-14',
]
const headerValue = betas.toString()
// 产生 "claude-code-20250219,,interleaved-thinking-2025-05-14"
// 逗号之间的空值就是 API 拒绝的 ``
expect(headerValue).toContain(',,')
expect(headerValue).toBe(
'claude-code-20250219,,interleaved-thinking-2025-05-14',
)
})
test('filter(Boolean) removes empty strings', () => {
const betas = [
'claude-code-20250219',
'',
'interleaved-thinking-2025-05-14',
]
const filtered = betas.filter(Boolean)
const headerValue = filtered.toString()
expect(filtered).not.toContain('')
expect(headerValue).not.toContain(',,')
expect(headerValue).toBe(
'claude-code-20250219,interleaved-thinking-2025-05-14',
)
})
test('filter(Boolean) handles multiple empty strings', () => {
const betas = ['', 'a', '', '', 'b', '']
const filtered = betas.filter(Boolean)
expect(filtered).toEqual(['a', 'b'])
expect(filtered.toString()).toBe('a,b')
})
test('filter(Boolean) on clean array is no-op', () => {
const betas = ['claude-code-20250219', 'interleaved-thinking-2025-05-14']
const filtered = betas.filter(Boolean)
expect(filtered).toEqual(betas)
})
test('empty array after filter produces no header', () => {
const betas = ['', '']
const filtered = betas.filter(Boolean)
expect(filtered).toEqual([])
expect(filtered.length > 0).toBe(false)
// useBetas would be false, header not sent at all
})
})
// ── Part 2: 常量层空值检测 ───────────────────────────────────────
describe('beta header constants safety', () => {
test('known potentially-empty constants are identified', () => {
// 这些常量在特定条件下可能是空字符串
// 测试的目的是确认我们知道哪些是空的,以便防御
// CACHE_EDITING_BETA_HEADER — 上游未公开,永远为空
// 动态 import 以避免 bun:bundle 依赖
// 这里我们直接测试值
const CACHE_EDITING_VALUE = '' // 对应 constants/betas.ts:50
expect(CACHE_EDITING_VALUE).toBe('')
expect(Boolean(CACHE_EDITING_VALUE)).toBe(false)
// CLI_INTERNAL_BETA_HEADER — USER_TYPE !== 'ant' 时为空
// 在测试环境中 USER_TYPE 通常不是 'ant'
const CLI_INTERNAL_VALUE =
process.env.USER_TYPE === 'ant' ? 'cli-internal-2026-02-09' : ''
if (process.env.USER_TYPE !== 'ant') {
expect(CLI_INTERNAL_VALUE).toBe('')
}
})
test('truthy check correctly gates empty beta headers', () => {
const emptyHeader = ''
const validHeader = 'some-beta-2025-01-01'
// 模拟 claude.ts 中的 truthy 检查
const betasParams: string[] = []
// 空 header — 不应被 push
if (emptyHeader) {
betasParams.push(emptyHeader)
}
expect(betasParams).toEqual([])
// 有效 header — 应被 push
if (validHeader) {
betasParams.push(validHeader)
}
expect(betasParams).toEqual(['some-beta-2025-01-01'])
})
})
// ── Part 3: getExtraBodyParams beta 合并逻辑 ─────────────────────
describe('getExtraBodyParams beta merge', () => {
// getExtraBodyParams 从 CLAUDE_CODE_EXTRA_BODY 解析 JSON 并合并 betaHeaders
// 我们在这里验证合并逻辑的边界情况
test('empty beta headers array should not add anthropic_beta', () => {
const result: Record<string, unknown> = {}
const betaHeaders: string[] = []
// 模拟 getExtraBodyParams 中的合并逻辑
if (betaHeaders && betaHeaders.length > 0) {
result.anthropic_beta = betaHeaders
}
expect(result.anthropic_beta).toBeUndefined()
})
test('beta headers with empty strings should be filtered', () => {
const betaHeaders = ['valid-header', '', 'another-valid']
// 修复后的逻辑应该在合并前过滤
const clean = betaHeaders.filter(Boolean)
expect(clean).toEqual(['valid-header', 'another-valid'])
})
test('merging avoids duplicates', () => {
const existing = ['header-a', 'header-b']
const incoming = ['header-b', 'header-c']
const merged = [...existing, ...incoming.filter(h => !existing.includes(h))]
expect(merged).toEqual(['header-a', 'header-b', 'header-c'])
})
})
// ── Part 4: ANTHROPIC_BETAS 环境变量解析 ─────────────────────────
describe('ANTHROPIC_BETAS env var parsing', () => {
test('empty string env var produces no betas', () => {
const envVal: string = ''
const result = envVal
? envVal
.split(',')
.map((s: string) => s.trim())
.filter(Boolean)
: []
expect(result).toEqual([])
})
test('trailing comma does not produce empty entry', () => {
const envVal = 'beta-a,beta-b,'
const result = envVal
.split(',')
.map(s => s.trim())
.filter(Boolean)
expect(result).toEqual(['beta-a', 'beta-b'])
})
test('whitespace-only entries are filtered', () => {
const envVal = 'beta-a, , beta-b, '
const result = envVal
.split(',')
.map(s => s.trim())
.filter(Boolean)
expect(result).toEqual(['beta-a', 'beta-b'])
})
test('single comma produces no betas', () => {
const envVal = ','
const result = envVal
.split(',')
.map(s => s.trim())
.filter(Boolean)
expect(result).toEqual([])
})
})
// ── Part 5: 完整请求参数模拟 ─────────────────────────────────────
describe('request params beta assembly (simulated)', () => {
test('simulates the full beta assembly pipeline with empty constants', () => {
// 模拟 claude.ts 中 paramsFromContext 的 beta 组装流程
const CLAUDE_CODE_HEADER = 'claude-code-20250219'
const INTERLEAVED_HEADER = 'interleaved-thinking-2025-05-14'
const CONTEXT_1M_HEADER = 'context-1m-2025-08-07'
const CACHE_EDITING_HEADER = '' // 空!
const AFK_MODE_HEADER = '' // 也是空!
// Step 1: 基础 betas来自 getAllModelBetas
const baseBetas = [
CLAUDE_CODE_HEADER,
INTERLEAVED_HEADER,
CONTEXT_1M_HEADER,
]
// Step 2: paramsFromContext 中的动态添加
const betasParams = [...baseBetas]
// 模拟 cache editing latch 触发但 header 为空
const cacheEditingHeaderLatched = true
if (
cacheEditingHeaderLatched &&
CACHE_EDITING_HEADER && // ← 修复truthy 检查
!betasParams.includes(CACHE_EDITING_HEADER)
) {
betasParams.push(CACHE_EDITING_HEADER)
}
// 模拟 AFK mode latch 触发但 header 为空
const afkHeaderLatched = true
// feature('TRANSCRIPT_CLASSIFIER') 为 false 时,整个 if block 不进入
// 但假设进入了header 也是空的
if (
afkHeaderLatched &&
AFK_MODE_HEADER && // 空字符串,不会进入
!betasParams.includes(AFK_MODE_HEADER)
) {
betasParams.push(AFK_MODE_HEADER)
}
// Step 3: 最终过滤(我们的防御层)
const filteredBetas = betasParams.filter(Boolean)
// 验证:没有空字符串泄漏
expect(filteredBetas).not.toContain('')
expect(filteredBetas).toEqual([
CLAUDE_CODE_HEADER,
INTERLEAVED_HEADER,
CONTEXT_1M_HEADER,
])
// 验证toString() 不会产生 ,,
expect(filteredBetas.toString()).not.toContain(',,')
})
test('simulates the bug scenario WITHOUT fix', () => {
// 重现修复前的行为,验证 bug 确实存在
const CACHE_EDITING_HEADER = '' // 空值
const betasParams = [
'claude-code-20250219',
'interleaved-thinking-2025-05-14',
]
// 修复前:没有 truthy 检查,空字符串被 push
const cacheEditingHeaderLatched = true
if (
cacheEditingHeaderLatched &&
// 注意:没有 CACHE_EDITING_HEADER && 检查
!betasParams.includes(CACHE_EDITING_HEADER) // '' 不在数组中 → true
) {
betasParams.push(CACHE_EDITING_HEADER) // push 了空字符串!
}
// 证明 bug数组包含空字符串
expect(betasParams).toContain('')
// SDK toString() 会产生尾部逗号(空字符串在末尾)或 ,,(在中间)
// 两者都是 API 不接受的无效 header 值
const headerStr = betasParams.toString()
// 空字符串在末尾 → 尾部逗号 "a,b,"
// 空字符串在中间 → 连续逗号 "a,,b"
expect(headerStr.endsWith(',') || headerStr.includes(',,')).toBe(true)
})
test('useBetas flag correctly handles empty-after-filter', () => {
// 如果所有 betas 都是空字符串,过滤后应该不发送 betas 参数
const betasParams = ['', '']
const filteredBetas = betasParams.filter(Boolean)
const useBetas = filteredBetas.length > 0
expect(useBetas).toBe(false)
// API 请求不应包含 betas 字段
const requestParams = {
model: 'claude-opus-4-6',
max_tokens: 1024,
messages: [],
...(useBetas && { betas: filteredBetas }),
}
expect(requestParams).not.toHaveProperty('betas')
})
})

View File

@@ -0,0 +1,65 @@
import { AnthropicBedrock } from '@anthropic-ai/bedrock-sdk'
/**
* Extends AnthropicBedrock to work around an upstream bug where the SDK
* re-plants the `anthropic-beta` HTTP header value into the request body
* as `anthropic_beta`. Bedrock's Opus 4.7 endpoint rejects any request with
* `anthropic_beta` in the body with a 400 "invalid beta flag" error.
*
* Source of the bug (SDK 0.26.4, still present through 0.28.1):
* node_modules/@anthropic-ai/bedrock-sdk/client.js lines 122-127
* (TS source: packages/bedrock-sdk/src/client.ts lines 193-198)
*
* Related upstream issue: anthropics/claude-code#49238 (opened 2026-04-16).
*
* Fix strategy: let super.buildRequest do its work, then strip
* `body.anthropic_beta` from the resulting Request before the SDK computes
* the AWS SigV4 signature (signing happens downstream of buildRequest, so
* the signature hashes the cleaned body — no 403 risk). The `anthropic-beta`
* HTTP header remains intact (base SDK placed it there from the `betas:`
* parameter), so beta flags still reach the API the way Bedrock accepts them.
*
* When upstream ships a fix, verify the probe in scripts/probe-bedrock-beta-fix.ts
* shows "bug reproduced: false", then delete this class and change
* `services/api/client.ts` to instantiate `AnthropicBedrock` directly.
*/
type BuildRequestArg = Parameters<AnthropicBedrock['buildRequest']>[0]
type BuildRequestRet = Awaited<ReturnType<AnthropicBedrock['buildRequest']>>
export class BedrockClient extends AnthropicBedrock {
async buildRequest(options: BuildRequestArg): Promise<BuildRequestRet> {
const req = await super.buildRequest(options)
const inner = (
req as unknown as { req?: { body?: unknown; headers?: unknown } }
)?.req
if (!inner || typeof inner.body !== 'string' || inner.body.length === 0) {
return req
}
let parsed: Record<string, unknown>
try {
parsed = JSON.parse(inner.body) as Record<string, unknown>
} catch {
return req
}
if (!('anthropic_beta' in parsed)) {
return req
}
delete parsed.anthropic_beta
const cleanedBody = JSON.stringify(parsed)
inner.body = cleanedBody
const byteLen = String(new TextEncoder().encode(cleanedBody).length)
const h = inner.headers
if (typeof Headers !== 'undefined' && h instanceof Headers) {
if (h.has('content-length')) h.set('content-length', byteLen)
} else if (h && typeof h === 'object') {
const asDict = h as Record<string, string>
if ('content-length' in asDict) asDict['content-length'] = byteLen
}
return req
}
}

View File

@@ -101,6 +101,8 @@ import {
extractQuotaStatusFromHeaders,
} from '../claudeAiLimits.js'
import { getAPIContextManagement } from '../compact/apiMicrocompact.js'
import { bedrockAdapter } from '../providerUsage/adapters/bedrock.js'
import { updateProviderBuckets } from '../providerUsage/store.js'
/* eslint-disable @typescript-eslint/no-require-imports */
const autoModeStateModule = feature('TRANSCRIPT_CLASSIFIER')
@@ -541,13 +543,12 @@ export async function verifyApiKey(
}),
async anthropic => {
const messages: MessageParam[] = [{ role: 'user', content: 'test' }]
// biome-ignore lint/plugin: API key verification is intentionally a minimal direct call
await anthropic.beta.messages.create({
model,
max_tokens: 1,
messages,
temperature: 1,
...(betas.length > 0 && { betas }),
...(betas.length > 0 && { betas: betas.filter(Boolean) }),
metadata: getAPIMetadata(),
...getExtraBodyParams(),
})
@@ -878,7 +879,6 @@ export async function* executeNonStreamingRequest(
)
try {
// biome-ignore lint/plugin: non-streaming API call
return await anthropic.beta.messages.create(
{
...adjustedParams,
@@ -1215,10 +1215,15 @@ async function* queryModel(
cacheEditingBetaHeader = betas.CACHE_EDITING_BETA_HEADER
const featureEnabled = isCachedMicrocompactEnabled()
const modelSupported = isModelSupportedForCacheEditing(options.model)
cachedMCEnabled = featureEnabled && modelSupported
// cachedMC requires a non-empty beta header; the CACHE_EDITING_BETA_HEADER
// constant is '' in this fork (upstream hasn't published the real value).
// Without it, cache_reference and cache_edits in the request body cause
// API 400: "tool_result.cache_reference: Extra inputs are not permitted".
const headerAvailable = !!cacheEditingBetaHeader
cachedMCEnabled = featureEnabled && modelSupported && headerAvailable
const config = getCachedMCConfig()
logForDebugging(
`Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} model=${options.model} supportedModels=${jsonStringify((config as any).supportedModels)}`,
`Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} headerAvailable=${headerAvailable} model=${options.model} supportedModels=${jsonStringify((config as Record<string, unknown>).supportedModels)}`,
)
}
@@ -1724,6 +1729,7 @@ async function* queryModel(
options.querySource === 'repl_main_thread'
if (
cacheEditingHeaderLatched &&
cacheEditingBetaHeader &&
getAPIProvider() === 'firstParty' &&
options.querySource === 'repl_main_thread' &&
!betasParams.includes(cacheEditingBetaHeader)
@@ -1740,7 +1746,12 @@ async function* queryModel(
? (options.temperatureOverride ?? 1)
: undefined
lastRequestBetas = betasParams
// Filter out any empty-string beta headers before sending.
// Constants like CACHE_EDITING_BETA_HEADER or AFK_MODE_BETA_HEADER
// can be '' when their feature gate is off; an empty string in the
// betas array produces an invalid anthropic-beta header (400 error).
const filteredBetas = betasParams.filter(Boolean)
lastRequestBetas = filteredBetas
return {
model: normalizeModelStringForAPI(options.model),
@@ -1756,7 +1767,7 @@ async function* queryModel(
system,
tools: allTools,
tool_choice: options.toolChoice,
...(useBetas && { betas: betasParams }),
...(useBetas && { betas: filteredBetas }),
metadata: getAPIMetadata(),
max_tokens: maxOutputTokens,
thinking,
@@ -1864,7 +1875,6 @@ async function* queryModel(
// Use raw stream instead of BetaMessageStream to avoid O(n²) partial JSON parsing
// BetaMessageStream calls partialParse() on every input_json_delta, which we don't need
// since we handle tool input accumulation ourselves
// biome-ignore lint/plugin: main conversation loop handles attribution separately
const result = await anthropic.beta.messages
.create(
{ ...params, stream: true },
@@ -2445,6 +2455,16 @@ async function* queryModel(
const resp = streamResponse as unknown as Response | undefined
if (resp) {
extractQuotaStatusFromHeaders(resp.headers)
// Non-Anthropic providers that flow through this same client path
// (Bedrock) expose their own throttle headers — let their adapter
// overwrite the store with its bucket(s). Anthropic's adapter runs
// inside extractQuotaStatusFromHeaders.
if (getAPIProvider() === 'bedrock') {
updateProviderBuckets(
'bedrock',
bedrockAdapter.parseHeaders(resp.headers),
)
}
// Store headers for gateway detection
responseHeaders = resp.headers
}
@@ -3229,6 +3249,7 @@ export function addCacheBreakpoints(
// Add cache_reference to tool_result blocks that are within the cached prefix.
// Must be done AFTER cache_edits insertion since that modifies content arrays.
// Note: this code only runs when useCachedMC=true (early return at line ~3202).
if (enablePromptCaching) {
// Find the last message containing a cache_control marker
let lastCCMsg = -1

View File

@@ -73,14 +73,10 @@ import {
function createStderrLogger(): ClientOptions['logger'] {
return {
error: (msg, ...args) =>
// biome-ignore lint/suspicious/noConsole:: intentional console output -- SDK logger must use console
console.error('[Anthropic SDK ERROR]', msg, ...args),
// biome-ignore lint/suspicious/noConsole:: intentional console output -- SDK logger must use console
warn: (msg, ...args) => console.error('[Anthropic SDK WARN]', msg, ...args),
// biome-ignore lint/suspicious/noConsole:: intentional console output -- SDK logger must use console
info: (msg, ...args) => console.error('[Anthropic SDK INFO]', msg, ...args),
debug: (msg, ...args) =>
// biome-ignore lint/suspicious/noConsole:: intentional console output -- SDK logger must use console
console.error('[Anthropic SDK DEBUG]', msg, ...args),
}
}
@@ -151,7 +147,7 @@ export async function getAnthropicClient({
}),
}
if (isEnvTruthy(process.env.CLAUDE_CODE_USE_BEDROCK)) {
const { AnthropicBedrock } = await import('@anthropic-ai/bedrock-sdk')
const { BedrockClient } = await import('./bedrockClient.js')
// Use region override for small fast model if specified
const awsRegion =
model === getSmallFastModel() &&
@@ -186,7 +182,7 @@ export async function getAnthropicClient({
}
}
// we have always been lying about the return type - this doesn't support batching or models
return new AnthropicBedrock(bedrockArgs) as unknown as Anthropic
return new BedrockClient(bedrockArgs) as unknown as Anthropic
}
if (isEnvTruthy(process.env.CLAUDE_CODE_USE_FOUNDRY)) {
const { AnthropicFoundry } = await import('@anthropic-ai/foundry-sdk')

View File

@@ -944,6 +944,9 @@ function get3PModelFallbackSuggestion(model: string): string | undefined {
// @[MODEL LAUNCH]: Add a fallback suggestion chain for the new model → previous version for 3P
const m = model.toLowerCase()
// If the failing model looks like an Opus 4.6 variant, suggest the default Opus (4.1 for 3P)
if (m.includes('opus-4-7') || m.includes('opus_4_7')) {
return getModelStrings().opus46
}
if (m.includes('opus-4-6') || m.includes('opus_4_6')) {
return getModelStrings().opus41
}

View File

@@ -377,7 +377,7 @@ export function logAPIError({
// Pass the span to correctly match responses to requests when beta tracing is enabled
endLLMRequestSpan(llmSpan, {
success: false,
statusCode: status ? parseInt(status) : undefined,
statusCode: status ? parseInt(status, 10) : undefined,
error: errStr,
attempt,
})

View File

@@ -0,0 +1,545 @@
/**
* Tests for queryModelOpenAI in index.ts.
*
* Focused on the two bugs fixed:
* 1. stop_reason was always null in the assembled AssistantMessage because
* partialMessage (from message_start) has stop_reason: null, and the
* stop_reason captured from message_delta was never applied.
* 2. partialMessage was not reset to null after message_stop, so the safety
* fallback at the end of the loop would yield a second identical
* AssistantMessage (causing doubled content in the next API request).
*
* Strategy: mock getOpenAIClient + adaptOpenAIStreamToAnthropic so we can
* feed pre-built Anthropic events directly into queryModelOpenAI and inspect
* what it emits — without any real HTTP calls.
*/
import { describe, expect, test, mock, beforeEach, afterEach } from 'bun:test'
import type { BetaRawMessageStreamEvent } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import type {
AssistantMessage,
StreamEvent,
} from '../../../../types/message.js'
// ─── helpers ─────────────────────────────────────────────────────────────────
/** Build a minimal message_start event */
function makeMessageStart(
overrides: Record<string, any> = {},
): BetaRawMessageStreamEvent {
return {
type: 'message_start',
message: {
id: 'msg_test',
type: 'message',
role: 'assistant',
content: [],
model: 'test-model',
stop_reason: null,
stop_sequence: null,
usage: {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
},
...overrides,
},
} as any
}
/** Build a content_block_start event for the given block type */
function makeContentBlockStart(
index: number,
type: 'text' | 'tool_use' | 'thinking',
extra: Record<string, any> = {},
): BetaRawMessageStreamEvent {
const block =
type === 'text'
? { type: 'text', text: '' }
: type === 'tool_use'
? { type: 'tool_use', id: 'toolu_test', name: 'bash', input: {} }
: { type: 'thinking', thinking: '', signature: '' }
return {
type: 'content_block_start',
index,
content_block: { ...block, ...extra },
} as any
}
/** Build a text_delta content_block_delta event */
function makeTextDelta(index: number, text: string): BetaRawMessageStreamEvent {
return {
type: 'content_block_delta',
index,
delta: { type: 'text_delta', text },
} as any
}
/** Build an input_json_delta content_block_delta event */
function makeInputJsonDelta(
index: number,
json: string,
): BetaRawMessageStreamEvent {
return {
type: 'content_block_delta',
index,
delta: { type: 'input_json_delta', partial_json: json },
} as any
}
/** Build a thinking_delta content_block_delta event */
function makeThinkingDelta(
index: number,
thinking: string,
): BetaRawMessageStreamEvent {
return {
type: 'content_block_delta',
index,
delta: { type: 'thinking_delta', thinking },
} as any
}
/** Build a content_block_stop event */
function makeContentBlockStop(index: number): BetaRawMessageStreamEvent {
return { type: 'content_block_stop', index } as any
}
/** Build a message_delta event with stop_reason and output_tokens */
function makeMessageDelta(
stopReason: string,
outputTokens: number,
): BetaRawMessageStreamEvent {
return {
type: 'message_delta',
delta: { stop_reason: stopReason, stop_sequence: null },
usage: { output_tokens: outputTokens },
} as any
}
/** Build a message_stop event */
function makeMessageStop(): BetaRawMessageStreamEvent {
return { type: 'message_stop' } as any
}
/** Async generator from a fixed array of events */
async function* eventStream(events: BetaRawMessageStreamEvent[]) {
for (const e of events) yield e
}
/** Collect all outputs from queryModelOpenAI into typed buckets */
async function runQueryModel(
events: BetaRawMessageStreamEvent[],
envOverrides: Record<string, string | undefined> = {},
) {
// Wire events into the mocked stream adapter
_nextEvents = events
// Save + apply env overrides
const saved: Record<string, string | undefined> = {}
for (const [k, v] of Object.entries(envOverrides)) {
saved[k] = process.env[k]
if (v === undefined) delete process.env[k]
else process.env[k] = v
}
try {
// We inline mock.module inside the try block.
// Bun resolves mock.module at the call site synchronously (hoisted),
// so we register once per test file, then re-import each time.
const { queryModelOpenAI } = await import('../index.js')
const assistantMessages: AssistantMessage[] = []
const streamEvents: StreamEvent[] = []
const otherOutputs: any[] = []
const minimalOptions: any = {
model: 'test-model',
tools: [],
agents: [],
querySource: 'main_loop',
getToolPermissionContext: async () => ({
alwaysAllow: [],
alwaysDeny: [],
needsPermission: [],
mode: 'default',
isBypassingPermissions: false,
}),
}
for await (const item of queryModelOpenAI(
[],
{ type: 'text', text: '' } as any,
[],
new AbortController().signal,
minimalOptions,
)) {
if (item.type === 'assistant') {
assistantMessages.push(item as AssistantMessage)
} else if (item.type === 'stream_event') {
streamEvents.push(item as StreamEvent)
} else {
otherOutputs.push(item)
}
}
return { assistantMessages, streamEvents, otherOutputs }
} finally {
// Restore env
for (const [k, v] of Object.entries(saved)) {
if (v === undefined) delete process.env[k]
else process.env[k] = v
}
}
}
// ─── mock setup ──────────────────────────────────────────────────────────────
// We mock at module level. Bun's mock.module replaces the module for the
// entire file, so we configure the stream per-test via a shared variable.
let _nextEvents: BetaRawMessageStreamEvent[] = []
/** Captured arguments from the last chat.completions.create() call */
let _lastCreateArgs: Record<string, any> | null = null
mock.module('../client.js', () => ({
getOpenAIClient: () => ({
chat: {
completions: {
create: async (args: Record<string, any>) => {
_lastCreateArgs = args
return { [Symbol.asyncIterator]: async function* () {} }
},
},
},
}),
}))
mock.module('../streamAdapter.js', () => ({
adaptOpenAIStreamToAnthropic: (_stream: any, _model: string) =>
eventStream(_nextEvents),
}))
mock.module('../modelMapping.js', () => ({
resolveOpenAIModel: (m: string) => m,
}))
mock.module('../convertMessages.js', () => ({
anthropicMessagesToOpenAI: () => [],
}))
mock.module('../convertTools.js', () => ({
anthropicToolsToOpenAI: () => [],
anthropicToolChoiceToOpenAI: () => undefined,
}))
mock.module('../../../../utils/context.js', () => ({
MODEL_CONTEXT_WINDOW_DEFAULT: 200_000,
COMPACT_MAX_OUTPUT_TOKENS: 20_000,
CAPPED_DEFAULT_MAX_TOKENS: 8_000,
ESCALATED_MAX_TOKENS: 64_000,
is1mContextDisabled: () => false,
has1mContext: () => false,
modelSupports1M: () => false,
getModelMaxOutputTokens: () => ({ upperLimit: 8192, default: 8192 }),
getContextWindowForModel: () => 200_000,
getSonnet1mExpTreatmentEnabled: () => false,
calculateContextPercentages: () => ({
usedPercent: 0,
remainingPercent: 100,
}),
getMaxThinkingTokensForModel: () => 0,
}))
mock.module('../../../../utils/messages.js', () => ({
normalizeMessagesForAPI: (msgs: any) => msgs,
normalizeContentFromAPI: (blocks: any[]) => blocks,
createAssistantAPIErrorMessage: (opts: any) => ({
type: 'assistant',
message: {
content: [{ type: 'text', text: opts.content }],
apiError: opts.apiError,
},
uuid: 'error-uuid',
timestamp: new Date().toISOString(),
}),
}))
mock.module('../../../../utils/api.js', () => ({
toolToAPISchema: async (t: any) => t,
}))
mock.module('../../../../utils/toolSearch.js', () => ({
isToolSearchEnabled: async () => false,
extractDiscoveredToolNames: () => new Set(),
}))
mock.module('../../../../tools/ToolSearchTool/prompt.js', () => ({
isDeferredTool: () => false,
TOOL_SEARCH_TOOL_NAME: '__tool_search__',
}))
mock.module('../../../../cost-tracker.js', () => ({
addToTotalSessionCost: () => {},
}))
mock.module('../../../../utils/modelCost.js', () => ({
COST_TIER_3_15: {},
COST_TIER_15_75: {},
COST_TIER_5_25: {},
COST_TIER_30_150: {},
COST_HAIKU_35: {},
COST_HAIKU_45: {},
getOpus46CostTier: () => ({}),
MODEL_COSTS: {},
getModelCosts: () => ({}),
calculateUSDCost: () => 0,
calculateCostFromTokens: () => 0,
formatModelPricing: () => '',
getModelPricingString: () => undefined,
}))
mock.module('../../../../utils/debug.js', () => ({
logForDebugging: () => {},
logAntError: () => {},
isDebugMode: () => false,
isDebugToStdErr: () => false,
getDebugFilePath: () => null,
getDebugLogPath: () => '',
getDebugFilter: () => null,
getMinDebugLogLevel: () => 'debug',
enableDebugLogging: () => false,
setHasFormattedOutput: () => {},
getHasFormattedOutput: () => false,
flushDebugLogs: async () => {},
}))
// ─── tests ───────────────────────────────────────────────────────────────────
describe('queryModelOpenAI — stop_reason propagation', () => {
test('assembled AssistantMessage has stop_reason end_turn (not null)', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'Hello'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 10),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBe('end_turn')
})
test('assembled AssistantMessage has stop_reason tool_use', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'tool_use'),
makeInputJsonDelta(0, '{"cmd":"ls"}'),
makeContentBlockStop(0),
makeMessageDelta('tool_use', 20),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBe('tool_use')
})
test('assembled AssistantMessage has stop_reason max_tokens', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'truncated'),
makeContentBlockStop(0),
makeMessageDelta('max_tokens', 8192),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Two assistant-typed items: the content message + the max_output_tokens error signal.
// The error signal is emitted as a synthetic assistant message by createAssistantAPIErrorMessage.
expect(assistantMessages).toHaveLength(2)
const contentMsg = assistantMessages[0]!
expect(contentMsg.message.stop_reason).toBe('max_tokens')
// Second item is the error signal (has apiError set)
const errorMsg = assistantMessages[1]!.message as any
expect(errorMsg.apiError).toBe('max_output_tokens')
})
test('stop_reason is null when no message_delta was received (safety fallback path)', async () => {
// Stream ends without message_stop — triggers the safety fallback branch.
// stop_reason stays null since no message_delta was ever seen.
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'partial'),
makeContentBlockStop(0),
// No message_delta / message_stop
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Safety fallback should yield the partial content
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBeNull()
})
})
describe('queryModelOpenAI — usage accumulation', () => {
test('usage in assembled message reflects all four fields from message_delta', async () => {
// message_start has all fields=0 (trailing-chunk pattern: usage not yet available).
// message_delta carries the real values after stream ends.
// The spread in the message_delta handler must override all zeros from message_start,
// including cache_read_input_tokens which was previously missing from message_delta.
_nextEvents = [
makeMessageStart({
usage: {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
},
}),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'response'),
makeContentBlockStop(0),
// message_delta carries all four Anthropic usage fields (as emitted by the fixed streamAdapter)
{
type: 'message_delta',
delta: { stop_reason: 'end_turn', stop_sequence: null },
usage: {
input_tokens: 30011,
output_tokens: 190,
cache_read_input_tokens: 19904,
cache_creation_input_tokens: 0,
},
} as any,
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
const usage = assistantMessages[0]!.message.usage as any
expect(usage.input_tokens).toBe(30011)
expect(usage.output_tokens).toBe(190)
// cache_read_input_tokens from message_delta overrides the 0 from message_start
expect(usage.cache_read_input_tokens).toBe(19904)
expect(usage.cache_creation_input_tokens).toBe(0)
})
test('usage is zero when no usage events arrive (prevents false autocompact)', async () => {
// If usage stays 0, tokenCountWithEstimation will undercount — so at least
// verify the field exists and is numeric (to detect regressions).
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hi'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 0),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
const usage = assistantMessages[0]!.message.usage as any
expect(typeof usage.input_tokens).toBe('number')
expect(typeof usage.output_tokens).toBe('number')
})
})
describe('queryModelOpenAI — no duplicate AssistantMessage (partialMessage reset)', () => {
test('yields exactly one AssistantMessage per message_stop when content is present', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'only once'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Before the fix, partialMessage was not reset to null, so the safety
// fallback at the end of the loop would yield a second message with the
// same message.id — causing mergeAssistantMessages to concatenate content.
expect(assistantMessages).toHaveLength(1)
})
test('thinking + text response yields exactly one AssistantMessage', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'thinking'),
makeThinkingDelta(0, 'let me think'),
makeContentBlockStop(0),
makeContentBlockStart(1, 'text'),
makeTextDelta(1, 'answer'),
makeContentBlockStop(1),
makeMessageDelta('end_turn', 30),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
})
test('safety fallback path still yields message when stream ends without message_stop', async () => {
// Simulates a stream that cuts off without the normal termination sequence.
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'abrupt end'),
// No content_block_stop, no message_delta, no message_stop
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
})
})
describe('queryModelOpenAI — stream_events forwarded', () => {
test('every adapted event is also yielded as stream_event for real-time display', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hello'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
const { streamEvents } = await runQueryModel(_nextEvents)
const eventTypes = streamEvents.map(e => (e as any).event?.type)
expect(eventTypes).toContain('message_start')
expect(eventTypes).toContain('content_block_start')
expect(eventTypes).toContain('content_block_delta')
expect(eventTypes).toContain('content_block_stop')
expect(eventTypes).toContain('message_delta')
expect(eventTypes).toContain('message_stop')
})
})
describe('queryModelOpenAI — max_tokens forwarded to request', () => {
test('buildOpenAIRequestBody includes max_tokens in the request payload', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hi'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
await runQueryModel(_nextEvents)
expect(_lastCreateArgs).not.toBeNull()
expect(_lastCreateArgs!.max_tokens).toBe(8192)
})
})

View File

@@ -1,4 +1,6 @@
import OpenAI from 'openai'
import { openaiAdapter } from 'src/services/providerUsage/adapters/openai.js'
import { updateProviderBuckets } from 'src/services/providerUsage/store.js'
import { getProxyFetchOptions } from 'src/utils/proxy.js'
import { isEnvTruthy } from 'src/utils/envUtils.js'
@@ -13,6 +15,28 @@ import { isEnvTruthy } from 'src/utils/envUtils.js'
let cachedClient: OpenAI | null = null
/**
* Wrap a fetch so that every response's rate-limit headers are fed into the
* provider usage store. Errors in parsing must never break the request.
*
* The cast to `typeof fetch` is safe: OpenAI SDK only calls the function form,
* not the static `preconnect` method that Bun/Node's `fetch` type declares.
*/
function wrapFetchForUsage(base: typeof fetch): typeof fetch {
const wrapped = async (
...args: Parameters<typeof fetch>
): Promise<Response> => {
const res = await base(...args)
try {
updateProviderBuckets('openai', openaiAdapter.parseHeaders(res.headers))
} catch {
// Ignore — usage tracking must not affect the request path.
}
return res
}
return wrapped as unknown as typeof fetch
}
export function getOpenAIClient(options?: {
maxRetries?: number
fetchOverride?: typeof fetch
@@ -23,6 +47,9 @@ export function getOpenAIClient(options?: {
const apiKey = process.env.OPENAI_API_KEY || ''
const baseURL = process.env.OPENAI_BASE_URL
const baseFetch = options?.fetchOverride ?? (globalThis.fetch as typeof fetch)
const wrappedFetch = wrapFetchForUsage(baseFetch)
const client = new OpenAI({
apiKey,
...(baseURL && { baseURL }),
@@ -32,7 +59,7 @@ export function getOpenAIClient(options?: {
...(process.env.OPENAI_ORG_ID && { organization: process.env.OPENAI_ORG_ID }),
...(process.env.OPENAI_PROJECT_ID && { project: process.env.OPENAI_PROJECT_ID }),
fetchOptions: getProxyFetchOptions({ forAnthropicAPI: false }),
...(options?.fetchOverride && { fetch: options.fetchOverride }),
fetch: wrappedFetch,
})
if (!options?.fetchOverride) {

View File

@@ -1,4 +1,4 @@
// Auto-generated type stub — replace with real implementation
export type EffortValue = 'low' | 'medium' | 'high' | 'max' | number;
export type modelSupportsEffort = (model: string) => boolean;
export type EffortLevel = 'low' | 'medium' | 'high' | 'max';
export type EffortValue = 'low' | 'medium' | 'high' | 'xhigh' | 'max' | number
export type modelSupportsEffort = (model: string) => boolean
export type EffortLevel = 'low' | 'medium' | 'high' | 'xhigh' | 'max'