diff --git a/apps/discord-bridge/README.md b/apps/discord-bridge/README.md index 71000f2..34b14d6 100644 --- a/apps/discord-bridge/README.md +++ b/apps/discord-bridge/README.md @@ -16,6 +16,7 @@ CODEX_DISCORD_MAIN_THREAD_ID=019e2509-ddbb-7380-b97b-41575092d86b CODEX_DISCORD_ALLOWED_CHANNEL_IDS=1502107617512919220 CODEX_DISCORD_DIR=/home/peezy/codex-fork-workspace/codex-flows CODEX_FLOW_BACKEND_URL=http://127.0.0.1:8090 +CODEX_DISCORD_HOOK_SPOOL_DIR=/home/peezy/.codex/discord-bridge/stop-hooks ``` `CODEX_DISCORD_MAIN_THREAD_ID` is optional. If omitted, the bridge creates a new @@ -76,6 +77,64 @@ Delegations support return modes: - `detached`: do not loop results back to the main thread; useful for human-continued threads Automatic result return uses `thread/inject_items` to append structured -delegation results to the main operator thread's model-visible history. Starting -a main-thread turn is a separate wake step, so long-running main goals are not -interrupted; wakes are queued until the main operator thread is idle. +delegation results to the main operator thread's model-visible history. Codex +`Stop` hooks, not background thread polling, drive automatic result return: +the global hook writes durable Stop events into the spool directory, and the +gateway drains that spool on startup and while running. Starting a main-thread +turn is a separate wake step, so long-running main goals are not interrupted; +wakes are queued until the main operator thread is idle. + +## Codex Stop Hook + +Install the global hook once for the Codex runtime that backs the gateway: + +```bash +codex-discord-bridge hook install +``` + +The bridge and hook default to `~/.codex/discord-bridge/stop-hooks`; override +both with `CODEX_DISCORD_HOOK_SPOOL_DIR` or `--hook-spool-dir` if needed. + +The installer enables the current hooks feature in `~/.codex/config.toml`: + +```toml +[features] +hooks = true +``` + +It also registers the Stop hook in `~/.codex/hooks.json`: + +```json +{ + "hooks": { + "Stop": [ + { + "hooks": [ + { + "type": "command", + "command": "codex-discord-bridge hook stop", + "timeout": 10, + "statusMessage": "Recording Discord gateway Stop event" + } + ] + } + ] + } +} +``` + +For package-on-demand installs, write a `bunx` command instead: + +```bash +codex-discord-bridge hook install --bunx +codex-discord-bridge hook install --bunx-package @peezy.tech/codex-flows +``` + +The hook is intentionally dumb: it does not read gateway state or call the +backend. It only writes idempotent Stop-event files. The gateway ignores unknown +sessions, treats known delegated sessions according to their return mode, and +uses main-operator Stop events to drain queued wakes. + +After changing hook configuration, restart the Codex runtime that backs the +gateway and trust the hook when Codex asks for review. `hooks/list` should show +the hook as `trusted`; untrusted hooks are discovered but do not run. diff --git a/apps/discord-bridge/src/bridge.ts b/apps/discord-bridge/src/bridge.ts index 4c2b454..6e2c2f8 100644 --- a/apps/discord-bridge/src/bridge.ts +++ b/apps/discord-bridge/src/bridge.ts @@ -1,3 +1,4 @@ +import { watch, type FSWatcher } from "node:fs"; import os from "node:os"; import path from "node:path"; import { createHash } from "node:crypto"; @@ -12,12 +13,19 @@ import { createDiscordBridgeLogger, type DiscordBridgeLogger, } from "./logger.ts"; +import { + archiveStopHookSpoolFile, + ensureStopHookSpool, + readPendingStopHookSpoolFiles, + stopHookSpoolPaths, +} from "./stop-hook-spool.ts"; import type { CodexBridgeClient, DiscordBridgeConfig, DiscordGatewayDelegation, DiscordGatewayDelegationReturnMode, DiscordGatewayPendingWake, + DiscordGatewayStopHookEvent, DiscordBridgeSession, DiscordBridgeState, DiscordBridgeStateStore, @@ -31,7 +39,8 @@ import type { const maxDiscordMessageLength = 2000; const gatewayToolsVersion = 1; -const defaultGatewayReconcileIntervalMs = 10_000; +const stopHookDrainDebounceMs = 100; +const stopHookRetryMs = 1_000; type ThreadSnapshot = { terminalTurnIds: string[]; @@ -54,7 +63,9 @@ export class DiscordCodexBridge { #dedupe: MessageDeduplicator; #logger: DiscordBridgeLogger; #consoleOutput: DiscordConsoleOutput | undefined; - #gatewayReconcileTimer: Timer | undefined; + #gatewayStopHookWatcher: FSWatcher | undefined; + #gatewayStopHookDrainTimer: Timer | undefined; + #gatewayStopHookDrainChain: Promise = Promise.resolve(); constructor(options: { client: CodexBridgeClient; @@ -132,20 +143,25 @@ export class DiscordCodexBridge { for (const runner of this.#runnersByDiscordThread.values()) { runner.start(); } - this.#startGatewayReconciler(); + await this.#startGatewayStopHookSpool(); } async stop(): Promise { this.#debug("bridge.stop", { runners: this.#runnersByDiscordThread.size, }); - if (this.#gatewayReconcileTimer) { - clearInterval(this.#gatewayReconcileTimer); - this.#gatewayReconcileTimer = undefined; + if (this.#gatewayStopHookDrainTimer) { + clearTimeout(this.#gatewayStopHookDrainTimer); + this.#gatewayStopHookDrainTimer = undefined; + } + if (this.#gatewayStopHookWatcher) { + this.#gatewayStopHookWatcher.close(); + this.#gatewayStopHookWatcher = undefined; } await Promise.all( [...this.#runnersByDiscordThread.values()].map((runner) => runner.stop()), ); + await this.#gatewayStopHookDrainChain.catch(() => undefined); await this.#persistChain.catch(() => undefined); await this.transport.stop(); this.client.close(); @@ -608,6 +624,10 @@ export class DiscordCodexBridge { return; } await runner.handleNotification(message); + if (message.method === "turn/completed" && this.#isGatewayMainThread(threadId)) { + await this.#processPendingWakes(); + await this.#persist(); + } } #handleServerRequest(message: JsonRpcRequest): void { @@ -742,6 +762,7 @@ export class DiscordCodexBridge { toolsVersion: state.gateway?.toolsVersion, delegations: state.gateway?.delegations ?? [], pendingWakes: state.gateway?.pendingWakes ?? [], + processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [], }; this.#registerRunner(existing); await this.#persist(); @@ -805,6 +826,7 @@ export class DiscordCodexBridge { : gatewayToolsVersion, delegations: state.gateway?.delegations ?? [], pendingWakes: state.gateway?.pendingWakes ?? [], + processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [], }; state.sessions.push(session); this.#registerRunner(session); @@ -834,6 +856,19 @@ export class DiscordCodexBridge { : undefined; } + #isGatewayMainThread(threadId: string): boolean { + const session = this.#gatewaySession(); + return Boolean( + (session && session.codexThreadId === threadId) || + this.#requireState().gateway?.mainThreadId === threadId, + ); + } + + #gatewayStopHookSpoolDir(): string { + return this.config.hookSpoolDir ?? + path.join(path.dirname(this.config.statePath), "stop-hooks"); + } + #gatewayDelegations(): DiscordGatewayDelegation[] { const state = this.#requireState(); if (!state.gateway) { @@ -842,6 +877,7 @@ export class DiscordCodexBridge { mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], pendingWakes: [], + processedStopHookEventIds: [], }; } state.gateway.delegations ??= []; @@ -856,12 +892,28 @@ export class DiscordCodexBridge { mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], pendingWakes: [], + processedStopHookEventIds: [], }; } state.gateway.pendingWakes ??= []; return state.gateway.pendingWakes; } + #gatewayProcessedStopHookEventIds(): string[] { + const state = this.#requireState(); + if (!state.gateway) { + state.gateway = { + homeChannelId: this.config.gateway?.homeChannelId ?? "", + mainThreadId: this.#gatewaySession()?.codexThreadId, + delegations: [], + pendingWakes: [], + processedStopHookEventIds: [], + }; + } + state.gateway.processedStopHookEventIds ??= []; + return state.gateway.processedStopHookEventIds; + } + async #startDelegation(args: Record): Promise { const cwd = requiredArg(args, "cwd"); const title = stringValue(args.title) ?? firstLine(stringValue(args.prompt)) ?? @@ -1028,7 +1080,6 @@ export class DiscordCodexBridge { } async #flushDelegationResults(args: Record): Promise { - await this.#reconcileDelegations(); const groupId = stringValue(args.groupId); const delegations = groupId ? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId) @@ -1037,7 +1088,7 @@ export class DiscordCodexBridge { : this.#gatewayDelegations(); const flushed: DiscordGatewayDelegation[] = []; for (const delegation of delegations) { - if (!delegation.lastFinal || !isTerminalDelegation(delegation)) { + if (!isTerminalDelegation(delegation)) { continue; } await this.#recordDelegationResult(delegation); @@ -1086,93 +1137,157 @@ export class DiscordCodexBridge { })); } - #startGatewayReconciler(): void { - if (!this.config.gateway || this.#gatewayReconcileTimer) { + async #startGatewayStopHookSpool(): Promise { + if (!this.config.gateway || this.#gatewayStopHookWatcher) { return; } - const intervalMs = this.config.reconcileIntervalMs ?? defaultGatewayReconcileIntervalMs; - this.#gatewayReconcileTimer = setInterval(() => { - void this.#reconcileDelegations().catch((error) => { - this.#debug("gateway.delegations.reconcile.failed", { - error: errorMessage(error), - }); - }); - }, intervalMs); - void this.#reconcileDelegations().catch((error) => { - this.#debug("gateway.delegations.reconcile.failed", { + const spoolDir = this.#gatewayStopHookSpoolDir(); + await ensureStopHookSpool(spoolDir); + const pendingDir = stopHookSpoolPaths(spoolDir).pending; + this.#gatewayStopHookWatcher = watch(pendingDir, { persistent: false }, () => { + this.#scheduleGatewayStopHookDrain(); + }); + this.#gatewayStopHookWatcher.on("error", (error) => { + this.#debug("gateway.stopHook.watch.failed", { error: errorMessage(error), }); }); + await this.#drainGatewayStopHookSpool(); } - async #reconcileDelegations(): Promise { - const delegations = this.#gatewayDelegations(); - let changed = false; - for (const delegation of delegations) { - if (delegation.status !== "active") { - continue; - } - const response = await this.client.readThread({ - threadId: delegation.codexThreadId, - includeTurns: true, - }); - const turns = Array.isArray(response.thread.turns) ? response.thread.turns : []; - const latest = record(turns[turns.length - 1]); - const latestStatus = stringValue(latest.status); - if (!latestStatus || !isTerminalTurnStatus(latestStatus)) { - continue; - } - const snapshot = threadSnapshotFromThread(response.thread); - delegation.status = latestStatus === "completed" ? "complete" : "failed"; - delegation.lastTurnId = stringValue(latest.id) ?? snapshot.lastFinal?.turnId ?? - delegation.lastTurnId; - delegation.lastStatus = latestStatus; - delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal; - delegation.completedAt = this.#now().toISOString(); - delegation.updatedAt = delegation.completedAt; - changed = true; + #scheduleGatewayStopHookDrain(delayMs = stopHookDrainDebounceMs): void { + if (!this.config.gateway) { + return; } - for (const delegation of delegations) { - if (!isTerminalDelegation(delegation) || !delegation.lastFinal) { + if (this.#gatewayStopHookDrainTimer) { + clearTimeout(this.#gatewayStopHookDrainTimer); + } + this.#gatewayStopHookDrainTimer = setTimeout(() => { + this.#gatewayStopHookDrainTimer = undefined; + void this.#drainGatewayStopHookSpool().catch((error) => { + this.#debug("gateway.stopHook.drain.failed", { + error: errorMessage(error), + }); + }); + }, delayMs); + this.#gatewayStopHookDrainTimer.unref?.(); + } + + async #drainGatewayStopHookSpool(): Promise { + const drain = this.#gatewayStopHookDrainChain + .catch(() => undefined) + .then(() => this.#drainGatewayStopHookSpoolOnce()); + this.#gatewayStopHookDrainChain = drain.catch(() => undefined); + await drain; + } + + async #drainGatewayStopHookSpoolOnce(): Promise { + if (!this.config.gateway) { + return; + } + const spoolDir = this.#gatewayStopHookSpoolDir(); + const files = await readPendingStopHookSpoolFiles(spoolDir); + let shouldRetry = false; + for (const file of files) { + if ("error" in file) { + this.#debug("gateway.stopHook.file.invalid", { + fileName: file.fileName, + error: file.error.message, + }); + await archiveStopHookSpoolFile(file, spoolDir, "failed"); continue; } - const mode = delegation.returnMode ?? "manual"; - if (mode === "detached" || mode === "manual") { + const processedIds = this.#gatewayProcessedStopHookEventIds(); + if (processedIds.includes(file.event.id)) { + await archiveStopHookSpoolFile(file, spoolDir, "ignored"); continue; } - await this.#recordDelegationResult(delegation); - await this.#mirrorDelegationResult(delegation); - changed = true; - if (mode === "wake_on_done") { + const result = await this.#handleGatewayStopHookEvent(file.event); + if (result === "retry") { + shouldRetry = true; + continue; + } + processedIds.push(file.event.id); + await this.#persist(); + await archiveStopHookSpoolFile( + file, + spoolDir, + result === "processed" ? "processed" : "ignored", + ); + } + if (shouldRetry) { + this.#scheduleGatewayStopHookDrain(stopHookRetryMs); + } + } + + async #handleGatewayStopHookEvent( + event: DiscordGatewayStopHookEvent, + ): Promise<"processed" | "ignored" | "retry"> { + if (event.eventName !== "Stop") { + return "ignored"; + } + if (this.#isGatewayMainThread(event.sessionId)) { + const started = await this.#processPendingWakes({ + completedThreadId: event.sessionId, + completedTurnId: event.turnId, + }); + return started || !this.#gatewayPendingWakes().some((wake) => !wake.startedAt) + ? "processed" + : "retry"; + } + const delegation = this.#delegationForThread(event.sessionId); + if (!delegation) { + return "ignored"; + } + const completedAt = this.#now().toISOString(); + delegation.status = "complete"; + delegation.lastTurnId = event.turnId ?? delegation.lastTurnId; + delegation.lastStatus = "completed"; + delegation.lastFinal = event.lastAssistantMessage ?? delegation.lastFinal; + delegation.completedAt = completedAt; + delegation.updatedAt = completedAt; + await this.#applyDelegationReturnPolicy(delegation); + await this.#processPendingWakes(); + return "processed"; + } + + async #applyDelegationReturnPolicy( + delegation: DiscordGatewayDelegation, + ): Promise { + if (!isTerminalDelegation(delegation)) { + return; + } + const mode = delegation.returnMode ?? "manual"; + if (mode === "detached" || mode === "manual") { + return; + } + await this.#recordDelegationResult(delegation); + await this.#mirrorDelegationResult(delegation); + if (mode === "wake_on_done") { + this.#enqueueWake({ + kind: "delegation", + delegationIds: [delegation.id], + reason: `Delegation ${delegation.title} completed.`, + }); + } + if (mode === "wake_on_group" && delegation.groupId) { + const group = this.#gatewayDelegations().filter((candidate) => + candidate.groupId === delegation.groupId + ); + if (group.length > 0 && group.every(isTerminalDelegation)) { this.#enqueueWake({ - kind: "delegation", - delegationIds: [delegation.id], - reason: `Delegation ${delegation.title} completed.`, + kind: "group", + groupId: delegation.groupId, + delegationIds: group.map((candidate) => candidate.id), + reason: `Delegation group ${delegation.groupId} completed.`, }); } - if (mode === "wake_on_group" && delegation.groupId) { - const group = delegations.filter((candidate) => - candidate.groupId === delegation.groupId - ); - if (group.length > 0 && group.every(isTerminalDelegation)) { - this.#enqueueWake({ - kind: "group", - groupId: delegation.groupId, - delegationIds: group.map((candidate) => candidate.id), - reason: `Delegation group ${delegation.groupId} completed.`, - }); - } - } - } - await this.#processPendingWakes(); - if (changed) { - await this.#persist(); } } async #recordDelegationResult(delegation: DiscordGatewayDelegation): Promise { const gatewaySession = this.#gatewaySession(); - if (!gatewaySession || delegation.injectedAt || !delegation.lastFinal) { + if (!gatewaySession || delegation.injectedAt) { return; } await this.client.injectThreadItems({ @@ -1196,7 +1311,7 @@ export class DiscordCodexBridge { async #mirrorDelegationResult(delegation: DiscordGatewayDelegation): Promise { const homeChannelId = this.config.gateway?.homeChannelId; - if (!homeChannelId || delegation.mirroredAt || !delegation.lastFinal) { + if (!homeChannelId || delegation.mirroredAt) { return; } await this.transport.sendMessage(homeChannelId, delegationResultText(delegation)); @@ -1232,28 +1347,46 @@ export class DiscordCodexBridge { }); } - async #processPendingWakes(): Promise { + async #processPendingWakes(options: { + completedThreadId?: string; + completedTurnId?: string; + } = {}): Promise { const gatewaySession = this.#gatewaySession(); - if (!gatewaySession || this.#isSessionRunning(gatewaySession, this.#requireState())) { - return; + if ( + !gatewaySession || + this.#isSessionRunning(gatewaySession, this.#requireState(), options) + ) { + return false; } const wake = this.#gatewayPendingWakes().find((candidate) => !candidate.startedAt); if (!wake) { - return; + return false; } const prompt = wakePrompt(wake, this.#gatewayDelegations()); - const turn = await this.client.startTurn({ - threadId: gatewaySession.codexThreadId, - input: [{ type: "text", text: prompt, text_elements: [] }], - cwd: gatewaySession.cwd ?? this.config.cwd ?? null, - model: this.config.model ?? null, - serviceTier: this.config.serviceTier ?? null, - effort: this.config.effort ?? null, - summary: this.config.summary ?? null, - approvalPolicy: this.config.approvalPolicy ?? null, - permissions: this.config.permissions ?? null, - outputSchema: null, - }); + let turn: v2.TurnStartResponse; + try { + turn = await this.client.startTurn({ + threadId: gatewaySession.codexThreadId, + input: [{ type: "text", text: prompt, text_elements: [] }], + cwd: gatewaySession.cwd ?? this.config.cwd ?? null, + model: this.config.model ?? null, + serviceTier: this.config.serviceTier ?? null, + effort: this.config.effort ?? null, + summary: this.config.summary ?? null, + approvalPolicy: this.config.approvalPolicy ?? null, + permissions: this.config.permissions ?? null, + outputSchema: null, + }); + } catch (error) { + if (errorMessage(error).includes("already has an active turn")) { + this.#debug("gateway.wake.deferred.activeTurn", { + wakeId: wake.id, + error: errorMessage(error), + }); + return false; + } + throw error; + } wake.startedAt = this.#now().toISOString(); for (const delegation of this.#gatewayDelegations()) { if (wake.delegationIds.includes(delegation.id)) { @@ -1267,6 +1400,7 @@ export class DiscordCodexBridge { kind: wake.kind, groupId: wake.groupId, }); + return true; } async #flowBackendGet( @@ -1326,11 +1460,19 @@ export class DiscordCodexBridge { #isSessionRunning( session: DiscordBridgeSession, state: DiscordBridgeState, + options: { + completedThreadId?: string; + completedTurnId?: string; + } = {}, ): boolean { const hasActiveTurn = state.activeTurns.some( (active) => active.discordThreadId === session.discordThreadId && - active.codexThreadId === session.codexThreadId, + active.codexThreadId === session.codexThreadId && + !( + active.codexThreadId === options.completedThreadId && + active.turnId === options.completedTurnId + ), ); if (hasActiveTurn) { return true; @@ -1339,7 +1481,11 @@ export class DiscordCodexBridge { (item) => item.discordThreadId === session.discordThreadId && item.codexThreadId === session.codexThreadId && - item.status !== "failed", + item.status !== "failed" && + !( + item.codexThreadId === options.completedThreadId && + item.turnId === options.completedTurnId + ), ); } diff --git a/apps/discord-bridge/src/config.ts b/apps/discord-bridge/src/config.ts index 3e4e51e..b3c59d7 100644 --- a/apps/discord-bridge/src/config.ts +++ b/apps/discord-bridge/src/config.ts @@ -166,6 +166,10 @@ export function parseConfig(argv: string[], env: NodeJS.ProcessEnv): ParsedConfi permissions: permissionsProfile ? { type: "profile", id: permissionsProfile } : undefined, + hookSpoolDir: resolveHomeDir( + stringFlag(args, "hook-spool-dir") ?? + env.CODEX_DISCORD_HOOK_SPOOL_DIR, + ), debug, }, }; @@ -366,6 +370,7 @@ Options: --home-channel-id Enable gateway mode for one Discord home channel --main-thread-id Resume an existing Codex operator thread for gateway mode --flow-backend-url Optional codex-flow-systemd-local backend URL + --hook-spool-dir Directory drained for Codex Stop hook events [dir] Optional Codex thread directory, resolved from home --dir Codex thread directory, resolved from home --cwd Alias for --dir diff --git a/apps/discord-bridge/src/hook-cli.ts b/apps/discord-bridge/src/hook-cli.ts new file mode 100644 index 0000000..891a510 --- /dev/null +++ b/apps/discord-bridge/src/hook-cli.ts @@ -0,0 +1,291 @@ +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { writeStopHookSpoolEvent } from "./stop-hook-spool.ts"; + +const defaultHookCommand = "codex-discord-bridge hook stop"; +const defaultBunxPackage = "codex-discord-bridge"; +const hookStatusMessage = "Recording Discord gateway Stop event"; + +export type HookInstallOptions = { + command?: string; + useBunx?: boolean; + bunxPackage?: string; + configPath?: string; + hooksPath?: string; + dryRun?: boolean; +}; + +export type HookInstallResult = { + command: string; + configPath: string; + hooksPath: string; + dryRun: boolean; +}; + +export async function handleHookCommand(argv: string[]): Promise { + if (argv[0] !== "hook") { + return false; + } + const subcommand = argv[1] ?? "help"; + if (subcommand === "stop") { + await runStopHook(); + return true; + } + if (subcommand === "install") { + const result = await installStopHook(parseInstallArgs(argv.slice(2))); + process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + return true; + } + if (subcommand === "help" || subcommand === "--help" || subcommand === "-h") { + process.stdout.write(hookHelpText()); + return true; + } + throw new Error(`Unknown hook subcommand: ${subcommand}`); +} + +export async function runStopHook(): Promise { + try { + const input = await new Response(Bun.stdin.stream()).text(); + await writeStopHookSpoolEvent(JSON.parse(input)); + process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + } catch (error) { + process.stderr.write(`discord gateway stop hook failed: ${errorMessage(error)}\n`); + process.stdout.write(`${JSON.stringify({ continue: true })}\n`); + } +} + +export async function installStopHook( + options: HookInstallOptions = {}, +): Promise { + const configPath = path.resolve( + expandHome(options.configPath ?? path.join(os.homedir(), ".codex", "config.toml")), + ); + const hooksPath = path.resolve( + expandHome(options.hooksPath ?? path.join(os.homedir(), ".codex", "hooks.json")), + ); + const command = hookCommand(options); + if (!options.dryRun) { + const configText = await readTextIfExists(configPath); + const hooksText = await readTextIfExists(hooksPath); + await mkdir(path.dirname(configPath), { recursive: true }); + await mkdir(path.dirname(hooksPath), { recursive: true }); + await writeFile(configPath, enableHooksFeature(configText)); + await writeFile(hooksPath, `${JSON.stringify(upsertStopHookConfig(hooksText, command), null, 2)}\n`); + } + return { + command, + configPath, + hooksPath, + dryRun: Boolean(options.dryRun), + }; +} + +export function enableHooksFeature(configText: string): string { + const lines = configText.replace(/\s*$/, "").split(/\r?\n/); + if (lines.length === 1 && lines[0] === "") { + return "[features]\nhooks = true\n"; + } + const featureHeaderIndex = lines.findIndex((line) => line.trim() === "[features]"); + if (featureHeaderIndex < 0) { + return `${lines.join("\n")}\n\n[features]\nhooks = true\n`; + } + let insertIndex = featureHeaderIndex + 1; + while (insertIndex < lines.length && !lines[insertIndex]?.trim().startsWith("[")) { + const line = lines[insertIndex] ?? ""; + if (/^\s*hooks\s*=/.test(line)) { + lines[insertIndex] = "hooks = true"; + return `${lines.join("\n")}\n`; + } + insertIndex += 1; + } + lines.splice(featureHeaderIndex + 1, 0, "hooks = true"); + return `${lines.join("\n")}\n`; +} + +export function upsertStopHookConfig( + hooksText: string, + command: string, +): Record { + const config = parseHooksJson(hooksText); + const hooks = record(config.hooks); + const stopGroups = Array.isArray(hooks.Stop) ? hooks.Stop : []; + hooks.Stop = [ + stopHookGroup(command), + ...stopGroups + .map(removeGatewayStopHookHandlers) + .filter((group): group is Record => group !== undefined), + ]; + config.hooks = hooks; + return config; +} + +function parseInstallArgs(argv: string[]): HookInstallOptions { + const options: HookInstallOptions = {}; + for (let index = 0; index < argv.length; index += 1) { + const arg = argv[index]; + if (!arg) { + continue; + } + if (arg === "--dry-run") { + options.dryRun = true; + continue; + } + if (arg === "--bunx") { + options.useBunx = true; + continue; + } + if (arg === "--command") { + options.command = requiredNext(argv, ++index, arg); + continue; + } + if (arg.startsWith("--command=")) { + options.command = arg.slice("--command=".length); + continue; + } + if (arg === "--bunx-package") { + options.useBunx = true; + options.bunxPackage = requiredNext(argv, ++index, arg); + continue; + } + if (arg.startsWith("--bunx-package=")) { + options.useBunx = true; + options.bunxPackage = arg.slice("--bunx-package=".length); + continue; + } + if (arg === "--config-path") { + options.configPath = requiredNext(argv, ++index, arg); + continue; + } + if (arg.startsWith("--config-path=")) { + options.configPath = arg.slice("--config-path=".length); + continue; + } + if (arg === "--hooks-path") { + options.hooksPath = requiredNext(argv, ++index, arg); + continue; + } + if (arg.startsWith("--hooks-path=")) { + options.hooksPath = arg.slice("--hooks-path=".length); + continue; + } + throw new Error(`Unknown hook install option: ${arg}`); + } + return options; +} + +function hookCommand(options: HookInstallOptions): string { + if (options.command && options.useBunx) { + throw new Error("Cannot set both --command and --bunx."); + } + if (options.command) { + return options.command; + } + if (options.useBunx || options.bunxPackage) { + return `bunx --package ${options.bunxPackage ?? defaultBunxPackage} ${defaultHookCommand}`; + } + return defaultHookCommand; +} + +function stopHookGroup(command: string): Record { + return { + hooks: [ + { + type: "command", + command, + timeout: 10, + statusMessage: hookStatusMessage, + }, + ], + }; +} + +function removeGatewayStopHookHandlers(input: unknown): Record | undefined { + const group = record(input); + const handlers = Array.isArray(group.hooks) + ? group.hooks.filter((handler) => !isGatewayStopHookHandler(handler)) + : []; + if (handlers.length === 0) { + return undefined; + } + return { ...group, hooks: handlers }; +} + +function isGatewayStopHookHandler(input: unknown): boolean { + const handler = record(input); + const command = typeof handler.command === "string" ? handler.command : ""; + return command.includes("codex-discord-bridge hook stop") || + command.includes("codex-discord-gateway-stop-hook") || + command.includes("apps/discord-bridge/src/stop-hook.ts"); +} + +async function readTextIfExists(filePath: string): Promise { + try { + return await readFile(filePath, "utf8"); + } catch (error) { + const code = error instanceof Error && "code" in error + ? String((error as NodeJS.ErrnoException).code) + : ""; + if (code === "ENOENT") { + return ""; + } + throw error; + } +} + +function parseHooksJson(text: string): Record { + if (!text.trim()) { + return {}; + } + try { + return record(JSON.parse(text)); + } catch (error) { + throw new Error(`Failed to parse hooks.json: ${errorMessage(error)}`); + } +} + +function expandHome(value: string): string { + if (value === "~") { + return os.homedir(); + } + if (value.startsWith("~/")) { + return path.join(os.homedir(), value.slice(2)); + } + return value; +} + +function requiredNext(argv: string[], index: number, flag: string): string { + const value = argv[index]; + if (!value) { + throw new Error(`${flag} requires a value.`); + } + return value; +} + +function hookHelpText(): string { + return `codex-discord-bridge hook manages the global Codex Stop hook. + +Usage: + codex-discord-bridge hook install [options] + codex-discord-bridge hook stop + +Options: + --command Hook command to write. Defaults to "codex-discord-bridge hook stop". + --bunx Write a bunx command instead of the global binary command. + --bunx-package Package for bunx --package. Defaults to codex-discord-bridge. + --config-path Codex config.toml path. + --hooks-path Codex hooks.json path. + --dry-run Print the planned install result without writing files. +`; +} + +function record(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : {}; +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/apps/discord-bridge/src/index.ts b/apps/discord-bridge/src/index.ts old mode 100644 new mode 100755 index 41174d4..980f598 --- a/apps/discord-bridge/src/index.ts +++ b/apps/discord-bridge/src/index.ts @@ -1,20 +1,17 @@ #!/usr/bin/env bun -import { - CodexAppServerClient, - CodexStdioTransport, -} from "@peezy.tech/codex-flows"; - -import { DiscordCodexBridge } from "./bridge.ts"; -import { createDiscordConsoleOutput } from "./console-output.ts"; +import type { DiscordCodexBridge } from "./bridge.ts"; +import { handleHookCommand } from "./hook-cli.ts"; import { parseConfig } from "./config.ts"; -import { DiscordJsBridgeTransport } from "./discord-transport.ts"; import { createDiscordBridgeLogger } from "./logger.ts"; -import { JsonFileStateStore } from "./state.ts"; async function main(): Promise { let logger = createDiscordBridgeLogger(); try { - const parsed = parseConfig(Bun.argv.slice(2), process.env); + const argv = Bun.argv.slice(2); + if (await handleHookCommand(argv)) { + return; + } + const parsed = parseConfig(argv, process.env); if (parsed.type === "help") { process.stdout.write(parsed.text); return; @@ -23,6 +20,13 @@ async function main(): Promise { debug: parsed.config.debug, logLevel: parsed.config.logLevel, }); + const { CodexAppServerClient, CodexStdioTransport } = await import( + "@peezy.tech/codex-flows" + ); + const { DiscordCodexBridge } = await import("./bridge.ts"); + const { createDiscordConsoleOutput } = await import("./console-output.ts"); + const { DiscordJsBridgeTransport } = await import("./discord-transport.ts"); + const { JsonFileStateStore } = await import("./state.ts"); const consoleOutput = parsed.config.consoleOutput === "messages" ? createDiscordConsoleOutput() : undefined; diff --git a/apps/discord-bridge/src/state.ts b/apps/discord-bridge/src/state.ts index d3f0cd4..8082adf 100644 --- a/apps/discord-bridge/src/state.ts +++ b/apps/discord-bridge/src/state.ts @@ -15,6 +15,7 @@ import type { const maxProcessedMessageIds = 1000; const maxDeliveries = 500; +const maxProcessedStopHookEventIds = 2000; export class JsonFileStateStore implements DiscordBridgeStateStore { readonly path: string; @@ -74,6 +75,12 @@ export function trimState(state: DiscordBridgeState): void { -maxProcessedMessageIds, ); state.deliveries = state.deliveries.slice(-maxDeliveries); + if (state.gateway?.processedStopHookEventIds) { + state.gateway.processedStopHookEventIds = + state.gateway.processedStopHookEventIds.slice( + -maxProcessedStopHookEventIds, + ); + } } function parseState(value: unknown): DiscordBridgeState { @@ -120,6 +127,9 @@ function parseGateway(value: unknown): DiscordGatewayState | undefined { pendingWakes: Array.isArray(value.pendingWakes) ? value.pendingWakes.map(parseGatewayPendingWake) : [], + processedStopHookEventIds: Array.isArray(value.processedStopHookEventIds) + ? uniqueStrings(value.processedStopHookEventIds) + : [], }; } diff --git a/apps/discord-bridge/src/stop-hook-spool.ts b/apps/discord-bridge/src/stop-hook-spool.ts new file mode 100644 index 0000000..f9839eb --- /dev/null +++ b/apps/discord-bridge/src/stop-hook-spool.ts @@ -0,0 +1,239 @@ +import { createHash, randomUUID } from "node:crypto"; +import { + mkdir, + readdir, + readFile, + rename, + rm, + writeFile, +} from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import type { DiscordGatewayStopHookEvent } from "./types.ts"; + +export type StopHookSpoolDisposition = "processed" | "ignored" | "failed"; + +export type PendingStopHookSpoolFile = + | { + filePath: string; + fileName: string; + event: DiscordGatewayStopHookEvent; + } + | { + filePath: string; + fileName: string; + error: Error; + }; + +export function defaultStopHookSpoolDir(): string { + return path.join(os.homedir(), ".codex", "discord-bridge", "stop-hooks"); +} + +export function stopHookSpoolDirFromEnv( + env: NodeJS.ProcessEnv = process.env, +): string { + return env.CODEX_DISCORD_HOOK_SPOOL_DIR || defaultStopHookSpoolDir(); +} + +export function stopHookSpoolPaths(spoolDir: string): Record< + "pending" | StopHookSpoolDisposition, + string +> { + const root = path.resolve(spoolDir); + return { + pending: path.join(root, "pending"), + processed: path.join(root, "processed"), + ignored: path.join(root, "ignored"), + failed: path.join(root, "failed"), + }; +} + +export async function ensureStopHookSpool(spoolDir: string): Promise { + const paths = stopHookSpoolPaths(spoolDir); + await Promise.all(Object.values(paths).map((dir) => mkdir(dir, { recursive: true }))); +} + +export async function writeStopHookSpoolEvent( + input: unknown, + options: { + spoolDir?: string; + now?: () => Date; + } = {}, +): Promise { + const spoolDir = options.spoolDir ?? stopHookSpoolDirFromEnv(); + const event = stopHookEventFromInput(input, options.now ?? (() => new Date())); + const paths = stopHookSpoolPaths(spoolDir); + await mkdir(paths.pending, { recursive: true }); + const fileName = `${event.id}.json`; + const finalPath = path.join(paths.pending, fileName); + const tempPath = path.join( + paths.pending, + `.${fileName}.${process.pid}.${Date.now()}.${randomUUID()}.tmp`, + ); + await writeFile(tempPath, `${JSON.stringify(event, null, 2)}\n`); + await rename(tempPath, finalPath); + return event; +} + +export async function readPendingStopHookSpoolFiles( + spoolDir: string, +): Promise { + const paths = stopHookSpoolPaths(spoolDir); + await ensureStopHookSpool(spoolDir); + const fileNames = (await readdir(paths.pending)) + .filter((fileName) => fileName.endsWith(".json")) + .sort(); + const files: PendingStopHookSpoolFile[] = []; + for (const fileName of fileNames) { + const filePath = path.join(paths.pending, fileName); + try { + const parsed = JSON.parse(await readFile(filePath, "utf8")) as unknown; + files.push({ + filePath, + fileName, + event: parseStopHookSpoolEvent(parsed), + }); + } catch (error) { + files.push({ + filePath, + fileName, + error: error instanceof Error ? error : new Error(String(error)), + }); + } + } + return files; +} + +export async function archiveStopHookSpoolFile( + file: Pick, + spoolDir: string, + disposition: StopHookSpoolDisposition, +): Promise { + const paths = stopHookSpoolPaths(spoolDir); + await mkdir(paths[disposition], { recursive: true }); + const target = path.join( + paths[disposition], + `${Date.now()}-${randomUUID()}-${file.fileName}`, + ); + try { + await rename(file.filePath, target); + } catch (error) { + const code = error instanceof Error && "code" in error + ? String((error as NodeJS.ErrnoException).code) + : ""; + if (code === "ENOENT") { + return; + } + throw error; + } +} + +export async function removeStopHookSpool(spoolDir: string): Promise { + await rm(path.resolve(spoolDir), { recursive: true, force: true }); +} + +function stopHookEventFromInput( + input: unknown, + now: () => Date, +): DiscordGatewayStopHookEvent { + const parsed = record(input); + const eventName = stringValue(parsed.hook_event_name) ?? stringValue(parsed.eventName); + if (eventName && eventName !== "Stop") { + throw new Error(`Unsupported hook event: ${eventName}`); + } + const sessionId = stringValue(parsed.session_id) ?? stringValue(parsed.sessionId); + if (!sessionId) { + throw new Error("Stop hook input is missing session_id"); + } + const turnId = stringValue(parsed.turn_id) ?? stringValue(parsed.turnId); + const transcriptPath = + stringValue(parsed.transcript_path) ?? stringValue(parsed.transcriptPath); + const cwd = stringValue(parsed.cwd); + const lastAssistantMessage = + nullableString(parsed.last_assistant_message) ?? + nullableString(parsed.lastAssistantMessage); + const stopHookActive = + typeof parsed.stop_hook_active === "boolean" + ? parsed.stop_hook_active + : typeof parsed.stopHookActive === "boolean" + ? parsed.stopHookActive + : undefined; + const id = stopHookEventId({ + sessionId, + turnId, + transcriptPath, + cwd, + }); + return { + version: 1, + id, + eventName: "Stop", + sessionId, + turnId, + cwd, + transcriptPath, + lastAssistantMessage, + stopHookActive, + createdAt: now().toISOString(), + }; +} + +function parseStopHookSpoolEvent(input: unknown): DiscordGatewayStopHookEvent { + const parsed = record(input); + if (parsed.version !== 1) { + throw new Error("Invalid stop hook event version"); + } + const eventName = stringValue(parsed.eventName); + const id = stringValue(parsed.id); + const sessionId = stringValue(parsed.sessionId); + const createdAt = stringValue(parsed.createdAt); + if (eventName !== "Stop" || !id || !sessionId || !createdAt) { + throw new Error("Invalid stop hook event"); + } + return { + version: 1, + id, + eventName, + sessionId, + turnId: stringValue(parsed.turnId), + cwd: stringValue(parsed.cwd), + transcriptPath: stringValue(parsed.transcriptPath), + lastAssistantMessage: nullableString(parsed.lastAssistantMessage), + stopHookActive: typeof parsed.stopHookActive === "boolean" + ? parsed.stopHookActive + : undefined, + createdAt, + }; +} + +function stopHookEventId(input: { + sessionId: string; + turnId?: string; + transcriptPath?: string; + cwd?: string; +}): string { + const identity = input.turnId + ? { eventName: "Stop", sessionId: input.sessionId, turnId: input.turnId } + : { + eventName: "Stop", + sessionId: input.sessionId, + transcriptPath: input.transcriptPath, + cwd: input.cwd, + }; + return `stop-${createHash("sha256").update(JSON.stringify(identity)).digest("hex").slice(0, 24)}`; +} + +function record(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : {}; +} + +function stringValue(value: unknown): string | undefined { + return typeof value === "string" && value.length > 0 ? value : undefined; +} + +function nullableString(value: unknown): string | undefined { + return typeof value === "string" && value.length > 0 ? value : undefined; +} diff --git a/apps/discord-bridge/src/stop-hook.ts b/apps/discord-bridge/src/stop-hook.ts new file mode 100644 index 0000000..8e7c9e8 --- /dev/null +++ b/apps/discord-bridge/src/stop-hook.ts @@ -0,0 +1,4 @@ +#!/usr/bin/env bun +import { runStopHook } from "./hook-cli.ts"; + +await runStopHook(); diff --git a/apps/discord-bridge/src/types.ts b/apps/discord-bridge/src/types.ts index 5037b9d..4994643 100644 --- a/apps/discord-bridge/src/types.ts +++ b/apps/discord-bridge/src/types.ts @@ -23,6 +23,7 @@ export type DiscordBridgeConfig = { permissions?: v2.PermissionProfileSelectionParams; typingIntervalMs?: number; reconcileIntervalMs?: number; + hookSpoolDir?: string; progressMode?: DiscordProgressMode; consoleOutput?: DiscordConsoleOutputMode; logLevel?: DiscordBridgeLogLevelSetting; @@ -153,6 +154,7 @@ export type DiscordGatewayState = { toolsVersion?: number; delegations: DiscordGatewayDelegation[]; pendingWakes?: DiscordGatewayPendingWake[]; + processedStopHookEventIds?: string[]; }; export type DiscordGatewayDelegationReturnMode = @@ -193,6 +195,19 @@ export type DiscordGatewayPendingWake = { startedAt?: string; }; +export type DiscordGatewayStopHookEvent = { + version: 1; + id: string; + eventName: "Stop"; + sessionId: string; + turnId?: string; + cwd?: string; + transcriptPath?: string; + lastAssistantMessage?: string; + stopHookActive?: boolean; + createdAt: string; +}; + export type DiscordBridgeSession = { discordThreadId: string; parentChannelId: string; diff --git a/apps/discord-bridge/test/bridge.test.ts b/apps/discord-bridge/test/bridge.test.ts index a747f6a..a8d32cd 100644 --- a/apps/discord-bridge/test/bridge.test.ts +++ b/apps/discord-bridge/test/bridge.test.ts @@ -1,3 +1,4 @@ +import { mkdtemp, rm } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { describe, expect, test } from "bun:test"; @@ -10,6 +11,7 @@ import type { DiscordConsoleOutput, } from "../src/console-output.ts"; import { MemoryStateStore, emptyState } from "../src/state.ts"; +import { writeStopHookSpoolEvent } from "../src/stop-hook-spool.ts"; import type { CodexBridgeClient, DiscordBridgeConfig, @@ -225,6 +227,7 @@ describe("DiscordCodexBridge", () => { }); test("gateway records group delegation results and wakes after the group finishes", async () => { + const hookSpoolDir = await testHookSpoolDir(); const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); const bridge = new DiscordCodexBridge({ @@ -233,116 +236,134 @@ describe("DiscordCodexBridge", () => { store: new MemoryStateStore(), config: testConfig({ gateway: { homeChannelId: "home-channel" }, - reconcileIntervalMs: 10, + hookSpoolDir, }), now: () => new Date("2026-05-14T12:00:00.000Z"), }); - await bridge.start(); - await waitFor(() => bridge.stateForTest().sessions.length === 1); - for (const [index, title] of ["Workspace A", "Workspace B"].entries()) { + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + for (const [index, title] of ["Workspace A", "Workspace B"].entries()) { + client.emitRequest({ + id: `tool-${index}`, + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: `/workspace/${index}`, + title, + prompt: `Inspect ${title}.`, + groupId: "fanout", + }, + }, + }); + await waitFor(() => client.responses.length === index + 1); + } + + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Result A.", + }); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(2); + expect(client.readThreadCalls).toEqual([]); + expect(transport.messages.some((message) => + message.channelId === "home-channel" && + message.text.includes("Result A.") + )).toBe(true); + + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-3", + turnId: "turn-2", + lastAssistantMessage: "Result B.", + }); + await waitFor(() => client.startTurnCalls.length === 3); + expect(client.injectThreadItemsCalls).toHaveLength(2); + expect(client.startTurnCalls[2]).toEqual( + expect.objectContaining({ + threadId: "codex-thread-1", + }), + ); + expect(inputText(client.startTurnCalls[2]?.input[0])).toContain( + "Delegation group fanout completed.", + ); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( + expect.objectContaining({ + kind: "group", + groupId: "fanout", + startedAt: "2026-05-14T12:00:00.000Z", + }), + ); + await sleep(30); + expect(client.startTurnCalls).toHaveLength(3); + expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } + }); + + test("gateway detached delegations complete without injecting or waking the main thread", async () => { + const hookSpoolDir = await testHookSpoolDir(); + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + hookSpoolDir, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); client.emitRequest({ - id: `tool-${index}`, + id: "tool-1", method: "item/tool/call", params: { threadId: "codex-thread-1", namespace: "codex_gateway", tool: "start_delegation", arguments: { - cwd: `/workspace/${index}`, - title, - prompt: `Inspect ${title}.`, - groupId: "fanout", + cwd: "/workspace/detached", + title: "Detached workspace", + prompt: "Prepare this for a human.", + returnMode: "detached", }, }, }); - await waitFor(() => client.responses.length === index + 1); + + await waitFor(() => client.responses.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Detached result.", + }); + await waitFor(() => + bridge.stateForTest().gateway?.delegations[0]?.status === "complete" + ); + expect(client.injectThreadItemsCalls).toEqual([]); + expect(client.startTurnCalls).toHaveLength(1); + expect(client.readThreadCalls).toEqual([]); + expect(transport.messages.some((message) => + message.text.includes("Detached result.") + )).toBe(false); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); } - - client.threadTurns.set("codex-thread-2", [ - completedTurn("turn-1", "Result A."), - ]); - await waitFor(() => client.injectThreadItemsCalls.length === 1); - expect(client.startTurnCalls).toHaveLength(2); - expect(transport.messages.some((message) => - message.channelId === "home-channel" && - message.text.includes("Result A.") - )).toBe(true); - - client.threadTurns.set("codex-thread-3", [ - completedTurn("turn-2", "Result B."), - ]); - await waitFor(() => client.startTurnCalls.length === 3); - expect(client.injectThreadItemsCalls).toHaveLength(2); - expect(client.startTurnCalls[2]).toEqual( - expect.objectContaining({ - threadId: "codex-thread-1", - }), - ); - expect(inputText(client.startTurnCalls[2]?.input[0])).toContain( - "Delegation group fanout completed.", - ); - expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( - expect.objectContaining({ - kind: "group", - groupId: "fanout", - startedAt: "2026-05-14T12:00:00.000Z", - }), - ); - await sleep(30); - expect(client.startTurnCalls).toHaveLength(3); - expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1); - await bridge.stop(); - }); - - test("gateway detached delegations complete without injecting or waking the main thread", async () => { - const client = new FakeCodexClient(); - const transport = new FakeDiscordTransport(); - const bridge = new DiscordCodexBridge({ - client, - transport, - store: new MemoryStateStore(), - config: testConfig({ - gateway: { homeChannelId: "home-channel" }, - reconcileIntervalMs: 10, - }), - now: () => new Date("2026-05-14T12:00:00.000Z"), - }); - - await bridge.start(); - await waitFor(() => bridge.stateForTest().sessions.length === 1); - client.emitRequest({ - id: "tool-1", - method: "item/tool/call", - params: { - threadId: "codex-thread-1", - namespace: "codex_gateway", - tool: "start_delegation", - arguments: { - cwd: "/workspace/detached", - title: "Detached workspace", - prompt: "Prepare this for a human.", - returnMode: "detached", - }, - }, - }); - - await waitFor(() => client.responses.length === 1); - client.threadTurns.set("codex-thread-2", [ - completedTurn("turn-1", "Detached result."), - ]); - await waitFor(() => - bridge.stateForTest().gateway?.delegations[0]?.status === "complete" - ); - expect(client.injectThreadItemsCalls).toEqual([]); - expect(client.startTurnCalls).toHaveLength(1); - expect(transport.messages.some((message) => - message.text.includes("Detached result.") - )).toBe(false); - await bridge.stop(); }); test("gateway queues delegation wake while the main operator thread is busy", async () => { + const hookSpoolDir = await testHookSpoolDir(); const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); const bridge = new DiscordCodexBridge({ @@ -351,59 +372,77 @@ describe("DiscordCodexBridge", () => { store: new MemoryStateStore(), config: testConfig({ gateway: { homeChannelId: "home-channel" }, - reconcileIntervalMs: 10, + hookSpoolDir, }), now: () => new Date("2026-05-14T12:00:00.000Z"), }); - await bridge.start(); - await waitFor(() => bridge.stateForTest().sessions.length === 1); - transport.emit({ - kind: "message", - channelId: "home-channel", - messageId: "home-message-1", - author: { id: "user-1", name: "Peezy", isBot: false }, - content: "work on a long-running main task", - createdAt: "2026-05-14T12:00:00.000Z", - }); - await waitFor(() => bridge.stateForTest().activeTurns.length === 1); + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + transport.emit({ + kind: "message", + channelId: "home-channel", + messageId: "home-message-1", + author: { id: "user-1", name: "Peezy", isBot: false }, + content: "work on a long-running main task", + createdAt: "2026-05-14T12:00:00.000Z", + }); + await waitFor(() => bridge.stateForTest().activeTurns.length === 1); - client.emitRequest({ - id: "tool-1", - method: "item/tool/call", - params: { - threadId: "codex-thread-1", - namespace: "codex_gateway", - tool: "start_delegation", - arguments: { - cwd: "/workspace/side", - title: "Side task", - prompt: "Finish this side task.", + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/side", + title: "Side task", + prompt: "Finish this side task.", + }, }, - }, - }); + }); - await waitFor(() => client.responses.length === 1); - client.threadTurns.set("codex-thread-2", [ - completedTurn("turn-2", "Side task result."), - ]); - await waitFor(() => client.injectThreadItemsCalls.length === 1); - expect(client.startTurnCalls).toHaveLength(2); - expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( - expect.objectContaining({ - kind: "delegation", - }), - ); - expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty( - "startedAt", - ); - await sleep(30); - expect(client.startTurnCalls).toHaveLength(2); - expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1); - await bridge.stop(); + await waitFor(() => client.responses.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-2", + lastAssistantMessage: "Side task result.", + }); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(2); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( + expect.objectContaining({ + kind: "delegation", + }), + ); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty( + "startedAt", + ); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-1", + turnId: "turn-1", + lastAssistantMessage: "Main task paused.", + }); + await waitFor(() => client.startTurnCalls.length === 3); + expect(inputText(client.startTurnCalls[2]?.input[0])).toContain( + "Delegation Side task completed.", + ); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( + expect.objectContaining({ + startedAt: "2026-05-14T12:00:00.000Z", + }), + ); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } }); test("gateway record-only delegations inject and mirror without waking", async () => { + const hookSpoolDir = await testHookSpoolDir(); const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); const bridge = new DiscordCodexBridge({ @@ -412,43 +451,184 @@ describe("DiscordCodexBridge", () => { store: new MemoryStateStore(), config: testConfig({ gateway: { homeChannelId: "home-channel" }, - reconcileIntervalMs: 10, + hookSpoolDir, }), now: () => new Date("2026-05-14T12:00:00.000Z"), }); - await bridge.start(); - await waitFor(() => bridge.stateForTest().sessions.length === 1); - client.emitRequest({ - id: "tool-1", - method: "item/tool/call", - params: { - threadId: "codex-thread-1", - namespace: "codex_gateway", - tool: "start_delegation", - arguments: { - cwd: "/workspace/record", - title: "Record only task", - prompt: "Record this result.", - returnMode: "record_only", + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/record", + title: "Record only task", + prompt: "Record this result.", + returnMode: "record_only", + }, }, + }); + + await waitFor(() => client.responses.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Record-only result.", + }); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(1); + expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]); + expect(transport.messages.some((message) => + message.text.includes("Record-only result.") + )).toBe(true); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } + }); + + test("gateway drains queued stop hook events on startup", async () => { + const hookSpoolDir = await testHookSpoolDir(); + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const store = new MemoryStateStore({ + ...emptyState(), + gateway: { + homeChannelId: "home-channel", + mainThreadId: "codex-thread-1", + toolsVersion: 1, + delegations: [ + { + id: "delegation-queued", + codexThreadId: "codex-thread-2", + title: "Queued event task", + status: "active", + cwd: "/workspace/queued", + returnMode: "record_only", + createdAt: "2026-05-14T11:59:00.000Z", + updatedAt: "2026-05-14T11:59:00.000Z", + }, + ], + pendingWakes: [], + processedStopHookEventIds: [], }, + sessions: [ + { + discordThreadId: "home-channel", + parentChannelId: "home-channel", + codexThreadId: "codex-thread-1", + title: "Codex Gateway", + createdAt: "2026-05-14T11:59:00.000Z", + cwd: "/workspace", + mode: "gateway", + }, + ], + }); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-queued", + lastAssistantMessage: "Queued result.", + cwd: "/workspace/queued", + }); + const bridge = new DiscordCodexBridge({ + client, + transport, + store, + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + hookSpoolDir, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), }); - await waitFor(() => client.responses.length === 1); - client.threadTurns.set("codex-thread-2", [ - completedTurn("turn-1", "Record-only result."), - ]); - await waitFor(() => client.injectThreadItemsCalls.length === 1); - expect(client.startTurnCalls).toHaveLength(1); - expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]); - expect(transport.messages.some((message) => - message.text.includes("Record-only result.") - )).toBe(true); - await bridge.stop(); + try { + await bridge.start(); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.readThreadCalls).toEqual([]); + expect(bridge.stateForTest().gateway?.delegations[0]).toEqual( + expect.objectContaining({ + status: "complete", + lastTurnId: "turn-queued", + lastFinal: "Queued result.", + injectedAt: "2026-05-14T12:00:00.000Z", + }), + ); + expect(transport.messages.some((message) => + message.text.includes("Queued result.") + )).toBe(true); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } + }); + + test("gateway stop hook events are idempotent", async () => { + const hookSpoolDir = await testHookSpoolDir(); + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + hookSpoolDir, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/idempotent", + title: "Idempotent task", + prompt: "Return once.", + returnMode: "record_only", + }, + }, + }); + await waitFor(() => client.responses.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Exactly once.", + }); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Duplicate with changed text.", + }); + await sleep(200); + expect(client.injectThreadItemsCalls).toHaveLength(1); + expect(transport.messages.filter((message) => + message.text.includes("Exactly once.") + )).toHaveLength(1); + expect( + bridge.stateForTest().gateway?.processedStopHookEventIds, + ).toHaveLength(1); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } }); test("gateway manually flushes completed manual delegation results", async () => { + const hookSpoolDir = await testHookSpoolDir(); const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); const bridge = new DiscordCodexBridge({ @@ -457,57 +637,63 @@ describe("DiscordCodexBridge", () => { store: new MemoryStateStore(), config: testConfig({ gateway: { homeChannelId: "home-channel" }, - reconcileIntervalMs: 10, + hookSpoolDir, }), now: () => new Date("2026-05-14T12:00:00.000Z"), }); - await bridge.start(); - await waitFor(() => bridge.stateForTest().sessions.length === 1); - client.emitRequest({ - id: "tool-1", - method: "item/tool/call", - params: { - threadId: "codex-thread-1", - namespace: "codex_gateway", - tool: "start_delegation", - arguments: { - cwd: "/workspace/manual", - title: "Manual task", - prompt: "Finish manually.", - returnMode: "manual", + try { + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/manual", + title: "Manual task", + prompt: "Finish manually.", + returnMode: "manual", + }, }, - }, - }); + }); - await waitFor(() => client.responses.length === 1); - client.threadTurns.set("codex-thread-2", [ - completedTurn("turn-1", "Manual result."), - ]); - await waitFor(() => - bridge.stateForTest().gateway?.delegations[0]?.status === "complete" - ); - expect(client.injectThreadItemsCalls).toEqual([]); - client.emitRequest({ - id: "tool-2", - method: "item/tool/call", - params: { - threadId: "codex-thread-1", - namespace: "codex_gateway", - tool: "flush_delegation_results", - arguments: { - delegationId: bridge.stateForTest().gateway?.delegations[0]?.id, - wake: "false", + await waitFor(() => client.responses.length === 1); + await emitStopHook(hookSpoolDir, { + sessionId: "codex-thread-2", + turnId: "turn-1", + lastAssistantMessage: "Manual result.", + }); + await waitFor(() => + bridge.stateForTest().gateway?.delegations[0]?.status === "complete" + ); + expect(client.injectThreadItemsCalls).toEqual([]); + client.emitRequest({ + id: "tool-2", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "flush_delegation_results", + arguments: { + delegationId: bridge.stateForTest().gateway?.delegations[0]?.id, + wake: "false", + }, }, - }, - }); + }); - await waitFor(() => client.injectThreadItemsCalls.length === 1); - expect(client.startTurnCalls).toHaveLength(1); - expect(transport.messages.some((message) => - message.text.includes("Manual result.") - )).toBe(true); - await bridge.stop(); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(1); + expect(transport.messages.some((message) => + message.text.includes("Manual result.") + )).toBe(true); + } finally { + await bridge.stop(); + await rm(hookSpoolDir, { recursive: true, force: true }); + } }); test("answers gateway status in the home channel without starting a turn", async () => { @@ -2914,6 +3100,35 @@ describe("DiscordCodexBridge", () => { }); }); +async function testHookSpoolDir(): Promise { + return await mkdtemp(path.join(os.tmpdir(), "discord-bridge-hooks-")); +} + +async function emitStopHook( + spoolDir: string, + input: { + sessionId: string; + turnId: string; + lastAssistantMessage?: string; + cwd?: string; + }, +): Promise { + await writeStopHookSpoolEvent( + { + hook_event_name: "Stop", + session_id: input.sessionId, + turn_id: input.turnId, + cwd: input.cwd ?? "/workspace", + transcript_path: `/tmp/${input.sessionId}.jsonl`, + last_assistant_message: input.lastAssistantMessage ?? null, + }, + { + spoolDir, + now: () => new Date("2026-05-14T12:00:00.000Z"), + }, + ); +} + function testConfig( overrides: Partial = {}, ): DiscordBridgeConfig { @@ -3255,22 +3470,6 @@ function gatewayToolResult(value: unknown): unknown { return text ? JSON.parse(text) : undefined; } -function completedTurn(id: string, text: string): v2.Turn { - return { - id, - status: "completed", - items: [ - { - type: "agentMessage", - id: `${id}-message`, - text, - phase: "final_answer", - memoryCitation: null, - }, - ], - } as unknown as v2.Turn; -} - function statusMessageText(transport: FakeDiscordTransport): string { return transport.messages.find((message) => message.id === "message-out-1") ?.text ?? ""; diff --git a/apps/discord-bridge/test/hook-cli.test.ts b/apps/discord-bridge/test/hook-cli.test.ts new file mode 100644 index 0000000..db95b10 --- /dev/null +++ b/apps/discord-bridge/test/hook-cli.test.ts @@ -0,0 +1,122 @@ +import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, test } from "bun:test"; + +import { + enableHooksFeature, + installStopHook, + upsertStopHookConfig, +} from "../src/hook-cli.ts"; + +describe("discord gateway hook CLI", () => { + test("enables the current hooks feature in config.toml", () => { + expect(enableHooksFeature("model = \"gpt-5\"\n")).toBe( + "model = \"gpt-5\"\n\n[features]\nhooks = true\n", + ); + expect(enableHooksFeature("[features]\ngoals = true\n")).toBe( + "[features]\nhooks = true\ngoals = true\n", + ); + expect(enableHooksFeature("[features]\nhooks = false\ngoals = true\n")).toBe( + "[features]\nhooks = true\ngoals = true\n", + ); + }); + + test("upserts package-bin Stop hook while preserving unrelated hooks", () => { + const updated = upsertStopHookConfig( + JSON.stringify({ + hooks: { + PreToolUse: [ + { + matcher: "Bash", + hooks: [{ type: "command", command: "echo pre" }], + }, + ], + Stop: [ + { + hooks: [ + { + type: "command", + command: + "bun /home/peezy/codex-fork-workspace/codex-flows/apps/discord-bridge/src/stop-hook.ts", + }, + { type: "command", command: "echo other-stop" }, + ], + }, + ], + }, + }), + "codex-discord-bridge hook stop", + ); + + expect(updated).toEqual({ + hooks: { + PreToolUse: [ + { + matcher: "Bash", + hooks: [{ type: "command", command: "echo pre" }], + }, + ], + Stop: [ + { + hooks: [ + { + type: "command", + command: "codex-discord-bridge hook stop", + timeout: 10, + statusMessage: "Recording Discord gateway Stop event", + }, + ], + }, + { + hooks: [{ type: "command", command: "echo other-stop" }], + }, + ], + }, + }); + }); + + test("install writes config and hooks files", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "discord-hook-cli-")); + try { + const configPath = path.join(dir, "config.toml"); + const hooksPath = path.join(dir, "hooks.json"); + await writeFile(configPath, "[features]\ngoals = true\n"); + const result = await installStopHook({ + configPath, + hooksPath, + useBunx: true, + bunxPackage: "@peezy.tech/codex-flows", + }); + + expect(result).toEqual({ + command: + "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop", + configPath, + hooksPath, + dryRun: false, + }); + expect(await readFile(configPath, "utf8")).toBe( + "[features]\nhooks = true\ngoals = true\n", + ); + expect(JSON.parse(await readFile(hooksPath, "utf8"))).toEqual( + expect.objectContaining({ + hooks: expect.objectContaining({ + Stop: [ + { + hooks: [ + expect.objectContaining({ + command: + "bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop", + }), + ], + }, + ], + }), + }), + ); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/apps/discord-bridge/test/state.test.ts b/apps/discord-bridge/test/state.test.ts index 17857f0..c7bbccc 100644 --- a/apps/discord-bridge/test/state.test.ts +++ b/apps/discord-bridge/test/state.test.ts @@ -43,6 +43,12 @@ describe("JsonFileStateStore", () => { createdAt: "2026-05-11T00:00:03.000Z", }, ], + processedStopHookEventIds: [ + "stop-1", + "", + "stop-1", + "stop-2", + ], }, sessions: [ { @@ -121,6 +127,7 @@ describe("JsonFileStateStore", () => { createdAt: "2026-05-11T00:00:03.000Z", }, ], + processedStopHookEventIds: ["stop-1", "stop-2"], }); expect(state.sessions).toHaveLength(2); expect(state.sessions[0]?.ownerUserId).toBe("user-1"); diff --git a/apps/discord-bridge/test/stop-hook-spool.test.ts b/apps/discord-bridge/test/stop-hook-spool.test.ts new file mode 100644 index 0000000..2ed1030 --- /dev/null +++ b/apps/discord-bridge/test/stop-hook-spool.test.ts @@ -0,0 +1,83 @@ +import { mkdtemp, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, test } from "bun:test"; + +import { + archiveStopHookSpoolFile, + readPendingStopHookSpoolFiles, + writeStopHookSpoolEvent, +} from "../src/stop-hook-spool.ts"; + +describe("stop hook spool", () => { + test("writes stable Stop events into the pending spool", async () => { + const spoolDir = await mkdtemp(path.join(os.tmpdir(), "stop-hook-spool-")); + try { + const input = { + hook_event_name: "Stop", + session_id: "session-1", + turn_id: "turn-1", + cwd: "/workspace", + transcript_path: "/tmp/session.jsonl", + last_assistant_message: "Finished.", + stop_hook_active: false, + }; + + const first = await writeStopHookSpoolEvent(input, { + spoolDir, + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + const second = await writeStopHookSpoolEvent(input, { + spoolDir, + now: () => new Date("2026-05-14T12:01:00.000Z"), + }); + const third = await writeStopHookSpoolEvent( + { ...input, last_assistant_message: "Finished again." }, + { + spoolDir, + now: () => new Date("2026-05-14T12:02:00.000Z"), + }, + ); + + expect(second.id).toBe(first.id); + expect(third.id).toBe(first.id); + const pending = await readPendingStopHookSpoolFiles(spoolDir); + expect(pending).toHaveLength(1); + expect(pending[0]).toEqual( + expect.objectContaining({ + event: expect.objectContaining({ + id: first.id, + eventName: "Stop", + sessionId: "session-1", + turnId: "turn-1", + lastAssistantMessage: "Finished again.", + stopHookActive: false, + }), + }), + ); + } finally { + await rm(spoolDir, { recursive: true, force: true }); + } + }); + + test("archives processed files out of pending", async () => { + const spoolDir = await mkdtemp(path.join(os.tmpdir(), "stop-hook-spool-")); + try { + await writeStopHookSpoolEvent( + { + hook_event_name: "Stop", + session_id: "session-1", + turn_id: "turn-1", + }, + { spoolDir }, + ); + const [file] = await readPendingStopHookSpoolFiles(spoolDir); + expect(file).toBeDefined(); + await archiveStopHookSpoolFile(file!, spoolDir, "processed"); + + expect(await readPendingStopHookSpoolFiles(spoolDir)).toEqual([]); + } finally { + await rm(spoolDir, { recursive: true, force: true }); + } + }); +});