From 2fea429dc64db338efd021bced1afef166cb1c40 Mon Sep 17 00:00:00 2001 From: claude-code-best Date: Sat, 11 Apr 2026 22:07:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AF=B9=20langfuse?= =?UTF-8?q?=20=E7=9B=91=E6=8E=A7=E7=9A=84=E6=94=AF=E6=8C=81=20(#242)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs: 更新类型检查的 CLAUDE.md * feat: 添加模型 1M 上下文切换 * chore: remove prefetchOfficialMcpUrls call on startup * docs: 添加 git commit 规范 * feat: 第一次接入 langfuse * fix: 修复 generation 的计时的错误 * feat: 添加多 agent 的监控 * feat: 添加 /poor 省流模式,toggle 关闭 extract_memories 和 prompt_suggestion Co-Authored-By: Claude Opus 4.6 * chore: 修复 lock 文件 * chore: 更新类型依赖 --------- Co-authored-by: Claude Opus 4.6 --- CLAUDE.md | 2 +- DEV-LOG.md | 10 + build.ts | 2 + bun.lock | 13 +- package.json | 5 +- scripts/dev.ts | 2 + src/Tool.ts | 3 + src/commands.ts | 6 + src/commands/poor/index.ts | 11 + src/commands/poor/poor.ts | 28 + src/commands/poor/poorMode.ts | 14 + src/entrypoints/init.ts | 5 + src/query.ts | 37 +- src/query/stopHooks.ts | 12 +- src/services/api/claude.ts | 18 + .../langfuse/__tests__/langfuse.test.ts | 568 ++++++++++++++++++ src/services/langfuse/client.ts | 72 +++ src/services/langfuse/convert.ts | 117 ++++ src/services/langfuse/index.ts | 4 + src/services/langfuse/sanitize.ts | 70 +++ src/services/langfuse/tracing.ts | 201 +++++++ src/services/tools/toolExecution.ts | 21 + src/tools/AgentTool/runAgent.ts | 27 + 23 files changed, 1242 insertions(+), 6 deletions(-) create mode 100644 src/commands/poor/index.ts create mode 100644 src/commands/poor/poor.ts create mode 100644 src/commands/poor/poorMode.ts create mode 100644 src/services/langfuse/__tests__/langfuse.test.ts create mode 100644 src/services/langfuse/client.ts create mode 100644 src/services/langfuse/convert.ts create mode 100644 src/services/langfuse/index.ts create mode 100644 src/services/langfuse/sanitize.ts create mode 100644 src/services/langfuse/tracing.ts 
diff --git a/CLAUDE.md b/CLAUDE.md index 69f9fd6ab..d07ddccf4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -274,7 +274,7 @@ bunx tsc --noEmit - **tsc must pass** — `bunx tsc --noEmit` 必须零错误,任何修改都不能引入新的类型错误。 - **Feature flags** — 默认全部关闭(`feature()` 返回 `false`)。Dev/build 各有自己的默认启用列表。不要在 `cli.tsx` 中重定义 `feature` 函数。 - **React Compiler output** — Components have decompiled memoization boilerplate (`const $ = _c(N)`). This is normal. -- **`bun:bundle` import** — `import { feature } from 'bun:bundle'` 是 Bun 内置模块,由运行时/构建器解析。不要用自定义函数替代它。 +- **`bun:bundle` import** — `import { feature } from 'bun:bundle'` 是 Bun 内置模块,由运行时/构建器解析。不要用自定义函数替代它。**`feature()` 只能直接用在 `if` 语句或三元表达式的条件位置**(Bun 编译器限制),不能赋值给变量、不能放在箭头函数体里、不能作为 `&&` 链的一部分。正确:`if (feature('X')) {}` 或 `feature('X') ? a : b`。 - **`src/` path alias** — tsconfig maps `src/*` to `./src/*`. Imports like `import { ... } from 'src/utils/...'` are valid. - **MACRO defines** — 集中管理在 `scripts/defines.ts`。Dev mode 通过 `bun -d` 注入,build 通过 `Bun.build({ define })` 注入。修改版本号等常量只改这个文件。 - **构建产物兼容 Node.js** — `build.ts` 会自动后处理 `import.meta.require`,产物可直接用 `node dist/cli.js` 运行。 diff --git a/DEV-LOG.md b/DEV-LOG.md index bfa77f2d6..33f328793 100644 --- a/DEV-LOG.md +++ b/DEV-LOG.md @@ -1,5 +1,15 @@ # DEV-LOG +## /poor 省流模式 (2026-04-11) + +新增 `/poor` 命令,toggle 关闭 `extract_memories` 和 `prompt_suggestion`,省 token。 + +- 新增 `POOR` feature flag(build.ts + dev.ts) +- `src/commands/poor/` — 命令定义 + toggle 实现 + 状态管理 +- `src/query/stopHooks.ts` — POOR 模式激活时跳过 extract_memories 和 prompt_suggestion + +--- + ## Daemon + Remote Control Server 还原 (2026-04-07) **分支**: `feat/daemon-remote-control-server` diff --git a/build.ts b/build.ts index 31d6c2d2a..6ea97a428 100644 --- a/build.ts +++ b/build.ts @@ -33,6 +33,8 @@ const DEFAULT_BUILD_FEATURES = [ 'ULTRAPLAN', // P2: daemon + remote control server 'DAEMON', + // P3: poor mode (disable extract_memories + prompt_suggestion) + 'POOR', ] // Collect FEATURE_* env vars → Bun.build features diff --git a/bun.lock 
b/bun.lock index 6d58ae8cc..2ab87561c 100644 --- a/bun.lock +++ b/bun.lock @@ -5,7 +5,7 @@ "": { "name": "claude-code", "dependencies": { - "@types/lodash-es": "^4.17.12", + "@types/he": "^1.2.3", }, "devDependencies": { "@alcalzone/ansi-tokenize": "^0.3.0", @@ -30,6 +30,8 @@ "@biomejs/biome": "^2.4.10", "@commander-js/extra-typings": "^14.0.0", "@growthbook/growthbook": "^1.6.5", + "@langfuse/otel": "^5.1.0", + "@langfuse/tracing": "^5.1.0", "@modelcontextprotocol/sdk": "^1.29.0", "@opentelemetry/api": "^1.9.1", "@opentelemetry/api-logs": "^0.214.0", @@ -54,6 +56,7 @@ "@smithy/node-http-handler": "^4.5.1", "@types/bun": "^1.3.11", "@types/cacache": "^20.0.1", + "@types/lodash-es": "^4.17.12", "@types/picomatch": "^4.0.3", "@types/plist": "^3.0.5", "@types/proper-lockfile": "^4.1.4", @@ -566,6 +569,12 @@ "@js-sdsl/ordered-map": ["@js-sdsl/ordered-map@4.4.2", "https://registry.npmmirror.com/@js-sdsl/ordered-map/-/ordered-map-4.4.2.tgz", {}, "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw=="], + "@langfuse/core": ["@langfuse/core@5.1.0", "https://registry.npmmirror.com/@langfuse/core/-/core-5.1.0.tgz", { "peerDependencies": { "@opentelemetry/api": "^1.9.0" } }, "sha512-yFvC67HBtrY4B3tyzF8+RJaIqK79LBVXtAgtmEc2vhpKauecvSW0zevRnRynFX+ajUHqi9TN7tnD91FJszFLgQ=="], + + "@langfuse/otel": ["@langfuse/otel@5.1.0", "https://registry.npmmirror.com/@langfuse/otel/-/otel-5.1.0.tgz", { "dependencies": { "@langfuse/core": "^5.1.0" }, "peerDependencies": { "@opentelemetry/api": "^1.9.0", "@opentelemetry/core": "^2.0.1", "@opentelemetry/exporter-trace-otlp-http": ">=0.202.0 <1.0.0", "@opentelemetry/sdk-trace-base": "^2.0.1" } }, "sha512-pvaXgZHMHqjsRjn+Gs5amrrq61w0Rxz1OChmLr2FfQzlymNl7+MxSXsWBj5dZQlufGbhyG+LT3wdx3MV8aLXHQ=="], + + "@langfuse/tracing": ["@langfuse/tracing@5.1.0", "https://registry.npmmirror.com/@langfuse/tracing/-/tracing-5.1.0.tgz", { "dependencies": { "@langfuse/core": "^5.1.0" }, "peerDependencies": { 
"@opentelemetry/api": "^1.9.0" } }, "sha512-ScwYnQzqLZOaMPZkCsWizx139eb02GI8tD5yxs5XVjGNGZxKdw1DfRPTIONSlOhaAYCY9ILGTJdkqAtNTzsbRg=="], + "@mixmark-io/domino": ["@mixmark-io/domino@2.2.0", "https://registry.npmmirror.com/@mixmark-io/domino/-/domino-2.2.0.tgz", {}, "sha512-Y28PR25bHXUg88kCV7nivXrP2Nj2RueZ3/l/jdx6J9f8J4nsEGcgX0Qe6lt7Pa+J79+kPiJU3LguR6O/6zrLOw=="], "@modelcontextprotocol/sdk": ["@modelcontextprotocol/sdk@1.29.0", "https://registry.npmmirror.com/@modelcontextprotocol/sdk/-/sdk-1.29.0.tgz", { "dependencies": { "@hono/node-server": "^1.19.9", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", "cors": "^2.8.5", "cross-spawn": "^7.0.5", "eventsource": "^3.0.2", "eventsource-parser": "^3.0.0", "express": "^5.2.1", "express-rate-limit": "^8.2.1", "hono": "^4.11.4", "jose": "^6.1.3", "json-schema-typed": "^8.0.2", "pkce-challenge": "^5.0.0", "raw-body": "^3.0.0", "zod": "^3.25 || ^4.0", "zod-to-json-schema": "^3.25.1" }, "peerDependencies": { "@cfworker/json-schema": "^4.1.1" }, "optionalPeers": ["@cfworker/json-schema"] }, "sha512-zo37mZA9hJWpULgkRpowewez1y6ML5GsXJPY8FI0tBBCd77HEvza4jDqRKOXgHNn867PVGCyTdzqpz0izu5ZjQ=="], @@ -982,6 +991,8 @@ "@types/estree": ["@types/estree@1.0.8", "https://registry.npmmirror.com/@types/estree/-/estree-1.0.8.tgz", {}, "sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w=="], + "@types/he": ["@types/he@1.2.3", "https://registry.npmmirror.com/@types/he/-/he-1.2.3.tgz", {}, "sha512-q67/qwlxblDzEDvzHhVkwc1gzVWxaNxeyHUBF4xElrvjL11O+Ytze+1fGpBHlr/H9myiBUaUXNnNPmBHxxfAcA=="], + "@types/lodash": ["@types/lodash@4.17.24", "https://registry.npmmirror.com/@types/lodash/-/lodash-4.17.24.tgz", {}, "sha512-gIW7lQLZbue7lRSWEFql49QJJWThrTFFeIMJdp3eH4tKoxm1OvEPg02rm4wCCSHS0cL3/Fizimb35b7k8atwsQ=="], "@types/lodash-es": ["@types/lodash-es@4.17.12", "https://registry.npmmirror.com/@types/lodash-es/-/lodash-es-4.17.12.tgz", { "dependencies": { "@types/lodash": "*" } }, 
"sha512-0NgftHUcV4v34VhXm8QBSftKVXtbkBG3ViCjs6+eJ5a6y6Mi/jiFGPc1sC7QK+9BFhWrURE3EOggmWaSxL9OzQ=="], diff --git a/package.json b/package.json index 266e5c1c9..30b66373c 100644 --- a/package.json +++ b/package.json @@ -54,9 +54,12 @@ "rcs": "bun run scripts/rcs.ts" }, "dependencies": { - "@types/lodash-es": "^4.17.12" + "@types/he": "^1.2.3" }, "devDependencies": { + "@langfuse/otel": "^5.1.0", + "@langfuse/tracing": "^5.1.0", + "@types/lodash-es": "^4.17.12", "@alcalzone/ansi-tokenize": "^0.3.0", "@ant/claude-for-chrome-mcp": "workspace:*", "@ant/computer-use-input": "workspace:*", diff --git a/scripts/dev.ts b/scripts/dev.ts index 0b35f4bc8..7ca9f5335 100644 --- a/scripts/dev.ts +++ b/scripts/dev.ts @@ -37,6 +37,8 @@ const DEFAULT_FEATURES = [ "KAIROS_BRIEF", "AWAY_SUMMARY", "ULTRAPLAN", // P2: daemon + remote control server "DAEMON", + // P3: poor mode (disable extract_memories + prompt_suggestion) + "POOR", ]; // Any env var matching FEATURE_=1 will also enable that feature. diff --git a/src/Tool.ts b/src/Tool.ts index 235ebb46b..b14a1d594 100644 --- a/src/Tool.ts +++ b/src/Tool.ts @@ -76,6 +76,7 @@ import type { SpinnerMode } from './components/Spinner.js' import type { QuerySource } from './constants/querySource.js' import type { SDKStatus } from './entrypoints/agentSdkTypes.js' import type { AppState } from './state/AppState.js' +import type { LangfuseSpan } from './services/langfuse/index.js' import type { HookProgress, PromptRequest, @@ -273,6 +274,8 @@ export type ToolUseContext = { ) => (request: PromptRequest) => Promise toolUseId?: string criticalSystemReminder_EXPERIMENTAL?: string + /** Langfuse root trace span for this query turn. Passed down to tool execution for observability. */ + langfuseTrace?: LangfuseSpan | null /** When true, preserve toolUseResult on messages even for subagents. * Used by in-process teammates whose transcripts are viewable by the user. 
*/ preserveToolUseResults?: boolean diff --git a/src/commands.ts b/src/commands.ts index defc80f02..2cb106c01 100644 --- a/src/commands.ts +++ b/src/commands.ts @@ -120,6 +120,11 @@ const buddy = feature('BUDDY') require('./commands/buddy/index.js') as typeof import('./commands/buddy/index.js') ).default : null +const poor = feature('POOR') + ? ( + require('./commands/poor/index.js') as typeof import('./commands/poor/index.js') + ).default + : null /* eslint-enable @typescript-eslint/no-require-imports */ import thinkback from './commands/thinkback/index.js' import thinkbackPlay from './commands/thinkback-play/index.js' @@ -321,6 +326,7 @@ const COMMANDS = memoize((): Command[] => [ ...(webCmd ? [webCmd] : []), ...(forkCmd ? [forkCmd] : []), ...(buddy ? [buddy] : []), + ...(poor ? [poor] : []), ...(proactive ? [proactive] : []), ...(briefCommand ? [briefCommand] : []), ...(assistantCommand ? [assistantCommand] : []), diff --git a/src/commands/poor/index.ts b/src/commands/poor/index.ts new file mode 100644 index 000000000..0ae331a68 --- /dev/null +++ b/src/commands/poor/index.ts @@ -0,0 +1,11 @@ +import type { Command } from '../../commands.js' + +const poor = { + type: 'local', + name: 'poor', + description: 'Toggle poor mode — disable extract_memories and prompt_suggestion to save tokens', + supportsNonInteractive: false, + load: () => import('./poor.js'), +} satisfies Command + +export default poor diff --git a/src/commands/poor/poor.ts b/src/commands/poor/poor.ts new file mode 100644 index 000000000..ab47de1ec --- /dev/null +++ b/src/commands/poor/poor.ts @@ -0,0 +1,28 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { isPoorModeActive, setPoorMode } from './poorMode.js' + +export const call: LocalCommandCall = async (_, context) => { + const currentlyActive = isPoorModeActive() + const newState = !currentlyActive + setPoorMode(newState) + + if (newState) { + // Disable prompt suggestion in AppState + context.setAppState(prev => ({ + 
...prev, + promptSuggestionEnabled: false, + })) + } else { + // Re-enable prompt suggestion + context.setAppState(prev => ({ + ...prev, + promptSuggestionEnabled: true, + })) + } + + const status = newState ? 'ON' : 'OFF' + const details = newState + ? 'extract_memories and prompt_suggestion are disabled' + : 'extract_memories and prompt_suggestion are restored' + return { type: 'text', value: `Poor mode ${status} — ${details}` } +} diff --git a/src/commands/poor/poorMode.ts b/src/commands/poor/poorMode.ts new file mode 100644 index 000000000..533d9700f --- /dev/null +++ b/src/commands/poor/poorMode.ts @@ -0,0 +1,14 @@ +/** + * Poor mode state — when active, skips extract_memories and prompt_suggestion + * to reduce token consumption. + */ + +let poorModeActive = false + +export function isPoorModeActive(): boolean { + return poorModeActive +} + +export function setPoorMode(active: boolean): void { + poorModeActive = active +} diff --git a/src/entrypoints/init.ts b/src/entrypoints/init.ts index f202aa3a3..05a6fda64 100644 --- a/src/entrypoints/init.ts +++ b/src/entrypoints/init.ts @@ -49,6 +49,7 @@ import { isBetaTracingEnabled } from '../utils/telemetry/betaSessionTracing.js' import { getTelemetryAttributes } from '../utils/telemetryAttributes.js' import { setShellIfWindows } from '../utils/windowsPaths.js' import { initSentry } from '../utils/sentry.js' +import { initLangfuse, shutdownLangfuse } from '../services/langfuse/index.js' // initialize1PEventLogging is dynamically imported to defer OpenTelemetry sdk-logs/resources @@ -154,6 +155,10 @@ export const init = memoize(async (): Promise => { // Initialize Sentry for error reporting (no-op if SENTRY_DSN not set) initSentry() + // Initialize Langfuse tracing (no-op if keys not configured) + initLangfuse() + registerCleanup(shutdownLangfuse) + // Preconnect to the Anthropic API — overlap TCP+TLS handshake // (~100-200ms) with the ~100ms of action-handler work before the API // request. 
After CA certs + proxy agents are configured so the warmed diff --git a/src/query.ts b/src/query.ts index 9e5c7ed46..f8a7eb7b4 100644 --- a/src/query.ts +++ b/src/query.ts @@ -107,9 +107,12 @@ import { getCurrentTurnTokenBudget, getTurnOutputTokens, incrementBudgetContinuationCount, + getSessionId, } from './bootstrap/state.js' import { createBudgetTracker, checkTokenBudget } from './query/tokenBudget.js' import { count } from './utils/array.js' +import { createTrace, endTrace, isLangfuseEnabled } from './services/langfuse/index.js' +import { getAPIProvider } from './utils/model/providers.js' /* eslint-disable @typescript-eslint/no-require-imports */ const snipModule = feature('HISTORY_SNIP') @@ -227,7 +230,38 @@ export async function* query( Terminal > { const consumedCommandUuids: string[] = [] - const terminal = yield* queryLoop(params, consumedCommandUuids) + + // Create Langfuse trace for this query turn (no-op if not configured). + // When called as a sub-agent, langfuseTrace is already set by runAgent() + // — reuse it instead of creating an independent trace. + const ownsTrace = !params.toolUseContext.langfuseTrace + const langfuseTrace = params.toolUseContext.langfuseTrace + ?? (isLangfuseEnabled() + ? createTrace({ + sessionId: getSessionId(), + model: params.toolUseContext.options.mainLoopModel, + provider: getAPIProvider(), + input: params.messages, + querySource: params.querySource, + }) + : null) + + // Attach trace to toolUseContext so tool execution can record observations + const paramsWithTrace: QueryParams = langfuseTrace + ? { + ...params, + toolUseContext: { ...params.toolUseContext, langfuseTrace }, + } + : params + + let terminal: Terminal + try { + terminal = yield* queryLoop(paramsWithTrace, consumedCommandUuids) + } finally { + // Only end the trace if we created it — sub-agents own their traces + if (ownsTrace) endTrace(langfuseTrace) + } + // Only reached if queryLoop returned normally. 
Skipped on throw (error // propagates through yield*) and on .return() (Return completion closes // both generators). This gives the same asymmetric started-without-completed @@ -704,6 +738,7 @@ async function* queryLoop( }), }, }), + langfuseTrace: toolUseContext.langfuseTrace, }, })) { // We won't use the tool_calls from the first attempt diff --git a/src/query/stopHooks.ts b/src/query/stopHooks.ts index 457fe03ea..10e268dfb 100644 --- a/src/query/stopHooks.ts +++ b/src/query/stopHooks.ts @@ -133,15 +133,23 @@ export async function* handleStopHooks( // --bare / SIMPLE: skip background bookkeeping (prompt suggestion, // memory extraction, auto-dream). Scripted -p calls don't want auto-memory // or forked agents contending for resources during shutdown. + // Poor mode: also skip prompt suggestion and memory extraction. + const poorMode = feature('POOR') + ? (await import('../commands/poor/poorMode.js')).isPoorModeActive() + : false if (!isBareMode()) { // Inline env check for dead code elimination in external builds - if (!isEnvDefinedFalsy(process.env.CLAUDE_CODE_ENABLE_PROMPT_SUGGESTION)) { + if ( + !isEnvDefinedFalsy(process.env.CLAUDE_CODE_ENABLE_PROMPT_SUGGESTION) && + !poorMode + ) { void executePromptSuggestion(stopHookContext) } if ( feature('EXTRACT_MEMORIES') && !toolUseContext.agentId && - isExtractModeActive() + isExtractModeActive() && + !poorMode ) { // Fire-and-forget in both interactive and non-interactive. 
For -p/SDK, // print.ts drains the in-flight promise after flushing the response diff --git a/src/services/api/claude.ts b/src/services/api/claude.ts index 5e31a3f16..8a726886f 100644 --- a/src/services/api/claude.ts +++ b/src/services/api/claude.ts @@ -228,6 +228,9 @@ import { } from '../compact/microCompact.js' import { getInitializationStatus } from '../lsp/manager.js' import { isToolFromMcpServer } from '../mcp/utils.js' +import { recordLLMObservation } from '../langfuse/index.js' +import type { LangfuseSpan } from '../langfuse/index.js' +import { convertMessagesToLangfuse, convertOutputToLangfuse } from '../langfuse/convert.js' import { withStreamingVCR, withVCR } from '../vcr.js' import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js' import { @@ -717,6 +720,8 @@ export type Options = { // so the model can pace itself. `remaining` is computed by the caller // (query.ts decrements across the agentic loop). taskBudget?: { total: number; remaining?: number } + /** Langfuse root trace span for observability. No-op if null/undefined. */ + langfuseTrace?: LangfuseSpan | null } export async function queryModelWithoutStreaming({ @@ -2895,6 +2900,19 @@ async function* queryModel( // limit) until getToolPermissionContext() resolves. const logMessageCount = messagesForAPI.length const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI) + + // Record LLM observation in Langfuse (no-op if not configured) + recordLLMObservation(options.langfuseTrace ?? null, { + model: resolvedModel, + provider: getAPIProvider(), + input: convertMessagesToLangfuse(messagesForAPI, systemPrompt), + output: convertOutputToLangfuse(newMessages), + usage: { input_tokens: usage.input_tokens, output_tokens: usage.output_tokens }, + startTime: new Date(startIncludingRetries), + endTime: new Date(), + completionStartTime: ttftMs > 0 ? 
new Date(start + ttftMs) : undefined, + }) + void options.getToolPermissionContext().then(permissionContext => { logAPISuccessAndDuration({ model: diff --git a/src/services/langfuse/__tests__/langfuse.test.ts b/src/services/langfuse/__tests__/langfuse.test.ts new file mode 100644 index 000000000..53dafb824 --- /dev/null +++ b/src/services/langfuse/__tests__/langfuse.test.ts @@ -0,0 +1,568 @@ +import { mock, describe, test, expect, beforeEach } from 'bun:test' + +// Mock @langfuse/otel before any imports +const mockForceFlush = mock(() => Promise.resolve()) +const mockShutdown = mock(() => Promise.resolve()) + +mock.module('@langfuse/otel', () => ({ + LangfuseSpanProcessor: class MockLangfuseSpanProcessor { + forceFlush = mockForceFlush + shutdown = mockShutdown + onStart = mock(() => {}) + onEnd = mock(() => {}) + }, +})) + +// Mock @opentelemetry/sdk-trace-base +mock.module('@opentelemetry/sdk-trace-base', () => ({ + BasicTracerProvider: class MockBasicTracerProvider { + constructor(_opts?: unknown) {} + }, +})) + +// Mock @langfuse/tracing +const mockChildUpdate = mock(() => {}) +const mockChildEnd = mock(() => {}) +const mockRootUpdate = mock(() => {}) +const mockRootEnd = mock(() => {}) + +// Mock LangfuseOtelSpanAttributes (re-exported from @langfuse/core) +const mockLangfuseOtelSpanAttributes: Record = { + TRACE_SESSION_ID: 'session.id', + OBSERVATION_TYPE: 'observation.type', + OBSERVATION_INPUT: 'observation.input', + OBSERVATION_OUTPUT: 'observation.output', + OBSERVATION_MODEL: 'observation.model', + OBSERVATION_COMPLETION_START_TIME: 'observation.completionStartTime', + OBSERVATION_USAGE_DETAILS: 'observation.usageDetails', +} + +const mockSpanContext = { traceId: 'test-trace-id', spanId: 'test-span-id', traceFlags: 1 } +const mockSetAttribute = mock(() => {}) + +// Child observation mock (returned by rootSpan.startObservation for tools) +const mockChildStartObservation = mock(() => ({ + id: 'child-id', + update: mockChildUpdate, + end: mockChildEnd, 
+})) + +const mockStartObservation = mock(() => ({ + id: 'test-span-id', + traceId: 'test-trace-id', + type: 'span', + otelSpan: { + spanContext: () => mockSpanContext, + setAttribute: mockSetAttribute, + }, + update: mockRootUpdate, + end: mockRootEnd, + // Instance method — used by recordToolObservation + startObservation: mockChildStartObservation, +})) +const mockSetLangfuseTracerProvider = mock(() => {}) + +mock.module('@langfuse/tracing', () => ({ + startObservation: mockStartObservation, + LangfuseOtelSpanAttributes: mockLangfuseOtelSpanAttributes, + propagateAttributes: mock((_params: unknown, fn?: () => void) => fn?.()), + setLangfuseTracerProvider: mockSetLangfuseTracerProvider, +})) + +// Mock debug logger +mock.module('src/utils/debug.js', () => ({ + logForDebugging: mock(() => {}), +})) + +describe('Langfuse integration', () => { + beforeEach(() => { + // Reset env + delete process.env.LANGFUSE_PUBLIC_KEY + delete process.env.LANGFUSE_SECRET_KEY + delete process.env.LANGFUSE_BASE_URL + mockStartObservation.mockClear() + mockChildStartObservation.mockClear() + mockChildUpdate.mockClear() + mockChildEnd.mockClear() + mockRootUpdate.mockClear() + mockRootEnd.mockClear() + mockForceFlush.mockClear() + mockShutdown.mockClear() + mockSetAttribute.mockClear() + }) + + // ── sanitize tests ────────────────────────────────────────────────────────── + + describe('sanitizeToolInput', () => { + test('replaces home dir in file_path', async () => { + const { sanitizeToolInput } = await import('../sanitize.js') + const home = process.env.HOME ?? 
'/Users/testuser' + const result = sanitizeToolInput('FileReadTool', { file_path: `${home}/project/file.ts` }) as Record + expect(result.file_path).toBe('~/project/file.ts') + }) + + test('redacts sensitive keys', async () => { + const { sanitizeToolInput } = await import('../sanitize.js') + const result = sanitizeToolInput('MCPTool', { api_key: 'secret123', token: 'abc' }) as Record + expect(result.api_key).toBe('[REDACTED]') + expect(result.token).toBe('[REDACTED]') + }) + + test('returns non-object input unchanged', async () => { + const { sanitizeToolInput } = await import('../sanitize.js') + expect(sanitizeToolInput('BashTool', 'raw string')).toBe('raw string') + expect(sanitizeToolInput('BashTool', null)).toBe(null) + }) + }) + + describe('sanitizeToolOutput', () => { + test('redacts FileReadTool output', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const result = sanitizeToolOutput('FileReadTool', 'file content here') + expect(result).toBe('[file content redacted, 17 chars]') + }) + + test('redacts FileWriteTool output', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const result = sanitizeToolOutput('FileWriteTool', 'written content') + expect(result).toBe('[file content redacted, 15 chars]') + }) + + test('truncates BashTool output over 500 chars', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const longOutput = 'x'.repeat(600) + const result = sanitizeToolOutput('BashTool', longOutput) + expect(result).toContain('[truncated]') + expect(result.length).toBeLessThan(600) + }) + + test('does not truncate BashTool output under 500 chars', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const shortOutput = 'hello world' + expect(sanitizeToolOutput('BashTool', shortOutput)).toBe('hello world') + }) + + test('redacts ConfigTool output', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const result = 
sanitizeToolOutput('ConfigTool', 'config data') + expect(result).toBe('[ConfigTool output redacted, 11 chars]') + }) + + test('redacts MCPTool output', async () => { + const { sanitizeToolOutput } = await import('../sanitize.js') + const result = sanitizeToolOutput('MCPTool', 'mcp data') + expect(result).toBe('[MCPTool output redacted, 8 chars]') + }) + }) + + describe('sanitizeGlobal', () => { + test('replaces home dir in strings', async () => { + const { sanitizeGlobal } = await import('../sanitize.js') + const home = process.env.HOME ?? '/Users/testuser' + expect(sanitizeGlobal(`path: ${home}/file`)).toBe('path: ~/file') + }) + + test('recursively sanitizes nested objects', async () => { + const { sanitizeGlobal } = await import('../sanitize.js') + const result = sanitizeGlobal({ nested: { api_key: 'secret', name: 'test' } }) as Record> + expect(result.nested.api_key).toBe('[REDACTED]') + expect(result.nested.name).toBe('test') + }) + + test('returns non-string/object values unchanged', async () => { + const { sanitizeGlobal } = await import('../sanitize.js') + expect(sanitizeGlobal(42)).toBe(42) + expect(sanitizeGlobal(true)).toBe(true) + }) + }) + + // ── client tests ──────────────────────────────────────────────────────────── + + describe('isLangfuseEnabled', () => { + test('returns false when keys not configured', async () => { + const { isLangfuseEnabled } = await import('../client.js') + expect(isLangfuseEnabled()).toBe(false) + }) + + test('returns true when both keys are set', async () => { + process.env.LANGFUSE_PUBLIC_KEY = 'pk-test' + process.env.LANGFUSE_SECRET_KEY = 'sk-test' + const { isLangfuseEnabled } = await import('../client.js') + expect(isLangfuseEnabled()).toBe(true) + }) + }) + + describe('initLangfuse', () => { + test('returns false when keys not configured', async () => { + const { initLangfuse } = await import('../client.js') + expect(initLangfuse()).toBe(false) + }) + + test('returns true when keys are configured', async () => { + 
process.env.LANGFUSE_PUBLIC_KEY = 'pk-test' + process.env.LANGFUSE_SECRET_KEY = 'sk-test' + // client.js is a singleton — test via isLangfuseEnabled which reads env directly + const { isLangfuseEnabled } = await import('../client.js') + expect(isLangfuseEnabled()).toBe(true) + }) + + test('is idempotent — multiple calls do not re-initialize', async () => { + // client.js singleton: once processor is set, initLangfuse returns true immediately + // We verify this by checking that calling it multiple times doesn't throw + const { initLangfuse } = await import('../client.js') + expect(() => { initLangfuse(); initLangfuse() }).not.toThrow() + }) + }) + + describe('shutdownLangfuse', () => { + test('calls forceFlush and shutdown on processor', async () => { + // Verify shutdown is callable without error even when no processor is set + const { shutdownLangfuse } = await import('../client.js') + await expect(shutdownLangfuse()).resolves.toBeUndefined() + }) + }) + + // ── tracing tests ─────────────────────────────────────────────────────────── + + describe('createTrace', () => { + test('returns null when langfuse not enabled', async () => { + const { createTrace } = await import('../tracing.js') + const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' }) + expect(span).toBeNull() + }) + + test('creates root span when enabled', async () => { + process.env.LANGFUSE_PUBLIC_KEY = 'pk-test' + process.env.LANGFUSE_SECRET_KEY = 'sk-test' + const { createTrace } = await import('../tracing.js') + const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty', input: [] }) + expect(span).not.toBeNull() + expect(mockStartObservation).toHaveBeenCalledWith('agent-run', expect.objectContaining({ + metadata: expect.objectContaining({ provider: 'firstParty', model: 'claude-3' }), + }), { asType: 'agent' }) + }) + }) + + describe('recordLLMObservation', () => { + test('no-ops when rootSpan is null', async () => { + const { 
describe('recordToolObservation', () => {
  // Every enabled-path test sets both Langfuse keys so isLangfuseEnabled() is true,
  // then creates a root span via createTrace() to act as the parent observation.

  test('no-ops when rootSpan is null', async () => {
    const { recordToolObservation } = await import('../tracing.js')
    mockStartObservation.mockClear()
    recordToolObservation(null, { toolName: 'BashTool', toolUseId: 'id1', input: {}, output: 'out' })
    // A null root span must short-circuit before any observation is started.
    // Without this assertion the test could never fail.
    expect(mockStartObservation).not.toHaveBeenCalled()
  })

  test('records tool child observation via global startObservation', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const { createTrace, recordToolObservation } = await import('../tracing.js')
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    // Discard the calls made while creating the root trace.
    mockStartObservation.mockClear()
    mockRootUpdate.mockClear()
    mockRootEnd.mockClear()
    recordToolObservation(span, {
      toolName: 'BashTool',
      toolUseId: 'tu-1',
      input: { command: 'ls' },
      output: 'file.ts',
    })
    // Should call the global startObservation with asType: 'tool' and parentSpanContext
    expect(mockStartObservation).toHaveBeenCalledWith('BashTool', expect.objectContaining({
      input: expect.any(Object),
    }), expect.objectContaining({
      asType: 'tool',
      parentSpanContext: mockSpanContext,
    }))
    expect(mockRootUpdate).toHaveBeenCalled()
    expect(mockRootEnd).toHaveBeenCalled()
  })

  test('passes startTime to global startObservation', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const { createTrace, recordToolObservation } = await import('../tracing.js')
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    mockStartObservation.mockClear()
    const startTime = new Date('2026-01-01T00:00:00Z')
    recordToolObservation(span, {
      toolName: 'BashTool',
      toolUseId: 'tu-2',
      input: {},
      output: 'out',
      startTime,
    })
    expect(mockStartObservation).toHaveBeenCalledWith('BashTool', expect.any(Object), expect.objectContaining({
      startTime,
      parentSpanContext: mockSpanContext,
    }))
  })

  test('sanitizes FileReadTool output', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const { createTrace, recordToolObservation } = await import('../tracing.js')
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    mockRootUpdate.mockClear()
    recordToolObservation(span, {
      toolName: 'FileReadTool',
      toolUseId: 'tu-2',
      input: { file_path: '/tmp/file.ts' },
      output: 'file content here',
    })
    // 'file content here' is 17 characters long; only the length may be exported.
    expect(mockRootUpdate).toHaveBeenCalledWith(expect.objectContaining({
      output: '[file content redacted, 17 chars]',
    }))
  })

  test('sets ERROR level for error observations', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const { createTrace, recordToolObservation } = await import('../tracing.js')
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    mockRootUpdate.mockClear()
    recordToolObservation(span, {
      toolName: 'BashTool',
      toolUseId: 'tu-3',
      input: {},
      output: 'error occurred',
      isError: true,
    })
    expect(mockRootUpdate).toHaveBeenCalledWith(expect.objectContaining({ level: 'ERROR' }))
  })
})
describe('createTrace with querySource', () => {
  // The trace name and metadata both depend on whether querySource is supplied.

  test('includes querySource in metadata', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const tracing = await import('../tracing.js')
    const rootSpan = tracing.createTrace({
      sessionId: 's1',
      model: 'claude-3',
      provider: 'firstParty',
      querySource: 'user',
    })
    expect(rootSpan).not.toBeNull()

    const expectedMetadata = expect.objectContaining({
      agentType: 'main',
      querySource: 'user',
    })
    expect(mockStartObservation).toHaveBeenCalledWith(
      'agent-run:user',
      expect.objectContaining({ metadata: expectedMetadata }),
      { asType: 'agent' },
    )
  })

  test('omits querySource when not provided', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    mockStartObservation.mockClear()
    const tracing = await import('../tracing.js')
    tracing.createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })

    // Inspect the attributes object (second argument) of the first recorded call.
    const [firstCall] = mockStartObservation.mock.calls as unknown[][]
    const attributes = firstCall?.[1] as Record<string, unknown> | undefined
    const metadata = (attributes?.metadata ?? {}) as Record<string, unknown>
    expect(metadata).not.toHaveProperty('querySource')
  })
})
describe('SDK exceptions do not affect main flow', () => {
  test('createTrace returns null on SDK error', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
    const { createTrace } = await import('../tracing.js')
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    expect(span).toBeNull()
  })

  test('recordLLMObservation silently fails on SDK error', async () => {
    process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
    process.env.LANGFUSE_SECRET_KEY = 'sk-test'
    const { createTrace, recordLLMObservation } = await import('../tracing.js')

    // Create the root span BEFORE queueing the failure. If the throwing
    // implementation were queued first, createTrace would consume it and return
    // null, and recordLLMObservation would then return early on the null span
    // without ever reaching the SDK call this test is meant to cover.
    const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
    expect(span).not.toBeNull()

    // Only the generation's startObservation call should throw.
    mockStartObservation.mockClear()
    mockStartObservation.mockImplementationOnce(() => { throw new Error('SDK error') })
    expect(() => recordLLMObservation(span, {
      model: 'm',
      provider: 'firstParty',
      input: [],
      output: [],
      usage: { input_tokens: 1, output_tokens: 1 },
    })).not.toThrow()

    // Proves the throwing implementation was actually invoked (and consumed),
    // so it cannot leak into a later test.
    expect(mockStartObservation).toHaveBeenCalledTimes(1)
  })
})
/**
 * Flatten the `content` of a tool_result block into a single string.
 * Text parts contribute their text; any other part becomes a bracketed
 * placeholder naming its type. Parts are joined with newlines.
 * A non-array payload is stringified (nullish becomes '').
 */
function flattenToolResultContent(raw: unknown): string {
  if (!Array.isArray(raw)) return String(raw ?? '')

  const lines: string[] = []
  for (const item of raw as Record<string, unknown>[]) {
    switch (item.type) {
      case 'text':
        lines.push(String(item.text ?? ''))
        break
      case 'image':
        lines.push('[image]')
        break
      case 'document':
        lines.push('[document]')
        break
      default:
        lines.push(`[${String(item.type ?? 'unknown')}]`)
    }
  }
  return lines.join('\n')
}

/**
 * Normalize message content into the shape sent to Langfuse.
 *
 * - A string is returned unchanged.
 * - Any other non-array value is stringified (nullish becomes '').
 * - An array of content blocks is mapped block-by-block to content parts;
 *   entries that are not objects are skipped.
 * - If the result is exactly one text part, it collapses to that plain string.
 */
function normalizeContent(content: unknown): string | LangfuseContentPart[] {
  if (typeof content === 'string') return content
  if (!Array.isArray(content)) return String(content ?? '')

  const parts: LangfuseContentPart[] = []
  for (const block of content) {
    if (!block || typeof block !== 'object') continue
    const b = block as Record<string, unknown>
    const kind = b.type as string | undefined

    switch (kind) {
      case 'text':
        parts.push({ type: 'text', text: String(b.text ?? '') })
        break

      case 'thinking':
      case 'redacted_thinking':
        // When the block has no `thinking` field, a fixed marker is emitted instead.
        parts.push({ type: 'thinking', thinking: String(b.thinking ?? '[redacted]') })
        break

      case 'tool_use':
        parts.push({ type: 'tool_use', id: String(b.id ?? ''), name: String(b.name ?? ''), input: b.input })
        break

      case 'tool_result':
        parts.push({
          type: 'tool_result',
          tool_use_id: String(b.tool_use_id ?? ''),
          content: flattenToolResultContent(b.content),
        })
        break

      case 'image':
        parts.push({ type: 'text', text: '[image]' })
        break

      case 'document': {
        const source = b.source as Record<string, unknown> | undefined
        const label = source?.filename ?? (b.title as string | undefined) ?? 'document'
        parts.push({ type: 'text', text: `[document: ${label}]` })
        break
      }

      case 'server_tool_use':
      case 'web_search_tool_result':
      case 'tool_search_tool_result':
        // server-side tool blocks — keep name/id, drop raw content
        parts.push({ type: kind, id: String(b.id ?? ''), name: String(b.name ?? kind) })
        break

      default: {
        // unknown block: keep the type plus scalar (string/number/boolean) fields only
        const safe: Record<string, unknown> = { type: kind ?? 'unknown' }
        for (const [field, value] of Object.entries(b)) {
          const scalar = typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean'
          if (scalar) safe[field] = value
        }
        parts.push(safe as LangfuseContentPart)
      }
    }
  }

  // Collapse to plain string if only one text part
  const only = parts.length === 1 ? parts[0]! : undefined
  if (only && only.type === 'text') {
    return (only as { type: 'text'; text: string }).text
  }
  return parts
}
const MAX_OUTPUT_LENGTH = 500
const REDACTED_FILE_TOOLS = new Set(['FileReadTool', 'FileWriteTool', 'FileEditTool'])
const REDACTED_SHELL_TOOLS = new Set(['BashTool', 'PowerShellTool'])
const SENSITIVE_OUTPUT_TOOLS = new Set(['ConfigTool', 'MCPTool'])

