feat: add codex provider via Responses API

This commit is contained in:
Kaxtrel
2026-04-24 17:40:09 +08:00
committed by claude-code-best
parent 3cb4828de6
commit 7d4b27c01a
29 changed files with 3438 additions and 23 deletions

View File

@@ -1347,6 +1347,12 @@ async function* queryModel(
return
}
if (getAPIProvider() === 'codex') {
const { queryModelCodex } = await import('./codex/index.js')
yield* queryModelCodex(messagesForAPI, systemPrompt, filteredTools, signal, options)
return
}
if (getAPIProvider() === 'gemini') {
const { queryModelGemini } = await import('./gemini/index.js')
yield* queryModelGemini(

View File

@@ -0,0 +1,408 @@
import { describe, expect, test } from 'bun:test'
import { createAssistantMessage, createUserMessage } from '../../../../utils/messages.js'
import { anthropicMessagesToCodexInput } from '../convertMessages.js'
import { anthropicToolsToCodex } from '../convertTools.js'
describe('anthropicMessagesToCodexInput', () => {
test('replays assistant tool calls and user tool results in order', async () => {
const assistant = createAssistantMessage({
content: [
'I will inspect the file.',
{
type: 'tool_use',
id: 'tool_1',
name: 'Read',
input: { file_path: 'README.md' },
},
'Then I will summarize.',
] as any,
})
const user = createUserMessage({
content: [
{
type: 'tool_result',
tool_use_id: 'tool_1',
content: [
{ type: 'text', text: 'file contents' },
{ type: 'text', text: 'second line' },
],
},
'Please continue.',
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant, user])
expect(items).toHaveLength(5)
expect(items[0]).toMatchObject({
type: 'message',
role: 'assistant',
})
expect(items[0]).not.toHaveProperty('id')
expect(items[0]).not.toHaveProperty('status')
expect(items[1]).toMatchObject({
type: 'function_call',
call_id: 'tool_1',
name: 'Read',
arguments: '{"file_path":"README.md"}',
})
expect(items[1]).not.toHaveProperty('id')
expect(items[1]).not.toHaveProperty('status')
expect(items[2]).toMatchObject({
type: 'message',
role: 'assistant',
})
expect(items[2]).not.toHaveProperty('id')
expect(items[2]).not.toHaveProperty('status')
expect(items[3]).toMatchObject({
type: 'function_call_output',
call_id: 'tool_1',
output: [
{ type: 'input_text', text: 'file contents' },
{ type: 'input_text', text: 'second line' },
],
})
expect(items[3]).not.toHaveProperty('id')
expect(items[3]).not.toHaveProperty('status')
expect(items[4]).toMatchObject({
type: 'message',
role: 'user',
})
})
test('normalizes tool call ids consistently across assistant replay and tool results', async () => {
const assistant = createAssistantMessage({
content: [
{
type: 'tool_use',
id: ' tool 1 / weird ',
name: 'Read',
input: { file_path: 'README.md' },
},
] as any,
})
const user = createUserMessage({
content: [
{
type: 'tool_result',
tool_use_id: ' tool 1 / weird ',
content: 'ok',
},
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant, user])
expect(items[0]).toMatchObject({
type: 'function_call',
call_id: 'tool_1_weird',
})
expect(items[1]).toMatchObject({
type: 'function_call_output',
call_id: 'tool_1_weird',
output: 'ok',
})
})
test('creates a deterministic fallback tool call id when assistant replay is missing one', async () => {
const assistant = createAssistantMessage({
content: [
{
type: 'tool_use',
id: '',
name: 'Read',
input: { file_path: 'README.md' },
},
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant])
expect(items[0]).toMatchObject({
type: 'function_call',
name: 'Read',
arguments: '{"file_path":"README.md"}',
})
expect((items[0] as any).call_id).toMatch(/^call_[a-f0-9]{24}$/)
})
test('degrades unsupported user media blocks to text placeholders', async () => {
const user = createUserMessage({
content: [
{ type: 'text', text: 'Inspect the attachment.' },
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/png',
data: 'abc',
},
},
] as any,
})
const items = await anthropicMessagesToCodexInput([user])
expect(items).toEqual([
{
type: 'message',
role: 'user',
content: [
{
type: 'input_text',
text:
'Inspect the attachment.\n[Image omitted: codex gateway currently requires remote image URLs. Configure CODEX_IMGBB_API_KEY to auto-convert local images.]',
},
],
},
])
})
test('passes through remote image URLs for user messages', async () => {
const user = createUserMessage({
content: [
{ type: 'text', text: 'Read the image.' },
{
type: 'image',
source: {
type: 'url',
url: 'https://example.com/vision.png',
},
},
] as any,
})
const items = await anthropicMessagesToCodexInput([user])
expect(items).toEqual([
{
type: 'message',
role: 'user',
content: [
{
type: 'input_text',
text: 'Read the image.',
},
{
type: 'input_image',
image_url: 'https://example.com/vision.png',
detail: 'high',
},
],
},
])
})
test('converts base64 user images through the configured inline resolver', async () => {
const user = createUserMessage({
content: [
{ type: 'text', text: 'Read the image.' },
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/png',
data: 'abc',
},
},
] as any,
})
const items = await anthropicMessagesToCodexInput([user], {
resolveBase64ImageUrl: async (data, mediaType) =>
data === 'abc' && mediaType === 'image/png'
? 'https://example.com/inline-uploaded.png'
: null,
})
expect(items).toEqual([
{
type: 'message',
role: 'user',
content: [
{
type: 'input_text',
text: 'Read the image.',
},
{
type: 'input_image',
image_url: 'https://example.com/inline-uploaded.png',
detail: 'high',
},
],
},
])
})
test('passes through remote image URLs inside tool results', async () => {
const assistant = createAssistantMessage({
content: [
{
type: 'tool_use',
id: 'tool_vision',
name: 'Read',
input: { file_path: '/tmp/screenshot.png' },
},
] as any,
})
const user = createUserMessage({
content: [
{
type: 'tool_result',
tool_use_id: 'tool_vision',
content: [
{ type: 'text', text: 'Screenshot attached.' },
{
type: 'image',
source: {
type: 'url',
url: 'https://example.com/tool-screenshot.png',
},
},
],
},
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant, user])
expect(items[1]).toEqual({
type: 'function_call_output',
call_id: 'tool_vision',
output: [
{ type: 'input_text', text: 'Screenshot attached.' },
{
type: 'input_image',
image_url: 'https://example.com/tool-screenshot.png',
detail: 'high',
},
],
})
})
test('degrades unsupported tool result images to text placeholders', async () => {
const assistant = createAssistantMessage({
content: [
{
type: 'tool_use',
id: 'tool_vision',
name: 'Read',
input: { file_path: '/tmp/screenshot.png' },
},
] as any,
})
const user = createUserMessage({
content: [
{
type: 'tool_result',
tool_use_id: 'tool_vision',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/png',
data: 'abc',
},
},
],
},
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant, user])
expect(items[1]).toEqual({
type: 'function_call_output',
call_id: 'tool_vision',
output:
'[Image omitted: codex gateway currently requires remote image URLs. Configure CODEX_IMGBB_API_KEY to auto-convert local images.]',
})
})
test('converts base64 tool result images through the configured inline resolver', async () => {
const assistant = createAssistantMessage({
content: [
{
type: 'tool_use',
id: 'tool_vision',
name: 'Read',
input: { file_path: '/tmp/screenshot.png' },
},
] as any,
})
const user = createUserMessage({
content: [
{
type: 'tool_result',
tool_use_id: 'tool_vision',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/png',
data: 'abc',
},
},
],
},
] as any,
})
const items = await anthropicMessagesToCodexInput([assistant, user], {
resolveBase64ImageUrl: async (data, mediaType) =>
data === 'abc' && mediaType === 'image/png'
? 'https://example.com/tool-inline-uploaded.png'
: null,
})
expect(items[1]).toEqual({
type: 'function_call_output',
call_id: 'tool_vision',
output: [
{
type: 'input_image',
image_url: 'https://example.com/tool-inline-uploaded.png',
detail: 'high',
},
],
})
})
})
describe('anthropicToolsToCodex', () => {
test('converts only client function tools', () => {
const tools = anthropicToolsToCodex([
{
name: 'Read',
description: 'Read a file',
input_schema: {
type: 'object',
properties: {
file_path: { type: 'string' },
},
},
strict: true,
} as any,
{
type: 'advisor_20260301',
} as any,
])
expect(tools).toEqual([
{
type: 'function',
name: 'Read',
description: 'Read a file',
parameters: {
type: 'object',
properties: {
file_path: { type: 'string' },
},
},
strict: true,
},
])
})
})

View File

@@ -0,0 +1,103 @@
import { afterEach, describe, expect, test } from 'bun:test'
import {
getCodexConfigurationError,
normalizeCodexError,
} from '../errors.js'
const originalCodexApiKey = process.env.CODEX_API_KEY
afterEach(() => {
if (originalCodexApiKey === undefined) {
delete process.env.CODEX_API_KEY
} else {
process.env.CODEX_API_KEY = originalCodexApiKey
}
})
describe('getCodexConfigurationError', () => {
test('reports missing CODEX_API_KEY clearly', () => {
delete process.env.CODEX_API_KEY
expect(getCodexConfigurationError()).toEqual({
content:
'Missing CODEX_API_KEY. Configure it in settings or your environment before using the codex provider.',
error: 'authentication_failed',
})
})
test('returns null when CODEX_API_KEY is present', () => {
process.env.CODEX_API_KEY = 'test-key'
expect(getCodexConfigurationError()).toBeNull()
})
})
describe('normalizeCodexError', () => {
test('maps authentication failures', () => {
expect(
normalizeCodexError({
status: 401,
message: 'invalid_api_key',
}),
).toEqual({
content:
'Codex authentication failed (401). Verify CODEX_API_KEY and CODEX_BASE_URL.',
error: 'authentication_failed',
})
})
test('maps missing endpoint failures', () => {
expect(
normalizeCodexError({
status: 404,
message: 'Not Found',
}),
).toEqual({
content:
'Codex endpoint not found (404). Verify CODEX_BASE_URL points to a Responses API root.',
error: 'invalid_request',
})
})
test('maps rate limits', () => {
expect(
normalizeCodexError({
status: 429,
message: 'Too Many Requests',
}),
).toEqual({
content:
'Codex rate limit reached (429). Retry shortly or reduce request volume.',
error: 'rate_limit',
})
})
test('maps upstream gateway 502 errors', () => {
expect(
normalizeCodexError({
status: 502,
message: 'Upstream request failed',
}),
).toEqual({
content:
'Codex gateway returned 502 Upstream request failed. This usually means a transient gateway issue or incomplete Responses API compatibility during tool replay.',
error: 'server_error',
})
})
test('passes through Codex preflight errors as invalid requests', () => {
expect(
normalizeCodexError(new Error('Codex preflight: input must be an array.')),
).toEqual({
content: 'Codex preflight: input must be an array.',
error: 'invalid_request',
})
})
test('falls back to generic API error text', () => {
expect(normalizeCodexError(new Error('socket hang up'))).toEqual({
content: 'API Error: socket hang up',
error: 'unknown',
})
})
})

View File

@@ -0,0 +1,103 @@
import { afterEach, beforeEach, describe, expect, test } from 'bun:test'
import { uploadCodexBase64Image } from '../imageUpload.js'
describe('codex image upload', () => {
const originalFetch = globalThis.fetch
const originalImgbbApiKey = process.env.CODEX_IMGBB_API_KEY
const originalUploadTimeout = process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS
const originalLegacyTimeout = process.env.CODEX_IMAGE_URL_TIMEOUT_MS
beforeEach(() => {
process.env.CODEX_IMGBB_API_KEY = 'imgbb-test-key'
delete process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS
delete process.env.CODEX_IMAGE_URL_TIMEOUT_MS
})
afterEach(() => {
globalThis.fetch = originalFetch
if (originalImgbbApiKey === undefined) {
delete process.env.CODEX_IMGBB_API_KEY
} else {
process.env.CODEX_IMGBB_API_KEY = originalImgbbApiKey
}
if (originalUploadTimeout === undefined) {
delete process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS
} else {
process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS = originalUploadTimeout
}
if (originalLegacyTimeout === undefined) {
delete process.env.CODEX_IMAGE_URL_TIMEOUT_MS
} else {
process.env.CODEX_IMAGE_URL_TIMEOUT_MS = originalLegacyTimeout
}
})
test('uploads inline base64 images to ImgBB and caches the result', async () => {
let fetchCalls = 0
globalThis.fetch = (async (input: string | URL | Request) => {
fetchCalls += 1
expect(String(input)).toBe(
'https://api.imgbb.com/1/upload?key=imgbb-test-key',
)
return new Response(
JSON.stringify({ data: { url: 'https://i.ibb.co/base64.png' } }),
{ status: 200 },
)
}) as unknown as typeof fetch
const first = await uploadCodexBase64Image('YWJj', 'image/png')
const second = await uploadCodexBase64Image('YWJj', 'image/png')
expect(first).toBe('https://i.ibb.co/base64.png')
expect(second).toBe('https://i.ibb.co/base64.png')
expect(fetchCalls).toBe(1)
})
test('prefers ImgBB derived variants before the raw url', async () => {
globalThis.fetch = (async () =>
new Response(
JSON.stringify({
data: {
url: 'https://i.ibb.co/raw/base64.png',
image: { url: 'https://i.ibb.co/image/base64.png' },
thumb: { url: 'https://i.ibb.co/thumb/base64.png' },
medium: { url: 'https://i.ibb.co/medium/base64.png' },
},
}),
{ status: 200 },
)) as unknown as typeof fetch
const url = await uploadCodexBase64Image('ZGVm', 'image/png')
expect(url).toBe('https://i.ibb.co/medium/base64.png')
})
test('prefers the new upload timeout env name over the legacy one', async () => {
let aborted = false
process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS = '1'
process.env.CODEX_IMAGE_URL_TIMEOUT_MS = '1000'
globalThis.fetch = (async (
_input: string | URL | Request,
init?: RequestInit,
) => {
const signal = init?.signal
if (!(signal instanceof AbortSignal)) {
throw new Error('Expected AbortSignal')
}
await new Promise<void>(resolve => {
signal.addEventListener('abort', () => {
aborted = true
resolve()
})
})
throw new Error('aborted')
}) as unknown as typeof fetch
const url = await uploadCodexBase64Image('Z2hp', 'image/png')
expect(url).toBeNull()
expect(aborted).toBe(true)
})
})

View File

@@ -0,0 +1,51 @@
import { describe, expect, test } from 'bun:test'
import { sanitizeCodexRequest } from '../preflight.js'
describe('sanitizeCodexRequest', () => {
test('normalizes function call ids and tool names', () => {
const request = sanitizeCodexRequest({
model: 'gpt-5.4',
input: [
{
type: 'function_call',
call_id: ' tool 1 / weird ',
name: ' Read ',
arguments: '{}',
},
] as any,
tools: [
{
type: 'function',
name: ' Read ',
parameters: null,
},
] as any,
} as any)
expect(request.input?.[0]).toMatchObject({
type: 'function_call',
call_id: 'tool_1_weird',
name: 'Read',
})
expect(request.tools?.[0]).toMatchObject({
type: 'function',
name: 'Read',
parameters: {},
})
})
test('rejects invalid function_call_output without call_id', () => {
expect(() =>
sanitizeCodexRequest({
model: 'gpt-5.4',
input: [
{
type: 'function_call_output',
call_id: ' ',
output: 'ok',
},
] as any,
} as any),
).toThrow('Codex preflight: function_call_output.call_id is required.')
})
})

View File

@@ -0,0 +1,416 @@
import { afterEach, beforeEach, describe, expect, mock, test } from 'bun:test'
import type { Response, ResponseStreamEvent } from 'openai/resources/responses/responses.mjs'
import { asSystemPrompt } from '../../../../utils/systemPromptType.js'
type StreamRun = {
events?: ResponseStreamEvent[]
finalResponse?: Response
error?: unknown
}
let streamRuns: StreamRun[] = []
let createRuns: StreamRun[] = []
let lastRequestBody: any
let lastCreateRequestBody: any
function makeResponse(overrides: Partial<Response> = {}): Response {
return {
id: 'resp_test',
object: 'response',
created_at: 0,
status: 'completed',
model: 'gpt-5.4',
output: [],
parallel_tool_calls: false,
store: false,
temperature: 1,
tool_choice: 'auto',
top_p: 1,
truncation: 'disabled',
usage: {
input_tokens: 12,
output_tokens: 8,
total_tokens: 20,
input_tokens_details: {
cached_tokens: 0,
},
output_tokens_details: {
reasoning_tokens: 0,
},
},
...overrides,
} as Response
}
function makeStream(run: StreamRun) {
return {
async *[Symbol.asyncIterator]() {
for (const event of run.events ?? []) {
yield event
}
},
finalResponse: async () => {
if (run.error) {
throw run.error
}
return run.finalResponse ?? makeResponse()
},
}
}
function makeCreateStream(run: StreamRun) {
return {
async *[Symbol.asyncIterator]() {
if (run.error) {
throw run.error
}
for (const event of run.events ?? []) {
yield event
}
},
}
}
mock.module('../client.js', () => ({
getCodexClient: () => ({
responses: {
stream: (body: any) => {
lastRequestBody = body
const run = streamRuns.shift()
if (!run) {
throw new Error('unexpected stream call')
}
if (run.error && !run.events) {
throw run.error
}
return makeStream(run)
},
create: async (body: any) => {
lastCreateRequestBody = body
const run = createRuns.shift()
if (!run) {
throw new Error('unexpected create call')
}
return makeCreateStream(run)
},
},
}),
}))
mock.module('../convertMessages.js', () => ({
anthropicMessagesToCodexInput: () => [],
}))
mock.module('../convertTools.js', () => ({
anthropicToolsToCodex: () => [],
}))
mock.module('../model.js', () => ({
resolveCodexModel: () => 'gpt-5.4',
resolveCodexMaxTokens: () => 4096,
}))
mock.module('../../../../utils/context.js', () => ({
getModelMaxOutputTokens: () => ({ upperLimit: 4096 }),
}))
mock.module('../../../../utils/api.js', () => ({
toolToAPISchema: async () => ({}),
}))
mock.module('../../../../utils/debug.js', () => ({
logForDebugging: () => {},
}))
mock.module('../../../../services/langfuse/tracing.js', () => ({
recordLLMObservation: () => {},
}))
mock.module('../../../../services/langfuse/convert.js', () => ({
convertMessagesToLangfuse: () => [],
convertOutputToLangfuse: () => [],
convertToolsToLangfuse: () => [],
}))
async function runQuery(
nextStreamRuns: StreamRun[],
nextCreateRuns: StreamRun[] = [],
systemPrompt = asSystemPrompt([]),
) {
streamRuns = [...nextStreamRuns]
createRuns = [...nextCreateRuns]
const { queryModelCodex } = await import('../index.js')
const assistantMessages: any[] = []
const streamEvents: any[] = []
const options: any = {
model: 'gpt-5.4',
agents: [],
querySource: 'main_loop',
getToolPermissionContext: async () => ({
alwaysAllow: [],
alwaysDeny: [],
needsPermission: [],
mode: 'default',
isBypassingPermissions: false,
}),
}
for await (const item of queryModelCodex(
[],
systemPrompt,
[],
new AbortController().signal,
options,
)) {
if (item.type === 'assistant') {
assistantMessages.push(item)
} else if (item.type === 'stream_event') {
streamEvents.push(item)
}
}
return { assistantMessages, streamEvents }
}
describe('queryModelCodex streaming fallback', () => {
const originalCodexApiKey = process.env.CODEX_API_KEY
beforeEach(() => {
process.env.CODEX_API_KEY = 'test-key'
})
afterEach(() => {
streamRuns = []
createRuns = []
lastRequestBody = undefined
lastCreateRequestBody = undefined
if (originalCodexApiKey === undefined) {
delete process.env.CODEX_API_KEY
} else {
process.env.CODEX_API_KEY = originalCodexApiKey
}
})
test('builds the final assistant text from streamed blocks when final snapshots are empty', async () => {
const response = makeResponse()
const events: ResponseStreamEvent[] = [
{ type: 'response.created', response } as any,
{
type: 'response.output_item.added',
output_index: 0,
item: {
type: 'message',
id: 'msg_1',
role: 'assistant',
content: [],
status: 'in_progress',
},
} as any,
{
type: 'response.output_text.delta',
output_index: 0,
item_id: 'msg_1',
delta: 'hello',
} as any,
{
type: 'response.output_text.done',
output_index: 0,
item_id: 'msg_1',
text: 'hello world',
} as any,
{ type: 'response.completed', response } as any,
]
const { assistantMessages, streamEvents } = await runQuery([
{ events, finalResponse: response },
])
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0].message.content).toEqual([
{ type: 'text', text: 'hello world' },
])
expect(assistantMessages[0].message.stop_reason).toBe('end_turn')
expect(
streamEvents.find((item: any) => item.event.type === 'message_delta')?.event.delta
.stop_reason,
).toBe('end_turn')
})
test('builds tool_use blocks from streamed arguments when final snapshots are empty', async () => {
const response = makeResponse()
const events: ResponseStreamEvent[] = [
{ type: 'response.created', response } as any,
{
type: 'response.output_item.added',
output_index: 0,
item: {
type: 'function_call',
id: 'fc_1',
call_id: 'call_1',
name: 'Read',
arguments: '',
status: 'in_progress',
},
} as any,
{
type: 'response.function_call_arguments.delta',
output_index: 0,
item_id: 'fc_1',
delta: '{"file_path":"README.md"}',
} as any,
{
type: 'response.function_call_arguments.done',
output_index: 0,
item_id: 'fc_1',
arguments: '{"file_path":"README.md"}',
} as any,
{ type: 'response.completed', response } as any,
]
const { assistantMessages, streamEvents } = await runQuery([
{ events, finalResponse: response },
])
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0].message.content).toEqual([
{
type: 'tool_use',
id: 'call_1',
name: 'Read',
input: { file_path: 'README.md' },
},
])
expect(assistantMessages[0].message.stop_reason).toBe('tool_use')
expect(
streamEvents.find((item: any) => item.event.type === 'message_delta')?.event.delta
.stop_reason,
).toBe('tool_use')
})
test('sends system prompt via top-level instructions instead of system messages', async () => {
const response = makeResponse({
output: [
{
type: 'message',
role: 'assistant',
content: [{ type: 'output_text', text: 'ok' }],
status: 'completed',
} as any,
],
output_text: 'ok',
})
const events: ResponseStreamEvent[] = [
{ type: 'response.created', response } as any,
{ type: 'response.completed', response } as any,
]
await runQuery(
[{ events, finalResponse: response }],
[],
asSystemPrompt(['system one', 'system two']),
)
expect(lastRequestBody.instructions).toBe('system one\n\nsystem two')
expect(lastRequestBody.input).toEqual([])
})
test('continues incomplete responses and aggregates usage across attempts', async () => {
const incompleteResponse = makeResponse({
status: 'incomplete',
incomplete_details: { reason: 'max_output_tokens' } as any,
usage: {
input_tokens: 10,
output_tokens: 4,
total_tokens: 14,
input_tokens_details: { cached_tokens: 1 },
output_tokens_details: { reasoning_tokens: 0 },
} as any,
output: [
{
type: 'message',
role: 'assistant',
content: [{ type: 'output_text', text: 'hello ' }],
status: 'incomplete',
} as any,
],
})
const completedResponse = makeResponse({
usage: {
input_tokens: 20,
output_tokens: 6,
total_tokens: 26,
input_tokens_details: { cached_tokens: 2 },
output_tokens_details: { reasoning_tokens: 0 },
} as any,
output: [
{
type: 'message',
role: 'assistant',
content: [{ type: 'output_text', text: 'world' }],
status: 'completed',
} as any,
],
})
const { assistantMessages } = await runQuery([
{
events: [
{ type: 'response.created', response: incompleteResponse } as any,
{ type: 'response.incomplete', response: incompleteResponse } as any,
],
finalResponse: incompleteResponse,
},
{
events: [
{ type: 'response.created', response: completedResponse } as any,
{ type: 'response.completed', response: completedResponse } as any,
],
finalResponse: completedResponse,
},
])
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0].message.content).toEqual([
{ type: 'text', text: 'hello world' },
])
expect(assistantMessages[0].message.usage).toMatchObject({
input_tokens: 30,
output_tokens: 10,
cache_read_input_tokens: 3,
})
})
test('falls back to responses.create(stream:true) when helper streaming fails', async () => {
const fallbackResponse = makeResponse({
output: [
{
type: 'message',
role: 'assistant',
content: [{ type: 'output_text', text: 'fallback ok' }],
status: 'completed',
} as any,
],
})
const { assistantMessages } = await runQuery(
[{ error: new Error('helper stream failed') }],
[
{
events: [
{ type: 'response.created', response: fallbackResponse } as any,
{ type: 'response.completed', response: fallbackResponse } as any,
],
},
],
)
expect(lastCreateRequestBody.stream).toBe(true)
expect(assistantMessages).toHaveLength(1)
expect(assistantMessages[0].message.content).toEqual([
{ type: 'text', text: 'fallback ok' },
])
})
})

View File

@@ -0,0 +1,31 @@
import { createHash } from 'crypto'
const MAX_CODEX_CALL_ID_LENGTH = 96
export function normalizeCodexCallId(value: unknown): string | null {
if (typeof value !== 'string') {
return null
}
const sanitized = value
.trim()
.replace(/\s+/g, '_')
.replace(/[^A-Za-z0-9._:-]/g, '_')
.replace(/_+/g, '_')
.slice(0, MAX_CODEX_CALL_ID_LENGTH)
return sanitized.length > 0 ? sanitized : null
}
export function createCodexFallbackCallId(seed: string): string {
const hash = createHash('sha1')
.update(seed.length > 0 ? seed : 'codex-call')
.digest('hex')
.slice(0, 24)
return `call_${hash}`
}
export function resolveCodexCallId(value: unknown, seed: string): string {
return normalizeCodexCallId(value) ?? createCodexFallbackCallId(seed)
}

View File

@@ -0,0 +1,55 @@
import OpenAI from 'openai'
import { openaiAdapter } from 'src/services/providerUsage/adapters/openai.js'
import { updateProviderBuckets } from 'src/services/providerUsage/store.js'
import { getProxyFetchOptions } from 'src/utils/proxy.js'
let cachedClient: OpenAI | null = null
function wrapFetchForUsage(base: typeof fetch): typeof fetch {
const wrapped = async (
...args: Parameters<typeof fetch>
): Promise<Response> => {
const res = await base(...args)
try {
updateProviderBuckets('codex', openaiAdapter.parseHeaders(res.headers))
} catch {
// Usage tracking must not affect the request path.
}
return res
}
return wrapped as unknown as typeof fetch
}
export function getCodexClient(options?: {
maxRetries?: number
fetchOverride?: typeof fetch
}): OpenAI {
if (cachedClient && !options?.fetchOverride) {
return cachedClient
}
const apiKey = process.env.CODEX_API_KEY || ''
const baseURL = process.env.CODEX_BASE_URL
const baseFetch = options?.fetchOverride ?? (globalThis.fetch as typeof fetch)
const wrappedFetch = wrapFetchForUsage(baseFetch)
const client = new OpenAI({
apiKey,
...(baseURL && { baseURL }),
maxRetries: options?.maxRetries ?? 0,
timeout: parseInt(process.env.API_TIMEOUT_MS || String(600 * 1000), 10),
dangerouslyAllowBrowser: true,
fetchOptions: getProxyFetchOptions({ forAnthropicAPI: false }),
fetch: wrappedFetch,
})
if (!options?.fetchOverride) {
cachedClient = client
}
return client
}
export function clearCodexClientCache(): void {
cachedClient = null
}

