fix(openai): fix stop_reason null, zero usage fields and max_tokens forwarding

- Fix stop_reason always null in assembled AssistantMessage by applying
  the value captured from message_delta event
- Reset partialMessage to null after message_stop to prevent duplicate
  AssistantMessage emission causing doubled content in next API request
- Forward computed maxTokens into buildOpenAIRequestBody as max_tokens
  so OpenAI-compatible endpoints receive the intended output cap
- Extract assembleFinalAssistantOutputs helper to deduplicate message
  assembly logic between message_stop handler and post-loop fallback
- Fix test helper to use events parameter instead of hidden global
- Add regression test for max_tokens request forwarding

Signed-off-by: guunergooner <tongchao0923@gmail.com>
This commit is contained in:
guunergooner
2026-04-09 18:53:17 +08:00
parent e6affc7053
commit c82f59943c
6 changed files with 865 additions and 61 deletions

View File

@@ -0,0 +1,454 @@
/**
* Tests for queryModelOpenAI in index.ts.
*
* Focused on the two bugs fixed:
* 1. stop_reason was always null in the assembled AssistantMessage because
* partialMessage (from message_start) has stop_reason: null, and the
* stop_reason captured from message_delta was never applied.
* 2. partialMessage was not reset to null after message_stop, so the safety
* fallback at the end of the loop would yield a second identical
* AssistantMessage (causing doubled content in the next API request).
*
* Strategy: mock getOpenAIClient + adaptOpenAIStreamToAnthropic so we can
* feed pre-built Anthropic events directly into queryModelOpenAI and inspect
* what it emits — without any real HTTP calls.
*/
import { describe, expect, test, mock, beforeEach, afterEach } from 'bun:test'
import type { BetaRawMessageStreamEvent } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import type { AssistantMessage, StreamEvent } from '../../../../types/message.js'
// ─── helpers ─────────────────────────────────────────────────────────────────
/** Build a minimal message_start event */
function makeMessageStart(overrides: Record<string, any> = {}): BetaRawMessageStreamEvent {
return {
type: 'message_start',
message: {
id: 'msg_test',
type: 'message',
role: 'assistant',
content: [],
model: 'test-model',
stop_reason: null,
stop_sequence: null,
usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
...overrides,
},
} as any
}
/** Build a content_block_start event for the given block type */
function makeContentBlockStart(index: number, type: 'text' | 'tool_use' | 'thinking', extra: Record<string, any> = {}): BetaRawMessageStreamEvent {
const block =
type === 'text'
? { type: 'text', text: '' }
: type === 'tool_use'
? { type: 'tool_use', id: 'toolu_test', name: 'bash', input: {} }
: { type: 'thinking', thinking: '', signature: '' }
return { type: 'content_block_start', index, content_block: { ...block, ...extra } } as any
}
/** Build a text_delta content_block_delta event */
function makeTextDelta(index: number, text: string): BetaRawMessageStreamEvent {
return { type: 'content_block_delta', index, delta: { type: 'text_delta', text } } as any
}
/** Build an input_json_delta content_block_delta event */
function makeInputJsonDelta(index: number, json: string): BetaRawMessageStreamEvent {
return { type: 'content_block_delta', index, delta: { type: 'input_json_delta', partial_json: json } } as any
}
/** Build a thinking_delta content_block_delta event */
function makeThinkingDelta(index: number, thinking: string): BetaRawMessageStreamEvent {
return { type: 'content_block_delta', index, delta: { type: 'thinking_delta', thinking } } as any
}
/** Build a content_block_stop event */
function makeContentBlockStop(index: number): BetaRawMessageStreamEvent {
return { type: 'content_block_stop', index } as any
}
/** Build a message_delta event with stop_reason and output_tokens */
function makeMessageDelta(stopReason: string, outputTokens: number): BetaRawMessageStreamEvent {
return {
type: 'message_delta',
delta: { stop_reason: stopReason, stop_sequence: null },
usage: { output_tokens: outputTokens },
} as any
}
/** Build a message_stop event */
function makeMessageStop(): BetaRawMessageStreamEvent {
return { type: 'message_stop' } as any
}
/** Async generator from a fixed array of events */
async function* eventStream(events: BetaRawMessageStreamEvent[]) {
for (const e of events) yield e
}
/** Collect all outputs from queryModelOpenAI into typed buckets */
async function runQueryModel(
events: BetaRawMessageStreamEvent[],
envOverrides: Record<string, string | undefined> = {},
) {
// Wire events into the mocked stream adapter
_nextEvents = events
// Save + apply env overrides
const saved: Record<string, string | undefined> = {}
for (const [k, v] of Object.entries(envOverrides)) {
saved[k] = process.env[k]
if (v === undefined) delete process.env[k]
else process.env[k] = v
}
try {
// We inline mock.module inside the try block.
// Bun resolves mock.module at the call site synchronously (hoisted),
// so we register once per test file, then re-import each time.
const { queryModelOpenAI } = await import('../index.js')
const assistantMessages: AssistantMessage[] = []
const streamEvents: StreamEvent[] = []
const otherOutputs: any[] = []
const minimalOptions: any = {
model: 'test-model',
tools: [],
agents: [],
querySource: 'main_loop',
getToolPermissionContext: async () => ({
alwaysAllow: [],
alwaysDeny: [],
needsPermission: [],
mode: 'default',
isBypassingPermissions: false,
}),
}
for await (const item of queryModelOpenAI(
[],
{ type: 'text', text: '' } as any,
[],
new AbortController().signal,
minimalOptions,
)) {
if (item.type === 'assistant') {
assistantMessages.push(item as AssistantMessage)
} else if (item.type === 'stream_event') {
streamEvents.push(item as StreamEvent)
} else {
otherOutputs.push(item)
}
}
return { assistantMessages, streamEvents, otherOutputs }
} finally {
// Restore env
for (const [k, v] of Object.entries(saved)) {
if (v === undefined) delete process.env[k]
else process.env[k] = v
}
}
}
// ─── mock setup ──────────────────────────────────────────────────────────────
// We mock at module level. Bun's mock.module replaces the module for the
// entire file, so we configure the stream per-test via a shared variable.
let _nextEvents: BetaRawMessageStreamEvent[] = []
/** Captured arguments from the last chat.completions.create() call */
let _lastCreateArgs: Record<string, any> | null = null
mock.module('../client.js', () => ({
getOpenAIClient: () => ({
chat: {
completions: {
create: async (args: Record<string, any>) => {
_lastCreateArgs = args
return { [Symbol.asyncIterator]: async function* () {} }
},
},
},
}),
}))
mock.module('../streamAdapter.js', () => ({
adaptOpenAIStreamToAnthropic: (_stream: any, _model: string) => eventStream(_nextEvents),
}))
mock.module('../modelMapping.js', () => ({
resolveOpenAIModel: (m: string) => m,
}))
mock.module('../convertMessages.js', () => ({
anthropicMessagesToOpenAI: () => [],
}))
mock.module('../convertTools.js', () => ({
anthropicToolsToOpenAI: () => [],
anthropicToolChoiceToOpenAI: () => undefined,
}))
mock.module('../../../../utils/context.js', () => ({
getModelMaxOutputTokens: () => ({ upperLimit: 8192, default: 8192 }),
getContextWindowForModel: () => 200_000,
}))
mock.module('../../../../utils/messages.js', () => ({
normalizeMessagesForAPI: (msgs: any) => msgs,
normalizeContentFromAPI: (blocks: any[]) => blocks,
createAssistantAPIErrorMessage: (opts: any) => ({
type: 'assistant',
message: { content: [{ type: 'text', text: opts.content }], apiError: opts.apiError },
uuid: 'error-uuid',
timestamp: new Date().toISOString(),
}),
}))
mock.module('../../../../utils/api.js', () => ({
toolToAPISchema: async (t: any) => t,
}))
mock.module('../../../../utils/toolSearch.js', () => ({
isToolSearchEnabled: async () => false,
extractDiscoveredToolNames: () => new Set(),
}))
mock.module('../../../../tools/ToolSearchTool/prompt.js', () => ({
isDeferredTool: () => false,
TOOL_SEARCH_TOOL_NAME: '__tool_search__',
}))
mock.module('../../../../cost-tracker.js', () => ({
addToTotalSessionCost: () => {},
}))
mock.module('../../../../utils/modelCost.js', () => ({
calculateUSDCost: () => 0,
}))
mock.module('../../../../utils/debug.js', () => ({
logForDebugging: () => {},
}))
// ─── tests ───────────────────────────────────────────────────────────────────
describe('queryModelOpenAI — stop_reason propagation', () => {
test('assembled AssistantMessage has stop_reason end_turn (not null)', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'Hello'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 10),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBe('end_turn')
})
test('assembled AssistantMessage has stop_reason tool_use', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'tool_use'),
makeInputJsonDelta(0, '{"cmd":"ls"}'),
makeContentBlockStop(0),
makeMessageDelta('tool_use', 20),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBe('tool_use')
})
test('assembled AssistantMessage has stop_reason max_tokens', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'truncated'),
makeContentBlockStop(0),
makeMessageDelta('max_tokens', 8192),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Two assistant-typed items: the content message + the max_output_tokens error signal.
// The error signal is emitted as a synthetic assistant message by createAssistantAPIErrorMessage.
expect(assistantMessages).toHaveLength(2)
const contentMsg = assistantMessages[0]!
expect(contentMsg.message.stop_reason).toBe('max_tokens')
// Second item is the error signal (has apiError set)
const errorMsg = assistantMessages[1]!.message as any
expect(errorMsg.apiError).toBe('max_output_tokens')
})
test('stop_reason is null when no message_delta was received (safety fallback path)', async () => {
// Stream ends without message_stop — triggers the safety fallback branch.
// stop_reason stays null since no message_delta was ever seen.
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'partial'),
makeContentBlockStop(0),
// No message_delta / message_stop
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Safety fallback should yield the partial content
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0]!.message.stop_reason).toBeNull()
})
})
describe('queryModelOpenAI — usage accumulation', () => {
test('usage in assembled message reflects all four fields from message_delta', async () => {
// message_start has all fields=0 (trailing-chunk pattern: usage not yet available).
// message_delta carries the real values after stream ends.
// The spread in the message_delta handler must override all zeros from message_start,
// including cache_read_input_tokens which was previously missing from message_delta.
_nextEvents = [
makeMessageStart({ usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 } }),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'response'),
makeContentBlockStop(0),
// message_delta carries all four Anthropic usage fields (as emitted by the fixed streamAdapter)
{
type: 'message_delta',
delta: { stop_reason: 'end_turn', stop_sequence: null },
usage: { input_tokens: 30011, output_tokens: 190, cache_read_input_tokens: 19904, cache_creation_input_tokens: 0 },
} as any,
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
const usage = assistantMessages[0]!.message.usage as any
expect(usage.input_tokens).toBe(30011)
expect(usage.output_tokens).toBe(190)
// cache_read_input_tokens from message_delta overrides the 0 from message_start
expect(usage.cache_read_input_tokens).toBe(19904)
expect(usage.cache_creation_input_tokens).toBe(0)
})
test('usage is zero when no usage events arrive (prevents false autocompact)', async () => {
// If usage stays 0, tokenCountWithEstimation will undercount — so at least
// verify the field exists and is numeric (to detect regressions).
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hi'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 0),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
const usage = assistantMessages[0]!.message.usage as any
expect(typeof usage.input_tokens).toBe('number')
expect(typeof usage.output_tokens).toBe('number')
})
})
describe('queryModelOpenAI — no duplicate AssistantMessage (partialMessage reset)', () => {
test('yields exactly one AssistantMessage per message_stop when content is present', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'only once'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
// Before the fix, partialMessage was not reset to null, so the safety
// fallback at the end of the loop would yield a second message with the
// same message.id — causing mergeAssistantMessages to concatenate content.
expect(assistantMessages).toHaveLength(1)
})
test('thinking + text response yields exactly one AssistantMessage', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'thinking'),
makeThinkingDelta(0, 'let me think'),
makeContentBlockStop(0),
makeContentBlockStart(1, 'text'),
makeTextDelta(1, 'answer'),
makeContentBlockStop(1),
makeMessageDelta('end_turn', 30),
makeMessageStop(),
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
})
test('safety fallback path still yields message when stream ends without message_stop', async () => {
// Simulates a stream that cuts off without the normal termination sequence.
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'abrupt end'),
// No content_block_stop, no message_delta, no message_stop
]
const { assistantMessages } = await runQueryModel(_nextEvents)
expect(assistantMessages).toHaveLength(1)
})
})
describe('queryModelOpenAI — stream_events forwarded', () => {
test('every adapted event is also yielded as stream_event for real-time display', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hello'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
const { streamEvents } = await runQueryModel(_nextEvents)
const eventTypes = streamEvents.map(e => (e as any).event?.type)
expect(eventTypes).toContain('message_start')
expect(eventTypes).toContain('content_block_start')
expect(eventTypes).toContain('content_block_delta')
expect(eventTypes).toContain('content_block_stop')
expect(eventTypes).toContain('message_delta')
expect(eventTypes).toContain('message_stop')
})
})
describe('queryModelOpenAI — max_tokens forwarded to request', () => {
test('buildOpenAIRequestBody includes max_tokens in the request payload', async () => {
_nextEvents = [
makeMessageStart(),
makeContentBlockStart(0, 'text'),
makeTextDelta(0, 'hi'),
makeContentBlockStop(0),
makeMessageDelta('end_turn', 5),
makeMessageStop(),
]
await runQueryModel(_nextEvents)
expect(_lastCreateArgs).not.toBeNull()
expect(_lastCreateArgs!.max_tokens).toBe(8192)
})
})

View File

@@ -29,6 +29,7 @@ function makeChunk(overrides: Partial<ChatCompletionChunk> & any = {}): ChatComp
} as ChatCompletionChunk
}
/** Collect all emitted Anthropic events from the stream adapter for assertion */
async function collectEvents(chunks: ChatCompletionChunk[]) {
const events: any[] = []
for await (const event of adaptOpenAIStreamToAnthropic(mockStream(chunks), 'gpt-4o')) {
@@ -453,4 +454,206 @@ describe('prompt caching support', () => {
expect(msgStart.message.usage.cache_read_input_tokens).toBe(0)
expect(msgStart.message.usage.input_tokens).toBe(500)
})
test('captures output_tokens and input_tokens from trailing chunk sent after finish_reason', async () => {
// Many OpenAI-compatible endpoints (e.g. DeepSeek) send usage in a separate
// final chunk AFTER the finish_reason chunk, with choices: [].
// message_delta must carry both input_tokens and output_tokens so that
// queryModelOpenAI's spread can override the zeros from message_start — which is
// emitted before the trailing chunk and always has input_tokens=0.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'hello' }, finish_reason: null }],
}),
// finish_reason chunk — usage not yet available
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
// trailing usage-only chunk (choices: [])
makeChunk({
choices: [],
usage: { prompt_tokens: 123, completion_tokens: 45, total_tokens: 168 },
}),
])
// message_start emits on the first chunk before trailing usage arrives
const msgStart = events.find(e => e.type === 'message_start') as any
expect(msgStart.message.usage.input_tokens).toBe(0)
// message_delta is emitted after stream loop ends with final real values
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.input_tokens).toBe(123)
expect(msgDelta.usage.output_tokens).toBe(45)
expect(msgDelta.delta.stop_reason).toBe('end_turn')
})
test('captures input_tokens from trailing chunk (used by tokenCountWithEstimation for autocompact)', async () => {
// input_tokens is the dominant term in tokenCountWithEstimation. Without it,
// getTokenCountFromUsage returns only output_tokens (~100-700), which is far below
// the autocompact threshold (~33k), so compaction never fires.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'answer' }, finish_reason: null }],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
makeChunk({
choices: [],
usage: { prompt_tokens: 800, completion_tokens: 200, total_tokens: 1000 },
}),
])
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.input_tokens).toBe(800)
expect(msgDelta.usage.output_tokens).toBe(200)
})
test('trailing usage chunk with tool_calls: stop_reason stays tool_use', async () => {
// Verifies that deferring message_delta does not break stop_reason mapping
// when the model made tool calls and usage arrives in a trailing chunk.
const events = await collectEvents([
makeChunk({
choices: [{
index: 0,
delta: {
tool_calls: [{ index: 0, id: 'call_x', function: { name: 'bash', arguments: '{"cmd":"ls"}' } }],
},
finish_reason: null,
}],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'tool_calls' }],
}),
// trailing usage-only chunk
makeChunk({
choices: [],
usage: { prompt_tokens: 500, completion_tokens: 30, total_tokens: 530 },
}),
])
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.delta.stop_reason).toBe('tool_use')
expect(msgDelta.usage.output_tokens).toBe(30)
})
test('message_delta always comes before message_stop', async () => {
// Verifies event ordering is preserved after deferring to post-loop emission.
const events = await collectEvents([
makeChunk({ choices: [{ index: 0, delta: { content: 'x' }, finish_reason: null }] }),
makeChunk({ choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] }),
makeChunk({ choices: [], usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 } }),
])
const types = events.map(e => e.type)
const deltaIdx = types.lastIndexOf('message_delta')
const stopIdx = types.lastIndexOf('message_stop')
expect(deltaIdx).toBeGreaterThanOrEqual(0)
expect(stopIdx).toBeGreaterThan(deltaIdx)
})
// ── cache_read_input_tokens in message_delta (the core bug fix) ──────────
test('message_delta carries cache_read_input_tokens from trailing usage chunk', async () => {
// Real-world case: DeepSeek-V3 returns cached_tokens=19904
// in a trailing chunk with choices:[]. Previously message_delta only carried
// input_tokens and output_tokens, so cache_read_input_tokens stayed 0 after
// queryModelOpenAI's spread — even though cachedTokens was captured internally.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'answer' }, finish_reason: null }],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
// trailing usage chunk matching the observed server response format
makeChunk({
choices: [],
usage: {
prompt_tokens: 30011,
completion_tokens: 190,
total_tokens: 30201,
prompt_tokens_details: { audio_tokens: 0, cached_tokens: 19904 },
} as any,
}),
])
// message_start is emitted before trailing chunk — cache fields are 0
const msgStart = events.find(e => e.type === 'message_start') as any
expect(msgStart.message.usage.cache_read_input_tokens).toBe(0)
// message_delta carries the real values from the trailing chunk
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.input_tokens).toBe(30011)
expect(msgDelta.usage.output_tokens).toBe(190)
expect(msgDelta.usage.cache_read_input_tokens).toBe(19904)
expect(msgDelta.usage.cache_creation_input_tokens).toBe(0)
})
test('cache_read_input_tokens=0 in message_delta when cached_tokens is absent', async () => {
// Non-caching requests should still have the field present and zero.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'hi' }, finish_reason: null }],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
makeChunk({
choices: [],
usage: { prompt_tokens: 100, completion_tokens: 20, total_tokens: 120 },
}),
])
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.cache_read_input_tokens).toBe(0)
expect(msgDelta.usage.cache_creation_input_tokens).toBe(0)
})
test('cache_read_input_tokens=0 in message_delta when cached_tokens is 0', async () => {
// Explicit cached_tokens:0 should not be treated differently from absent.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'hi' }, finish_reason: null }],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
makeChunk({
choices: [],
usage: {
prompt_tokens: 500,
completion_tokens: 50,
total_tokens: 550,
prompt_tokens_details: { cached_tokens: 0 },
} as any,
}),
])
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.cache_read_input_tokens).toBe(0)
})
test('cache_read_input_tokens updated when cached_tokens arrives in same chunk as finish_reason', async () => {
// Some endpoints send usage in the finish_reason chunk instead of a trailing chunk.
const events = await collectEvents([
makeChunk({
choices: [{ index: 0, delta: { content: 'result' }, finish_reason: null }],
}),
makeChunk({
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
usage: {
prompt_tokens: 2000,
completion_tokens: 100,
total_tokens: 2100,
prompt_tokens_details: { cached_tokens: 1500 },
} as any,
}),
])
const msgDelta = events.find(e => e.type === 'message_delta') as any
expect(msgDelta.usage.cache_read_input_tokens).toBe(1500)
expect(msgDelta.usage.input_tokens).toBe(2000)
expect(msgDelta.usage.output_tokens).toBe(100)
})
})

