Compare commits

..

6 Commits

Author SHA1 Message Date
claude-code-best
0fcdcd6018 docs: README 添加安装/更新失败的解决方案提示
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 16:59:55 +08:00
claude-code-best
4cbef9667d fix: Edit 工具增加 Tab/空格规范化匹配,修复中文和缩进文件编辑失败
Read 工具输出将 Tab 渲染为空格,用户复制后 Edit 工具无法匹配。
在 findActualString 中增加 Tab→空格规范化回退匹配,并精确映射回原始文件位置。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 16:52:11 +08:00
claude-code-best
c6338917e5 feat: 省略旧消息的代码 diff 展示,仅保留最新消息的完整 diff 2026-04-27 14:52:01 +08:00
claude-code-best
bcbb8a6e93 fix: 统一传递完整 thinking 配置而非仅 thinkingType
Langfuse 追踪直接传递整个 thinking 对象(含 type 和 budget_tokens),
Analytics 日志同步补充 thinkingBudgetTokens 字段,logAPIQuery 改为
接收 ThinkingConfig 类型参数。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 14:52:01 +08:00
claude-code-best
3fb48ec106 fix: langfuse tracing 兼容 budget_tokens snake_case 格式
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 14:52:01 +08:00
claude-code-best
36bf4f260f feat: langfuse tracing 增加 thinking 参数记录
在 recordLLMObservation 中添加 thinking 配置(type/budgetTokens),
所有 provider(claude/gemini/openai)及 tokenEstimation、sideQuery
调用处同步传递 thinking 信息,便于 Langfuse 面板观察 thinking 使用情况。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 14:52:01 +08:00
22 changed files with 325 additions and 407 deletions

View File

@@ -55,6 +55,8 @@ ccb update # 更新到最新版本
CLAUDE_BRIDGE_BASE_URL=https://remote-control.claude-code-best.win/ CLAUDE_BRIDGE_OAUTH_TOKEN=test-my-key ccb --remote-control # 我们有自部署的远程控制 CLAUDE_BRIDGE_BASE_URL=https://remote-control.claude-code-best.win/ CLAUDE_BRIDGE_OAUTH_TOKEN=test-my-key ccb --remote-control # 我们有自部署的远程控制
``` ```
> **安装/更新失败?** 先 `npm rm -g claude-code-best` 清理旧版本,再 `npm i -g claude-code-best@latest`。仍失败则指定版本号:`npm i -g claude-code-best@<版本号>`
## ⚡ 快速开始(源码版) ## ⚡ 快速开始(源码版)
### ⚙️ 环境要求 ### ⚙️ 环境要求

View File

@@ -106,6 +106,84 @@ describe("findActualString", () => {
const result = findActualString("hello", ""); const result = findActualString("hello", "");
expect(result).toBe(""); expect(result).toBe("");
}); });
// ── Tab/space normalization (Bug #2 reproduction) ──
test("finds match when search uses spaces but file uses tabs", () => {
// File content uses Tab indentation
const fileContent = "\tif (x) {\n\t\treturn 1;\n\t}";
// User copies from Read output which renders tabs as spaces
const searchWithSpaces = " if (x) {\n return 1;\n }";
const result = findActualString(fileContent, searchWithSpaces);
expect(result).not.toBeNull();
expect(result).toBe(fileContent);
});
test("finds match when search mixes tabs and spaces inconsistently", () => {
const fileContent = "\tconst x = 1; // comment";
const searchMixed = " const x = 1; // comment";
const result = findActualString(fileContent, searchMixed);
expect(result).not.toBeNull();
});
test("finds match for single-line tab-to-space mismatch", () => {
const fileContent = "\t\torder_price = NormalizeDouble(ask, digits);";
const searchSpaces = " order_price = NormalizeDouble(ask, digits);";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
});
// ── CJK / UTF-8 characters (Bug #1 reproduction) ──
test("finds match with CJK characters in content", () => {
const fileContent = "input int x = 620; // 止盈点数(点) — 32个pip=320点";
const result = findActualString(fileContent, fileContent);
expect(result).toBe(fileContent);
});
test("finds match with CJK characters when tab/space differs", () => {
const fileContent = "\t// 向上突破 → Sell Limit (逆方向做空)";
const searchSpaces = " // 向上突破 → Sell Limit (逆方向做空)";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
expect(result).toBe(fileContent);
});
// ── Multiline with tabs + CJK (combined Bug #1 + #2) ──
test("finds multiline match with tabs and CJK characters", () => {
const fileContent = "\tif(effective_dir == BREAKOUT_UP)\n\t\t{\n\t\t\t// 向上突破\n\t\t}";
const searchSpaces = " if(effective_dir == BREAKOUT_UP)\n {\n // 向上突破\n }";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
expect(result).toBe(fileContent);
});
// ── Returned string must be a valid substring of fileContent ──
test("returned string from tab match is a real substring of fileContent", () => {
const fileContent = "prefix\n\t\tindented code\nsuffix";
const searchSpaces = "prefix\n indented code\nsuffix";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
expect(fileContent.includes(result!)).toBe(true);
});
test("returned string from partial tab match is a real substring", () => {
const fileContent = "line1\n\tif (x) {\n\t\tdoStuff();\n\t}\nline5";
const searchSpaces = " if (x) {\n doStuff();\n }";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
expect(fileContent.includes(result!)).toBe(true);
});
test("tab match with mixed indentation levels", () => {
const fileContent = "class Foo {\n\t\tmethod1() {\n\t\t\treturn 42;\n\t\t}\n}";
const searchSpaces = "class Foo {\n method1() {\n return 42;\n }\n}";
const result = findActualString(fileContent, searchSpaces);
expect(result).not.toBeNull();
expect(fileContent.includes(result!)).toBe(true);
});
}); });
// ─── preserveQuoteStyle ───────────────────────────────────────────────── // ─── preserveQuoteStyle ─────────────────────────────────────────────────

View File