View File

@@ -0,0 +1,392 @@
import type {
ResponseFunctionToolCallOutputItem,
ResponseInputImage,
ResponseInputItem,
ResponseInputText,
} from 'openai/resources/responses/responses.mjs'
import type { Message } from '../../../types/message.js'
import {
normalizeCodexCallId,
resolveCodexCallId,
} from './callIds.js'
type ContentBlock = {
type: string
text?: string
source?: {
type?: string
data?: string
media_type?: string
url?: string
}
}
type ToolUseLikeBlock = {
type: 'tool_use'
id: string
name: string
input: unknown
}
type ToolResultLikeBlock = {
type: 'tool_result'
tool_use_id: string
content?: string | ReadonlyArray<ContentBlock>
}
export type CodexImageConversionOptions = {
resolveBase64ImageUrl?: (
data: string,
mediaType?: string,
) => Promise<string | null>
}
type CodexCallIdState = {
byOriginalId: Map<string, string>
sequence: number
}
function createInputText(text: string): ResponseInputText {
return {
type: 'input_text',
text,
}
}
function createInputImage(imageUrl: string): ResponseInputImage {
return {
type: 'input_image',
image_url: imageUrl,
detail: 'high',
}
}
function getUnsupportedBlockText(type: string): string | null {
switch (type) {
case 'image':
return '[Image omitted: codex gateway currently requires remote image URLs. Configure CODEX_IMGBB_API_KEY to auto-convert local images.]'
case 'document':
return '[Document omitted: codex gateway does not support document replay.]'
default:
return null
}
}
function getImageUrl(block: ContentBlock): string | null {
const source = block.source
if (!source) {
return null
}
if (source.type === 'url' && typeof source.url === 'string' && source.url.length > 0) {
return source.url
}
return null
}
async function resolveImageUrl(
block: ContentBlock,
options: CodexImageConversionOptions,
): Promise<string | null> {
const directUrl = getImageUrl(block)
if (directUrl) {
return directUrl
}
if (block.source?.type !== 'base64') {
return null
}
if (options.resolveBase64ImageUrl && typeof block.source.data === 'string') {
const uploadedUrl = await options.resolveBase64ImageUrl(
block.source.data,
block.source.media_type,
)
if (uploadedUrl) {
return uploadedUrl
}
}
return null
}
async function convertBlocksToInputContent(
content: ReadonlyArray<ContentBlock>,
options: CodexImageConversionOptions,
): Promise<Array<ResponseInputText | ResponseInputImage>> {
const output: Array<ResponseInputText | ResponseInputImage> = []
for (const block of content) {
if (block.type === 'text' && block.text) {
output.push(createInputText(block.text))
continue
}
if (block.type === 'image') {
const imageUrl = await resolveImageUrl(block, options)
if (imageUrl) {
output.push(createInputImage(imageUrl))
continue
}
}
const fallback = getUnsupportedBlockText(block.type)
if (fallback) {
output.push(createInputText(fallback))
}
}
return output
}
async function convertToolResultOutput(
content: string | ReadonlyArray<ContentBlock> | undefined,
options: CodexImageConversionOptions,
): Promise<ResponseFunctionToolCallOutputItem['output']> {
if (!content) {
return ''
}
if (typeof content === 'string') {
return content
}
const output = await convertBlocksToInputContent(content, options)
if (output.length === 0) {
return ''
}
if (output.length === 1 && output[0].type === 'input_text') {
return output[0].text
}
return output
}
function pushUserMessage(
items: ResponseInputItem[],
textParts: string[],
imageUrls: string[] = [],
): void {
const text = textParts.join('\n').trim()
if (text.length === 0 && imageUrls.length === 0) {
return
}
items.push({
type: 'message',
role: 'user',
content: [
...(text.length > 0 ? [createInputText(text)] : []),
...imageUrls.map(createInputImage),
],
} as unknown as ResponseInputItem)
}
function pushAssistantMessage(
items: ResponseInputItem[],
textParts: string[],
): void {
const text = textParts.join('\n').trim()
if (text.length === 0) {
return
}
items.push({
type: 'message',
role: 'assistant',
content: [
{
type: 'output_text',
text,
annotations: [],
},
],
} as unknown as ResponseInputItem)
}
function stringifyToolInput(input: unknown): string {
if (typeof input === 'string') {
return input
}
try {
return JSON.stringify(input ?? {})
} catch {
return '{}'
}
}
function createCodexCallIdState(): CodexCallIdState {
return {
byOriginalId: new Map(),
sequence: 0,
}
}
function resolveAssistantCallId(
block: ToolUseLikeBlock,
state: CodexCallIdState,
): string {
const originalId = typeof block.id === 'string' ? block.id : ''
const seed = `${block.name}:${stringifyToolInput(block.input)}:${state.sequence}`
const callId = resolveCodexCallId(originalId, seed)
if (originalId.length > 0) {
state.byOriginalId.set(originalId, callId)
}
state.sequence += 1
return callId
}
function resolveToolResultCallId(
toolUseId: unknown,
state: CodexCallIdState,
): string | null {
if (typeof toolUseId !== 'string') {
return null
}
return state.byOriginalId.get(toolUseId) ?? normalizeCodexCallId(toolUseId)
}
async function convertUserContentToInputItems(
items: ResponseInputItem[],
content: ReadonlyArray<string | ContentBlock>,
options: CodexImageConversionOptions,
callIdState: CodexCallIdState,
): Promise<void> {
const textParts: string[] = []
const imageUrls: string[] = []
for (const block of content) {
if (typeof block === 'string') {
textParts.push(block)
continue
}
if (block.type === 'tool_result') {
pushUserMessage(items, textParts, imageUrls)
textParts.length = 0
imageUrls.length = 0
const toolResultBlock = block as ToolResultLikeBlock
const callId = resolveToolResultCallId(
toolResultBlock.tool_use_id,
callIdState,
)
if (!callId) {
continue
}
items.push({
type: 'function_call_output',
call_id: callId,
output: await convertToolResultOutput(toolResultBlock.content, options),
})
continue
}
if (block.type === 'text' && block.text) {
textParts.push(block.text)
continue
}
if (block.type === 'image') {
const imageUrl = await resolveImageUrl(block, options)
if (imageUrl) {
imageUrls.push(imageUrl)
continue
}
}
const fallback = getUnsupportedBlockText(block.type)
if (fallback) {
textParts.push(fallback)
}
}
pushUserMessage(items, textParts, imageUrls)
}
function convertAssistantContentToInputItems(
items: ResponseInputItem[],
content: ReadonlyArray<string | ContentBlock>,
callIdState: CodexCallIdState,
): void {
const textParts: string[] = []
for (const block of content) {
if (typeof block === 'string') {
textParts.push(block)
continue
}
if (block.type === 'tool_use') {
pushAssistantMessage(items, textParts)
textParts.length = 0
const toolUseBlock = block as unknown as ToolUseLikeBlock
items.push({
type: 'function_call',
call_id: resolveAssistantCallId(toolUseBlock, callIdState),
name: toolUseBlock.name,
arguments: stringifyToolInput(toolUseBlock.input),
})
continue
}
if (block.type === 'text' && block.text) {
textParts.push(block.text)
}
}
pushAssistantMessage(items, textParts)
}
export async function anthropicMessagesToCodexInput(
messages: Message[],
options: CodexImageConversionOptions = {},
): Promise<ResponseInputItem[]> {
const items: ResponseInputItem[] = []
const callIdState = createCodexCallIdState()
for (const message of messages) {
if (message.type !== 'user' && message.type !== 'assistant') {
continue
}
const apiMessage = message.message
if (!apiMessage?.content) {
continue
}
if (typeof apiMessage.content === 'string') {
if (message.type === 'user') {
pushUserMessage(items, [apiMessage.content])
} else {
pushAssistantMessage(items, [apiMessage.content])
}
continue
}
if (message.type === 'user') {
await convertUserContentToInputItems(
items,
apiMessage.content as ReadonlyArray<string | ContentBlock>,
options,
callIdState,
)
} else {
convertAssistantContentToInputItems(
items,
apiMessage.content as ReadonlyArray<string | ContentBlock>,
callIdState,
)
}
}
return items
}

View File

