diff --git a/apps/discord-bridge/README.md b/apps/discord-bridge/README.md index 34b14d6..f420d2b 100644 --- a/apps/discord-bridge/README.md +++ b/apps/discord-bridge/README.md @@ -13,8 +13,10 @@ Set these environment values before starting the bridge: ```bash CODEX_DISCORD_HOME_CHANNEL_ID=1502107617512919220 CODEX_DISCORD_MAIN_THREAD_ID=019e2509-ddbb-7380-b97b-41575092d86b +CODEX_DISCORD_WORKSPACE_FORUM_CHANNEL_ID=1502107617512919221 +CODEX_DISCORD_TASK_THREADS_CHANNEL_ID=1502107617512919222 CODEX_DISCORD_ALLOWED_CHANNEL_IDS=1502107617512919220 -CODEX_DISCORD_DIR=/home/peezy/codex-fork-workspace/codex-flows +CODEX_DISCORD_DIR=/home/peezy CODEX_FLOW_BACKEND_URL=http://127.0.0.1:8090 CODEX_DISCORD_HOOK_SPOOL_DIR=/home/peezy/.codex/discord-bridge/stop-hooks ``` @@ -30,7 +32,9 @@ In the home channel: - normal messages are sent to the main operator thread - bot mentions are treated as gateway messages and do not create Discord task threads -- `status` replies directly with gateway state instead of starting a Codex turn +- `/status` replies directly with gateway state instead of starting a Codex turn +- `/status` also lists active Codex threads, linking any opened Discord thread + and offering private buttons to open active threads that are not yet in Discord The prompt sent to the main thread uses `[discord-gateway]` framing so the model knows it is operating as the gateway over the codex-flows backend, not as a @@ -68,6 +72,55 @@ Gateway state stores delegation records, including optional Discord detail thread ids for noisy work. Delegated Codex sessions do not receive the privileged gateway tools; only the main operator thread can manage delegation. +## Workbench Prototype + +The gateway can optionally maintain a noisy Discord workbench beside the home +channel. Configure both channels to enable it: + +```bash +CODEX_DISCORD_WORKSPACE_FORUM_CHANNEL_ID=1502107617512919221 +CODEX_DISCORD_TASK_THREADS_CHANNEL_ID=1502107617512919222 +``` + +The home channel remains the compact operator chat. The workspace forum gets one +post for each discoverable top-level folder under `CODEX_DISCORD_DIR`, which is +the gateway's main workspace root. For the home-folder gateway, set +`CODEX_DISCORD_DIR=/home/peezy`; a delegated cwd such as +`/home/peezy/codex-fork-workspace/codex-flows` maps to the +`/home/peezy/codex-fork-workspace` workspace post. Hidden folders and +`node_modules` are skipped. Workspace posts are compact dashboards that only +show Codex threads already opened into Discord. Run `/threads` in a workspace +post to list all Codex threads for that workspace; the bridge replies with an +ephemeral numbered button picker visible only to the command sender. Choosing a +number opens or reuses one Discord task thread in the task thread channel, and +messages in that Discord thread are routed directly to the opened Codex thread. + +When the workbench is enabled: + +- `start_delegation` and `resume_delegation` create or reuse the workspace forum + post for the top-level workspace containing the delegation cwd +- bridge startup creates missing workspace forum posts for discoverable folders + under the main workspace root +- workspace dashboards list only open Discord task threads for that workspace +- `/threads` lists known Codex threads from `thread/list` plus tracked + delegations that may not have appeared in the list yet +- choosing an item from the ephemeral `/threads` picker creates or reuses one + Discord task thread in the task thread channel +- `/status` shows all active Codex threads across workspaces and uses the same + ephemeral button flow to open active threads without Discord task threads +- repeated delegations in the same cwd reuse the same workspace post and update + the workspace thread list +- Stop lifecycle events update the workspace dashboard and any already-opened + task thread +- the home channel receives only compact status/link messages for completed + delegations +- main-thread injection and wake behavior still follow the delegation return + mode + +If both workbench channels are omitted, the workbench is disabled and the bridge +keeps the legacy home-channel result mirroring behavior. Setting only one +workbench channel is rejected as an invalid partial configuration. + Delegations support return modes: - `wake_on_done`: inject and mirror the result, then wake the main operator when idle @@ -78,15 +131,17 @@ Delegations support return modes: Automatic result return uses `thread/inject_items` to append structured delegation results to the main operator thread's model-visible history. Codex -`Stop` hooks, not background thread polling, drive automatic result return: -the global hook writes durable Stop events into the spool directory, and the -gateway drains that spool on startup and while running. Starting a main-thread -turn is a separate wake step, so long-running main goals are not interrupted; -wakes are queued until the main operator thread is idle. +hooks, not background thread polling, drive automatic result return and passive +observability. The global hook writes durable lifecycle events into the spool +directory, and the gateway drains that spool on startup and while running. +Starting a main-thread turn is a separate wake step, so long-running main goals +are not interrupted; wakes are queued until the main operator thread is idle. +For sessions that were not created through the gateway, the same hook stream +updates an observed-thread index used by `/threads`. -## Codex Stop Hook +## Codex Hooks -Install the global hook once for the Codex runtime that backs the gateway: +Install the global hooks once for the Codex runtime that backs the gateway: ```bash codex-discord-bridge hook install @@ -102,19 +157,40 @@ The installer enables the current hooks feature in `~/.codex/config.toml`: hooks = true ``` -It also registers the Stop hook in `~/.codex/hooks.json`: +It also registers passive observability hooks in `~/.codex/hooks.json`: ```json { "hooks": { + "SessionStart": [ + { + "hooks": [ + { + "type": "command", + "command": "codex-discord-bridge hook event", + "timeout": 10 + } + ] + } + ], + "UserPromptSubmit": [ + { + "hooks": [ + { + "type": "command", + "command": "codex-discord-bridge hook event", + "timeout": 10 + } + ] + } + ], "Stop": [ { "hooks": [ { "type": "command", - "command": "codex-discord-bridge hook stop", - "timeout": 10, - "statusMessage": "Recording Discord gateway Stop event" + "command": "codex-discord-bridge hook event", + "timeout": 10 } ] } @@ -123,6 +199,11 @@ It also registers the Stop hook in `~/.codex/hooks.json`: } ``` +The installer also registers `PreToolUse`, `PermissionRequest`, and +`PostToolUse` with the same command. Those higher-volume events update local +observed-thread metadata such as status, current tool, or waiting reason; they +do not create Discord messages. + For package-on-demand installs, write a `bunx` command instead: ```bash @@ -131,9 +212,12 @@ codex-discord-bridge hook install --bunx-package @peezy.tech/codex-flows ``` The hook is intentionally dumb: it does not read gateway state or call the -backend. It only writes idempotent Stop-event files. The gateway ignores unknown -sessions, treats known delegated sessions according to their return mode, and -uses main-operator Stop events to drain queued wakes. +backend. It only writes idempotent lifecycle-event files and lets Codex +continue. The gateway treats known delegated `Stop` events according to their +return mode, uses main-operator `Stop` events to drain queued wakes, and records +unknown non-main sessions as observed threads. Observed threads are visible from +`/threads` for their workspace and can be opened into the task thread channel +on demand. After changing hook configuration, restart the Codex runtime that backs the gateway and trust the hook when Codex asks for review. `hooks/list` should show diff --git a/apps/discord-bridge/src/bridge.ts b/apps/discord-bridge/src/bridge.ts index 6e2c2f8..1e633f6 100644 --- a/apps/discord-bridge/src/bridge.ts +++ b/apps/discord-bridge/src/bridge.ts @@ -1,7 +1,8 @@ -import { watch, type FSWatcher } from "node:fs"; +import { watch, type Dirent, type FSWatcher } from "node:fs"; +import { readdir, stat } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { createHash } from "node:crypto"; +import { createHash, randomUUID } from "node:crypto"; import type { JsonRpcNotification, JsonRpcRequest } from "@peezy.tech/codex-flows/rpc"; import type { JsonValue } from "@peezy.tech/codex-flows/generated/serde_json/JsonValue"; @@ -24,8 +25,10 @@ import type { DiscordBridgeConfig, DiscordGatewayDelegation, DiscordGatewayDelegationReturnMode, + DiscordGatewayHookEvent, + DiscordGatewayObservedThread, DiscordGatewayPendingWake, - DiscordGatewayStopHookEvent, + DiscordGatewayWorkspaceSurface, DiscordBridgeSession, DiscordBridgeState, DiscordBridgeStateStore, @@ -34,6 +37,10 @@ import type { DiscordClearWebhooksInbound, DiscordInbound, DiscordMessageInbound, + DiscordReactionInbound, + DiscordStatusInbound, + DiscordThreadPickerInbound, + DiscordThreadsInbound, DiscordThreadStartInbound, } from "./types.ts"; @@ -41,6 +48,18 @@ const maxDiscordMessageLength = 2000; const gatewayToolsVersion = 1; const stopHookDrainDebounceMs = 100; const stopHookRetryMs = 1_000; +const threadPickerReactions = [ + "1️⃣", + "2️⃣", + "3️⃣", + "4️⃣", + "5️⃣", + "6️⃣", + "7️⃣", + "8️⃣", + "9️⃣", + "🔟", +]; type ThreadSnapshot = { terminalTurnIds: string[]; @@ -50,6 +69,21 @@ type ThreadSnapshot = { }; }; +type WorkspaceThreadSummary = { + id: string; + title: string; + cwd: string; + status: string; + updatedAt: number; + discordThreadId?: string; +}; + +type WorkspaceThreadPicker = { + channelId: string; + authorId: string; + entries: WorkspaceThreadSummary[]; +}; + export class DiscordCodexBridge { readonly client: CodexBridgeClient; readonly transport: DiscordBridgeTransport; @@ -66,6 +100,9 @@ export class DiscordCodexBridge { #gatewayStopHookWatcher: FSWatcher | undefined; #gatewayStopHookDrainTimer: Timer | undefined; #gatewayStopHookDrainChain: Promise = Promise.resolve(); + #transportStarted = false; + #threadPickersByMessage = new Map(); + #threadPickersById = new Map(); constructor(options: { client: CodexBridgeClient; @@ -137,11 +174,17 @@ export class DiscordCodexBridge { }); }, }); + this.#transportStarted = true; this.#debug("transport.started"); - await this.transport.registerCommands(); + await this.#reconcileGatewayWorkbench(); + await this.transport.registerCommands({ + channelIds: this.#commandRegistrationChannelIds(), + }); this.#debug("commands.registered"); for (const runner of this.#runnersByDiscordThread.values()) { - runner.start(); + if (this.#shouldAutoStartRunner(runner.session)) { + runner.start(); + } } await this.#startGatewayStopHookSpool(); } @@ -164,6 +207,7 @@ export class DiscordCodexBridge { await this.#gatewayStopHookDrainChain.catch(() => undefined); await this.#persistChain.catch(() => undefined); await this.transport.stop(); + this.#transportStarted = false; this.client.close(); } @@ -213,6 +257,22 @@ export class DiscordCodexBridge { await this.#handleClearWebhooks(inbound); return; } + if (inbound.kind === "status") { + await this.#handleStatusCommand(inbound); + return; + } + if (inbound.kind === "threads") { + await this.#handleThreadsCommand(inbound); + return; + } + if (inbound.kind === "threadPicker") { + await this.#handleThreadPickerSelection(inbound); + return; + } + if (inbound.kind === "reaction") { + await this.#handleThreadPickerReaction(inbound); + return; + } if (inbound.kind === "threadStart") { if (this.config.gateway?.homeChannelId === inbound.channelId) { @@ -358,6 +418,198 @@ export class DiscordCodexBridge { await command.reply?.(clearWebhooksSummary(result)); } + async #handleStatusCommand(command: DiscordStatusInbound): Promise { + if (!this.config.allowedUserIds.has(command.author.id)) { + this.#debug("status.ignored.user", { + channelId: command.channelId, + authorId: command.author.id, + }); + await command.reply?.("Only globally allowed Discord users can read gateway status."); + return; + } + if (!this.#isAllowedChannel(command.channelId)) { + this.#debug("status.ignored.channel", { channelId: command.channelId }); + await command.reply?.("This Discord channel is not allowed for the bridge."); + return; + } + const activeThreads = await this.#listActiveCodexThreadSummaries(); + const openableThreads = activeThreads.filter((thread) => + !thread.discordThreadId && + !this.#isGatewayMainThread(thread.id) && + Boolean(this.#gatewayWorkbenchConfig()) + ).slice(0, threadPickerReactions.length); + const statusText = this.#gatewayStatusMessage({ + activeThreads, + openableThreads, + }); + if (openableThreads.length === 0 || !command.replyPicker) { + await command.reply?.(statusText); + return; + } + const pickerId = `status-${randomUUID()}`; + this.#threadPickersById.set(pickerId, { + channelId: command.channelId, + authorId: command.author.id, + entries: openableThreads, + }); + try { + await command.replyPicker({ + pickerId, + text: statusText, + options: openableThreads.map((_, index) => ({ + id: String(index), + label: String(index + 1), + })), + }); + } catch (error) { + this.#threadPickersById.delete(pickerId); + await command.reply?.( + `Failed to send active-thread picker: ${errorMessage(error)}`, + ); + } + } + + async #handleThreadsCommand(command: DiscordThreadsInbound): Promise { + if (!this.config.allowedUserIds.has(command.author.id)) { + this.#debug("threads.ignored.user", { + channelId: command.channelId, + authorId: command.author.id, + }); + await command.reply?.("Only globally allowed Discord users can list workspace threads."); + return; + } + if (!this.#isAllowedChannel(command.channelId)) { + this.#debug("threads.ignored.channel", { channelId: command.channelId }); + await command.reply?.("This Discord channel is not allowed for the bridge."); + return; + } + const workspace = this.#workspaceForChannel(command.channelId); + if (!workspace) { + await command.reply?.("Run `/threads` in a workspace forum post or opened workspace thread."); + return; + } + const threads = await this.#listWorkspaceThreads(workspace); + if (threads.length === 0) { + await command.reply?.(`No Codex threads found for ${workspace.title}.`); + return; + } + if (!command.replyPicker) { + await command.reply?.( + "This Discord transport cannot send ephemeral thread pickers.", + ); + return; + } + const entries = threads.slice(0, threadPickerReactions.length); + const pickerId = `threads-${randomUUID()}`; + this.#threadPickersById.set(pickerId, { + channelId: command.channelId, + authorId: command.author.id, + entries, + }); + try { + await command.replyPicker({ + pickerId, + text: threadPickerText(workspace, entries, threads.length, { + action: "Choose a number to open or resume that thread in Discord.", + }), + options: entries.map((_, index) => ({ + id: String(index), + label: String(index + 1), + })), + }); + } catch (error) { + this.#threadPickersById.delete(pickerId); + await command.reply?.( + `Failed to send the ephemeral thread picker: ${errorMessage(error)}`, + ); + return; + } + } + + async #handleThreadPickerSelection( + selection: DiscordThreadPickerInbound, + ): Promise { + if (!this.config.allowedUserIds.has(selection.author.id)) { + return; + } + const picker = this.#threadPickersById.get(selection.pickerId); + if (!picker) { + await selection.update?.("This thread picker is no longer active."); + return; + } + if (selection.author.id !== picker.authorId) { + await selection.reply?.("Only the user who ran `/threads` can use this picker."); + return; + } + const index = Number.parseInt(selection.optionId, 10); + const entry = Number.isInteger(index) ? picker.entries[index] : undefined; + if (!entry) { + await selection.update?.("That thread choice is no longer available."); + return; + } + this.#threadPickersById.delete(selection.pickerId); + try { + const session = await this.#materializeWorkspaceThread(entry.id, { + author: selection.author, + }); + await updateOrReply( + selection, + `Opened ${session.title}: <#${session.discordThreadId}>`, + ); + } catch (error) { + this.#error("threads.picker.openFailed", { + channelId: selection.channelId, + pickerId: selection.pickerId, + threadId: entry.id, + error: errorMessage(error), + }); + await updateOrReply( + selection, + `Failed to open ${entry.title}: ${errorMessage(error)}`, + ); + } + } + + async #handleThreadPickerReaction(reaction: DiscordReactionInbound): Promise { + if (!this.config.allowedUserIds.has(reaction.author.id)) { + return; + } + const pickerKey = threadPickerKey(reaction.channelId, reaction.messageId); + const picker = this.#threadPickersByMessage.get(pickerKey); + if (!picker) { + return; + } + if (reaction.author.id !== picker.authorId) { + return; + } + const index = threadPickerReactionIndex(reaction.emoji); + const entry = index === undefined ? undefined : picker.entries[index]; + if (!entry) { + return; + } + this.#threadPickersByMessage.delete(pickerKey); + try { + const session = await this.#materializeWorkspaceThread(entry.id, { + author: reaction.author, + }); + await this.transport.sendMessage( + picker.channelId, + `Opened ${session.title}: <#${session.discordThreadId}>`, + ); + } catch (error) { + this.#error("threads.reaction.openFailed", { + channelId: reaction.channelId, + messageId: reaction.messageId, + threadId: entry.id, + error: errorMessage(error), + }); + await this.transport.sendMessage( + picker.channelId, + `Failed to open ${entry.title}: ${errorMessage(error)}`, + ); + } + } + async #handleThreadStart(start: DiscordThreadStartInbound): Promise { const state = this.#requireState(); if ( @@ -560,16 +812,6 @@ export class DiscordCodexBridge { }); return; } - const command = parseGatewayCommand(message.content); - if (command === "status") { - await this.transport.sendMessage( - message.channelId, - this.#gatewayStatusMessage(), - ); - addProcessedMessageId(this.#requireState(), message.messageId); - await this.#persist(); - return; - } const runner = this.#gatewayRunner(); if (!runner) { this.#debug("gateway.message.ignored.noSession", { @@ -581,14 +823,23 @@ export class DiscordCodexBridge { await runner.enqueueMessage(message); } - #gatewayStatusMessage(): string { + #gatewayStatusMessage( + options: { + activeThreads?: WorkspaceThreadSummary[]; + openableThreads?: WorkspaceThreadSummary[]; + } = {}, + ): string { const state = this.#requireState(); const gateway = state.gateway; const session = this.#gatewaySession(); const delegations = gateway?.delegations ?? []; + const workspaces = gateway?.workspaces ?? []; const activeDelegations = delegations.filter((delegation) => delegation.status === "active" ); + const workbench = this.#gatewayWorkbenchConfig(); + const activeThreads = options.activeThreads ?? []; + const openableThreads = options.openableThreads ?? []; return [ "**Codex Gateway**", `Home channel: \`${this.config.gateway?.homeChannelId ?? "disabled"}\``, @@ -601,12 +852,29 @@ export class DiscordCodexBridge { `Status: ${state.gateway?.toolsVersion === gatewayToolsVersion ? "privileged gateway tools available to the main Codex operator thread" : "waiting for a tool-enabled main Codex operator thread"}.`, `Flow backend: \`${this.config.flowBackendUrl ?? "not configured"}\``, "", - "**Detail Threads**", - "Status: optional detail-thread records are supported in state; automatic detail thread mirroring is not enabled yet.", - ].join("\n"); + "**Workbench**", + workbench + ? `Status: enabled; workspace forum <#${workbench.workspaceForumChannelId}>, task threads <#${workbench.taskThreadsChannelId}>` + : "Status: disabled", + `Workspaces: ${workspaces.length} tracked`, + "", + "**Active Codex Threads**", + activeThreads.length > 0 + ? activeThreadStatusLines(activeThreads, openableThreads).join("\n") + : "None", + openableThreads.length > 0 + ? "Choose a number to create or reuse a Discord task thread." + : undefined, + ].filter((line): line is string => line !== undefined).join("\n"); } async #handleNotification(message: JsonRpcNotification): Promise { + if (!this.#transportStarted) { + this.#debug("notification.ignored.transportNotStarted", { + method: message.method, + }); + return; + } const params = record(message.params); const threadId = stringValue(params.threadId); if (!threadId) { @@ -749,11 +1017,12 @@ export class DiscordCodexBridge { state.gateway?.toolsVersion === gatewayToolsVersion; if (existing && shouldReuseExisting) { try { + const gatewayCwd = this.config.cwd ?? existing.cwd; const resumed = await this.client.resumeThread(this.#threadResumeParams( existing.codexThreadId, - existing.cwd ?? this.config.cwd, + gatewayCwd, )); - existing.cwd = resumeResponseCwd(resumed) ?? existing.cwd ?? this.config.cwd; + existing.cwd = gatewayCwd ?? resumeResponseCwd(resumed) ?? existing.cwd; state.gateway = { homeChannelId: gatewayConfig.homeChannelId, mainThreadId: existing.codexThreadId, @@ -761,7 +1030,10 @@ export class DiscordCodexBridge { createdAt: existing.createdAt, toolsVersion: state.gateway?.toolsVersion, delegations: state.gateway?.delegations ?? [], + workspaces: state.gateway?.workspaces ?? [], + observedThreads: state.gateway?.observedThreads ?? [], pendingWakes: state.gateway?.pendingWakes ?? [], + processedHookEventIds: state.gateway?.processedHookEventIds ?? [], processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [], }; this.#registerRunner(existing); @@ -825,7 +1097,10 @@ export class DiscordCodexBridge { ? state.gateway?.toolsVersion : gatewayToolsVersion, delegations: state.gateway?.delegations ?? [], + workspaces: state.gateway?.workspaces ?? [], + observedThreads: state.gateway?.observedThreads ?? [], pendingWakes: state.gateway?.pendingWakes ?? [], + processedHookEventIds: state.gateway?.processedHookEventIds ?? [], processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [], }; state.sessions.push(session); @@ -856,6 +1131,11 @@ export class DiscordCodexBridge { : undefined; } + #shouldAutoStartRunner(session: DiscordBridgeSession): boolean { + const workbench = this.#gatewayWorkbenchConfig(); + return session.parentChannelId !== workbench?.taskThreadsChannelId; + } + #isGatewayMainThread(threadId: string): boolean { const session = this.#gatewaySession(); return Boolean( @@ -864,6 +1144,19 @@ export class DiscordCodexBridge { ); } + #gatewayWorkbenchConfig(): + | { workspaceForumChannelId: string; taskThreadsChannelId: string } + | undefined { + const gateway = this.config.gateway; + if (!gateway?.workspaceForumChannelId || !gateway.taskThreadsChannelId) { + return undefined; + } + return { + workspaceForumChannelId: gateway.workspaceForumChannelId, + taskThreadsChannelId: gateway.taskThreadsChannelId, + }; + } + #gatewayStopHookSpoolDir(): string { return this.config.hookSpoolDir ?? path.join(path.dirname(this.config.statePath), "stop-hooks"); @@ -876,7 +1169,10 @@ export class DiscordCodexBridge { homeChannelId: this.config.gateway?.homeChannelId ?? "", mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], + workspaces: [], + observedThreads: [], pendingWakes: [], + processedHookEventIds: [], processedStopHookEventIds: [], }; } @@ -884,6 +1180,24 @@ export class DiscordCodexBridge { return state.gateway.delegations; } + #gatewayWorkspaces(): DiscordGatewayWorkspaceSurface[] { + const state = this.#requireState(); + if (!state.gateway) { + state.gateway = { + homeChannelId: this.config.gateway?.homeChannelId ?? "", + mainThreadId: this.#gatewaySession()?.codexThreadId, + delegations: [], + workspaces: [], + observedThreads: [], + pendingWakes: [], + processedHookEventIds: [], + processedStopHookEventIds: [], + }; + } + state.gateway.workspaces ??= []; + return state.gateway.workspaces; + } + #gatewayPendingWakes(): DiscordGatewayPendingWake[] { const state = this.#requireState(); if (!state.gateway) { @@ -891,7 +1205,10 @@ export class DiscordCodexBridge { homeChannelId: this.config.gateway?.homeChannelId ?? "", mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], + workspaces: [], + observedThreads: [], pendingWakes: [], + processedHookEventIds: [], processedStopHookEventIds: [], }; } @@ -899,19 +1216,42 @@ export class DiscordCodexBridge { return state.gateway.pendingWakes; } - #gatewayProcessedStopHookEventIds(): string[] { + #gatewayObservedThreads(): DiscordGatewayObservedThread[] { const state = this.#requireState(); if (!state.gateway) { state.gateway = { homeChannelId: this.config.gateway?.homeChannelId ?? "", mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], + workspaces: [], + observedThreads: [], pendingWakes: [], + processedHookEventIds: [], processedStopHookEventIds: [], }; } - state.gateway.processedStopHookEventIds ??= []; - return state.gateway.processedStopHookEventIds; + state.gateway.observedThreads ??= []; + return state.gateway.observedThreads; + } + + #gatewayProcessedHookEventIds(): string[] { + const state = this.#requireState(); + if (!state.gateway) { + state.gateway = { + homeChannelId: this.config.gateway?.homeChannelId ?? "", + mainThreadId: this.#gatewaySession()?.codexThreadId, + delegations: [], + workspaces: [], + observedThreads: [], + pendingWakes: [], + processedHookEventIds: [], + processedStopHookEventIds: [], + }; + } + state.gateway.processedHookEventIds ??= [ + ...(state.gateway.processedStopHookEventIds ?? []), + ]; + return state.gateway.processedHookEventIds; } async #startDelegation(args: Record): Promise { @@ -944,6 +1284,7 @@ export class DiscordCodexBridge { createdAt: now, updatedAt: now, }); + const workbench = await this.#ensureDelegationWorkbench(delegation); let turnId: string | undefined; if (prompt) { const turn = await this.client.startTurn({ @@ -962,7 +1303,7 @@ export class DiscordCodexBridge { delegation.lastTurnId = turnId; } await this.#persist(); - return { delegation, turnId }; + return { delegation, turnId, workbench }; } async #resumeDelegation(args: Record): Promise { @@ -984,8 +1325,9 @@ export class DiscordCodexBridge { createdAt: this.#delegationForThread(codexThreadId)?.createdAt ?? now, updatedAt: now, }); + const workbench = await this.#ensureDelegationWorkbench(delegation); await this.#persist(); - return { delegation }; + return { delegation, workbench }; } async #sendDelegation(args: Record): Promise { @@ -1018,10 +1360,14 @@ export class DiscordCodexBridge { delegation.completedAt = undefined; delegation.injectedAt = undefined; delegation.mirroredAt = undefined; + delegation.taskMirroredAt = undefined; delegation.reportedAt = undefined; delegation.updatedAt = this.#now().toISOString(); + const workbench = await this.#syncDelegationWorkbench(delegation, { + includeTaskResult: false, + }); await this.#persist(); - return { delegation, turnId: turn.turn.id }; + return { delegation, turnId: turn.turn.id, workbench }; } async #readDelegation(args: Record): Promise { @@ -1137,6 +1483,608 @@ export class DiscordCodexBridge { })); } + async #ensureDelegationWorkbench( + delegation: DiscordGatewayDelegation, + ): Promise { + return await this.#syncDelegationWorkbench(delegation, { + includeTaskResult: false, + }); + } + + async #syncDelegationWorkbench( + delegation: DiscordGatewayDelegation, + options: { includeTaskResult: boolean }, + ): Promise { + const config = this.#gatewayWorkbenchConfig(); + if (!config) { + return { enabled: false }; + } + try { + const workspace = await this.#ensureWorkspaceSurface(delegation, config); + if (delegation.discordTaskThreadId) { + await this.#ensureDelegationTaskThread(delegation, workspace, config); + } + if (options.includeTaskResult) { + await this.#mirrorDelegationResultToTaskThread(delegation); + } + await this.#updateWorkspaceSurface(workspace); + return { + enabled: true, + workspace: { + key: workspace.key, + cwd: workspace.cwd, + threadId: workspace.discordThreadId, + }, + taskThreadId: delegation.discordTaskThreadId, + }; + } catch (error) { + const message = errorMessage(error); + this.#debug("gateway.workbench.sync.failed", { + delegationId: delegation.id, + codexThreadId: delegation.codexThreadId, + error: message, + }); + return { enabled: true, error: message }; + } + } + + async #materializeWorkspaceThread( + codexThreadId: string, + input: { author: { id: string } }, + ): Promise { + const config = this.#gatewayWorkbenchConfig(); + if (!config) { + throw new Error("Gateway workbench is not enabled."); + } + const existing = this.#requireState().sessions.find((session) => + session.codexThreadId === codexThreadId && + session.parentChannelId === config.taskThreadsChannelId + ); + if (existing) { + this.#registerRunner(existing).start(); + return existing; + } + + const delegation = this.#delegationForThread(codexThreadId); + const observed = this.#observedThreadForThread(codexThreadId); + const resumed = await this.client.resumeThread( + this.#threadResumeParams(codexThreadId, delegation?.cwd ?? observed?.cwd), + ); + const thread = threadFromResponse(resumed); + const cwd = resumeResponseCwd(resumed) ?? thread?.cwd ?? delegation?.cwd ?? + observed?.cwd ?? + this.config.cwd; + const title = delegation?.title ?? observed?.title ?? (thread + ? codexThreadTitle(thread) + : `Codex ${compactId(codexThreadId)}`); + const workspace = await this.#ensureWorkspaceSurfaceForCwd( + workspaceCwdForPath(cwd, this.config.cwd), + config, + ); + const discordThreadId = await this.transport.createThread( + config.taskThreadsChannelId, + truncateDiscordThreadName(`${workspace.title}: ${title}`), + ); + const session: DiscordBridgeSession = { + discordThreadId, + parentChannelId: config.taskThreadsChannelId, + codexThreadId, + title, + createdAt: this.#now().toISOString(), + ownerUserId: input.author.id, + cwd, + mode: "workspace", + }; + this.#requireState().sessions.push(session); + this.#registerRunner(session).start(); + + if (delegation) { + delegation.workspaceKey = workspace.key; + delegation.discordWorkspaceThreadId = workspace.discordThreadId; + delegation.discordTaskThreadId = discordThreadId; + delegation.discordDetailThreadId ??= discordThreadId; + delegation.updatedAt = this.#now().toISOString(); + await this.#mirrorDelegationResultToTaskThread(delegation); + } + await this.#updateWorkspaceSurface(workspace); + await this.#persist(); + this.#debug("gateway.workbench.thread.opened", { + codexThreadId, + discordThreadId, + workspaceKey: workspace.key, + }); + return session; + } + + async #reconcileGatewayWorkbench(): Promise { + const config = this.#gatewayWorkbenchConfig(); + if (!config) { + return; + } + for (const cwd of await this.#discoverGatewayWorkspaceCwds()) { + try { + const workspace = await this.#ensureWorkspaceSurfaceForCwd(cwd, config); + await this.#updateWorkspaceSurface(workspace); + } catch (error) { + this.#error("gateway.workbench.workspaceDiscovery.failed", { + cwd, + error: errorMessage(error), + }); + } + } + for (const delegation of this.#gatewayDelegations()) { + await this.#syncDelegationWorkbench(delegation, { + includeTaskResult: false, + }); + } + await this.#persist(); + } + + async #discoverGatewayWorkspaceCwds(): Promise { + const root = normalizeWorkspaceCwd(this.config.cwd); + let entries: Dirent[]; + try { + entries = await readdir(root, { withFileTypes: true }); + } catch (error) { + this.#debug("gateway.workbench.workspaceDiscovery.skipped", { + root, + error: errorMessage(error), + }); + return []; + } + const cwds: string[] = []; + for (const entry of entries) { + if (!isDiscoverableWorkspaceEntry(entry.name)) { + continue; + } + const fullPath = path.join(root, entry.name); + if (entry.isDirectory()) { + cwds.push(fullPath); + continue; + } + if (!entry.isSymbolicLink()) { + continue; + } + try { + if ((await stat(fullPath)).isDirectory()) { + cwds.push(fullPath); + } + } catch { + continue; + } + } + return uniqueStringList(cwds.map((cwd) => normalizeWorkspaceCwd(cwd))).sort( + (left, right) => + workspaceTitle(left).localeCompare(workspaceTitle(right)) || + left.localeCompare(right), + ); + } + + async #ensureWorkspaceSurface( + delegation: DiscordGatewayDelegation, + config: { workspaceForumChannelId: string; taskThreadsChannelId: string }, + ): Promise { + const workspace = await this.#ensureWorkspaceSurfaceForCwd( + workspaceCwdForPath(delegation.cwd ?? this.config.cwd, this.config.cwd), + config, + [delegation], + ); + delegation.workspaceKey = workspace.key; + delegation.discordWorkspaceThreadId = workspace.discordThreadId; + return workspace; + } + + async #ensureWorkspaceSurfaceForCwd( + cwd: string, + config: { workspaceForumChannelId: string; taskThreadsChannelId: string }, + delegations: DiscordGatewayDelegation[] = [], + ): Promise { + if (!this.transport.createForumPost) { + throw new Error("Discord transport cannot create workspace forum posts."); + } + const normalizedCwd = normalizeWorkspaceCwd(cwd); + const key = workspaceKey(normalizedCwd); + const now = this.#now().toISOString(); + const delegationIds = delegations.map((delegation) => delegation.id); + let workspace = this.#gatewayWorkspaces().find((candidate) => + candidate.key === key + ); + if (!workspace) { + const title = workspaceTitle(normalizedCwd); + const created = await this.transport.createForumPost( + config.workspaceForumChannelId, + truncateDiscordThreadName(title), + workspaceDashboardText({ + key, + cwd: normalizedCwd, + title, + discordThreadId: "pending", + statusMessageId: undefined, + delegationIds, + createdAt: now, + updatedAt: now, + }, { delegations }), + ); + workspace = { + key, + cwd: normalizedCwd, + title, + discordThreadId: created.threadId, + statusMessageId: created.messageId, + delegationIds, + createdAt: now, + updatedAt: now, + }; + this.#gatewayWorkspaces().push(workspace); + this.#debug("gateway.workbench.workspace.created", { + key, + cwd: normalizedCwd, + discordThreadId: workspace.discordThreadId, + }); + if (workspace.statusMessageId) { + await this.#pinMessage(workspace.discordThreadId, workspace.statusMessageId); + } + } + workspace.delegationIds = uniqueStringList([ + ...workspace.delegationIds, + ...delegationIds, + ]); + workspace.updatedAt = now; + return workspace; + } + + async #ensureDelegationTaskThread( + delegation: DiscordGatewayDelegation, + workspace: DiscordGatewayWorkspaceSurface, + config: { workspaceForumChannelId: string; taskThreadsChannelId: string }, + ): Promise { + if (!delegation.discordTaskThreadId) { + delegation.discordWorkspaceThreadId = workspace.discordThreadId; + return; + } + const existingSession = delegation.discordTaskThreadId + ? this.#requireState().sessions.find((session) => + session.discordThreadId === delegation.discordTaskThreadId && + session.codexThreadId === delegation.codexThreadId + ) + : undefined; + if (existingSession) { + delegation.discordDetailThreadId ??= delegation.discordTaskThreadId; + delegation.discordWorkspaceThreadId = workspace.discordThreadId; + this.#registerRunner(existingSession); + return; + } + if (delegation.discordTaskThreadId) { + const recovered: DiscordBridgeSession = { + discordThreadId: delegation.discordTaskThreadId, + parentChannelId: config.taskThreadsChannelId, + codexThreadId: delegation.codexThreadId, + title: delegation.title, + createdAt: delegation.createdAt, + cwd: delegation.cwd, + mode: "delegated", + }; + delegation.discordDetailThreadId ??= delegation.discordTaskThreadId; + delegation.discordWorkspaceThreadId = workspace.discordThreadId; + this.#requireState().sessions.push(recovered); + this.#registerRunner(recovered); + return; + } + } + + async #updateWorkspaceSurface( + workspace: DiscordGatewayWorkspaceSurface, + ): Promise { + if (!this.transport.updateMessage) { + return; + } + if (!workspace.statusMessageId) { + return; + } + const delegations = this.#gatewayDelegations().filter((delegation) => + workspace.delegationIds.includes(delegation.id) + ); + const threads = this.#listOpenWorkspaceThreads(workspace); + await this.transport.updateMessage( + workspace.discordThreadId, + workspace.statusMessageId, + workspaceDashboardText(workspace, { + delegations, + threads, + }), + ); + if (workspace.statusMessageId) { + await this.#pinMessage(workspace.discordThreadId, workspace.statusMessageId); + } + } + + async #listWorkspaceThreads( + workspace: DiscordGatewayWorkspaceSurface, + ): Promise { + const byId = new Map(); + for (const thread of await this.#listCodexThreadSummaries()) { + if ( + workspaceKey(workspaceCwdForPath(thread.cwd, this.config.cwd)) === + workspace.key + ) { + byId.set(thread.id, thread); + } + } + for (const delegation of this.#gatewayDelegations()) { + const delegationWorkspaceKey = delegation.workspaceKey ?? + workspaceKey(workspaceCwdForPath(delegation.cwd, this.config.cwd)); + if ( + delegationWorkspaceKey !== workspace.key || + byId.has(delegation.codexThreadId) + ) { + continue; + } + byId.set(delegation.codexThreadId, { + id: delegation.codexThreadId, + title: delegation.title, + cwd: delegation.cwd ?? workspace.cwd, + status: delegation.lastStatus ?? delegation.status, + updatedAt: Date.parse(delegation.updatedAt) / 1000, + discordThreadId: delegation.discordTaskThreadId, + }); + } + for (const observed of this.#gatewayObservedThreads()) { + const observedWorkspaceKey = observed.workspaceKey ?? + workspaceKey(workspaceCwdForPath(observed.cwd, this.config.cwd)); + if (observedWorkspaceKey !== workspace.key) { + continue; + } + const existing = byId.get(observed.threadId); + const observedSummary: WorkspaceThreadSummary = { + id: observed.threadId, + title: observed.title ?? `Codex ${compactId(observed.threadId)}`, + cwd: observed.cwd ?? workspace.cwd, + status: observedThreadStatusText(observed), + updatedAt: Date.parse(observed.lastSeenAt) / 1000, + discordThreadId: this.#workspaceDiscordThreadForCodexThread( + observed.threadId, + )?.discordThreadId, + }; + byId.set( + observed.threadId, + existing + ? { + ...existing, + status: observedSummary.status, + updatedAt: Math.max(existing.updatedAt, observedSummary.updatedAt), + discordThreadId: existing.discordThreadId ?? + observedSummary.discordThreadId, + } + : observedSummary, + ); + } + return [...byId.values()].sort((left, right) => right.updatedAt - left.updatedAt); + } + + async #listActiveCodexThreadSummaries(): Promise { + const byId = new Map(); + const put = (summary: WorkspaceThreadSummary) => { + const existing = byId.get(summary.id); + byId.set(summary.id, { + ...existing, + ...summary, + title: summary.title || existing?.title || `Codex ${compactId(summary.id)}`, + cwd: summary.cwd || existing?.cwd || this.config.cwd || process.cwd(), + status: summary.status || existing?.status || "active", + updatedAt: Math.max(existing?.updatedAt ?? 0, summary.updatedAt), + discordThreadId: existing?.discordThreadId ?? + summary.discordThreadId ?? + this.#discordChannelForCodexThread(summary.id), + }); + }; + + for (const thread of await this.#listCodexThreadSummaries()) { + if (thread.status === "active") { + put({ + ...thread, + discordThreadId: this.#discordChannelForCodexThread(thread.id) ?? + thread.discordThreadId, + }); + } + } + + const state = this.#requireState(); + for (const session of state.sessions) { + if (!this.#isSessionRunning(session, state)) { + continue; + } + put({ + id: session.codexThreadId, + title: session.title, + cwd: session.cwd ?? this.config.cwd ?? process.cwd(), + status: "active", + updatedAt: Date.parse(session.createdAt) / 1000, + discordThreadId: session.discordThreadId, + }); + } + + for (const delegation of this.#gatewayDelegations()) { + if (delegation.status !== "active" && delegation.lastStatus !== "in_progress") { + continue; + } + put({ + id: delegation.codexThreadId, + title: delegation.title, + cwd: delegation.cwd ?? this.config.cwd ?? process.cwd(), + status: delegation.lastStatus ?? delegation.status, + updatedAt: Date.parse(delegation.updatedAt) / 1000, + discordThreadId: this.#discordChannelForCodexThread(delegation.codexThreadId), + }); + } + + for (const observed of this.#gatewayObservedThreads()) { + if ( + observed.status !== "starting" && + observed.status !== "active" && + observed.status !== "tool" && + observed.status !== "waiting" + ) { + continue; + } + put({ + id: observed.threadId, + title: observed.title ?? `Codex ${compactId(observed.threadId)}`, + cwd: observed.cwd ?? this.config.cwd ?? process.cwd(), + status: observedThreadStatusText(observed), + updatedAt: Date.parse(observed.lastSeenAt) / 1000, + discordThreadId: this.#discordChannelForCodexThread(observed.threadId), + }); + } + + return [...byId.values()].sort((left, right) => right.updatedAt - left.updatedAt); + } + + #listOpenWorkspaceThreads( + workspace: DiscordGatewayWorkspaceSurface, + ): WorkspaceThreadSummary[] { + const workbench = this.#gatewayWorkbenchConfig(); + if (!workbench) { + return []; + } + const sessions = this.#requireState().sessions.filter((session) => + session.parentChannelId === workbench.taskThreadsChannelId && + workspaceKey(workspaceCwdForPath(session.cwd, this.config.cwd)) === + workspace.key + ); + return sessions.map((session) => ({ + id: session.codexThreadId, + title: session.title, + cwd: session.cwd ?? workspace.cwd, + status: this.#isSessionRunning(session, this.#requireState()) + ? "active" + : "open", + updatedAt: Date.parse(session.createdAt) / 1000, + discordThreadId: session.discordThreadId, + })).sort((left, right) => right.updatedAt - left.updatedAt); + } + + async #listCodexThreadSummaries(): Promise { + const summaries: WorkspaceThreadSummary[] = []; + let cursor: string | null | undefined; + for (let page = 0; page < 10; page += 1) { + let response: v2.ThreadListResponse; + try { + response = await this.client.listThreads({ + cursor: cursor ?? null, + limit: 100, + sortKey: "updated_at", + sortDirection: "desc", + archived: false, + sourceKinds: [], + useStateDbOnly: false, + }); + } catch (error) { + this.#debug("gateway.workbench.threadList.failed", { + error: errorMessage(error), + }); + return summaries; + } + for (const thread of response.data) { + summaries.push({ + id: thread.id, + title: codexThreadTitle(thread), + cwd: thread.cwd, + status: threadStatusText(thread.status), + updatedAt: thread.updatedAt, + discordThreadId: this.#workspaceDiscordThreadForCodexThread(thread.id) + ?.discordThreadId, + }); + } + if (!response.nextCursor) { + break; + } + cursor = response.nextCursor; + } + return summaries; + } + + #workspaceDiscordThreadForCodexThread( + codexThreadId: string, + ): DiscordBridgeSession | undefined { + const workbench = this.#gatewayWorkbenchConfig(); + return this.#requireState().sessions.find((session) => + session.codexThreadId === codexThreadId && + session.parentChannelId === workbench?.taskThreadsChannelId + ); + } + + #discordChannelForCodexThread(codexThreadId: string): string | undefined { + if (this.#isGatewayMainThread(codexThreadId)) { + return this.config.gateway?.homeChannelId; + } + const session = this.#requireState().sessions.find((candidate) => + candidate.codexThreadId === codexThreadId + ); + const delegation = this.#delegationForThread(codexThreadId); + return session?.discordThreadId ?? + delegation?.discordTaskThreadId ?? + delegation?.discordDetailThreadId; + } + + #workspaceForChannel(channelId: string): DiscordGatewayWorkspaceSurface | undefined { + const workspaces = this.#requireState().gateway?.workspaces ?? []; + const direct = workspaces.find((workspace) => + workspace.discordThreadId === channelId + ); + if (direct) { + return direct; + } + const session = this.#requireState().sessions.find((candidate) => + candidate.discordThreadId === channelId + ); + if (!session?.cwd) { + return undefined; + } + const key = workspaceKey(workspaceCwdForPath(session.cwd, this.config.cwd)); + return workspaces.find((workspace) => workspace.key === key); + } + + async #mirrorDelegationResultToTaskThread( + delegation: DiscordGatewayDelegation, + ): Promise { + if ( + !delegation.discordTaskThreadId || + !delegation.lastFinal || + delegation.taskMirroredAt || + this.#hasDelegationTaskFinalDelivery(delegation) + ) { + return; + } + const outboundMessageIds = await this.transport.sendMessage( + delegation.discordTaskThreadId, + delegationTaskResultText(delegation), + ); + const deliveredAt = this.#now().toISOString(); + this.#requireState().deliveries.push({ + discordMessageId: `gateway-workbench:${delegation.id}:${delegation.lastTurnId ?? "latest"}`, + discordThreadId: delegation.discordTaskThreadId, + codexThreadId: delegation.codexThreadId, + turnId: delegation.lastTurnId, + kind: "final", + outboundMessageIds, + deliveredAt, + }); + delegation.taskMirroredAt = deliveredAt; + delegation.updatedAt = deliveredAt; + } + + #hasDelegationTaskFinalDelivery(delegation: DiscordGatewayDelegation): boolean { + if (!delegation.discordTaskThreadId) { + return false; + } + return this.#requireState().deliveries.some((delivery) => + delivery.kind === "final" && + delivery.discordThreadId === delegation.discordTaskThreadId && + delivery.codexThreadId === delegation.codexThreadId && + (!delegation.lastTurnId || delivery.turnId === delegation.lastTurnId) + ); + } + async #startGatewayStopHookSpool(): Promise { if (!this.config.gateway || this.#gatewayStopHookWatcher) { return; @@ -1197,17 +2145,27 @@ export class DiscordCodexBridge { await archiveStopHookSpoolFile(file, spoolDir, "failed"); continue; } - const processedIds = this.#gatewayProcessedStopHookEventIds(); + const processedIds = this.#gatewayProcessedHookEventIds(); if (processedIds.includes(file.event.id)) { await archiveStopHookSpoolFile(file, spoolDir, "ignored"); continue; } - const result = await this.#handleGatewayStopHookEvent(file.event); + const result = await this.#handleGatewayHookEvent(file.event); if (result === "retry") { shouldRetry = true; continue; } processedIds.push(file.event.id); + if (file.event.eventName === "Stop") { + const gateway = this.#requireState().gateway; + const stopIds = gateway?.processedStopHookEventIds ?? []; + if (!stopIds.includes(file.event.id)) { + stopIds.push(file.event.id); + } + if (gateway) { + gateway.processedStopHookEventIds = stopIds; + } + } await this.#persist(); await archiveStopHookSpoolFile( file, @@ -1220,13 +2178,17 @@ export class DiscordCodexBridge { } } - async #handleGatewayStopHookEvent( - event: DiscordGatewayStopHookEvent, + async #handleGatewayHookEvent( + event: DiscordGatewayHookEvent, ): Promise<"processed" | "ignored" | "retry"> { - if (event.eventName !== "Stop") { - return "ignored"; + const isGatewayMain = this.#isGatewayMainThread(event.sessionId); + if (!isGatewayMain) { + await this.#recordObservedThreadEvent(event); } - if (this.#isGatewayMainThread(event.sessionId)) { + if (event.eventName !== "Stop") { + return "processed"; + } + if (isGatewayMain) { const started = await this.#processPendingWakes({ completedThreadId: event.sessionId, completedTurnId: event.turnId, @@ -1237,7 +2199,7 @@ export class DiscordCodexBridge { } const delegation = this.#delegationForThread(event.sessionId); if (!delegation) { - return "ignored"; + return "processed"; } const completedAt = this.#now().toISOString(); delegation.status = "complete"; @@ -1246,11 +2208,68 @@ export class DiscordCodexBridge { delegation.lastFinal = event.lastAssistantMessage ?? delegation.lastFinal; delegation.completedAt = completedAt; delegation.updatedAt = completedAt; + await this.#syncDelegationWorkbench(delegation, { includeTaskResult: true }); await this.#applyDelegationReturnPolicy(delegation); await this.#processPendingWakes(); return "processed"; } + async #recordObservedThreadEvent( + event: DiscordGatewayHookEvent, + ): Promise { + const observedThreads = this.#gatewayObservedThreads(); + const seenAt = event.createdAt || this.#now().toISOString(); + let observed = observedThreads.find((thread) => + thread.threadId === event.sessionId + ); + if (!observed) { + observed = { + threadId: event.sessionId, + title: observedThreadTitle(event), + status: observedStatusForHookEvent(event), + firstSeenAt: seenAt, + lastSeenAt: seenAt, + updatedAt: seenAt, + }; + observedThreads.push(observed); + } + + const cwd = event.cwd ?? observed.cwd; + observed.status = observedStatusForHookEvent(event); + observed.cwd = cwd; + observed.workspaceKey = cwd + ? workspaceKey(workspaceCwdForPath(cwd, this.config.cwd)) + : observed.workspaceKey; + observed.model = event.model ?? observed.model; + observed.transcriptPath = event.transcriptPath ?? observed.transcriptPath; + observed.lastTurnId = event.turnId ?? observed.lastTurnId; + observed.lastHookEventName = event.eventName; + observed.source = event.source ?? observed.source; + observed.promptPreview = event.promptPreview ?? observed.promptPreview; + observed.assistantPreview = event.lastAssistantMessage + ? previewText(event.lastAssistantMessage) + : observed.assistantPreview; + observed.toolName = event.toolName ?? observed.toolName; + observed.toolUseId = event.toolUseId ?? observed.toolUseId; + observed.toolInputPreview = event.toolInputPreview ?? observed.toolInputPreview; + observed.toolResponsePreview = event.toolResponsePreview ?? + observed.toolResponsePreview; + observed.permissionDescription = event.permissionDescription ?? + observed.permissionDescription; + observed.title = observedThreadTitle(event, observed); + observed.lastSeenAt = seenAt; + observed.updatedAt = seenAt; + + const config = this.#gatewayWorkbenchConfig(); + if (config && cwd) { + const workspace = await this.#ensureWorkspaceSurfaceForCwd( + workspaceCwdForPath(cwd, this.config.cwd), + config, + ); + await this.#updateWorkspaceSurface(workspace); + } + } + async #applyDelegationReturnPolicy( delegation: DiscordGatewayDelegation, ): Promise { @@ -1314,7 +2333,16 @@ export class DiscordCodexBridge { if (!homeChannelId || delegation.mirroredAt) { return; } - await this.transport.sendMessage(homeChannelId, delegationResultText(delegation)); + await this.#syncDelegationWorkbench(delegation, { includeTaskResult: true }); + const hasWorkbenchLinks = Boolean( + delegation.discordWorkspaceThreadId || delegation.discordTaskThreadId, + ); + await this.transport.sendMessage( + homeChannelId, + this.#gatewayWorkbenchConfig() && hasWorkbenchLinks + ? compactDelegationResultText(delegation) + : delegationResultText(delegation), + ); delegation.mirroredAt = this.#now().toISOString(); delegation.updatedAt = delegation.mirroredAt; } @@ -1457,6 +2485,14 @@ export class DiscordCodexBridge { ); } + #observedThreadForThread( + threadId: string, + ): DiscordGatewayObservedThread | undefined { + return this.#gatewayObservedThreads().find((thread) => + thread.threadId === threadId + ); + } + #isSessionRunning( session: DiscordBridgeSession, state: DiscordBridgeState, @@ -1490,20 +2526,48 @@ export class DiscordCodexBridge { } #isAllowedChannel(channelId: string): boolean { + const workbench = this.#gatewayWorkbenchConfig(); + if ( + channelId === this.config.gateway?.homeChannelId || + channelId === workbench?.workspaceForumChannelId || + channelId === workbench?.taskThreadsChannelId + ) { + return true; + } if (this.config.allowedChannelIds.size === 0) { return true; } if (this.config.allowedChannelIds.has(channelId)) { return true; } + if ( + this.#requireState().gateway?.workspaces?.some((workspace) => + workspace.discordThreadId === channelId + ) + ) { + return true; + } const session = this.#requireState().sessions.find( (candidate) => candidate.discordThreadId === channelId, ); return Boolean( - session && this.config.allowedChannelIds.has(session.parentChannelId), + session && + (this.config.allowedChannelIds.has(session.parentChannelId) || + session.parentChannelId === workbench?.taskThreadsChannelId || + session.parentChannelId === workbench?.workspaceForumChannelId), ); } + #commandRegistrationChannelIds(): string[] { + const gateway = this.config.gateway; + return uniqueStringList([ + ...this.config.allowedChannelIds, + gateway?.homeChannelId ?? "", + gateway?.workspaceForumChannelId ?? "", + gateway?.taskThreadsChannelId ?? "", + ]); + } + #isAllowedInboundChannel( inbound: DiscordMessageInbound | DiscordThreadStartInbound, ): boolean { @@ -1554,6 +2618,21 @@ export class DiscordCodexBridge { } } + async #pinMessage(channelId: string, messageId: string): Promise { + if (!this.transport.pinMessage) { + return; + } + try { + await this.transport.pinMessage(channelId, messageId); + } catch (error) { + this.#debug("discord.message.pinFailed", { + channelId, + messageId, + error: errorMessage(error), + }); + } + } + async #deleteSourceMessage(session: DiscordBridgeSession): Promise { if (!session.sourceMessageId) { return; @@ -2076,6 +3155,233 @@ function delegationResultText(delegation: DiscordGatewayDelegation): string { ].filter((line): line is string => line !== undefined).join("\n"); } +function delegationTaskResultText(delegation: DiscordGatewayDelegation): string { + return [ + "**Delegation Result**", + `Delegation: ${delegation.title}`, + `Codex thread: \`${delegation.codexThreadId}\``, + delegation.groupId ? `Group: \`${delegation.groupId}\`` : undefined, + `Status: \`${delegation.lastStatus ?? delegation.status}\``, + delegation.lastTurnId ? `Turn: \`${delegation.lastTurnId}\`` : undefined, + "", + delegation.lastFinal ?? "(no final assistant message captured)", + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function compactDelegationResultText(delegation: DiscordGatewayDelegation): string { + const links = [ + delegation.discordWorkspaceThreadId + ? `workspace <#${delegation.discordWorkspaceThreadId}>` + : undefined, + delegation.discordTaskThreadId + ? `task <#${delegation.discordTaskThreadId}>` + : undefined, + ].filter((link): link is string => link !== undefined).join(", "); + return [ + "[discord-gateway delegation result]", + `${delegation.title}: ${delegation.lastStatus ?? delegation.status}`, + delegation.groupId ? `Group: ${delegation.groupId}` : undefined, + links ? `Links: ${links}` : undefined, + delegation.lastTurnId ? `Turn: ${delegation.lastTurnId}` : undefined, + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function workspaceDashboardText( + workspace: DiscordGatewayWorkspaceSurface, + options: { + delegations?: DiscordGatewayDelegation[]; + threads?: WorkspaceThreadSummary[]; + } = {}, +): string { + const delegations = options.delegations ?? []; + const threads = options.threads ?? []; + const visibleThreads = threads.slice(0, 25); + return [ + `**Workspace: ${workspace.title}**`, + `Dir: \`${workspace.cwd}\``, + `Open Discord threads: ${threads.length}`, + `Tracked delegations: ${delegations.length}`, + "", + "**Open Threads**", + visibleThreads.length > 0 + ? visibleThreads.map(workspaceThreadLine).join("\n") + : "None", + threads.length > visibleThreads.length + ? `Showing newest ${visibleThreads.length} of ${threads.length} threads.` + : undefined, + "", + "Run `/threads` here to browse or resume workspace Codex threads.", + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function workspaceThreadLine( + thread: WorkspaceThreadSummary, + index: number, +): string { + const link = thread.discordThreadId ? `<#${thread.discordThreadId}>` : "`not opened`"; + const title = truncateDiscordThreadName(thread.title); + return `${index + 1}. ${link} ${title} (${thread.status})`; +} + +function activeThreadStatusLines( + threads: WorkspaceThreadSummary[], + openableThreads: WorkspaceThreadSummary[], +): string[] { + const createIndexById = new Map( + openableThreads.map((thread, index) => [thread.id, index]), + ); + return threads.map((thread) => { + const createIndex = createIndexById.get(thread.id); + const marker = createIndex === undefined + ? "-" + : threadPickerReactions[createIndex] ?? `${createIndex + 1}.`; + const link = thread.discordThreadId ? `<#${thread.discordThreadId}>` : "`not opened`"; + const title = truncateDiscordThreadName(thread.title); + return `${marker} ${link} ${title} (${thread.status})`; + }); +} + +function threadPickerText( + workspace: DiscordGatewayWorkspaceSurface, + threads: WorkspaceThreadSummary[], + total: number, + options: { action?: string } = {}, +): string { + return [ + `**Threads: ${workspace.title}**`, + `Dir: \`${workspace.cwd}\``, + "", + ...threads.map((thread, index) => { + const link = thread.discordThreadId + ? `<#${thread.discordThreadId}>` + : "`not opened`"; + const title = truncateDiscordThreadName(thread.title); + return `${threadPickerReactions[index]} ${link} ${title} (${thread.status})`; + }), + total > threads.length ? `Showing newest ${threads.length} of ${total}.` : undefined, + "", + options.action ?? "Choose a number to open or resume that thread in Discord.", + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function threadPickerKey(channelId: string, messageId: string): string { + return `${channelId}:${messageId}`; +} + +function threadPickerReactionIndex(emoji: string): number | undefined { + const index = threadPickerReactions.indexOf(emoji); + return index >= 0 ? index : undefined; +} + +async function updateOrReply( + interaction: Pick, + text: string, +): Promise { + if (interaction.update) { + await interaction.update(text); + return; + } + await interaction.reply?.(text); +} + +function threadFromResponse(response: v2.ThreadResumeResponse): v2.Thread | undefined { + const thread = (response as { thread?: unknown }).thread; + return thread && typeof thread === "object" && "id" in thread + ? thread as v2.Thread + : undefined; +} + +function codexThreadTitle(thread: v2.Thread): string { + return thread.name?.trim() || + firstLine(thread.preview)?.trim() || + `Codex ${compactId(thread.id)}`; +} + +function threadStatusText(status: v2.ThreadStatus): string { + return status.type === "active" ? "active" : status.type; +} + +function observedThreadStatusText(thread: DiscordGatewayObservedThread): string { + if (thread.status === "waiting" && thread.permissionDescription) { + return `waiting: ${thread.permissionDescription}`; + } + if (thread.status === "tool" && thread.toolName) { + return `tool: ${thread.toolName}`; + } + return thread.status; +} + +function observedStatusForHookEvent( + event: DiscordGatewayHookEvent, +): DiscordGatewayObservedThread["status"] { + if (event.eventName === "SessionStart") { + return "starting"; + } + if (event.eventName === "UserPromptSubmit") { + return "active"; + } + if (event.eventName === "PermissionRequest") { + return "waiting"; + } + if (event.eventName === "PreToolUse" || event.eventName === "PostToolUse") { + return "tool"; + } + return "idle"; +} + +function observedThreadTitle( + event: DiscordGatewayHookEvent, + existing?: DiscordGatewayObservedThread, +): string { + return firstLine(event.promptPreview)?.trim() || + firstLine(event.lastAssistantMessage)?.trim() || + existing?.title || + `Codex ${compactId(event.sessionId)}`; +} + +function previewText(value: string, maxLength = 500): string { + return value.length <= maxLength ? value : `${value.slice(0, maxLength - 3)}...`; +} + +function normalizeWorkspaceCwd(cwd: string | undefined): string { + return path.resolve(cwd ?? process.cwd()); +} + +function workspaceCwdForPath(cwd: string | undefined, root: string | undefined): string { + const normalizedRoot = normalizeWorkspaceCwd(root); + const normalizedCwd = normalizeWorkspaceCwd(cwd ?? normalizedRoot); + const relative = path.relative(normalizedRoot, normalizedCwd); + if (!relative) { + return normalizedRoot; + } + if ( + relative === ".." || + relative.startsWith(`..${path.sep}`) || + path.isAbsolute(relative) + ) { + return normalizedCwd; + } + const [workspaceName] = relative.split(path.sep).filter(Boolean); + return workspaceName ? path.join(normalizedRoot, workspaceName) : normalizedRoot; +} + +function workspaceKey(cwd: string): string { + return `workspace-${createHash("sha256").update(cwd).digest("hex").slice(0, 12)}`; +} + +function workspaceTitle(cwd: string): string { + const base = path.basename(cwd); + return base && base !== path.sep ? base : cwd; +} + +function uniqueStringList(values: string[]): string[] { + return [...new Set(values.filter(Boolean))]; +} + +function isDiscoverableWorkspaceEntry(name: string): boolean { + return Boolean(name) && !name.startsWith(".") && name !== "node_modules"; +} + function wakePrompt( wake: DiscordGatewayPendingWake, delegations: DiscordGatewayDelegation[], @@ -2115,13 +3421,6 @@ function wakeId( ).digest("hex").slice(0, 12)}`; } -function parseGatewayCommand(content: string): "status" | undefined { - const normalized = content.trim().toLowerCase(); - return normalized === "status" || normalized === "/status" - ? "status" - : undefined; -} - function record(value: unknown): Record { return typeof value === "object" && value !== null && !Array.isArray(value) ? (value as Record) diff --git a/apps/discord-bridge/src/config.ts b/apps/discord-bridge/src/config.ts index b3c59d7..ec2713b 100644 --- a/apps/discord-bridge/src/config.ts +++ b/apps/discord-bridge/src/config.ts @@ -290,15 +290,46 @@ function gatewayConfig( stringFlag(flags, "gateway-main-thread-id") ?? env.CODEX_DISCORD_MAIN_THREAD_ID ?? env.CODEX_DISCORD_GATEWAY_MAIN_THREAD_ID; + const workspaceForumChannelId = + stringFlag(flags, "workspace-forum-channel-id") ?? + stringFlag(flags, "gateway-workspace-forum-channel-id") ?? + env.CODEX_DISCORD_WORKSPACE_FORUM_CHANNEL_ID ?? + env.CODEX_DISCORD_GATEWAY_WORKSPACE_FORUM_CHANNEL_ID; + const taskThreadsChannelId = + stringFlag(flags, "task-threads-channel-id") ?? + stringFlag(flags, "gateway-task-threads-channel-id") ?? + env.CODEX_DISCORD_TASK_THREADS_CHANNEL_ID ?? + env.CODEX_DISCORD_GATEWAY_TASK_THREADS_CHANNEL_ID; if (!homeChannelId) { if (mainThreadId) { throw new Error("Cannot set a gateway main thread without a gateway home channel."); } + if (workspaceForumChannelId || taskThreadsChannelId) { + throw new Error("Cannot set Discord workbench channels without a gateway home channel."); + } return undefined; } + if (Boolean(workspaceForumChannelId) !== Boolean(taskThreadsChannelId)) { + throw new Error( + "Discord workbench requires both workspace forum and task threads channels.", + ); + } + if ( + workspaceForumChannelId && + taskThreadsChannelId && + (workspaceForumChannelId === homeChannelId || + taskThreadsChannelId === homeChannelId || + workspaceForumChannelId === taskThreadsChannelId) + ) { + throw new Error( + "Discord workbench channels must be separate from the gateway home channel and each other.", + ); + } return { homeChannelId, mainThreadId, + workspaceForumChannelId, + taskThreadsChannelId, }; } @@ -369,8 +400,11 @@ Options: --allowed-channel-ids Comma-separated parent channel ids --home-channel-id Enable gateway mode for one Discord home channel --main-thread-id Resume an existing Codex operator thread for gateway mode + --workspace-forum-channel-id + Optional workbench forum channel for workspace posts + --task-threads-channel-id Optional workbench text channel for task threads --flow-backend-url Optional codex-flow-systemd-local backend URL - --hook-spool-dir Directory drained for Codex Stop hook events + --hook-spool-dir Directory drained for Codex hook events [dir] Optional Codex thread directory, resolved from home --dir Codex thread directory, resolved from home --cwd Alias for --dir diff --git a/apps/discord-bridge/src/discord-transport.ts b/apps/discord-bridge/src/discord-transport.ts index 9fbe0d7..f1c262b 100644 --- a/apps/discord-bridge/src/discord-transport.ts +++ b/apps/discord-bridge/src/discord-transport.ts @@ -1,9 +1,18 @@ import { + ActionRowBuilder, + ButtonBuilder, + ButtonStyle, Client, Events, GatewayIntentBits, + Partials, type Interaction, + type ApplicationCommandDataResolvable, type Message, + type MessageReaction, + type PartialMessageReaction, + type PartialUser, + type User, } from "discord.js"; import { splitDiscordMessage } from "./bridge.ts"; @@ -12,6 +21,8 @@ import { type DiscordBridgeLogger, } from "./logger.ts"; import type { + DiscordBridgeCommandRegistration, + DiscordEphemeralPicker, DiscordBridgeTransport, DiscordBridgeTransportHandlers, } from "./types.ts"; @@ -21,6 +32,8 @@ export type DiscordJsBridgeTransportOptions = { logger?: DiscordBridgeLogger; }; +const threadPickerCustomIdPrefix = "codex_threads"; + export class DiscordJsBridgeTransport implements DiscordBridgeTransport { #token: string; #logger: DiscordBridgeLogger; @@ -41,9 +54,11 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { intents: [ GatewayIntentBits.Guilds, GatewayIntentBits.GuildMessages, + GatewayIntentBits.GuildMessageReactions, GatewayIntentBits.DirectMessages, GatewayIntentBits.MessageContent, ], + partials: [Partials.Message, Partials.Channel, Partials.Reaction], }); this.#client = client; client.once(Events.ClientReady, (readyClient) => { @@ -53,6 +68,13 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { }); }); client.on(Events.MessageCreate, (message) => this.#handleMessage(message)); + client.on(Events.MessageReactionAdd, (reaction, user) => + void this.#handleReaction(reaction, user).catch((error) => { + this.#logger.error("discord.reaction.failed", { + error: errorMessage(error), + }); + }) + ); client.on(Events.InteractionCreate, (interaction) => void this.#handleInteraction(interaction).catch((error) => { this.#logger.error("discord.interaction.failed", { @@ -68,29 +90,35 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { this.#client = undefined; } - async registerCommands(): Promise { - const application = this.#client?.application; - if (!application) { + async registerCommands( + options: DiscordBridgeCommandRegistration = {}, + ): Promise { + const client = await this.#readyClient(); + const commands = discordBridgeCommands(); + const commandNames = commands.map(commandName); + const guildIds = await this.#guildIdsForCommandChannels(options.channelIds ?? []); + if (guildIds.length === 0) { + await client.application.commands.set(commands); + this.#logger.info("discord.commands.registered", { + scope: "global", + commands: commandNames, + }); return; } - await application.commands.set([ - { - name: "clear", - description: "Delete inactive Codex bridge threads", - }, - { - name: "clear-webhooks", - description: "Delete webhook-authored messages in this channel", - options: [ - { - name: "webhook_url", - description: "Optional webhook URL to target a single webhook", - type: 3, - required: false, - }, - ], - }, - ]); + await client.application.commands.set([]); + this.#logger.info("discord.commands.registered", { + scope: "global-cleared", + commands: [], + }); + for (const guildId of guildIds) { + const guild = await client.guilds.fetch(guildId); + await guild.commands.set(commands); + this.#logger.info("discord.commands.registered", { + scope: "guild", + guildId, + commands: commandNames, + }); + } } async createThread( @@ -130,10 +158,48 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { return thread.id; } + async createForumPost( + channelId: string, + name: string, + message: string, + ): Promise<{ threadId: string; messageId?: string }> { + const client = this.#client; + if (!client) { + throw new Error("Discord bridge is not connected"); + } + const channel = await client.channels.fetch(channelId); + if (!channel || typeof channel !== "object") { + throw new Error(`Discord channel cannot create forum posts: ${channelId}`); + } + const threads = getThreadsManager(channel as ThreadCreatableChannel); + if (!threads) { + throw new Error(`Discord channel cannot create forum posts: ${channelId}`); + } + const thread = await threads.create({ + name, + autoArchiveDuration: 10080, + message: { + content: splitDiscordMessage(message)[0] ?? "", + allowedMentions: { + parse: [], + users: [], + roles: [], + repliedUser: false, + }, + }, + reason: "Codex Discord bridge workspace post", + }); + if (!thread.id) { + throw new Error("Discord did not return a forum post thread id"); + } + return { threadId: thread.id, messageId: thread.id }; + } + async sendMessage(channelId: string, text: string): Promise { const channel = await this.#sendableChannel(channelId); const messageIds: string[] = []; - for (const chunk of splitDiscordMessage(text)) { + const chunks = splitDiscordMessage(text); + for (const chunk of chunks) { const sent = await channel.send({ content: chunk, allowedMentions: { @@ -253,6 +319,25 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { } } + async addReactions( + channelId: string, + messageId: string, + reactions: string[], + ): Promise { + const channel = await this.#sendableChannel(channelId); + const messages = getMessagesManager(channel); + if (!messages) { + throw new Error(`Discord channel cannot fetch messages: ${channelId}`); + } + const message = await messages.fetch(messageId); + if (!message.react) { + throw new Error(`Discord message cannot receive reactions: ${messageId}`); + } + for (const reaction of reactions) { + await message.react(reaction); + } + } + async pinMessage(channelId: string, messageId: string): Promise { const channel = await this.#sendableChannel(channelId); const messages = getMessagesManager(channel); @@ -323,28 +408,106 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { }); } + async #handleReaction( + reaction: MessageReaction | PartialMessageReaction, + user: User | PartialUser, + ): Promise { + if (user.bot) { + return; + } + const fullReaction = reaction.partial ? await reaction.fetch() : reaction; + const message = fullReaction.message.partial + ? await fullReaction.message.fetch() + : fullReaction.message; + const emoji = fullReaction.emoji.name ?? fullReaction.emoji.id; + if (!emoji) { + return; + } + this.#handlers?.onInbound({ + kind: "reaction", + channelId: message.channelId, + guildId: message.guildId ?? undefined, + messageId: message.id, + emoji, + author: { + id: user.id, + name: user.globalName ?? user.username ?? user.id, + isBot: user.bot, + }, + createdAt: new Date().toISOString(), + }); + } + async #handleInteraction(interaction: Interaction): Promise { + if (interaction.isButton()) { + const selection = threadPickerSelectionFromCustomId(interaction.customId); + if (!selection) { + return; + } + await interaction.deferUpdate(); + const reply = async (text: string) => { + await interaction.followUp({ + content: text, + ephemeral: true, + allowedMentions: emptyAllowedMentions(), + }); + }; + const update = async (text: string) => { + await interaction.editReply({ + content: text, + components: [], + allowedMentions: emptyAllowedMentions(), + }); + }; + this.#handlers?.onInbound({ + kind: "threadPicker", + channelId: interaction.channelId, + guildId: interaction.guildId ?? undefined, + pickerId: selection.pickerId, + optionId: selection.optionId, + author: { + id: interaction.user.id, + name: interaction.member && "displayName" in interaction.member + ? String(interaction.member.displayName) + : interaction.user.globalName || interaction.user.username, + isBot: interaction.user.bot, + }, + createdAt: new Date().toISOString(), + reply, + update, + }); + return; + } if (!interaction.isChatInputCommand()) { return; } if ( interaction.commandName !== "clear" && - interaction.commandName !== "clear-webhooks" + interaction.commandName !== "clear-webhooks" && + interaction.commandName !== "status" && + interaction.commandName !== "threads" ) { return; } const channelId = interaction.channelId; + if (interaction.commandName === "status" || interaction.commandName === "threads") { + await interaction.deferReply({ ephemeral: true }); + } const reply = async (text: string) => { - await interaction.reply({ + const payload = { content: text, - ephemeral: true, allowedMentions: { parse: [], users: [], roles: [], repliedUser: false, }, - }); + }; + if (interaction.deferred || interaction.replied) { + await interaction.editReply(payload); + return; + } + await interaction.reply({ ...payload, ephemeral: true }); }; if (interaction.commandName === "clear-webhooks") { const webhookUrl = interaction.options.getString("webhook_url") ?? undefined; @@ -365,6 +528,66 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { }); return; } + if (interaction.commandName === "status") { + const replyPicker = async (picker: DiscordEphemeralPicker) => { + const payload = { + content: picker.text, + components: threadPickerComponents(picker), + allowedMentions: emptyAllowedMentions(), + }; + if (interaction.deferred || interaction.replied) { + await interaction.editReply(payload); + return; + } + await interaction.reply({ ...payload, ephemeral: true }); + }; + this.#handlers?.onInbound({ + kind: "status", + channelId, + guildId: interaction.guildId ?? undefined, + author: { + id: interaction.user.id, + name: interaction.member && "displayName" in interaction.member + ? String(interaction.member.displayName) + : interaction.user.globalName || interaction.user.username, + isBot: interaction.user.bot, + }, + createdAt: new Date().toISOString(), + reply, + replyPicker, + }); + return; + } + if (interaction.commandName === "threads") { + const replyPicker = async (picker: DiscordEphemeralPicker) => { + const payload = { + content: picker.text, + components: threadPickerComponents(picker), + allowedMentions: emptyAllowedMentions(), + }; + if (interaction.deferred || interaction.replied) { + await interaction.editReply(payload); + return; + } + await interaction.reply({ ...payload, ephemeral: true }); + }; + this.#handlers?.onInbound({ + kind: "threads", + channelId, + guildId: interaction.guildId ?? undefined, + author: { + id: interaction.user.id, + name: interaction.member && "displayName" in interaction.member + ? String(interaction.member.displayName) + : interaction.user.globalName || interaction.user.username, + isBot: interaction.user.bot, + }, + createdAt: new Date().toISOString(), + reply, + replyPicker, + }); + return; + } this.#handlers?.onInbound({ kind: "clear", channelId, @@ -392,21 +615,163 @@ export class DiscordJsBridgeTransport implements DiscordBridgeTransport { } return channel as unknown as SendableChannel; } + + async #readyClient(): Promise> { + const client = this.#client; + if (!client) { + throw new Error("Discord bridge is not connected"); + } + if (client.isReady()) { + return client; + } + return await new Promise>((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("Timed out waiting for Discord client ready event")); + }, 15_000); + client.once(Events.ClientReady, (readyClient) => { + clearTimeout(timeout); + resolve(readyClient); + }); + }); + } + + async #guildIdsForCommandChannels(channelIds: string[]): Promise { + const client = await this.#readyClient(); + const guildIds = new Set(); + for (const channelId of channelIds) { + try { + const channel = await client.channels.fetch(channelId); + const guildId = guildIdFromChannel(channel); + if (guildId) { + guildIds.add(guildId); + } + } catch (error) { + this.#logger.debug("discord.commands.channelFetchFailed", { + channelId, + error: errorMessage(error), + }); + } + } + return [...guildIds]; + } +} + +function discordBridgeCommands(): ApplicationCommandDataResolvable[] { + return [ + { + name: "clear", + description: "Delete inactive Codex bridge threads", + }, + { + name: "clear-webhooks", + description: "Delete webhook-authored messages in this channel", + options: [ + { + name: "webhook_url", + description: "Optional webhook URL to target a single webhook", + type: 3, + required: false, + }, + ], + }, + { + name: "status", + description: "Show Codex gateway status", + }, + { + name: "threads", + description: "List Codex threads for this workspace", + }, + ]; +} + +function commandName(command: ApplicationCommandDataResolvable): string { + return typeof command === "object" && command !== null && "name" in command + ? String(command.name) + : "unknown"; +} + +function threadPickerComponents( + picker: DiscordEphemeralPicker, +): ActionRowBuilder[] { + const rows: ActionRowBuilder[] = []; + for (let index = 0; index < picker.options.length; index += 5) { + const row = new ActionRowBuilder(); + for (const option of picker.options.slice(index, index + 5)) { + row.addComponents( + new ButtonBuilder() + .setCustomId(threadPickerCustomId(picker.pickerId, option.id)) + .setLabel(option.label) + .setStyle(ButtonStyle.Secondary), + ); + } + rows.push(row); + } + return rows; +} + +function threadPickerCustomId(pickerId: string, optionId: string): string { + return `${threadPickerCustomIdPrefix}:${pickerId}:${optionId}`; +} + +function threadPickerSelectionFromCustomId( + customId: string, +): { pickerId: string; optionId: string } | undefined { + const prefix = `${threadPickerCustomIdPrefix}:`; + if (!customId.startsWith(prefix)) { + return undefined; + } + const rest = customId.slice(prefix.length); + const [pickerId, optionId, ...extra] = rest.split(":"); + if (!pickerId || !optionId || extra.length > 0) { + return undefined; + } + return { pickerId, optionId }; +} + +function emptyAllowedMentions(): Record { + return { + parse: [], + users: [], + roles: [], + repliedUser: false, + }; +} + +function guildIdFromChannel(channel: unknown): string | undefined { + if (!channel || typeof channel !== "object") { + return undefined; + } + const candidate = channel as { + guildId?: unknown; + guild?: { id?: unknown }; + }; + if (typeof candidate.guildId === "string") { + return candidate.guildId; + } + if (typeof candidate.guild?.id === "string") { + return candidate.guild.id; + } + return undefined; } type ThreadCreateOptions = { name: string; autoArchiveDuration?: number; + message?: Record; reason?: string; }; -type SendableChannel = { +type ThreadCreatableChannel = { id: string; - send(options: Record): Promise<{ id?: string }>; - sendTyping?: () => Promise; threads?: { create(options: ThreadCreateOptions): Promise<{ id?: string }>; }; +}; + +type SendableChannel = ThreadCreatableChannel & { + send(options: Record): Promise<{ id?: string }>; + sendTyping?: () => Promise; members?: { add(userId: string): Promise; }; @@ -426,12 +791,13 @@ type DiscordFetchedMessage = { edit(options: Record): Promise; pinned?: boolean; pin?(): Promise; + react?(reaction: string): Promise; startThread?(options: ThreadCreateOptions): Promise<{ id?: string }>; }; function getThreadsManager( - channel: SendableChannel, -): SendableChannel["threads"] | undefined { + channel: ThreadCreatableChannel, +): ThreadCreatableChannel["threads"] | undefined { return channel.threads; } diff --git a/apps/discord-bridge/src/hook-cli.ts b/apps/discord-bridge/src/hook-cli.ts index 891a510..63744fd 100644 --- a/apps/discord-bridge/src/hook-cli.ts +++ b/apps/discord-bridge/src/hook-cli.ts @@ -2,11 +2,18 @@ import { mkdir, readFile, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { writeStopHookSpoolEvent } from "./stop-hook-spool.ts"; +import { writeHookSpoolEvent } from "./stop-hook-spool.ts"; -const defaultHookCommand = "codex-discord-bridge hook stop"; +const defaultHookCommand = "codex-discord-bridge hook event"; const defaultBunxPackage = "codex-discord-bridge"; -const hookStatusMessage = "Recording Discord gateway Stop event"; +const gatewayHookEvents = [ + "SessionStart", + "UserPromptSubmit", + "PreToolUse", + "PermissionRequest", + "PostToolUse", + "Stop", +] as const; export type HookInstallOptions = { command?: string; @@ -29,8 +36,8 @@ export async function handleHookCommand(argv: string[]): Promise { return false; } const subcommand = argv[1] ?? "help"; - if (subcommand === "stop") { - await runStopHook(); + if (subcommand === "event" || subcommand === "stop") { + await runHookEvent(); return true; } if (subcommand === "install") { @@ -45,17 +52,25 @@ export async function handleHookCommand(argv: string[]): Promise { throw new Error(`Unknown hook subcommand: ${subcommand}`); } -export async function runStopHook(): Promise { +export async function runHookEvent(): Promise { + let input = ""; try { - const input = await new Response(Bun.stdin.stream()).text(); - await writeStopHookSpoolEvent(JSON.parse(input)); - process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + input = await new Response(Bun.stdin.stream()).text(); + const parsed = JSON.parse(input); + const event = await writeHookSpoolEvent(parsed); + if (eventSupportsContinueOutput(event.eventName)) { + process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + } } catch (error) { - process.stderr.write(`discord gateway stop hook failed: ${errorMessage(error)}\n`); - process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + process.stderr.write(`discord gateway hook failed: ${errorMessage(error)}\n`); + if (eventSupportsContinueOutput(eventNameFromHookInput(input))) { + process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + } } } +export const runStopHook = runHookEvent; + export async function installStopHook( options: HookInstallOptions = {}, ): Promise { @@ -110,13 +125,15 @@ export function upsertStopHookConfig( ): Record { const config = parseHooksJson(hooksText); const hooks = record(config.hooks); - const stopGroups = Array.isArray(hooks.Stop) ? hooks.Stop : []; - hooks.Stop = [ - stopHookGroup(command), - ...stopGroups - .map(removeGatewayStopHookHandlers) - .filter((group): group is Record => group !== undefined), - ]; + for (const eventName of gatewayHookEvents) { + const groups = Array.isArray(hooks[eventName]) ? hooks[eventName] : []; + hooks[eventName] = [ + hookGroup(command), + ...groups + .map(removeGatewayStopHookHandlers) + .filter((group): group is Record => group !== undefined), + ]; + } config.hooks = hooks; return config; } @@ -188,14 +205,13 @@ function hookCommand(options: HookInstallOptions): string { return defaultHookCommand; } -function stopHookGroup(command: string): Record { +function hookGroup(command: string): Record { return { hooks: [ { type: "command", command, timeout: 10, - statusMessage: hookStatusMessage, }, ], }; @@ -216,10 +232,30 @@ function isGatewayStopHookHandler(input: unknown): boolean { const handler = record(input); const command = typeof handler.command === "string" ? handler.command : ""; return command.includes("codex-discord-bridge hook stop") || + command.includes("codex-discord-bridge hook event") || command.includes("codex-discord-gateway-stop-hook") || command.includes("apps/discord-bridge/src/stop-hook.ts"); } +function eventSupportsContinueOutput(eventName: string): boolean { + return eventName === "SessionStart" || + eventName === "UserPromptSubmit" || + eventName === "Stop"; +} + +function eventNameFromHookInput(input: string): string { + try { + const parsed = record(JSON.parse(input)); + return typeof parsed.hook_event_name === "string" + ? parsed.hook_event_name + : typeof parsed.eventName === "string" + ? parsed.eventName + : ""; + } catch { + return ""; + } +} + async function readTextIfExists(filePath: string): Promise { try { return await readFile(filePath, "utf8"); @@ -264,14 +300,14 @@ function requiredNext(argv: string[], index: number, flag: string): string { } function hookHelpText(): string { - return `codex-discord-bridge hook manages the global Codex Stop hook. + return `codex-discord-bridge hook manages the global Codex observability hooks. Usage: codex-discord-bridge hook install [options] - codex-discord-bridge hook stop + codex-discord-bridge hook event Options: - --command Hook command to write. Defaults to "codex-discord-bridge hook stop". + --command Hook command to write. Defaults to "codex-discord-bridge hook event". --bunx Write a bunx command instead of the global binary command. --bunx-package Package for bunx --package. Defaults to codex-discord-bridge. --config-path Codex config.toml path. diff --git a/apps/discord-bridge/src/state.ts b/apps/discord-bridge/src/state.ts index 8082adf..dafc212 100644 --- a/apps/discord-bridge/src/state.ts +++ b/apps/discord-bridge/src/state.ts @@ -10,12 +10,17 @@ import type { DiscordBridgeState, DiscordBridgeStateStore, DiscordGatewayDelegation, + DiscordGatewayHookEventName, + DiscordGatewayObservedThread, + DiscordGatewayWorkspaceSurface, DiscordGatewayState, } from "./types.ts"; const maxProcessedMessageIds = 1000; const maxDeliveries = 500; const maxProcessedStopHookEventIds = 2000; +const maxProcessedHookEventIds = 5000; +const maxObservedThreads = 1000; export class JsonFileStateStore implements DiscordBridgeStateStore { readonly path: string; @@ -81,6 +86,17 @@ export function trimState(state: DiscordBridgeState): void { -maxProcessedStopHookEventIds, ); } + if (state.gateway?.processedHookEventIds) { + state.gateway.processedHookEventIds = + state.gateway.processedHookEventIds.slice(-maxProcessedHookEventIds); + } + if (state.gateway?.observedThreads) { + state.gateway.observedThreads = [...state.gateway.observedThreads] + .sort((left, right) => + Date.parse(right.lastSeenAt) - Date.parse(left.lastSeenAt) + ) + .slice(0, maxObservedThreads); + } } function parseState(value: unknown): DiscordBridgeState { @@ -124,9 +140,23 @@ function parseGateway(value: unknown): DiscordGatewayState | undefined { delegations: Array.isArray(value.delegations) ? value.delegations.map(parseGatewayDelegation) : [], + workspaces: Array.isArray(value.workspaces) + ? value.workspaces.map(parseGatewayWorkspace) + : [], + observedThreads: Array.isArray(value.observedThreads) + ? value.observedThreads.map(parseGatewayObservedThread) + : [], pendingWakes: Array.isArray(value.pendingWakes) ? value.pendingWakes.map(parseGatewayPendingWake) : [], + processedHookEventIds: uniqueStrings([ + ...(Array.isArray(value.processedHookEventIds) + ? value.processedHookEventIds + : []), + ...(Array.isArray(value.processedStopHookEventIds) + ? value.processedStopHookEventIds + : []), + ]), processedStopHookEventIds: Array.isArray(value.processedStopHookEventIds) ? uniqueStrings(value.processedStopHookEventIds) : [], @@ -156,9 +186,12 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation { title: requiredString(value.title, "gateway.delegations.title"), status, cwd: optionalString(value.cwd), + workspaceKey: optionalString(value.workspaceKey), groupId: optionalString(value.groupId), returnMode: parseReturnMode(value.returnMode), discordDetailThreadId: optionalString(value.discordDetailThreadId), + discordTaskThreadId: optionalString(value.discordTaskThreadId), + discordWorkspaceThreadId: optionalString(value.discordWorkspaceThreadId), parentDiscordMessageId: optionalString(value.parentDiscordMessageId), lastTurnId: optionalString(value.lastTurnId), lastStatus: optionalString(value.lastStatus), @@ -166,12 +199,34 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation { completedAt: optionalString(value.completedAt), injectedAt: optionalString(value.injectedAt), mirroredAt: optionalString(value.mirroredAt), + taskMirroredAt: optionalString(value.taskMirroredAt), reportedAt: optionalString(value.reportedAt), createdAt: requiredString(value.createdAt, "gateway.delegations.createdAt"), updatedAt: requiredString(value.updatedAt, "gateway.delegations.updatedAt"), }; } +function parseGatewayWorkspace(value: unknown): DiscordGatewayWorkspaceSurface { + if (!isRecord(value)) { + throw new Error("Invalid Discord bridge gateway workspace"); + } + return { + key: requiredString(value.key, "gateway.workspaces.key"), + cwd: requiredString(value.cwd, "gateway.workspaces.cwd"), + title: requiredString(value.title, "gateway.workspaces.title"), + discordThreadId: requiredString( + value.discordThreadId, + "gateway.workspaces.discordThreadId", + ), + statusMessageId: optionalString(value.statusMessageId), + delegationIds: Array.isArray(value.delegationIds) + ? uniqueStrings(value.delegationIds) + : [], + createdAt: requiredString(value.createdAt, "gateway.workspaces.createdAt"), + updatedAt: requiredString(value.updatedAt, "gateway.workspaces.updatedAt"), + }; +} + function parseGatewayPendingWake( value: unknown, ): NonNullable[number] { @@ -197,6 +252,57 @@ function parseGatewayPendingWake( }; } +function parseGatewayObservedThread(value: unknown): DiscordGatewayObservedThread { + if (!isRecord(value)) { + throw new Error("Invalid Discord bridge gateway observed thread"); + } + return { + threadId: requiredString(value.threadId, "gateway.observedThreads.threadId"), + title: optionalString(value.title), + status: parseObservedThreadStatus(value.status), + cwd: optionalString(value.cwd), + workspaceKey: optionalString(value.workspaceKey), + model: optionalString(value.model), + transcriptPath: optionalString(value.transcriptPath), + lastTurnId: optionalString(value.lastTurnId), + lastHookEventName: parseHookEventName(value.lastHookEventName), + source: optionalString(value.source), + promptPreview: optionalString(value.promptPreview), + assistantPreview: optionalString(value.assistantPreview), + toolName: optionalString(value.toolName), + toolUseId: optionalString(value.toolUseId), + toolInputPreview: optionalString(value.toolInputPreview), + toolResponsePreview: optionalString(value.toolResponsePreview), + permissionDescription: optionalString(value.permissionDescription), + firstSeenAt: requiredString(value.firstSeenAt, "gateway.observedThreads.firstSeenAt"), + lastSeenAt: requiredString(value.lastSeenAt, "gateway.observedThreads.lastSeenAt"), + updatedAt: requiredString(value.updatedAt, "gateway.observedThreads.updatedAt"), + }; +} + +function parseObservedThreadStatus( + value: unknown, +): DiscordGatewayObservedThread["status"] { + return value === "starting" || + value === "active" || + value === "tool" || + value === "waiting" || + value === "idle" + ? value + : "idle"; +} + +function parseHookEventName(value: unknown): DiscordGatewayHookEventName | undefined { + return value === "SessionStart" || + value === "UserPromptSubmit" || + value === "PreToolUse" || + value === "PermissionRequest" || + value === "PostToolUse" || + value === "Stop" + ? value + : undefined; +} + function parseReturnMode(value: unknown): DiscordGatewayDelegation["returnMode"] { return value === "detached" || value === "record_only" || @@ -318,7 +424,11 @@ function optionalNumber(value: unknown): number | undefined { } function parseSessionMode(value: unknown): DiscordBridgeSession["mode"] { - return value === "new" || value === "resumed" || value === "gateway" + return value === "new" || + value === "resumed" || + value === "gateway" || + value === "delegated" || + value === "workspace" ? value : undefined; } diff --git a/apps/discord-bridge/src/stop-hook-spool.ts b/apps/discord-bridge/src/stop-hook-spool.ts index f9839eb..779386b 100644 --- a/apps/discord-bridge/src/stop-hook-spool.ts +++ b/apps/discord-bridge/src/stop-hook-spool.ts @@ -10,21 +10,26 @@ import { import os from "node:os"; import path from "node:path"; -import type { DiscordGatewayStopHookEvent } from "./types.ts"; +import type { + DiscordGatewayHookEvent, + DiscordGatewayHookEventName, +} from "./types.ts"; -export type StopHookSpoolDisposition = "processed" | "ignored" | "failed"; +export type HookEventSpoolDisposition = "processed" | "ignored" | "failed"; +export type StopHookSpoolDisposition = HookEventSpoolDisposition; -export type PendingStopHookSpoolFile = +export type PendingHookEventSpoolFile = | { filePath: string; fileName: string; - event: DiscordGatewayStopHookEvent; + event: DiscordGatewayHookEvent; } | { filePath: string; fileName: string; error: Error; }; +export type PendingStopHookSpoolFile = PendingHookEventSpoolFile; export function defaultStopHookSpoolDir(): string { return path.join(os.homedir(), ".codex", "discord-bridge", "stop-hooks"); @@ -37,7 +42,7 @@ export function stopHookSpoolDirFromEnv( } export function stopHookSpoolPaths(spoolDir: string): Record< - "pending" | StopHookSpoolDisposition, + "pending" | HookEventSpoolDisposition, string > { const root = path.resolve(spoolDir); @@ -60,9 +65,19 @@ export async function writeStopHookSpoolEvent( spoolDir?: string; now?: () => Date; } = {}, -): Promise { +): Promise { + return await writeHookSpoolEvent(input, options); +} + +export async function writeHookSpoolEvent( + input: unknown, + options: { + spoolDir?: string; + now?: () => Date; + } = {}, +): Promise { const spoolDir = options.spoolDir ?? stopHookSpoolDirFromEnv(); - const event = stopHookEventFromInput(input, options.now ?? (() => new Date())); + const event = hookEventFromInput(input, options.now ?? (() => new Date())); const paths = stopHookSpoolPaths(spoolDir); await mkdir(paths.pending, { recursive: true }); const fileName = `${event.id}.json`; @@ -78,13 +93,13 @@ export async function writeStopHookSpoolEvent( export async function readPendingStopHookSpoolFiles( spoolDir: string, -): Promise { +): Promise { const paths = stopHookSpoolPaths(spoolDir); await ensureStopHookSpool(spoolDir); const fileNames = (await readdir(paths.pending)) .filter((fileName) => fileName.endsWith(".json")) .sort(); - const files: PendingStopHookSpoolFile[] = []; + const files: PendingHookEventSpoolFile[] = []; for (const fileName of fileNames) { const filePath = path.join(paths.pending, fileName); try { @@ -92,7 +107,7 @@ export async function readPendingStopHookSpoolFiles( files.push({ filePath, fileName, - event: parseStopHookSpoolEvent(parsed), + event: parseHookSpoolEvent(parsed), }); } catch (error) { files.push({ @@ -106,9 +121,9 @@ export async function readPendingStopHookSpoolFiles( } export async function archiveStopHookSpoolFile( - file: Pick, + file: Pick, spoolDir: string, - disposition: StopHookSpoolDisposition, + disposition: HookEventSpoolDisposition, ): Promise { const paths = stopHookSpoolPaths(spoolDir); await mkdir(paths[disposition], { recursive: true }); @@ -133,23 +148,29 @@ export async function removeStopHookSpool(spoolDir: string): Promise { await rm(path.resolve(spoolDir), { recursive: true, force: true }); } -function stopHookEventFromInput( +function hookEventFromInput( input: unknown, now: () => Date, -): DiscordGatewayStopHookEvent { +): DiscordGatewayHookEvent { const parsed = record(input); const eventName = stringValue(parsed.hook_event_name) ?? stringValue(parsed.eventName); - if (eventName && eventName !== "Stop") { + if (!isHookEventName(eventName)) { throw new Error(`Unsupported hook event: ${eventName}`); } const sessionId = stringValue(parsed.session_id) ?? stringValue(parsed.sessionId); if (!sessionId) { - throw new Error("Stop hook input is missing session_id"); + throw new Error("Hook input is missing session_id"); } const turnId = stringValue(parsed.turn_id) ?? stringValue(parsed.turnId); const transcriptPath = stringValue(parsed.transcript_path) ?? stringValue(parsed.transcriptPath); const cwd = stringValue(parsed.cwd); + const model = stringValue(parsed.model); + const source = stringValue(parsed.source); + const toolName = stringValue(parsed.tool_name) ?? stringValue(parsed.toolName); + const toolUseId = stringValue(parsed.tool_use_id) ?? stringValue(parsed.toolUseId); + const toolInput = parsed.tool_input ?? parsed.toolInput; + const toolResponse = parsed.tool_response ?? parsed.toolResponse; const lastAssistantMessage = nullableString(parsed.last_assistant_message) ?? nullableString(parsed.lastAssistantMessage); @@ -159,37 +180,49 @@ function stopHookEventFromInput( : typeof parsed.stopHookActive === "boolean" ? parsed.stopHookActive : undefined; - const id = stopHookEventId({ + const id = hookEventId({ + eventName, sessionId, turnId, transcriptPath, cwd, + toolName, + toolUseId, + source, }); return { version: 1, id, - eventName: "Stop", + eventName, sessionId, turnId, cwd, transcriptPath, - lastAssistantMessage, - stopHookActive, + model, + source, + promptPreview: previewString(parsed.prompt), + toolName, + toolUseId, + toolInputPreview: previewJson(toolInput), + toolResponsePreview: previewJson(toolResponse), + permissionDescription: nullableString(record(toolInput).description), + lastAssistantMessage: eventName === "Stop" ? lastAssistantMessage : undefined, + stopHookActive: eventName === "Stop" ? stopHookActive : undefined, createdAt: now().toISOString(), }; } -function parseStopHookSpoolEvent(input: unknown): DiscordGatewayStopHookEvent { +function parseHookSpoolEvent(input: unknown): DiscordGatewayHookEvent { const parsed = record(input); if (parsed.version !== 1) { - throw new Error("Invalid stop hook event version"); + throw new Error("Invalid hook event version"); } const eventName = stringValue(parsed.eventName); const id = stringValue(parsed.id); const sessionId = stringValue(parsed.sessionId); const createdAt = stringValue(parsed.createdAt); - if (eventName !== "Stop" || !id || !sessionId || !createdAt) { - throw new Error("Invalid stop hook event"); + if (!isHookEventName(eventName) || !id || !sessionId || !createdAt) { + throw new Error("Invalid hook event"); } return { version: 1, @@ -199,6 +232,14 @@ function parseStopHookSpoolEvent(input: unknown): DiscordGatewayStopHookEvent { turnId: stringValue(parsed.turnId), cwd: stringValue(parsed.cwd), transcriptPath: stringValue(parsed.transcriptPath), + model: stringValue(parsed.model), + source: stringValue(parsed.source), + promptPreview: stringValue(parsed.promptPreview), + toolName: stringValue(parsed.toolName), + toolUseId: stringValue(parsed.toolUseId), + toolInputPreview: stringValue(parsed.toolInputPreview), + toolResponsePreview: stringValue(parsed.toolResponsePreview), + permissionDescription: stringValue(parsed.permissionDescription), lastAssistantMessage: nullableString(parsed.lastAssistantMessage), stopHookActive: typeof parsed.stopHookActive === "boolean" ? parsed.stopHookActive @@ -207,21 +248,58 @@ function parseStopHookSpoolEvent(input: unknown): DiscordGatewayStopHookEvent { }; } -function stopHookEventId(input: { +function hookEventId(input: { + eventName: DiscordGatewayHookEventName; sessionId: string; turnId?: string; transcriptPath?: string; cwd?: string; + toolName?: string; + toolUseId?: string; + source?: string; }): string { const identity = input.turnId - ? { eventName: "Stop", sessionId: input.sessionId, turnId: input.turnId } - : { - eventName: "Stop", + ? { + eventName: input.eventName, sessionId: input.sessionId, + turnId: input.turnId, + toolName: input.toolName, + toolUseId: input.toolUseId, + } + : { + eventName: input.eventName, + sessionId: input.sessionId, + source: input.source, transcriptPath: input.transcriptPath, cwd: input.cwd, }; - return `stop-${createHash("sha256").update(JSON.stringify(identity)).digest("hex").slice(0, 24)}`; + const prefix = input.eventName === "Stop" ? "stop" : "hook"; + return `${prefix}-${createHash("sha256").update(JSON.stringify(identity)).digest("hex").slice(0, 24)}`; +} + +function isHookEventName(value: unknown): value is DiscordGatewayHookEventName { + return value === "SessionStart" || + value === "UserPromptSubmit" || + value === "PreToolUse" || + value === "PermissionRequest" || + value === "PostToolUse" || + value === "Stop"; +} + +function previewString(value: unknown, maxLength = 500): string | undefined { + const parsed = nullableString(value); + if (!parsed) { + return undefined; + } + return parsed.length <= maxLength ? parsed : `${parsed.slice(0, maxLength - 3)}...`; +} + +function previewJson(value: unknown, maxLength = 500): string | undefined { + if (value === undefined || value === null) { + return undefined; + } + const text = typeof value === "string" ? value : JSON.stringify(value); + return previewString(text, maxLength); } function record(value: unknown): Record { diff --git a/apps/discord-bridge/src/types.ts b/apps/discord-bridge/src/types.ts index 4994643..f61117f 100644 --- a/apps/discord-bridge/src/types.ts +++ b/apps/discord-bridge/src/types.ts @@ -36,6 +36,8 @@ export type DiscordConsoleOutputMode = "messages" | "none"; export type DiscordGatewayConfig = { homeChannelId: string; mainThreadId?: string; + workspaceForumChannelId?: string; + taskThreadsChannelId?: string; }; export type DiscordAuthor = { @@ -86,27 +88,97 @@ export type DiscordClearWebhooksInbound = { reply?: (text: string) => Promise; }; +export type DiscordStatusInbound = { + kind: "status"; + channelId: string; + guildId?: string; + author: DiscordAuthor; + createdAt: string; + reply?: (text: string) => Promise; + replyPicker?: (picker: DiscordEphemeralPicker) => Promise; +}; + +export type DiscordThreadsInbound = { + kind: "threads"; + channelId: string; + guildId?: string; + author: DiscordAuthor; + createdAt: string; + reply?: (text: string) => Promise; + replyPicker?: (picker: DiscordEphemeralPicker) => Promise; +}; + +export type DiscordThreadPickerInbound = { + kind: "threadPicker"; + channelId: string; + guildId?: string; + pickerId: string; + optionId: string; + author: DiscordAuthor; + createdAt: string; + reply?: (text: string) => Promise; + update?: (text: string) => Promise; +}; + +export type DiscordReactionInbound = { + kind: "reaction"; + channelId: string; + guildId?: string; + messageId: string; + emoji: string; + author: DiscordAuthor; + createdAt: string; +}; + export type DiscordInbound = | DiscordMessageInbound | DiscordThreadStartInbound | DiscordClearInbound - | DiscordClearWebhooksInbound; + | DiscordClearWebhooksInbound + | DiscordStatusInbound + | DiscordThreadsInbound + | DiscordThreadPickerInbound + | DiscordReactionInbound; + +export type DiscordEphemeralPicker = { + pickerId: string; + text: string; + options: DiscordEphemeralPickerOption[]; +}; + +export type DiscordEphemeralPickerOption = { + id: string; + label: string; +}; export type DiscordBridgeTransportHandlers = { onInbound(inbound: DiscordInbound): void; }; +export type DiscordBridgeCommandRegistration = { + channelIds?: string[]; +}; + export type DiscordBridgeTransport = { start(handlers: DiscordBridgeTransportHandlers): Promise; stop(): Promise; - registerCommands(): Promise; + registerCommands(options?: DiscordBridgeCommandRegistration): Promise; + createForumPost?( + channelId: string, + name: string, + message: string, + ): Promise<{ threadId: string; messageId?: string }>; createThread( channelId: string, name: string, sourceMessageId?: string, ): Promise; sendMessage(channelId: string, text: string): Promise; - updateMessage?(channelId: string, messageId: string, text: string): Promise; + updateMessage?( + channelId: string, + messageId: string, + text: string, + ): Promise; deleteMessage(channelId: string, messageId: string): Promise; deleteWebhookMessages?( channelId: string, @@ -114,6 +186,7 @@ export type DiscordBridgeTransport = { ): Promise<{ deleted: number; failed: number }>; deleteThread?(channelId: string): Promise; addThreadMembers?(channelId: string, userIds: string[]): Promise; + addReactions?(channelId: string, messageId: string, reactions: string[]): Promise; pinMessage?(channelId: string, messageId: string): Promise; sendTyping(channelId: string): Promise; }; @@ -153,7 +226,10 @@ export type DiscordGatewayState = { createdAt?: string; toolsVersion?: number; delegations: DiscordGatewayDelegation[]; + workspaces?: DiscordGatewayWorkspaceSurface[]; + observedThreads?: DiscordGatewayObservedThread[]; pendingWakes?: DiscordGatewayPendingWake[]; + processedHookEventIds?: string[]; processedStopHookEventIds?: string[]; }; @@ -170,9 +246,12 @@ export type DiscordGatewayDelegation = { title: string; status: "active" | "idle" | "failed" | "complete" | "reported"; cwd?: string; + workspaceKey?: string; groupId?: string; returnMode?: DiscordGatewayDelegationReturnMode; discordDetailThreadId?: string; + discordTaskThreadId?: string; + discordWorkspaceThreadId?: string; parentDiscordMessageId?: string; lastTurnId?: string; lastStatus?: string; @@ -180,11 +259,23 @@ export type DiscordGatewayDelegation = { completedAt?: string; injectedAt?: string; mirroredAt?: string; + taskMirroredAt?: string; reportedAt?: string; createdAt: string; updatedAt: string; }; +export type DiscordGatewayWorkspaceSurface = { + key: string; + cwd: string; + title: string; + discordThreadId: string; + statusMessageId?: string; + delegationIds: string[]; + createdAt: string; + updatedAt: string; +}; + export type DiscordGatewayPendingWake = { id: string; kind: "delegation" | "group"; @@ -195,19 +286,69 @@ export type DiscordGatewayPendingWake = { startedAt?: string; }; -export type DiscordGatewayStopHookEvent = { +export type DiscordGatewayHookEventName = + | "SessionStart" + | "UserPromptSubmit" + | "PreToolUse" + | "PermissionRequest" + | "PostToolUse" + | "Stop"; + +export type DiscordGatewayHookEvent = { version: 1; id: string; - eventName: "Stop"; + eventName: DiscordGatewayHookEventName; sessionId: string; turnId?: string; cwd?: string; transcriptPath?: string; + model?: string; + source?: string; + promptPreview?: string; + toolName?: string; + toolUseId?: string; + toolInputPreview?: string; + toolResponsePreview?: string; + permissionDescription?: string; lastAssistantMessage?: string; stopHookActive?: boolean; createdAt: string; }; +export type DiscordGatewayStopHookEvent = DiscordGatewayHookEvent & { + eventName: "Stop"; +}; + +export type DiscordGatewayObservedThreadStatus = + | "starting" + | "active" + | "tool" + | "waiting" + | "idle"; + +export type DiscordGatewayObservedThread = { + threadId: string; + title?: string; + status: DiscordGatewayObservedThreadStatus; + cwd?: string; + workspaceKey?: string; + model?: string; + transcriptPath?: string; + lastTurnId?: string; + lastHookEventName?: DiscordGatewayHookEventName; + source?: string; + promptPreview?: string; + assistantPreview?: string; + toolName?: string; + toolUseId?: string; + toolInputPreview?: string; + toolResponsePreview?: string; + permissionDescription?: string; + firstSeenAt: string; + lastSeenAt: string; + updatedAt: string; +}; + export type DiscordBridgeSession = { discordThreadId: string; parentChannelId: string; @@ -219,7 +360,7 @@ export type DiscordBridgeSession = { ownerUserId?: string; participantUserIds?: string[]; cwd?: string; - mode?: "new" | "resumed" | "gateway"; + mode?: "new" | "resumed" | "gateway" | "delegated" | "workspace"; statusMessageId?: string; }; diff --git a/apps/discord-bridge/test/bridge.test.ts b/apps/discord-bridge/test/bridge.test.ts index a8d32cd..8045052 100644 --- a/apps/discord-bridge/test/bridge.test.ts +++ b/apps/discord-bridge/test/bridge.test.ts @@ -1,4 +1,5 @@ -import { mkdtemp, rm } from "node:fs/promises"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import { createHash } from "node:crypto"; import os from "node:os"; import path from "node:path"; import { describe, expect, test } from "bun:test"; @@ -15,8 +16,10 @@ import { writeStopHookSpoolEvent } from "../src/stop-hook-spool.ts"; import type { CodexBridgeClient, DiscordBridgeConfig, + DiscordBridgeCommandRegistration, DiscordBridgeTransport, DiscordBridgeTransportHandlers, + DiscordEphemeralPicker, DiscordInbound, } from "../src/types.ts"; @@ -53,6 +56,9 @@ describe("DiscordCodexBridge", () => { await bridge.start(); await waitFor(() => bridge.stateForTest().sessions.length === 1); + expect(transport.registeredCommands).toEqual([ + { channelIds: ["parent-channel", "home-channel"] }, + ]); expect(client.startThreadCalls).toHaveLength(1); expect(client.startThreadCalls[0]?.dynamicTools).toEqual( expect.arrayContaining([ @@ -189,6 +195,556 @@ describe("DiscordCodexBridge", () => { await bridge.stop(); }); + test("gateway workbench opens delegation task threads lazily from workspace posts", async () => { + const hookSpoolDir = await testHookSpoolDir(); + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + allowedChannelIds: new Set(["home-channel"]), + hookSpoolDir, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/codex-flows", + title: "Hook packaging", + prompt: "Package the hook command.", + returnMode: "record_only", + }, + }, + }); + + await waitFor(() => client.responses.length === 1); + expect(transport.createdForumPosts).toEqual([ + expect.objectContaining({ + channelId: "workspace-forum", + name: "codex-flows", + threadId: "forum-post-1", + }), + ]); + expect(transport.createdThreads).toEqual([]); + const state = bridge.stateForTest(); + expect(state.gateway?.workspaces).toEqual([ + expect.objectContaining({ + cwd: "/workspace/codex-flows", + title: "codex-flows", + discordThreadId: "forum-post-1", + statusMessageId: "forum-post-1", + delegationIds: [state.gateway?.delegations[0]?.id], + }), + ]); + expect(state.gateway?.delegations[0]).toEqual( + expect.objectContaining({ + codexThreadId: "codex-thread-2", + workspaceKey: state.gateway?.workspaces?.[0]?.key, + discordWorkspaceThreadId: "forum-post-1", + }), + ); + expect(state.gateway?.delegations[0]?.discordTaskThreadId).toBeUndefined(); + const workspaceUpdate = transport.updatedMessages.find((message) => + message.channelId === "forum-post-1" && + message.messageId === "forum-post-1" + ); + expect(workspaceUpdate?.text).toContain("**Open Threads**\nNone"); + expect(workspaceUpdate?.text).not.toContain("Hook packaging"); + + const replies: string[] = []; + transport.emit({ + kind: "threads", + channelId: "forum-post-1", + author: { id: "user-1", name: "Peezy", isBot: false }, + createdAt: "2026-05-14T12:00:30.000Z", + reply: async (text) => { + replies.push(text); + }, + replyPicker: transport.threadsReplyPicker(), + }); + await waitFor(() => transport.ephemeralPickers.length === 1); + expect(replies).toEqual([]); + expect(transport.messages.some((message) => + message.channelId === "forum-post-1" && + message.text.includes("Hook packaging") + )).toBe(false); + const picker = transport.ephemeralPickers[0]; + expect(picker?.text).toContain("1️⃣ `not opened` Hook packaging"); + expect(picker?.text).toContain( + "Choose a number to open or resume that thread in Discord.", + ); + expect(picker?.options).toEqual([{ id: "0", label: "1" }]); + + transport.emitThreadPicker({ + pickerId: picker?.pickerId ?? "", + optionId: "0", + }); + await waitFor(() => transport.createdThreads.length === 1); + expect(transport.ephemeralUpdates.some((update) => + update.pickerId === picker?.pickerId && + update.text === "Opened Hook packaging: <#discord-thread-1>" + )).toBe(true); + expect(transport.createdThreads).toEqual([ + { + channelId: "task-channel", + name: "codex-flows: Hook packaging", + sourceMessageId: undefined, + }, + ]); + expect(bridge.stateForTest().sessions).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + discordThreadId: "discord-thread-1", + parentChannelId: "task-channel", + codexThreadId: "codex-thread-2", + cwd: "/workspace/codex-flows", + mode: "workspace", + }), + ]), + ); + + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Workbench result.", + cwd: "/workspace/codex-flows", + }); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(transport.messages.some((message) => + message.channelId === "discord-thread-1" && + message.text.includes("Workbench result.") + )).toBe(true); + const homeResult = transport.messages.find((message) => + message.channelId === "home-channel" && + message.text.includes("[discord-gateway delegation result]") && + message.text.includes("Hook packaging") + )?.text ?? ""; + expect(homeResult).toContain("<#forum-post-1>"); + expect(homeResult).toContain("<#discord-thread-1>"); + expect(homeResult).not.toContain("Workbench result."); + expect(transport.updatedMessages.some((message) => + message.channelId === "forum-post-1" && + message.text.includes("<#discord-thread-1> Hook packaging") + )).toBe(true); + + transport.emit({ + kind: "message", + channelId: "discord-thread-1", + messageId: "task-follow-up", + author: { id: "user-1", name: "Peezy", isBot: false }, + content: "Continue in this delegated thread.", + createdAt: "2026-05-14T12:01:00.000Z", + }); + await waitFor(() => client.startTurnCalls.length === 2); + expect(client.startTurnCalls[1]).toEqual( + expect.objectContaining({ + threadId: "codex-thread-2", + cwd: "/workspace/codex-flows", + }), + ); + expect(inputText(client.startTurnCalls[1]?.input[0])).toContain( + "Continue in this delegated thread.", + ); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } + }); + + test("gateway workbench reuses one workspace post per normalized cwd", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + for (const [index, title] of ["First task", "Second task"].entries()) { + client.emitRequest({ + id: `tool-${index}`, + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: index === 0 + ? "/workspace/codex-flows/." + : "/workspace/codex-flows", + title, + }, + }, + }); + await waitFor(() => client.responses.length === index + 1); + } + + expect(transport.createdForumPosts).toHaveLength(1); + expect(transport.createdThreads).toHaveLength(0); + const workspaces = bridge.stateForTest().gateway?.workspaces ?? []; + const delegations = bridge.stateForTest().gateway?.delegations ?? []; + expect(workspaces).toEqual([ + expect.objectContaining({ + cwd: "/workspace/codex-flows", + delegationIds: delegations.map((delegation) => delegation.id), + }), + ]); + expect(new Set(delegations.map((delegation) => delegation.workspaceKey)).size) + .toBe(1); + await bridge.stop(); + }); + + test("gateway workbench discovers top-level folders under the main workspace", async () => { + const root = await mkdtemp(path.join(os.tmpdir(), "discord-workspaces-")); + await mkdir(path.join(root, "alpha", "nested"), { recursive: true }); + await mkdir(path.join(root, "beta"), { recursive: true }); + await mkdir(path.join(root, ".cache"), { recursive: true }); + const client = new FakeCodexClient(); + client.threads = [ + testThread({ + id: "codex-alpha-existing", + cwd: path.join(root, "alpha", "nested"), + name: "Alpha existing", + updatedAt: 3, + }), + testThread({ + id: "codex-beta-existing", + cwd: path.join(root, "beta"), + name: "Beta existing", + updatedAt: 2, + }), + ]; + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + cwd: root, + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + try { + await bridge.start(); + expect(transport.createdForumPosts.map((post) => post.name)).toEqual([ + "alpha", + "beta", + ]); + expect(bridge.stateForTest().gateway?.workspaces?.map((workspace) => + workspace.cwd + )).toEqual([ + path.join(root, "alpha"), + path.join(root, "beta"), + ]); + expect(transport.updatedMessages.some((message) => + message.channelId === "forum-post-1" && + message.text.includes("**Open Threads**\nNone") + )).toBe(true); + expect(transport.updatedMessages.some((message) => + message.channelId === "forum-post-2" && + message.text.includes("**Open Threads**\nNone") + )).toBe(true); + const replies: string[] = []; + transport.emit({ + kind: "threads", + channelId: "forum-post-1", + author: { id: "user-1", name: "Peezy", isBot: false }, + createdAt: "2026-05-14T12:00:30.000Z", + reply: async (text) => { + replies.push(text); + }, + replyPicker: transport.threadsReplyPicker(), + }); + await waitFor(() => transport.ephemeralPickers.length === 1); + expect(replies).toEqual([]); + expect(transport.messages.some((message) => + message.channelId === "forum-post-1" && + message.text.includes("Alpha existing") + )).toBe(false); + const picker = transport.ephemeralPickers[0]; + expect(picker?.text).toContain("1️⃣ `not opened` Alpha existing"); + transport.emitThreadPicker({ + pickerId: picker?.pickerId ?? "", + optionId: "0", + }); + await waitFor(() => transport.createdThreads.length === 1); + expect(client.resumeThreadCalls.some((call) => + call.threadId === "codex-alpha-existing" + )).toBe(true); + expect(transport.createdThreads[0]).toEqual({ + channelId: "task-channel", + name: "alpha: Alpha existing", + sourceMessageId: undefined, + }); + + client.emitRequest({ + id: "tool-nested", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: path.join(root, "alpha", "nested", "project"), + title: "Nested task", + }, + }, + }); + await waitFor(() => client.responses.length === 1); + expect(transport.createdForumPosts).toHaveLength(2); + expect(transport.createdThreads).toHaveLength(1); + const state = bridge.stateForTest(); + const alpha = state.gateway?.workspaces?.find((workspace) => + workspace.cwd === path.join(root, "alpha") + ); + const delegation = state.gateway?.delegations[0]; + expect(delegation).toBeDefined(); + expect(alpha?.delegationIds).toEqual([delegation!.id]); + expect(delegation).toEqual( + expect.objectContaining({ + workspaceKey: alpha?.key, + discordWorkspaceThreadId: alpha?.discordThreadId, + }), + ); + } finally { + await bridge.stop(); + await rm(root, { recursive: true, force: true }); + } + }); + + test("gateway workbench surfaces hook-observed non-gateway threads", async () => { + const root = await mkdtemp(path.join(os.tmpdir(), "discord-observed-")); + const hookSpoolDir = await testHookSpoolDir(); + await mkdir(path.join(root, "alpha", "project"), { recursive: true }); + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + cwd: root, + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + hookSpoolDir, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + try { + await bridge.start(); + expect(transport.createdForumPosts.map((post) => post.name)).toEqual([ + "alpha", + ]); + await emitHookEvent(hookSpoolDir, { + eventName: "UserPromptSubmit", + sessionId: "codex-observed", + turnId: "turn-observed", + cwd: path.join(root, "alpha", "project"), + prompt: "Inspect observed runtime activity.", + }); + await waitFor(() => + bridge.stateForTest().gateway?.observedThreads?.[0]?.status === "active" + ); + await emitHookEvent(hookSpoolDir, { + eventName: "PermissionRequest", + sessionId: "codex-observed", + turnId: "turn-observed", + cwd: path.join(root, "alpha", "project"), + toolName: "Bash", + toolInput: { description: "Needs network" }, + }); + await waitFor(() => + bridge.stateForTest().gateway?.observedThreads?.[0]?.status === "waiting" + ); + expect(bridge.stateForTest().gateway?.observedThreads?.[0]).toEqual( + expect.objectContaining({ + threadId: "codex-observed", + title: "Inspect observed runtime activity.", + cwd: path.join(root, "alpha", "project"), + promptPreview: "Inspect observed runtime activity.", + permissionDescription: "Needs network", + }), + ); + + const replies: string[] = []; + transport.emit({ + kind: "threads", + channelId: "forum-post-1", + author: { id: "user-1", name: "Peezy", isBot: false }, + createdAt: "2026-05-14T12:00:30.000Z", + reply: async (text) => { + replies.push(text); + }, + replyPicker: transport.threadsReplyPicker(), + }); + await waitFor(() => transport.ephemeralPickers.length === 1); + expect(replies).toEqual([]); + expect(transport.messages.some((message) => + message.channelId === "forum-post-1" && + message.text.includes("Inspect observed runtime activity") + )).toBe(false); + const picker = transport.ephemeralPickers[0]; + expect(picker?.text).toContain( + "1️⃣ `not opened` Inspect observed runtime activity. (waiting: Needs network)", + ); + + transport.emitThreadPicker({ + pickerId: picker?.pickerId ?? "", + optionId: "0", + }); + await waitFor(() => transport.createdThreads.length === 1); + expect(client.resumeThreadCalls.some((call) => + call.threadId === "codex-observed" && + call.cwd === path.join(root, "alpha", "project") + )).toBe(true); + expect(transport.createdThreads[0]).toEqual({ + channelId: "task-channel", + name: "alpha: Inspect observed runtime activity.", + sourceMessageId: undefined, + }); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + await rm(root, { recursive: true, force: true }); + } + }); + + test("gateway workbench resumes persisted task thread sessions after restart", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const existingWorkspaceKey = testWorkspaceKey("/workspace/codex-flows"); + const store = new MemoryStateStore({ + ...emptyState(), + gateway: { + homeChannelId: "home-channel", + mainThreadId: "codex-main", + toolsVersion: 1, + delegations: [ + { + id: "delegation-existing", + codexThreadId: "codex-delegated", + title: "Existing task", + status: "idle", + cwd: "/workspace/codex-flows", + workspaceKey: existingWorkspaceKey, + discordTaskThreadId: "task-thread-existing", + discordWorkspaceThreadId: "workspace-post-existing", + createdAt: "2026-05-14T11:00:00.000Z", + updatedAt: "2026-05-14T11:00:00.000Z", + }, + ], + workspaces: [ + { + key: existingWorkspaceKey, + cwd: "/workspace/codex-flows", + title: "codex-flows", + discordThreadId: "workspace-post-existing", + statusMessageId: "workspace-status-existing", + delegationIds: ["delegation-existing"], + createdAt: "2026-05-14T11:00:00.000Z", + updatedAt: "2026-05-14T11:00:00.000Z", + }, + ], + }, + sessions: [ + { + discordThreadId: "home-channel", + parentChannelId: "home-channel", + codexThreadId: "codex-main", + title: "Codex Gateway", + createdAt: "2026-05-14T11:00:00.000Z", + cwd: "/workspace", + mode: "gateway", + }, + { + discordThreadId: "task-thread-existing", + parentChannelId: "task-channel", + codexThreadId: "codex-delegated", + title: "Existing task", + createdAt: "2026-05-14T11:00:00.000Z", + cwd: "/workspace/codex-flows", + mode: "delegated", + }, + ], + queue: [], + activeTurns: [], + processedMessageIds: [], + deliveries: [], + }); + const bridge = new DiscordCodexBridge({ + client, + transport, + store, + config: testConfig({ + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + allowedChannelIds: new Set(["home-channel"]), + }), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 2); + expect(transport.createdForumPosts).toEqual([]); + expect(transport.createdThreads).toEqual([]); + + transport.emit({ + kind: "message", + channelId: "task-thread-existing", + messageId: "message-existing-task", + author: { id: "user-1", name: "Peezy", isBot: false }, + content: "Continue the restarted delegated task.", + createdAt: "2026-05-14T12:00:00.000Z", + }); + await waitFor(() => client.startTurnCalls.length === 1); + expect(client.startTurnCalls[0]).toEqual( + expect.objectContaining({ + threadId: "codex-delegated", + cwd: "/workspace/codex-flows", + }), + ); + await bridge.stop(); + }); + test("gateway rejects dynamic tool calls outside the main operator thread", async () => { const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); @@ -687,18 +1243,21 @@ describe("DiscordCodexBridge", () => { await waitFor(() => client.injectThreadItemsCalls.length === 1); expect(client.startTurnCalls).toHaveLength(1); - expect(transport.messages.some((message) => - message.text.includes("Manual result.") - )).toBe(true); + await waitFor(() => + transport.messages.some((message) => + message.text.includes("Manual result.") + ) + ); } finally { await bridge.stop(); await rm(hookSpoolDir, { recursive: true, force: true }); } }); - test("answers gateway status in the home channel without starting a turn", async () => { + test("answers gateway status command without starting a turn", async () => { const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); + const replies: string[] = []; const bridge = new DiscordCodexBridge({ client, transport, @@ -711,27 +1270,123 @@ describe("DiscordCodexBridge", () => { await bridge.start(); await waitFor(() => bridge.stateForTest().sessions.length === 1); transport.emit({ - kind: "message", + kind: "status", channelId: "home-channel", - messageId: "status-message-1", author: { id: "user-1", name: "Peezy", isBot: false }, - content: "status", createdAt: "2026-05-14T00:00:00.000Z", + reply: async (text) => { + replies.push(text); + }, }); - await waitFor(() => - transport.messages.some((message) => - message.channelId === "home-channel" && - message.text.includes("**Codex Gateway**") - ) - ); + await waitFor(() => replies.length === 1); + expect(replies[0]).toContain("**Codex Gateway**"); expect(client.startTurnCalls).toHaveLength(0); - expect(bridge.stateForTest().processedMessageIds).toContain( - "status-message-1", - ); await bridge.stop(); }); + test("status lists active Codex threads and opens unlinked threads", async () => { + const root = await mkdtemp(path.join(os.tmpdir(), "discord-status-")); + await mkdir(path.join(root, "alpha", "project"), { recursive: true }); + await mkdir(path.join(root, "beta", "project"), { recursive: true }); + const client = new FakeCodexClient(); + client.threads = [ + testThread({ + id: "codex-active-linked", + cwd: path.join(root, "alpha", "project"), + name: "Linked active", + status: { type: "active" } as v2.ThreadStatus, + updatedAt: 30, + }), + testThread({ + id: "codex-active-missing", + cwd: path.join(root, "beta", "project"), + name: "Missing active", + status: { type: "active" } as v2.ThreadStatus, + updatedAt: 40, + }), + testThread({ + id: "codex-idle", + cwd: path.join(root, "beta", "project"), + name: "Idle thread", + status: { type: "idle" } as v2.ThreadStatus, + updatedAt: 50, + }), + ]; + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore({ + ...emptyState(), + sessions: [ + { + discordThreadId: "task-linked", + parentChannelId: "task-channel", + codexThreadId: "codex-active-linked", + title: "Linked active", + createdAt: "2026-05-14T11:00:00.000Z", + cwd: path.join(root, "alpha", "project"), + mode: "workspace", + }, + ], + }), + config: testConfig({ + cwd: root, + gateway: { + homeChannelId: "home-channel", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", + }, + }), + }); + + try { + await bridge.start(); + const replies: string[] = []; + transport.emit({ + kind: "status", + channelId: "home-channel", + author: { id: "user-1", name: "Peezy", isBot: false }, + createdAt: "2026-05-14T12:00:00.000Z", + reply: async (text) => { + replies.push(text); + }, + replyPicker: transport.threadsReplyPicker(), + }); + + await waitFor(() => transport.ephemeralPickers.length === 1); + expect(replies).toEqual([]); + const picker = transport.ephemeralPickers[0]; + expect(picker?.text).toContain("**Active Codex Threads**"); + expect(picker?.text).toContain("<#task-linked> Linked active (active)"); + expect(picker?.text).toContain("1️⃣ `not opened` Missing active (active)"); + expect(picker?.text).not.toContain("Idle thread"); + expect(picker?.options).toEqual([{ id: "0", label: "1" }]); + + transport.emitThreadPicker({ + pickerId: picker?.pickerId ?? "", + optionId: "0", + }); + await waitFor(() => transport.createdThreads.length === 1); + expect(client.resumeThreadCalls.some((call) => + call.threadId === "codex-active-missing" + )).toBe(true); + expect(transport.createdThreads[0]).toEqual({ + channelId: "task-channel", + name: "beta: Missing active", + sourceMessageId: undefined, + }); + expect(transport.ephemeralUpdates.some((update) => + update.pickerId === picker?.pickerId && + update.text === "Opened Missing active: <#discord-thread-1>" + )).toBe(true); + } finally { + await bridge.stop(); + await rm(root, { recursive: true, force: true }); + } + }); + test("resumes a configured gateway main thread without creating Discord threads", async () => { const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); @@ -3104,6 +3759,38 @@ async function testHookSpoolDir(): Promise { return await mkdtemp(path.join(os.tmpdir(), "discord-bridge-hooks-")); } +async function emitHookEvent( + spoolDir: string, + input: { + eventName: string; + sessionId: string; + turnId?: string; + cwd?: string; + prompt?: string; + toolName?: string; + toolInput?: unknown; + lastAssistantMessage?: string; + }, +): Promise { + await writeStopHookSpoolEvent( + { + hook_event_name: input.eventName, + session_id: input.sessionId, + turn_id: input.turnId, + cwd: input.cwd ?? "/workspace", + transcript_path: `/tmp/${input.sessionId}.jsonl`, + prompt: input.prompt, + tool_name: input.toolName, + tool_input: input.toolInput, + last_assistant_message: input.lastAssistantMessage ?? null, + }, + { + spoolDir, + now: () => new Date("2026-05-14T12:00:00.000Z"), + }, + ); +} + async function emitStopHook( spoolDir: string, input: { @@ -3144,6 +3831,41 @@ function testConfig( }; } +function testWorkspaceKey(cwd: string): string { + return `workspace-${createHash("sha256").update(cwd).digest("hex").slice(0, 12)}`; +} + +function testThread(input: { + id: string; + cwd: string; + name?: string; + preview?: string; + updatedAt?: number; + status?: v2.ThreadStatus; +}): v2.Thread { + return { + id: input.id, + sessionId: input.id, + forkedFromId: null, + preview: input.preview ?? input.name ?? input.id, + ephemeral: false, + modelProvider: "openai", + createdAt: input.updatedAt ?? 1, + updatedAt: input.updatedAt ?? 1, + status: input.status ?? { type: "idle" }, + path: null, + cwd: input.cwd, + cliVersion: "test", + source: "cli", + threadSource: null, + agentNickname: null, + agentRole: null, + gitInfo: null, + name: input.name ?? null, + turns: [], + } as v2.Thread; +} + class FakeCodexClient implements CodexBridgeClient { startThreadCalls: v2.ThreadStartParams[] = []; resumeThreadCalls: v2.ThreadResumeParams[] = []; @@ -3164,6 +3886,7 @@ class FakeCodexClient implements CodexBridgeClient { threadTurns = new Map(); threadCwds = new Map(); threadGoals = new Map(); + threads: v2.Thread[] = []; failedResumeThreadIds = new Set(); blockStartTurn = false; #startTurnResolvers: Array<() => void> = []; @@ -3209,10 +3932,12 @@ class FakeCodexClient implements CodexBridgeClient { if (this.failedResumeThreadIds.has(params.threadId)) { throw new Error(`thread not found: ${params.threadId}`); } - const cwd = params.cwd ?? this.threadCwds.get(params.threadId) ?? "/workspace"; + const listedThread = this.threads.find((thread) => thread.id === params.threadId); + const cwd = params.cwd ?? this.threadCwds.get(params.threadId) ?? + listedThread?.cwd ?? "/workspace"; return { cwd, - thread: { + thread: listedThread ?? { id: params.threadId, cwd, turns: this.threadTurns.get(params.threadId) ?? [], @@ -3261,8 +3986,16 @@ class FakeCodexClient implements CodexBridgeClient { async listThreads(params: v2.ThreadListParams): Promise { this.listThreadsCalls.push(params); + const cwdFilter = Array.isArray(params.cwd) + ? new Set(params.cwd) + : params.cwd + ? new Set([params.cwd]) + : undefined; + const filtered = this.threads.filter((thread) => + !cwdFilter || cwdFilter.has(thread.cwd) + ); return { - data: [], + data: filtered.slice(0, params.limit ?? filtered.length), nextCursor: null, backwardsCursor: null, }; @@ -3316,12 +4049,24 @@ class FakeDiscordTransport implements DiscordBridgeTransport { name: string; sourceMessageId?: string; }> = []; + createdForumPosts: Array<{ + channelId: string; + name: string; + message: string; + threadId: string; + messageId: string; + }> = []; messages: Array<{ channelId: string; id: string; text: string; webhookId?: string; }> = []; + ephemeralPickers: DiscordEphemeralPicker[] = []; + ephemeralUpdates: Array<{ + pickerId: string; + text: string; + }> = []; updatedMessages: Array<{ channelId: string; messageId: string; @@ -3334,6 +4079,12 @@ class FakeDiscordTransport implements DiscordBridgeTransport { }> = []; deletedThreads: string[] = []; addedThreadMembers: Array<{ channelId: string; userIds: string[] }> = []; + addedReactions: Array<{ + channelId: string; + messageId: string; + reactions: string[]; + }> = []; + registeredCommands: DiscordBridgeCommandRegistration[] = []; pinnedMessages: Array<{ channelId: string; messageId: string }> = []; typingCount = 0; @@ -3343,7 +4094,11 @@ class FakeDiscordTransport implements DiscordBridgeTransport { async stop(): Promise {} - async registerCommands(): Promise {} + async registerCommands( + options: DiscordBridgeCommandRegistration = {}, + ): Promise { + this.registeredCommands.push(options); + } async createThread( channelId: string, @@ -3354,6 +4109,24 @@ class FakeDiscordTransport implements DiscordBridgeTransport { return `discord-thread-${this.createdThreads.length}`; } + async createForumPost( + channelId: string, + name: string, + message: string, + ): Promise<{ threadId: string; messageId?: string }> { + const threadId = `forum-post-${this.createdForumPosts.length + 1}`; + const messageId = threadId; + this.createdForumPosts.push({ + channelId, + name, + message, + threadId, + messageId, + }); + this.messages.push({ channelId: threadId, id: messageId, text: message }); + return { threadId, messageId }; + } + async sendMessage(channelId: string, text: string): Promise { const id = `message-out-${this.messages.length + 1}`; this.messages.push({ channelId, id, text }); @@ -3411,6 +4184,14 @@ class FakeDiscordTransport implements DiscordBridgeTransport { this.addedThreadMembers.push({ channelId, userIds }); } + async addReactions( + channelId: string, + messageId: string, + reactions: string[], + ): Promise { + this.addedReactions.push({ channelId, messageId, reactions }); + } + async pinMessage(channelId: string, messageId: string): Promise { this.pinnedMessages.push({ channelId, messageId }); } @@ -3422,6 +4203,57 @@ class FakeDiscordTransport implements DiscordBridgeTransport { emit(inbound: DiscordInbound): void { this.handlers?.onInbound(inbound); } + + threadsReplyPicker(): (picker: DiscordEphemeralPicker) => Promise { + return async (picker) => { + this.ephemeralPickers.push(picker); + }; + } + + emitThreadPicker(input: { + pickerId: string; + optionId: string; + authorId?: string; + }): void { + this.emit({ + kind: "threadPicker", + channelId: "ephemeral-command", + pickerId: input.pickerId, + optionId: input.optionId, + author: { + id: input.authorId ?? "user-1", + name: "Peezy", + isBot: false, + }, + createdAt: "2026-05-14T12:00:00.000Z", + update: async (text) => { + this.ephemeralUpdates.push({ pickerId: input.pickerId, text }); + }, + reply: async (text) => { + this.ephemeralUpdates.push({ pickerId: input.pickerId, text }); + }, + }); + } + + emitReaction(input: { + channelId: string; + messageId: string; + emoji: string; + authorId?: string; + }): void { + this.emit({ + kind: "reaction", + channelId: input.channelId, + messageId: input.messageId, + emoji: input.emoji, + author: { + id: input.authorId ?? "user-1", + name: "Peezy", + isBot: false, + }, + createdAt: "2026-05-14T12:00:00.000Z", + }); + } } class FakeConsoleOutput implements DiscordConsoleOutput { diff --git a/apps/discord-bridge/test/config.test.ts b/apps/discord-bridge/test/config.test.ts index acbe3f2..12e833b 100644 --- a/apps/discord-bridge/test/config.test.ts +++ b/apps/discord-bridge/test/config.test.ts @@ -196,6 +196,10 @@ describe("parseConfig", () => { "home-channel", "--main-thread-id", "main-thread", + "--workspace-forum-channel-id", + "workspace-forum", + "--task-threads-channel-id", + "task-channel", "--flow-backend-url", "http://127.0.0.1:8089", ], @@ -206,6 +210,8 @@ describe("parseConfig", () => { { CODEX_DISCORD_GATEWAY_HOME_CHANNEL_ID: "env-home", CODEX_DISCORD_GATEWAY_MAIN_THREAD_ID: "env-thread", + CODEX_DISCORD_GATEWAY_WORKSPACE_FORUM_CHANNEL_ID: "env-workspace-forum", + CODEX_DISCORD_GATEWAY_TASK_THREADS_CHANNEL_ID: "env-task-channel", CODEX_FLOW_BACKEND_URL: "http://127.0.0.1:8090", }, ); @@ -216,11 +222,15 @@ describe("parseConfig", () => { expect(fromFlag.config.gateway).toEqual({ homeChannelId: "home-channel", mainThreadId: "main-thread", + workspaceForumChannelId: "workspace-forum", + taskThreadsChannelId: "task-channel", }); expect(fromFlag.config.flowBackendUrl).toBe("http://127.0.0.1:8089"); expect(fromEnv.config.gateway).toEqual({ homeChannelId: "env-home", mainThreadId: "env-thread", + workspaceForumChannelId: "env-workspace-forum", + taskThreadsChannelId: "env-task-channel", }); expect(fromEnv.config.flowBackendUrl).toBe("http://127.0.0.1:8090"); } @@ -242,6 +252,48 @@ describe("parseConfig", () => { ).toThrow("Cannot set a gateway main thread without a gateway home channel."); }); + test("rejects partial gateway workbench channel configuration", () => { + expect(() => + parseConfig( + [ + "--token", + "discord-token", + "--allowed-user-ids", + "user-1", + "--home-channel-id", + "home-channel", + "--workspace-forum-channel-id", + "workspace-forum", + ], + {}, + ) + ).toThrow( + "Discord workbench requires both workspace forum and task threads channels.", + ); + }); + + test("rejects gateway workbench channels that are not separate", () => { + expect(() => + parseConfig( + [ + "--token", + "discord-token", + "--allowed-user-ids", + "user-1", + "--home-channel-id", + "home-channel", + "--workspace-forum-channel-id", + "workspace-forum", + "--task-threads-channel-id", + "home-channel", + ], + {}, + ) + ).toThrow( + "Discord workbench channels must be separate from the gateway home channel and each other.", + ); + }); + test("can force a local app-server even when workspace URL env is set", () => { const parsed = parseConfig( [ diff --git a/apps/discord-bridge/test/hook-cli.test.ts b/apps/discord-bridge/test/hook-cli.test.ts index db95b10..6e6d075 100644 --- a/apps/discord-bridge/test/hook-cli.test.ts +++ b/apps/discord-bridge/test/hook-cli.test.ts @@ -22,7 +22,7 @@ describe("discord gateway hook CLI", () => { ); }); - test("upserts package-bin Stop hook while preserving unrelated hooks", () => { + test("upserts package-bin observability hooks while preserving unrelated hooks", () => { const updated = upsertStopHookConfig( JSON.stringify({ hooks: { @@ -46,25 +46,66 @@ describe("discord gateway hook CLI", () => { ], }, }), - "codex-discord-bridge hook stop", + "codex-discord-bridge hook event", ); expect(updated).toEqual({ hooks: { PreToolUse: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook event", + timeout: 10, + }, + ], + }, { matcher: "Bash", hooks: [{ type: "command", command: "echo pre" }], }, ], + PermissionRequest: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook event", + timeout: 10, + }, + ], + }, + ], + PostToolUse: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook event", + timeout: 10, + }, + ], + }, + ], + SessionStart: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook event", + timeout: 10, + }, + ], + }, + ], Stop: [ { hooks: [ { type: "command", - command: "codex-discord-bridge hook stop", + command: "codex-discord-bridge hook event", timeout: 10, - statusMessage: "Recording Discord gateway Stop event", }, ], }, @@ -72,6 +113,17 @@ describe("discord gateway hook CLI", () => { hooks: [{ type: "command", command: "echo other-stop" }], }, ], + UserPromptSubmit: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook event", + timeout: 10, + }, + ], + }, + ], }, }); }); @@ -91,7 +143,7 @@ describe("discord gateway hook CLI", () => { expect(result).toEqual({ command: - "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop", + "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook event", configPath, hooksPath, dryRun: false, @@ -102,12 +154,22 @@ describe("discord gateway hook CLI", () => { expect(JSON.parse(await readFile(hooksPath, "utf8"))).toEqual( expect.objectContaining({ hooks: expect.objectContaining({ + UserPromptSubmit: [ + { + hooks: [ + expect.objectContaining({ + command: + "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook event", + }), + ], + }, + ], Stop: [ { hooks: [ expect.objectContaining({ command: - "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop", + "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook event", }), ], }, diff --git a/apps/discord-bridge/test/state.test.ts b/apps/discord-bridge/test/state.test.ts index c7bbccc..b9808f8 100644 --- a/apps/discord-bridge/test/state.test.ts +++ b/apps/discord-bridge/test/state.test.ts @@ -27,12 +27,46 @@ describe("JsonFileStateStore", () => { title: "Patchbay webhook work", status: "active", cwd: "/workspace/patchbay", + workspaceKey: "workspace-patchbay", discordDetailThreadId: "discord-detail-thread", + discordTaskThreadId: "discord-task-thread", + discordWorkspaceThreadId: "discord-workspace-thread", parentDiscordMessageId: "message-parent", + taskMirroredAt: "2026-05-11T00:00:02.500Z", createdAt: "2026-05-11T00:00:01.000Z", updatedAt: "2026-05-11T00:00:02.000Z", }, ], + workspaces: [ + { + key: "workspace-patchbay", + cwd: "/workspace/patchbay", + title: "patchbay", + discordThreadId: "discord-workspace-thread", + statusMessageId: "message-workspace-status", + delegationIds: ["delegation-1", "", "delegation-1"], + createdAt: "2026-05-11T00:00:00.500Z", + updatedAt: "2026-05-11T00:00:02.500Z", + }, + ], + observedThreads: [ + { + threadId: "codex-observed-thread", + title: "Observed work", + status: "waiting", + cwd: "/workspace/patchbay", + workspaceKey: "workspace-patchbay", + model: "gpt-test", + transcriptPath: "/tmp/observed.jsonl", + lastTurnId: "turn-observed", + lastHookEventName: "PermissionRequest", + promptPreview: "Observed prompt", + permissionDescription: "Needs approval", + firstSeenAt: "2026-05-11T00:00:01.000Z", + lastSeenAt: "2026-05-11T00:00:04.000Z", + updatedAt: "2026-05-11T00:00:04.000Z", + }, + ], pendingWakes: [ { id: "wake-1", @@ -49,6 +83,7 @@ describe("JsonFileStateStore", () => { "stop-1", "stop-2", ], + processedHookEventIds: ["hook-1", "", "hook-1"], }, sessions: [ { @@ -111,12 +146,52 @@ describe("JsonFileStateStore", () => { title: "Patchbay webhook work", status: "active", cwd: "/workspace/patchbay", + workspaceKey: "workspace-patchbay", discordDetailThreadId: "discord-detail-thread", + discordTaskThreadId: "discord-task-thread", + discordWorkspaceThreadId: "discord-workspace-thread", parentDiscordMessageId: "message-parent", + taskMirroredAt: "2026-05-11T00:00:02.500Z", createdAt: "2026-05-11T00:00:01.000Z", updatedAt: "2026-05-11T00:00:02.000Z", }, ], + workspaces: [ + { + key: "workspace-patchbay", + cwd: "/workspace/patchbay", + title: "patchbay", + discordThreadId: "discord-workspace-thread", + statusMessageId: "message-workspace-status", + delegationIds: ["delegation-1"], + createdAt: "2026-05-11T00:00:00.500Z", + updatedAt: "2026-05-11T00:00:02.500Z", + }, + ], + observedThreads: [ + { + threadId: "codex-observed-thread", + title: "Observed work", + status: "waiting", + cwd: "/workspace/patchbay", + workspaceKey: "workspace-patchbay", + model: "gpt-test", + transcriptPath: "/tmp/observed.jsonl", + lastTurnId: "turn-observed", + lastHookEventName: "PermissionRequest", + source: undefined, + promptPreview: "Observed prompt", + assistantPreview: undefined, + toolName: undefined, + toolUseId: undefined, + toolInputPreview: undefined, + toolResponsePreview: undefined, + permissionDescription: "Needs approval", + firstSeenAt: "2026-05-11T00:00:01.000Z", + lastSeenAt: "2026-05-11T00:00:04.000Z", + updatedAt: "2026-05-11T00:00:04.000Z", + }, + ], pendingWakes: [ { id: "wake-1", @@ -127,6 +202,7 @@ describe("JsonFileStateStore", () => { createdAt: "2026-05-11T00:00:03.000Z", }, ], + processedHookEventIds: ["hook-1", "stop-1", "stop-2"], processedStopHookEventIds: ["stop-1", "stop-2"], }); expect(state.sessions).toHaveLength(2); diff --git a/apps/discord-bridge/test/stop-hook-spool.test.ts b/apps/discord-bridge/test/stop-hook-spool.test.ts index 2ed1030..a93095f 100644 --- a/apps/discord-bridge/test/stop-hook-spool.test.ts +++ b/apps/discord-bridge/test/stop-hook-spool.test.ts @@ -60,6 +60,52 @@ describe("stop hook spool", () => { } }); + test("writes passive lifecycle hook events with previews", async () => { + const spoolDir = await mkdtemp(path.join(os.tmpdir(), "hook-spool-")); + try { + const event = await writeStopHookSpoolEvent( + { + hook_event_name: "UserPromptSubmit", + session_id: "session-observed", + turn_id: "turn-observed", + cwd: "/workspace/observed", + transcript_path: "/tmp/session-observed.jsonl", + model: "gpt-test", + prompt: "Inspect the observed workspace without routing through Discord.", + }, + { + spoolDir, + now: () => new Date("2026-05-14T12:00:00.000Z"), + }, + ); + + expect(event).toEqual( + expect.objectContaining({ + eventName: "UserPromptSubmit", + sessionId: "session-observed", + turnId: "turn-observed", + cwd: "/workspace/observed", + model: "gpt-test", + promptPreview: + "Inspect the observed workspace without routing through Discord.", + }), + ); + const pending = await readPendingStopHookSpoolFiles(spoolDir); + expect(pending[0]).toEqual( + expect.objectContaining({ + event: expect.objectContaining({ + id: event.id, + eventName: "UserPromptSubmit", + promptPreview: + "Inspect the observed workspace without routing through Discord.", + }), + }), + ); + } finally { + await rm(spoolDir, { recursive: true, force: true }); + } + }); + test("archives processed files out of pending", async () => { const spoolDir = await mkdtemp(path.join(os.tmpdir(), "stop-hook-spool-")); try {