/**
 * Build the global regex used to replace the user's home directory with `~`.
 *
 * @param home Value of the HOME environment variable (may be undefined or empty).
 * @returns A `g`-flagged RegExp.
 *   - With a usable `home`, it matches that literal path, with regex
 *     metacharacters escaped, and only when the next character does not continue
 *     the directory name (so `/home/al` does not rewrite `/home/alice`).
 *   - Without one, it matches the generic `/Users/<name>` prefix.
 */
function buildHomeDirPattern(home: string | undefined): RegExp {
  // Drop trailing separators. A HOME of '' or '/' is unusable: an empty pattern
  // would match between every character of every string.
  const trimmed = (home ?? '').replace(/[/\\]+$/, '')
  if (trimmed === '') {
    // The fallback is a real pattern and must NOT be regex-escaped, otherwise it
    // can only match the literal text "/Users/[^/]+". The name segment stops at
    // separators, whitespace and quotes so it cannot swallow surrounding text.
    return /\/Users\/[^/\\\s"']+/g
  }
  const literal = trimmed.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
  return new RegExp(`${literal}(?![\\w.-])`, 'g')
}

const HOME_DIR_PATTERN = buildHomeDirPattern(process.env.HOME)

const SENSITIVE_KEY_PATTERN = /(?:api_?key|token|secret|password|credential|auth_header)/i

/**
 * Recursively sanitize one value.
 * - strings: home-directory occurrences become `~`
 * - arrays: sanitized element-wise and returned as arrays
 * - Date instances: returned unchanged
 * - other objects: sanitized key-by-key via sanitizeObject
 * - everything else: returned unchanged
 */
function sanitizeValue(value: unknown): unknown {
  if (typeof value === 'string') {
    return value.replace(HOME_DIR_PATTERN, '~')
  }
  if (Array.isArray(value)) {
    // Arrays must stay arrays: feeding them through Object.entries would turn
    // a message list into an index-keyed object.
    return value.map(item => sanitizeValue(item))
  }
  if (value instanceof Date) {
    // A Date has no own enumerable properties and would otherwise become {}.
    return value
  }
  if (typeof value === 'object' && value !== null) {
    return sanitizeObject(value as Record<string, unknown>)
  }
  return value
}

/**
 * Sanitize arbitrary data before export: redacts values stored under
 * sensitive-looking keys and replaces the home directory in strings with `~`.
 * Returns a sanitized copy for objects and arrays; primitives are returned as-is.
 */
export function sanitizeGlobal(data: unknown): unknown {
  return sanitizeValue(data)
}

/**
 * Return a copy of `obj` where values under keys matching SENSITIVE_KEY_PATTERN
 * are replaced by '[REDACTED]' and all other values are sanitized recursively.
 */
function sanitizeObject(obj: Record<string, unknown>): Record<string, unknown> {
  const result: Record<string, unknown> = {}
  for (const [key, value] of Object.entries(obj)) {
    result[key] = SENSITIVE_KEY_PATTERN.test(key) ? '[REDACTED]' : sanitizeValue(value)
  }
  return result
}

/**
 * Sanitize a tool's input object (shallow).
 *
 * @param toolName Name of the tool; currently not consulted.
 * @param input Raw tool input. Non-objects, null and arrays are returned unchanged.
 * @returns A shallow copy with top-level sensitive keys redacted and the home
 *   directory replaced by `~` in the `file_path`, `path` and `directory` fields.
 */
export function sanitizeToolInput(toolName: string, input: unknown): unknown {
  if (typeof input !== 'object' || input === null || Array.isArray(input)) return input
  const obj = { ...(input as Record<string, unknown>) }

  for (const key of Object.keys(obj)) {
    if (SENSITIVE_KEY_PATTERN.test(key)) {
      obj[key] = '[REDACTED]'
    }
  }

  for (const key of ['file_path', 'path', 'directory'] as const) {
    const value = obj[key]
    if (typeof value === 'string') {
      obj[key] = value.replace(HOME_DIR_PATTERN, '~')
    }
  }
  return obj
}

/**
 * Sanitize a tool's textual output according to the tool's category.
 * - file tools: content is replaced by a length-only placeholder
 * - shell tools: output longer than MAX_OUTPUT_LENGTH is truncated
 * - sensitive tools: output is replaced by a length-only placeholder
 * - any other tool: output is returned unchanged
 */
export function sanitizeToolOutput(toolName: string, output: string): string {
  if (REDACTED_FILE_TOOLS.has(toolName)) {
    return `[file content redacted, ${output.length} chars]`
  }
  if (REDACTED_SHELL_TOOLS.has(toolName) && output.length > MAX_OUTPUT_LENGTH) {
    return `${output.slice(0, MAX_OUTPUT_LENGTH)}\n[truncated]`
  }
  if (SENSITIVE_OUTPUT_TOOLS.has(toolName)) {
    return `[${toolName} output redacted, ${output.length} chars]`
  }
  return output
}
import { startObservation, LangfuseOtelSpanAttributes } from '@langfuse/tracing'
import type { LangfuseSpan, LangfuseGeneration, LangfuseAgent } from '@langfuse/tracing'
import { isLangfuseEnabled } from './client.js'
import { sanitizeToolInput, sanitizeToolOutput } from './sanitize.js'
import { logForDebugging } from 'src/utils/debug.js'

export type { LangfuseSpan }

// Root trace is an agent observation — represents one full agentic turn/session.
// `_sessionId` is stashed on the object so child observations can copy it later.
type RootTrace = LangfuseAgent & { _sessionId?: string }

/**
 * Start a root 'agent' observation, tag its OTel span with the session id and
 * remember the session id on the returned object.
 * Shared by createTrace and createSubagentTrace; exceptions propagate to the caller.
 */
function startRootAgentObservation(
  name: string,
  attributes: { input?: unknown; metadata: Record<string, unknown> },
  sessionId: string,
): RootTrace {
  const rootSpan = startObservation(name, attributes, { asType: 'agent' }) as RootTrace
  rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
  rootSpan._sessionId = sessionId
  return rootSpan
}

/**
 * Copy the session id remembered on `rootSpan` (if any) onto a child
 * observation's OTel span so the child carries the same session attribute.
 */
function propagateSessionId(
  rootSpan: LangfuseSpan,
  child: { otelSpan: { setAttribute(key: string, value: string): unknown } },
): void {
  const sessionId = (rootSpan as unknown as RootTrace)._sessionId
  if (sessionId) {
    child.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
  }
}

/**
 * Create the root trace for a main-agent run.
 *
 * The trace name is `params.name` when given, otherwise `agent-run:<querySource>`
 * when a query source is given, otherwise `agent-run`.
 *
 * @returns The root span, or null when Langfuse is not enabled or the SDK throws
 *   (the failure is logged, never rethrown).
 */
export function createTrace(params: {
  sessionId: string
  model: string
  provider: string
  input?: unknown
  name?: string
  querySource?: string
}): LangfuseSpan | null {
  if (!isLangfuseEnabled()) return null
  try {
    const traceName = params.name ?? (params.querySource ? `agent-run:${params.querySource}` : 'agent-run')
    const rootSpan = startRootAgentObservation(traceName, {
      input: params.input,
      metadata: {
        provider: params.provider,
        model: params.model,
        agentType: 'main',
        // Only add the key when a query source exists, so it is absent otherwise.
        ...(params.querySource && { querySource: params.querySource }),
      },
    }, params.sessionId)
    logForDebugging(`[langfuse] Trace created: ${rootSpan.id}`)
    return rootSpan as unknown as LangfuseSpan
  } catch (e) {
    logForDebugging(`[langfuse] createTrace failed: ${e}`, { level: 'error' })
    return null
  }
}

// Generation names per provider; unknown providers fall back to `Chat<provider>`.
const PROVIDER_GENERATION_NAMES: Record<string, string> = {
  firstParty: 'ChatAnthropic',
  bedrock: 'ChatBedrockAnthropic',
  vertex: 'ChatVertexAnthropic',
  foundry: 'ChatFoundry',
  openai: 'ChatOpenAI',
  gemini: 'ChatGoogleGenerativeAI',
  grok: 'ChatXAI',
}

/**
 * Record one LLM call as a 'generation' observation parented to `rootSpan`.
 * No-op when `rootSpan` is null or Langfuse is not enabled. SDK errors are
 * logged and swallowed so tracing can never break the main flow.
 */
export function recordLLMObservation(
  rootSpan: LangfuseSpan | null,
  params: {
    model: string
    provider: string
    input: unknown
    output: unknown
    usage: { input_tokens: number; output_tokens: number }
    startTime?: Date
    endTime?: Date
    completionStartTime?: Date
  },
): void {
  if (!rootSpan || !isLangfuseEnabled()) return
  try {
    const genName = PROVIDER_GENERATION_NAMES[params.provider] ?? `Chat${params.provider}`

    // Use the global startObservation directly instead of rootSpan.startObservation().
    // The instance method only forwards asType to the global function and drops startTime,
    // which causes negative TTFT because the OTel span's start time defaults to "now".
    const gen: LangfuseGeneration = startObservation(
      genName,
      {
        model: params.model,
        input: params.input,
        metadata: {
          provider: params.provider,
          model: params.model,
        },
        ...(params.completionStartTime && { completionStartTime: params.completionStartTime }),
      },
      {
        asType: 'generation',
        ...(params.startTime && { startTime: params.startTime }),
        parentSpanContext: rootSpan.otelSpan.spanContext(),
      },
    )

    // Propagate session ID to generation span so Langfuse links it correctly
    propagateSessionId(rootSpan, gen)

    gen.update({
      output: params.output,
      usageDetails: {
        input: params.usage.input_tokens,
        output: params.usage.output_tokens,
      },
    })

    gen.end(params.endTime)
    logForDebugging(`[langfuse] LLM observation recorded: ${gen.id}`)
  } catch (e) {
    logForDebugging(`[langfuse] recordLLMObservation failed: ${e}`, { level: 'error' })
  }
}

/**
 * Record one tool execution as a 'tool' observation parented to `rootSpan`.
 * Input and output pass through sanitizeToolInput / sanitizeToolOutput before
 * being attached. No-op when `rootSpan` is null or Langfuse is not enabled;
 * SDK errors are logged and swallowed.
 */
export function recordToolObservation(
  rootSpan: LangfuseSpan | null,
  params: {
    toolName: string
    toolUseId: string
    input: unknown
    output: string
    startTime?: Date
    isError?: boolean
  },
): void {
  if (!rootSpan || !isLangfuseEnabled()) return
  try {
    // Use the global startObservation directly instead of rootSpan.startObservation().
    // The instance method only forwards asType and drops startTime,
    // causing tool execution duration to be 0.
    const toolObs = startObservation(
      params.toolName,
      {
        input: sanitizeToolInput(params.toolName, params.input),
        metadata: {
          toolUseId: params.toolUseId,
          isError: String(params.isError ?? false),
        },
      },
      {
        asType: 'tool',
        ...(params.startTime && { startTime: params.startTime }),
        parentSpanContext: rootSpan.otelSpan.spanContext(),
      },
    )

    // Propagate session ID to tool span so Langfuse links it correctly
    propagateSessionId(rootSpan, toolObs)

    toolObs.update({
      output: sanitizeToolOutput(params.toolName, params.output),
      ...(params.isError && { level: 'ERROR' as const }),
    })

    toolObs.end()
    logForDebugging(`[langfuse] Tool observation recorded: ${params.toolName} (${toolObs.id})`)
  } catch (e) {
    logForDebugging(`[langfuse] recordToolObservation failed: ${e}`, { level: 'error' })
  }
}

/**
 * Create the root trace for a sub-agent run, named `agent:<agentType>`.
 * It is tagged with the given session id in the same way as createTrace.
 *
 * @returns The root span, or null when Langfuse is not enabled or the SDK throws
 *   (the failure is logged, never rethrown).
 */
export function createSubagentTrace(params: {
  sessionId: string
  agentType: string
  agentId: string
  model: string
  provider: string
  input?: unknown
}): LangfuseSpan | null {
  if (!isLangfuseEnabled()) return null
  try {
    const rootSpan = startRootAgentObservation(`agent:${params.agentType}`, {
      input: params.input,
      metadata: {
        provider: params.provider,
        model: params.model,
        agentType: params.agentType,
        agentId: params.agentId,
      },
    }, params.sessionId)
    logForDebugging(`[langfuse] Sub-agent trace created: ${rootSpan.id} (type=${params.agentType})`)
    return rootSpan as unknown as LangfuseSpan
  } catch (e) {
    logForDebugging(`[langfuse] createSubagentTrace failed: ${e}`, { level: 'error' })
    return null
  }
}

/**
 * End a root trace, optionally attaching a final output first.
 * No-op when `rootSpan` is null; SDK errors are logged and swallowed.
 */
export function endTrace(rootSpan: LangfuseSpan | null, output?: unknown): void {
  if (!rootSpan) return
  try {
    if (output !== undefined) {
      rootSpan.update({ output })
    }
    rootSpan.end()
    logForDebugging(`[langfuse] Trace ended: ${rootSpan.id}`)
  } catch (e) {
    logForDebugging(`[langfuse] endTrace failed: ${e}`, { level: 'error' })
  }
}
input, + output: errorMessage(error), + startTime: new Date(startTime), + isError: true, + }) + // Handle MCP auth errors by updating the client status to 'needs-auth' // This updates the /mcp display to show the server needs re-authorization if (error instanceof McpAuthError) { diff --git a/src/tools/AgentTool/runAgent.ts b/src/tools/AgentTool/runAgent.ts index 8d2b0cf7b..3672ba577 100644 --- a/src/tools/AgentTool/runAgent.ts +++ b/src/tools/AgentTool/runAgent.ts @@ -57,6 +57,12 @@ import { clearSessionHooks } from '../../utils/hooks/sessionHooks.js' import { executeSubagentStartHooks } from '../../utils/hooks.js' import { createUserMessage } from '../../utils/messages.js' import { getAgentModel } from '../../utils/model/agent.js' +import { getAPIProvider } from '../../utils/model/providers.js' +import { + createSubagentTrace, + endTrace, + isLangfuseEnabled, +} from '../../services/langfuse/index.js' import type { ModelAlias } from '../../utils/model/aliases.js' import { clearAgentTranscriptSubdir, @@ -744,6 +750,25 @@ export async function* runAgent({ // Track the last recorded message UUID for parent chain continuity let lastRecordedUuid: UUID | null = initialMessages.at(-1)?.uuid ?? null + // Create Langfuse sub-agent trace (no-op if not configured). + // Sub-agent trace shares the same sessionId as the parent, so Langfuse + // groups them under the same Session view. + const subTrace = isLangfuseEnabled() + ? 
createSubagentTrace({ + sessionId: getSessionId(), + agentType: agentDefinition.agentType, + agentId, + model: resolvedAgentModel, + provider: getAPIProvider(), + input: initialMessages, + }) + : null + + // Attach sub-agent trace to toolUseContext so query() reuses it + if (subTrace) { + agentToolUseContext.langfuseTrace = subTrace + } + try { for await (const message of query({ messages: initialMessages, @@ -814,6 +839,8 @@ export async function* runAgent({ agentDefinition.callback() } } finally { + // End Langfuse sub-agent trace (no-op if not configured) + endTrace(subTrace) // Clean up agent-specific MCP servers (runs on normal completion, abort, or error) await mcpCleanup() // Clean up agent's session hooks