mirror of
https://github.com/claude-code-best/claude-code.git
synced 2026-06-17 05:45:51 +00:00
feat: 添加对 langfuse 监控的支持 (#242)
* docs: 更新类型检查的 CLAUDE.md * feat: 添加模型 1M 上下文切换 * chore: remove prefetchOfficialMcpUrls call on startup * docs: 添加 git commit 规范 * feat: 第一次接入 langfuse * fix: 修复 generation 的计时的错误 * feat: 添加多 agent 的监控 * feat: 添加 /poor 省流模式,toggle 关闭 extract_memories 和 prompt_suggestion Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: 修复 lock 文件 * chore: 更新类型依赖 --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -228,6 +228,9 @@ import {
|
||||
} from '../compact/microCompact.js'
|
||||
import { getInitializationStatus } from '../lsp/manager.js'
|
||||
import { isToolFromMcpServer } from '../mcp/utils.js'
|
||||
import { recordLLMObservation } from '../langfuse/index.js'
|
||||
import type { LangfuseSpan } from '../langfuse/index.js'
|
||||
import { convertMessagesToLangfuse, convertOutputToLangfuse } from '../langfuse/convert.js'
|
||||
import { withStreamingVCR, withVCR } from '../vcr.js'
|
||||
import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js'
|
||||
import {
|
||||
@@ -717,6 +720,8 @@ export type Options = {
|
||||
// so the model can pace itself. `remaining` is computed by the caller
|
||||
// (query.ts decrements across the agentic loop).
|
||||
taskBudget?: { total: number; remaining?: number }
|
||||
/** Langfuse root trace span for observability. No-op if null/undefined. */
|
||||
langfuseTrace?: LangfuseSpan | null
|
||||
}
|
||||
|
||||
export async function queryModelWithoutStreaming({
|
||||
@@ -2895,6 +2900,19 @@ async function* queryModel(
|
||||
// limit) until getToolPermissionContext() resolves.
|
||||
const logMessageCount = messagesForAPI.length
|
||||
const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI)
|
||||
|
||||
// Record LLM observation in Langfuse (no-op if not configured)
|
||||
recordLLMObservation(options.langfuseTrace ?? null, {
|
||||
model: resolvedModel,
|
||||
provider: getAPIProvider(),
|
||||
input: convertMessagesToLangfuse(messagesForAPI, systemPrompt),
|
||||
output: convertOutputToLangfuse(newMessages),
|
||||
usage: { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens },
|
||||
startTime: new Date(startIncludingRetries),
|
||||
endTime: new Date(),
|
||||
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
|
||||
})
|
||||
|
||||
void options.getToolPermissionContext().then(permissionContext => {
|
||||
logAPISuccessAndDuration({
|
||||
model:
|
||||
|
||||
568
src/services/langfuse/__tests__/langfuse.test.ts
Normal file
568
src/services/langfuse/__tests__/langfuse.test.ts
Normal file
@@ -0,0 +1,568 @@
|
||||
import { mock, describe, test, expect, beforeEach } from 'bun:test'
|
||||
|
||||
// Mock @langfuse/otel before any imports
|
||||
const mockForceFlush = mock(() => Promise.resolve())
|
||||
const mockShutdown = mock(() => Promise.resolve())
|
||||
|
||||
mock.module('@langfuse/otel', () => ({
|
||||
LangfuseSpanProcessor: class MockLangfuseSpanProcessor {
|
||||
forceFlush = mockForceFlush
|
||||
shutdown = mockShutdown
|
||||
onStart = mock(() => {})
|
||||
onEnd = mock(() => {})
|
||||
},
|
||||
}))
|
||||
|
||||
// Mock @opentelemetry/sdk-trace-base
|
||||
mock.module('@opentelemetry/sdk-trace-base', () => ({
|
||||
BasicTracerProvider: class MockBasicTracerProvider {
|
||||
constructor(_opts?: unknown) {}
|
||||
},
|
||||
}))
|
||||
|
||||
// Mock @langfuse/tracing
|
||||
const mockChildUpdate = mock(() => {})
|
||||
const mockChildEnd = mock(() => {})
|
||||
const mockRootUpdate = mock(() => {})
|
||||
const mockRootEnd = mock(() => {})
|
||||
|
||||
// Mock LangfuseOtelSpanAttributes (re-exported from @langfuse/core)
|
||||
const mockLangfuseOtelSpanAttributes: Record<string, string> = {
|
||||
TRACE_SESSION_ID: 'session.id',
|
||||
OBSERVATION_TYPE: 'observation.type',
|
||||
OBSERVATION_INPUT: 'observation.input',
|
||||
OBSERVATION_OUTPUT: 'observation.output',
|
||||
OBSERVATION_MODEL: 'observation.model',
|
||||
OBSERVATION_COMPLETION_START_TIME: 'observation.completionStartTime',
|
||||
OBSERVATION_USAGE_DETAILS: 'observation.usageDetails',
|
||||
}
|
||||
|
||||
const mockSpanContext = { traceId: 'test-trace-id', spanId: 'test-span-id', traceFlags: 1 }
|
||||
const mockSetAttribute = mock(() => {})
|
||||
|
||||
// Child observation mock (returned by rootSpan.startObservation for tools)
|
||||
const mockChildStartObservation = mock(() => ({
|
||||
id: 'child-id',
|
||||
update: mockChildUpdate,
|
||||
end: mockChildEnd,
|
||||
}))
|
||||
|
||||
const mockStartObservation = mock(() => ({
|
||||
id: 'test-span-id',
|
||||
traceId: 'test-trace-id',
|
||||
type: 'span',
|
||||
otelSpan: {
|
||||
spanContext: () => mockSpanContext,
|
||||
setAttribute: mockSetAttribute,
|
||||
},
|
||||
update: mockRootUpdate,
|
||||
end: mockRootEnd,
|
||||
// Instance method — used by recordToolObservation
|
||||
startObservation: mockChildStartObservation,
|
||||
}))
|
||||
const mockSetLangfuseTracerProvider = mock(() => {})
|
||||
|
||||
mock.module('@langfuse/tracing', () => ({
|
||||
startObservation: mockStartObservation,
|
||||
LangfuseOtelSpanAttributes: mockLangfuseOtelSpanAttributes,
|
||||
propagateAttributes: mock((_params: unknown, fn?: () => void) => fn?.()),
|
||||
setLangfuseTracerProvider: mockSetLangfuseTracerProvider,
|
||||
}))
|
||||
|
||||
// Mock debug logger
|
||||
mock.module('src/utils/debug.js', () => ({
|
||||
logForDebugging: mock(() => {}),
|
||||
}))
|
||||
|
||||
describe('Langfuse integration', () => {
|
||||
beforeEach(() => {
|
||||
// Reset env
|
||||
delete process.env.LANGFUSE_PUBLIC_KEY
|
||||
delete process.env.LANGFUSE_SECRET_KEY
|
||||
delete process.env.LANGFUSE_BASE_URL
|
||||
mockStartObservation.mockClear()
|
||||
mockChildStartObservation.mockClear()
|
||||
mockChildUpdate.mockClear()
|
||||
mockChildEnd.mockClear()
|
||||
mockRootUpdate.mockClear()
|
||||
mockRootEnd.mockClear()
|
||||
mockForceFlush.mockClear()
|
||||
mockShutdown.mockClear()
|
||||
mockSetAttribute.mockClear()
|
||||
})
|
||||
|
||||
// ── sanitize tests ──────────────────────────────────────────────────────────
|
||||
|
||||
describe('sanitizeToolInput', () => {
|
||||
test('replaces home dir in file_path', async () => {
|
||||
const { sanitizeToolInput } = await import('../sanitize.js')
|
||||
const home = process.env.HOME ?? '/Users/testuser'
|
||||
const result = sanitizeToolInput('FileReadTool', { file_path: `${home}/project/file.ts` }) as Record<string, string>
|
||||
expect(result.file_path).toBe('~/project/file.ts')
|
||||
})
|
||||
|
||||
test('redacts sensitive keys', async () => {
|
||||
const { sanitizeToolInput } = await import('../sanitize.js')
|
||||
const result = sanitizeToolInput('MCPTool', { api_key: 'secret123', token: 'abc' }) as Record<string, string>
|
||||
expect(result.api_key).toBe('[REDACTED]')
|
||||
expect(result.token).toBe('[REDACTED]')
|
||||
})
|
||||
|
||||
test('returns non-object input unchanged', async () => {
|
||||
const { sanitizeToolInput } = await import('../sanitize.js')
|
||||
expect(sanitizeToolInput('BashTool', 'raw string')).toBe('raw string')
|
||||
expect(sanitizeToolInput('BashTool', null)).toBe(null)
|
||||
})
|
||||
})
|
||||
|
||||
describe('sanitizeToolOutput', () => {
|
||||
test('redacts FileReadTool output', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const result = sanitizeToolOutput('FileReadTool', 'file content here')
|
||||
expect(result).toBe('[file content redacted, 17 chars]')
|
||||
})
|
||||
|
||||
test('redacts FileWriteTool output', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const result = sanitizeToolOutput('FileWriteTool', 'written content')
|
||||
expect(result).toBe('[file content redacted, 15 chars]')
|
||||
})
|
||||
|
||||
test('truncates BashTool output over 500 chars', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const longOutput = 'x'.repeat(600)
|
||||
const result = sanitizeToolOutput('BashTool', longOutput)
|
||||
expect(result).toContain('[truncated]')
|
||||
expect(result.length).toBeLessThan(600)
|
||||
})
|
||||
|
||||
test('does not truncate BashTool output under 500 chars', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const shortOutput = 'hello world'
|
||||
expect(sanitizeToolOutput('BashTool', shortOutput)).toBe('hello world')
|
||||
})
|
||||
|
||||
test('redacts ConfigTool output', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const result = sanitizeToolOutput('ConfigTool', 'config data')
|
||||
expect(result).toBe('[ConfigTool output redacted, 11 chars]')
|
||||
})
|
||||
|
||||
test('redacts MCPTool output', async () => {
|
||||
const { sanitizeToolOutput } = await import('../sanitize.js')
|
||||
const result = sanitizeToolOutput('MCPTool', 'mcp data')
|
||||
expect(result).toBe('[MCPTool output redacted, 8 chars]')
|
||||
})
|
||||
})
|
||||
|
||||
describe('sanitizeGlobal', () => {
|
||||
test('replaces home dir in strings', async () => {
|
||||
const { sanitizeGlobal } = await import('../sanitize.js')
|
||||
const home = process.env.HOME ?? '/Users/testuser'
|
||||
expect(sanitizeGlobal(`path: ${home}/file`)).toBe('path: ~/file')
|
||||
})
|
||||
|
||||
test('recursively sanitizes nested objects', async () => {
|
||||
const { sanitizeGlobal } = await import('../sanitize.js')
|
||||
const result = sanitizeGlobal({ nested: { api_key: 'secret', name: 'test' } }) as Record<string, Record<string, string>>
|
||||
expect(result.nested.api_key).toBe('[REDACTED]')
|
||||
expect(result.nested.name).toBe('test')
|
||||
})
|
||||
|
||||
test('returns non-string/object values unchanged', async () => {
|
||||
const { sanitizeGlobal } = await import('../sanitize.js')
|
||||
expect(sanitizeGlobal(42)).toBe(42)
|
||||
expect(sanitizeGlobal(true)).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
// ── client tests ────────────────────────────────────────────────────────────
|
||||
|
||||
describe('isLangfuseEnabled', () => {
|
||||
test('returns false when keys not configured', async () => {
|
||||
const { isLangfuseEnabled } = await import('../client.js')
|
||||
expect(isLangfuseEnabled()).toBe(false)
|
||||
})
|
||||
|
||||
test('returns true when both keys are set', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { isLangfuseEnabled } = await import('../client.js')
|
||||
expect(isLangfuseEnabled()).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('initLangfuse', () => {
|
||||
test('returns false when keys not configured', async () => {
|
||||
const { initLangfuse } = await import('../client.js')
|
||||
expect(initLangfuse()).toBe(false)
|
||||
})
|
||||
|
||||
test('returns true when keys are configured', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
// client.js is a singleton — test via isLangfuseEnabled which reads env directly
|
||||
const { isLangfuseEnabled } = await import('../client.js')
|
||||
expect(isLangfuseEnabled()).toBe(true)
|
||||
})
|
||||
|
||||
test('is idempotent — multiple calls do not re-initialize', async () => {
|
||||
// client.js singleton: once processor is set, initLangfuse returns true immediately
|
||||
// We verify this by checking that calling it multiple times doesn't throw
|
||||
const { initLangfuse } = await import('../client.js')
|
||||
expect(() => { initLangfuse(); initLangfuse() }).not.toThrow()
|
||||
})
|
||||
})
|
||||
|
||||
describe('shutdownLangfuse', () => {
|
||||
test('calls forceFlush and shutdown on processor', async () => {
|
||||
// Verify shutdown is callable without error even when no processor is set
|
||||
const { shutdownLangfuse } = await import('../client.js')
|
||||
await expect(shutdownLangfuse()).resolves.toBeUndefined()
|
||||
})
|
||||
})
|
||||
|
||||
// ── tracing tests ───────────────────────────────────────────────────────────
|
||||
|
||||
describe('createTrace', () => {
|
||||
test('returns null when langfuse not enabled', async () => {
|
||||
const { createTrace } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
expect(span).toBeNull()
|
||||
})
|
||||
|
||||
test('creates root span when enabled', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty', input: [] })
|
||||
expect(span).not.toBeNull()
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('agent-run', expect.objectContaining({
|
||||
metadata: expect.objectContaining({ provider: 'firstParty', model: 'claude-3' }),
|
||||
}), { asType: 'agent' })
|
||||
})
|
||||
})
|
||||
|
||||
describe('recordLLMObservation', () => {
|
||||
test('no-ops when rootSpan is null', async () => {
|
||||
const { recordLLMObservation } = await import('../tracing.js')
|
||||
recordLLMObservation(null, { model: 'm', provider: 'firstParty', input: [], output: [], usage: { input_tokens: 10, output_tokens: 5 } })
|
||||
expect(mockStartObservation).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
|
||||
test('records generation child observation via global startObservation', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, recordLLMObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
mockStartObservation.mockClear()
|
||||
recordLLMObservation(span, {
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
input: [{ role: 'user', content: 'hello' }],
|
||||
output: [{ role: 'assistant', content: 'hi' }],
|
||||
usage: { input_tokens: 10, output_tokens: 5 },
|
||||
})
|
||||
// Should call the global startObservation with asType: 'generation' and parentSpanContext
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('ChatAnthropic', expect.objectContaining({
|
||||
model: 'claude-3',
|
||||
}), expect.objectContaining({
|
||||
asType: 'generation',
|
||||
parentSpanContext: mockSpanContext,
|
||||
}))
|
||||
expect(mockRootUpdate).toHaveBeenCalledWith(expect.objectContaining({
|
||||
usageDetails: { input: 10, output: 5 },
|
||||
}))
|
||||
expect(mockRootEnd).toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('recordToolObservation', () => {
|
||||
test('no-ops when rootSpan is null', async () => {
|
||||
const { recordToolObservation } = await import('../tracing.js')
|
||||
recordToolObservation(null, { toolName: 'BashTool', toolUseId: 'id1', input: {}, output: 'out' })
|
||||
// startObservation should not be called beyond the initial trace creation (none here)
|
||||
})
|
||||
|
||||
test('records tool child observation via global startObservation', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, recordToolObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
mockStartObservation.mockClear()
|
||||
mockRootUpdate.mockClear()
|
||||
mockRootEnd.mockClear()
|
||||
recordToolObservation(span, {
|
||||
toolName: 'BashTool',
|
||||
toolUseId: 'tu-1',
|
||||
input: { command: 'ls' },
|
||||
output: 'file.ts',
|
||||
})
|
||||
// Should call the global startObservation with asType: 'tool' and parentSpanContext
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('BashTool', expect.objectContaining({
|
||||
input: expect.any(Object),
|
||||
}), expect.objectContaining({
|
||||
asType: 'tool',
|
||||
parentSpanContext: mockSpanContext,
|
||||
}))
|
||||
expect(mockRootUpdate).toHaveBeenCalled()
|
||||
expect(mockRootEnd).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test('passes startTime to global startObservation', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, recordToolObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
mockStartObservation.mockClear()
|
||||
const startTime = new Date('2026-01-01T00:00:00Z')
|
||||
recordToolObservation(span, {
|
||||
toolName: 'BashTool',
|
||||
toolUseId: 'tu-2',
|
||||
input: {},
|
||||
output: 'out',
|
||||
startTime,
|
||||
})
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('BashTool', expect.any(Object), expect.objectContaining({
|
||||
startTime,
|
||||
parentSpanContext: mockSpanContext,
|
||||
}))
|
||||
})
|
||||
|
||||
test('sanitizes FileReadTool output', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, recordToolObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
mockRootUpdate.mockClear()
|
||||
recordToolObservation(span, {
|
||||
toolName: 'FileReadTool',
|
||||
toolUseId: 'tu-2',
|
||||
input: { file_path: '/tmp/file.ts' },
|
||||
output: 'file content here',
|
||||
})
|
||||
expect(mockRootUpdate).toHaveBeenCalledWith(expect.objectContaining({
|
||||
output: '[file content redacted, 17 chars]',
|
||||
}))
|
||||
})
|
||||
|
||||
test('sets ERROR level for error observations', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, recordToolObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
mockRootUpdate.mockClear()
|
||||
recordToolObservation(span, {
|
||||
toolName: 'BashTool',
|
||||
toolUseId: 'tu-3',
|
||||
input: {},
|
||||
output: 'error occurred',
|
||||
isError: true,
|
||||
})
|
||||
expect(mockRootUpdate).toHaveBeenCalledWith(expect.objectContaining({ level: 'ERROR' }))
|
||||
})
|
||||
})
|
||||
|
||||
describe('endTrace', () => {
|
||||
test('no-ops when rootSpan is null', async () => {
|
||||
const { endTrace } = await import('../tracing.js')
|
||||
endTrace(null)
|
||||
expect(mockRootEnd).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test('calls span.end()', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, endTrace } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
endTrace(span)
|
||||
expect(mockRootEnd).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test('calls span.update() with output when provided', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, endTrace } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
endTrace(span, 'final output')
|
||||
expect(mockRootUpdate).toHaveBeenCalledWith({ output: 'final output' })
|
||||
expect(mockRootEnd).toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('createSubagentTrace', () => {
|
||||
test('returns null when langfuse not enabled', async () => {
|
||||
const { createSubagentTrace } = await import('../tracing.js')
|
||||
const span = createSubagentTrace({
|
||||
sessionId: 's1',
|
||||
agentType: 'Explore',
|
||||
agentId: 'agent-1',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
})
|
||||
expect(span).toBeNull()
|
||||
})
|
||||
|
||||
test('creates trace with agentType and agentId metadata', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createSubagentTrace } = await import('../tracing.js')
|
||||
const span = createSubagentTrace({
|
||||
sessionId: 's1',
|
||||
agentType: 'Explore',
|
||||
agentId: 'agent-1',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
input: [{ role: 'user', content: 'search for X' }],
|
||||
})
|
||||
expect(span).not.toBeNull()
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('agent:Explore', expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
agentType: 'Explore',
|
||||
agentId: 'agent-1',
|
||||
provider: 'firstParty',
|
||||
model: 'claude-3',
|
||||
}),
|
||||
}), { asType: 'agent' })
|
||||
// Verify session.id attribute is set
|
||||
expect(mockSetAttribute).toHaveBeenCalledWith('session.id', 's1')
|
||||
})
|
||||
|
||||
test('returns null on SDK error', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
|
||||
const { createSubagentTrace } = await import('../tracing.js')
|
||||
const span = createSubagentTrace({
|
||||
sessionId: 's1',
|
||||
agentType: 'Plan',
|
||||
agentId: 'agent-2',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
})
|
||||
expect(span).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
describe('createTrace with querySource', () => {
|
||||
test('includes querySource in metadata', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace } = await import('../tracing.js')
|
||||
const span = createTrace({
|
||||
sessionId: 's1',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
querySource: 'user',
|
||||
})
|
||||
expect(span).not.toBeNull()
|
||||
expect(mockStartObservation).toHaveBeenCalledWith('agent-run:user', expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
agentType: 'main',
|
||||
querySource: 'user',
|
||||
}),
|
||||
}), { asType: 'agent' })
|
||||
})
|
||||
|
||||
test('omits querySource when not provided', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
mockStartObservation.mockClear()
|
||||
const { createTrace } = await import('../tracing.js')
|
||||
createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
const calls = mockStartObservation.mock.calls as unknown[][]
|
||||
const secondArg = calls[0]?.[1] as Record<string, unknown> | undefined
|
||||
const metadata = (secondArg?.metadata ?? {}) as Record<string, unknown>
|
||||
expect(metadata).not.toHaveProperty('querySource')
|
||||
})
|
||||
})
|
||||
|
||||
describe('nested agent scenario', () => {
|
||||
test('sub-agent trace shares sessionId with parent', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createTrace, createSubagentTrace } = await import('../tracing.js')
|
||||
mockSetAttribute.mockClear()
|
||||
|
||||
// Create parent trace
|
||||
const parentSpan = createTrace({
|
||||
sessionId: 'shared-session',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
})
|
||||
|
||||
// Create sub-agent trace with same sessionId
|
||||
const subSpan = createSubagentTrace({
|
||||
sessionId: 'shared-session',
|
||||
agentType: 'Explore',
|
||||
agentId: 'agent-explore-1',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
})
|
||||
|
||||
expect(parentSpan).not.toBeNull()
|
||||
expect(subSpan).not.toBeNull()
|
||||
|
||||
// Both should have set session.id attribute
|
||||
const sessionAttributeCalls = mockSetAttribute.mock.calls.filter(
|
||||
(call: unknown[]) => Array.isArray(call) && call[0] === 'session.id' && call[1] === 'shared-session',
|
||||
)
|
||||
expect(sessionAttributeCalls.length).toBeGreaterThanOrEqual(2)
|
||||
})
|
||||
|
||||
test('query reuses passed langfuseTrace instead of creating new one', async () => {
|
||||
// This validates the pattern used in query.ts:
|
||||
// const ownsTrace = !params.toolUseContext.langfuseTrace
|
||||
// const langfuseTrace = params.toolUseContext.langfuseTrace ?? createTrace(...)
|
||||
// When langfuseTrace is already set, createTrace should NOT be called
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
const { createSubagentTrace } = await import('../tracing.js')
|
||||
|
||||
// Simulate what runAgent does: create subTrace, then pass it as langfuseTrace
|
||||
const subTrace = createSubagentTrace({
|
||||
sessionId: 's1',
|
||||
agentType: 'Explore',
|
||||
agentId: 'agent-1',
|
||||
model: 'claude-3',
|
||||
provider: 'firstParty',
|
||||
})
|
||||
expect(subTrace).not.toBeNull()
|
||||
|
||||
// Simulate query.ts logic: if langfuseTrace already set, don't create new one
|
||||
const ownsTrace = false // Would be: !params.toolUseContext.langfuseTrace
|
||||
const langfuseTrace = subTrace // Would be: params.toolUseContext.langfuseTrace ?? createTrace(...)
|
||||
|
||||
expect(ownsTrace).toBe(false)
|
||||
expect(langfuseTrace).toBe(subTrace)
|
||||
})
|
||||
})
|
||||
|
||||
describe('SDK exceptions do not affect main flow', () => {
|
||||
test('createTrace returns null on SDK error', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
|
||||
const { createTrace } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
expect(span).toBeNull()
|
||||
})
|
||||
|
||||
test('recordLLMObservation silently fails on SDK error', async () => {
|
||||
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
|
||||
process.env.LANGFUSE_SECRET_KEY = 'sk-test'
|
||||
mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
|
||||
const { createTrace, recordLLMObservation } = await import('../tracing.js')
|
||||
const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
|
||||
// The second call to startObservation (for the generation) will throw
|
||||
mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
|
||||
expect(() => recordLLMObservation(span, {
|
||||
model: 'm',
|
||||
provider: 'firstParty',
|
||||
input: [],
|
||||
output: [],
|
||||
usage: { input_tokens: 1, output_tokens: 1 },
|
||||
})).not.toThrow()
|
||||
})
|
||||
})
|
||||
})
|
||||
72
src/services/langfuse/client.ts
Normal file
72
src/services/langfuse/client.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'
|
||||
import { LangfuseSpanProcessor } from '@langfuse/otel'
|
||||
import type { MaskFunction } from '@langfuse/otel'
|
||||
import { setLangfuseTracerProvider } from '@langfuse/tracing'
|
||||
import { sanitizeGlobal } from './sanitize.js'
|
||||
import { logForDebugging } from 'src/utils/debug.js'
|
||||
|
||||
declare const MACRO: { VERSION: string }
|
||||
|
||||
let processor: LangfuseSpanProcessor | null = null
|
||||
let provider: BasicTracerProvider | null = null
|
||||
|
||||
export function isLangfuseEnabled(): boolean {
|
||||
return !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY)
|
||||
}
|
||||
|
||||
export function getLangfuseProcessor(): LangfuseSpanProcessor | null {
|
||||
return processor
|
||||
}
|
||||
|
||||
export function initLangfuse(): boolean {
|
||||
if (processor !== null) return true
|
||||
if (!isLangfuseEnabled()) {
|
||||
logForDebugging('[langfuse] No keys configured, running in no-op mode')
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
const maskFn: MaskFunction = ({ data }) => sanitizeGlobal(data)
|
||||
|
||||
processor = new LangfuseSpanProcessor({
|
||||
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
|
||||
secretKey: process.env.LANGFUSE_SECRET_KEY,
|
||||
baseUrl: process.env.LANGFUSE_BASE_URL ?? 'https://cloud.langfuse.com',
|
||||
flushAt: parseInt(process.env.LANGFUSE_FLUSH_AT ?? '20', 10),
|
||||
flushInterval: parseInt(process.env.LANGFUSE_FLUSH_INTERVAL ?? '10', 10),
|
||||
mask: maskFn,
|
||||
environment: process.env.LANGFUSE_TRACING_ENVIRONMENT ?? 'development',
|
||||
release: MACRO.VERSION,
|
||||
exportMode: (process.env.LANGFUSE_EXPORT_MODE as 'batched' | 'immediate' | undefined) ?? 'batched',
|
||||
timeout: parseInt(process.env.LANGFUSE_TIMEOUT ?? '5', 10),
|
||||
})
|
||||
|
||||
provider = new BasicTracerProvider({
|
||||
spanProcessors: [processor],
|
||||
})
|
||||
|
||||
setLangfuseTracerProvider(provider)
|
||||
|
||||
logForDebugging('[langfuse] Initialized with LangfuseSpanProcessor')
|
||||
return true
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] Init failed: ${e}`, { level: 'error' })
|
||||
processor = null
|
||||
provider = null
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
export async function shutdownLangfuse(): Promise<void> {
|
||||
try {
|
||||
if (processor) {
|
||||
await processor.forceFlush()
|
||||
await processor.shutdown()
|
||||
}
|
||||
processor = null
|
||||
provider = null
|
||||
logForDebugging('[langfuse] Shutdown complete')
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] Shutdown error: ${e}`, { level: 'error' })
|
||||
}
|
||||
}
|
||||
117
src/services/langfuse/convert.ts
Normal file
117
src/services/langfuse/convert.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
/**
|
||||
* Convert internal Message types to Langfuse-compatible OpenAI-style chat format.
|
||||
*
|
||||
* Langfuse generations expect:
|
||||
* input: { role, content }[] where content is string or structured parts
|
||||
* output: { role: 'assistant', content: string | part[] }
|
||||
*/
|
||||
|
||||
import type { Message, AssistantMessage, UserMessage } from 'src/types/message.js'
|
||||
|
||||
type LangfuseContentPart =
|
||||
| { type: 'text'; text: string }
|
||||
| { type: 'tool_use'; id: string; name: string; input: unknown }
|
||||
| { type: 'tool_result'; tool_use_id: string; content: string }
|
||||
| { type: 'thinking'; thinking: string }
|
||||
| { type: string; [key: string]: unknown }
|
||||
|
||||
type LangfuseChatMessage = {
|
||||
role: 'user' | 'assistant' | 'system'
|
||||
content: string | LangfuseContentPart[]
|
||||
}
|
||||
|
||||
function normalizeContent(content: unknown): string | LangfuseContentPart[] {
|
||||
if (typeof content === 'string') return content
|
||||
if (!Array.isArray(content)) return String(content ?? '')
|
||||
|
||||
const parts: LangfuseContentPart[] = []
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== 'object') continue
|
||||
const b = block as Record<string, unknown>
|
||||
const type = b.type as string | undefined
|
||||
|
||||
if (type === 'text') {
|
||||
parts.push({ type: 'text', text: String(b.text ?? '') })
|
||||
} else if (type === 'thinking' || type === 'redacted_thinking') {
|
||||
parts.push({ type: 'thinking', thinking: String(b.thinking ?? '[redacted]') })
|
||||
} else if (type === 'tool_use') {
|
||||
parts.push({ type: 'tool_use', id: String(b.id ?? ''), name: String(b.name ?? ''), input: b.input })
|
||||
} else if (type === 'tool_result') {
|
||||
const resultContent = Array.isArray(b.content)
|
||||
? (b.content as Record<string, unknown>[])
|
||||
.map(c => {
|
||||
if (c.type === 'text') return String(c.text ?? '')
|
||||
if (c.type === 'image') return '[image]'
|
||||
if (c.type === 'document') return '[document]'
|
||||
return `[${String(c.type ?? 'unknown')}]`
|
||||
})
|
||||
.join('\n')
|
||||
: String(b.content ?? '')
|
||||
parts.push({ type: 'tool_result', tool_use_id: String(b.tool_use_id ?? ''), content: resultContent })
|
||||
} else if (type === 'image') {
|
||||
parts.push({ type: 'text', text: '[image]' })
|
||||
} else if (type === 'document') {
|
||||
const name = (b.source as Record<string, unknown> | undefined)?.filename
|
||||
?? (b.title as string | undefined)
|
||||
?? 'document'
|
||||
parts.push({ type: 'text', text: `[document: ${name}]` })
|
||||
} else if (type === 'server_tool_use' || type === 'web_search_tool_result' || type === 'tool_search_tool_result') {
|
||||
// server-side tool blocks — keep name/id, drop raw content
|
||||
parts.push({ type: type, id: String(b.id ?? ''), name: String(b.name ?? type) })
|
||||
} else {
|
||||
// unknown block: keep type + scalar fields only, drop any binary/large payloads
|
||||
const safe: Record<string, unknown> = { type: type ?? 'unknown' }
|
||||
for (const [k, v] of Object.entries(b)) {
|
||||
if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') safe[k] = v
|
||||
}
|
||||
parts.push(safe as LangfuseContentPart)
|
||||
}
|
||||
}
|
||||
|
||||
// Collapse to plain string if only one text part
|
||||
if (parts.length === 1 && parts[0]!.type === 'text') {
|
||||
return (parts[0] as { type: 'text'; text: string }).text
|
||||
}
|
||||
return parts
|
||||
}
|
||||
|
||||
function toRole(msg: Message): 'user' | 'assistant' | 'system' {
|
||||
if (msg.type === 'assistant') return 'assistant'
|
||||
if (msg.type === 'system') return 'system'
|
||||
return 'user'
|
||||
}
|
||||
|
||||
/** Convert messagesForAPI (UserMessage | AssistantMessage)[] → Langfuse input format */
|
||||
export function convertMessagesToLangfuse(
|
||||
messages: (UserMessage | AssistantMessage)[],
|
||||
systemPrompt?: readonly string[],
|
||||
): LangfuseChatMessage[] {
|
||||
const result: LangfuseChatMessage[] = []
|
||||
if (systemPrompt && systemPrompt.length > 0) {
|
||||
for (const block of systemPrompt) {
|
||||
if (block.trim()) result.push({ role: 'system', content: block })
|
||||
}
|
||||
}
|
||||
for (const msg of messages) {
|
||||
const inner = msg.message
|
||||
if (!inner) continue
|
||||
const role = (inner.role as 'user' | 'assistant' | undefined) ?? toRole(msg)
|
||||
result.push({ role, content: normalizeContent(inner.content) })
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
/** Convert AssistantMessage[] (newMessages) → Langfuse output format (last assistant turn) */
|
||||
export function convertOutputToLangfuse(
|
||||
messages: AssistantMessage[],
|
||||
): LangfuseChatMessage | LangfuseChatMessage[] | null {
|
||||
if (messages.length === 0) return null
|
||||
if (messages.length === 1) {
|
||||
const msg = messages[0]!
|
||||
return { role: 'assistant', content: normalizeContent(msg.message?.content) }
|
||||
}
|
||||
return messages.map(msg => ({
|
||||
role: 'assistant' as const,
|
||||
content: normalizeContent(msg.message?.content),
|
||||
}))
|
||||
}
|
||||
4
src/services/langfuse/index.ts
Normal file
4
src/services/langfuse/index.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export { initLangfuse, shutdownLangfuse, isLangfuseEnabled, getLangfuseProcessor } from './client.js'
|
||||
export { createTrace, createSubagentTrace, recordLLMObservation, recordToolObservation, endTrace } from './tracing.js'
|
||||
export type { LangfuseSpan } from './tracing.js'
|
||||
export { sanitizeToolInput, sanitizeToolOutput, sanitizeGlobal } from './sanitize.js'
|
||||
70
src/services/langfuse/sanitize.ts
Normal file
70
src/services/langfuse/sanitize.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
const MAX_OUTPUT_LENGTH = 500
|
||||
const REDACTED_FILE_TOOLS = new Set(['FileReadTool', 'FileWriteTool', 'FileEditTool'])
|
||||
const REDACTED_SHELL_TOOLS = new Set(['BashTool', 'PowerShellTool'])
|
||||
const SENSITIVE_OUTPUT_TOOLS = new Set(['ConfigTool', 'MCPTool'])
|
||||
|
||||
const HOME_DIR_PATTERN = new RegExp(
|
||||
(process.env.HOME ?? '/Users/[^/]+').replace(/[.*+?^${}()|[\]\\]/g, '\\$&'),
|
||||
'g',
|
||||
)
|
||||
|
||||
const SENSITIVE_KEY_PATTERN = /(?:api_?key|token|secret|password|credential|auth_header)/i
|
||||
|
||||
export function sanitizeGlobal(data: unknown): unknown {
|
||||
if (typeof data === 'string') {
|
||||
return data.replace(HOME_DIR_PATTERN, '~')
|
||||
}
|
||||
if (typeof data === 'object' && data !== null) {
|
||||
return sanitizeObject(data as Record<string, unknown>)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
function sanitizeObject(obj: Record<string, unknown>): Record<string, unknown> {
|
||||
const result: Record<string, unknown> = {}
|
||||
for (const [key, value] of Object.entries(obj)) {
|
||||
if (SENSITIVE_KEY_PATTERN.test(key)) {
|
||||
result[key] = '[REDACTED]'
|
||||
} else if (typeof value === 'string') {
|
||||
result[key] = value.replace(HOME_DIR_PATTERN, '~')
|
||||
} else if (typeof value === 'object' && value !== null) {
|
||||
result[key] = sanitizeObject(value as Record<string, unknown>)
|
||||
} else {
|
||||
result[key] = value
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
export function sanitizeToolInput(toolName: string, input: unknown): unknown {
|
||||
if (typeof input !== 'object' || input === null) return input
|
||||
const obj = { ...(input as Record<string, unknown>) }
|
||||
|
||||
for (const key of Object.keys(obj)) {
|
||||
if (SENSITIVE_KEY_PATTERN.test(key)) {
|
||||
obj[key] = '[REDACTED]'
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of ['file_path', 'path', 'directory'] as const) {
|
||||
if (key in obj && typeof obj[key] === 'string') {
|
||||
obj[key] = (obj[key] as string).replace(HOME_DIR_PATTERN, '~')
|
||||
}
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
||||
export function sanitizeToolOutput(toolName: string, output: string): string {
|
||||
if (REDACTED_FILE_TOOLS.has(toolName)) {
|
||||
return `[file content redacted, ${output.length} chars]`
|
||||
}
|
||||
if (REDACTED_SHELL_TOOLS.has(toolName)) {
|
||||
if (output.length > MAX_OUTPUT_LENGTH) {
|
||||
return output.slice(0, MAX_OUTPUT_LENGTH) + '\n[truncated]'
|
||||
}
|
||||
}
|
||||
if (SENSITIVE_OUTPUT_TOOLS.has(toolName)) {
|
||||
return `[${toolName} output redacted, ${output.length} chars]`
|
||||
}
|
||||
return output
|
||||
}
|
||||
201
src/services/langfuse/tracing.ts
Normal file
201
src/services/langfuse/tracing.ts
Normal file
@@ -0,0 +1,201 @@
|
||||
import { startObservation, LangfuseOtelSpanAttributes } from '@langfuse/tracing'
|
||||
import type { LangfuseSpan, LangfuseGeneration, LangfuseAgent } from '@langfuse/tracing'
|
||||
import { isLangfuseEnabled } from './client.js'
|
||||
import { sanitizeToolInput, sanitizeToolOutput } from './sanitize.js'
|
||||
import { logForDebugging } from 'src/utils/debug.js'
|
||||
|
||||
export type { LangfuseSpan }
|
||||
|
||||
// Root trace is an agent observation — represents one full agentic turn/session
|
||||
type RootTrace = LangfuseAgent & { _sessionId?: string }
|
||||
|
||||
export function createTrace(params: {
|
||||
sessionId: string
|
||||
model: string
|
||||
provider: string
|
||||
input?: unknown
|
||||
name?: string
|
||||
querySource?: string
|
||||
}): LangfuseSpan | null {
|
||||
if (!isLangfuseEnabled()) return null
|
||||
try {
|
||||
const traceName = params.name ?? (params.querySource ? `agent-run:${params.querySource}` : 'agent-run')
|
||||
const rootSpan = startObservation(traceName, {
|
||||
input: params.input,
|
||||
metadata: {
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
agentType: 'main',
|
||||
...(params.querySource && { querySource: params.querySource }),
|
||||
},
|
||||
}, { asType: 'agent' }) as RootTrace
|
||||
rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, params.sessionId)
|
||||
rootSpan._sessionId = params.sessionId
|
||||
logForDebugging(`[langfuse] Trace created: ${rootSpan.id}`)
|
||||
return rootSpan as unknown as LangfuseSpan
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] createTrace failed: ${e}`, { level: 'error' })
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
const PROVIDER_GENERATION_NAMES: Record<string, string> = {
|
||||
firstParty: 'ChatAnthropic',
|
||||
bedrock: 'ChatBedrockAnthropic',
|
||||
vertex: 'ChatVertexAnthropic',
|
||||
foundry: 'ChatFoundry',
|
||||
openai: 'ChatOpenAI',
|
||||
gemini: 'ChatGoogleGenerativeAI',
|
||||
grok: 'ChatXAI',
|
||||
}
|
||||
|
||||
export function recordLLMObservation(
|
||||
rootSpan: LangfuseSpan | null,
|
||||
params: {
|
||||
model: string
|
||||
provider: string
|
||||
input: unknown
|
||||
output: unknown
|
||||
usage: { input_tokens: number; output_tokens: number }
|
||||
startTime?: Date
|
||||
endTime?: Date
|
||||
completionStartTime?: Date
|
||||
},
|
||||
): void {
|
||||
if (!rootSpan || !isLangfuseEnabled()) return
|
||||
try {
|
||||
const genName = PROVIDER_GENERATION_NAMES[params.provider] ?? `Chat${params.provider}`
|
||||
|
||||
// Use the global startObservation directly instead of rootSpan.startObservation().
|
||||
// The instance method only forwards asType to the global function and drops startTime,
|
||||
// which causes negative TTFT because the OTel span's start time defaults to "now".
|
||||
const gen: LangfuseGeneration = startObservation(
|
||||
genName,
|
||||
{
|
||||
model: params.model,
|
||||
input: params.input,
|
||||
metadata: {
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
},
|
||||
...(params.completionStartTime && { completionStartTime: params.completionStartTime }),
|
||||
},
|
||||
{
|
||||
asType: 'generation',
|
||||
...(params.startTime && { startTime: params.startTime }),
|
||||
parentSpanContext: rootSpan.otelSpan.spanContext(),
|
||||
},
|
||||
)
|
||||
|
||||
// Propagate session ID to generation span so Langfuse links it correctly
|
||||
const sessionId = (rootSpan as unknown as RootTrace)._sessionId
|
||||
if (sessionId) {
|
||||
gen.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
|
||||
}
|
||||
|
||||
gen.update({
|
||||
output: params.output,
|
||||
usageDetails: {
|
||||
input: params.usage.input_tokens,
|
||||
output: params.usage.output_tokens,
|
||||
},
|
||||
})
|
||||
|
||||
gen.end(params.endTime)
|
||||
logForDebugging(`[langfuse] LLM observation recorded: ${gen.id}`)
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] recordLLMObservation failed: ${e}`, { level: 'error' })
|
||||
}
|
||||
}
|
||||
|
||||
export function recordToolObservation(
|
||||
rootSpan: LangfuseSpan | null,
|
||||
params: {
|
||||
toolName: string
|
||||
toolUseId: string
|
||||
input: unknown
|
||||
output: string
|
||||
startTime?: Date
|
||||
isError?: boolean
|
||||
},
|
||||
): void {
|
||||
if (!rootSpan || !isLangfuseEnabled()) return
|
||||
try {
|
||||
// Use the global startObservation directly instead of rootSpan.startObservation().
|
||||
// The instance method only forwards asType and drops startTime,
|
||||
// causing tool execution duration to be 0.
|
||||
const toolObs = startObservation(
|
||||
params.toolName,
|
||||
{
|
||||
input: sanitizeToolInput(params.toolName, params.input),
|
||||
metadata: {
|
||||
toolUseId: params.toolUseId,
|
||||
isError: String(params.isError ?? false),
|
||||
},
|
||||
},
|
||||
{
|
||||
asType: 'tool',
|
||||
...(params.startTime && { startTime: params.startTime }),
|
||||
parentSpanContext: rootSpan.otelSpan.spanContext(),
|
||||
},
|
||||
)
|
||||
|
||||
// Propagate session ID to tool span so Langfuse links it correctly
|
||||
const sessionId = (rootSpan as unknown as RootTrace)._sessionId
|
||||
if (sessionId) {
|
||||
toolObs.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
|
||||
}
|
||||
|
||||
toolObs.update({
|
||||
output: sanitizeToolOutput(params.toolName, params.output),
|
||||
...(params.isError && { level: 'ERROR' as const }),
|
||||
})
|
||||
|
||||
toolObs.end()
|
||||
logForDebugging(`[langfuse] Tool observation recorded: ${params.toolName} (${toolObs.id})`)
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] recordToolObservation failed: ${e}`, { level: 'error' })
|
||||
}
|
||||
}
|
||||
|
||||
export function createSubagentTrace(params: {
|
||||
sessionId: string
|
||||
agentType: string
|
||||
agentId: string
|
||||
model: string
|
||||
provider: string
|
||||
input?: unknown
|
||||
}): LangfuseSpan | null {
|
||||
if (!isLangfuseEnabled()) return null
|
||||
try {
|
||||
const rootSpan = startObservation(`agent:${params.agentType}`, {
|
||||
input: params.input,
|
||||
metadata: {
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
agentType: params.agentType,
|
||||
agentId: params.agentId,
|
||||
},
|
||||
}, { asType: 'agent' }) as RootTrace
|
||||
rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, params.sessionId)
|
||||
rootSpan._sessionId = params.sessionId
|
||||
logForDebugging(`[langfuse] Sub-agent trace created: ${rootSpan.id} (type=${params.agentType})`)
|
||||
return rootSpan as unknown as LangfuseSpan
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] createSubagentTrace failed: ${e}`, { level: 'error' })
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export function endTrace(rootSpan: LangfuseSpan | null, output?: unknown): void {
|
||||
if (!rootSpan) return
|
||||
try {
|
||||
if (output !== undefined) {
|
||||
rootSpan.update({ output })
|
||||
}
|
||||
rootSpan.end()
|
||||
logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}`)
|
||||
} catch (e) {
|
||||
logForDebugging(`[langfuse] endTrace failed: ${e}`, { level: 'error' })
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,7 @@ import {
|
||||
} from '../../tools/ToolSearchTool/prompt.js'
|
||||
import { getAllBaseTools } from '../../tools.js'
|
||||
import type { HookProgress } from '../../types/hooks.js'
|
||||
import { recordToolObservation } from '../langfuse/index.js'
|
||||
import type {
|
||||
AssistantMessage,
|
||||
AttachmentMessage,
|
||||
@@ -1300,6 +1301,16 @@ async function checkPermissionsAndCallTool(
|
||||
: String(result.data ?? '')
|
||||
endToolSpan(toolResultStr)
|
||||
|
||||
// Record tool observation in Langfuse (no-op if not configured)
|
||||
recordToolObservation(toolUseContext.langfuseTrace ?? null, {
|
||||
toolName: tool.name,
|
||||
toolUseId: toolUseID,
|
||||
input: processedInput,
|
||||
output: toolResultStr,
|
||||
startTime: new Date(startTime),
|
||||
isError: false,
|
||||
})
|
||||
|
||||
// Map the tool result to API format once and cache it. This block is reused
|
||||
// by addToolResult (skipping the remap) and measured here for analytics.
|
||||
const mappedToolResultBlock = tool.mapToolResultToToolResultBlockParam(
|
||||
@@ -1609,6 +1620,16 @@ async function checkPermissionsAndCallTool(
|
||||
})
|
||||
endToolSpan()
|
||||
|
||||
// Record error observation in Langfuse (no-op if not configured)
|
||||
recordToolObservation(toolUseContext.langfuseTrace ?? null, {
|
||||
toolName: tool?.name ?? 'unknown',
|
||||
toolUseId: toolUseID,
|
||||
input: processedInput ?? input,
|
||||
output: errorMessage(error),
|
||||
startTime: new Date(startTime),
|
||||
isError: true,
|
||||
})
|
||||
|
||||
// Handle MCP auth errors by updating the client status to 'needs-auth'
|
||||
// This updates the /mcp display to show the server needs re-authorization
|
||||
if (error instanceof McpAuthError) {
|
||||
|
||||
Reference in New Issue
Block a user