@@ -63,9 +63,26 @@ export function stripTrailingWhitespace(str: string): string {
return result return result
} }
/**
* Normalizes whitespace for fuzzy matching by converting tabs to spaces
* and collapsing leading whitespace on each line to a canonical form.
* This handles the case where Read tool output renders tabs as spaces,
* so users copy spaces from the output but the file actually has tabs.
*/
function normalizeWhitespace(str: string): string {
return str.replace(/\t/g, ' ')
}
/** /**
* Finds the actual string in the file content that matches the search string, * Finds the actual string in the file content that matches the search string,
* accounting for quote normalization * accounting for quote normalization and tab/space differences.
*
* Matching cascade:
* 1. Exact match
* 2. Quote normalization (curly → straight quotes)
* 3. Tab/space normalization (tabs ↔ spaces in leading whitespace)
* 4. Quote + tab/space normalization combined
*
* @param fileContent The file content to search in * @param fileContent The file content to search in
* @param searchString The string to search for * @param searchString The string to search for
* @returns The actual string found in the file, or null if not found * @returns The actual string found in the file, or null if not found
@@ -89,9 +106,92 @@ export function findActualString(
return fileContent.substring(searchIndex, searchIndex + searchString.length) return fileContent.substring(searchIndex, searchIndex + searchString.length)
} }
// Try with tab/space normalization — handles the case where Read output
// renders tabs as spaces and the user copies the rendered version
const wsNormalizedFile = normalizeWhitespace(fileContent)
const wsNormalizedSearch = normalizeWhitespace(searchString)
const wsSearchIndex = wsNormalizedFile.indexOf(wsNormalizedSearch)
if (wsSearchIndex !== -1) {
// Map the match position back to the original file content.
// We need to find the corresponding range in the original string.
return mapNormalizedMatchBackToFile(fileContent, wsNormalizedFile, wsSearchIndex, wsNormalizedSearch.length)
}
// Try combined: quote normalization + tab/space normalization
const combinedFile = normalizeWhitespace(normalizedFile)
const combinedSearch = normalizeWhitespace(normalizedSearch)
const combinedIndex = combinedFile.indexOf(combinedSearch)
if (combinedIndex !== -1) {
return mapNormalizedMatchBackToFile(fileContent, combinedFile, combinedIndex, combinedSearch.length)
}
return null return null
} }
/**
* Given a match found in a normalized version of fileContent, map the match
* position back to the original fileContent and extract the corresponding
* substring.
*
* Strategy: walk through both strings character by character, building a
* mapping from normalized offset to original offset. When a tab is expanded
* to 4 spaces in the normalized version, the normalized offset advances by 4
* while the original offset advances by 1.
*/
function mapNormalizedMatchBackToFile(
fileContent: string,
normalizedFile: string,
normalizedStart: number,
normalizedLength: number,
): string {
// Build a sparse mapping from normalized position → original position.
// We only need to map the range [normalizedStart, normalizedStart + normalizedLength].
let normPos = 0
let origPos = 0
let origStart = -1
let origEnd = -1
while (origPos < fileContent.length && normPos <= normalizedStart + normalizedLength) {
if (normPos === normalizedStart) {
origStart = origPos
}
if (normPos === normalizedStart + normalizedLength) {
origEnd = origPos
break
}
const origChar = fileContent[origPos]!
if (origChar === '\t') {
// Tab expands to 4 spaces in normalized version
const nextNormPos = normPos + 4
// If normalizedStart falls within this expanded tab, snap to origPos
if (normPos < normalizedStart && nextNormPos > normalizedStart && origStart === -1) {
origStart = origPos
}
if (normPos < normalizedStart + normalizedLength && nextNormPos > normalizedStart + normalizedLength && origEnd === -1) {
origEnd = origPos + 1
}
normPos = nextNormPos
origPos++
} else {
normPos++
origPos++
}
}
// Fallback: if we couldn't map precisely, use character-count heuristic
if (origStart === -1) origStart = 0
if (origEnd === -1) {
// Approximate: use the ratio of original to normalized length
const ratio = fileContent.length / normalizedFile.length
origEnd = Math.round(origStart + normalizedLength * ratio)
}
return fileContent.substring(origStart, origEnd)
}
/** /**
* When old_string matched via quote normalization (curly quotes in file, * When old_string matched via quote normalization (curly quotes in file,
* straight quotes from model), apply the same curly quote style to new_string * straight quotes from model), apply the same curly quote style to new_string

View File

@@ -77,6 +77,8 @@ export type Props = {
lastThinkingBlockId?: string | null lastThinkingBlockId?: string | null
/** UUID of the latest user bash output message (for auto-expanding) */ /** UUID of the latest user bash output message (for auto-expanding) */
latestBashOutputUUID?: string | null latestBashOutputUUID?: string | null
/** Whether to collapse diff display for this message */
shouldCollapseDiffs?: boolean
} }
function MessageImpl({ function MessageImpl({
@@ -99,6 +101,7 @@ function MessageImpl({
isUserContinuation = false, isUserContinuation = false,
lastThinkingBlockId, lastThinkingBlockId,
latestBashOutputUUID, latestBashOutputUUID,
shouldCollapseDiffs,
}: Props): React.ReactNode { }: Props): React.ReactNode {
switch (message.type) { switch (message.type) {
case 'attachment': case 'attachment':
@@ -181,6 +184,7 @@ function MessageImpl({
isUserContinuation={isUserContinuation} isUserContinuation={isUserContinuation}
lookups={lookups} lookups={lookups}
isTranscriptMode={isTranscriptMode} isTranscriptMode={isTranscriptMode}
shouldCollapseDiffs={shouldCollapseDiffs}
/> />
))} ))}
</Box> </Box>
@@ -293,6 +297,7 @@ function UserMessage({
isUserContinuation, isUserContinuation,
lookups, lookups,
isTranscriptMode, isTranscriptMode,
shouldCollapseDiffs,
}: { }: {
message: NormalizedUserMessage message: NormalizedUserMessage
addMargin: boolean addMargin: boolean
@@ -309,6 +314,7 @@ function UserMessage({
isUserContinuation: boolean isUserContinuation: boolean
lookups: ReturnType<typeof buildMessageLookups> lookups: ReturnType<typeof buildMessageLookups>
isTranscriptMode: boolean isTranscriptMode: boolean
shouldCollapseDiffs?: boolean
}): React.ReactNode { }): React.ReactNode {
const { columns } = useTerminalSize() const { columns } = useTerminalSize()
switch (param.type) { switch (param.type) {
@@ -344,6 +350,7 @@ function UserMessage({
verbose={verbose} verbose={verbose}
width={columns - 5} width={columns - 5}
isTranscriptMode={isTranscriptMode} isTranscriptMode={isTranscriptMode}
shouldCollapseDiffs={shouldCollapseDiffs}
/> />
) )
default: default:

View File

@@ -55,6 +55,7 @@ export type Props = {
columns: number columns: number
isLoading: boolean isLoading: boolean
lookups: ReturnType<typeof buildMessageLookups> lookups: ReturnType<typeof buildMessageLookups>
shouldCollapseDiffs?: boolean
} }
/** /**
@@ -141,6 +142,7 @@ function MessageRowImpl({
columns, columns,
isLoading, isLoading,
lookups, lookups,
shouldCollapseDiffs,
}: Props): React.ReactNode { }: Props): React.ReactNode {
const isTranscriptMode = screen === 'transcript' const isTranscriptMode = screen === 'transcript'
const isGrouped = msg.type === 'grouped_tool_use' const isGrouped = msg.type === 'grouped_tool_use'
@@ -221,6 +223,7 @@ function MessageRowImpl({
isUserContinuation={isUserContinuation} isUserContinuation={isUserContinuation}
lastThinkingBlockId={lastThinkingBlockId} lastThinkingBlockId={lastThinkingBlockId}
latestBashOutputUUID={latestBashOutputUUID} latestBashOutputUUID={latestBashOutputUUID}
shouldCollapseDiffs={shouldCollapseDiffs}
/> />
) )
// OffscreenFreeze: the outer React.memo already bails for static messages, // OffscreenFreeze: the outer React.memo already bails for static messages,

View File

@@ -814,6 +814,12 @@ const MessagesImpl = ({
streamingToolUseIDs, streamingToolUseIDs,
)) ))
// Collapse diffs for messages beyond the latest N messages.
// verbose (ctrl+o) overrides and always shows full diffs.
const DIFF_COLLAPSE_DISTANCE = 0
const shouldCollapseDiffs =
renderableMessages.length - 1 - index > DIFF_COLLAPSE_DISTANCE
const k = messageKey(msg) const k = messageKey(msg)
const row = ( const row = (
<MessageRow <MessageRow
@@ -838,6 +844,7 @@ const MessagesImpl = ({
columns={columns} columns={columns}
isLoading={isLoading} isLoading={isLoading}
lookups={lookups} lookups={lookups}
shouldCollapseDiffs={shouldCollapseDiffs}
/> />
) )

View File

@@ -27,6 +27,7 @@ type Props = {
verbose: boolean verbose: boolean
width: number | string width: number | string
isTranscriptMode?: boolean isTranscriptMode?: boolean
shouldCollapseDiffs?: boolean
} }
export function UserToolResultMessage({ export function UserToolResultMessage({
@@ -39,6 +40,7 @@ export function UserToolResultMessage({
verbose, verbose,
width, width,
isTranscriptMode, isTranscriptMode,
shouldCollapseDiffs,
}: Props): React.ReactNode { }: Props): React.ReactNode {
const toolUse = useGetToolFromMessages(param.tool_use_id, tools, lookups) const toolUse = useGetToolFromMessages(param.tool_use_id, tools, lookups)
if (!toolUse) { if (!toolUse) {
@@ -96,6 +98,7 @@ export function UserToolResultMessage({
verbose={verbose} verbose={verbose}
width={width} width={width}
isTranscriptMode={isTranscriptMode} isTranscriptMode={isTranscriptMode}
shouldCollapseDiffs={shouldCollapseDiffs}
/> />
) )
} }

View File

@@ -33,6 +33,7 @@ type Props = {
verbose: boolean verbose: boolean
width: number | string width: number | string
isTranscriptMode?: boolean isTranscriptMode?: boolean
shouldCollapseDiffs?: boolean
} }
export function UserToolSuccessMessage({ export function UserToolSuccessMessage({
@@ -46,6 +47,7 @@ export function UserToolSuccessMessage({
verbose, verbose,
width, width,
isTranscriptMode, isTranscriptMode,
shouldCollapseDiffs,
}: Props): React.ReactNode { }: Props): React.ReactNode {
const [theme] = useTheme() const [theme] = useTheme()
// Hook stays inside feature() ternary so external builds don't pay a // Hook stays inside feature() ternary so external builds don't pay a
@@ -83,12 +85,16 @@ export function UserToolSuccessMessage({
} }
const toolResult = parsedOutput?.data ?? message.toolUseResult const toolResult = parsedOutput?.data ?? message.toolUseResult
// Collapse diff display for old messages (verbose/ctrl+o overrides)
const effectiveStyle =
shouldCollapseDiffs && !verbose ? 'condensed' : style
const renderedMessage = const renderedMessage =
tool.renderToolResultMessage?.( tool.renderToolResultMessage?.(
toolResult as never, toolResult as never,
filterToolProgressMessages(progressMessagesForMessage), filterToolProgressMessages(progressMessagesForMessage),
{ {
style, style: effectiveStyle,
theme, theme,
tools, tools,
verbose, verbose,

View File

@@ -6907,6 +6907,9 @@ async function logTenguInit({
allowDangerouslySkipPermissionsPassed, allowDangerouslySkipPermissionsPassed,
thinkingType: thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...(thinkingConfig.type === "enabled" && {
thinkingBudgetTokens: thinkingConfig.budgetTokens,
}),
...(systemPromptFlag && { ...(systemPromptFlag && {
systemPromptFlag: systemPromptFlag:
systemPromptFlag as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, systemPromptFlag as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,

View File

@@ -5,10 +5,7 @@ import type {
CacheSafeParams, CacheSafeParams,
ForkedAgentResult, ForkedAgentResult,
} from '../../../utils/forkedAgent.js' } from '../../../utils/forkedAgent.js'
import { import { startAgentSummarization } from '../agentSummary.js'
type AgentSummaryDependencies,
startAgentSummarization,
} from '../agentSummary.js'
const transcriptMessages = [ const transcriptMessages = [
{ type: 'user', message: { content: 'start' }, uuid: 'u1' }, { type: 'user', message: { content: 'start' }, uuid: 'u1' },
@@ -30,16 +27,17 @@ describe('startAgentSummarization', () => {
let forkCalls: ForkCall[] let forkCalls: ForkCall[]
let updateCalls: Array<{ taskId: string; summary: string }> let updateCalls: Array<{ taskId: string; summary: string }>
let transcriptMessagesForTest: Message[] let transcriptMessagesForTest: Message[]
let debugLogs: string[]
let loggedErrors: Error[]
let clearedHandles: unknown[]
let scheduledCount: number
let lastTimerHandle: unknown
function startTestSummarization( beforeEach(() => {
dependencies: AgentSummaryDependencies = {}, forkCalls = []
): { stop: () => void } { updateCalls = []
return startAgentSummarization( scheduled = undefined
handle = undefined
transcriptMessagesForTest = transcriptMessages
})
test('summarizes bounded transcript once and skips unchanged fingerprints', async () => {
handle = startAgentSummarization(
'task-1', 'task-1',
asAgentId('a0000000000000000'), asAgentId('a0000000000000000'),
{ {
@@ -50,22 +48,14 @@ describe('startAgentSummarization', () => {
} as unknown as CacheSafeParams, } as unknown as CacheSafeParams,
() => undefined, () => undefined,
{ {
clearTimeout: ((timeoutId: unknown) => { clearTimeout: () => undefined,
clearedHandles.push(timeoutId)
}) as typeof clearTimeout,
getAgentTranscript: async () => ({ getAgentTranscript: async () => ({
messages: transcriptMessagesForTest, messages: transcriptMessagesForTest,
contentReplacements: [], contentReplacements: [],
}), }),
isPoorModeActive: () => false, isPoorModeActive: () => false,
logError: error => { logError: () => undefined,
loggedErrors.push( logForDebugging: () => undefined,
error instanceof Error ? error : new Error(String(error)),
)
},
logForDebugging: message => {
debugLogs.push(message)
},
runForkedAgent: async (args: ForkCall) => { runForkedAgent: async (args: ForkCall) => {
forkCalls.push(args) forkCalls.push(args)
return { return {
@@ -83,38 +73,14 @@ describe('startAgentSummarization', () => {
if (typeof callback !== 'function') { if (typeof callback !== 'function') {
throw new Error('Expected timer callback') throw new Error('Expected timer callback')
} }
scheduledCount += 1
scheduled = callback as () => void | Promise<void> scheduled = callback as () => void | Promise<void>
lastTimerHandle = { id: scheduledCount } return 1 as unknown as ReturnType<typeof setTimeout>
return lastTimerHandle as ReturnType<typeof setTimeout>
}) as unknown as typeof setTimeout, }) as unknown as typeof setTimeout,
updateAgentSummary: (taskId: string, summary: string) => { updateAgentSummary: (taskId: string, summary: string) => {
updateCalls.push({ taskId, summary }) updateCalls.push({ taskId, summary })
}, },
...dependencies,
}, },
) )
}
beforeEach(() => {
forkCalls = []
updateCalls = []
scheduled = undefined
handle = undefined
transcriptMessagesForTest = transcriptMessages
debugLogs = []
loggedErrors = []
clearedHandles = []
scheduledCount = 0
lastTimerHandle = undefined
})
function expectDebugLogContaining(fragment: string): void {
expect(debugLogs.some(message => message.includes(fragment))).toBe(true)
}
test('summarizes bounded transcript once and skips unchanged fingerprints', async () => {
handle = startTestSummarization()
expect(typeof scheduled).toBe('function') expect(typeof scheduled).toBe('function')
await scheduled!() await scheduled!()
@@ -138,87 +104,49 @@ describe('startAgentSummarization', () => {
expect(forkCalls).toHaveLength(1) expect(forkCalls).toHaveLength(1)
expect(updateCalls).toHaveLength(1) expect(updateCalls).toHaveLength(1)
expect(loggedErrors).toEqual([])
}) })
test('skips summarization when filtering leaves too little bounded context', async () => { test('skips summarization when bounded context is too small', async () => {
transcriptMessagesForTest = [ transcriptMessagesForTest = transcriptMessages.slice(0, 2)
{ type: 'user', message: { content: 'start' }, uuid: 'u1' },
handle = startAgentSummarization(
'task-1',
asAgentId('a0000000000000000'),
{ {
type: 'assistant', forkContextMessages: transcriptMessages,
uuid: 'a1', model: 'claude-test',
message: { } as unknown as CacheSafeParams,
content: [{ type: 'tool_use', id: 'missing', name: 'Read' }], () => undefined,
{
clearTimeout: () => undefined,
getAgentTranscript: async () => ({
messages: transcriptMessagesForTest,
contentReplacements: [],
}),
isPoorModeActive: () => false,
logError: () => undefined,
logForDebugging: () => undefined,
runForkedAgent: async (args: ForkCall) => {
forkCalls.push(args)
return { messages: [] } as unknown as ForkedAgentResult
},
setTimeout: ((callback: TimerHandler) => {
if (typeof callback !== 'function') {
throw new Error('Expected timer callback')
}
scheduled = callback as () => void | Promise<void>
return 1 as unknown as ReturnType<typeof setTimeout>
}) as unknown as typeof setTimeout,
updateAgentSummary: (taskId: string, summary: string) => {
updateCalls.push({ taskId, summary })
}, },
}, },
{ type: 'user', message: { content: 'continue' }, uuid: 'u2' }, )
] as unknown as Message[]
handle = startTestSummarization()
expect(typeof scheduled).toBe('function') expect(typeof scheduled).toBe('function')
await scheduled!() await scheduled!()
expect(forkCalls).toEqual([]) expect(forkCalls).toEqual([])
expect(updateCalls).toEqual([]) expect(updateCalls).toEqual([])
expectDebugLogContaining('no bounded context available')
})
test('skips summarization before building context when transcript is too short', async () => {
transcriptMessagesForTest = transcriptMessages.slice(0, 2)
handle = startTestSummarization()
expect(typeof scheduled).toBe('function')
await scheduled!()
expect(forkCalls).toEqual([])
expect(updateCalls).toEqual([])
expectDebugLogContaining('not enough messages (2)')
})
test('skips and reschedules while poor mode is active', async () => {
handle = startTestSummarization({
isPoorModeActive: () => true,
})
expect(typeof scheduled).toBe('function')
const initialScheduledCount = scheduledCount
const initialTimerHandle = lastTimerHandle
await scheduled!()
expect(forkCalls).toEqual([])
expect(updateCalls).toEqual([])
expectDebugLogContaining('poor mode active')
expect(scheduledCount).toBe(initialScheduledCount + 1)
expect(lastTimerHandle).not.toBe(initialTimerHandle)
})
test('logs summary errors and schedules the next timer', async () => {
const error = new Error('fork failed')
handle = startTestSummarization({
runForkedAgent: async () => {
throw error
},
})
expect(typeof scheduled).toBe('function')
const initialScheduledCount = scheduledCount
const initialTimerHandle = lastTimerHandle
await scheduled!()
expect(loggedErrors).toEqual([error])
expect(updateCalls).toEqual([])
expect(scheduledCount).toBe(initialScheduledCount + 1)
expect(lastTimerHandle).not.toBe(initialTimerHandle)
})
test('stop clears the pending summary timer', () => {
handle = startTestSummarization()
const pendingHandle = lastTimerHandle
handle.stop()
expectDebugLogContaining('Stopping summarization for task-1')
expect(clearedHandles).toEqual([pendingHandle])
}) })
}) })

View File

@@ -141,13 +141,6 @@ describe('getSummaryContextFingerprint', () => {
expect(estimateMessageChars(message)).toBeGreaterThan(0) expect(estimateMessageChars(message)).toBeGreaterThan(0)
}) })
test('treats unsupported top-level primitives as zero-size estimates', () => {
expect(
estimateMessageChars((() => undefined) as unknown as Message),
).toBe(0)
expect(estimateMessageChars(1n as unknown as Message)).toBe(0)
})
test('returns null for an empty transcript', () => { test('returns null for an empty transcript', () => {
expect(getSummaryContextFingerprint([])).toBeNull() expect(getSummaryContextFingerprint([])).toBeNull()
}) })

View File

@@ -1776,6 +1776,10 @@ async function* queryModel(
// captures only primitives instead of paramsFromContext's full closure scope // captures only primitives instead of paramsFromContext's full closure scope
// (messagesForAPI, system, allTools, betas — the entire request-building // (messagesForAPI, system, allTools, betas — the entire request-building
// context), which would otherwise be pinned until the promise resolves. // context), which would otherwise be pinned until the promise resolves.
// Also capture thinking params for Langfuse observability.
// Pass the entire thinking config object so all fields (type, budget_tokens,
// and any future additions) flow through without cherry-picking.
let langfuseThinking: BetaMessageStreamParams['thinking'] | undefined
{ {
const queryParams = paramsFromContext({ const queryParams = paramsFromContext({
model: options.model, model: options.model,
@@ -1783,8 +1787,10 @@ async function* queryModel(
}) })
const logMessagesLength = queryParams.messages.length const logMessagesLength = queryParams.messages.length
const logBetas = useBetas ? (queryParams.betas ?? []) : [] const logBetas = useBetas ? (queryParams.betas ?? []) : []
const logThinkingType = queryParams.thinking?.type ?? 'disabled'
const logEffortValue = queryParams.output_config?.effort const logEffortValue = queryParams.output_config?.effort
if (queryParams.thinking && queryParams.thinking.type !== 'disabled') {
langfuseThinking = queryParams.thinking
}
void options.getToolPermissionContext().then(permissionContext => { void options.getToolPermissionContext().then(permissionContext => {
logAPIQuery({ logAPIQuery({
model: options.model, model: options.model,
@@ -1794,7 +1800,7 @@ async function* queryModel(
permissionMode: permissionContext.mode, permissionMode: permissionContext.mode,
querySource: options.querySource, querySource: options.querySource,
queryTracking: options.queryTracking, queryTracking: options.queryTracking,
thinkingType: logThinkingType, thinkingConfig,
effortValue: logEffortValue, effortValue: logEffortValue,
fastMode: isFastMode, fastMode: isFastMode,
previousRequestId, previousRequestId,
@@ -2545,6 +2551,9 @@ async function* queryModel(
maxOutputTokens, maxOutputTokens,
thinkingType: thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...(thinkingConfig.type === 'enabled' && {
thinkingBudgetTokens: thinkingConfig.budgetTokens,
}),
fallback_disabled: true, fallback_disabled: true,
request_id: (streamRequestId ?? request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
@@ -2577,6 +2586,9 @@ async function* queryModel(
maxOutputTokens, maxOutputTokens,
thinkingType: thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...(thinkingConfig.type === 'enabled' && {
thinkingBudgetTokens: thinkingConfig.budgetTokens,
}),
fallback_disabled: false, fallback_disabled: false,
request_id: (streamRequestId ?? request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
@@ -2693,6 +2705,9 @@ async function* queryModel(
maxOutputTokens, maxOutputTokens,
thinkingType: thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...(thinkingConfig.type === 'enabled' && {
thinkingBudgetTokens: thinkingConfig.budgetTokens,
}),
request_id: request_id:
failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_cause: fallback_cause:
@@ -2925,6 +2940,7 @@ async function* queryModel(
endTime: new Date(), endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]), tools: convertToolsToLangfuse(toolSchemas as unknown[]),
thinking: langfuseThinking,
}) })
void options.getToolPermissionContext().then(permissionContext => { void options.getToolPermissionContext().then(permissionContext => {

View File

@@ -193,6 +193,15 @@ export async function* queryModelGemini(
endTime: new Date(), endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]), tools: convertToolsToLangfuse(toolSchemas as unknown[]),
thinking:
thinkingConfig.type !== 'disabled'
? {
type: thinkingConfig.type,
...(thinkingConfig.type === 'enabled' && {
budgetTokens: thinkingConfig.budgetTokens,
}),
}
: undefined,
}) })
} catch (error) { } catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error) const errorMessage = error instanceof Error ? error.message : String(error)

View File

@@ -23,6 +23,7 @@ import { getAPIProviderForStatsig } from 'src/utils/model/providers.js'
import type { PermissionMode } from 'src/utils/permissions/PermissionMode.js' import type { PermissionMode } from 'src/utils/permissions/PermissionMode.js'
import { jsonStringify } from 'src/utils/slowOperations.js' import { jsonStringify } from 'src/utils/slowOperations.js'
import { logOTelEvent } from 'src/utils/telemetry/events.js' import { logOTelEvent } from 'src/utils/telemetry/events.js'
import type { ThinkingConfig } from 'src/utils/thinking.js'
import { import {
endLLMRequestSpan, endLLMRequestSpan,
isBetaTracingEnabled, isBetaTracingEnabled,
@@ -176,7 +177,7 @@ export function logAPIQuery({
permissionMode, permissionMode,
querySource, querySource,
queryTracking, queryTracking,
thinkingType, thinkingConfig,
effortValue, effortValue,
fastMode, fastMode,
previousRequestId, previousRequestId,
@@ -188,11 +189,13 @@ export function logAPIQuery({
permissionMode?: PermissionMode permissionMode?: PermissionMode
querySource: string querySource: string
queryTracking?: QueryChainTracking queryTracking?: QueryChainTracking
thinkingType?: 'adaptive' | 'enabled' | 'disabled' thinkingConfig?: ThinkingConfig
effortValue?: EffortLevel | null effortValue?: EffortLevel | null
fastMode?: boolean fastMode?: boolean
previousRequestId?: string | null previousRequestId?: string | null
}): void { }): void {
const thinkingType = thinkingConfig?.type ?? 'disabled'
const thinkingBudgetTokens = thinkingConfig?.type === 'enabled' ? thinkingConfig.budgetTokens : undefined
logEvent('tengu_api_query', { logEvent('tengu_api_query', {
model: model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, model: model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
messagesLength, messagesLength,
@@ -219,6 +222,9 @@ export function logAPIQuery({
: {}), : {}),
thinkingType: thinkingType:
thinkingType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, thinkingType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...(thinkingBudgetTokens !== undefined && {
thinkingBudgetTokens,
}),
effortValue: effortValue:
effortValue as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, effortValue as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fastMode, fastMode,

View File

@@ -418,6 +418,7 @@ export async function* queryModelOpenAI(
endTime: new Date(), endTime: new Date(),
completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined, completionStartTime: ttftMs > 0 ? new Date(start + ttftMs) : undefined,
tools: convertToolsToLangfuse(toolSchemas as unknown[]), tools: convertToolsToLangfuse(toolSchemas as unknown[]),
...(enableThinking && { thinking: { type: 'enabled' } }),
}) })
// Safety: if stream ended without message_stop, assemble and yield whatever we have // Safety: if stream ended without message_stop, assemble and yield whatever we have

View File

@@ -78,6 +78,16 @@ export function recordLLMObservation(
endTime?: Date endTime?: Date
completionStartTime?: Date completionStartTime?: Date
tools?: unknown tools?: unknown
/** Thinking depth configuration used for this request.
* Accepts the full API thinking config object. Fields:
* - type: thinking mode ("enabled", "adaptive", "disabled")
* - budget_tokens (snake_case, from Anthropic API) or budgetTokens (camelCase)
*/
thinking?: {
type: string
budget_tokens?: number
budgetTokens?: number
}
}, },
): void { ): void {
if (!rootSpan || !isLangfuseEnabled()) return if (!rootSpan || !isLangfuseEnabled()) return
@@ -97,6 +107,7 @@ export function recordLLMObservation(
metadata: { metadata: {
provider: params.provider, provider: params.provider,
model: params.model, model: params.model,
...(params.thinking && { thinking: params.thinking }),
}, },
...(params.completionStartTime && { completionStartTime: params.completionStartTime }), ...(params.completionStartTime && { completionStartTime: params.completionStartTime }),
}, },

View File

@@ -354,6 +354,7 @@ export async function countTokensViaHaikuFallback(
}, },
startTime: new Date(apiStart), startTime: new Date(apiStart),
endTime: new Date(), endTime: new Date(),
...(containsThinking && { thinking: { type: 'enabled', budgetTokens: TOKEN_COUNT_THINKING_BUDGET } }),
}) })
endTrace(langfuseTrace) endTrace(langfuseTrace)

