diff --git a/src/entrypoints/init.ts b/src/entrypoints/init.ts
index 05a6fda64..3e0c33933 100644
--- a/src/entrypoints/init.ts
+++ b/src/entrypoints/init.ts
@@ -49,6 +49,7 @@ import { isBetaTracingEnabled } from '../utils/telemetry/betaSessionTracing.js'
 import { getTelemetryAttributes } from '../utils/telemetryAttributes.js'
 import { setShellIfWindows } from '../utils/windowsPaths.js'
 import { initSentry } from '../utils/sentry.js'
+import { initUser } from '../utils/user.js'
 import { initLangfuse, shutdownLangfuse } from '../services/langfuse/index.js'
 
 // initialize1PEventLogging is dynamically imported to defer OpenTelemetry sdk-logs/resources
@@ -156,6 +157,8 @@ export const init = memoize(async (): Promise => {
     initSentry()
 
     // Initialize Langfuse tracing (no-op if keys not configured)
+    // Pre-warm user email cache so Langfuse traces include userId
+    await initUser()
     initLangfuse()
     registerCleanup(shutdownLangfuse)
 
diff --git a/src/services/langfuse/__tests__/langfuse.test.ts b/src/services/langfuse/__tests__/langfuse.test.ts
index 53dafb824..38beaa035 100644
--- a/src/services/langfuse/__tests__/langfuse.test.ts
+++ b/src/services/langfuse/__tests__/langfuse.test.ts
@@ -29,6 +29,7 @@ const mockRootEnd = mock(() => {})
 // Mock LangfuseOtelSpanAttributes (re-exported from @langfuse/core)
 const mockLangfuseOtelSpanAttributes: Record = {
   TRACE_SESSION_ID: 'session.id',
+  TRACE_USER_ID: 'user.id',
   OBSERVATION_TYPE: 'observation.type',
   OBSERVATION_INPUT: 'observation.input',
   OBSERVATION_OUTPUT: 'observation.output',
@@ -74,6 +75,16 @@ mock.module('src/utils/debug.js', () => ({
   logForDebugging: mock(() => {}),
 }))
 
+// Mock user data — resolveLangfuseUserId uses getCoreUserData().email and .deviceId.
+// email is unset by default so the deviceId fallback is genuinely exercised; tests
+// that need an email override it with mockImplementationOnce.
+const mockGetCoreUserData = mock((): { email?: string; deviceId: string } => ({
+  deviceId: 'test-device-id',
+}))
+mock.module('src/utils/user.js', () => ({
+  getCoreUserData: mockGetCoreUserData,
+}))
+
 describe('Langfuse integration', () => {
   beforeEach(() => {
     // Reset env
@@ -477,6 +488,91 @@ describe('Langfuse integration', () => {
     })
   })
 
+  describe('createTrace with username', () => {
+    // Shared setup: Langfuse enabled, no user-ID override, clean attribute log
+    beforeEach(() => {
+      process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
+      process.env.LANGFUSE_SECRET_KEY = 'sk-test'
+      delete process.env.LANGFUSE_USER_ID
+      mockSetAttribute.mockClear()
+    })
+
+    /** Run `body` with LANGFUSE_USER_ID set; it is removed even if an expectation throws */
+    async function withEnvUserId(value: string, body: () => Promise<void>): Promise<void> {
+      process.env.LANGFUSE_USER_ID = value
+      try {
+        await body()
+      } finally {
+        delete process.env.LANGFUSE_USER_ID
+      }
+    }
+
+    /** Values passed to setAttribute('user.id', …) since the last mockClear() */
+    const userIdValues = (): unknown[] =>
+      mockSetAttribute.mock.calls
+        .filter((call: unknown[]) => call[0] === 'user.id')
+        .map((call: unknown[]) => call[1])
+
+    test('sets user.id attribute when username is provided', async () => {
+      const { createTrace } = await import('../tracing.js')
+      const span = createTrace({
+        sessionId: 's1',
+        model: 'claude-3',
+        provider: 'firstParty',
+        username: 'user@example.com',
+      })
+      expect(span).not.toBeNull()
+      expect(mockSetAttribute).toHaveBeenCalledWith('user.id', 'user@example.com')
+    })
+
+    test('falls back to LANGFUSE_USER_ID env when username not provided', async () => {
+      await withEnvUserId('env-user@test.com', async () => {
+        const { createTrace } = await import('../tracing.js')
+        const span = createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
+        expect(span).not.toBeNull()
+        expect(mockSetAttribute).toHaveBeenCalledWith('user.id', 'env-user@test.com')
+      })
+    })
+
+    test('ignores an empty LANGFUSE_USER_ID and keeps falling back', async () => {
+      await withEnvUserId('', async () => {
+        const { createTrace } = await import('../tracing.js')
+        createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
+        expect(userIdValues()).toEqual(['test-device-id'])
+      })
+    })
+
+    test('falls back to email when neither username nor env is provided', async () => {
+      mockGetCoreUserData.mockImplementationOnce(() => ({
+        email: 'core-user@test.com',
+        deviceId: 'test-device-id',
+      }))
+      const { createTrace } = await import('../tracing.js')
+      createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
+      expect(userIdValues()).toEqual(['core-user@test.com'])
+    })
+
+    test('falls back to deviceId when username, env and email are all absent', async () => {
+      const { createTrace } = await import('../tracing.js')
+      createTrace({ sessionId: 's1', model: 'claude-3', provider: 'firstParty' })
+      // The default mock has no email, so this value can only come from deviceId
+      expect(userIdValues()).toEqual(['test-device-id'])
+    })
+
+    test('username takes precedence over LANGFUSE_USER_ID env', async () => {
+      await withEnvUserId('env-user@test.com', async () => {
+        const { createTrace } = await import('../tracing.js')
+        createTrace({
+          sessionId: 's1',
+          model: 'claude-3',
+          provider: 'firstParty',
+          username: 'param-user@test.com',
+        })
+        expect(userIdValues()).toEqual(['param-user@test.com'])
+      })
+    })
+  })
+
   describe('nested agent scenario', () => {
     test('sub-agent trace shares sessionId with parent', async () => {
       process.env.LANGFUSE_PUBLIC_KEY = 'pk-test'
diff --git a/src/services/langfuse/tracing.ts b/src/services/langfuse/tracing.ts
index fc37faab0..c9fc7df17 100644
--- a/src/services/langfuse/tracing.ts
+++ b/src/services/langfuse/tracing.ts
@@ -3,11 +3,26 @@ import type { LangfuseSpan, LangfuseGeneration, LangfuseAgent } from '@langfuse/
 import { isLangfuseEnabled } from './client.js'
 import { sanitizeToolInput, sanitizeToolOutput } from './sanitize.js'
 import { logForDebugging } from 'src/utils/debug.js'
+import { getCoreUserData } from 'src/utils/user.js'
 
 export type { LangfuseSpan }
 
 // Root trace is an agent observation — represents one full agentic turn/session
-type RootTrace = LangfuseAgent & { _sessionId?: string }
+type RootTrace = LangfuseAgent & { _sessionId?: string; _userId?: string }
+
+/**
+ * Resolve the user ID for Langfuse traces: explicit param > env var > email > deviceId.
+ * Empty strings count as "not set" (hence truthiness checks rather than `??`), so
+ * e.g. `LANGFUSE_USER_ID=` falls through to the next source instead of yielding no ID.
+ */
+function resolveLangfuseUserId(username?: string): string | undefined {
+  if (username) return username
+  const envUserId = process.env.LANGFUSE_USER_ID
+  if (envUserId) return envUserId
+  // Consult core user data once, and only when no explicit ID was supplied
+  const { email, deviceId } = getCoreUserData()
+  return email || deviceId || undefined
+}
 
 export function createTrace(params: {
   sessionId: string
@@ -16,6 +31,7 @@ export function createTrace(params: {
   input?: unknown
   name?: string
   querySource?: string
+  username?: string
 }): LangfuseSpan | null {
   if (!isLangfuseEnabled()) return null
   try {
@@ -31,6 +47,11 @@ export function createTrace(params: {
     }, { asType: 'agent' }) as RootTrace
     rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, params.sessionId)
     rootSpan._sessionId = params.sessionId
+    const userId = resolveLangfuseUserId(params.username)
+    if (userId) {
+      rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_USER_ID, userId)
+      rootSpan._userId = userId
+    }
     logForDebugging(`[langfuse] Trace created: ${rootSpan.id}`)
     return rootSpan as unknown as LangfuseSpan
   } catch (e) {
@@ -87,11 +108,15 @@ export function recordLLMObservation(
       },
     )
 
-    // Propagate session ID to generation span so Langfuse links it correctly
+    // Propagate session ID and user ID to generation span so Langfuse links it correctly
     const sessionId = (rootSpan as unknown as RootTrace)._sessionId
     if (sessionId) {
       gen.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
     }
+    const userId = (rootSpan as unknown as RootTrace)._userId
+    if (userId) {
+      gen.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_USER_ID, userId)
+    }
 
     gen.update({
       output: params.output,
@@ -142,11 +167,15 @@ export function recordToolObservation(
       },
     )
 
-    // Propagate session ID to tool span so Langfuse links it correctly
+    // Propagate session ID and user ID to tool span so Langfuse links it correctly
     const sessionId = (rootSpan as unknown as RootTrace)._sessionId
     if (sessionId) {
       toolObs.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
     }
+    const userId = (rootSpan as unknown as RootTrace)._userId
+    if (userId) {
+      toolObs.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_USER_ID, userId)
+    }
 
     toolObs.update({
       output: sanitizeToolOutput(params.toolName, params.output),
@@ -190,6 +219,10 @@ export function createToolBatchSpan(
     if (sessionId) {
       batchSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, sessionId)
     }
+    const userId = (rootSpan as unknown as RootTrace)._userId
+    if (userId) {
+      batchSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_USER_ID, userId)
+    }
 
     logForDebugging(`[langfuse] Tool batch span created: ${batchSpan.id} (tools=${params.toolNames.join(',')})`)
     return batchSpan
@@ -216,6 +249,7 @@ export function createSubagentTrace(params: {
   model: string
   provider: string
   input?: unknown
+  username?: string
 }): LangfuseSpan | null {
   if (!isLangfuseEnabled()) return null
   try {
@@ -230,6 +264,11 @@ export function createSubagentTrace(params: {
     }, { asType: 'agent' }) as RootTrace
     rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, params.sessionId)
     rootSpan._sessionId = params.sessionId
+    const userId = resolveLangfuseUserId(params.username)
+    if (userId) {
+      rootSpan.otelSpan.setAttribute(LangfuseOtelSpanAttributes.TRACE_USER_ID, userId)
+      rootSpan._userId = userId
+    }
     logForDebugging(`[langfuse] Sub-agent trace created: ${rootSpan.id} (type=${params.agentType})`)
     return rootSpan as unknown as LangfuseSpan
   } catch (e) {