feat: 对其他 provider 提供 langfuse 监控

This commit is contained in:
claude-code-best
2026-04-19 09:09:27 +08:00
parent a0dc4540ca
commit 6536757428
3 changed files with 66 additions and 0 deletions

View File

@@ -18,6 +18,8 @@ import type { SDKAssistantMessageError } from '../../../entrypoints/agentSdkType
import type { SystemPrompt } from '../../../utils/systemPromptType.js'
import type { ThinkingConfig } from '../../../utils/thinking.js'
import type { Options } from '../claude.js'
import { recordLLMObservation } from '../../../services/langfuse/tracing.js'
import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js'
import { streamGeminiGenerateContent } from './client.js'
import { anthropicMessagesToGemini, resolveGeminiModel, adaptGeminiStreamToAnthropic, anthropicToolsToGemini, anthropicToolChoiceToGemini, GEMINI_THOUGHT_SIGNATURE_FIELD } from '@ant/model-provider'
@@ -100,6 +102,7 @@ export async function* queryModelGemini(
const adaptedStream = adaptGeminiStreamToAnthropic(stream, geminiModel)
const contentBlocks: Record<number, any> = {}
const collectedMessages: AssistantMessage[] = []
let partialMessage: any = undefined
let ttftMs = 0
const start = Date.now()
@@ -160,6 +163,7 @@ export async function* queryModelGemini(
uuid: randomUUID(),
timestamp: new Date().toISOString(),
}
collectedMessages.push(message)
yield message
break
}
@@ -174,6 +178,22 @@ export async function* queryModelGemini(
...(event.type === 'message_start' ? { ttftMs } : undefined),
} as StreamEvent
}
// Record LLM observation in Langfuse (no-op if not configured)
recordLLMObservation(options.langfuseTrace ?? null, {
model: geminiModel,
provider: 'gemini',
input: convertMessagesToLangfuse(messagesForAPI, systemPrompt),
output: convertOutputToLangfuse(collectedMessages),
usage: {
input_tokens: 0,
output_tokens: 0,
},
startTime: new Date(start),
endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]),
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logForDebugging(`[Gemini] Error: ${errorMessage}`, { level: 'error' })

View File

@@ -14,6 +14,8 @@ import { toolToAPISchema } from '../../../utils/api.js'
import { logForDebugging } from '../../../utils/debug.js'
import { addToTotalSessionCost } from '../../../cost-tracker.js'
import { calculateUSDCost } from '../../../utils/modelCost.js'
import { recordLLMObservation } from '../../../services/langfuse/tracing.js'
import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js'
import type { Options } from '../claude.js'
import { randomUUID } from 'crypto'
import {
@@ -92,6 +94,7 @@ export async function* queryModelGrok(
const adaptedStream = adaptOpenAIStreamToAnthropic(stream as AsyncIterable<ChatCompletionChunk>, grokModel)
const contentBlocks: Record<number, any> = {}
const collectedMessages: AssistantMessage[] = []
let partialMessage: any = undefined
let usage = {
input_tokens: 0,
@@ -157,6 +160,7 @@ export async function* queryModelGrok(
uuid: randomUUID(),
timestamp: new Date().toISOString(),
}
collectedMessages.push(m)
yield m
break
}
@@ -182,6 +186,24 @@ export async function* queryModelGrok(
...(event.type === 'message_start' ? { ttftMs } : undefined),
} as StreamEvent
}
// Record LLM observation in Langfuse (no-op if not configured)
recordLLMObservation(options.langfuseTrace ?? null, {
model: grokModel,
provider: 'grok',
input: convertMessagesToLangfuse(messagesForAPI, systemPrompt),
output: convertOutputToLangfuse(collectedMessages),
usage: {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cache_creation_input_tokens: usage.cache_creation_input_tokens,
cache_read_input_tokens: usage.cache_read_input_tokens,
},
startTime: new Date(start),
endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]),
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logForDebugging(`[Grok] Error: ${errorMessage}`, { level: 'error' })

View File

@@ -24,6 +24,8 @@ import { logForDebugging } from '../../../utils/debug.js'
import { addToTotalSessionCost } from '../../../cost-tracker.js'
import { calculateUSDCost } from '../../../utils/modelCost.js'
import { isOpenAIThinkingEnabled, resolveOpenAIMaxTokens, buildOpenAIRequestBody } from './requestBody.js'
import { recordLLMObservation } from '../../../services/langfuse/tracing.js'
import { convertMessagesToLangfuse, convertOutputToLangfuse, convertToolsToLangfuse } from '../../../services/langfuse/convert.js'
export { isOpenAIThinkingEnabled, resolveOpenAIMaxTokens, buildOpenAIRequestBody }
import { getModelMaxOutputTokens } from '../../../utils/context.js'
import type { Options } from '../claude.js'
@@ -246,6 +248,7 @@ export async function* queryModelOpenAI(
// Accumulate content blocks and usage, same as the Anthropic path in claude.ts
const contentBlocks: Record<number, any> = {}
const collectedMessages: AssistantMessage[] = []
let partialMessage: any
let stopReason: string | null = null
let usage = {
@@ -323,6 +326,9 @@ export async function* queryModelOpenAI(
partialMessage, contentBlocks, tools, agentId: options.agentId,
usage, stopReason, maxTokens,
})) {
if (output.type === 'assistant') {
collectedMessages.push(output)
}
yield output
}
// Reset partialMessage so the post-loop safety fallback does not
@@ -346,6 +352,24 @@ export async function* queryModelOpenAI(
} as StreamEvent
}
// Record LLM observation in Langfuse (no-op if not configured)
recordLLMObservation(options.langfuseTrace ?? null, {
model: openaiModel,
provider: 'openai',
input: convertMessagesToLangfuse(messagesForAPI, systemPrompt),
output: convertOutputToLangfuse(collectedMessages),
usage: {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cache_creation_input_tokens: usage.cache_creation_input_tokens,
cache_read_input_tokens: usage.cache_read_input_tokens,
},
startTime: new Date(start),
endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]),
})
// Safety: if stream ended without message_stop, assemble and yield whatever we have
if (partialMessage) {
for (const output of assembleFinalAssistantOutputs({