View File

@@ -25,6 +25,7 @@ import { logForDebugging } from '../../../utils/debug.js'
import { addToTotalSessionCost } from '../../../cost-tracker.js'
import { calculateUSDCost } from '../../../utils/modelCost.js'
import { isEnvTruthy, isEnvDefinedFalsy } from '../../../utils/envUtils.js'
import { getModelMaxOutputTokens } from '../../../utils/context.js'
import type { Options } from '../claude.js'
import { randomUUID } from 'crypto'
import {
@@ -81,12 +82,14 @@ export function buildOpenAIRequestBody(params: {
tools: any[]
toolChoice: any
enableThinking: boolean
maxTokens: number
temperatureOverride?: number
}): Record<string, any> {
const { model, messages, tools, toolChoice, enableThinking, temperatureOverride } = params
const { model, messages, tools, toolChoice, enableThinking, maxTokens, temperatureOverride } = params
return {
model,
messages,
max_tokens: maxTokens,
...(tools.length > 0 && {
tools,
...(toolChoice && { tool_choice: toolChoice }),
@@ -110,6 +113,56 @@ export function buildOpenAIRequestBody(params: {
}
}
/**
* Assemble the final AssistantMessage (and optional max_tokens error) from
* accumulated stream state. Extracted to avoid duplication between the
* `message_stop` handler and the post-loop safety fallback.
*/
function assembleFinalAssistantOutputs(params: {
partialMessage: any
contentBlocks: Record<number, any>
tools: Tools
agentId: string | undefined
usage: { input_tokens: number; output_tokens: number; cache_creation_input_tokens: number; cache_read_input_tokens: number }
stopReason: string | null
maxTokens: number
}): (AssistantMessage | SystemAPIErrorMessage)[] {
const { partialMessage, contentBlocks, tools, agentId, usage, stopReason, maxTokens } = params
const outputs: (AssistantMessage | SystemAPIErrorMessage)[] = []
const allBlocks = Object.keys(contentBlocks)
.sort((a, b) => Number(a) - Number(b))
.map(k => contentBlocks[Number(k)])
.filter(Boolean)
if (allBlocks.length > 0) {
outputs.push({
message: {
...partialMessage,
content: normalizeContentFromAPI(allBlocks, tools, agentId),
usage,
stop_reason: stopReason,
stop_sequence: null,
},
requestId: undefined,
type: 'assistant',
uuid: randomUUID(),
timestamp: new Date().toISOString(),
} as AssistantMessage)
}
if (stopReason === 'max_tokens') {
outputs.push(createAssistantAPIErrorMessage({
content: `Output truncated: response exceeded the ${maxTokens} token limit. ` +
`Set CLAUDE_CODE_MAX_OUTPUT_TOKENS to override.`,
apiError: 'max_output_tokens',
error: 'max_output_tokens',
}))
}
return outputs
}
/**
* OpenAI-compatible query path. Converts Anthropic-format messages/tools to
* OpenAI format, calls the OpenAI-compatible endpoint, and converts the
@@ -212,7 +265,20 @@ export async function* queryModelOpenAI(
)
}
// 10. Get client and make streaming request
// 10. Compute max_tokens — required by most OpenAI-compatible endpoints.
// Without this the server uses a tiny default, and when
// thinking is enabled the thinking phase consumes the entire budget
// leaving no tokens for the final response.
//
// Use upperLimit (not the slot-cap default) because the Anthropic path's
// slot-reservation cap (CAPPED_DEFAULT_MAX_TOKENS=8k) is paired with an
// auto-retry at 64k in query.ts. The OpenAI path has no such retry, so
// using the capped 8k default would silently truncate responses in
// multi-turn conversations where thinking consumes most of the budget.
const { upperLimit } = getModelMaxOutputTokens(openaiModel)
const maxTokens = options.maxOutputTokensOverride ?? upperLimit
// 11. Get client
const client = getOpenAIClient({
maxRetries: 0,
fetchOverride: options.fetchOverride,
@@ -223,13 +289,14 @@ export async function* queryModelOpenAI(
`[OpenAI] Calling model=${openaiModel}, messages=${openaiMessages.length}, tools=${openaiTools.length}, thinking=${enableThinking}`,
)
// 11. Call OpenAI API with streaming
// 12. Call OpenAI API with streaming
const requestBody = buildOpenAIRequestBody({
model: openaiModel,
messages: openaiMessages,
tools: openaiTools,
toolChoice: openaiToolChoice,
enableThinking,
maxTokens,
temperatureOverride: options.temperatureOverride,
})
const stream = await client.chat.completions.create(
@@ -244,6 +311,7 @@ export async function* queryModelOpenAI(
// Accumulate content blocks and usage, same as the Anthropic path in claude.ts
const contentBlocks: Record<number, any> = {}
let partialMessage: any
let stopReason: string | null = null
let usage = {
input_tokens: 0,
output_tokens: 0,
@@ -297,21 +365,7 @@ export async function* queryModelOpenAI(
break
}
case 'content_block_stop': {
const idx = (event as any).index
const block = contentBlocks[idx]
if (!block || !partialMessage) break
const m: AssistantMessage = {
message: {
...partialMessage,
content: normalizeContentFromAPI([block], tools, options.agentId),
},
requestId: undefined,
type: 'assistant',
uuid: randomUUID(),
timestamp: new Date().toISOString(),
}
yield m
// Block accumulation is complete; assembly happens at message_stop.
break
}
case 'message_delta': {
@@ -319,21 +373,33 @@ export async function* queryModelOpenAI(
if (deltaUsage) {
usage = { ...usage, ...deltaUsage }
}
// Update the stop_reason on the last yielded message
// (we don't have a reference here, but the consumer handles this)
if ((event as any).delta?.stop_reason != null) {
stopReason = (event as any).delta.stop_reason
}
break
}
case 'message_stop':
case 'message_stop': {
// Assemble ONE AssistantMessage with ALL content blocks, matching the
// Anthropic SDK path. Real usage (input + output tokens) is available
// here and injected so tokenCountWithEstimation() can read it.
if (partialMessage) {
for (const output of assembleFinalAssistantOutputs({
partialMessage, contentBlocks, tools, agentId: options.agentId,
usage, stopReason, maxTokens,
})) {
yield output
}
// Reset partialMessage so the post-loop safety fallback does not
// yield a second identical AssistantMessage.
partialMessage = null
}
// Track cost and token usage
if (usage.input_tokens + usage.output_tokens > 0) {
const costUSD = calculateUSDCost(openaiModel, usage as any)
addToTotalSessionCost(costUSD, usage as any, options.model)
}
break
}
// Track cost and token usage (matching the Anthropic path in claude.ts)
if (
event.type === 'message_stop' &&
usage.input_tokens + usage.output_tokens > 0
) {
const costUSD = calculateUSDCost(openaiModel, usage as any)
addToTotalSessionCost(costUSD, usage as any, options.model)
}
}
// Also yield as StreamEvent for real-time display (matching Anthropic path)
@@ -343,6 +409,16 @@ export async function* queryModelOpenAI(
...(event.type === 'message_start' ? { ttftMs } : undefined),
} as StreamEvent
}
// Safety: if stream ended without message_stop, assemble and yield whatever we have
if (partialMessage) {
for (const output of assembleFinalAssistantOutputs({
partialMessage, contentBlocks, tools, agentId: options.agentId,
usage, stopReason, maxTokens,
})) {
yield output
}
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logForDebugging(`[OpenAI] Error: ${errorMessage}`, { level: 'error' })
@@ -352,4 +428,4 @@ export async function* queryModelOpenAI(
error: error instanceof Error ? error : new Error(String(error)),
})
}
}
}

View File

@@ -11,7 +11,16 @@ import { randomUUID } from 'crypto'
* delta.content → content_block_start(text) + text_delta + content_block_stop
* delta.tool_calls → content_block_start(tool_use) + input_json_delta + content_block_stop
* finish_reason → message_delta(stop_reason) + message_stop
* usage.cached_tokens → cache_read_input_tokens in message_start usage
*
* Usage field mapping (OpenAI → Anthropic):
* prompt_tokens → input_tokens
* completion_tokens → output_tokens
* prompt_tokens_details.cached_tokens → cache_read_input_tokens
* (no OpenAI equivalent) → cache_creation_input_tokens (always 0)
*
* All four fields are emitted in the post-loop message_delta (not message_start)
* so that trailing usage chunks (sent after finish_reason by some
* OpenAI-compatible endpoints) are fully captured before the final counts are reported.
*
* Thinking support:
* DeepSeek and compatible providers send `delta.reasoning_content` for chain-of-thought.
@@ -41,26 +50,43 @@ export async function* adaptOpenAIStreamToAnthropic(
// Track text block state
let textBlockOpen = false
// Track usage
// Track usage — all four Anthropic fields, populated from OpenAI usage fields:
// prompt_tokens → input_tokens
// completion_tokens → output_tokens
// prompt_tokens_details.cached_tokens → cache_read_input_tokens
// (no standard OpenAI equivalent) → cache_creation_input_tokens (always 0)
let inputTokens = 0
let outputTokens = 0
let cachedTokens = 0
let cachedReadTokens = 0
// Track all open content block indices (for cleanup)
const openBlockIndices = new Set<number>()
// Deferred finish state: populated when finish_reason is encountered so that
// message_delta / message_stop are emitted AFTER the stream loop ends.
// This ensures usage chunks that arrive after the finish_reason chunk are
// captured before we emit the final token counts.
let pendingFinishReason: string | null = null
let pendingHasToolCalls = false
for await (const chunk of stream) {
const choice = chunk.choices?.[0]
const delta = choice?.delta
// Extract usage from any chunk that carries it
// Extract usage from any chunk that carries it.
// Many OpenAI-compatible endpoints (e.g. DeepSeek) send usage in a separate
// final chunk that arrives AFTER the finish_reason chunk. Reading it here
// (before emitting message_delta) ensures the token counts are available
// when we later emit message_delta.
if (chunk.usage) {
inputTokens = chunk.usage.prompt_tokens ?? inputTokens
outputTokens = chunk.usage.completion_tokens ?? outputTokens
// OpenAI prompt caching: prompt_tokens_details.cached_tokens
// → Anthropic cache_read_input_tokens
// Note: OpenAI has no equivalent for cache_creation_input_tokens.
const details = (chunk.usage as any).prompt_tokens_details
if (details?.cached_tokens) {
cachedTokens = details.cached_tokens
if (details?.cached_tokens != null) {
cachedReadTokens = details.cached_tokens
}
}
@@ -82,7 +108,7 @@ export async function* adaptOpenAIStreamToAnthropic(
input_tokens: inputTokens,
output_tokens: 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: cachedTokens,
cache_read_input_tokens: cachedReadTokens,
},
},
} as BetaRawMessageStreamEvent
@@ -224,7 +250,10 @@ export async function* adaptOpenAIStreamToAnthropic(
}
}
// Handle finish
// Handle finish: close all open content blocks and record the finish_reason.
// message_delta + message_stop are emitted AFTER the stream loop so that any
// trailing usage chunk (sent after the finish chunk by some endpoints)
// is captured first — ensuring token counts are non-zero.
if (choice?.finish_reason) {
// Close thinking block if still open
if (thinkingBlockOpen) {
@@ -257,27 +286,10 @@ export async function* adaptOpenAIStreamToAnthropic(
}
}
// Map finish_reason to Anthropic stop_reason.
// Some backends return "stop" even when tool_calls are present —
// force "tool_use" when we saw any tool blocks to ensure the query
// loop actually executes the tools.
const hasToolCalls = toolBlocks.size > 0
const stopReason = hasToolCalls ? 'tool_use' : mapFinishReason(choice.finish_reason)
yield {
type: 'message_delta',
delta: {
stop_reason: stopReason,
stop_sequence: null,
},
usage: {
output_tokens: outputTokens,
},
} as BetaRawMessageStreamEvent
yield {
type: 'message_stop',
} as BetaRawMessageStreamEvent
// Defer message_delta / message_stop until after the loop so that any
// trailing usage chunk is processed before we emit the final token counts.
pendingFinishReason = choice.finish_reason
pendingHasToolCalls = toolBlocks.size > 0
}
}
@@ -288,6 +300,54 @@ export async function* adaptOpenAIStreamToAnthropic(
index: idx,
} as BetaRawMessageStreamEvent
}
// Emit message_delta + message_stop now that the stream is fully consumed.
// Usage values (inputTokens / outputTokens) reflect all chunks including any
// trailing usage-only chunk sent after the finish_reason chunk.
if (pendingFinishReason !== null) {
// Map finish_reason to Anthropic stop_reason.
// CRITICAL: When finish_reason is 'length' (token budget exhausted), always
// report 'max_tokens' regardless of whether partial tool calls were received.
// Otherwise the query loop would try to execute tool calls with incomplete
// JSON arguments instead of triggering the max_tokens retry/recovery path.
const stopReason =
pendingFinishReason === 'length'
? 'max_tokens'
: pendingHasToolCalls
? 'tool_use'
: mapFinishReason(pendingFinishReason)
yield {
type: 'message_delta',
delta: {
stop_reason: stopReason,
stop_sequence: null,
},
// Carry all four Anthropic usage fields so queryModelOpenAI's message_delta
// handler (which spreads this into the accumulated usage object) can override
// every field that message_start emitted as 0. For endpoints that send usage
// in a trailing chunk (e.g. DeepSeek), message_start is emitted on the first
// content chunk before the trailing usage chunk arrives, so all four fields
// start at 0. By the time we reach here (post-loop) the trailing chunk has
// been processed and all values reflect the real counts.
//
// OpenAI → Anthropic field mapping:
// prompt_tokens → input_tokens
// completion_tokens → output_tokens
// prompt_tokens_details.cached_tokens → cache_read_input_tokens
// (no OpenAI equivalent) → cache_creation_input_tokens (stays 0)
usage: {
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_read_input_tokens: cachedReadTokens,
cache_creation_input_tokens: 0,
},
} as BetaRawMessageStreamEvent
yield {
type: 'message_stop',
} as BetaRawMessageStreamEvent
}
}
/**

View File

@@ -1,6 +1,16 @@
import { describe, expect, test } from "bun:test";
import { describe, expect, test, beforeAll, afterAll } from "bun:test";
import { formatBriefTimestamp } from "../formatBriefTimestamp";
let savedLcAll: string | undefined;
beforeAll(() => {
savedLcAll = process.env.LC_ALL;
process.env.LC_ALL = "en_US.UTF-8";
});
afterAll(() => {
if (savedLcAll === undefined) delete process.env.LC_ALL;
else process.env.LC_ALL = savedLcAll;
});
describe("formatBriefTimestamp", () => {
// Fixed "now" for deterministic tests: 2026-04-02T14:00:00Z (Thursday)
const now = new Date("2026-04-02T14:00:00Z");

View File

@@ -76,6 +76,7 @@ function getLocale(): string | undefined {
}
}
/** Return the epoch-ms of the start of the local calendar day for `d`. */
function startOfDay(d: Date): number {
return new Date(d.getFullYear(), d.getMonth(), d.getDate()).getTime()
}