diff --git a/docs/features/ssh-remote.md b/docs/features/ssh-remote.md new file mode 100644 index 000000000..981dbbb3d --- /dev/null +++ b/docs/features/ssh-remote.md @@ -0,0 +1,426 @@ +# SSH Remote — 远程主机运行 Claude Code + +## 概述 + +SSH Remote 提供两种方式在远程 Linux 主机上运行 Claude Code: + +1. **SSH Remote 模块**(`ccb ssh `)— 本地 REPL + 远程工具执行,自动部署二进制 + 认证隧道 +2. **直接 SSH 运行**(`ssh -t ccb`)— 远程已安装 ccb,直接启动交互式会话 + +## 架构 + +### 方式一:SSH Remote 模块(完整模式) + +适用场景:远端没有 API 凭据或没有安装 ccb。 + +``` +┌──────────────── 本地 Windows/Mac/Linux ───────────┐ +│ │ +│ ccb ssh [dir] │ +│ │ │ +│ ├── 1. SSHProbe: 探测远端平台/架构/已有二进制 │ +│ ├── 2. SSHDeploy: 部署 dist/ 到远端 │ +│ ├── 3. SSHAuthProxy: 启动本地认证代理 │ +│ │ ├─ Unix Socket (Linux/Mac) │ +│ │ └─ TCP 127.0.0.1: (Windows) │ +│ │ │ +│ └── 4. SSH -R 反向隧道 + 启动远端 CLI │ +│ ssh -R : \ │ +│ ANTHROPIC_BASE_URL=... \ │ +│ ANTHROPIC_AUTH_NONCE=... \ │ +│ ccb --output-format stream-json │ +│ │ +│ ┌─────── 本地 REPL (Ink TUI) ───────┐ │ +│ │ 用户输入 → NDJSON → SSH stdin │ │ +│ │ SSH stdout → NDJSON → 渲染消息 │ │ +│ │ 工具权限请求 → 本地审批 → 回传 │ │ +│ └────────────────────────────────────┘ │ +└────────────────────────────────────────────────────┘ + │ + │ SSH 连接 (加密通道) + │ +┌───────────────── 远端 Linux ──────────────────────┐ +│ │ +│ ccb (自动部署或已存在) │ +│ ├── --output-format stream-json │ +│ ├── --input-format stream-json │ +│ ├── --verbose -p │ +│ │ │ +│ ├── API 请求 → ANTHROPIC_BASE_URL │ +│ │ → SSH 反向隧道 → 本地 AuthProxy │ +│ │ → 注入真实凭据 → api.anthropic.com │ +│ │ │ +│ └── 工具执行 (Bash/Read/Write/...) │ +│ 直接在远端文件系统上操作 │ +└────────────────────────────────────────────────────┘ +``` + +### 方式二:直接 SSH 运行(简单模式) + +适用场景:远端已安装 ccb 且已有 API 凭据(订阅或 API Key)。 + +``` +┌─────── 本地终端 ───────┐ ┌──────── 远端 Linux ────────┐ +│ │ SSH │ │ +│ ssh -t ccb │ ──────→ │ ccb (全局安装) │ +│ │ │ ├── 使用远端自身凭据 │ +│ 终端直接显示远端 TUI │ ←────── │ ├── 远端文件系统操作 │ +│ │ TTY │ └── API 直连 Anthropic │ +└─────────────────────────┘ └─────────────────────────────┘ +``` + +### 适用场景对比 + +| | SSH Remote 模块 | 直接 SSH 运行 | +|---|---|---| +| 远端需要安装 ccb | 不需要(自动部署) | 需要 | +| 远端需要 API 凭据 | 不需要(本地隧道) | 需要 | +| 本地需要安装 ccb | 需要 | 不需要(任何终端) | +| 斜杠命令 | 本地处理 | 远端处理 | +| 网络延迟敏感 | 高(NDJSON 双向) | 低(仅 TTY) | +| 推荐场景 | 远端无凭据/无安装 | 远端已配置完整 | + +--- + +## 前置准备:SSH 密钥配置 + +两种方式都依赖 SSH 免密连接。以下是完整的密钥配置步骤。 + +### 1. 生成 SSH 密钥对(本地) + +```bash +# 生成 Ed25519 密钥(推荐) +ssh-keygen -t ed25519 -C "your-email@example.com" -f ~/.ssh/id_remote + +# 或 RSA 4096 位 +ssh-keygen -t rsa -b 4096 -C "your-email@example.com" -f ~/.ssh/id_remote +``` + +生成两个文件: +- `~/.ssh/id_remote` — 私钥(不可泄露) +- `~/.ssh/id_remote.pub` — 公钥(部署到远端) + +### 2. 将公钥部署到远端 + +```bash +# 方式 A:ssh-copy-id(推荐) +ssh-copy-id -i ~/.ssh/id_remote.pub user@remote-host + +# 方式 B:手动复制 +cat ~/.ssh/id_remote.pub | ssh user@remote-host "mkdir -p ~/.ssh && chmod 700 ~/.ssh && cat >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys" +``` + +### 3. 配置 SSH Config(本地) + +编辑 `~/.ssh/config`(不存在则创建): + +``` +Host my-server + HostName 192.168.1.100 # 远端 IP 或域名 + User root # 远端用户名 + IdentityFile ~/.ssh/id_remote # 私钥路径 + ServerAliveInterval 60 # 防止连接超时断开 + ServerAliveCountMax 3 +``` + +配置后可直接用别名连接: + +```bash +ssh my-server # 等同于 ssh -i ~/.ssh/id_remote root@192.168.1.100 +``` + +### 4. 文件权限设置 + +#### Linux / macOS + +```bash +chmod 700 ~/.ssh +chmod 600 ~/.ssh/config +chmod 600 ~/.ssh/id_remote +chmod 644 ~/.ssh/id_remote.pub +``` + +#### Windows(OpenSSH 强制 ACL 检查) + +```powershell +# 重置 .ssh 目录权限:仅允许当前用户 + SYSTEM +icacls "$env:USERPROFILE\.ssh" /inheritance:r /grant:r "$($env:USERNAME):(OI)(CI)F" /grant "SYSTEM:(OI)(CI)F" + +# 修复 config 文件权限 +icacls "$env:USERPROFILE\.ssh\config" /inheritance:r /grant:r "$($env:USERNAME):F" /grant "SYSTEM:F" + +# 修复私钥权限 +icacls "$env:USERPROFILE\.ssh\id_remote" /inheritance:r /grant:r "$($env:USERNAME):F" /grant "SYSTEM:F" +``` + +> **Windows 常见错误**:如果 `icacls` 显示 `UNKNOWN\UNKNOWN` ACL 条目,需要先移除再重新授权。权限错误会导致 SSH 拒绝使用密钥。 + +### 5. 验证免密连接 + +```bash +ssh my-server "echo 'SSH connection OK'" +# 应直接输出 "SSH connection OK",不要求输入密码 +``` + +--- + +## 使用方式 + +### 方式一:SSH Remote 模块 + +```bash +# 基本用法 — 自动探测、部署、启动 +ccb ssh user@remote-host + +# 使用 SSH Config 别名 +ccb ssh my-server + +# 指定远端工作目录 +ccb ssh my-server /home/user/project + +# 使用自定义远端二进制(跳过探测/部署) +ccb ssh my-server --remote-bin "bun /opt/ccb/dist/cli.js" + +# 权限控制 +ccb ssh my-server --permission-mode auto +ccb ssh my-server --dangerously-skip-permissions + +# 恢复远端会话 +ccb ssh my-server --continue +ccb ssh my-server --resume + +# 选择模型 +ccb ssh my-server --model claude-sonnet-4-6-20250514 + +# 本地测试模式(不连接远端,测试 auth proxy 管道) +ccb ssh localhost --local +``` + +### 方式二:直接 SSH 运行 + +```bash +# 启动交互式会话 +ssh my-server -t ccb + +# 指定工作目录 +ssh my-server -t "ccb --cwd /home/user/project" + +# 使用特定模型 +ssh my-server -t "ccb --model claude-sonnet-4-6-20250514" +``` + +--- + +## 构建与部署 + +### 构建产物 + +```bash +# 安装依赖 +bun install + +# 构建(输出到 dist/) +bun run build +``` + +产物说明: + +| 文件 | 说明 | +|------|------| +| `dist/cli.js` | Bun 入口(`#!/usr/bin/env bun`) | +| `dist/cli-node.js` | Node.js 入口(`#!/usr/bin/env node` → `import ./cli.js`) | +| `dist/cli-bun.js` | Bun 专用入口 | +| `dist/chunk-*.js` | 代码分割 chunk 文件(约 668 个) | + +### 运行方式 + +```bash +# 方式 A:通过 bun 直接运行(开发/调试) +bun run dev + +# 方式 B:运行构建产物(bun 运行时) +bun dist/cli.js + +# 方式 C:运行构建产物(node 运行时) +node dist/cli-node.js + +# 方式 D:全局安装后使用命令名 +ccb +``` + +### 全局安装 + +在项目根目录执行: + +```bash +# bun 全局安装(推荐) +bun install -g . + +# 创建的命令: +# ccb → dist/cli-node.js +# ccb-bun → dist/cli-bun.js +# claude-code-best → dist/cli-node.js + +# 安装位置:~/.bun/bin/ccb +``` + +或使用 npm: + +```bash +npm install -g . +``` + +验证: + +```bash +ccb --version +# → x.x.x (Claude Code) +``` + +### 远端部署(全流程) + +```bash +# 1. 登录远端 +ssh my-server + +# 2. 克隆或同步项目代码 +git clone ~/ccb-project +cd ~/ccb-project + +# 3. 安装运行时(如果没有 bun) +curl -fsSL https://bun.sh/install | bash +source ~/.bashrc + +# 4. 安装依赖 + 构建 +bun install +bun run build + +# 5. 全局安装 +bun install -g . + +# 6. 确保非交互式 SSH 可访问 ccb 命令 +# bun install -g 安装到 ~/.bun/bin/,但非交互式 SSH 不加载 .bashrc, +# 所以 PATH 中不包含 ~/.bun/bin/ +# 解决方式(任选其一): + +# 方式 A:符号链接到系统 PATH(推荐) +ln -sf ~/.bun/bin/ccb /usr/local/bin/ccb + +# 方式 B:添加到 /etc/profile.d/(所有用户生效) +echo 'export PATH="$HOME/.bun/bin:$PATH"' > /etc/profile.d/bun-path.sh + +# 方式 C:添加到 ~/.bash_profile(当前用户,ssh -t 时生效) +echo 'export PATH="$HOME/.bun/bin:$PATH"' >> ~/.bash_profile + +# 7. 验证 +ccb --version + +# 8. 从本地测试 +# (在本地终端) +ssh my-server -t ccb +``` + +### SSH Remote 自动部署 + +使用 `ccb ssh ` 时,模块自动处理: + +1. **SSHProbe** 探测远端 `~/.local/bin/claude` 或 `command -v claude` +2. 若二进制不存在或版本不匹配,**SSHDeploy** 通过 `scp` 传输 `dist/` 目录 +3. 在远端创建 wrapper 脚本(`~/.local/bin/claude`) +4. 无需手动安装 + +--- + +## 模块结构 + +``` +src/ssh/ +├── createSSHSession.ts — 会话工厂:编排 probe → deploy → proxy → spawn +├── SSHSessionManager.ts — 双向 NDJSON 通信管理 + 权限转发 + 重连 +├── SSHAuthProxy.ts — 本地认证代理(API 凭据隧道) +├── SSHProbe.ts — 远端主机探测(平台/架构/已有二进制) +├── SSHDeploy.ts — 远端二进制部署(scp + wrapper 脚本) +└── __tests__/ + └── SSHSessionManager.test.ts — 17 个单元测试 +``` + +## 关键技术细节 + +### 认证隧道 + +- **AuthProxy** 在本地监听(Unix socket 或 TCP),接收远端 CLI 的 API 请求 +- 通过 SSH `-R` 反向端口转发隧道到远端 +- AuthProxy 注入本地真实凭据(API key 或 OAuth token),转发到 `api.anthropic.com` +- `ANTHROPIC_AUTH_NONCE` header 防止未授权访问(nonce 通过环境变量传递给远端 CLI,远端 CLI 在每个 API 请求中携带此 header) + +### waitForInit vs 存活检查 + +- **标准模式**:`waitForInit` 等待远端 CLI 发送 `{type:'system', subtype:'init'}` JSON 消息 +- **`--remote-bin` 模式**:跳过 `waitForInit`(print+stream-json 模式下 init 只在首次查询后发送),改用 3 秒进程存活检查 + +### 重连机制 + +- `SSHSessionManager` 检测 SSH 连接断开后自动重连 +- 重连时在远端 CLI 命令中追加 `--continue` 恢复会话 +- 指数退避重试(最多 5 次,间隔 1s → 2s → 4s → 8s → 16s) + +## Feature Flag + +SSH Remote 功能受 `SSH_REMOTE` feature flag 控制: + +- **Dev 模式**:默认启用 +- **Build 模式**:需在 `build.ts` 的 `DEFAULT_BUILD_FEATURES` 中添加 `'SSH_REMOTE'` +- **运行时**:`FEATURE_SSH_REMOTE=1` 环境变量 + +--- + +## 常见问题 + +### `ccb: command not found`(SSH 远程执行时) + +非交互式 SSH 不加载 `.bashrc`,`~/.bun/bin` 不在 PATH 中。 + +```bash +# 解决:创建符号链接 +ln -sf ~/.bun/bin/ccb /usr/local/bin/ccb +``` + +### SSH 密钥被拒绝 + +``` +Permission denied (publickey) +``` + +1. 确认公钥已添加到远端 `~/.ssh/authorized_keys` +2. 确认本地私钥文件权限正确(`chmod 600`) +3. 确认 `~/.ssh/config` 中 `IdentityFile` 路径正确 +4. Windows 用户检查 ACL 权限(见上方 Windows 权限设置) + +### SSH 连接超时 + +``` +ssh: connect to host x.x.x.x port 22: Connection timed out +``` + +1. 确认远端 SSH 服务正在运行:`systemctl status sshd` +2. 确认防火墙允许 22 端口 +3. 确认 IP 地址/域名正确 +4. 在 `~/.ssh/config` 中添加 `ConnectTimeout 10` + +### 403 Forbidden(SSH Remote 模块) + +AuthProxy 的 nonce 验证失败。确认: +1. 远端 CLI 版本包含 nonce header 注入修复 +2. `ANTHROPIC_AUTH_NONCE` 环境变量正确传递到远端 +3. `src/services/api/client.ts` 中 `x-auth-nonce` header 已启用 + +### 远端 CLI 启动后立即退出 + +``` +Remote process exited immediately (code 1) +``` + +1. 确认远端 `bun` / `node` 运行时可用 +2. 手动在远端执行 `ccb --version` 验证安装 +3. 检查 `--remote-bin` 路径是否正确 +4. 查看 stderr 输出获取详细错误信息 diff --git a/scripts/defines.ts b/scripts/defines.ts index 804935419..37d2e97e8 100644 --- a/scripts/defines.ts +++ b/scripts/defines.ts @@ -72,4 +72,6 @@ export const DEFAULT_BUILD_FEATURES = [ 'POOR', // 穷鬼模式,跳过 extract_memories/prompt_suggestion 减少消耗 // Team Memory 'TEAMMEM', // 团队记忆,代理队友间共享记忆文件 + // SSH Remote + 'SSH_REMOTE', // SSH 远程连接,本地 REPL + 远端工具执行 ]as const; diff --git a/src/main.tsx b/src/main.tsx index c4588b1b2..0b13c182e 100644 --- a/src/main.tsx +++ b/src/main.tsx @@ -869,6 +869,7 @@ type PendingSSH = { local: boolean; /** Extra CLI args to forward to the remote CLI on initial spawn (--resume, -c). */ extraCliArgs: string[]; + remoteBin: string | undefined; }; const _pendingSSH: PendingSSH | undefined = feature("SSH_REMOTE") ? { @@ -878,6 +879,7 @@ const _pendingSSH: PendingSSH | undefined = feature("SSH_REMOTE") dangerouslySkipPermissions: false, local: false, extraCliArgs: [], + remoteBin: undefined, } : undefined; @@ -1084,6 +1086,17 @@ export async function main() { rawCliArgs.splice(eqI, 1); } }; + const rbIdx = rawCliArgs.indexOf('--remote-bin'); + if (rbIdx !== -1 && rawCliArgs[rbIdx + 1] && !rawCliArgs[rbIdx + 1]!.startsWith('-')) { + _pendingSSH.remoteBin = rawCliArgs[rbIdx + 1]; + rawCliArgs.splice(rbIdx, 2); + } + const rbEqIdx = rawCliArgs.findIndex(a => a.startsWith('--remote-bin=')); + if (rbEqIdx !== -1) { + _pendingSSH.remoteBin = rawCliArgs[rbEqIdx]!.split('=').slice(1).join('='); + rawCliArgs.splice(rbEqIdx, 1); + } + extractFlag("-c", { as: "--continue" }); extractFlag("--continue"); extractFlag("--resume", { hasValue: true }); @@ -4643,6 +4656,7 @@ async function run(): Promise { dangerouslySkipPermissions: _pendingSSH.dangerouslySkipPermissions, extraCliArgs: _pendingSSH.extraCliArgs, + remoteBin: _pendingSSH.remoteBin, }, isTTY ? { @@ -5980,6 +5994,11 @@ async function run(): Promise { "--dangerously-skip-permissions", "Skip all permission prompts on the remote (dangerous)", ) + .option( + "--remote-bin ", + "Custom remote binary command (skips probe/deploy). " + + "Example: --remote-bin 'bun /path/to/project/dist/cli.js'", + ) .option( "--local", "e2e test mode — spawn the child CLI locally (skip ssh/deploy). " + diff --git a/src/services/api/client.ts b/src/services/api/client.ts index b01efc2d9..f433fe013 100644 --- a/src/services/api/client.ts +++ b/src/services/api/client.ts @@ -109,6 +109,10 @@ export async function getAnthropicClient({ : {}), // SDK consumers can identify their app/library for backend analytics ...(clientApp ? { 'x-client-app': clientApp } : {}), + // SSH auth proxy nonce — tunneled API requests must carry this header + ...(process.env.ANTHROPIC_AUTH_NONCE + ? { 'x-auth-nonce': process.env.ANTHROPIC_AUTH_NONCE } + : {}), } // Log API client configuration for HFI debugging diff --git a/src/ssh/SSHAuthProxy.ts b/src/ssh/SSHAuthProxy.ts new file mode 100644 index 000000000..4b16f3c6b --- /dev/null +++ b/src/ssh/SSHAuthProxy.ts @@ -0,0 +1,165 @@ +import { randomUUID } from 'crypto' +import { unlinkSync } from 'fs' +import { getClaudeAIOAuthTokens } from 'src/utils/auth.js' +import { getOauthConfig } from 'src/constants/oauth.js' +import { logForDebugging } from 'src/utils/debug.js' + +export interface SSHAuthProxy { + stop(): void +} + +export interface AuthProxyInfo { + proxy: SSHAuthProxy + /** Unix socket path or 127.0.0.1: */ + localAddress: string + /** Environment variables to inject into the remote/child CLI process */ + authEnv: Record +} + +const isWindows = process.platform === 'win32' + +function resolveAuthHeaders(): Record { + const apiKey = process.env.ANTHROPIC_API_KEY + if (apiKey) { + return { 'x-api-key': apiKey } + } + + const oauthTokens = getClaudeAIOAuthTokens() + if (oauthTokens?.accessToken) { + return { Authorization: `Bearer ${oauthTokens.accessToken}` } + } + + return {} +} + +function resolveUpstreamBaseUrl(): string { + return process.env.ANTHROPIC_BASE_URL || getOauthConfig().BASE_API_URL +} + +async function proxyFetch( + req: Request, + nonce: string | null, +): Promise { + if (nonce && req.headers.get('x-auth-nonce') !== nonce) { + return new Response('Forbidden', { status: 403 }) + } + + const upstreamBase = resolveUpstreamBaseUrl() + const url = new URL(req.url) + const upstreamUrl = `${upstreamBase}${url.pathname}${url.search}` + + const authHeaders = resolveAuthHeaders() + if (Object.keys(authHeaders).length === 0) { + return new Response( + JSON.stringify({ + error: 'No API credentials available on local machine', + }), + { status: 401, headers: { 'content-type': 'application/json' } }, + ) + } + + const forwardHeaders = new Headers(req.headers) + for (const [k, v] of Object.entries(authHeaders)) { + forwardHeaders.set(k, v) + } + forwardHeaders.delete('host') + forwardHeaders.delete('x-auth-nonce') + + logForDebugging( + `[SSHAuthProxy] ${req.method} ${url.pathname} -> ${upstreamUrl}`, + ) + + try { + const upstreamRes = await fetch(upstreamUrl, { + method: req.method, + headers: forwardHeaders, + body: req.body, + // @ts-expect-error Bun supports duplex for streaming request bodies + duplex: 'half', + }) + + const responseHeaders = new Headers(upstreamRes.headers) + responseHeaders.delete('content-encoding') + responseHeaders.delete('content-length') + + return new Response(upstreamRes.body, { + status: upstreamRes.status, + statusText: upstreamRes.statusText, + headers: responseHeaders, + }) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + logForDebugging(`[SSHAuthProxy] upstream error: ${message}`) + return new Response( + JSON.stringify({ error: `Proxy upstream error: ${message}` }), + { status: 502, headers: { 'content-type': 'application/json' } }, + ) + } +} + +export async function createAuthProxy(): Promise { + const id = randomUUID() + + if (isWindows) { + return createTcpAuthProxy(id) + } + return createUnixSocketAuthProxy(id) +} + +async function createUnixSocketAuthProxy(id: string): Promise { + const socketPath = `/tmp/claude-ssh-auth-${id}.sock` + + const server = Bun.serve({ + unix: socketPath, + fetch: req => proxyFetch(req, null), + }) + + logForDebugging(`[SSHAuthProxy] listening on unix:${socketPath}`) + + const proxy: SSHAuthProxy = { + stop() { + server.stop(true) + try { + unlinkSync(socketPath) + } catch { + // Socket file may already be cleaned up + } + }, + } + + return { + proxy, + localAddress: socketPath, + authEnv: { ANTHROPIC_AUTH_SOCKET: socketPath }, + } +} + +async function createTcpAuthProxy(id: string): Promise { + const nonce = randomUUID() + + const server = Bun.serve({ + port: 0, + hostname: '127.0.0.1', + fetch: req => proxyFetch(req, nonce), + }) + + const port = server.port + logForDebugging( + `[SSHAuthProxy] listening on TCP 127.0.0.1:${port} (nonce-protected)`, + ) + + const proxy: SSHAuthProxy = { + stop() { + server.stop(true) + }, + } + + return { + proxy, + localAddress: `127.0.0.1:${port}`, + authEnv: { + ANTHROPIC_BASE_URL: `http://127.0.0.1:${port}`, + ANTHROPIC_AUTH_NONCE: nonce, + }, + } +} diff --git a/src/ssh/SSHDeploy.ts b/src/ssh/SSHDeploy.ts new file mode 100644 index 000000000..fbddb4b18 --- /dev/null +++ b/src/ssh/SSHDeploy.ts @@ -0,0 +1,123 @@ +import { existsSync } from 'fs' +import { resolve } from 'path' +import { logForDebugging } from 'src/utils/debug.js' + +const SSH_TIMEOUT_MS = 60_000 +const REMOTE_BIN_DIR = '~/.local/bin' +const REMOTE_CLI_FILE = 'claude-code-cli.js' +const REMOTE_WRAPPER = 'claude' + +export interface DeployOptions { + host: string + remotePlatform: string + remoteArch: string + localVersion: string + onProgress?: (msg: string) => void +} + +async function runSshCommand( + host: string, + command: string, + timeoutMs = SSH_TIMEOUT_MS, +): Promise<{ stdout: string; stderr: string; exitCode: number }> { + const proc = Bun.spawn(['ssh', '-o', 'ConnectTimeout=10', host, command], { + stdout: 'pipe', + stderr: 'pipe', + }) + + const timer = setTimeout(() => proc.kill(), timeoutMs) + + try { + const [stdout, stderr] = await Promise.all([ + new Response(proc.stdout).text(), + new Response(proc.stderr).text(), + ]) + const exitCode = await proc.exited + return { stdout: stdout.trim(), stderr: stderr.trim(), exitCode } + } finally { + clearTimeout(timer) + } +} + +function findLocalBinary(): string { + const projectRoot = resolve(import.meta.dir, '../..') + const distPath = resolve(projectRoot, 'dist/cli.js') + if (existsSync(distPath)) return distPath + + const devPath = resolve(projectRoot, 'src/entrypoints/cli.tsx') + if (existsSync(devPath)) return devPath + + throw new Error( + 'Cannot find local CLI binary to deploy. Run `bun run build` first.', + ) +} + +export async function deployBinary(options: DeployOptions): Promise { + const { host, remotePlatform, remoteArch, localVersion, onProgress } = options + + if (remotePlatform !== 'linux' && remotePlatform !== 'darwin') { + throw new Error( + `Remote platform "${remotePlatform}" is not supported. Only linux and darwin are supported.`, + ) + } + + logForDebugging( + `[SSHDeploy] deploying to ${host} (${remotePlatform}/${remoteArch}, v${localVersion})`, + ) + + const localBinary = findLocalBinary() + logForDebugging(`[SSHDeploy] local binary: ${localBinary}`) + + onProgress?.('Creating remote directory...') + const mkdirResult = await runSshCommand(host, `mkdir -p ${REMOTE_BIN_DIR}`) + if (mkdirResult.exitCode !== 0) { + throw new Error(`Failed to create remote directory: ${mkdirResult.stderr}`) + } + + onProgress?.('Uploading binary...') + const remotePath = `${REMOTE_BIN_DIR}/${REMOTE_CLI_FILE}` + const scpProc = Bun.spawn( + ['scp', '-o', 'ConnectTimeout=10', localBinary, `${host}:${remotePath}`], + { stdout: 'pipe', stderr: 'pipe' }, + ) + const scpTimer = setTimeout(() => scpProc.kill(), SSH_TIMEOUT_MS) + const scpStderr = await new Response(scpProc.stderr).text() + const scpExit = await scpProc.exited + clearTimeout(scpTimer) + + if (scpExit !== 0) { + throw new Error(`SCP upload failed (exit ${scpExit}): ${scpStderr.trim()}`) + } + + onProgress?.('Installing wrapper script...') + const wrapperScript = [ + `cat > ${REMOTE_BIN_DIR}/${REMOTE_WRAPPER} << 'WRAPPER'`, + '#!/bin/sh', + `exec bun ${REMOTE_BIN_DIR}/${REMOTE_CLI_FILE} "$@"`, + 'WRAPPER', + `chmod +x ${REMOTE_BIN_DIR}/${REMOTE_WRAPPER}`, + ].join('\n') + + const wrapperResult = await runSshCommand(host, wrapperScript) + if (wrapperResult.exitCode !== 0) { + throw new Error(`Failed to install wrapper script: ${wrapperResult.stderr}`) + } + + onProgress?.('Verifying installation...') + const verifyResult = await runSshCommand( + host, + `${REMOTE_BIN_DIR}/${REMOTE_WRAPPER} --version`, + ) + if (verifyResult.exitCode !== 0) { + throw new Error( + `Binary deployed but verification failed (exit ${verifyResult.exitCode}): ${verifyResult.stderr}`, + ) + } + + logForDebugging( + `[SSHDeploy] deployed successfully, remote version: ${verifyResult.stdout}`, + ) + onProgress?.(`Deployed v${verifyResult.stdout}`) + + return `${REMOTE_BIN_DIR}/${REMOTE_WRAPPER}` +} diff --git a/src/ssh/SSHProbe.ts b/src/ssh/SSHProbe.ts new file mode 100644 index 000000000..adb074ff1 --- /dev/null +++ b/src/ssh/SSHProbe.ts @@ -0,0 +1,99 @@ +import { logForDebugging } from 'src/utils/debug.js' + +const PROBE_TIMEOUT_MS = 15_000 + +export interface ProbeResult { + hasBinary: boolean + remoteVersion: string | null + remotePlatform: 'linux' | 'darwin' + remoteArch: 'x64' | 'arm64' + defaultCwd: string + binaryPath: string | null +} + +export class SSHProbeError extends Error { + constructor(message: string) { + super(message) + this.name = 'SSHProbeError' + } +} + +export async function probeRemote( + host: string, + onProgress?: (msg: string) => void, +): Promise { + onProgress?.('Probing remote host…') + + const proc = Bun.spawn( + [ + 'ssh', + '-o', + 'BatchMode=yes', + '-o', + 'ConnectTimeout=10', + host, + 'CLAUDE_BIN=$(test -x "$HOME/.local/bin/claude" && echo "$HOME/.local/bin/claude" || command -v claude 2>/dev/null); echo "$CLAUDE_BIN"; $CLAUDE_BIN --version 2>/dev/null; uname -sm; pwd', + ], + { stdin: 'ignore', stdout: 'pipe', stderr: 'pipe' }, + ) + + const result = await Promise.race([ + proc.exited, + new Promise((_, reject) => + setTimeout( + () => + reject( + new SSHProbeError( + `SSH probe timed out after ${PROBE_TIMEOUT_MS / 1000}s`, + ), + ), + PROBE_TIMEOUT_MS, + ), + ), + ]) + + const stdout = await new Response(proc.stdout).text() + const stderr = await new Response(proc.stderr).text() + + if (result !== 0) { + const detail = stderr.trim() || `exit code ${result}` + throw new SSHProbeError(`SSH probe failed: ${detail}`) + } + + const lines = stdout + .split('\n') + .map(l => l.trim()) + .filter(Boolean) + logForDebugging(`[SSHProbe] raw lines: ${JSON.stringify(lines)}`) + + const unameIdx = lines.findIndex(l => /^(Linux|Darwin)\s/.test(l)) + if (unameIdx === -1) { + throw new SSHProbeError( + 'Could not detect remote platform (uname output missing)', + ) + } + + const binaryPath = unameIdx >= 2 ? lines[unameIdx - 2] || null : null + const versionLine = unameIdx >= 1 ? lines[unameIdx - 1] || null : null + const remoteVersion = + versionLine && /^\d+\.\d+/.test(versionLine) ? versionLine : null + const hasBinary = binaryPath !== null && binaryPath.startsWith('/') + const defaultCwd = lines[unameIdx + 1] || '/' + + const [osName, arch] = lines[unameIdx]!.split(/\s+/) + + const remotePlatform = osName === 'Darwin' ? 'darwin' : 'linux' + const remoteArch: 'x64' | 'arm64' = + arch === 'aarch64' || arch === 'arm64' ? 'arm64' : 'x64' + + onProgress?.(`Detected ${remotePlatform}/${remoteArch}`) + + return { + hasBinary: hasBinary && remoteVersion !== null, + remoteVersion, + remotePlatform, + remoteArch, + defaultCwd, + binaryPath: hasBinary ? binaryPath : null, + } +} diff --git a/src/ssh/SSHSessionManager.ts b/src/ssh/SSHSessionManager.ts index 6a2faaefa..47741345c 100644 --- a/src/ssh/SSHSessionManager.ts +++ b/src/ssh/SSHSessionManager.ts @@ -1,15 +1,26 @@ -// Auto-generated stub — replace with real implementation -import type { SDKMessage } from '../entrypoints/sdk/coreTypes.js' +import type { Subprocess } from 'bun' +import type { SDKMessage } from '../entrypoints/agentSdkTypes.js' +import type { + SDKControlPermissionRequest, + StdoutMessage, +} from '../entrypoints/sdk/controlTypes.js' import type { PermissionUpdate } from '../types/permissions.js' +import { logForDebugging } from '../utils/debug.js' +import { jsonParse, jsonStringify } from '../utils/slowOperations.js' import type { RemoteMessageContent } from '../utils/teleport/api.js' export interface SSHSessionManagerOptions { onMessage: (sdkMessage: SDKMessage) => void - onPermissionRequest: (request: SSHPermissionRequest, requestId: string) => void + onPermissionRequest: ( + request: SSHPermissionRequest, + requestId: string, + ) => void onConnected: () => void onReconnecting: (attempt: number, max: number) => void onDisconnected: () => void onError: (error: Error) => void + reconnect?: () => Promise + maxReconnectAttempts?: number } export interface SSHPermissionRequest { @@ -26,5 +37,317 @@ export interface SSHSessionManager { disconnect(): void sendMessage(content: RemoteMessageContent): Promise sendInterrupt(): void - respondToPermissionRequest(requestId: string, response: { behavior: string; message?: string; updatedInput?: unknown }): void + respondToPermissionRequest( + requestId: string, + response: { behavior: string; message?: string; updatedInput?: unknown }, + ): void +} + +function isStdoutMessage(value: unknown): value is StdoutMessage { + return ( + typeof value === 'object' && + value !== null && + 'type' in value && + typeof (value as Record).type === 'string' + ) +} + +const BASE_RECONNECT_DELAY_MS = 2_000 +const MAX_RECONNECT_DELAY_MS = 15_000 +const DEFAULT_MAX_RECONNECT_ATTEMPTS = 3 + +export class SSHSessionManagerImpl implements SSHSessionManager { + private proc: Subprocess + private options: SSHSessionManagerOptions + private connected = false + private disconnected = false + private readLoopAbort: AbortController | null = null + private reconnectAttempt = 0 + private readonly maxReconnectAttempts: number + private userInitiatedDisconnect = false + private reconnecting = false + + constructor(proc: Subprocess, options: SSHSessionManagerOptions) { + this.proc = proc + this.options = options + this.maxReconnectAttempts = + options.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS + } + + connect(): void { + if (this.connected) return + + this.readLoopAbort = new AbortController() + this.startReadLoop() + this.monitorExit() + + this.connected = true + this.options.onConnected() + } + + private async startReadLoop(): Promise { + const stdout = this.proc.stdout + if (!stdout) { + this.options.onError(new Error('SSH process stdout is not available')) + return + } + + const reader = (stdout as ReadableStream).getReader() + const decoder = new TextDecoder() + let lineBuffer = '' + + try { + while (!this.disconnected) { + const { done, value } = await reader.read() + if (done) break + + lineBuffer += decoder.decode(value, { stream: true }) + const lines = lineBuffer.split('\n') + lineBuffer = lines.pop() ?? '' + + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed) continue + this.processLine(trimmed) + } + } + } catch (err) { + if (!this.disconnected) { + this.options.onError( + err instanceof Error ? err : new Error(String(err)), + ) + } + } finally { + reader.releaseLock() + if (!this.disconnected && !this.userInitiatedDisconnect) { + void this.handleProcessExit() + } + } + } + + private monitorExit(): void { + if (this.proc.exitCode !== null) { + if (!this.userInitiatedDisconnect) { + void this.handleProcessExit() + } + return + } + this.proc.exited + .then(() => { + if (!this.disconnected && !this.userInitiatedDisconnect) { + void this.handleProcessExit() + } + }) + .catch(() => { + if (!this.disconnected && !this.userInitiatedDisconnect) { + void this.handleProcessExit() + } + }) + } + + private async handleProcessExit(): Promise { + if (this.disconnected || this.reconnecting) return + this.connected = false + + if (!this.options.reconnect) { + this.disconnected = true + this.options.onDisconnected() + return + } + + if (this.reconnectAttempt >= this.maxReconnectAttempts) { + this.disconnected = true + this.options.onDisconnected() + return + } + + this.reconnecting = true + try { + await this.attemptReconnect() + } finally { + this.reconnecting = false + } + } + + private async attemptReconnect(): Promise { + const reconnect = this.options.reconnect! + + while (this.reconnectAttempt < this.maxReconnectAttempts) { + this.reconnectAttempt++ + this.options.onReconnecting( + this.reconnectAttempt, + this.maxReconnectAttempts, + ) + + const delay = Math.min( + BASE_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempt - 1), + MAX_RECONNECT_DELAY_MS, + ) + await new Promise(r => setTimeout(r, delay)) + + if (this.userInitiatedDisconnect) return + + try { + const newProc = await reconnect() + this.proc = newProc + this.reconnectAttempt = 0 + this.connected = true + this.startReadLoop() + this.monitorExit() + this.options.onConnected() + return + } catch (err) { + logForDebugging( + `[SSH] reconnect attempt ${this.reconnectAttempt} failed: ${err instanceof Error ? err.message : String(err)}`, + ) + } + } + + this.disconnected = true + this.options.onDisconnected() + } + + private processLine(line: string): void { + let raw: unknown + try { + raw = jsonParse(line) + } catch { + return + } + + if (!isStdoutMessage(raw)) return + const parsed = raw + + if (parsed.type === 'control_request') { + const request = parsed as unknown as { + request_id: string + request: SDKControlPermissionRequest & { subtype: string } + } + if (request.request.subtype === 'can_use_tool') { + this.options.onPermissionRequest( + request.request as unknown as SSHPermissionRequest, + request.request_id, + ) + } else { + logForDebugging( + `[SSH] Unsupported control request subtype: ${request.request.subtype}`, + ) + this.sendErrorResponse( + request.request_id, + `Unsupported control request subtype: ${request.request.subtype}`, + ) + } + return + } + + if ( + parsed.type !== 'control_response' && + parsed.type !== 'keep_alive' && + parsed.type !== 'control_cancel_request' && + parsed.type !== 'streamlined_text' && + parsed.type !== 'streamlined_tool_use_summary' && + !( + parsed.type === 'system' && + (parsed as Record).subtype === 'post_turn_summary' + ) + ) { + this.options.onMessage(parsed as SDKMessage) + } + } + + private writeToStdin(data: string): boolean { + try { + const stdin = this.proc.stdin + if (!stdin || typeof stdin === 'number' || this.disconnected) return false + const encoded = new TextEncoder().encode(data + '\n') + ;(stdin as unknown as { write(d: Uint8Array): number }).write(encoded) + ;(stdin as unknown as { flush?(): void }).flush?.() + return true + } catch { + return false + } + } + + async sendMessage(content: RemoteMessageContent): Promise { + const message = jsonStringify({ + type: 'user', + message: { + role: 'user', + content, + }, + parent_tool_use_id: null, + session_id: '', + }) + return this.writeToStdin(message) + } + + sendInterrupt(): void { + const request = jsonStringify({ + type: 'control_request', + request_id: crypto.randomUUID(), + request: { + subtype: 'interrupt', + }, + }) + this.writeToStdin(request) + } + + respondToPermissionRequest( + requestId: string, + response: { behavior: string; message?: string; updatedInput?: unknown }, + ): void { + const msg = jsonStringify({ + type: 'control_response', + response: { + subtype: 'success', + request_id: requestId, + response: { + behavior: response.behavior, + ...(response.behavior === 'allow' + ? { updatedInput: response.updatedInput } + : { message: response.message }), + }, + }, + }) + this.writeToStdin(msg) + } + + private sendErrorResponse(requestId: string, error: string): void { + const response = jsonStringify({ + type: 'control_response', + response: { + subtype: 'error', + request_id: requestId, + error, + }, + }) + this.writeToStdin(response) + } + + disconnect(): void { + if (this.disconnected) return + this.userInitiatedDisconnect = true + this.disconnected = true + this.connected = false + this.readLoopAbort?.abort() + + try { + const stdin = this.proc.stdin + if (stdin && typeof stdin !== 'number') { + ;(stdin as unknown as { end?(): void }).end?.() + } + } catch { + // stdin may already be closed + } + + try { + this.proc.kill() + } catch { + // process may already be dead + } + } + + isConnected(): boolean { + return this.connected && !this.disconnected + } } diff --git a/src/ssh/__tests__/SSHSessionManager.test.ts b/src/ssh/__tests__/SSHSessionManager.test.ts new file mode 100644 index 000000000..1f169abc5 --- /dev/null +++ b/src/ssh/__tests__/SSHSessionManager.test.ts @@ -0,0 +1,413 @@ +import { describe, test, expect, mock, beforeEach } from 'bun:test' +import { debugMock } from '../../../tests/mocks/debug' + +mock.module('src/utils/debug.ts', debugMock) + +import { SSHSessionManagerImpl } from '../SSHSessionManager' +import type { SSHSessionManagerOptions } from '../SSHSessionManager' +import type { Subprocess } from 'bun' + +function createMockSubprocess(options?: { + exitCode?: number | null + stdoutLines?: string[] +}): { + proc: Subprocess + writeToStdout: (data: string) => void + simulateExit: (code?: number) => void +} { + let stdoutController: ReadableStreamDefaultController + const exitResolvers: Array<(code: number) => void> = [] + let exitCode: number | null = options?.exitCode ?? null + + const stdout = new ReadableStream({ + start(controller) { + stdoutController = controller + if (options?.stdoutLines) { + const encoder = new TextEncoder() + for (const line of options.stdoutLines) { + controller.enqueue(encoder.encode(line + '\n')) + } + } + }, + }) + + const stdinChunks: Uint8Array[] = [] + const stdin = { + write(d: Uint8Array) { + stdinChunks.push(d) + return d.length + }, + flush() {}, + end() {}, + } + + const exited = new Promise(resolve => { + exitResolvers.push(resolve) + if (exitCode !== null) resolve(exitCode) + }) + + const proc = { + stdout, + stdin, + stderr: null, + get exitCode() { + return exitCode + }, + exited, + kill: mock(() => {}), + pid: 12345, + killed: false, + signalCode: null, + ref: () => {}, + unref: () => {}, + } as unknown as Subprocess + + return { + proc, + writeToStdout(data: string) { + const encoder = new TextEncoder() + stdoutController.enqueue(encoder.encode(data + '\n')) + }, + simulateExit(code = 0) { + exitCode = code + try { + stdoutController.close() + } catch { + // may already be closed + } + for (const resolve of exitResolvers) resolve(code) + }, + } +} + +interface MockState { + messages: unknown[] + permissionRequests: Array<{ request: unknown; requestId: string }> + reconnectingCalls: Array<{ attempt: number; max: number }> + connectedCount: number + disconnectedCount: number + errors: Error[] +} + +function createMockOptions( + overrides?: Partial, +): SSHSessionManagerOptions & { state: MockState } { + const state: MockState = { + messages: [], + permissionRequests: [], + reconnectingCalls: [], + connectedCount: 0, + disconnectedCount: 0, + errors: [], + } + + return { + state, + onMessage: msg => { + state.messages.push(msg) + }, + onPermissionRequest: (request, requestId) => { + state.permissionRequests.push({ request, requestId }) + }, + onConnected: () => { + state.connectedCount++ + }, + onReconnecting: (attempt, max) => { + state.reconnectingCalls.push({ attempt, max }) + }, + onDisconnected: () => { + state.disconnectedCount++ + }, + onError: err => { + state.errors.push(err) + }, + ...overrides, + } +} + +describe('SSHSessionManagerImpl', () => { + test('connect() sets connected state and calls onConnected', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + + expect(manager.isConnected()).toBe(true) + expect(opts.state.connectedCount).toBe(1) + }) + + test('connect() is idempotent', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.connect() + + expect(opts.state.connectedCount).toBe(1) + }) + + test('disconnect() sets disconnected state and kills process', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.disconnect() + + expect(manager.isConnected()).toBe(false) + expect((proc.kill as ReturnType).mock.calls.length).toBe(1) + }) + + test('disconnect() is idempotent', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.disconnect() + manager.disconnect() + + expect((proc.kill as ReturnType).mock.calls.length).toBe(1) + }) + + test('processLine routes SDK messages to onMessage', async () => { + const sdkMessage = JSON.stringify({ + type: 'assistant', + message: { role: 'assistant', content: 'hello' }, + }) + + const { proc, writeToStdout, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + writeToStdout(sdkMessage) + + await new Promise(r => setTimeout(r, 50)) + simulateExit(0) + await new Promise(r => setTimeout(r, 50)) + + expect(opts.state.messages.length).toBe(1) + expect((opts.state.messages[0] as Record).type).toBe( + 'assistant', + ) + }) + + test('processLine filters noise types', async () => { + const noiseTypes = [ + 'control_response', + 'keep_alive', + 'control_cancel_request', + 'streamlined_text', + 'streamlined_tool_use_summary', + ] + + const { proc, writeToStdout, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + + for (const type of noiseTypes) { + writeToStdout(JSON.stringify({ type })) + } + writeToStdout( + JSON.stringify({ type: 'system', subtype: 'post_turn_summary' }), + ) + + await new Promise(r => setTimeout(r, 50)) + simulateExit(0) + await new Promise(r => setTimeout(r, 50)) + + expect(opts.state.messages.length).toBe(0) + }) + + test('processLine routes control_request to onPermissionRequest', async () => { + const controlRequest = JSON.stringify({ + type: 'control_request', + request_id: 'req-123', + request: { + subtype: 'can_use_tool', + tool_name: 'Bash', + tool_use_id: 'tool-456', + input: { command: 'ls' }, + }, + }) + + const { proc, writeToStdout, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + writeToStdout(controlRequest) + + await new Promise(r => setTimeout(r, 50)) + simulateExit(0) + await new Promise(r => setTimeout(r, 50)) + + expect(opts.state.permissionRequests.length).toBe(1) + expect(opts.state.permissionRequests[0]!.requestId).toBe('req-123') + }) + + test('sendMessage writes NDJSON to stdin', async () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + const result = await manager.sendMessage('hello world') + + expect(result).toBe(true) + }) + + test('sendInterrupt writes interrupt control request', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.sendInterrupt() + + const stdin = proc.stdin as unknown as { write: ReturnType } + expect(stdin.write).toBeDefined() + }) + + test('respondToPermissionRequest sends allow response', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.respondToPermissionRequest('req-123', { + behavior: 'allow', + updatedInput: { command: 'ls -la' }, + }) + }) + + test('respondToPermissionRequest sends deny response', () => { + const { proc } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.respondToPermissionRequest('req-123', { + behavior: 'deny', + message: 'User denied', + }) + }) + + test('process exit without reconnect calls onDisconnected', async () => { + const { proc, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + simulateExit(1) + + await new Promise(r => setTimeout(r, 100)) + + expect(opts.state.disconnectedCount).toBe(1) + expect(manager.isConnected()).toBe(false) + }) + + test('user disconnect does not trigger reconnect', async () => { + let reconnectCalled = false + const { proc } = createMockSubprocess() + const opts = createMockOptions({ + reconnect: async () => { + reconnectCalled = true + return createMockSubprocess().proc + }, + maxReconnectAttempts: 3, + }) + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + manager.disconnect() + + await new Promise(r => setTimeout(r, 200)) + + expect(reconnectCalled).toBe(false) + expect(opts.state.reconnectingCalls.length).toBe(0) + }) + + test('invalid JSON lines are silently skipped', async () => { + const { proc, writeToStdout, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + writeToStdout('not valid json') + writeToStdout('{also: broken') + writeToStdout( + JSON.stringify({ type: 'assistant', message: { role: 'assistant' } }), + ) + + await new Promise(r => setTimeout(r, 50)) + simulateExit(0) + await new Promise(r => setTimeout(r, 50)) + + expect(opts.state.messages.length).toBe(1) + expect(opts.state.errors.length).toBe(0) + }) + + test('non-StdoutMessage objects are skipped', async () => { + const { proc, writeToStdout, simulateExit } = createMockSubprocess() + const opts = createMockOptions() + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + writeToStdout(JSON.stringify({ noTypeField: true })) + writeToStdout(JSON.stringify([1, 2, 3])) + writeToStdout(JSON.stringify('string')) + + await new Promise(r => setTimeout(r, 50)) + simulateExit(0) + await new Promise(r => setTimeout(r, 50)) + + expect(opts.state.messages.length).toBe(0) + }) + + test('process exit with reconnect factory attempts reconnection', async () => { + const { proc: proc1, simulateExit } = createMockSubprocess() + const { proc: proc2 } = createMockSubprocess() + + const opts = createMockOptions({ + reconnect: mock(async () => proc2), + maxReconnectAttempts: 3, + }) + const manager = new SSHSessionManagerImpl(proc1, opts) + + manager.connect() + simulateExit(1) + + await new Promise(r => setTimeout(r, 3000)) + + expect(opts.state.reconnectingCalls.length).toBeGreaterThanOrEqual(1) + expect(opts.state.reconnectingCalls[0]!.attempt).toBe(1) + expect(opts.state.reconnectingCalls[0]!.max).toBe(3) + }) + + test('reconnect failure exhausts attempts then disconnects', async () => { + const { proc, simulateExit } = createMockSubprocess() + + const opts = createMockOptions({ + reconnect: mock(async () => { + throw new Error('SSH connection refused') + }), + maxReconnectAttempts: 2, + }) + const manager = new SSHSessionManagerImpl(proc, opts) + + manager.connect() + simulateExit(1) + + await new Promise(r => setTimeout(r, 12000)) + + expect(opts.state.reconnectingCalls.length).toBe(2) + expect(opts.state.disconnectedCount).toBe(1) + expect(manager.isConnected()).toBe(false) + }, 15000) +}) diff --git a/src/ssh/createSSHSession.ts b/src/ssh/createSSHSession.ts index 1db14a1f3..fa10844dd 100644 --- a/src/ssh/createSSHSession.ts +++ b/src/ssh/createSSHSession.ts @@ -1,10 +1,21 @@ -// Auto-generated stub — replace with real implementation import type { Subprocess } from 'bun' -import type { SSHSessionManager, SSHSessionManagerOptions } from './SSHSessionManager.js' +import { SSHSessionManagerImpl } from './SSHSessionManager.js' +import type { + SSHSessionManager, + SSHSessionManagerOptions, +} from './SSHSessionManager.js' +import { createAuthProxy } from './SSHAuthProxy.js' +export type { SSHAuthProxy } from './SSHAuthProxy.js' +import type { SSHAuthProxy } from './SSHAuthProxy.js' +import { probeRemote } from './SSHProbe.js' +import { deployBinary } from './SSHDeploy.js' +import { buildCliLaunch } from '../utils/cliLaunch.js' +import { logForDebugging } from '../utils/debug.js' +import { jsonParse } from '../utils/slowOperations.js' +import { randomUUID } from 'crypto' -export interface SSHAuthProxy { - stop(): void -} +const INIT_TIMEOUT_MS = 30_000 +const STDERR_TAIL_LINES = 20 export interface SSHSession { remoteCwd: string @@ -21,9 +32,419 @@ export class SSHSessionError extends Error { } } -export const createSSHSession: (...args: unknown[]) => Promise = (async () => { - throw new SSHSessionError('SSH sessions are not supported in this build') -}); -export const createLocalSSHSession: (...args: unknown[]) => Promise = (async () => { - throw new SSHSessionError('Local SSH sessions are not supported in this build') -}); +export async function createSSHSession( + config: { + host: string + cwd?: string + localVersion: string + permissionMode?: string + dangerouslySkipPermissions?: boolean + extraCliArgs: string[] + remoteBin?: string + }, + callbacks?: { + onProgress?: (msg: string) => void + }, +): Promise { + const { host, localVersion, extraCliArgs, remoteBin } = config + const onProgress = callbacks?.onProgress + + let remoteBinaryPath: string + let defaultCwd = '/' + + if (remoteBin) { + onProgress?.('Using custom remote binary, skipping probe/deploy…') + remoteBinaryPath = remoteBin + logForDebugging(`[SSH] custom remoteBin: ${remoteBin}`) + // Quick SSH to get remote home directory for default CWD + try { + const pwdProc = Bun.spawn( + ['ssh', '-o', 'BatchMode=yes', '-o', 'ConnectTimeout=5', host, 'pwd'], + { + stdin: 'ignore', + stdout: 'pipe', + stderr: 'ignore', + }, + ) + await pwdProc.exited + const pwd = (await new Response(pwdProc.stdout).text()).trim() + if (pwd.startsWith('/')) defaultCwd = pwd + } catch { + /* use fallback */ + } + } else { + // 1. Probe remote host + const probe = await probeRemote(host, onProgress) + logForDebugging(`[SSH] probe result: ${JSON.stringify(probe)}`) + defaultCwd = probe.defaultCwd + + // 2. Deploy if binary missing or version mismatch + remoteBinaryPath = probe.binaryPath ?? '~/.local/bin/claude' + if (!probe.hasBinary || probe.remoteVersion !== localVersion) { + onProgress?.( + probe.hasBinary + ? `Updating remote binary (${probe.remoteVersion} → ${localVersion})…` + : 'Deploying binary to remote…', + ) + remoteBinaryPath = await deployBinary({ + host, + remotePlatform: probe.remotePlatform, + remoteArch: probe.remoteArch, + localVersion, + onProgress, + }) + } + } + + // 3. Start local auth proxy + const { proxy, localAddress, authEnv } = await createAuthProxy() + logForDebugging(`[SSH] auth proxy listening on ${localAddress}`) + + // 4. Build SSH command with -R reverse forward and remote CLI + const remoteSocketId = randomUUID().slice(0, 8) + const isWindows = process.platform === 'win32' + + const remoteCli: string[] = [] + for (const [k, v] of Object.entries(authEnv)) { + remoteCli.push(`${k}=${v}`) + } + remoteCli.push( + remoteBinaryPath, + '--output-format', + 'stream-json', + '--input-format', + 'stream-json', + '--verbose', + '-p', + ) + if (config.cwd) remoteCli.push('--cwd', config.cwd) + if (config.permissionMode) + remoteCli.push('--permission-mode', config.permissionMode) + if (config.dangerouslySkipPermissions) + remoteCli.push('--dangerously-skip-permissions') + remoteCli.push(...extraCliArgs) + + const sshArgs = ['ssh'] + + if (!isWindows) { + const remoteSocket = `/tmp/claude-ssh-auth-${remoteSocketId}.sock` + sshArgs.push('-R', `${remoteSocket}:${localAddress}`) + sshArgs.push('-o', 'StreamLocalBindUnlink=yes') + // Override auth env to use the remote socket path + const idx = remoteCli.indexOf( + `ANTHROPIC_AUTH_SOCKET=${authEnv.ANTHROPIC_AUTH_SOCKET}`, + ) + if (idx !== -1) { + remoteCli[idx] = `ANTHROPIC_AUTH_SOCKET=${remoteSocket}` + } + } else { + // Windows: TCP reverse forward + const localPort = localAddress.split(':')[1] + const remotePort = 10000 + Math.floor(Math.random() * 50000) + sshArgs.push('-R', `${remotePort}:127.0.0.1:${localPort}`) + // Override auth env to use remote TCP address + const baseIdx = remoteCli.findIndex(s => + s.startsWith('ANTHROPIC_BASE_URL='), + ) + if (baseIdx !== -1) { + remoteCli[baseIdx] = `ANTHROPIC_BASE_URL=http://127.0.0.1:${remotePort}` + } + } + + sshArgs.push(host, remoteCli.join(' ')) + + onProgress?.('Starting remote session…') + logForDebugging(`[SSH] spawning: ${sshArgs.join(' ')}`) + + let proc: Subprocess + try { + proc = Bun.spawn(sshArgs, { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + }) + } catch (err) { + proxy.stop() + throw new SSHSessionError( + `Failed to spawn SSH process: ${err instanceof Error ? err.message : String(err)}`, + ) + } + + const stderrChunks: string[] = [] + collectStderr(proc, stderrChunks) + + let remoteCwd: string + if (remoteBin) { + // Custom binary mode: the remote CLI in print+stream-json mode emits + // init only after receiving the first user message (QueryEngine yield). + // Waiting for init here would deadlock. Instead, verify the process + // is alive and use the configured or probed CWD. + const earlyExit = await Promise.race([ + proc.exited.then(code => code), + new Promise(r => setTimeout(() => r(null), 3_000)), + ]) + if (earlyExit !== null) { + proxy.stop() + const tail = stderrChunks.join('').trim() + throw new SSHSessionError( + `Remote process exited immediately (code ${earlyExit})${tail ? `: ${tail}` : ''}`, + ) + } + remoteCwd = config.cwd || defaultCwd || '/' + } else { + try { + remoteCwd = await waitForInit(proc, config.cwd || defaultCwd) + } catch (err) { + proxy.stop() + proc.kill() + throw err + } + } + + logForDebugging(`[SSH] remote session initialized, remoteCwd=${remoteCwd}`) + + let currentProc = proc + + const reconnect = async (): Promise => { + logForDebugging('[SSH] reconnect: re-spawning SSH process with --continue') + const reconnectArgs = [...sshArgs] + const cmdIdx = reconnectArgs.length - 1 + const existingCmd = reconnectArgs[cmdIdx]! + if (!existingCmd.includes('--continue')) { + reconnectArgs[cmdIdx] = existingCmd.replace( + / -p(?:\s|$)/, + ' -p --continue ', + ) + } + + const newProc = Bun.spawn(reconnectArgs, { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + }) + + const newStderrChunks: string[] = [] + collectStderr(newProc, newStderrChunks) + + await waitForInit(newProc, remoteCwd) + currentProc = newProc + stderrChunks.length = 0 + stderrChunks.push(...newStderrChunks) + + return newProc + } + + return { + remoteCwd, + get proc() { + return currentProc + }, + proxy, + createManager(options: SSHSessionManagerOptions): SSHSessionManager { + return new SSHSessionManagerImpl(currentProc, { + ...options, + reconnect, + }) + }, + getStderrTail(): string { + return stderrChunks.slice(-STDERR_TAIL_LINES).join('') + }, + } +} + +export async function createLocalSSHSession(config: { + cwd?: string + permissionMode?: string + dangerouslySkipPermissions?: boolean +}): Promise { + const { proxy, authEnv } = await createAuthProxy() + + const cliArgs: string[] = [ + '--output-format', + 'stream-json', + '--input-format', + 'stream-json', + '-p', + ] + if (config.cwd) { + cliArgs.push('--cwd', config.cwd) + } + if (config.permissionMode) { + cliArgs.push('--permission-mode', config.permissionMode) + } + if (config.dangerouslySkipPermissions) { + cliArgs.push('--dangerously-skip-permissions') + } + + const spec = buildCliLaunch(cliArgs) + + let proc: Subprocess + try { + proc = Bun.spawn([spec.execPath, ...spec.args], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + env: { ...spec.env, ...authEnv }, + }) + } catch (err) { + proxy.stop() + throw new SSHSessionError( + `Failed to spawn local CLI process: ${err instanceof Error ? err.message : String(err)}`, + ) + } + + logForDebugging('[SSH] local session spawned, waiting for init message...') + + const stderrChunks: string[] = [] + collectStderr(proc, stderrChunks) + + let remoteCwd: string + try { + remoteCwd = await waitForInit(proc, config.cwd) + } catch (err) { + proxy.stop() + proc.kill() + throw err + } + + logForDebugging(`[SSH] local session initialized, remoteCwd=${remoteCwd}`) + + let currentProc = proc + + const reconnect = async (): Promise => { + logForDebugging('[SSH] local reconnect: re-spawning CLI with --continue') + const reconnectCliArgs = [...cliArgs] + if (!reconnectCliArgs.includes('--continue')) { + reconnectCliArgs.push('--continue') + } + + const reconnectSpec = buildCliLaunch(reconnectCliArgs) + const newProc = Bun.spawn([reconnectSpec.execPath, ...reconnectSpec.args], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + env: { ...reconnectSpec.env, ...authEnv }, + }) + + const newStderrChunks: string[] = [] + collectStderr(newProc, newStderrChunks) + + await waitForInit(newProc, remoteCwd) + currentProc = newProc + stderrChunks.length = 0 + stderrChunks.push(...newStderrChunks) + + return newProc + } + + return { + remoteCwd, + get proc() { + return currentProc + }, + proxy, + createManager(options: SSHSessionManagerOptions): SSHSessionManager { + return new SSHSessionManagerImpl(currentProc, { + ...options, + reconnect, + }) + }, + getStderrTail(): string { + return stderrChunks.slice(-STDERR_TAIL_LINES).join('') + }, + } +} + +async function waitForInit( + proc: Subprocess, + fallbackCwd?: string, +): Promise { + const stdout = proc.stdout + if (!stdout) { + throw new SSHSessionError('Child process stdout is not readable') + } + + const reader = (stdout as ReadableStream).getReader() + const decoder = new TextDecoder() + let buffer = '' + const deadline = Date.now() + INIT_TIMEOUT_MS + + try { + while (Date.now() < deadline) { + const remaining = deadline - Date.now() + const result = await Promise.race([ + reader.read(), + new Promise<{ done: true; value: undefined }>((_, reject) => + setTimeout( + () => + reject( + new SSHSessionError( + 'Remote CLI did not initialize within 30 seconds. Check SSH connectivity and remote binary.', + ), + ), + remaining, + ), + ), + ]) + + if (result.done) { + throw new SSHSessionError( + 'Child process exited before sending init message', + ) + } + + buffer += decoder.decode(result.value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed) continue + try { + const msg = jsonParse(trimmed) as Record + if (msg.type === 'system' && msg.subtype === 'init') { + reader.releaseLock() + return (msg.cwd as string) || fallbackCwd || process.cwd() + } + } catch { + // not valid JSON — skip + } + } + } + } catch (err) { + reader.releaseLock() + throw err instanceof SSHSessionError + ? err + : new SSHSessionError( + `Error reading init message: ${err instanceof Error ? err.message : String(err)}`, + ) + } + + reader.releaseLock() + throw new SSHSessionError( + 'Remote CLI did not initialize within 30 seconds. Check SSH connectivity and remote binary.', + ) +} + +function collectStderr(proc: Subprocess, chunks: string[]): void { + const stderr = proc.stderr + if (!stderr) return + + const reader = (stderr as ReadableStream).getReader() + const decoder = new TextDecoder() + + void (async () => { + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(decoder.decode(value, { stream: true })) + if (chunks.length > STDERR_TAIL_LINES * 2) { + chunks.splice(0, chunks.length - STDERR_TAIL_LINES) + } + } + } catch { + // stderr closed — expected on process exit + } + })() +}