From 6536757428aba0d919475560b40adf7770bdba58 Mon Sep 17 00:00:00 2001 From: claude-code-best Date: Sun, 19 Apr 2026 09:09:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AF=B9=E5=85=B6=E4=BB=96=20provider?= =?UTF-8?q?=20=E6=8F=90=E4=BE=9B=20langfuse=20=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/api/gemini/index.ts | 20 ++++++++++++++++++++ src/services/api/grok/index.ts | 22 ++++++++++++++++++++++ src/services/api/openai/index.ts | 24 ++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/src/services/api/gemini/index.ts b/src/services/api/gemini/index.ts index 647eb6493..a9ba6c6db 100644 --- a/src/services/api/gemini/index.ts +++ b/src/services/api/gemini/index.ts @@ -18,6 +18,8 @@ import type { SDKAssistantMessageError } from '../../../entrypoints/agentSdkType import type { SystemPrompt } from '../../../utils/systemPromptType.js' import type { ThinkingConfig } from '../../../utils/thinking.js' import type { Options } from '../claude.js' +import { recordLLMObservation } from '../../../services/langfuse/tracing.js' +import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js' import { streamGeminiGenerateContent } from './client.js' import { anthropicMessagesToGemini, resolveGeminiModel, adaptGeminiStreamToAnthropic, anthropicToolsToGemini, anthropicToolChoiceToGemini, GEMINI_THOUGHT_SIGNATURE_FIELD } from '@ant/model-provider' @@ -100,6 +102,7 @@ export async function* queryModelGemini( const adaptedStream = adaptGeminiStreamToAnthropic(stream, geminiModel) const contentBlocks: Record = {} + const collectedMessages: AssistantMessage[] = [] let partialMessage: any = undefined let ttftMs = 0 const start = Date.now() @@ -160,6 +163,7 @@ export async function* queryModelGemini( uuid: randomUUID(), timestamp: new Date().toISOString(), } + collectedMessages.push(message) yield message break } @@ -174,6 +178,22 @@ export async function* queryModelGemini( ...(event.type === 'message_start' ? { ttftMs } : undefined), } as StreamEvent } + + // Record LLM observation in Langfuse (no-op if not configured) + recordLLMObservation(options.langfuseTrace ?? null, { + model: geminiModel, + provider: 'gemini', + input: convertMessagesToLangfuse(messagesForAPI, systemPrompt), + output: convertOutputToLangfuse(collectedMessages), + usage: { + input_tokens: 0, + output_tokens: 0, + }, + startTime: new Date(start), + endTime: new Date(), + completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, + tools: convertToolsToLangfuse(toolSchemas as unknown[]), + }) } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error) logForDebugging(`[Gemini] Error: ${errorMessage}`, { level: 'error' }) diff --git a/src/services/api/grok/index.ts b/src/services/api/grok/index.ts index ceb87d77c..93b40d751 100644 --- a/src/services/api/grok/index.ts +++ b/src/services/api/grok/index.ts @@ -14,6 +14,8 @@ import { toolToAPISchema } from '../../../utils/api.js' import { logForDebugging } from '../../../utils/debug.js' import { addToTotalSessionCost } from '../../../cost-tracker.js' import { calculateUSDCost } from '../../../utils/modelCost.js' +import { recordLLMObservation } from '../../../services/langfuse/tracing.js' +import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js' import type { Options } from '../claude.js' import { randomUUID } from 'crypto' import { @@ -92,6 +94,7 @@ export async function* queryModelGrok( const adaptedStream = adaptOpenAIStreamToAnthropic(stream as AsyncIterable, grokModel) const contentBlocks: Record = {} + const collectedMessages: AssistantMessage[] = [] let partialMessage: any = undefined let usage = { input_tokens: 0, @@ -157,6 +160,7 @@ export async function* queryModelGrok( uuid: randomUUID(), timestamp: new Date().toISOString(), } + collectedMessages.push(m) yield m break } @@ -182,6 +186,24 @@ export async function* queryModelGrok( ...(event.type === 'message_start' ? { ttftMs } : undefined), } as StreamEvent } + + // Record LLM observation in Langfuse (no-op if not configured) + recordLLMObservation(options.langfuseTrace ?? null, { + model: grokModel, + provider: 'grok', + input: convertMessagesToLangfuse(messagesForAPI, systemPrompt), + output: convertOutputToLangfuse(collectedMessages), + usage: { + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_creation_input_tokens: usage.cache_creation_input_tokens, + cache_read_input_tokens: usage.cache_read_input_tokens, + }, + startTime: new Date(start), + endTime: new Date(), + completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, + tools: convertToolsToLangfuse(toolSchemas as unknown[]), + }) } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error) logForDebugging(`[Grok] Error: ${errorMessage}`, { level: 'error' }) diff --git a/src/services/api/openai/index.ts b/src/services/api/openai/index.ts index 00b9d9738..0db002225 100644 --- a/src/services/api/openai/index.ts +++ b/src/services/api/openai/index.ts @@ -24,6 +24,8 @@ import { logForDebugging } from '../../../utils/debug.js' import { addToTotalSessionCost } from '../../../cost-tracker.js' import { calculateUSDCost } from '../../../utils/modelCost.js' import { isOpenAIThinkingEnabled, resolveOpenAIMaxTokens, buildOpenAIRequestBody } from './requestBody.js' +import { recordLLMObservation } from '../../../services/langfuse/tracing.js' +import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js' export { isOpenAIThinkingEnabled, resolveOpenAIMaxTokens, buildOpenAIRequestBody } import { getModelMaxOutputTokens } from '../../../utils/context.js' import type { Options } from '../claude.js' @@ -246,6 +248,7 @@ export async function* queryModelOpenAI( // Accumulate content blocks and usage, same as the Anthropic path in claude.ts const contentBlocks: Record = {} + const collectedMessages: AssistantMessage[] = [] let partialMessage: any let stopReason: string | null = null let usage = { @@ -323,6 +326,9 @@ export async function* queryModelOpenAI( partialMessage, contentBlocks, tools, agentId: options.agentId, usage, stopReason, maxTokens, })) { + if (output.type === 'assistant') { + collectedMessages.push(output) + } yield output } // Reset partialMessage so the post-loop safety fallback does not @@ -346,6 +352,24 @@ export async function* queryModelOpenAI( } as StreamEvent } + // Record LLM observation in Langfuse (no-op if not configured) + recordLLMObservation(options.langfuseTrace ?? null, { + model: openaiModel, + provider: 'openai', + input: convertMessagesToLangfuse(messagesForAPI, systemPrompt), + output: convertOutputToLangfuse(collectedMessages), + usage: { + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_creation_input_tokens: usage.cache_creation_input_tokens, + cache_read_input_tokens: usage.cache_read_input_tokens, + }, + startTime: new Date(start), + endTime: new Date(), + completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, + tools: convertToolsToLangfuse(toolSchemas as unknown[]), + }) + // Safety: if stream ended without message_stop, assemble and yield whatever we have if (partialMessage) { for (const output of assembleFinalAssistantOutputs({