View File

@@ -1,10 +1,9 @@
import { afterEach, beforeEach, describe, expect, test } from 'bun:test' import { afterEach, beforeEach, describe, expect, test } from 'bun:test'
import { mkdir, readFile, rm, stat, writeFile } from 'node:fs/promises' import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'
import { mkdtempSync } from 'node:fs' import { mkdtempSync } from 'node:fs'
import { tmpdir } from 'node:os' import { tmpdir } from 'node:os'
import { dirname, join } from 'node:path' import { dirname, join } from 'node:path'
import type { Message } from 'src/types/message.js' import type { Message } from 'src/types/message.js'
import { getErrnoCode } from 'src/utils/errors.js'
import { import {
compactMailboxMessages, compactMailboxMessages,
getLastPeerDmSummary, getLastPeerDmSummary,
@@ -172,17 +171,6 @@ describe('compactMailboxMessages', () => {
expect(compacted).toEqual([]) expect(compacted).toEqual([])
}) })
test('returns an empty mailbox when all retention lanes are disabled', () => {
const compacted = compactMailboxMessages([message('unread', false)], {
maxMessages: 0,
maxReadMessages: 0,
maxUnreadProtocolMessages: 0,
maxRetainedBytes: 1_000,
})
expect(compacted).toEqual([])
})
}) })
describe('teammate mailbox retention', () => { describe('teammate mailbox retention', () => {
@@ -343,36 +331,6 @@ describe('teammate mailbox retention', () => {
expect(await readFile(inboxPath, 'utf-8')).toBe('{not-json') expect(await readFile(inboxPath, 'utf-8')).toBe('{not-json')
}) })
test('writeToMailbox rejects when the inbox path is already a directory', async () => {
const inboxPath = getInboxPath('worker', 'alpha')
await mkdir(inboxPath, { recursive: true })
const error = await writeToMailbox(
'worker',
{
from: 'team-lead',
text: 'new',
timestamp: new Date(5).toISOString(),
},
'alpha',
).then(
() => undefined,
err => err,
)
const code = getErrnoCode(error)
expect(code).toBeDefined()
if (code === undefined) {
throw new Error('Expected filesystem errno code')
}
const expectedCodes =
process.platform === 'win32'
? ['EISDIR', 'EPERM', 'EACCES']
: ['EISDIR']
expect(expectedCodes).toContain(code)
expect((await stat(inboxPath)).isDirectory()).toBe(true)
})
test('readMailbox fails closed on corrupt mailbox content', async () => { test('readMailbox fails closed on corrupt mailbox content', async () => {
const inboxPath = getInboxPath('worker', 'alpha') const inboxPath = getInboxPath('worker', 'alpha')
await mkdir(dirname(inboxPath), { recursive: true }) await mkdir(dirname(inboxPath), { recursive: true })

View File

@@ -11,7 +11,7 @@ import {
writeFile, writeFile,
} from 'node:fs/promises' } from 'node:fs/promises'
import { createHash } from 'node:crypto' import { createHash } from 'node:crypto'
import { createConnection, createServer, type Socket } from 'node:net' import { createConnection, createServer } from 'node:net'
import { dirname, join } from 'node:path' import { dirname, join } from 'node:path'
import { tmpdir } from 'node:os' import { tmpdir } from 'node:os'
import { import {
@@ -217,146 +217,6 @@ describe('UDS inbox retention', () => {
) )
}) })
test('udsClient send reports connection failures without leaking token state', async () => {
const path = socketPath('uds-client-connect-error')
const capabilityDir = join(tempConfigDir, 'messaging-capabilities')
const capabilityName = `${createHash('sha256').update(path).digest('hex')}.json`
await mkdir(capabilityDir, { recursive: true, mode: 0o700 })
await writeFile(
join(capabilityDir, capabilityName),
JSON.stringify({ socketPath: path, authToken: 'test-token' }),
'utf-8',
)
const { sendToUdsSocket, UdsPeerConnectionError } = await import(
'../udsClient.js'
)
const error = await sendToUdsSocket(path, 'hello').then(
() => undefined,
err => err,
)
expect(error).toBeInstanceOf(UdsPeerConnectionError)
if (!(error instanceof UdsPeerConnectionError)) {
throw new Error('Expected UDS peer connection error')
}
expect(error.socketPath).toBe(path)
expect(error.message).not.toContain('test-token')
})
test('udsClient send reports response timeouts as peer connection errors', async () => {
const path = socketPath('uds-client-timeout')
const capabilityDir = join(tempConfigDir, 'messaging-capabilities')
const capabilityName = `${createHash('sha256').update(path).digest('hex')}.json`
await mkdir(capabilityDir, { recursive: true, mode: 0o700 })
await writeFile(
join(capabilityDir, capabilityName),
JSON.stringify({ socketPath: path, authToken: 'test-token' }),
'utf-8',
)
if (process.platform !== 'win32') {
await mkdir(dirname(path), { recursive: true })
}
const sockets = new Set<Socket>()
const receiver = createServer(socket => {
sockets.add(socket)
socket.on('close', () => {
sockets.delete(socket)
})
socket.on('data', () => undefined)
})
await new Promise<void>((resolve, reject) => {
receiver.on('error', reject)
receiver.listen(path, () => resolve())
})
try {
const { sendToUdsSocket, UdsPeerConnectionError } = await import(
'../udsClient.js'
)
const error = await sendToUdsSocket(path, 'hello', 200).then(
() => undefined,
err => err,
)
expect(error).toBeInstanceOf(UdsPeerConnectionError)
if (!(error instanceof UdsPeerConnectionError)) {
throw new Error('Expected UDS peer connection timeout error')
}
expect(error.socketPath).toBe(path)
expect(error.cause).toBeInstanceOf(Error)
if (!(error.cause instanceof Error)) {
throw new Error('Expected timeout cause')
}
expect(error.cause.message).toBe('Connection timed out')
expect(error.message).not.toContain('test-token')
} finally {
for (const socket of sockets) {
socket.destroy()
}
await closeServer(receiver)
if (process.platform !== 'win32') {
await unlink(path).catch(() => undefined)
}
}
})
test('connectToPeer reports connection failures as peer connection errors', async () => {
const path = socketPath('uds-connect-error')
const { connectToPeer, UdsPeerConnectionError } = await import(
'../udsClient.js'
)
const error = await connectToPeer(path).then(
() => undefined,
err => err,
)
expect(error).toBeInstanceOf(UdsPeerConnectionError)
if (!(error instanceof UdsPeerConnectionError)) {
throw new Error('Expected UDS peer connection error')
}
expect(error.socketPath).toBe(path)
})
test('connectToPeer leaves connected socket lifecycle to the caller', async () => {
const path = socketPath('uds-connect-lifecycle')
if (process.platform !== 'win32') {
await mkdir(dirname(path), { recursive: true })
}
const sockets = new Set<Socket>()
const receiver = createServer(socket => {
sockets.add(socket)
socket.on('close', () => {
sockets.delete(socket)
})
})
await new Promise<void>((resolve, reject) => {
receiver.on('error', reject)
receiver.listen(path, () => resolve())
})
let client: Socket | undefined
try {
const { connectToPeer } = await import('../udsClient.js')
client = await connectToPeer(path, 50)
await new Promise(resolve => setTimeout(resolve, 100))
expect(client.destroyed).toBe(false)
expect(client.listenerCount('error')).toBe(0)
} finally {
client?.destroy()
for (const socket of sockets) {
socket.destroy()
}
await closeServer(receiver)
if (process.platform !== 'win32') {
await unlink(path).catch(() => undefined)
}
}
})
test('sendUdsMessage fails closed before connecting without an auth token', async () => { test('sendUdsMessage fails closed before connecting without an auth token', async () => {
await expect( await expect(
sendUdsMessage(socketPath('no-auth-token'), { type: 'text', data: 'x' }), sendUdsMessage(socketPath('no-auth-token'), { type: 'text', data: 'x' }),

View File

@@ -97,28 +97,6 @@ describe('attachUdsResponseReader', () => {
expect(socket.ended).toBe(true) expect(socket.ended).toBe(true)
}) })
test('continues scanning when blank and valid frames share one chunk', () => {
const socket = new FakeSocket()
let settled = false
let settledError: Error | undefined
attachUdsResponseReader(asSocket(socket), {
maxFrameBytes: 128,
onSettled: error => {
settled = true
settledError = error
},
})
socket.emitData(
Buffer.from(`\n${JSON.stringify({ type: 'response' })}\n`),
)
expect(settled).toBe(true)
expect(settledError).toBeUndefined()
expect(socket.ended).toBe(true)
})
test('rejects receiver error frames', () => { test('rejects receiver error frames', () => {
const socket = new FakeSocket() const socket = new FakeSocket()
let settledError: Error | undefined let settledError: Error | undefined
@@ -138,31 +116,6 @@ describe('attachUdsResponseReader', () => {
expect(socket.destroyed).toBe(true) expect(socket.destroyed).toBe(true)
}) })
test('ignores unrelated receiver frames until a terminal response arrives', () => {
const socket = new FakeSocket()
let settled = false
let settledError: Error | undefined
attachUdsResponseReader(asSocket(socket), {
maxFrameBytes: 128,
onSettled: error => {
settled = true
settledError = error
},
})
socket.emitData(
Buffer.from(
`${JSON.stringify({ type: 'notification', data: 'queued' })}\n`,
),
)
expect(settled).toBe(false)
socket.emitData(Buffer.from(`${JSON.stringify({ type: 'response' })}\n`))
expect(settled).toBe(true)
expect(settledError).toBeUndefined()
})
test('uses custom socket error formatting', () => { test('uses custom socket error formatting', () => {
const socket = new FakeSocket() const socket = new FakeSocket()
let settledError: Error | undefined let settledError: Error | undefined

View File

@@ -294,6 +294,12 @@ export async function sideQuery(opts: SideQueryOptions): Promise<BetaMessage> {
startTime: new Date(start), startTime: new Date(start),
endTime: new Date(), endTime: new Date(),
...(tools && { tools: convertToolsToLangfuse(tools as unknown[]) }), ...(tools && { tools: convertToolsToLangfuse(tools as unknown[]) }),
...(thinkingConfig && thinkingConfig.type !== 'disabled' && {
thinking: {
type: thinkingConfig.type,
...(thinkingConfig.type === 'enabled' && { budgetTokens: thinkingConfig.budget_tokens }),
},
}),
}) })
endTrace(langfuseTrace) endTrace(langfuseTrace)

View File

@@ -36,19 +36,6 @@ export type PeerSession = {
alive: boolean alive: boolean
} }
export class UdsPeerConnectionError extends Error {
readonly socketPath: string
constructor(socketPath: string, cause: unknown) {
super(
`Failed to connect to peer at ${socketPath}: ${errorMessage(cause)}`,
{ cause },
)
this.name = 'UdsPeerConnectionError'
this.socketPath = socketPath
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Session directory // Session directory
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -206,7 +193,6 @@ export async function isPeerAlive(
export async function sendToUdsSocket( export async function sendToUdsSocket(
targetSocketPath: string, targetSocketPath: string,
message: string | Record<string, unknown>, message: string | Record<string, unknown>,
timeoutMs = 5000,
): Promise<void> { ): Promise<void> {
const { parseUdsTarget } = await import('./udsMessaging.js') const { parseUdsTarget } = await import('./udsMessaging.js')
const target = parseUdsTarget(targetSocketPath) const target = parseUdsTarget(targetSocketPath)
@@ -251,15 +237,12 @@ export async function sendToUdsSocket(
maxFrameBytes: MAX_UDS_FRAME_BYTES, maxFrameBytes: MAX_UDS_FRAME_BYTES,
onSettled: finish, onSettled: finish,
formatSocketError: err => formatSocketError: err =>
new UdsPeerConnectionError(target.socketPath, err), new Error(
}) `Failed to connect to peer at ${target.socketPath}: ${errorMessage(err)}`,
conn.setTimeout(timeoutMs, () => {
finish(
new UdsPeerConnectionError(
target.socketPath,
new Error('Connection timed out'),
), ),
) })
conn.setTimeout(5000, () => {
finish(new Error('Connection timed out'))
}) })
}) })
} }
@@ -268,30 +251,14 @@ export async function sendToUdsSocket(
* Connect to a peer and return the raw socket for bidirectional communication. * Connect to a peer and return the raw socket for bidirectional communication.
* The caller is responsible for managing the connection lifecycle. * The caller is responsible for managing the connection lifecycle.
*/ */
export function connectToPeer( export function connectToPeer(socketPath: string): Promise<Socket> {
socketPath: string,
timeoutMs = 5000,
): Promise<Socket> {
return new Promise<Socket>((resolve, reject) => { return new Promise<Socket>((resolve, reject) => {
const conn = createConnection(socketPath) const conn = createConnection(socketPath, () => {
let settled = false
const fail = (cause: unknown) => {
if (settled) {
return
}
settled = true
conn.destroy()
reject(new UdsPeerConnectionError(socketPath, cause))
}
conn.once('connect', () => {
settled = true
conn.setTimeout(0)
conn.off('error', fail)
resolve(conn) resolve(conn)
}) })
conn.on('error', fail) conn.on('error', reject)
conn.setTimeout(timeoutMs, () => { conn.setTimeout(5000, () => {
fail(new Error('Connection timed out')) conn.destroy(new Error('Connection timed out'))
}) })
}) })
} }