@@ -0,0 +1,39 @@
import type { BetaToolUnion } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import type { Tool as CodexTool } from 'openai/resources/responses/responses.mjs'
function isClientFunctionTool(
tool: BetaToolUnion,
): tool is BetaToolUnion & {
name: string
description?: string
input_schema?: { [key: string]: unknown }
strict?: boolean
defer_loading?: boolean
} {
const value = tool as unknown as Record<string, unknown>
return typeof value.name === 'string'
}
export function anthropicToolsToCodex(
tools: BetaToolUnion[],
): CodexTool[] {
return tools.flatMap(tool => {
const value = tool as unknown as Record<string, unknown>
if (
value.type === 'advisor_20260301' ||
value.type === 'computer_20250124' ||
!isClientFunctionTool(tool)
) {
return []
}
return [{
type: 'function',
name: tool.name,
description: tool.description,
parameters: tool.input_schema ?? {},
strict: tool.strict ?? null,
...(tool.defer_loading && { defer_loading: true }),
}]
})
}

View File

@@ -0,0 +1,114 @@
import type { SDKAssistantMessageError } from '../../../entrypoints/agentSdkTypes.js'
type CodexErrorLike = {
status?: unknown
message?: unknown
error?: {
message?: unknown
}
}
export type NormalizedCodexError = {
content: string
error: SDKAssistantMessageError
}
function readErrorStatus(error: unknown): number | null {
if (
typeof error === 'object' &&
error !== null &&
typeof (error as CodexErrorLike).status === 'number'
) {
return (error as CodexErrorLike).status as number
}
return null
}
function readErrorMessage(error: unknown): string {
if (error instanceof Error && error.message.length > 0) {
return error.message
}
if (typeof error === 'object' && error !== null) {
const value = error as CodexErrorLike
if (typeof value.message === 'string' && value.message.length > 0) {
return value.message
}
if (
typeof value.error?.message === 'string' &&
value.error.message.length > 0
) {
return value.error.message
}
}
return String(error)
}
export function getCodexConfigurationError(): NormalizedCodexError | null {
if (!process.env.CODEX_API_KEY) {
return {
content:
'Missing CODEX_API_KEY. Configure it in settings or your environment before using the codex provider.',
error: 'authentication_failed',
}
}
return null
}
export function normalizeCodexError(error: unknown): NormalizedCodexError {
const status = readErrorStatus(error)
const message = readErrorMessage(error)
if (/^Codex preflight:/i.test(message)) {
return {
content: message,
error: 'invalid_request',
}
}
if (status === 401 || status === 403) {
return {
content: `Codex authentication failed (${status}). Verify CODEX_API_KEY and CODEX_BASE_URL.`,
error: 'authentication_failed',
}
}
if (status === 404) {
return {
content:
'Codex endpoint not found (404). Verify CODEX_BASE_URL points to a Responses API root.',
error: 'invalid_request',
}
}
if (status === 429) {
return {
content:
'Codex rate limit reached (429). Retry shortly or reduce request volume.',
error: 'rate_limit',
}
}
if (status === 502 && /upstream request failed/i.test(message)) {
return {
content:
'Codex gateway returned 502 Upstream request failed. This usually means a transient gateway issue or incomplete Responses API compatibility during tool replay.',
error: 'server_error',
}
}
if (status !== null && status >= 500) {
return {
content: `Codex server error (${status}): ${message}`,
error: 'server_error',
}
}
return {
content: `API Error: ${message}`,
error: 'unknown',
}
}

View File

@@ -0,0 +1,132 @@
import { createHash } from 'crypto'
import { logForDebugging } from '../../../utils/debug.js'
const resolvedImageUrls = new Map<string, string>()
const DEFAULT_TIMEOUT_MS = 30_000
const IMGBB_UPLOAD_URL = 'https://api.imgbb.com/1/upload'
type ImgbbVariant = {
url?: unknown
}
type ImgbbPayload = {
data?: {
url?: unknown
display_url?: unknown
image?: ImgbbVariant
medium?: ImgbbVariant
thumb?: ImgbbVariant
}
}
function getUploadTimeoutMs(): number {
const raw =
process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS ??
process.env.CODEX_IMAGE_URL_TIMEOUT_MS
if (!raw) {
return DEFAULT_TIMEOUT_MS
}
const parsed = Number.parseInt(raw, 10)
return Number.isFinite(parsed) && parsed > 0 ? parsed : DEFAULT_TIMEOUT_MS
}
function getCacheKey(prefix: string, value: string): string {
return `${prefix}:${createHash('sha256').update(value).digest('hex')}`
}
function getImgbbApiKey(): string | null {
const apiKey = process.env.CODEX_IMGBB_API_KEY?.trim()
return apiKey && apiKey.length > 0 ? apiKey : null
}
function pickImgbbImageUrl(payload: ImgbbPayload): string | null {
const candidates = [
payload.data?.medium?.url,
payload.data?.thumb?.url,
payload.data?.image?.url,
payload.data?.url,
payload.data?.display_url,
]
for (const candidate of candidates) {
if (typeof candidate === 'string' && candidate.length > 0) {
return candidate
}
}
return null
}
async function withTimeout<T>(
run: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), getUploadTimeoutMs())
try {
return await run(controller.signal)
} finally {
clearTimeout(timeout)
}
}
async function uploadToImgbb(
base64Image: string,
): Promise<string | null> {
const apiKey = getImgbbApiKey()
if (!apiKey) {
return null
}
try {
const url = await withTimeout(async signal => {
const body = new FormData()
body.append('image', base64Image)
const response = await fetch(`${IMGBB_UPLOAD_URL}?key=${encodeURIComponent(apiKey)}`, {
method: 'POST',
body,
signal,
})
if (!response.ok) {
logForDebugging(
`[Codex] ImgBB upload failed: ${response.status} ${response.statusText}`,
)
return null
}
return pickImgbbImageUrl((await response.json()) as ImgbbPayload)
})
if (!url) {
logForDebugging('[Codex] ImgBB upload produced no usable URL.')
return null
}
return url
} catch (error) {
logForDebugging(`[Codex] Failed to upload image to ImgBB: ${error}`)
return null
}
}
export async function uploadCodexBase64Image(
data: string,
mediaType: string = 'image/png',
): Promise<string | null> {
const cacheKey = getCacheKey('base64', `${mediaType}:${data}`)
const cached = resolvedImageUrls.get(cacheKey)
if (cached) {
return cached
}
const url = await uploadToImgbb(data)
if (!url) {
return null
}
resolvedImageUrls.set(cacheKey, url)
return url
}

View File

