feat: langfuse 工具调用显示为嵌套结构

This commit is contained in:
claude-code-best
2026-04-13 18:05:13 +08:00
parent d4b30d32c3
commit 05cabbbd73
6 changed files with 99 additions and 12 deletions

View File

@@ -277,6 +277,8 @@ export type ToolUseContext = {
criticalSystemReminder_EXPERIMENTAL?: string
/** Langfuse root trace span for this query turn. Passed down to tool execution for observability. */
langfuseTrace?: LangfuseSpan | null
/** Langfuse batch span wrapping a concurrent tool group. When set, tool observations are nested under it. */
langfuseBatchSpan?: LangfuseSpan | null
/** When true, preserve toolUseResult on messages even for subagents.
* Used by in-process teammates whose transcripts are viewable by the user. */
preserveToolUseResults?: boolean

View File

@@ -1,4 +1,4 @@
export { initLangfuse, shutdownLangfuse, isLangfuseEnabled, getLangfuseProcessor } from './client.js'
export { createTrace, createSubagentTrace, recordLLMObservation, recordToolObservation, endTrace } from './tracing.js'
export { createTrace, createSubagentTrace, recordLLMObservation, recordToolObservation, endTrace, createToolBatchSpan, endToolBatchSpan } from './tracing.js'
export type { LangfuseSpan } from './tracing.js'
export { sanitizeToolInput, sanitizeToolOutput, sanitizeGlobal } from './sanitize.js'

View File

@@ -117,6 +117,7 @@ export function recordToolObservation(
output: string
startTime?: Date
isError?: boolean
parentBatchSpan?: LangfuseSpan | null
},
): void {
if (!rootSpan || !isLangfuseEnabled()) return
@@ -124,6 +125,7 @@ export function recordToolObservation(
// Use the global startObservation directly instead of rootSpan.startObservation().
// The instance method only forwards asType and drops startTime,
// causing tool execution duration to be 0.
const parentSpan = params.parentBatchSpan ?? rootSpan
const toolObs = startObservation(
params.toolName,
{
@@ -136,7 +138,7 @@ export function recordToolObservation(
{
asType: 'tool',
...(params.startTime && { startTime: params.startTime }),
parentSpanContext: rootSpan.otelSpan.spanContext(),
parentSpanContext: parentSpan.otelSpan.spanContext(),
},
)
@@ -158,6 +160,55 @@ export function recordToolObservation(
}
}
/**
* Create a span that wraps a batch of concurrent tool calls.
* Returns the batch span (to be passed as parentBatchSpan to recordToolObservation)
* and must be ended with endToolBatchSpan() after all tools complete.
*/
export function createToolBatchSpan(
rootSpan: LangfuseSpan | null,
params: { toolNames: string[]; batchIndex: number },
): LangfuseSpan | null {
if (!rootSpan || !isLangfuseEnabled()) return null
try {
const batchSpan = startObservation(
`tools`,
{
metadata: {
toolNames: params.toolNames.join(', '),
toolCount: String(params.toolNames.length),
batchIndex: String(params.batchIndex),
},
},
{
asType: 'span',
parentSpanContext: rootSpan.otelSpan.spanContext(),
},
) as LangfuseSpan
const sessionId = (rootSpan as unknown as RootTrace)._sessionId
if (sessionId) {
batchSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
}
logForDebugging(`[langfuse] Tool batch span created: ${batchSpan.id} (tools=${params.toolNames.join(',')})`)
return batchSpan
} catch (e) {
logForDebugging(`[langfuse] createToolBatchSpan failed: ${e}`, { level: 'error' })
return null
}
}
export function endToolBatchSpan(batchSpan: LangfuseSpan | null): void {
if (!batchSpan) return
try {
batchSpan.end()
logForDebugging(`[langfuse] Tool batch span ended: ${batchSpan.id}`)
} catch (e) {
logForDebugging(`[langfuse] endToolBatchSpan failed: ${e}`, { level: 'error' })
}
}
export function createSubagentTrace(params: {
sessionId: string
agentType: string
@@ -187,14 +238,20 @@ export function createSubagentTrace(params: {
}
}
export function endTrace(rootSpan: LangfuseSpan | null, output?: unknown): void {
export function endTrace(
rootSpan: LangfuseSpan | null,
output?: unknown,
status?: 'interrupted' | 'error',
): void {
if (!rootSpan) return
try {
if (output !== undefined) {
rootSpan.update({ output })
}
const updatePayload: Record<string, unknown> = {}
if (output !== undefined) updatePayload.output = output
if (status === 'interrupted') updatePayload.level = 'WARNING'
else if (status === 'error') updatePayload.level = 'ERROR'
if (Object.keys(updatePayload).length > 0) rootSpan.update(updatePayload)
rootSpan.end()
logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}`)
logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}${status ? ` (${status})` : ''}`)
} catch (e) {
logForDebugging(`[langfuse] endTrace failed: ${e}`, { level: 'error' })
}

View File

@@ -10,6 +10,8 @@ import { BASH_TOOL_NAME } from '@claude-code-best/builtin-tools/tools/BashTool/t
import type { AssistantMessage, Message } from '../../types/message.js'
import { createChildAbortController } from '../../utils/abortController.js'
import { runToolUse } from './toolExecution.js'
import { createToolBatchSpan, endToolBatchSpan } from '../langfuse/index.js'
import type { LangfuseSpan } from '../langfuse/index.js'
type MessageUpdate = {
message?: Message
@@ -42,13 +44,10 @@ export class StreamingToolExecutor {
private toolUseContext: ToolUseContext
private hasErrored = false
private erroredToolDescription = ''
// Child of toolUseContext.abortController. Fires when a Bash tool errors
// so sibling subprocesses die immediately instead of running to completion.
// Aborting this does NOT abort the parent — query.ts won't end the turn.
private siblingAbortController: AbortController
private discarded = false
// Signal to wake up getRemainingResults when progress is available
private progressAvailableResolve?: () => void
private turnSpan: LangfuseSpan | null = null
constructor(
private readonly toolDefinitions: Tools,
@@ -74,6 +73,16 @@ export class StreamingToolExecutor {
* Add a tool to the execution queue. Will start executing immediately if conditions allow.
*/
addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
// Create turn span on first tool — will be ended in getRemainingResults
if (this.tools.length === 0 && this.turnSpan === null) {
this.turnSpan = createToolBatchSpan(
this.toolUseContext.langfuseTrace ?? null,
{ toolNames: [block.name], batchIndex: 0 },
)
if (this.turnSpan) {
this.toolUseContext = { ...this.toolUseContext, langfuseBatchSpan: this.turnSpan }
}
}
const toolDefinition = findToolByName(this.toolDefinitions, block.name)
if (!toolDefinition) {
this.tools.push({
@@ -487,6 +496,9 @@ export class StreamingToolExecutor {
for (const result of this.getCompletedResults()) {
yield result
}
endToolBatchSpan(this.turnSpan)
this.turnSpan = null
}
/**

View File

@@ -1309,6 +1309,7 @@ async function checkPermissionsAndCallTool(
output: toolResultStr,
startTime: new Date(startTime),
isError: false,
parentBatchSpan: toolUseContext.langfuseBatchSpan,
})
// Map the tool result to API format once and cache it. This block is reused
@@ -1628,6 +1629,7 @@ async function checkPermissionsAndCallTool(
output: errorMessage(error),
startTime: new Date(startTime),
isError: true,
parentBatchSpan: toolUseContext.langfuseBatchSpan,
})
// Handle MCP auth errors by updating the client status to 'needs-auth'

View File

@@ -4,6 +4,7 @@ import { findToolByName, type ToolUseContext } from '../../Tool.js'
import type { AssistantMessage, Message } from '../../types/message.js'
import { all } from '../../utils/generators.js'
import { type MessageUpdateLazy, runToolUse } from './toolExecution.js'
import { createToolBatchSpan, endToolBatchSpan } from '../langfuse/index.js'
function getMaxToolUseConcurrency(): number {
return (
@@ -22,7 +23,18 @@ export async function* runTools(
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
let currentContext = toolUseContext
// Wrap all tool calls in this turn under a single Langfuse turn span
const turnSpan = toolUseMessages.length > 0
? createToolBatchSpan(toolUseContext.langfuseTrace ?? null, {
toolNames: toolUseMessages.map(b => b.name),
batchIndex: 0,
})
: null
const contextWithTurn = turnSpan
? { ...toolUseContext, langfuseBatchSpan: turnSpan }
: toolUseContext
let currentContext = contextWithTurn
for (const { isConcurrencySafe, blocks } of partitionToolCalls(
toolUseMessages,
currentContext,
@@ -79,6 +91,8 @@ export async function* runTools(
}
}
}
endToolBatchSpan(turnSpan)
}
type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }