/**
 * Unit tests for `resolveCodexModel`.
 *
 * Every test starts with the four CODEX_* model variables unset, and the
 * values that were present before the suite ran are restored afterwards.
 */
describe('resolveCodexModel', () => {
  const ENV_KEYS = [
    'CODEX_MODEL',
    'CODEX_DEFAULT_HAIKU_MODEL',
    'CODEX_DEFAULT_SONNET_MODEL',
    'CODEX_DEFAULT_OPUS_MODEL',
  ] as const

  // Snapshot taken once, before any test mutates the environment.
  const originalEnv: Record<string, string | undefined> = {}
  for (const key of ENV_KEYS) {
    originalEnv[key] = process.env[key]
  }

  beforeEach(() => {
    for (const key of ENV_KEYS) {
      delete process.env[key]
    }
  })

  afterEach(() => {
    // Assigning `undefined` to a process.env key stores the *string*
    // "undefined", so variables that were originally unset must be deleted
    // rather than copied back with Object.assign.
    for (const key of ENV_KEYS) {
      const value = originalEnv[key]
      if (value === undefined) {
        delete process.env[key]
      } else {
        process.env[key] = value
      }
    }
  })

  test('CODEX_MODEL env var overrides all', () => {
    process.env.CODEX_MODEL = 'my-custom-model'
    expect(resolveCodexModel('claude-sonnet-4-6')).toBe('my-custom-model')
  })

  test('CODEX_DEFAULT_SONNET_MODEL overrides default map', () => {
    process.env.CODEX_DEFAULT_SONNET_MODEL = 'my-sonnet'
    expect(resolveCodexModel('claude-sonnet-4-6')).toBe('my-sonnet')
  })

  test('CODEX_DEFAULT_HAIKU_MODEL overrides default map', () => {
    process.env.CODEX_DEFAULT_HAIKU_MODEL = 'my-haiku'
    expect(resolveCodexModel('claude-haiku-4-5-20251001')).toBe('my-haiku')
  })

  test('CODEX_DEFAULT_OPUS_MODEL overrides default map', () => {
    process.env.CODEX_DEFAULT_OPUS_MODEL = 'my-opus'
    expect(resolveCodexModel('claude-opus-4-6')).toBe('my-opus')
  })

  test('maps known sonnet model via DEFAULT_MODEL_MAP', () => {
    expect(resolveCodexModel('claude-sonnet-4-6')).toBe('gpt-5.4-mini')
  })

  test('maps known haiku model via DEFAULT_MODEL_MAP', () => {
    expect(resolveCodexModel('claude-haiku-4-5-20251001')).toBe('gpt-5.4-nano')
  })

  test('maps known opus model via DEFAULT_MODEL_MAP', () => {
    expect(resolveCodexModel('claude-opus-4-6')).toBe('gpt-5.4')
  })

  test('maps legacy sonnet models', () => {
    expect(resolveCodexModel('claude-sonnet-4-20250514')).toBe('gpt-5.4-mini')
    expect(resolveCodexModel('claude-3-5-sonnet-20241022')).toBe('gpt-5.4-mini')
  })

  test('maps legacy haiku models', () => {
    expect(resolveCodexModel('claude-3-5-haiku-20241022')).toBe('gpt-5.4-nano')
  })

  test('maps legacy opus models', () => {
    expect(resolveCodexModel('claude-opus-4-20250514')).toBe('gpt-5.4')
    expect(resolveCodexModel('claude-opus-4-5-20251101')).toBe('gpt-5.4')
  })

  test('uses family default for unrecognized haiku model', () => {
    expect(resolveCodexModel('claude-haiku-99')).toBe('gpt-5.4-nano')
  })

  test('uses family default for unrecognized sonnet model', () => {
    expect(resolveCodexModel('claude-sonnet-99')).toBe('gpt-5.4-mini')
  })

  test('uses family default for unrecognized opus model', () => {
    expect(resolveCodexModel('claude-opus-99')).toBe('gpt-5.4')
  })

  test('passes through unknown model name without family', () => {
    expect(resolveCodexModel('some-random-model')).toBe('some-random-model')
  })

  test('strips [1m] suffix', () => {
    expect(resolveCodexModel('claude-sonnet-4-6[1m]')).toBe('gpt-5.4-mini')
  })

  test('CODEX_MODEL takes precedence over family-specific vars', () => {
    process.env.CODEX_MODEL = 'global-override'
    process.env.CODEX_DEFAULT_SONNET_MODEL = 'family-override'
    expect(resolveCodexModel('claude-sonnet-4-6')).toBe('global-override')
  })
})
/** Longest call id that is forwarded; anything longer is truncated. */
const MAX_CODEX_CALL_ID_LENGTH = 96

/**
 * Turn an arbitrary value into a call id made only of `A-Za-z0-9._:-` and
 * underscores.
 *
 * @param value - Candidate id; anything that is not a string is rejected.
 * @returns The sanitised id (at most 96 characters), or `null` when the input
 *   is not a string or nothing is left after trimming.
 */
export function normalizeCodexCallId(value: unknown): string | null {
  if (typeof value !== 'string') {
    return null
  }

  // Each step mirrors one link of the original replace chain, in order.
  const withoutWhitespace = value.trim().replace(/\s+/g, '_')
  const safeCharsOnly = withoutWhitespace.replace(/[^A-Za-z0-9._:-]/g, '_')
  const collapsed = safeCharsOnly.replace(/_+/g, '_')
  const bounded = collapsed.slice(0, MAX_CODEX_CALL_ID_LENGTH)

  if (bounded.length === 0) {
    return null
  }
  return bounded
}

/**
 * Build a deterministic replacement id from a seed string.
 *
 * @param seed - Text to hash; an empty seed is replaced by `'codex-call'`.
 * @returns `call_` followed by the first 24 hex characters of the SHA-1 digest.
 */
export function createCodexFallbackCallId(seed: string): string {
  const effectiveSeed = seed.length > 0 ? seed : 'codex-call'
  const digest = createHash('sha1').update(effectiveSeed).digest('hex')

  return `call_${digest.slice(0, 24)}`
}

/**
 * Resolve a usable call id: the normalised `value` when there is one,
 * otherwise a fallback id derived from `seed`.
 */
export function resolveCodexCallId(value: unknown, seed: string): string {
  const normalized = normalizeCodexCallId(value)
  return normalized !== null ? normalized : createCodexFallbackCallId(seed)
}
/** Loose structural view of the content blocks this converter reads. */
type ContentBlock = {
  type: string
  text?: string
  source?: {
    type?: string
    data?: string
    media_type?: string
    url?: string
  }
}

type ToolUseLikeBlock = {
  type: 'tool_use'
  id: string
  name: string
  input: unknown
}

type ToolResultLikeBlock = {
  type: 'tool_result'
  tool_use_id: string
  content?: string | ReadonlyArray<ContentBlock>
}

/** Options controlling how image blocks are converted. */
export type CodexImageConversionOptions = {
  /**
   * Optional hook that turns base64 image data into a URL. A falsy result
   * means "could not resolve" and the image is replaced by a text notice.
   */
  resolveBase64ImageUrl?: (
    data: string,
    mediaType?: string,
  ) => Promise<string | null>
}

/** Per-conversion bookkeeping that pairs tool results with their tool calls. */
type CodexCallIdState = {
  /** Original tool_use id -> call id emitted for the matching function_call. */
  byOriginalId: Map<string, string>
  /** Count of tool_use blocks seen so far; part of the fallback-id seed. */
  sequence: number
}

type CodexInputContent = ResponseInputText | ResponseInputImage

/** Wrap plain text as an `input_text` content part. */
function createInputText(text: string): ResponseInputText {
  return {
    type: 'input_text',
    text,
  }
}

/** Wrap a URL as an `input_image` content part with `detail: 'high'`. */
function createInputImage(imageUrl: string): ResponseInputImage {
  return {
    type: 'input_image',
    image_url: imageUrl,
    detail: 'high',
  }
}

/**
 * Placeholder text for block types that cannot be forwarded.
 *
 * @returns A notice for `image` and `document` blocks, `null` for every other
 *   type (those blocks are dropped without a notice).
 */
