diff --git a/src/Tool.ts b/src/Tool.ts
index 8c27885d9..dd9966983 100644
--- a/src/Tool.ts
+++ b/src/Tool.ts
@@ -277,6 +277,8 @@ export type ToolUseContext = {
   criticalSystemReminder_EXPERIMENTAL?: string
   /** Langfuse root trace span for this query turn. Passed down to tool execution for observability. */
   langfuseTrace?: LangfuseSpan | null
+  /** Langfuse span wrapping this turn's tool calls. When set, tool observations are nested under it instead of the root trace. */
+  langfuseBatchSpan?: LangfuseSpan | null
   /** When true, preserve toolUseResult on messages even for subagents.
    * Used by in-process teammates whose transcripts are viewable by the user. */
   preserveToolUseResults?: boolean
diff --git a/src/services/langfuse/index.ts b/src/services/langfuse/index.ts
index 7cd968643..6d044fa5a 100644
--- a/src/services/langfuse/index.ts
+++ b/src/services/langfuse/index.ts
@@ -1,4 +1,4 @@
 export { initLangfuse, shutdownLangfuse, isLangfuseEnabled, getLangfuseProcessor } from './client.js'
-export { createTrace, createSubagentTrace, recordLLMObservation, recordToolObservation, endTrace } from './tracing.js'
+export { createTrace, createSubagentTrace, recordLLMObservation, recordToolObservation, endTrace, createToolBatchSpan, endToolBatchSpan } from './tracing.js'
 export type { LangfuseSpan } from './tracing.js'
 export { sanitizeToolInput, sanitizeToolOutput, sanitizeGlobal } from './sanitize.js'
diff --git a/src/services/langfuse/tracing.ts b/src/services/langfuse/tracing.ts
index 02a23c68e..fc37faab0 100644
--- a/src/services/langfuse/tracing.ts
+++ b/src/services/langfuse/tracing.ts
@@ -117,6 +117,7 @@ export function recordToolObservation(
     output: string
     startTime?: Date
     isError?: boolean
+    parentBatchSpan?: LangfuseSpan | null
   },
 ): void {
   if (!rootSpan || !isLangfuseEnabled()) return
@@ -124,6 +125,7 @@ export function recordToolObservation(
     // Use the global startObservation directly instead of rootSpan.startObservation().
     // The instance method only forwards asType and drops startTime,
     // causing tool execution duration to be 0.
+    const parentSpan = params.parentBatchSpan ?? rootSpan
     const toolObs = startObservation(
       params.toolName,
       {
@@ -136,7 +138,7 @@ export function recordToolObservation(
       {
         asType: 'tool',
         ...(params.startTime && { startTime: params.startTime }),
-        parentSpanContext: rootSpan.otelSpan.spanContext(),
+        parentSpanContext: parentSpan.otelSpan.spanContext(),
       },
     )
 
@@ -158,6 +160,58 @@ export function recordToolObservation(
   }
 }
 
+/**
+ * Create a span that wraps a batch of concurrent tool calls.
+ * Returns the batch span (to be passed as parentBatchSpan to recordToolObservation)
+ * and must be ended with endToolBatchSpan() after all tools complete.
+ */
+export function createToolBatchSpan(
+  rootSpan: LangfuseSpan | null,
+  params: { toolNames: string[]; batchIndex: number },
+): LangfuseSpan | null {
+  if (!rootSpan || !isLangfuseEnabled()) return null
+  try {
+    const batchSpan = startObservation(
+      'tools',
+      {
+        metadata: {
+          toolNames: params.toolNames.join(', '),
+          toolCount: String(params.toolNames.length),
+          batchIndex: String(params.batchIndex),
+        },
+      },
+      {
+        asType: 'span',
+        parentSpanContext: rootSpan.otelSpan.spanContext(),
+      },
+    ) as LangfuseSpan
+
+    const sessionId = (rootSpan as unknown as RootTrace)._sessionId
+    if (sessionId) {
+      batchSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
+    }
+
+    logForDebugging(`[langfuse] Tool batch span created: ${batchSpan.id} (tools=${params.toolNames.join(',')})`)
+    return batchSpan
+  } catch (e) {
+    logForDebugging(`[langfuse] createToolBatchSpan failed: ${e}`, { level: 'error' })
+    return null
+  }
+}
+
+/**
+ * End a span returned by createToolBatchSpan; null is ignored and failures are logged, not thrown.
+ */
+export function endToolBatchSpan(batchSpan: LangfuseSpan | null): void {
+  if (!batchSpan) return
+  try {
+    batchSpan.end()
+    logForDebugging(`[langfuse] Tool batch span ended: ${batchSpan.id}`)
+  } catch (e) {
+    logForDebugging(`[langfuse] endToolBatchSpan failed: ${e}`, { level: 'error' })
+  }
+}
+
 export function createSubagentTrace(params: {
   sessionId: string
   agentType: string
@@ -187,14 +241,24 @@ export function createSubagentTrace(params: {
   }
 }
 
-export function endTrace(rootSpan: LangfuseSpan | null, output?: unknown): void {
+/**
+ * End the root trace span, first applying `output` (when defined) and a level derived from
+ * `status`: 'interrupted' maps to WARNING and 'error' to ERROR. Failures are logged, not thrown.
+ */
+export function endTrace(
+  rootSpan: LangfuseSpan | null,
+  output?: unknown,
+  status?: 'interrupted' | 'error',
+): void {
   if (!rootSpan) return
   try {
-    if (output !== undefined) {
-      rootSpan.update({ output })
-    }
+    const updatePayload: Record<string, unknown> = {}
+    if (output !== undefined) updatePayload.output = output
+    if (status === 'interrupted') updatePayload.level = 'WARNING'
+    else if (status === 'error') updatePayload.level = 'ERROR'
+    if (Object.keys(updatePayload).length > 0) rootSpan.update(updatePayload)
     rootSpan.end()
-    logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}`)
+    logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}${status ? ` (${status})` : ''}`)
   } catch (e) {
     logForDebugging(`[langfuse] endTrace failed: ${e}`, { level: 'error' })
   }
diff --git a/src/services/tools/StreamingToolExecutor.ts b/src/services/tools/StreamingToolExecutor.ts
index ce7911c4b..b924fdd91 100644
--- a/src/services/tools/StreamingToolExecutor.ts
+++ b/src/services/tools/StreamingToolExecutor.ts
@@ -10,6 +10,8 @@ import { BASH_TOOL_NAME } from '@claude-code-best/builtin-tools/tools/BashTool/t
 import type { AssistantMessage, Message } from '../../types/message.js'
 import { createChildAbortController } from '../../utils/abortController.js'
 import { runToolUse } from './toolExecution.js'
+import { createToolBatchSpan, endToolBatchSpan } from '../langfuse/index.js'
+import type { LangfuseSpan } from '../langfuse/index.js'
 
 type MessageUpdate = {
   message?: Message
@@ -42,13 +44,15 @@ export class StreamingToolExecutor {
   private toolUseContext: ToolUseContext
   private hasErrored = false
   private erroredToolDescription = ''
   // Child of toolUseContext.abortController. Fires when a Bash tool errors
   // so sibling subprocesses die immediately instead of running to completion.
   // Aborting this does NOT abort the parent — query.ts won't end the turn.
   private siblingAbortController: AbortController
   private discarded = false
   // Signal to wake up getRemainingResults when progress is available
   private progressAvailableResolve?: () => void
+  // Langfuse span parenting this executor's tool observations; created in addTool, ended in getRemainingResults
+  private turnSpan: LangfuseSpan | null = null
 
   constructor(
     private readonly toolDefinitions: Tools,
@@ -74,6 +78,16 @@ export class StreamingToolExecutor {
    * Add a tool to the execution queue. Will start executing immediately if conditions allow.
    */
   addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
+    // Create the turn span when the first tool is queued (its metadata names only that tool); ended in getRemainingResults
+    if (this.tools.length === 0 && this.turnSpan === null) {
+      this.turnSpan = createToolBatchSpan(
+        this.toolUseContext.langfuseTrace ?? null,
+        { toolNames: [block.name], batchIndex: 0 },
+      )
+      if (this.turnSpan) {
+        this.toolUseContext = { ...this.toolUseContext, langfuseBatchSpan: this.turnSpan }
+      }
+    }
     const toolDefinition = findToolByName(this.toolDefinitions, block.name)
     if (!toolDefinition) {
       this.tools.push({
@@ -487,6 +501,9 @@ export class StreamingToolExecutor {
     for (const result of this.getCompletedResults()) {
       yield result
     }
+
+    endToolBatchSpan(this.turnSpan)
+    this.turnSpan = null
   }
 
   /**
diff --git a/src/services/tools/toolExecution.ts b/src/services/tools/toolExecution.ts
index 89a4180cb..97852b2ad 100644
--- a/src/services/tools/toolExecution.ts
+++ b/src/services/tools/toolExecution.ts
@@ -1309,6 +1309,7 @@ async function checkPermissionsAndCallTool(
       output: toolResultStr,
       startTime: new Date(startTime),
       isError: false,
+      parentBatchSpan: toolUseContext.langfuseBatchSpan,
     })
 
     // Map the tool result to API format once and cache it. This block is reused
@@ -1628,6 +1629,7 @@ async function checkPermissionsAndCallTool(
       output: errorMessage(error),
       startTime: new Date(startTime),
       isError: true,
+      parentBatchSpan: toolUseContext.langfuseBatchSpan,
     })
 
     // Handle MCP auth errors by updating the client status to 'needs-auth'
diff --git a/src/services/tools/toolOrchestration.ts b/src/services/tools/toolOrchestration.ts
index 2ddc948b5..9e5d52449 100644
--- a/src/services/tools/toolOrchestration.ts
+++ b/src/services/tools/toolOrchestration.ts
@@ -4,6 +4,7 @@ import { findToolByName, type ToolUseContext } from '../../Tool.js'
 import type { AssistantMessage, Message } from '../../types/message.js'
 import { all } from '../../utils/generators.js'
 import { type MessageUpdateLazy, runToolUse } from './toolExecution.js'
+import { createToolBatchSpan, endToolBatchSpan } from '../langfuse/index.js'
 
 function getMaxToolUseConcurrency(): number {
   return (
@@ -22,7 +23,18 @@ export async function* runTools(
   canUseTool: CanUseToolFn,
   toolUseContext: ToolUseContext,
 ): AsyncGenerator {
-  let currentContext = toolUseContext
+  // Wrap all tool calls in this turn under a single Langfuse turn span
+  const turnSpan = toolUseMessages.length > 0
+    ? createToolBatchSpan(toolUseContext.langfuseTrace ?? null, {
+        toolNames: toolUseMessages.map(b => b.name),
+        batchIndex: 0,
+      })
+    : null
+  const contextWithTurn = turnSpan
+    ? { ...toolUseContext, langfuseBatchSpan: turnSpan }
+    : toolUseContext
+
+  let currentContext = contextWithTurn
   for (const { isConcurrencySafe, blocks } of partitionToolCalls(
     toolUseMessages,
     currentContext,
@@ -79,6 +91,9 @@ export async function* runTools(
       }
     }
   }
+
+  // NOTE(review): reached only on normal completion; an early generator return or a throw leaves turnSpan un-ended.
+  endToolBatchSpan(turnSpan)
 }
 
 type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }