mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-22 08:15:53 +00:00
Merge remote-tracking branch 'guunergooner/fix/openai-stop-reason-usage'
This commit is contained in:
454
src/services/api/openai/__tests__/queryModelOpenAI.test.ts
Normal file
454
src/services/api/openai/__tests__/queryModelOpenAI.test.ts
Normal file
@@ -0,0 +1,454 @@
|
|||||||
|
/**
|
||||||
|
* Tests for queryModelOpenAI in index.ts.
|
||||||
|
*
|
||||||
|
* Focused on the two bugs fixed:
|
||||||
|
* 1. stop_reason was always null in the assembled AssistantMessage because
|
||||||
|
* partialMessage (from message_start) has stop_reason: null, and the
|
||||||
|
* stop_reason captured from message_delta was never applied.
|
||||||
|
* 2. partialMessage was not reset to null after message_stop, so the safety
|
||||||
|
* fallback at the end of the loop would yield a second identical
|
||||||
|
* AssistantMessage (causing doubled content in the next API request).
|
||||||
|
*
|
||||||
|
* Strategy: mock getOpenAIClient + adaptOpenAIStreamToAnthropic so we can
|
||||||
|
* feed pre-built Anthropic events directly into queryModelOpenAI and inspect
|
||||||
|
* what it emits — without any real HTTP calls.
|
||||||
|
*/
|
||||||
|
import { describe, expect, test, mock, beforeEach, afterEach } from 'bun:test'
|
||||||
|
import type { BetaRawMessageStreamEvent } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
|
||||||
|
import type { AssistantMessage, StreamEvent } from '../../../../types/message.js'
|
||||||
|
|
||||||
|
// ─── helpers ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/** Build a minimal message_start event */
|
||||||
|
function makeMessageStart(overrides: Record<string, any> = {}): BetaRawMessageStreamEvent {
|
||||||
|
return {
|
||||||
|
type: 'message_start',
|
||||||
|
message: {
|
||||||
|
id: 'msg_test',
|
||||||
|
type: 'message',
|
||||||
|
role: 'assistant',
|
||||||
|
content: [],
|
||||||
|
model: 'test-model',
|
||||||
|
stop_reason: null,
|
||||||
|
stop_sequence: null,
|
||||||
|
usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
|
||||||
|
...overrides,
|
||||||
|
},
|
||||||
|
} as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a content_block_start event for the given block type */
|
||||||
|
function makeContentBlockStart(index: number, type: 'text' | 'tool_use' | 'thinking', extra: Record<string, any> = {}): BetaRawMessageStreamEvent {
|
||||||
|
const block =
|
||||||
|
type === 'text'
|
||||||
|
? { type: 'text', text: '' }
|
||||||
|
: type === 'tool_use'
|
||||||
|
? { type: 'tool_use', id: 'toolu_test', name: 'bash', input: {} }
|
||||||
|
: { type: 'thinking', thinking: '', signature: '' }
|
||||||
|
return { type: 'content_block_start', index, content_block: { ...block, ...extra } } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a text_delta content_block_delta event */
|
||||||
|
function makeTextDelta(index: number, text: string): BetaRawMessageStreamEvent {
|
||||||
|
return { type: 'content_block_delta', index, delta: { type: 'text_delta', text } } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build an input_json_delta content_block_delta event */
|
||||||
|
function makeInputJsonDelta(index: number, json: string): BetaRawMessageStreamEvent {
|
||||||
|
return { type: 'content_block_delta', index, delta: { type: 'input_json_delta', partial_json: json } } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a thinking_delta content_block_delta event */
|
||||||
|
function makeThinkingDelta(index: number, thinking: string): BetaRawMessageStreamEvent {
|
||||||
|
return { type: 'content_block_delta', index, delta: { type: 'thinking_delta', thinking } } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a content_block_stop event */
|
||||||
|
function makeContentBlockStop(index: number): BetaRawMessageStreamEvent {
|
||||||
|
return { type: 'content_block_stop', index } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a message_delta event with stop_reason and output_tokens */
|
||||||
|
function makeMessageDelta(stopReason: string, outputTokens: number): BetaRawMessageStreamEvent {
|
||||||
|
return {
|
||||||
|
type: 'message_delta',
|
||||||
|
delta: { stop_reason: stopReason, stop_sequence: null },
|
||||||
|
usage: { output_tokens: outputTokens },
|
||||||
|
} as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Build a message_stop event */
|
||||||
|
function makeMessageStop(): BetaRawMessageStreamEvent {
|
||||||
|
return { type: 'message_stop' } as any
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Async generator from a fixed array of events */
|
||||||
|
async function* eventStream(events: BetaRawMessageStreamEvent[]) {
|
||||||
|
for (const e of events) yield e
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Collect all outputs from queryModelOpenAI into typed buckets */
|
||||||
|
async function runQueryModel(
|
||||||
|
events: BetaRawMessageStreamEvent[],
|
||||||
|
envOverrides: Record<string, string | undefined> = {},
|
||||||
|
) {
|
||||||
|
// Wire events into the mocked stream adapter
|
||||||
|
_nextEvents = events
|
||||||
|
// Save + apply env overrides
|
||||||
|
const saved: Record<string, string | undefined> = {}
|
||||||
|
for (const [k, v] of Object.entries(envOverrides)) {
|
||||||
|
saved[k] = process.env[k]
|
||||||
|
if (v === undefined) delete process.env[k]
|
||||||
|
else process.env[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// We inline mock.module inside the try block.
|
||||||
|
// Bun resolves mock.module at the call site synchronously (hoisted),
|
||||||
|
// so we register once per test file, then re-import each time.
|
||||||
|
const { queryModelOpenAI } = await import('../index.js')
|
||||||
|
|
||||||
|
const assistantMessages: AssistantMessage[] = []
|
||||||
|
const streamEvents: StreamEvent[] = []
|
||||||
|
const otherOutputs: any[] = []
|
||||||
|
|
||||||
|
const minimalOptions: any = {
|
||||||
|
model: 'test-model',
|
||||||
|
tools: [],
|
||||||
|
agents: [],
|
||||||
|
querySource: 'main_loop',
|
||||||
|
getToolPermissionContext: async () => ({
|
||||||
|
alwaysAllow: [],
|
||||||
|
alwaysDeny: [],
|
||||||
|
needsPermission: [],
|
||||||
|
mode: 'default',
|
||||||
|
isBypassingPermissions: false,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
for await (const item of queryModelOpenAI(
|
||||||
|
[],
|
||||||
|
{ type: 'text', text: '' } as any,
|
||||||
|
[],
|
||||||
|
new AbortController().signal,
|
||||||
|
minimalOptions,
|
||||||
|
)) {
|
||||||
|
if (item.type === 'assistant') {
|
||||||
|
assistantMessages.push(item as AssistantMessage)
|
||||||
|
} else if (item.type === 'stream_event') {
|
||||||
|
streamEvents.push(item as StreamEvent)
|
||||||
|
} else {
|
||||||
|
otherOutputs.push(item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { assistantMessages, streamEvents, otherOutputs }
|
||||||
|
} finally {
|
||||||
|
// Restore env
|
||||||
|
for (const [k, v] of Object.entries(saved)) {
|
||||||
|
if (v === undefined) delete process.env[k]
|
||||||
|
else process.env[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── mock setup ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// We mock at module level. Bun's mock.module replaces the module for the
|
||||||
|
// entire file, so we configure the stream per-test via a shared variable.
|
||||||
|
let _nextEvents: BetaRawMessageStreamEvent[] = []
|
||||||
|
|
||||||
|
/** Captured arguments from the last chat.completions.create() call */
|
||||||
|
let _lastCreateArgs: Record<string, any> | null = null
|
||||||
|
|
||||||
|
mock.module('../client.js', () => ({
|
||||||
|
getOpenAIClient: () => ({
|
||||||
|
chat: {
|
||||||
|
completions: {
|
||||||
|
create: async (args: Record<string, any>) => {
|
||||||
|
_lastCreateArgs = args
|
||||||
|
return { [Symbol.asyncIterator]: async function* () {} }
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../streamAdapter.js', () => ({
|
||||||
|
adaptOpenAIStreamToAnthropic: (_stream: any, _model: string) => eventStream(_nextEvents),
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../modelMapping.js', () => ({
|
||||||
|
resolveOpenAIModel: (m: string) => m,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../convertMessages.js', () => ({
|
||||||
|
anthropicMessagesToOpenAI: () => [],
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../convertTools.js', () => ({
|
||||||
|
anthropicToolsToOpenAI: () => [],
|
||||||
|
anthropicToolChoiceToOpenAI: () => undefined,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/context.js', () => ({
|
||||||
|
getModelMaxOutputTokens: () => ({ upperLimit: 8192, default: 8192 }),
|
||||||
|
getContextWindowForModel: () => 200_000,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/messages.js', () => ({
|
||||||
|
normalizeMessagesForAPI: (msgs: any) => msgs,
|
||||||
|
normalizeContentFromAPI: (blocks: any[]) => blocks,
|
||||||
|
createAssistantAPIErrorMessage: (opts: any) => ({
|
||||||
|
type: 'assistant',
|
||||||
|
message: { content: [{ type: 'text', text: opts.content }], apiError: opts.apiError },
|
||||||
|
uuid: 'error-uuid',
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
}),
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/api.js', () => ({
|
||||||
|
toolToAPISchema: async (t: any) => t,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/toolSearch.js', () => ({
|
||||||
|
isToolSearchEnabled: async () => false,
|
||||||
|
extractDiscoveredToolNames: () => new Set(),
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../tools/ToolSearchTool/prompt.js', () => ({
|
||||||
|
isDeferredTool: () => false,
|
||||||
|
TOOL_SEARCH_TOOL_NAME: '__tool_search__',
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../cost-tracker.js', () => ({
|
||||||
|
addToTotalSessionCost: () => {},
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/modelCost.js', () => ({
|
||||||
|
calculateUSDCost: () => 0,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module('../../../../utils/debug.js', () => ({
|
||||||
|
logForDebugging: () => {},
|
||||||
|
}))
|
||||||
|
|
||||||
|
// ─── tests ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('queryModelOpenAI — stop_reason propagation', () => {
|
||||||
|
test('assembled AssistantMessage has stop_reason end_turn (not null)', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'Hello'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('end_turn', 10),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
expect(assistantMessages[0]!.message.stop_reason).toBe('end_turn')
|
||||||
|
})
|
||||||
|
|
||||||
|
test('assembled AssistantMessage has stop_reason tool_use', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'tool_use'),
|
||||||
|
makeInputJsonDelta(0, '{"cmd":"ls"}'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('tool_use', 20),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
expect(assistantMessages[0]!.message.stop_reason).toBe('tool_use')
|
||||||
|
})
|
||||||
|
|
||||||
|
test('assembled AssistantMessage has stop_reason max_tokens', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'truncated'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('max_tokens', 8192),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
// Two assistant-typed items: the content message + the max_output_tokens error signal.
|
||||||
|
// The error signal is emitted as a synthetic assistant message by createAssistantAPIErrorMessage.
|
||||||
|
expect(assistantMessages).toHaveLength(2)
|
||||||
|
const contentMsg = assistantMessages[0]!
|
||||||
|
expect(contentMsg.message.stop_reason).toBe('max_tokens')
|
||||||
|
// Second item is the error signal (has apiError set)
|
||||||
|
const errorMsg = assistantMessages[1]!.message as any
|
||||||
|
expect(errorMsg.apiError).toBe('max_output_tokens')
|
||||||
|
})
|
||||||
|
|
||||||
|
test('stop_reason is null when no message_delta was received (safety fallback path)', async () => {
|
||||||
|
// Stream ends without message_stop — triggers the safety fallback branch.
|
||||||
|
// stop_reason stays null since no message_delta was ever seen.
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'partial'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
// No message_delta / message_stop
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
// Safety fallback should yield the partial content
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
expect(assistantMessages[0]!.message.stop_reason).toBeNull()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('queryModelOpenAI — usage accumulation', () => {
|
||||||
|
test('usage in assembled message reflects all four fields from message_delta', async () => {
|
||||||
|
// message_start has all fields=0 (trailing-chunk pattern: usage not yet available).
|
||||||
|
// message_delta carries the real values after stream ends.
|
||||||
|
// The spread in the message_delta handler must override all zeros from message_start,
|
||||||
|
// including cache_read_input_tokens which was previously missing from message_delta.
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart({ usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 } }),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'response'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
// message_delta carries all four Anthropic usage fields (as emitted by the fixed streamAdapter)
|
||||||
|
{
|
||||||
|
type: 'message_delta',
|
||||||
|
delta: { stop_reason: 'end_turn', stop_sequence: null },
|
||||||
|
usage: { input_tokens: 30011, output_tokens: 190, cache_read_input_tokens: 19904, cache_creation_input_tokens: 0 },
|
||||||
|
} as any,
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
const usage = assistantMessages[0]!.message.usage as any
|
||||||
|
expect(usage.input_tokens).toBe(30011)
|
||||||
|
expect(usage.output_tokens).toBe(190)
|
||||||
|
// cache_read_input_tokens from message_delta overrides the 0 from message_start
|
||||||
|
expect(usage.cache_read_input_tokens).toBe(19904)
|
||||||
|
expect(usage.cache_creation_input_tokens).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('usage is zero when no usage events arrive (prevents false autocompact)', async () => {
|
||||||
|
// If usage stays 0, tokenCountWithEstimation will undercount — so at least
|
||||||
|
// verify the field exists and is numeric (to detect regressions).
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'hi'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('end_turn', 0),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
const usage = assistantMessages[0]!.message.usage as any
|
||||||
|
expect(typeof usage.input_tokens).toBe('number')
|
||||||
|
expect(typeof usage.output_tokens).toBe('number')
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('queryModelOpenAI — no duplicate AssistantMessage (partialMessage reset)', () => {
|
||||||
|
test('yields exactly one AssistantMessage per message_stop when content is present', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'only once'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('end_turn', 5),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
// Before the fix, partialMessage was not reset to null, so the safety
|
||||||
|
// fallback at the end of the loop would yield a second message with the
|
||||||
|
// same message.id — causing mergeAssistantMessages to concatenate content.
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('thinking + text response yields exactly one AssistantMessage', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'thinking'),
|
||||||
|
makeThinkingDelta(0, 'let me think'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeContentBlockStart(1, 'text'),
|
||||||
|
makeTextDelta(1, 'answer'),
|
||||||
|
makeContentBlockStop(1),
|
||||||
|
makeMessageDelta('end_turn', 30),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('safety fallback path still yields message when stream ends without message_stop', async () => {
|
||||||
|
// Simulates a stream that cuts off without the normal termination sequence.
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'abrupt end'),
|
||||||
|
// No content_block_stop, no message_delta, no message_stop
|
||||||
|
]
|
||||||
|
|
||||||
|
const { assistantMessages } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(assistantMessages).toHaveLength(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('queryModelOpenAI — stream_events forwarded', () => {
|
||||||
|
test('every adapted event is also yielded as stream_event for real-time display', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'hello'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('end_turn', 5),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
const { streamEvents } = await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
const eventTypes = streamEvents.map(e => (e as any).event?.type)
|
||||||
|
expect(eventTypes).toContain('message_start')
|
||||||
|
expect(eventTypes).toContain('content_block_start')
|
||||||
|
expect(eventTypes).toContain('content_block_delta')
|
||||||
|
expect(eventTypes).toContain('content_block_stop')
|
||||||
|
expect(eventTypes).toContain('message_delta')
|
||||||
|
expect(eventTypes).toContain('message_stop')
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('queryModelOpenAI — max_tokens forwarded to request', () => {
|
||||||
|
test('buildOpenAIRequestBody includes max_tokens in the request payload', async () => {
|
||||||
|
_nextEvents = [
|
||||||
|
makeMessageStart(),
|
||||||
|
makeContentBlockStart(0, 'text'),
|
||||||
|
makeTextDelta(0, 'hi'),
|
||||||
|
makeContentBlockStop(0),
|
||||||
|
makeMessageDelta('end_turn', 5),
|
||||||
|
makeMessageStop(),
|
||||||
|
]
|
||||||
|
|
||||||
|
await runQueryModel(_nextEvents)
|
||||||
|
|
||||||
|
expect(_lastCreateArgs).not.toBeNull()
|
||||||
|
expect(_lastCreateArgs!.max_tokens).toBe(8192)
|
||||||
|
})
|
||||||
|
})
|
||||||
@@ -29,6 +29,7 @@ function makeChunk(overrides: Partial<ChatCompletionChunk> & any = {}): ChatComp
|
|||||||
} as ChatCompletionChunk
|
} as ChatCompletionChunk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Collect all emitted Anthropic events from the stream adapter for assertion */
|
||||||
async function collectEvents(chunks: ChatCompletionChunk[]) {
|
async function collectEvents(chunks: ChatCompletionChunk[]) {
|
||||||
const events: any[] = []
|
const events: any[] = []
|
||||||
for await (const event of adaptOpenAIStreamToAnthropic(mockStream(chunks), 'gpt-4o')) {
|
for await (const event of adaptOpenAIStreamToAnthropic(mockStream(chunks), 'gpt-4o')) {
|
||||||
@@ -453,4 +454,206 @@ describe('prompt caching support', () => {
|
|||||||
expect(msgStart.message.usage.cache_read_input_tokens).toBe(0)
|
expect(msgStart.message.usage.cache_read_input_tokens).toBe(0)
|
||||||
expect(msgStart.message.usage.input_tokens).toBe(500)
|
expect(msgStart.message.usage.input_tokens).toBe(500)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('captures output_tokens and input_tokens from trailing chunk sent after finish_reason', async () => {
|
||||||
|
// Many OpenAI-compatible endpoints (e.g. DeepSeek) send usage in a separate
|
||||||
|
// final chunk AFTER the finish_reason chunk, with choices: [].
|
||||||
|
// message_delta must carry both input_tokens and output_tokens so that
|
||||||
|
// queryModelOpenAI's spread can override the zeros from message_start — which is
|
||||||
|
// emitted before the trailing chunk and always has input_tokens=0.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'hello' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
// finish_reason chunk — usage not yet available
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
}),
|
||||||
|
// trailing usage-only chunk (choices: [])
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: { prompt_tokens: 123, completion_tokens: 45, total_tokens: 168 },
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
// message_start emits on the first chunk before trailing usage arrives
|
||||||
|
const msgStart = events.find(e => e.type === 'message_start') as any
|
||||||
|
expect(msgStart.message.usage.input_tokens).toBe(0)
|
||||||
|
|
||||||
|
// message_delta is emitted after stream loop ends with final real values
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.input_tokens).toBe(123)
|
||||||
|
expect(msgDelta.usage.output_tokens).toBe(45)
|
||||||
|
expect(msgDelta.delta.stop_reason).toBe('end_turn')
|
||||||
|
})
|
||||||
|
|
||||||
|
test('captures input_tokens from trailing chunk (used by tokenCountWithEstimation for autocompact)', async () => {
|
||||||
|
// input_tokens is the dominant term in tokenCountWithEstimation. Without it,
|
||||||
|
// getTokenCountFromUsage returns only output_tokens (~100-700), which is far below
|
||||||
|
// the autocompact threshold (~33k), so compaction never fires.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'answer' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: { prompt_tokens: 800, completion_tokens: 200, total_tokens: 1000 },
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.input_tokens).toBe(800)
|
||||||
|
expect(msgDelta.usage.output_tokens).toBe(200)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('trailing usage chunk with tool_calls: stop_reason stays tool_use', async () => {
|
||||||
|
// Verifies that deferring message_delta does not break stop_reason mapping
|
||||||
|
// when the model made tool calls and usage arrives in a trailing chunk.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{
|
||||||
|
index: 0,
|
||||||
|
delta: {
|
||||||
|
tool_calls: [{ index: 0, id: 'call_x', function: { name: 'bash', arguments: '{"cmd":"ls"}' } }],
|
||||||
|
},
|
||||||
|
finish_reason: null,
|
||||||
|
}],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'tool_calls' }],
|
||||||
|
}),
|
||||||
|
// trailing usage-only chunk
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: { prompt_tokens: 500, completion_tokens: 30, total_tokens: 530 },
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.delta.stop_reason).toBe('tool_use')
|
||||||
|
expect(msgDelta.usage.output_tokens).toBe(30)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('message_delta always comes before message_stop', async () => {
|
||||||
|
// Verifies event ordering is preserved after deferring to post-loop emission.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({ choices: [{ index: 0, delta: { content: 'x' }, finish_reason: null }] }),
|
||||||
|
makeChunk({ choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] }),
|
||||||
|
makeChunk({ choices: [], usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 } }),
|
||||||
|
])
|
||||||
|
|
||||||
|
const types = events.map(e => e.type)
|
||||||
|
const deltaIdx = types.lastIndexOf('message_delta')
|
||||||
|
const stopIdx = types.lastIndexOf('message_stop')
|
||||||
|
expect(deltaIdx).toBeGreaterThanOrEqual(0)
|
||||||
|
expect(stopIdx).toBeGreaterThan(deltaIdx)
|
||||||
|
})
|
||||||
|
|
||||||
|
// ── cache_read_input_tokens in message_delta (the core bug fix) ──────────
|
||||||
|
|
||||||
|
test('message_delta carries cache_read_input_tokens from trailing usage chunk', async () => {
|
||||||
|
// Real-world case: DeepSeek-V3 returns cached_tokens=19904
|
||||||
|
// in a trailing chunk with choices:[]. Previously message_delta only carried
|
||||||
|
// input_tokens and output_tokens, so cache_read_input_tokens stayed 0 after
|
||||||
|
// queryModelOpenAI's spread — even though cachedTokens was captured internally.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'answer' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
}),
|
||||||
|
// trailing usage chunk matching the observed server response format
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: {
|
||||||
|
prompt_tokens: 30011,
|
||||||
|
completion_tokens: 190,
|
||||||
|
total_tokens: 30201,
|
||||||
|
prompt_tokens_details: { audio_tokens: 0, cached_tokens: 19904 },
|
||||||
|
} as any,
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
// message_start is emitted before trailing chunk — cache fields are 0
|
||||||
|
const msgStart = events.find(e => e.type === 'message_start') as any
|
||||||
|
expect(msgStart.message.usage.cache_read_input_tokens).toBe(0)
|
||||||
|
|
||||||
|
// message_delta carries the real values from the trailing chunk
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.input_tokens).toBe(30011)
|
||||||
|
expect(msgDelta.usage.output_tokens).toBe(190)
|
||||||
|
expect(msgDelta.usage.cache_read_input_tokens).toBe(19904)
|
||||||
|
expect(msgDelta.usage.cache_creation_input_tokens).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('cache_read_input_tokens=0 in message_delta when cached_tokens is absent', async () => {
|
||||||
|
// Non-caching requests should still have the field present and zero.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'hi' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: { prompt_tokens: 100, completion_tokens: 20, total_tokens: 120 },
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.cache_read_input_tokens).toBe(0)
|
||||||
|
expect(msgDelta.usage.cache_creation_input_tokens).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('cache_read_input_tokens=0 in message_delta when cached_tokens is 0', async () => {
|
||||||
|
// Explicit cached_tokens:0 should not be treated differently from absent.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'hi' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [],
|
||||||
|
usage: {
|
||||||
|
prompt_tokens: 500,
|
||||||
|
completion_tokens: 50,
|
||||||
|
total_tokens: 550,
|
||||||
|
prompt_tokens_details: { cached_tokens: 0 },
|
||||||
|
} as any,
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.cache_read_input_tokens).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('cache_read_input_tokens updated when cached_tokens arrives in same chunk as finish_reason', async () => {
|
||||||
|
// Some endpoints send usage in the finish_reason chunk instead of a trailing chunk.
|
||||||
|
const events = await collectEvents([
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: { content: 'result' }, finish_reason: null }],
|
||||||
|
}),
|
||||||
|
makeChunk({
|
||||||
|
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
|
||||||
|
usage: {
|
||||||
|
prompt_tokens: 2000,
|
||||||
|
completion_tokens: 100,
|
||||||
|
total_tokens: 2100,
|
||||||
|
prompt_tokens_details: { cached_tokens: 1500 },
|
||||||
|
} as any,
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
|
||||||
|
const msgDelta = events.find(e => e.type === 'message_delta') as any
|
||||||
|
expect(msgDelta.usage.cache_read_input_tokens).toBe(1500)
|
||||||
|
expect(msgDelta.usage.input_tokens).toBe(2000)
|
||||||
|
expect(msgDelta.usage.output_tokens).toBe(100)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import { logForDebugging } from '../../../utils/debug.js'
|
|||||||
import { addToTotalSessionCost } from '../../../cost-tracker.js'
|
import { addToTotalSessionCost } from '../../../cost-tracker.js'
|
||||||
import { calculateUSDCost } from '../../../utils/modelCost.js'
|
import { calculateUSDCost } from '../../../utils/modelCost.js'
|
||||||
import { isEnvTruthy, isEnvDefinedFalsy } from '../../../utils/envUtils.js'
|
import { isEnvTruthy, isEnvDefinedFalsy } from '../../../utils/envUtils.js'
|
||||||
|
import { getModelMaxOutputTokens } from '../../../utils/context.js'
|
||||||
import type { Options } from '../claude.js'
|
import type { Options } from '../claude.js'
|
||||||
import { randomUUID } from 'crypto'
|
import { randomUUID } from 'crypto'
|
||||||
import {
|
import {
|
||||||
@@ -87,16 +88,18 @@ export function buildOpenAIRequestBody(params: {
|
|||||||
tools: any[]
|
tools: any[]
|
||||||
toolChoice: any
|
toolChoice: any
|
||||||
enableThinking: boolean
|
enableThinking: boolean
|
||||||
|
maxTokens: number
|
||||||
temperatureOverride?: number
|
temperatureOverride?: number
|
||||||
}): ChatCompletionCreateParamsStreaming & {
|
}): ChatCompletionCreateParamsStreaming & {
|
||||||
thinking?: { type: string }
|
thinking?: { type: string }
|
||||||
enable_thinking?: boolean
|
enable_thinking?: boolean
|
||||||
chat_template_kwargs?: { thinking: boolean }
|
chat_template_kwargs?: { thinking: boolean }
|
||||||
} {
|
} {
|
||||||
const { model, messages, tools, toolChoice, enableThinking, temperatureOverride } = params
|
const { model, messages, tools, toolChoice, enableThinking, maxTokens, temperatureOverride } = params
|
||||||
return {
|
return {
|
||||||
model,
|
model,
|
||||||
messages,
|
messages,
|
||||||
|
max_tokens: maxTokens,
|
||||||
...(tools.length > 0 && {
|
...(tools.length > 0 && {
|
||||||
tools,
|
tools,
|
||||||
...(toolChoice && { tool_choice: toolChoice }),
|
...(toolChoice && { tool_choice: toolChoice }),
|
||||||
@@ -120,6 +123,56 @@ export function buildOpenAIRequestBody(params: {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assemble the final AssistantMessage (and optional max_tokens error) from
|
||||||
|
* accumulated stream state. Extracted to avoid duplication between the
|
||||||
|
* `message_stop` handler and the post-loop safety fallback.
|
||||||
|
*/
|
||||||
|
function assembleFinalAssistantOutputs(params: {
|
||||||
|
partialMessage: any
|
||||||
|
contentBlocks: Record<number, any>
|
||||||
|
tools: Tools
|
||||||
|
agentId: string | undefined
|
||||||
|
usage: { input_tokens: number; output_tokens: number; cache_creation_input_tokens: number; cache_read_input_tokens: number }
|
||||||
|
stopReason: string | null
|
||||||
|
maxTokens: number
|
||||||
|
}): (AssistantMessage | SystemAPIErrorMessage)[] {
|
||||||
|
const { partialMessage, contentBlocks, tools, agentId, usage, stopReason, maxTokens } = params
|
||||||
|
const outputs: (AssistantMessage | SystemAPIErrorMessage)[] = []
|
||||||
|
|
||||||
|
const allBlocks = Object.keys(contentBlocks)
|
||||||
|
.sort((a, b) => Number(a) - Number(b))
|
||||||
|
.map(k => contentBlocks[Number(k)])
|
||||||
|
.filter(Boolean)
|
||||||
|
|
||||||
|
if (allBlocks.length > 0) {
|
||||||
|
outputs.push({
|
||||||
|
message: {
|
||||||
|
...partialMessage,
|
||||||
|
content: normalizeContentFromAPI(allBlocks, tools, agentId),
|
||||||
|
usage,
|
||||||
|
stop_reason: stopReason,
|
||||||
|
stop_sequence: null,
|
||||||
|
},
|
||||||
|
requestId: undefined,
|
||||||
|
type: 'assistant',
|
||||||
|
uuid: randomUUID(),
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
} as AssistantMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stopReason === 'max_tokens') {
|
||||||
|
outputs.push(createAssistantAPIErrorMessage({
|
||||||
|
content: `Output truncated: response exceeded the ${maxTokens} token limit. ` +
|
||||||
|
`Set CLAUDE_CODE_MAX_OUTPUT_TOKENS to override.`,
|
||||||
|
apiError: 'max_output_tokens',
|
||||||
|
error: 'max_output_tokens',
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
return outputs
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* OpenAI-compatible query path. Converts Anthropic-format messages/tools to
|
* OpenAI-compatible query path. Converts Anthropic-format messages/tools to
|
||||||
* OpenAI format, calls the OpenAI-compatible endpoint, and converts the
|
* OpenAI format, calls the OpenAI-compatible endpoint, and converts the
|
||||||
@@ -222,7 +275,20 @@ export async function* queryModelOpenAI(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 10. Get client and make streaming request
|
// 10. Compute max_tokens — required by most OpenAI-compatible endpoints.
|
||||||
|
// Without this the server uses a tiny default, and when
|
||||||
|
// thinking is enabled the thinking phase consumes the entire budget
|
||||||
|
// leaving no tokens for the final response.
|
||||||
|
//
|
||||||
|
// Use upperLimit (not the slot-cap default) because the Anthropic path's
|
||||||
|
// slot-reservation cap (CAPPED_DEFAULT_MAX_TOKENS=8k) is paired with an
|
||||||
|
// auto-retry at 64k in query.ts. The OpenAI path has no such retry, so
|
||||||
|
// using the capped 8k default would silently truncate responses in
|
||||||
|
// multi-turn conversations where thinking consumes most of the budget.
|
||||||
|
const { upperLimit } = getModelMaxOutputTokens(openaiModel)
|
||||||
|
const maxTokens = options.maxOutputTokensOverride ?? upperLimit
|
||||||
|
|
||||||
|
// 11. Get client
|
||||||
const client = getOpenAIClient({
|
const client = getOpenAIClient({
|
||||||
maxRetries: 0,
|
maxRetries: 0,
|
||||||
fetchOverride: options.fetchOverride as unknown as typeof fetch,
|
fetchOverride: options.fetchOverride as unknown as typeof fetch,
|
||||||
@@ -233,13 +299,14 @@ export async function* queryModelOpenAI(
|
|||||||
`[OpenAI] Calling model=${openaiModel}, messages=${openaiMessages.length}, tools=${openaiTools.length}, thinking=${enableThinking}`,
|
`[OpenAI] Calling model=${openaiModel}, messages=${openaiMessages.length}, tools=${openaiTools.length}, thinking=${enableThinking}`,
|
||||||
)
|
)
|
||||||
|
|
||||||
// 11. Call OpenAI API with streaming
|
// 12. Call OpenAI API with streaming
|
||||||
const requestBody = buildOpenAIRequestBody({
|
const requestBody = buildOpenAIRequestBody({
|
||||||
model: openaiModel,
|
model: openaiModel,
|
||||||
messages: openaiMessages,
|
messages: openaiMessages,
|
||||||
tools: openaiTools,
|
tools: openaiTools,
|
||||||
toolChoice: openaiToolChoice,
|
toolChoice: openaiToolChoice,
|
||||||
enableThinking,
|
enableThinking,
|
||||||
|
maxTokens,
|
||||||
temperatureOverride: options.temperatureOverride,
|
temperatureOverride: options.temperatureOverride,
|
||||||
})
|
})
|
||||||
const stream = await client.chat.completions.create(
|
const stream = await client.chat.completions.create(
|
||||||
@@ -254,6 +321,7 @@ export async function* queryModelOpenAI(
|
|||||||
// Accumulate content blocks and usage, same as the Anthropic path in claude.ts
|
// Accumulate content blocks and usage, same as the Anthropic path in claude.ts
|
||||||
const contentBlocks: Record<number, any> = {}
|
const contentBlocks: Record<number, any> = {}
|
||||||
let partialMessage: any
|
let partialMessage: any
|
||||||
|
let stopReason: string | null = null
|
||||||
let usage = {
|
let usage = {
|
||||||
input_tokens: 0,
|
input_tokens: 0,
|
||||||
output_tokens: 0,
|
output_tokens: 0,
|
||||||
@@ -307,21 +375,7 @@ export async function* queryModelOpenAI(
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'content_block_stop': {
|
case 'content_block_stop': {
|
||||||
const idx = (event as any).index
|
// Block accumulation is complete; assembly happens at message_stop.
|
||||||
const block = contentBlocks[idx]
|
|
||||||
if (!block || !partialMessage) break
|
|
||||||
|
|
||||||
const m: AssistantMessage = {
|
|
||||||
message: {
|
|
||||||
...partialMessage,
|
|
||||||
content: normalizeContentFromAPI([block], tools, options.agentId),
|
|
||||||
},
|
|
||||||
requestId: undefined,
|
|
||||||
type: 'assistant',
|
|
||||||
uuid: randomUUID(),
|
|
||||||
timestamp: new Date().toISOString(),
|
|
||||||
}
|
|
||||||
yield m
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'message_delta': {
|
case 'message_delta': {
|
||||||
@@ -329,21 +383,33 @@ export async function* queryModelOpenAI(
|
|||||||
if (deltaUsage) {
|
if (deltaUsage) {
|
||||||
usage = { ...usage, ...deltaUsage }
|
usage = { ...usage, ...deltaUsage }
|
||||||
}
|
}
|
||||||
// Update the stop_reason on the last yielded message
|
if ((event as any).delta?.stop_reason != null) {
|
||||||
// (we don't have a reference here, but the consumer handles this)
|
stopReason = (event as any).delta.stop_reason
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'message_stop':
|
case 'message_stop': {
|
||||||
|
// Assemble ONE AssistantMessage with ALL content blocks, matching the
|
||||||
|
// Anthropic SDK path. Real usage (input + output tokens) is available
|
||||||
|
// here and injected so tokenCountWithEstimation() can read it.
|
||||||
|
if (partialMessage) {
|
||||||
|
for (const output of assembleFinalAssistantOutputs({
|
||||||
|
partialMessage, contentBlocks, tools, agentId: options.agentId,
|
||||||
|
usage, stopReason, maxTokens,
|
||||||
|
})) {
|
||||||
|
yield output
|
||||||
|
}
|
||||||
|
// Reset partialMessage so the post-loop safety fallback does not
|
||||||
|
// yield a second identical AssistantMessage.
|
||||||
|
partialMessage = null
|
||||||
|
}
|
||||||
|
// Track cost and token usage
|
||||||
|
if (usage.input_tokens + usage.output_tokens > 0) {
|
||||||
|
const costUSD = calculateUSDCost(openaiModel, usage as any)
|
||||||
|
addToTotalSessionCost(costUSD, usage as any, options.model)
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track cost and token usage (matching the Anthropic path in claude.ts)
|
|
||||||
if (
|
|
||||||
event.type === 'message_stop' &&
|
|
||||||
usage.input_tokens + usage.output_tokens > 0
|
|
||||||
) {
|
|
||||||
const costUSD = calculateUSDCost(openaiModel, usage as any)
|
|
||||||
addToTotalSessionCost(costUSD, usage as any, options.model)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also yield as StreamEvent for real-time display (matching Anthropic path)
|
// Also yield as StreamEvent for real-time display (matching Anthropic path)
|
||||||
@@ -353,6 +419,16 @@ export async function* queryModelOpenAI(
|
|||||||
...(event.type === 'message_start' ? { ttftMs } : undefined),
|
...(event.type === 'message_start' ? { ttftMs } : undefined),
|
||||||
} as StreamEvent
|
} as StreamEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Safety: if stream ended without message_stop, assemble and yield whatever we have
|
||||||
|
if (partialMessage) {
|
||||||
|
for (const output of assembleFinalAssistantOutputs({
|
||||||
|
partialMessage, contentBlocks, tools, agentId: options.agentId,
|
||||||
|
usage, stopReason, maxTokens,
|
||||||
|
})) {
|
||||||
|
yield output
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||||
logForDebugging(`[OpenAI] Error: ${errorMessage}`, { level: 'error' })
|
logForDebugging(`[OpenAI] Error: ${errorMessage}`, { level: 'error' })
|
||||||
@@ -362,4 +438,4 @@ export async function* queryModelOpenAI(
|
|||||||
error: (error instanceof Error ? error : new Error(String(error))) as unknown as SDKAssistantMessageError,
|
error: (error instanceof Error ? error : new Error(String(error))) as unknown as SDKAssistantMessageError,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,16 @@ import { randomUUID } from 'crypto'
|
|||||||
* delta.content → content_block_start(text) + text_delta + content_block_stop
|
* delta.content → content_block_start(text) + text_delta + content_block_stop
|
||||||
* delta.tool_calls → content_block_start(tool_use) + input_json_delta + content_block_stop
|
* delta.tool_calls → content_block_start(tool_use) + input_json_delta + content_block_stop
|
||||||
* finish_reason → message_delta(stop_reason) + message_stop
|
* finish_reason → message_delta(stop_reason) + message_stop
|
||||||
* usage.cached_tokens → cache_read_input_tokens in message_start usage
|
*
|
||||||
|
* Usage field mapping (OpenAI → Anthropic):
|
||||||
|
* prompt_tokens → input_tokens
|
||||||
|
* completion_tokens → output_tokens
|
||||||
|
* prompt_tokens_details.cached_tokens → cache_read_input_tokens
|
||||||
|
* (no OpenAI equivalent) → cache_creation_input_tokens (always 0)
|
||||||
|
*
|
||||||
|
* All four fields are emitted in the post-loop message_delta (not message_start)
|
||||||
|
* so that trailing usage chunks (sent after finish_reason by some
|
||||||
|
* OpenAI-compatible endpoints) are fully captured before the final counts are reported.
|
||||||
*
|
*
|
||||||
* Thinking support:
|
* Thinking support:
|
||||||
* DeepSeek and compatible providers send `delta.reasoning_content` for chain-of-thought.
|
* DeepSeek and compatible providers send `delta.reasoning_content` for chain-of-thought.
|
||||||
@@ -41,26 +50,43 @@ export async function* adaptOpenAIStreamToAnthropic(
|
|||||||
// Track text block state
|
// Track text block state
|
||||||
let textBlockOpen = false
|
let textBlockOpen = false
|
||||||
|
|
||||||
// Track usage
|
// Track usage — all four Anthropic fields, populated from OpenAI usage fields:
|
||||||
|
// prompt_tokens → input_tokens
|
||||||
|
// completion_tokens → output_tokens
|
||||||
|
// prompt_tokens_details.cached_tokens → cache_read_input_tokens
|
||||||
|
// (no standard OpenAI equivalent) → cache_creation_input_tokens (always 0)
|
||||||
let inputTokens = 0
|
let inputTokens = 0
|
||||||
let outputTokens = 0
|
let outputTokens = 0
|
||||||
let cachedTokens = 0
|
let cachedReadTokens = 0
|
||||||
|
|
||||||
// Track all open content block indices (for cleanup)
|
// Track all open content block indices (for cleanup)
|
||||||
const openBlockIndices = new Set<number>()
|
const openBlockIndices = new Set<number>()
|
||||||
|
|
||||||
|
// Deferred finish state: populated when finish_reason is encountered so that
|
||||||
|
// message_delta / message_stop are emitted AFTER the stream loop ends.
|
||||||
|
// This ensures usage chunks that arrive after the finish_reason chunk are
|
||||||
|
// captured before we emit the final token counts.
|
||||||
|
let pendingFinishReason: string | null = null
|
||||||
|
let pendingHasToolCalls = false
|
||||||
|
|
||||||
for await (const chunk of stream) {
|
for await (const chunk of stream) {
|
||||||
const choice = chunk.choices?.[0]
|
const choice = chunk.choices?.[0]
|
||||||
const delta = choice?.delta
|
const delta = choice?.delta
|
||||||
|
|
||||||
// Extract usage from any chunk that carries it
|
// Extract usage from any chunk that carries it.
|
||||||
|
// Many OpenAI-compatible endpoints (e.g. DeepSeek) send usage in a separate
|
||||||
|
// final chunk that arrives AFTER the finish_reason chunk. Reading it here
|
||||||
|
// (before emitting message_delta) ensures the token counts are available
|
||||||
|
// when we later emit message_delta.
|
||||||
if (chunk.usage) {
|
if (chunk.usage) {
|
||||||
inputTokens = chunk.usage.prompt_tokens ?? inputTokens
|
inputTokens = chunk.usage.prompt_tokens ?? inputTokens
|
||||||
outputTokens = chunk.usage.completion_tokens ?? outputTokens
|
outputTokens = chunk.usage.completion_tokens ?? outputTokens
|
||||||
// OpenAI prompt caching: prompt_tokens_details.cached_tokens
|
// OpenAI prompt caching: prompt_tokens_details.cached_tokens
|
||||||
|
// → Anthropic cache_read_input_tokens
|
||||||
|
// Note: OpenAI has no equivalent for cache_creation_input_tokens.
|
||||||
const details = (chunk.usage as any).prompt_tokens_details
|
const details = (chunk.usage as any).prompt_tokens_details
|
||||||
if (details?.cached_tokens) {
|
if (details?.cached_tokens != null) {
|
||||||
cachedTokens = details.cached_tokens
|
cachedReadTokens = details.cached_tokens
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,7 +108,7 @@ export async function* adaptOpenAIStreamToAnthropic(
|
|||||||
input_tokens: inputTokens,
|
input_tokens: inputTokens,
|
||||||
output_tokens: 0,
|
output_tokens: 0,
|
||||||
cache_creation_input_tokens: 0,
|
cache_creation_input_tokens: 0,
|
||||||
cache_read_input_tokens: cachedTokens,
|
cache_read_input_tokens: cachedReadTokens,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
} as BetaRawMessageStreamEvent
|
} as BetaRawMessageStreamEvent
|
||||||
@@ -224,7 +250,10 @@ export async function* adaptOpenAIStreamToAnthropic(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle finish
|
// Handle finish: close all open content blocks and record the finish_reason.
|
||||||
|
// message_delta + message_stop are emitted AFTER the stream loop so that any
|
||||||
|
// trailing usage chunk (sent after the finish chunk by some endpoints)
|
||||||
|
// is captured first — ensuring token counts are non-zero.
|
||||||
if (choice?.finish_reason) {
|
if (choice?.finish_reason) {
|
||||||
// Close thinking block if still open
|
// Close thinking block if still open
|
||||||
if (thinkingBlockOpen) {
|
if (thinkingBlockOpen) {
|
||||||
@@ -257,27 +286,10 @@ export async function* adaptOpenAIStreamToAnthropic(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map finish_reason to Anthropic stop_reason.
|
// Defer message_delta / message_stop until after the loop so that any
|
||||||
// Some backends return "stop" even when tool_calls are present —
|
// trailing usage chunk is processed before we emit the final token counts.
|
||||||
// force "tool_use" when we saw any tool blocks to ensure the query
|
pendingFinishReason = choice.finish_reason
|
||||||
// loop actually executes the tools.
|
pendingHasToolCalls = toolBlocks.size > 0
|
||||||
const hasToolCalls = toolBlocks.size > 0
|
|
||||||
const stopReason = hasToolCalls ? 'tool_use' : mapFinishReason(choice.finish_reason)
|
|
||||||
|
|
||||||
yield {
|
|
||||||
type: 'message_delta',
|
|
||||||
delta: {
|
|
||||||
stop_reason: stopReason,
|
|
||||||
stop_sequence: null,
|
|
||||||
},
|
|
||||||
usage: {
|
|
||||||
output_tokens: outputTokens,
|
|
||||||
},
|
|
||||||
} as BetaRawMessageStreamEvent
|
|
||||||
|
|
||||||
yield {
|
|
||||||
type: 'message_stop',
|
|
||||||
} as BetaRawMessageStreamEvent
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,6 +300,54 @@ export async function* adaptOpenAIStreamToAnthropic(
|
|||||||
index: idx,
|
index: idx,
|
||||||
} as BetaRawMessageStreamEvent
|
} as BetaRawMessageStreamEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Emit message_delta + message_stop now that the stream is fully consumed.
|
||||||
|
// Usage values (inputTokens / outputTokens) reflect all chunks including any
|
||||||
|
// trailing usage-only chunk sent after the finish_reason chunk.
|
||||||
|
if (pendingFinishReason !== null) {
|
||||||
|
// Map finish_reason to Anthropic stop_reason.
|
||||||
|
// CRITICAL: When finish_reason is 'length' (token budget exhausted), always
|
||||||
|
// report 'max_tokens' regardless of whether partial tool calls were received.
|
||||||
|
// Otherwise the query loop would try to execute tool calls with incomplete
|
||||||
|
// JSON arguments instead of triggering the max_tokens retry/recovery path.
|
||||||
|
const stopReason =
|
||||||
|
pendingFinishReason === 'length'
|
||||||
|
? 'max_tokens'
|
||||||
|
: pendingHasToolCalls
|
||||||
|
? 'tool_use'
|
||||||
|
: mapFinishReason(pendingFinishReason)
|
||||||
|
|
||||||
|
yield {
|
||||||
|
type: 'message_delta',
|
||||||
|
delta: {
|
||||||
|
stop_reason: stopReason,
|
||||||
|
stop_sequence: null,
|
||||||
|
},
|
||||||
|
// Carry all four Anthropic usage fields so queryModelOpenAI's message_delta
|
||||||
|
// handler (which spreads this into the accumulated usage object) can override
|
||||||
|
// every field that message_start emitted as 0. For endpoints that send usage
|
||||||
|
// in a trailing chunk (e.g. DeepSeek), message_start is emitted on the first
|
||||||
|
// content chunk before the trailing usage chunk arrives, so all four fields
|
||||||
|
// start at 0. By the time we reach here (post-loop) the trailing chunk has
|
||||||
|
// been processed and all values reflect the real counts.
|
||||||
|
//
|
||||||
|
// OpenAI → Anthropic field mapping:
|
||||||
|
// prompt_tokens → input_tokens
|
||||||
|
// completion_tokens → output_tokens
|
||||||
|
// prompt_tokens_details.cached_tokens → cache_read_input_tokens
|
||||||
|
// (no OpenAI equivalent) → cache_creation_input_tokens (stays 0)
|
||||||
|
usage: {
|
||||||
|
input_tokens: inputTokens,
|
||||||
|
output_tokens: outputTokens,
|
||||||
|
cache_read_input_tokens: cachedReadTokens,
|
||||||
|
cache_creation_input_tokens: 0,
|
||||||
|
},
|
||||||
|
} as BetaRawMessageStreamEvent
|
||||||
|
|
||||||
|
yield {
|
||||||
|
type: 'message_stop',
|
||||||
|
} as BetaRawMessageStreamEvent
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -1,6 +1,16 @@
|
|||||||
import { describe, expect, test } from "bun:test";
|
import { describe, expect, test, beforeAll, afterAll } from "bun:test";
|
||||||
import { formatBriefTimestamp } from "../formatBriefTimestamp";
|
import { formatBriefTimestamp } from "../formatBriefTimestamp";
|
||||||
|
|
||||||
|
let savedLcAll: string | undefined;
|
||||||
|
beforeAll(() => {
|
||||||
|
savedLcAll = process.env.LC_ALL;
|
||||||
|
process.env.LC_ALL = "en_US.UTF-8";
|
||||||
|
});
|
||||||
|
afterAll(() => {
|
||||||
|
if (savedLcAll === undefined) delete process.env.LC_ALL;
|
||||||
|
else process.env.LC_ALL = savedLcAll;
|
||||||
|
});
|
||||||
|
|
||||||
describe("formatBriefTimestamp", () => {
|
describe("formatBriefTimestamp", () => {
|
||||||
// Fixed "now" for deterministic tests: 2026-04-02T14:00:00Z (Thursday)
|
// Fixed "now" for deterministic tests: 2026-04-02T14:00:00Z (Thursday)
|
||||||
const now = new Date("2026-04-02T14:00:00Z");
|
const now = new Date("2026-04-02T14:00:00Z");
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ function getLocale(): string | undefined {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Return the epoch-ms of the start of the local calendar day for `d`. */
|
||||||
function startOfDay(d: Date): number {
|
function startOfDay(d: Date): number {
|
||||||
return new Date(d.getFullYear(), d.getMonth(), d.getDate()).getTime()
|
return new Date(d.getFullYear(), d.getMonth(), d.getDate()).getTime()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user