function getUnsupportedBlockText(type: string): string | null {
  switch (type) {
    case 'image':
      return '[Image omitted: codex gateway currently requires remote image URLs. Configure CODEX_IMGBB_API_KEY to auto-convert local images.]'
    case 'document':
      return '[Document omitted: codex gateway does not support document replay.]'
    default:
      return null
  }
}

/** Return the block's URL when its source is a non-empty `url` source. */
function getImageUrl(block: ContentBlock): string | null {
  const source = block.source
  if (!source) {
    return null
  }

  if (source.type === 'url' && typeof source.url === 'string' && source.url.length > 0) {
    return source.url
  }

  return null
}

/**
 * Resolve an image block to a URL: a direct URL source wins, otherwise base64
 * data is handed to `options.resolveBase64ImageUrl` when that hook is set.
 *
 * @returns The URL, or `null` when the block cannot be resolved.
 */
async function resolveImageUrl(
  block: ContentBlock,
  options: CodexImageConversionOptions,
): Promise<string | null> {
  const directUrl = getImageUrl(block)
  if (directUrl) {
    return directUrl
  }

  if (block.source?.type !== 'base64') {
    return null
  }

  if (options.resolveBase64ImageUrl && typeof block.source.data === 'string') {
    const uploadedUrl = await options.resolveBase64ImageUrl(
      block.source.data,
      block.source.media_type,
    )
    if (uploadedUrl) {
      return uploadedUrl
    }
  }
  return null
}

/**
 * Convert content blocks to input parts, preserving block order. Unresolvable
 * images and documents become placeholder text; other unknown blocks vanish.
 */
async function convertBlocksToInputContent(
  content: ReadonlyArray<ContentBlock>,
  options: CodexImageConversionOptions,
): Promise<Array<CodexInputContent>> {
  const output: Array<CodexInputContent> = []

  for (const block of content) {
    if (block.type === 'text' && block.text) {
      output.push(createInputText(block.text))
      continue
    }

    if (block.type === 'image') {
      const imageUrl = await resolveImageUrl(block, options)
      if (imageUrl) {
        output.push(createInputImage(imageUrl))
        continue
      }
      // Unresolved image: fall through to the placeholder below.
    }

    const fallback = getUnsupportedBlockText(block.type)
    if (fallback) {
      output.push(createInputText(fallback))
    }
  }

  return output
}

/**
 * Convert a tool_result payload into a function_call_output `output` value.
 *
 * @returns `''` for missing or empty content, the bare string when the result
 *   is a single text part, otherwise the list of converted parts.
 */
async function convertToolResultOutput(
  content: string | ReadonlyArray<ContentBlock> | undefined,
  options: CodexImageConversionOptions,
): Promise<string | Array<CodexInputContent>> {
  if (!content) {
    return ''
  }

  if (typeof content === 'string') {
    return content
  }

  const output = await convertBlocksToInputContent(content, options)

  if (output.length === 0) {
    return ''
  }

  const [only] = output
  if (output.length === 1 && only?.type === 'input_text') {
    return only.text
  }

  return output
}

/**
 * Append one user message built from the given text parts and image URLs.
 * Text parts are joined with newlines and trimmed; images follow the text.
 * Nothing is appended when both are empty.
 */
function pushUserMessage(
  items: ResponseInputItem[],
  textParts: string[],
  imageUrls: string[] = [],
): void {
  const text = textParts.join('\n').trim()
  if (text.length === 0 && imageUrls.length === 0) {
    return
  }

  items.push({
    type: 'message',
    role: 'user',
    content: [
      ...(text.length > 0 ? [createInputText(text)] : []),
      ...imageUrls.map(url => createInputImage(url)),
    ],
  } as unknown as ResponseInputItem)
}

/** Append one assistant `output_text` message; blank text is skipped. */
function pushAssistantMessage(
  items: ResponseInputItem[],
  textParts: string[],
): void {
  const text = textParts.join('\n').trim()
  if (text.length === 0) {
    return
  }

  items.push({
    type: 'message',
    role: 'assistant',
    content: [
      {
        type: 'output_text',
        text,
        annotations: [],
      },
    ],
  } as unknown as ResponseInputItem)
}

/**
 * Serialise tool input for the `arguments` field.
 *
 * Strings pass through untouched; everything else is JSON-encoded. Values that
 * cannot be encoded (circular structures, or inputs for which JSON.stringify
 * yields `undefined`, such as functions) become `'{}'`.
 */
function stringifyToolInput(input: unknown): string {
  if (typeof input === 'string') {
    return input
  }

  try {
    // JSON.stringify returns undefined for functions/symbols; never leak that
    // through a `string` return type.
    return JSON.stringify(input ?? {}) ?? '{}'
  } catch {
    return '{}'
  }
}

/** Create empty call-id bookkeeping for one conversion run. */
function createCodexCallIdState(): CodexCallIdState {
  return {
    byOriginalId: new Map<string, string>(),
    sequence: 0,
  }
}

/**
 * Choose the call id for an assistant tool_use block and remember it under the
 * block's original id so the matching tool_result can reuse it.
 */
function resolveAssistantCallId(
  block: ToolUseLikeBlock,
  state: CodexCallIdState,
): string {
  const originalId = typeof block.id === 'string' ? block.id : ''
  // The sequence number keeps fallback ids distinct for identical calls.
  const seed = `${block.name}:${stringifyToolInput(block.input)}:${state.sequence}`
  const callId = resolveCodexCallId(originalId, seed)

  if (originalId.length > 0) {
    state.byOriginalId.set(originalId, callId)
  }
  state.sequence += 1

  return callId
}

/**
 * Find the call id for a tool_result: the id recorded for its tool_use when
 * one was seen, otherwise the normalised `tool_use_id`.
 *
 * @returns `null` when `toolUseId` is not a string or normalises to nothing.
 */
function resolveToolResultCallId(
  toolUseId: unknown,
  state: CodexCallIdState,
): string | null {
  if (typeof toolUseId !== 'string') {
    return null
  }

  return state.byOriginalId.get(toolUseId) ?? normalizeCodexCallId(toolUseId)
}

/**
 * Convert the content of one user message. Text and images accumulate into a
 * pending user message that is flushed before each tool_result and at the
 * end; each tool_result becomes its own `function_call_output` item.
 */
async function convertUserContentToInputItems(
  items: ResponseInputItem[],
  content: ReadonlyArray<string | ContentBlock>,
  options: CodexImageConversionOptions,
  callIdState: CodexCallIdState,
): Promise<void> {
  const textParts: string[] = []
  const imageUrls: string[] = []

  for (const block of content) {
    if (typeof block === 'string') {
      textParts.push(block)
      continue
    }

    if (block.type === 'tool_result') {
      // Flush what came before so item order follows block order.
      pushUserMessage(items, textParts, imageUrls)
      textParts.length = 0
      imageUrls.length = 0

      const toolResultBlock = block as ToolResultLikeBlock
      const callId = resolveToolResultCallId(
        toolResultBlock.tool_use_id,
        callIdState,
      )
      if (!callId) {
        // No usable id: the result is dropped.
        continue
      }

      items.push({
        type: 'function_call_output',
        call_id: callId,
        output: await convertToolResultOutput(toolResultBlock.content, options),
      })
      continue
    }

    if (block.type === 'text' && block.text) {
      textParts.push(block.text)
      continue
    }

    if (block.type === 'image') {
      const imageUrl = await resolveImageUrl(block, options)
      if (imageUrl) {
        imageUrls.push(imageUrl)
        continue
      }
    }

    const fallback = getUnsupportedBlockText(block.type)
    if (fallback) {
      textParts.push(fallback)
    }
  }

  pushUserMessage(items, textParts, imageUrls)
}

/**
 * Convert the content of one assistant message. Text accumulates into a
 * pending assistant message that is flushed before each tool_use and at the
 * end; each tool_use becomes a `function_call` item. Other block types are
 * ignored.
 */
function convertAssistantContentToInputItems(
  items: ResponseInputItem[],
  content: ReadonlyArray<string | ContentBlock>,
  callIdState: CodexCallIdState,
): void {
  const textParts: string[] = []

  for (const block of content) {
    if (typeof block === 'string') {
      textParts.push(block)
      continue
    }

    if (block.type === 'tool_use') {
      pushAssistantMessage(items, textParts)
      textParts.length = 0

      const toolUseBlock = block as unknown as ToolUseLikeBlock
      items.push({
        type: 'function_call',
        call_id: resolveAssistantCallId(toolUseBlock, callIdState),
        name: toolUseBlock.name,
        arguments: stringifyToolInput(toolUseBlock.input),
      })
      continue
    }

    if (block.type === 'text' && block.text) {
      textParts.push(block.text)
    }
  }

  pushAssistantMessage(items, textParts)
}

/**
 * Convert internal user/assistant messages into Responses API input items.
 *
 * Messages of any other type, and messages without content, are skipped.
 * Call ids are tracked across the whole list so a tool_result in a later user
 * message reuses the id given to its earlier tool_use.
 *
 * @param messages - Conversation to convert, in order.
 * @param options - Image conversion hooks.
 * @returns The flat list of input items.
 */
export async function anthropicMessagesToCodexInput(
  messages: Message[],
  options: CodexImageConversionOptions = {},
): Promise<ResponseInputItem[]> {
  const items: ResponseInputItem[] = []
  const callIdState = createCodexCallIdState()

  for (const message of messages) {
    if (message.type !== 'user' && message.type !== 'assistant') {
      continue
    }

    const apiMessage = message.message
    if (!apiMessage?.content) {
      continue
    }

    if (typeof apiMessage.content === 'string') {
      if (message.type === 'user') {
        pushUserMessage(items, [apiMessage.content])
      } else {
        pushAssistantMessage(items, [apiMessage.content])
      }
      continue
    }

    if (message.type === 'user') {
      await convertUserContentToInputItems(
        items,
        apiMessage.content as ReadonlyArray<string | ContentBlock>,
        options,
        callIdState,
      )
    } else {
      convertAssistantContentToInputItems(
        items,
        apiMessage.content as ReadonlyArray<string | ContentBlock>,
        callIdState,
      )
    }
  }

  return items
}
/**
 * Type guard for tools that can be forwarded as function tools: any tool
 * object carrying a string `name`.
 */
function isClientFunctionTool(
  tool: BetaToolUnion,
): tool is BetaToolUnion & {
  name: string
  description?: string
  input_schema?: { [key: string]: unknown }
  strict?: boolean
  defer_loading?: boolean
} {
  const value = tool as unknown as Record<string, unknown>
  return typeof value.name === 'string'
}

/**
 * Convert Anthropic tool definitions into Responses API function tools.
 *
 * Tools of type `advisor_20260301` or `computer_20250124`, and tools without
 * a string `name`, are left out. A missing `input_schema` becomes `{}`, a
 * missing `strict` becomes `null`, and `defer_loading` is only emitted when
 * truthy.
 *
 * @param tools - Anthropic tool definitions.
 * @returns The converted function tools, in input order.
 */
export function anthropicToolsToCodex(
  tools: BetaToolUnion[],
): CodexTool[] {
  return tools.flatMap(tool => {
    const value = tool as unknown as Record<string, unknown>
    if (
      value.type === 'advisor_20260301' ||
      value.type === 'computer_20250124' ||
      !isClientFunctionTool(tool)
    ) {
      return []
    }

    return [{
      type: 'function',
      name: tool.name,
      description: tool.description,
      parameters: tool.input_schema ?? {},
      strict: tool.strict ?? null,
      ...(tool.defer_loading && { defer_loading: true }),
    }]
  })
}
/**
 * Default mapping from Anthropic model names to Codex (OpenAI Responses API) model names.
 * Used only when CODEX_DEFAULT_{FAMILY}_MODEL env vars are not set.
 */
const DEFAULT_MODEL_MAP: Record<string, string> = {
  'claude-sonnet-4-20250514': 'gpt-5.4-mini',
  'claude-sonnet-4-5-20250929': 'gpt-5.4-mini',
  'claude-sonnet-4-6': 'gpt-5.4-mini',
  'claude-3-7-sonnet-20250219': 'gpt-5.4-mini',
  'claude-3-5-sonnet-20241022': 'gpt-5.4-mini',
  'claude-opus-4-20250514': 'gpt-5.4',
  'claude-opus-4-1-20250805': 'gpt-5.4',
  'claude-opus-4-5-20251101': 'gpt-5.4',
  'claude-opus-4-6': 'gpt-5.4',
  'claude-haiku-4-5-20251001': 'gpt-5.4-nano',
  'claude-3-5-haiku-20241022': 'gpt-5.4-nano',
}

type CodexModelFamily = 'haiku' | 'sonnet' | 'opus'

/**
 * Default model for each family when an exact match is not in DEFAULT_MODEL_MAP.
 */
const DEFAULT_FAMILY_MAP: Record<CodexModelFamily, string> = {
  haiku: 'gpt-5.4-nano',
  sonnet: 'gpt-5.4-mini',
  opus: 'gpt-5.4',
}

/**
 * Detect the model family from a case-insensitive substring match.
 * Checked in the order haiku, opus, sonnet; `null` when none matches.
 */
function getModelFamily(model: string): 'haiku' | 'sonnet' | 'opus' | null {
  if (/haiku/i.test(model)) return 'haiku'
  if (/opus/i.test(model)) return 'opus'
  if (/sonnet/i.test(model)) return 'sonnet'
  return null
}

/**
 * Resolve the Codex (OpenAI Responses API) model name for a given Anthropic model.
 *
 * Priority:
 * 1. CODEX_MODEL env var (override all)
 * 2. CODEX_DEFAULT_{FAMILY}_MODEL env var (e.g. CODEX_DEFAULT_SONNET_MODEL)
 * 3. DEFAULT_MODEL_MAP lookup (exact Anthropic model name match)
 * 4. DEFAULT_FAMILY_MAP lookup (family-based default)
 * 5. Pass through original model name
 *
 * A trailing `[1m]` suffix is removed before steps 2-5.
 *
 * @param model - Anthropic model name as configured by the caller.
 * @returns The model name to send to the Codex endpoint.
 */
export function resolveCodexModel(model: string): string {
  if (process.env.CODEX_MODEL) {
    return process.env.CODEX_MODEL
  }

  const cleanModel = model.replace(/\[1m\]$/, '')
  const family = getModelFamily(cleanModel)
  if (family) {
    const familyOverride = process.env[`CODEX_DEFAULT_${family.toUpperCase()}_MODEL`]
    if (familyOverride) {
      return familyOverride
    }
  }

  const mapped = DEFAULT_MODEL_MAP[cleanModel]
  if (mapped) {
    return mapped
  }

  if (family) {
    return DEFAULT_FAMILY_MAP[family]
  }

  return cleanModel
}

/**
 * Parse an environment value as a strictly positive base-10 integer.
 * Unset, empty, non-numeric, zero and negative values all yield `undefined`.
 */
function parsePositiveInt(raw: string | undefined): number | undefined {
  if (!raw) {
    return undefined
  }

  const parsed = Number.parseInt(raw, 10)
  return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined
}

/**
 * Resolve the `max_output_tokens` value for a request.
 *
 * Priority:
 * 1. `maxOutputTokensOverride` when provided
 * 2. CODEX_MAX_TOKENS env var
 * 3. CLAUDE_CODE_MAX_OUTPUT_TOKENS env var
 * 4. `upperLimit`
 *
 * Environment values that are not positive integers are ignored; a negative
 * value used to slip through and reach the request unchanged.
 *
 * @param upperLimit - Value used when nothing overrides it. It is a default,
 *   not a clamp: larger overrides are returned unchanged.
 * @param maxOutputTokensOverride - Explicit per-call override.
 */