@@ -0,0 +1,299 @@
import type { BetaToolUnion } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import type {
Response,
ResponseCreateParamsNonStreaming,
} from 'openai/resources/responses/responses.mjs'
import { appendFileSync } from 'fs'
import type { SystemPrompt } from '../../../utils/systemPromptType.js'
import type {
AssistantMessage,
Message,
StreamEvent,
SystemAPIErrorMessage,
} from '../../../types/message.js'
import type { Tools } from '../../../Tool.js'
import type { SDKAssistantMessageError } from '../../../entrypoints/agentSdkTypes.js'
import { toolToAPISchema } from '../../../utils/api.js'
import {
createAssistantAPIErrorMessage,
normalizeMessagesForAPI,
} from '../../../utils/messages.js'
import { logForDebugging } from '../../../utils/debug.js'
import { getModelMaxOutputTokens } from '../../../utils/context.js'
import type { Options } from '../claude.js'
import { recordLLMObservation } from '../../../services/langfuse/tracing.js'
import {
convertMessagesToLangfuse,
convertOutputToLangfuse,
convertToolsToLangfuse,
} from '../../../services/langfuse/convert.js'
import { anthropicMessagesToCodexInput } from './convertMessages.js'
import { anthropicToolsToCodex } from './convertTools.js'
import { getCodexClient } from './client.js'
import { uploadCodexBase64Image } from './imageUpload.js'
import {
getCodexConfigurationError,
normalizeCodexError,
} from './errors.js'
import { resolveCodexMaxTokens, resolveCodexModel } from './model.js'
import { sanitizeCodexRequest } from './preflight.js'
import {
addCodexUsage,
type CodexStreamResult,
type CodexUsage,
rawAssistantBlocksToAssistantMessage,
type RawAssistantBlock,
streamCodexAttempt,
} from './streaming.js'
const MAX_CODEX_CONTINUATIONS = 3
function dumpCodexPayload(
body: ResponseCreateParamsNonStreaming,
): void {
const path = process.env.CODEX_DEBUG_PAYLOADS
if (!path) {
return
}
appendFileSync(
path,
`${JSON.stringify({ timestamp: new Date().toISOString(), body }, null, 2)}\n`,
)
}
function appendRawAssistantBlocks(
target: RawAssistantBlock[],
source: RawAssistantBlock[],
): void {
for (const block of source) {
const lastBlock = target.at(-1)
if (lastBlock?.type === 'text' && block.type === 'text') {
lastBlock.text += block.text
continue
}
if (
lastBlock?.type === 'tool_use' &&
block.type === 'tool_use' &&
lastBlock.id === block.id &&
lastBlock.name === block.name &&
block.input.startsWith(lastBlock.input)
) {
lastBlock.input = block.input
continue
}
target.push({ ...block })
}
}
export async function* queryModelCodex(
messages: Message[],
systemPrompt: SystemPrompt,
tools: Tools,
signal: AbortSignal,
options: Options,
): AsyncGenerator<
StreamEvent | AssistantMessage | SystemAPIErrorMessage,
void
> {
try {
const configurationError = getCodexConfigurationError()
if (configurationError) {
yield createAssistantAPIErrorMessage({
content: configurationError.content,
apiError: 'api_error',
error: configurationError.error,
})
return
}
const model = resolveCodexModel(options.model)
const messagesForAPI = normalizeMessagesForAPI(messages, tools)
const toolSchemas = await Promise.all(
tools.map(tool =>
toolToAPISchema(tool, {
getToolPermissionContext: options.getToolPermissionContext,
tools,
agents: options.agents,
allowedAgentTypes: options.allowedAgentTypes,
model: options.model,
}),
),
)
const codexTools = anthropicToolsToCodex(toolSchemas as BetaToolUnion[])
const { upperLimit } = getModelMaxOutputTokens(model)
const maxTokens = resolveCodexMaxTokens(
upperLimit,
options.maxOutputTokensOverride,
)
const client = getCodexClient({
maxRetries: 0,
fetchOverride: options.fetchOverride as typeof fetch | undefined,
})
const start = Date.now()
const collectedMessages: AssistantMessage[] = []
let totalUsage: CodexUsage = {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
}
const aggregateBlocks: RawAssistantBlock[] = []
let replayMessages = messagesForAPI
let partialMessage: AssistantMessage['message'] | undefined
let finalResponse: Response | undefined
let terminalIncompleteResponse: Response | undefined
for (
let attempt = 0;
attempt <= MAX_CODEX_CONTINUATIONS;
attempt += 1
) {
const input = await anthropicMessagesToCodexInput(replayMessages, {
resolveBase64ImageUrl: uploadCodexBase64Image,
})
const requestBody = sanitizeCodexRequest({
model,
input,
store: false,
parallel_tool_calls: false,
max_output_tokens: maxTokens,
...(systemPrompt.length > 0 && {
instructions: systemPrompt.join('\n\n'),
}),
...(codexTools.length > 0 && {
tools: codexTools,
}),
...(options.temperatureOverride !== undefined && {
temperature: options.temperatureOverride,
}),
} satisfies ResponseCreateParamsNonStreaming)
if (attempt === 0) {
logForDebugging(
`[Codex] Calling model=${model}, inputItems=${input.length}, tools=${codexTools.length}`,
)
dumpCodexPayload(requestBody)
} else {
logForDebugging(
`[Codex] Continuing incomplete response attempt ${attempt}/${MAX_CODEX_CONTINUATIONS}`,
)
}
const attemptStream = streamCodexAttempt({
client,
requestBody,
signal,
start,
emitPrimaryEvents: attempt === 0,
})
let attemptResult: CodexStreamResult | undefined
while (true) {
const next = await attemptStream.next()
if (next.done) {
attemptResult = next.value
break
}
yield next.value
}
if (!attemptResult?.response) {
continue
}
partialMessage = partialMessage ?? attemptResult.partialMessage
finalResponse = attemptResult.response
terminalIncompleteResponse = attemptResult.incompleteResponse
totalUsage = addCodexUsage(totalUsage, attemptResult.response)
if (attemptResult.assistantBlocks.length === 0) {
break
}
appendRawAssistantBlocks(aggregateBlocks, attemptResult.assistantBlocks)
const shouldContinue =
attemptResult.incompleteResponse !== undefined &&
attempt < MAX_CODEX_CONTINUATIONS
if (!shouldContinue) {
break
}
const continuationMessage = rawAssistantBlocksToAssistantMessage(
attemptResult.assistantBlocks,
attemptResult.response,
tools,
options.agentId,
)
replayMessages = [...replayMessages, continuationMessage]
}
if (finalResponse) {
if (aggregateBlocks.length === 0) {
yield createAssistantAPIErrorMessage({
content: 'Codex returned an empty streamed response.',
apiError: 'api_error',
error: 'unknown',
})
return
}
const assistantMessage = rawAssistantBlocksToAssistantMessage(
aggregateBlocks,
finalResponse,
tools,
options.agentId,
)
assistantMessage.message.usage = totalUsage as any
collectedMessages.push(assistantMessage)
yield assistantMessage
recordLLMObservation(options.langfuseTrace ?? null, {
model,
provider: 'codex',
input: convertMessagesToLangfuse(messagesForAPI, systemPrompt),
output: convertOutputToLangfuse(collectedMessages),
usage: totalUsage,
startTime: new Date(start),
endTime: new Date(),
completionStartTime:
partialMessage !== undefined ? new Date(start) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]),
})
} else {
yield createAssistantAPIErrorMessage({
content: 'Codex returned an empty streamed response.',
apiError: 'api_error',
error: 'unknown',
})
return
}
if (
terminalIncompleteResponse?.incomplete_details?.reason ===
'max_output_tokens'
) {
yield createAssistantAPIErrorMessage({
content: `Output truncated: response exceeded the ${maxTokens} token limit. Set CODEX_MAX_TOKENS or CLAUDE_CODE_MAX_OUTPUT_TOKENS to override.`,
apiError: 'max_output_tokens',
error: 'max_output_tokens' as unknown as SDKAssistantMessageError,
})
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
const normalizedError = normalizeCodexError(error)
logForDebugging(`[Codex] Error: ${errorMessage}`, { level: 'error' })
yield createAssistantAPIErrorMessage({
content: normalizedError.content,
apiError: 'api_error',
error: normalizedError.error,
})
}
}

View File

@@ -0,0 +1,39 @@
function getModelFamily(model: string): 'haiku' | 'sonnet' | 'opus' | null {
if (/haiku/i.test(model)) return 'haiku'
if (/opus/i.test(model)) return 'opus'
if (/sonnet/i.test(model)) return 'sonnet'
return null
}
export function resolveCodexModel(model: string): string {
if (process.env.CODEX_MODEL) {
return process.env.CODEX_MODEL
}
const cleanModel = model.replace(/\[1m\]$/, '')
const family = getModelFamily(cleanModel)
if (family) {
const familyOverride = process.env[`CODEX_DEFAULT_${family.toUpperCase()}_MODEL`]
if (familyOverride) {
return familyOverride
}
}
return cleanModel
}
export function resolveCodexMaxTokens(
upperLimit: number,
maxOutputTokensOverride?: number,
): number {
return (
maxOutputTokensOverride ??
(process.env.CODEX_MAX_TOKENS
? parseInt(process.env.CODEX_MAX_TOKENS, 10) || undefined
: undefined) ??
(process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS
? parseInt(process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS, 10) || undefined
: undefined) ??
upperLimit
)
}

View File

@@ -0,0 +1,151 @@
import type {
ResponseCreateParamsNonStreaming,
ResponseCreateParamsStreaming,
ResponseInputItem,
Tool,
} from 'openai/resources/responses/responses.mjs'
import { normalizeCodexCallId } from './callIds.js'
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value)
}
function assertString(value: unknown, label: string): string {
if (typeof value !== 'string') {
throw new Error(`Codex preflight: ${label} must be a string.`)
}
return value
}
function sanitizeMessageItem(item: Record<string, unknown>): ResponseInputItem {
const role = assertString(item.role, 'message.role')
const content = item.content
if ((role !== 'user' && role !== 'assistant') || !Array.isArray(content)) {
throw new Error('Codex preflight: message items require role and content array.')
}
return item as unknown as ResponseInputItem
}
function sanitizeFunctionCallItem(item: Record<string, unknown>): ResponseInputItem {
const callId = normalizeCodexCallId(item.call_id)
const name = assertString(item.name, 'function_call.name').trim()
const argumentsValue = item.arguments
if (!callId) {
throw new Error('Codex preflight: function_call.call_id is required.')
}
if (name.length === 0) {
throw new Error('Codex preflight: function_call.name is required.')
}
if (typeof argumentsValue !== 'string') {
throw new Error('Codex preflight: function_call.arguments must be a string.')
}
return {
...item,
call_id: callId,
name,
arguments: argumentsValue,
} as ResponseInputItem
}
function sanitizeFunctionCallOutputItem(
item: Record<string, unknown>,
): ResponseInputItem {
const callId = normalizeCodexCallId(item.call_id)
const output = item.output
if (!callId) {
throw new Error('Codex preflight: function_call_output.call_id is required.')
}
if (
typeof output !== 'string' &&
!(Array.isArray(output) && output.every(part => isRecord(part)))
) {
throw new Error(
'Codex preflight: function_call_output.output must be a string or content array.',
)
}
return {
...item,
call_id: callId,
} as ResponseInputItem
}
function sanitizeInputItem(item: unknown): ResponseInputItem {
if (!isRecord(item) || typeof item.type !== 'string') {
throw new Error('Codex preflight: each input item requires a type.')
}
switch (item.type) {
case 'message':
return sanitizeMessageItem(item)
case 'function_call':
return sanitizeFunctionCallItem(item)
case 'function_call_output':
return sanitizeFunctionCallOutputItem(item)
default:
throw new Error(`Codex preflight: unsupported input item type "${item.type}".`)
}
}
function sanitizeTool(tool: unknown): Tool {
if (!isRecord(tool) || tool.type !== 'function') {
throw new Error('Codex preflight: only function tools are supported.')
}
const name = assertString(tool.name, 'tool.name').trim()
const parameters = isRecord(tool.parameters) ? tool.parameters : {}
if (name.length === 0) {
throw new Error('Codex preflight: tool.name is required.')
}
return {
...tool,
type: 'function',
name,
parameters,
} as Tool
}
export function sanitizeCodexRequest(
request: ResponseCreateParamsNonStreaming,
): ResponseCreateParamsNonStreaming {
if (typeof request.model !== 'string' || request.model.trim().length === 0) {
throw new Error('Codex preflight: model is required.')
}
if (
request.instructions !== undefined &&
request.instructions !== null &&
typeof request.instructions !== 'string'
) {
throw new Error('Codex preflight: instructions must be a string.')
}
if (!Array.isArray(request.input)) {
throw new Error('Codex preflight: input must be an array.')
}
return {
...request,
model: request.model.trim(),
instructions: request.instructions?.trim() || undefined,
input: request.input.map(sanitizeInputItem),
tools: request.tools?.map(sanitizeTool),
}
}
export function toStreamingCodexRequest(
request: ResponseCreateParamsNonStreaming,
): ResponseCreateParamsStreaming {
return {
...request,
stream: true,
}
}