export function resolveCodexMaxTokens(
  upperLimit: number,
  maxOutputTokensOverride?: number,
): number {
  return (
    maxOutputTokensOverride ??
    parsePositiveInt(process.env.CODEX_MAX_TOKENS) ??
    parsePositiveInt(process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS) ??
    upperLimit
  )
}
+ upperLimit + ) +} diff --git a/src/commands/provider.ts b/src/commands/provider.ts index eae2e2dd2..e6a839dd6 100644 --- a/src/commands/provider.ts +++ b/src/commands/provider.ts @@ -124,12 +124,12 @@ const call: LocalCommandCall = async (args, context) => { // Check env vars when switching to codex (including settings.env) if (arg === 'codex') { const mergedEnv = getMergedEnv() - const hasKey = !!mergedEnv.CODEX_API_KEY + const hasKey = !!(mergedEnv.CODEX_API_KEY || mergedEnv.CODEX_ACCESS_TOKEN) if (!hasKey) { updateSettingsForSource('userSettings', { modelType: 'codex' }) return { type: 'text', - value: `Switched to Codex provider.\nWarning: Missing env var: CODEX_API_KEY\nConfigure it via /login (ChatGPT Subscription) or set manually.`, + value: `Switched to Codex provider.\nWarning: No CODEX_API_KEY or CODEX_ACCESS_TOKEN found.\nUse /login (ChatGPT Subscription) or set manually.`, } } } diff --git a/src/services/api/claude.ts b/src/services/api/claude.ts index 720370db6..4e7d096c5 100644 --- a/src/services/api/claude.ts +++ b/src/services/api/claude.ts @@ -1347,6 +1347,12 @@ async function* queryModel( return } + if (getAPIProvider() === 'codex') { + const { queryModelCodex } = await import('./codex/index.js') + yield* queryModelCodex(messagesForAPI, systemPrompt, filteredTools, signal, options) + return + } + if (getAPIProvider() === 'gemini') { const { queryModelGemini } = await import('./gemini/index.js') yield* queryModelGemini( diff --git a/src/services/api/codex/client.ts b/src/services/api/codex/client.ts new file mode 100644 index 000000000..f1eb25639 --- /dev/null +++ b/src/services/api/codex/client.ts @@ -0,0 +1,57 @@ +import OpenAI from 'openai' +import { openaiAdapter } from 'src/services/providerUsage/adapters/openai.js' +import { updateProviderBuckets } from 'src/services/providerUsage/store.js' +import { getProxyFetchOptions } from 'src/utils/proxy.js' + +export const DEFAULT_CODEX_BASE_URL = 'https://api.openai.com/v1' + +let cachedClient: 
/** Client reused across calls that do not supply their own fetch. */
let cachedClient: OpenAI | null = null

/** Request timeout used when API_TIMEOUT_MS is unset or unusable. */
const DEFAULT_CODEX_TIMEOUT_MS = 600 * 1000

/**
 * Read the request timeout from API_TIMEOUT_MS.
 *
 * Falls back to the default when the variable is unset, not a number, or not
 * positive, so a typo can no longer hand `NaN` to the client.
 */
function resolveCodexTimeoutMs(): number {
  const parsed = Number.parseInt(process.env.API_TIMEOUT_MS ?? '', 10)
  return Number.isFinite(parsed) && parsed > 0
    ? parsed
    : DEFAULT_CODEX_TIMEOUT_MS
}

/**
 * Wrap a fetch implementation so every response's headers are passed to the
 * provider-usage store under the `'codex'` key.
 */
function wrapFetchForUsage(base: typeof fetch): typeof fetch {
  const wrapped = async (
    ...args: Parameters<typeof fetch>
  ): Promise<Response> => {
    const res = await base(...args)
    try {
      updateProviderBuckets('codex', openaiAdapter.parseHeaders(res.headers))
    } catch {
      // Usage tracking must not affect the request path.
    }
    return res
  }
  return wrapped as unknown as typeof fetch
}

/**
 * Return an OpenAI client configured for the Codex endpoint.
 *
 * Credentials come from CODEX_API_KEY, then CODEX_ACCESS_TOKEN; the base URL
 * from CODEX_BASE_URL, then the default.
 *
 * The client is cached unless `fetchOverride` is supplied. A cached client
 * keeps the environment and `maxRetries` it was first built with; call
 * `clearCodexClientCache()` after changing credentials.
 *
 * @param options.maxRetries - SDK retry count (default 0).
 * @param options.fetchOverride - Custom fetch; bypasses the cache entirely.
 */
export function getCodexClient(options?: {
  maxRetries?: number
  fetchOverride?: typeof fetch
}): OpenAI {
  if (cachedClient && !options?.fetchOverride) {
    return cachedClient
  }

  const apiKey = process.env.CODEX_API_KEY || process.env.CODEX_ACCESS_TOKEN || ''
  const baseURL = process.env.CODEX_BASE_URL || DEFAULT_CODEX_BASE_URL
  const baseFetch = options?.fetchOverride ?? (globalThis.fetch as typeof fetch)
  const wrappedFetch = wrapFetchForUsage(baseFetch)

  const client = new OpenAI({
    apiKey,
    baseURL,
    maxRetries: options?.maxRetries ?? 0,
    timeout: resolveCodexTimeoutMs(),
    dangerouslyAllowBrowser: true,
    fetchOptions: getProxyFetchOptions({ forAnthropicAPI: false }),
    fetch: wrappedFetch,
  })

  if (!options?.fetchOverride) {
    cachedClient = client
  }

  return client
}

/** Drop the cached client so the next call rebuilds it from the environment. */
export function clearCodexClientCache(): void {
  cachedClient = null
}
/** Shape probed on unknown thrown values; every field is optional. */
type CodexErrorLike = {
  status?: unknown
  message?: unknown
  error?: {
    message?: unknown
  }
}

/** User-facing text plus the SDK error category for a failed request. */
export type NormalizedCodexError = {
  content: string
  error: SDKAssistantMessageError
}

/** Return the numeric `status` of an error-like object, or `null`. */
function readErrorStatus(error: unknown): number | null {
  if (
    typeof error === 'object' &&
    error !== null &&
    typeof (error as CodexErrorLike).status === 'number'
  ) {
    return (error as CodexErrorLike).status as number
  }

  return null
}

/**
 * Last-resort description of a thrown value. Objects that would print as the
 * useless `[object Object]` are shown as JSON instead when that is possible.
 */
function describeUnknownError(error: unknown): string {
  let text: string
  try {
    text = String(error)
  } catch {
    // e.g. objects without a usable toString (null prototype).
    text = '[object Object]'
  }

  if (text !== '[object Object]') {
    return text
  }

  try {
    return JSON.stringify(error) ?? text
  } catch {
    // Circular structures, BigInt members, throwing getters.
    return text
  }
}

/**
 * Extract the most specific message available: `Error.message`, then a string
 * `message` field, then `error.message`, then a description of the value.
 */
function readErrorMessage(error: unknown): string {
  if (error instanceof Error && error.message.length > 0) {
    return error.message
  }

  if (typeof error === 'object' && error !== null) {
    const value = error as CodexErrorLike
    if (typeof value.message === 'string' && value.message.length > 0) {
      return value.message
    }
    if (
      typeof value.error?.message === 'string' &&
      value.error.message.length > 0
    ) {
      return value.error.message
    }
  }

  return describeUnknownError(error)
}

/**
 * Check that a credential is configured.
 *
 * @returns An authentication error when neither CODEX_API_KEY nor
 *   CODEX_ACCESS_TOKEN is set, otherwise `null`.
 */
export function getCodexConfigurationError(): NormalizedCodexError | null {
  if (!process.env.CODEX_API_KEY && !process.env.CODEX_ACCESS_TOKEN) {
    return {
      content:
        'Missing CODEX_API_KEY or CODEX_ACCESS_TOKEN. Use /login (ChatGPT Subscription) or set manually.',
      error: 'authentication_failed',
    }
  }

  return null
}

/**
 * Map any thrown value to user-facing text and an SDK error category.
 *
 * Checks run in order: preflight failures (message prefix), 401/403, 404,
 * 429, the specific 502 "upstream request failed" case, any other 5xx, and
 * finally a generic fallback.
 *
 * @param error - The caught value; may be anything.
 */
export function normalizeCodexError(error: unknown): NormalizedCodexError {
  const status = readErrorStatus(error)
  const message = readErrorMessage(error)

  if (/^Codex preflight:/i.test(message)) {
    return {
      content: message,
      error: 'invalid_request',
    }
  }

  if (status === 401 || status === 403) {
    return {
      content: `Codex authentication failed (${status}). ${message}`,
      error: 'authentication_failed',
    }
  }

  if (status === 404) {
    return {
      content:
        'Codex endpoint not found (404). Verify CODEX_BASE_URL points to a Responses API root.',
      error: 'invalid_request',
    }
  }

  if (status === 429) {
    return {
      content:
        'Codex rate limit reached (429). Retry shortly or reduce request volume.',
      error: 'rate_limit',
    }
  }

  // Must precede the generic 5xx branch, which would otherwise swallow it.
  if (status === 502 && /upstream request failed/i.test(message)) {
    return {
      content:
        'Codex gateway returned 502 Upstream request failed. This usually means a transient gateway issue or incomplete Responses API compatibility during tool replay.',
      error: 'server_error',
    }
  }

  if (status !== null && status >= 500) {
    return {
      content: `Codex server error (${status}): ${message}`,
      error: 'server_error',
    }
  }

  return {
    content: `API Error: ${message}`,
    error: 'unknown',
  }
}
/** Cache of already-uploaded images: cache key -> hosted URL. Never evicted. */
const resolvedImageUrls = new Map<string, string>()
const DEFAULT_TIMEOUT_MS = 30_000
const IMGBB_UPLOAD_URL = 'https://api.imgbb.com/1/upload'

type ImgbbVariant = {
  url?: unknown
}

/** Subset of the ImgBB upload response that this module reads. */
type ImgbbPayload = {
  data?: {
    url?: unknown
    display_url?: unknown
    image?: ImgbbVariant
    medium?: ImgbbVariant
    thumb?: ImgbbVariant
  }
}

/**
 * Upload timeout in milliseconds.
 *
 * Reads CODEX_IMAGE_UPLOAD_TIMEOUT_MS, then CODEX_IMAGE_URL_TIMEOUT_MS. An
 * empty first variable falls through to the second (with `??` it used to mask
 * it). Values that are not positive integers yield the 30 s default.
 */
function getUploadTimeoutMs(): number {
  const raw =
    process.env.CODEX_IMAGE_UPLOAD_TIMEOUT_MS ||
    process.env.CODEX_IMAGE_URL_TIMEOUT_MS
  if (!raw) {
    return DEFAULT_TIMEOUT_MS
  }

  const parsed = Number.parseInt(raw, 10)
  return Number.isFinite(parsed) && parsed > 0 ? parsed : DEFAULT_TIMEOUT_MS
}

/** Build a cache key of the form `<prefix>:<sha256 hex of value>`. */
function getCacheKey(prefix: string, value: string): string {
  return `${prefix}:${createHash('sha256').update(value).digest('hex')}`
}

/** Return the trimmed CODEX_IMGBB_API_KEY, or `null` when unset or blank. */
function getImgbbApiKey(): string | null {
  const apiKey = process.env.CODEX_IMGBB_API_KEY?.trim()
  return apiKey && apiKey.length > 0 ? apiKey : null
}

/**
 * Pick the first non-empty string URL from the upload response.
 *
 * NOTE(review): the order prefers the `medium` and `thumb` variants over the
 * full-size `image`/`url` fields. That looks like a size trade-off, but a
 * thumbnail winning over the original may be unintended; confirm.
 */
function pickImgbbImageUrl(payload: ImgbbPayload): string | null {
  const candidates = [
    payload.data?.medium?.url,
    payload.data?.thumb?.url,
    payload.data?.image?.url,
    payload.data?.url,
    payload.data?.display_url,
  ]

  for (const candidate of candidates) {
    if (typeof candidate === 'string' && candidate.length > 0) {
      return candidate
    }
  }

  return null
}

/**
 * Run `run` with an AbortSignal that fires after the upload timeout. The
 * timer is always cleared, whether `run` resolves or rejects.
 */
async function withTimeout<T>(
  run: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const controller = new AbortController()
  const timeout = setTimeout(() => controller.abort(), getUploadTimeoutMs())

  try {
    return await run(controller.signal)
  } finally {
    clearTimeout(timeout)
  }
}

/**
 * Upload base64 image data to ImgBB.
 *
 * Best-effort: a missing API key, a non-2xx response, an unusable payload, a
 * timeout or a network error all yield `null` (failures are debug-logged).
 *
 * @param base64Image - Base64-encoded image bytes.
 * @returns The hosted image URL, or `null`.
 */
async function uploadToImgbb(
  base64Image: string,
): Promise<string | null> {
  const apiKey = getImgbbApiKey()
  if (!apiKey) {
    return null
  }

  try {
    const url = await withTimeout(async signal => {
      const body = new FormData()
      body.append('image', base64Image)

      const response = await fetch(`${IMGBB_UPLOAD_URL}?key=${encodeURIComponent(apiKey)}`, {
        method: 'POST',
        body,
        signal,
      })

      if (!response.ok) {
        logForDebugging(
          `[Codex] ImgBB upload failed: ${response.status} ${response.statusText}`,
        )
        return null
      }

      return pickImgbbImageUrl((await response.json()) as ImgbbPayload)
    })

    if (!url) {
      logForDebugging('[Codex] ImgBB upload produced no usable URL.')
      return null
    }

    return url
  } catch (error) {
    logForDebugging(`[Codex] Failed to upload image to ImgBB: ${error}`)
    return null
  }
}

/**
 * Resolve base64 image data to a hosted URL, uploading at most once per
 * distinct (mediaType, data) pair for the lifetime of the process.
 *
 * @param data - Base64-encoded image bytes.
 * @param mediaType - MIME type; only used as part of the cache key.
 * @returns The hosted URL, or `null` when the upload failed. Failures are not
 *   cached, so a later call retries.
 */
export async function uploadCodexBase64Image(
  data: string,
  mediaType: string = 'image/png',
): Promise<string | null> {
  const cacheKey = getCacheKey('base64', `${mediaType}:${data}`)
  const cached = resolvedImageUrls.get(cacheKey)
  if (cached) {
    return cached
  }

  const url = await uploadToImgbb(data)
  if (!url) {
    return null
  }

  resolvedImageUrls.set(cacheKey, url)
  return url
}
/** Upper bound on follow-up requests issued after the first attempt. */
const MAX_CODEX_CONTINUATIONS = 3

/**
 * Debug aid: when CODEX_DEBUG_PAYLOADS names a file, append the request body
 * to it as a timestamped, pretty-printed JSON record. Does nothing otherwise.
 */
function dumpCodexPayload(
  body: ResponseCreateParamsNonStreaming,
): void {
  const dumpPath = process.env.CODEX_DEBUG_PAYLOADS
  if (dumpPath) {
    appendFileSync(
      dumpPath,
      `${JSON.stringify({ timestamp: new Date().toISOString(), body }, null, 2)}\n`,
    )
  }
}

/**
 * Merge `source` blocks onto the end of `target`, in place.
 *
 * - A text block following a text block is concatenated onto it.
 * - A tool_use block with the same id and name as the previous one, whose
 *   input extends the previous input, replaces that input.
 * - Anything else is appended as a shallow copy.
 */
function appendRawAssistantBlocks(
  target: RawAssistantBlock[],
  source: RawAssistantBlock[],
): void {
  for (const incoming of source) {
    const tail = target.length > 0 ? target[target.length - 1] : undefined

    if (tail?.type === 'text' && incoming.type === 'text') {
      tail.text += incoming.text
    } else if (
      tail?.type === 'tool_use' &&
      incoming.type === 'tool_use' &&
      tail.id === incoming.id &&
      tail.name === incoming.name &&
      incoming.input.startsWith(tail.input)
    ) {
      tail.input = incoming.input
    } else {
      target.push({ ...incoming })
    }
  }
}
Date.now() + const collectedMessages: AssistantMessage[] = [] + let totalUsage: CodexUsage = { + input_tokens: 0, + output_tokens: 0, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + } + + const aggregateBlocks: RawAssistantBlock[] = [] + let replayMessages = messagesForAPI + let partialMessage: AssistantMessage['message'] | undefined + let finalResponse: Response | undefined + let terminalIncompleteResponse: Response | undefined + + for ( + let attempt = 0; + attempt <= MAX_CODEX_CONTINUATIONS; + attempt += 1 + ) { + const input = await anthropicMessagesToCodexInput(replayMessages, { + resolveBase64ImageUrl: uploadCodexBase64Image, + }) + const requestBody = sanitizeCodexRequest({ + model, + input, + store: false, + parallel_tool_calls: false, + max_output_tokens: maxTokens, + ...(systemPrompt.length > 0 && { + instructions: systemPrompt.join('\n\n'), + }), + ...(codexTools.length > 0 && { + tools: codexTools, + }), + ...(options.temperatureOverride !== undefined && { + temperature: options.temperatureOverride, + }), + } satisfies ResponseCreateParamsNonStreaming) + + if (attempt === 0) { + logForDebugging( + `[Codex] Calling model=${model}, inputItems=${input.length}, tools=${codexTools.length}`, + ) + dumpCodexPayload(requestBody) + } else { + logForDebugging( + `[Codex] Continuing incomplete response attempt ${attempt}/${MAX_CODEX_CONTINUATIONS}`, + ) + } + + const attemptStream = streamCodexAttempt({ + client, + requestBody, + signal, + start, + emitPrimaryEvents: attempt === 0, + }) + + let attemptResult: CodexStreamResult | undefined + while (true) { + const next = await attemptStream.next() + if (next.done) { + attemptResult = next.value + break + } + yield next.value + } + + if (!attemptResult?.response) { + continue + } + + partialMessage = partialMessage ?? 
attemptResult.partialMessage + finalResponse = attemptResult.response + terminalIncompleteResponse = attemptResult.incompleteResponse + totalUsage = addCodexUsage(totalUsage, attemptResult.response) + + if (attemptResult.assistantBlocks.length === 0) { + break + } + + appendRawAssistantBlocks(aggregateBlocks, attemptResult.assistantBlocks) + + const shouldContinue = + attemptResult.incompleteResponse !== undefined && + attempt < MAX_CODEX_CONTINUATIONS + + if (!shouldContinue) { + break + } + + const continuationMessage = rawAssistantBlocksToAssistantMessage( + attemptResult.assistantBlocks, + attemptResult.response, + tools, + options.agentId, + ) + replayMessages = [...replayMessages, continuationMessage] + } + + if (finalResponse) { + if (aggregateBlocks.length === 0) { + yield createAssistantAPIErrorMessage({ + content: 'Codex returned an empty streamed response.', + apiError: 'api_error', + error: 'unknown', + }) + return + } + + const assistantMessage = rawAssistantBlocksToAssistantMessage( + aggregateBlocks, + finalResponse, + tools, + options.agentId, + ) + assistantMessage.message.usage = totalUsage as any + collectedMessages.push(assistantMessage) + yield assistantMessage + + recordLLMObservation(options.langfuseTrace ?? null, { + model, + provider: process.env.CODEX_LOGIN_METHOD === 'chatgpt_subscription' + ? 'codex-chatgpt' + : 'codex', + input: convertMessagesToLangfuse(messagesForAPI, systemPrompt), + output: convertOutputToLangfuse(collectedMessages), + usage: totalUsage, + startTime: new Date(start), + endTime: new Date(), + completionStartTime: + partialMessage !== undefined ? 
/** Returns true when `value` is a non-null object that is not an array. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

/**
 * Returns `value` unchanged when it is a string.
 *
 * @param label Field name interpolated into the error message.
 * @throws Error when `value` is not a string.
 */
function assertString(value: unknown, label: string): string {
  if (typeof value !== 'string') {
    throw new Error(`Codex preflight: ${label} must be a string.`)
  }

  return value
}

/**
 * Validates a `message` input item: the role must be `user` or `assistant`
 * and `content` must be an array. The item is returned as-is.
 *
 * @throws Error when the role is not a string, is another role, or `content`
 *   is not an array.
 */
function sanitizeMessageItem(item: Record<string, unknown>): ResponseInputItem {
  const role = assertString(item.role, 'message.role')
  const content = item.content

  if ((role !== 'user' && role !== 'assistant') || !Array.isArray(content)) {
    throw new Error('Codex preflight: message items require role and content array.')
  }

  return item as unknown as ResponseInputItem
}

/**
 * Validates a `function_call` input item and returns a copy carrying the
 * normalized `call_id` and the trimmed `name`.
 *
 * @throws Error when `name` is not a string or is blank, when `call_id`
 *   normalizes to a falsy value, or when `arguments` is not a string.
 */
function sanitizeFunctionCallItem(item: Record<string, unknown>): ResponseInputItem {
  const callId = normalizeCodexCallId(item.call_id)
  const name = assertString(item.name, 'function_call.name').trim()
  const argumentsValue = item.arguments

  if (!callId) {
    throw new Error('Codex preflight: function_call.call_id is required.')
  }
  if (name.length === 0) {
    throw new Error('Codex preflight: function_call.name is required.')
  }
  if (typeof argumentsValue !== 'string') {
    throw new Error('Codex preflight: function_call.arguments must be a string.')
  }

  return {
    ...item,
    call_id: callId,
    name,
    arguments: argumentsValue,
  } as ResponseInputItem
}

/**
 * Validates a `function_call_output` input item and returns a copy carrying
 * the normalized `call_id`. `output` must be a string or an array whose
 * entries are all plain objects.
 *
 * @throws Error when `call_id` normalizes to a falsy value or `output` has
 *   another shape.
 */
function sanitizeFunctionCallOutputItem(
  item: Record<string, unknown>,
): ResponseInputItem {
  const callId = normalizeCodexCallId(item.call_id)
  const output = item.output

  if (!callId) {
    throw new Error('Codex preflight: function_call_output.call_id is required.')
  }
  if (
    typeof output !== 'string' &&
    !(Array.isArray(output) && output.every(part => isRecord(part)))
  ) {
    throw new Error(
      'Codex preflight: function_call_output.output must be a string or content array.',
    )
  }

  return {
    ...item,
    call_id: callId,
  } as ResponseInputItem
}

/**
 * Dispatches one input item to the validator for its `type`.
 *
 * @throws Error when the item is not a plain object with a string `type`, or
 *   when the type is not `message`, `function_call` or `function_call_output`.
 */
function sanitizeInputItem(item: unknown): ResponseInputItem {
  if (!isRecord(item) || typeof item.type !== 'string') {
    throw new Error('Codex preflight: each input item requires a type.')
  }

  switch (item.type) {
    case 'message':
      return sanitizeMessageItem(item)
    case 'function_call':
      return sanitizeFunctionCallItem(item)
    case 'function_call_output':
      return sanitizeFunctionCallOutputItem(item)
    default:
      throw new Error(`Codex preflight: unsupported input item type "${item.type}".`)
  }
}

/**
 * Validates a tool definition and returns a copy with a trimmed `name`.
 * `parameters` is replaced by `{}` when it is not a plain object.
 *
 * @throws Error when the tool is not a plain object of type `function`, or
 *   when `name` is not a string or is blank.
 */
function sanitizeTool(tool: unknown): Tool {
  if (!isRecord(tool) || tool.type !== 'function') {
    throw new Error('Codex preflight: only function tools are supported.')
  }

  const name = assertString(tool.name, 'tool.name').trim()
  const parameters = isRecord(tool.parameters) ? tool.parameters : {}

  if (name.length === 0) {
    throw new Error('Codex preflight: tool.name is required.')
  }

  return {
    ...tool,
    type: 'function',
    name,
    parameters,
  } as Tool
}

/**
 * Validates a Codex request before it is sent and returns a sanitized copy.
 *
 * The copy has a trimmed `model`, trimmed `instructions` (blank or missing
 * becomes `undefined`), and every input item and tool passed through its
 * validator. All other request fields are carried over unchanged.
 *
 * @throws Error when `model` is missing or blank, `instructions` is present
 *   but not a string, `input` is not an array, or any input item or tool is
 *   rejected by its validator.
 */
export function sanitizeCodexRequest(
  request: ResponseCreateParamsNonStreaming,
): ResponseCreateParamsNonStreaming {
  if (typeof request.model !== 'string' || request.model.trim().length === 0) {
    throw new Error('Codex preflight: model is required.')
  }

  if (
    request.instructions !== undefined &&
    request.instructions !== null &&
    typeof request.instructions !== 'string'
  ) {
    throw new Error('Codex preflight: instructions must be a string.')
  }

  if (!Array.isArray(request.input)) {
    throw new Error('Codex preflight: input must be an array.')
  }

  return {
    ...request,
    model: request.model.trim(),
    // `||` is deliberate: an all-whitespace instruction string is dropped.
    instructions: request.instructions?.trim() || undefined,
    input: request.input.map(sanitizeInputItem),
    tools: request.tools?.map(sanitizeTool),
  }
}

/** Returns a copy of `request` with `stream: true` set. */
export function toStreamingCodexRequest(
  request: ResponseCreateParamsNonStreaming,
): ResponseCreateParamsStreaming {
  return {
    ...request,
    stream: true,
  }
}
/** Assistant content block: plain text, or a tool call whose `input` is kept as a string. */
export type RawAssistantBlock =
  | { type: 'text'; text: string }
  | { type: 'tool_use'; id: string; name: string; input: string }

/** Token usage counters in the shape attached to assistant messages. */
export type CodexUsage = {
  input_tokens: number
  output_tokens: number
  cache_creation_input_tokens: number
  cache_read_input_tokens: number
}

/** Outcome of one streamed Codex attempt. */
export type CodexStreamResult = {
  response?: Response
  incompleteResponse?: Response
  partialMessage?: AssistantMessage['message']
  assistantBlocks: RawAssistantBlock[]
}

/** Mutable state accumulated while consuming one Codex event stream. */
type CodexStreamState = {
  // Blocks that have started but not yet completed, keyed by output index.
  contentBlocks: Record<number, RawAssistantBlock>
  // Completed blocks stored at their output index; may contain gaps.
  completedBlocks: Array<RawAssistantBlock | undefined>
  partialMessage?: AssistantMessage['message']
  finalResponse?: Response
  incompleteResponse?: Response
  failedResponse?: Response
}

/**
 * Maps a response's usage onto {@link CodexUsage}.
 *
 * Every counter falls back to 0 when the response, its `usage`, or the nested
 * `input_tokens_details` object is absent. `cache_creation_input_tokens` is
 * always 0.
 */
export function getCodexUsage(
  response: Pick<Response, 'usage'> | null | undefined,
): CodexUsage {
  const usage = response?.usage

  return {
    input_tokens: usage?.input_tokens ?? 0,
    output_tokens: usage?.output_tokens ?? 0,
    cache_creation_input_tokens: 0,
    // Optional chain: a usage payload without `input_tokens_details` must not throw.
    cache_read_input_tokens: usage?.input_tokens_details?.cached_tokens ?? 0,
  }
}

/**
 * Returns a new usage object equal to `total` plus the usage of `response`.
 * `total` is not mutated.
 */
export function addCodexUsage(
  total: CodexUsage,
  response: Pick<Response, 'usage'> | null | undefined,
): CodexUsage {
  const usage = getCodexUsage(response)

  return {
    input_tokens: total.input_tokens + usage.input_tokens,
    output_tokens: total.output_tokens + usage.output_tokens,
    cache_creation_input_tokens:
      total.cache_creation_input_tokens + usage.cache_creation_input_tokens,
    cache_read_input_tokens:
      total.cache_read_input_tokens + usage.cache_read_input_tokens,
  }
}

/**
 * Builds the initial assistant message for a stream: it takes the response's
 * id, model and usage, and has empty content and no stop reason yet.
 */
function createPartialAssistantMessage(
  response: Response,
): AssistantMessage['message'] {
  return {
    id: response.id,
    type: 'message',
    role: 'assistant',
    content: [],
    model: response.model,
    stop_reason: null,
    stop_sequence: null,
    usage: getCodexUsage(response) as any,
  } as AssistantMessage['message']
}
/**
 * Returns the first non-empty `output_text` text or `refusal` text found in
 * an assistant message item, or `null` when the item is not an assistant
 * message or has no such content.
 */
function getCompletedTextFromItem(item: ResponseOutputItem): string | null {
  if (item.type !== 'message' || item.role !== 'assistant') {
    return null
  }

  for (const content of (item as ResponseOutputMessage).content) {
    if (content.type === 'output_text' && content.text.length > 0) {
      return content.text
    }
    if (content.type === 'refusal' && content.refusal.length > 0) {
      return content.refusal
    }
  }

  return null
}

/** Returns the defined entries of `blocks` in index order, dropping gaps. */
function getCompletedAssistantBlocks(
  blocks: Array<RawAssistantBlock | undefined>,
): RawAssistantBlock[] {
  return blocks.filter(
    (block): block is RawAssistantBlock => block !== undefined,
  )
}

/**
 * Derives the stop reason for a response: `max_tokens` when the response is
 * incomplete because of `max_output_tokens`, otherwise `tool_use` when any
 * block is a tool call, otherwise `end_turn`.
 */
function getCodexStopReason(
  response: Pick<Response, 'incomplete_details'>,
  blocks: RawAssistantBlock[],
): string {
  if (response.incomplete_details?.reason === 'max_output_tokens') {
    return 'max_tokens'
  }

  return blocks.some(block => block.type === 'tool_use') ? 'tool_use' : 'end_turn'
}

/**
 * Returns the part of `finalValue` that follows `currentValue`, or `''` when
 * `finalValue` does not start with `currentValue`.
 */
function getTrailingSuffix(currentValue: string, finalValue: string): string {
  return finalValue.startsWith(currentValue)
    ? finalValue.slice(currentValue.length)
    : ''
}

/**
 * Pushes a `text_delta` event for the text in `finalText` that has not been
 * streamed yet. Nothing is pushed when `finalText` does not extend
 * `currentText` or there is nothing new.
 */
function emitTrailingTextDelta(
  output: StreamEvent[],
  index: number,
  currentText: string,
  finalText: string,
): void {
  const delta = getTrailingSuffix(currentText, finalText)
  if (delta.length === 0) {
    return
  }

  output.push({
    type: 'stream_event',
    event: {
      type: 'content_block_delta',
      index,
      delta: {
        type: 'text_delta',
        text: delta,
      },
    } as any,
  } as StreamEvent)
}

/**
 * Pushes an `input_json_delta` event for the tool input in `finalInput` that
 * has not been streamed yet. Nothing is pushed when `finalInput` does not
 * extend `currentInput` or there is nothing new.
 */
function emitTrailingToolDelta(
  output: StreamEvent[],
  index: number,
  currentInput: string,
  finalInput: string,
): void {
  const delta = getTrailingSuffix(currentInput, finalInput)
  if (delta.length === 0) {
    return
  }

  output.push({
    type: 'stream_event',
    event: {
      type: 'content_block_delta',
      index,
      delta: {
        type: 'input_json_delta',
        partial_json: delta,
      },
    } as any,
  } as StreamEvent)
}
/**
 * Builds an assistant message from raw blocks and the response they came from.
 *
 * The blocks are passed through `normalizeContentFromAPI`. The message takes
 * its id and model from `response`, its usage from `getCodexUsage`, and its
 * stop reason from `getCodexStopReason`. A fresh uuid and an ISO timestamp
 * are assigned on every call.
 *
 * @param rawBlocks Blocks to convert into message content.
 * @param response Response supplying id, model, usage and incomplete details.
 * @param tools Tool set forwarded to `normalizeContentFromAPI`.
 * @param agentId Optional agent id forwarded to `normalizeContentFromAPI`.
 */
export function rawAssistantBlocksToAssistantMessage(
  rawBlocks: RawAssistantBlock[],
  response: Pick<Response, 'id' | 'incomplete_details' | 'model' | 'usage'>,
  tools: Tools,
  agentId?: string,
): AssistantMessage {
  const content = normalizeContentFromAPI(
    rawBlocks as any,
    tools,
    agentId as any,
  )

  const assistantMessage = createAssistantMessage({
    content: content as any,
    // Same mapping as the streamed usage events, kept in one place.
    usage: getCodexUsage(response) as any,
  })

  assistantMessage.message.id = response.id
  assistantMessage.message.model = response.model
  assistantMessage.message.stop_reason = getCodexStopReason(response, rawBlocks) as any
  assistantMessage.message.stop_sequence = null
  assistantMessage.uuid = randomUUID()
  assistantMessage.timestamp = new Date().toISOString()

  return assistantMessage
}
contentBlocks[index] + if (existing) { + return existing + } + + const block = createToolUseBlock(item ?? {}) + contentBlocks[index] = block + const toolBlock = block as Extract + output.push({ + type: 'stream_event', + event: { + type: 'content_block_start', + index, + content_block: { + type: 'tool_use', + id: toolBlock.id, + name: toolBlock.name, + input: '', + }, + } as any, + } as StreamEvent) + return block + } + + const emitCompletedBlock = (index: number): void => { + const block = contentBlocks[index] + if (!block) { + return + } + completedBlocks[index] = { ...block } + output.push({ + type: 'stream_event', + event: { + type: 'content_block_stop', + index, + } as any, + } as StreamEvent) + delete contentBlocks[index] + } + + switch (event.type) { + case 'response.created': + case 'response.in_progress': + ensureMessageStart(event.response) + break + case 'response.output_item.added': + if (event.item.type === 'function_call') { + ensureToolUseBlock(event.output_index, event.item) + } else if (event.item.type === 'message' && event.item.role === 'assistant') { + ensureTextBlock(event.output_index) + } + break + case 'response.output_text.delta': + case 'response.refusal.delta': { + const block = ensureTextBlock(event.output_index) + if (block.type === 'text') { + block.text += event.delta + } + output.push({ + type: 'stream_event', + event: { + type: 'content_block_delta', + index: event.output_index, + delta: { + type: 'text_delta', + text: event.delta, + }, + } as any, + } as StreamEvent) + break + } + case 'response.function_call_arguments.delta': { + const block = ensureToolUseBlock(event.output_index, { id: event.item_id }) + if (block.type === 'tool_use') { + block.input += event.delta + } + output.push({ + type: 'stream_event', + event: { + type: 'content_block_delta', + index: event.output_index, + delta: { + type: 'input_json_delta', + partial_json: event.delta, + }, + } as any, + } as StreamEvent) + break + } + case 'response.output_text.done': + 
case 'response.refusal.done': { + const block = ensureTextBlock(event.output_index) + const finalText = event.type === 'response.output_text.done' + ? event.text + : event.refusal + if (block.type === 'text') { + emitTrailingTextDelta(output, event.output_index, block.text, finalText) + block.text = finalText + } + emitCompletedBlock(event.output_index) + break + } + case 'response.function_call_arguments.done': { + const block = ensureToolUseBlock(event.output_index, { + id: event.item_id, + name: event.name, + }) + if (block.type === 'tool_use') { + if (event.name) { + block.name = event.name + } + emitTrailingToolDelta(output, event.output_index, block.input, event.arguments) + block.input = event.arguments + } + emitCompletedBlock(event.output_index) + break + } + case 'response.output_item.done': + if ( + event.item.type === 'message' && + event.item.role === 'assistant' && + contentBlocks[event.output_index] + ) { + const finalText = getCompletedTextFromItem(event.item) + if (finalText !== null) { + const block = contentBlocks[event.output_index] + if (block.type === 'text') { + emitTrailingTextDelta(output, event.output_index, block.text, finalText) + block.text = finalText + } + } + emitCompletedBlock(event.output_index) + } else if ( + event.item.type === 'function_call' && + contentBlocks[event.output_index] + ) { + const block = contentBlocks[event.output_index] + if (block.type === 'tool_use') { + block.id = resolveCodexCallId( + event.item.call_id, + `done:${event.item.name}:${event.item.arguments}:${event.item.id}`, + ) + block.name = event.item.name + emitTrailingToolDelta( + output, + event.output_index, + block.input, + event.item.arguments, + ) + block.input = event.item.arguments + } + emitCompletedBlock(event.output_index) + } + break + case 'response.completed': + case 'response.incomplete': { + ensureMessageStart(event.response) + if (event.type === 'response.completed') { + finalResponse = event.response + } else { + incompleteResponse = 
/**
 * Picks the response and assistant blocks that represent a finished stream.
 *
 * Candidates are checked in this order: the SDK's streamed final response,
 * then the completed, incomplete and failed responses captured from events.
 * The first candidate that converts to at least one block wins, and its
 * converted blocks are returned. If none does, the first defined candidate
 * (if any) is returned together with the blocks completed during streaming.
 *
 * @param state State accumulated while consuming the stream.
 * @param streamedResponse Final response reported by the SDK stream, if any.
 */
function selectResponse(
  state: CodexStreamState,
  streamedResponse?: Response,
): CodexStreamResult {
  const candidates = [
    streamedResponse,
    state.finalResponse,
    state.incompleteResponse,
    state.failedResponse,
  ].filter((candidate): candidate is Response => candidate !== undefined)

  for (const candidate of candidates) {
    // Convert each candidate once and reuse the result.
    const assistantBlocks = responseToRawAssistantBlocks(candidate)
    if (assistantBlocks.length > 0) {
      return {
        response: candidate,
        incompleteResponse: state.incompleteResponse,
        partialMessage: state.partialMessage,
        assistantBlocks,
      }
    }
  }

  return {
    response: candidates[0],
    incompleteResponse: state.incompleteResponse,
    partialMessage: state.partialMessage,
    assistantBlocks: getCompletedAssistantBlocks(state.completedBlocks),
  }
}
/**
 * Runs one Codex request and returns the selected response and blocks.
 *
 * The primary path uses `client.responses.stream`. Each event is folded into
 * local state and, unless `emitPrimaryEvents` is `false`, the stream events
 * derived from it are yielded to the caller. When the primary path throws or
 * yields neither blocks nor a response, a fallback request is made through
 * `client.responses.create` with `stream: true`; fallback events are consumed
 * without being yielded.
 *
 * @param params.client Codex client used for both paths.
 * @param params.requestBody Request body; the fallback sends a streaming copy.
 * @param params.signal Abort signal forwarded to both requests.
 * @param params.start Start timestamp forwarded to the event handler.
 * @param params.emitPrimaryEvents Set to `false` to suppress yielded events.
 * @returns The primary or fallback result, or `{ assistantBlocks: [] }` when
 *   neither path produced anything and nothing threw.
 * @throws The primary error when the primary path failed and the fallback did
 *   not produce a result; otherwise the fallback error if only it failed.
 */
export async function* streamCodexAttempt(params: {
  client: ReturnType<typeof getCodexClient>
  requestBody: ResponseCreateParamsNonStreaming
  signal: AbortSignal
  start: number
  emitPrimaryEvents?: boolean
}): AsyncGenerator<StreamEvent, CodexStreamResult> {
  let primaryError: unknown
  let primaryResult: CodexStreamResult | undefined

  try {
    const stream = params.client.responses.stream(
      params.requestBody as unknown as Parameters<
        typeof params.client.responses.stream
      >[0],
      { signal: params.signal },
    )

    const state: CodexStreamState = {
      contentBlocks: {},
      completedBlocks: [],
    }

    for await (const event of stream) {
      const handled = handleCodexStreamEvent({
        event,
        partialMessage: state.partialMessage,
        contentBlocks: state.contentBlocks,
        completedBlocks: state.completedBlocks,
        start: params.start,
      })

      // Terminal responses are sticky: a later event never clears an earlier one.
      state.partialMessage = handled.partialMessage
      state.finalResponse = handled.finalResponse ?? state.finalResponse
      state.incompleteResponse =
        handled.incompleteResponse ?? state.incompleteResponse
      state.failedResponse = handled.failedResponse ?? state.failedResponse

      if (params.emitPrimaryEvents !== false) {
        yield* handled.output
      }
    }

    // Best effort: a missing final response is tolerated because the
    // event-derived state can still supply one.
    let streamedResponse: Response | undefined
    try {
      streamedResponse = await stream.finalResponse()
    } catch {
      streamedResponse = undefined
    }

    primaryResult = selectResponse(state, streamedResponse)
    if (primaryResult.assistantBlocks.length > 0 || primaryResult.response) {
      return primaryResult
    }
  } catch (error) {
    primaryError = error
  }

  // The caller already aborted: surface the primary failure without issuing a
  // second request on the aborted signal.
  if (primaryError && params.signal.aborted) {
    throw primaryError
  }

  try {
    const fallbackStream = await params.client.responses.create(
      toStreamingCodexRequest(params.requestBody),
      { signal: params.signal },
    )

    const fallbackState = await consumeCodexStream(
      fallbackStream as AsyncIterable<ResponseStreamEvent>,
      params.start,
    )
    const fallbackResult = selectResponse(fallbackState)

    if (fallbackResult.assistantBlocks.length > 0 || fallbackResult.response) {
      return fallbackResult
    }
  } catch (fallbackError) {
    // Prefer reporting the original failure over the fallback's.
    if (primaryError) {
      throw primaryError
    }
    throw fallbackError
  }

  if (primaryError) {
    throw primaryError
  }

  return primaryResult ?? {
    assistantBlocks: [],
  }
}