View File

@@ -0,0 +1,681 @@
import { randomUUID } from 'crypto'
import type {
Response,
ResponseCreateParamsNonStreaming,
ResponseFunctionToolCall,
ResponseOutputItem,
ResponseOutputMessage,
ResponseStreamEvent,
} from 'openai/resources/responses/responses.mjs'
import type { AssistantMessage, StreamEvent } from '../../../types/message.js'
import type { Tools } from '../../../Tool.js'
import {
createAssistantMessage,
normalizeContentFromAPI,
} from '../../../utils/messages.js'
import { getCodexClient } from './client.js'
import { resolveCodexCallId } from './callIds.js'
import { toStreamingCodexRequest } from './preflight.js'
export type RawAssistantBlock =
| { type: 'text'; text: string }
| { type: 'tool_use'; id: string; name: string; input: string }
export type CodexUsage = {
input_tokens: number
output_tokens: number
cache_creation_input_tokens: number
cache_read_input_tokens: number
}
export type CodexStreamResult = {
response?: Response
incompleteResponse?: Response
partialMessage?: AssistantMessage['message']
assistantBlocks: RawAssistantBlock[]
}
type CodexStreamState = {
contentBlocks: Record<number, RawAssistantBlock>
completedBlocks: Array<RawAssistantBlock | undefined>
partialMessage?: AssistantMessage['message']
finalResponse?: Response
incompleteResponse?: Response
failedResponse?: Response
}
export function getCodexUsage(
response: Pick<Response, 'usage'> | null | undefined,
): CodexUsage {
return {
input_tokens: response?.usage?.input_tokens ?? 0,
output_tokens: response?.usage?.output_tokens ?? 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens:
response?.usage?.input_tokens_details.cached_tokens ?? 0,
}
}
export function addCodexUsage(
total: CodexUsage,
response: Pick<Response, 'usage'> | null | undefined,
): CodexUsage {
const usage = getCodexUsage(response)
return {
input_tokens: total.input_tokens + usage.input_tokens,
output_tokens: total.output_tokens + usage.output_tokens,
cache_creation_input_tokens:
total.cache_creation_input_tokens + usage.cache_creation_input_tokens,
cache_read_input_tokens:
total.cache_read_input_tokens + usage.cache_read_input_tokens,
}
}
function createPartialAssistantMessage(
response: Response,
): AssistantMessage['message'] {
return {
id: response.id,
type: 'message',
role: 'assistant',
content: [],
model: response.model,
stop_reason: null,
stop_sequence: null,
usage: getCodexUsage(response) as any,
} as AssistantMessage['message']
}
function createToolUseBlock(
item: Partial<ResponseFunctionToolCall> & { id?: string },
): RawAssistantBlock {
return {
type: 'tool_use',
id: resolveCodexCallId(
item.call_id ?? item.id,
`tool:${item.name ?? ''}:${item.arguments ?? ''}:${item.id ?? ''}`,
),
name: item.name ?? '',
input: item.arguments ?? '',
}
}
function getCompletedTextFromItem(item: ResponseOutputItem): string | null {
if (item.type !== 'message' || item.role !== 'assistant') {
return null
}
for (const content of (item as ResponseOutputMessage).content) {
if (content.type === 'output_text' && content.text.length > 0) {
return content.text
}
if (content.type === 'refusal' && content.refusal.length > 0) {
return content.refusal
}
}
return null
}
function getCompletedAssistantBlocks(
blocks: Array<RawAssistantBlock | undefined>,
): RawAssistantBlock[] {
return blocks.filter(
(block): block is RawAssistantBlock => block !== undefined,
)
}
function getCodexStopReason(
response: Pick<Response, 'incomplete_details'>,
blocks: RawAssistantBlock[],
): string {
if (response.incomplete_details?.reason === 'max_output_tokens') {
return 'max_tokens'
}
return blocks.some(block => block.type === 'tool_use') ? 'tool_use' : 'end_turn'
}
function emitTrailingTextDelta(
output: StreamEvent[],
index: number,
currentText: string,
finalText: string,
): void {
if (!finalText.startsWith(currentText)) {
return
}
const delta = finalText.slice(currentText.length)
if (delta.length === 0) {
return
}
output.push({
type: 'stream_event',
event: {
type: 'content_block_delta',
index,
delta: {
type: 'text_delta',
text: delta,
},
} as any,
} as StreamEvent)
}
function emitTrailingToolDelta(
output: StreamEvent[],
index: number,
currentInput: string,
finalInput: string,
): void {
if (!finalInput.startsWith(currentInput)) {
return
}
const delta = finalInput.slice(currentInput.length)
if (delta.length === 0) {
return
}
output.push({
type: 'stream_event',
event: {
type: 'content_block_delta',
index,
delta: {
type: 'input_json_delta',
partial_json: delta,
},
} as any,
} as StreamEvent)
}
function responseToRawAssistantBlocks(response: Response): RawAssistantBlock[] {
const blocks: RawAssistantBlock[] = []
for (const item of response.output) {
if (item.type === 'function_call') {
const functionCall = item as ResponseFunctionToolCall
blocks.push({
type: 'tool_use',
id: resolveCodexCallId(
functionCall.call_id,
`output:${functionCall.name}:${functionCall.arguments}`,
),
name: functionCall.name,
input: functionCall.arguments,
})
continue
}
if (item.type !== 'message' || item.role !== 'assistant') {
continue
}
for (const content of (item as ResponseOutputMessage).content) {
if (content.type === 'output_text' && content.text.length > 0) {
blocks.push({
type: 'text',
text: content.text,
})
} else if (content.type === 'refusal' && content.refusal.length > 0) {
blocks.push({
type: 'text',
text: content.refusal,
})
}
}
}
if (
blocks.length === 0 &&
typeof response.output_text === 'string' &&
response.output_text.length > 0
) {
blocks.push({
type: 'text',
text: response.output_text,
})
}
return blocks
}
export function rawAssistantBlocksToAssistantMessage(
rawBlocks: RawAssistantBlock[],
response: Pick<Response, 'id' | 'model' | 'usage' | 'incomplete_details'>,
tools: Tools,
agentId?: string,
): AssistantMessage {
const content = normalizeContentFromAPI(
rawBlocks as any,
tools,
agentId as any,
)
const assistantMessage = createAssistantMessage({
content: content as any,
usage: {
input_tokens: response.usage?.input_tokens ?? 0,
output_tokens: response.usage?.output_tokens ?? 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens:
response.usage?.input_tokens_details.cached_tokens ?? 0,
} as any,
})
assistantMessage.message.id = response.id
assistantMessage.message.model = response.model
assistantMessage.message.stop_reason = getCodexStopReason(response, rawBlocks) as any
assistantMessage.message.stop_sequence = null
assistantMessage.uuid = randomUUID()
assistantMessage.timestamp = new Date().toISOString()
return assistantMessage
}
function handleCodexStreamEvent(params: {
event: ResponseStreamEvent
partialMessage: AssistantMessage['message'] | undefined
contentBlocks: Record<number, RawAssistantBlock>
completedBlocks: Array<RawAssistantBlock | undefined>
start: number
}): {
output: StreamEvent[]
partialMessage: AssistantMessage['message'] | undefined
finalResponse?: Response
failedResponse?: Response
incompleteResponse?: Response
} {
const { event, start } = params
const output: StreamEvent[] = []
const contentBlocks = params.contentBlocks
const completedBlocks = params.completedBlocks
let partialMessage = params.partialMessage
let finalResponse: Response | undefined
let failedResponse: Response | undefined
let incompleteResponse: Response | undefined
const ensureMessageStart = (response: Response): void => {
if (partialMessage) {
return
}
partialMessage = createPartialAssistantMessage(response)
output.push({
type: 'stream_event',
event: {
type: 'message_start',
message: partialMessage,
} as any,
ttftMs: Date.now() - start,
} as StreamEvent)
}
const ensureTextBlock = (index: number): RawAssistantBlock => {
const existing = contentBlocks[index]
if (existing) {
return existing
}
const block: RawAssistantBlock = { type: 'text', text: '' }
contentBlocks[index] = block
output.push({
type: 'stream_event',
event: {
type: 'content_block_start',
index,
content_block: { type: 'text', text: '' },
} as any,
} as StreamEvent)
return block
}
const ensureToolUseBlock = (
index: number,
item?: Partial<ResponseFunctionToolCall> & { id?: string },
): RawAssistantBlock => {
const existing = contentBlocks[index]
if (existing) {
return existing
}
const block = createToolUseBlock(item ?? {})
contentBlocks[index] = block
const toolBlock = block as Extract<RawAssistantBlock, { type: 'tool_use' }>
output.push({
type: 'stream_event',
event: {
type: 'content_block_start',
index,
content_block: {
type: 'tool_use',
id: toolBlock.id,
name: toolBlock.name,
input: '',
},
} as any,
} as StreamEvent)
return block
}
const emitCompletedBlock = (index: number): void => {
const block = contentBlocks[index]
if (!block) {
return
}
completedBlocks[index] = { ...block }
output.push({
type: 'stream_event',
event: {
type: 'content_block_stop',
index,
} as any,
} as StreamEvent)
delete contentBlocks[index]
}
switch (event.type) {
case 'response.created':
case 'response.in_progress':
ensureMessageStart(event.response)
break
case 'response.output_item.added':
if (event.item.type === 'function_call') {
ensureToolUseBlock(event.output_index, event.item)
} else if (event.item.type === 'message' && event.item.role === 'assistant') {
ensureTextBlock(event.output_index)
}
break
case 'response.output_text.delta':
case 'response.refusal.delta': {
const block = ensureTextBlock(event.output_index)
if (block.type === 'text') {
block.text += event.delta
}
output.push({
type: 'stream_event',
event: {
type: 'content_block_delta',
index: event.output_index,
delta: {
type: 'text_delta',
text: event.delta,
},
} as any,
} as StreamEvent)
break
}
case 'response.function_call_arguments.delta': {
const block = ensureToolUseBlock(event.output_index, { id: event.item_id })
if (block.type === 'tool_use') {
block.input += event.delta
}
output.push({
type: 'stream_event',
event: {
type: 'content_block_delta',
index: event.output_index,
delta: {
type: 'input_json_delta',
partial_json: event.delta,
},
} as any,
} as StreamEvent)
break
}
case 'response.output_text.done':
case 'response.refusal.done': {
const block = ensureTextBlock(event.output_index)
const finalText = event.type === 'response.output_text.done'
? event.text
: event.refusal
if (block.type === 'text') {
emitTrailingTextDelta(output, event.output_index, block.text, finalText)
block.text = finalText
}
emitCompletedBlock(event.output_index)
break
}
case 'response.function_call_arguments.done': {
const block = ensureToolUseBlock(event.output_index, {
id: event.item_id,
name: event.name,
})
if (block.type === 'tool_use') {
if (event.name) {
block.name = event.name
}
emitTrailingToolDelta(output, event.output_index, block.input, event.arguments)
block.input = event.arguments
}
emitCompletedBlock(event.output_index)
break
}
case 'response.output_item.done':
if (
event.item.type === 'message' &&
event.item.role === 'assistant' &&
contentBlocks[event.output_index]
) {
const finalText = getCompletedTextFromItem(event.item)
if (finalText !== null) {
const block = contentBlocks[event.output_index]
if (block.type === 'text') {
emitTrailingTextDelta(output, event.output_index, block.text, finalText)
block.text = finalText
}
}
emitCompletedBlock(event.output_index)
} else if (
event.item.type === 'function_call' &&
contentBlocks[event.output_index]
) {
const block = contentBlocks[event.output_index]
if (block.type === 'tool_use') {
block.id = resolveCodexCallId(
event.item.call_id,
`done:${event.item.name}:${event.item.arguments}:${event.item.id}`,
)
block.name = event.item.name
emitTrailingToolDelta(
output,
event.output_index,
block.input,
event.item.arguments,
)
block.input = event.item.arguments
}
emitCompletedBlock(event.output_index)
}
break
case 'response.completed':
case 'response.incomplete': {
ensureMessageStart(event.response)
if (event.type === 'response.completed') {
finalResponse = event.response
} else {
incompleteResponse = event.response
}
const assistantBlocks = getCompletedAssistantBlocks(completedBlocks)
output.push({
type: 'stream_event',
event: {
type: 'message_delta',
delta: {
stop_reason: getCodexStopReason(event.response, assistantBlocks),
stop_sequence: null,
},
usage: getCodexUsage(event.response),
} as any,
} as StreamEvent)
output.push({
type: 'stream_event',
event: {
type: 'message_stop',
} as any,
} as StreamEvent)
break
}
case 'response.failed':
failedResponse = event.response
break
case 'error':
throw new Error(event.message)
}
return {
output,
partialMessage,
finalResponse,
failedResponse,
incompleteResponse,
}
}
function selectResponse(
state: CodexStreamState,
streamedResponse?: Response,
): CodexStreamResult {
const response =
[streamedResponse, state.finalResponse, state.incompleteResponse, state.failedResponse]
.find(
candidate =>
candidate !== undefined &&
responseToRawAssistantBlocks(candidate).length > 0,
) ??
streamedResponse ??
state.finalResponse ??
state.incompleteResponse ??
state.failedResponse
return {
response,
incompleteResponse: state.incompleteResponse,
partialMessage: state.partialMessage,
assistantBlocks:
response !== undefined && responseToRawAssistantBlocks(response).length > 0
? responseToRawAssistantBlocks(response)
: getCompletedAssistantBlocks(state.completedBlocks),
}
}
async function consumeCodexStream(
events: AsyncIterable<ResponseStreamEvent>,
start: number,
): Promise<CodexStreamState> {
const state: CodexStreamState = {
contentBlocks: {},
completedBlocks: [],
}
for await (const event of events) {
const handled = handleCodexStreamEvent({
event,
partialMessage: state.partialMessage,
contentBlocks: state.contentBlocks,
completedBlocks: state.completedBlocks,
start,
})
state.partialMessage = handled.partialMessage
state.finalResponse = handled.finalResponse ?? state.finalResponse
state.incompleteResponse =
handled.incompleteResponse ?? state.incompleteResponse
state.failedResponse = handled.failedResponse ?? state.failedResponse
}
return state
}
export async function* streamCodexAttempt(params: {
client: ReturnType<typeof getCodexClient>
requestBody: ResponseCreateParamsNonStreaming
signal: AbortSignal
start: number
emitPrimaryEvents?: boolean
}): AsyncGenerator<StreamEvent, CodexStreamResult, void> {
let primaryError: unknown
let primaryResult: CodexStreamResult | undefined
try {
const stream = params.client.responses.stream(
params.requestBody as unknown as Parameters<
typeof params.client.responses.stream
>[0],
{ signal: params.signal },
)
const state: CodexStreamState = {
contentBlocks: {},
completedBlocks: [],
}
for await (const event of stream) {
const handled = handleCodexStreamEvent({
event,
partialMessage: state.partialMessage,
contentBlocks: state.contentBlocks,
completedBlocks: state.completedBlocks,
start: params.start,
})
state.partialMessage = handled.partialMessage
state.finalResponse = handled.finalResponse ?? state.finalResponse
state.incompleteResponse =
handled.incompleteResponse ?? state.incompleteResponse
state.failedResponse = handled.failedResponse ?? state.failedResponse
if (params.emitPrimaryEvents !== false) {
yield* handled.output
}
}
let streamedResponse: Response | undefined
try {
streamedResponse = await stream.finalResponse()
} catch {
streamedResponse = undefined
}
primaryResult = selectResponse(state, streamedResponse)
if (primaryResult.assistantBlocks.length > 0 || primaryResult.response) {
return primaryResult
}
} catch (error) {
primaryError = error
}
try {
const fallbackStream = await params.client.responses.create(
toStreamingCodexRequest(params.requestBody),
{ signal: params.signal },
)
const fallbackState = await consumeCodexStream(
fallbackStream as AsyncIterable<ResponseStreamEvent>,
params.start,
)
const fallbackResult = selectResponse(fallbackState)
if (fallbackResult.assistantBlocks.length > 0 || fallbackResult.response) {
return fallbackResult
}
} catch (fallbackError) {
if (primaryError) {
throw primaryError
}
throw fallbackError
}
if (primaryError) {
throw primaryError
}
return primaryResult ?? {
assistantBlocks: [],
}
}

View File

@@ -57,6 +57,7 @@ const PROVIDER_GENERATION_NAMES: Record<string, string> = {
vertex: 'ChatVertexAnthropic',
foundry: 'ChatFoundry',
openai: 'ChatOpenAI',
codex: 'ChatOpenAIResponses',
gemini: 'ChatGoogleGenerativeAI',
grok: 'ChatXAI',
}