Add hook-spooled Discord gateway returns
Some checks failed
ci / check (push) Failing after 16s

This commit is contained in:
matamune 2026-05-14 17:41:10 +00:00
parent 2196f0de9b
commit 84400f8689
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
13 changed files with 1511 additions and 327 deletions

View file

@ -16,6 +16,7 @@ CODEX_DISCORD_MAIN_THREAD_ID=019e2509-ddbb-7380-b97b-41575092d86b
CODEX_DISCORD_ALLOWED_CHANNEL_IDS=1502107617512919220
CODEX_DISCORD_DIR=/home/peezy/codex-fork-workspace/codex-flows
CODEX_FLOW_BACKEND_URL=http://127.0.0.1:8090
CODEX_DISCORD_HOOK_SPOOL_DIR=/home/peezy/.codex/discord-bridge/stop-hooks
```
`CODEX_DISCORD_MAIN_THREAD_ID` is optional. If omitted, the bridge creates a new
@ -76,6 +77,64 @@ Delegations support return modes:
- `detached`: do not loop results back to the main thread; useful for human-continued threads
Automatic result return uses `thread/inject_items` to append structured
delegation results to the main operator thread's model-visible history. Starting
a main-thread turn is a separate wake step, so long-running main goals are not
interrupted; wakes are queued until the main operator thread is idle.
delegation results to the main operator thread's model-visible history. Codex
`Stop` hooks, not background thread polling, drive automatic result return:
the global hook writes durable Stop events into the spool directory, and the
gateway drains that spool on startup and while running. Starting a main-thread
turn is a separate wake step, so long-running main goals are not interrupted;
wakes are queued until the main operator thread is idle.
## Codex Stop Hook
Install the global hook once for the Codex runtime that backs the gateway:
```bash
codex-discord-bridge hook install
```
The bridge and hook default to `~/.codex/discord-bridge/stop-hooks`; override
both with `CODEX_DISCORD_HOOK_SPOOL_DIR` or `--hook-spool-dir` if needed.
The installer enables the current hooks feature in `~/.codex/config.toml`:
```toml
[features]
hooks = true
```
It also registers the Stop hook in `~/.codex/hooks.json`:
```json
{
"hooks": {
"Stop": [
{
"hooks": [
{
"type": "command",
"command": "codex-discord-bridge hook stop",
"timeout": 10,
"statusMessage": "Recording Discord gateway Stop event"
}
]
}
]
}
}
```
For package-on-demand installs, write a `bunx` command instead:
```bash
codex-discord-bridge hook install --bunx
codex-discord-bridge hook install --bunx-package @peezy.tech/codex-flows
```
The hook is intentionally dumb: it does not read gateway state or call the
backend. It only writes idempotent Stop-event files. The gateway ignores unknown
sessions, treats known delegated sessions according to their return mode, and
uses main-operator Stop events to drain queued wakes.
After changing hook configuration, restart the Codex runtime that backs the
gateway and trust the hook when Codex asks for review. `hooks/list` should show
the hook as `trusted`; untrusted hooks are discovered but do not run.

View file

@ -1,3 +1,4 @@
import { watch, type FSWatcher } from "node:fs";
import os from "node:os";
import path from "node:path";
import { createHash } from "node:crypto";
@ -12,12 +13,19 @@ import {
createDiscordBridgeLogger,
type DiscordBridgeLogger,
} from "./logger.ts";
import {
archiveStopHookSpoolFile,
ensureStopHookSpool,
readPendingStopHookSpoolFiles,
stopHookSpoolPaths,
} from "./stop-hook-spool.ts";
import type {
CodexBridgeClient,
DiscordBridgeConfig,
DiscordGatewayDelegation,
DiscordGatewayDelegationReturnMode,
DiscordGatewayPendingWake,
DiscordGatewayStopHookEvent,
DiscordBridgeSession,
DiscordBridgeState,
DiscordBridgeStateStore,
@ -31,7 +39,8 @@ import type {
const maxDiscordMessageLength = 2000;
const gatewayToolsVersion = 1;
const defaultGatewayReconcileIntervalMs = 10_000;
const stopHookDrainDebounceMs = 100;
const stopHookRetryMs = 1_000;
type ThreadSnapshot = {
terminalTurnIds: string[];
@ -54,7 +63,9 @@ export class DiscordCodexBridge {
#dedupe: MessageDeduplicator;
#logger: DiscordBridgeLogger;
#consoleOutput: DiscordConsoleOutput | undefined;
#gatewayReconcileTimer: Timer | undefined;
#gatewayStopHookWatcher: FSWatcher | undefined;
#gatewayStopHookDrainTimer: Timer | undefined;
#gatewayStopHookDrainChain: Promise<void> = Promise.resolve();
constructor(options: {
client: CodexBridgeClient;
@ -132,20 +143,25 @@ export class DiscordCodexBridge {
for (const runner of this.#runnersByDiscordThread.values()) {
runner.start();
}
this.#startGatewayReconciler();
await this.#startGatewayStopHookSpool();
}
async stop(): Promise<void> {
this.#debug("bridge.stop", {
runners: this.#runnersByDiscordThread.size,
});
if (this.#gatewayReconcileTimer) {
clearInterval(this.#gatewayReconcileTimer);
this.#gatewayReconcileTimer = undefined;
if (this.#gatewayStopHookDrainTimer) {
clearTimeout(this.#gatewayStopHookDrainTimer);
this.#gatewayStopHookDrainTimer = undefined;
}
if (this.#gatewayStopHookWatcher) {
this.#gatewayStopHookWatcher.close();
this.#gatewayStopHookWatcher = undefined;
}
await Promise.all(
[...this.#runnersByDiscordThread.values()].map((runner) => runner.stop()),
);
await this.#gatewayStopHookDrainChain.catch(() => undefined);
await this.#persistChain.catch(() => undefined);
await this.transport.stop();
this.client.close();
@ -608,6 +624,10 @@ export class DiscordCodexBridge {
return;
}
await runner.handleNotification(message);
if (message.method === "turn/completed" && this.#isGatewayMainThread(threadId)) {
await this.#processPendingWakes();
await this.#persist();
}
}
#handleServerRequest(message: JsonRpcRequest): void {
@ -742,6 +762,7 @@ export class DiscordCodexBridge {
toolsVersion: state.gateway?.toolsVersion,
delegations: state.gateway?.delegations ?? [],
pendingWakes: state.gateway?.pendingWakes ?? [],
processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [],
};
this.#registerRunner(existing);
await this.#persist();
@ -805,6 +826,7 @@ export class DiscordCodexBridge {
: gatewayToolsVersion,
delegations: state.gateway?.delegations ?? [],
pendingWakes: state.gateway?.pendingWakes ?? [],
processedStopHookEventIds: state.gateway?.processedStopHookEventIds ?? [],
};
state.sessions.push(session);
this.#registerRunner(session);
@ -834,6 +856,19 @@ export class DiscordCodexBridge {
: undefined;
}
#isGatewayMainThread(threadId: string): boolean {
const session = this.#gatewaySession();
return Boolean(
(session && session.codexThreadId === threadId) ||
this.#requireState().gateway?.mainThreadId === threadId,
);
}
#gatewayStopHookSpoolDir(): string {
return this.config.hookSpoolDir ??
path.join(path.dirname(this.config.statePath), "stop-hooks");
}
#gatewayDelegations(): DiscordGatewayDelegation[] {
const state = this.#requireState();
if (!state.gateway) {
@ -842,6 +877,7 @@ export class DiscordCodexBridge {
mainThreadId: this.#gatewaySession()?.codexThreadId,
delegations: [],
pendingWakes: [],
processedStopHookEventIds: [],
};
}
state.gateway.delegations ??= [];
@ -856,12 +892,28 @@ export class DiscordCodexBridge {
mainThreadId: this.#gatewaySession()?.codexThreadId,
delegations: [],
pendingWakes: [],
processedStopHookEventIds: [],
};
}
state.gateway.pendingWakes ??= [];
return state.gateway.pendingWakes;
}
#gatewayProcessedStopHookEventIds(): string[] {
const state = this.#requireState();
if (!state.gateway) {
state.gateway = {
homeChannelId: this.config.gateway?.homeChannelId ?? "",
mainThreadId: this.#gatewaySession()?.codexThreadId,
delegations: [],
pendingWakes: [],
processedStopHookEventIds: [],
};
}
state.gateway.processedStopHookEventIds ??= [];
return state.gateway.processedStopHookEventIds;
}
async #startDelegation(args: Record<string, unknown>): Promise<unknown> {
const cwd = requiredArg(args, "cwd");
const title = stringValue(args.title) ?? firstLine(stringValue(args.prompt)) ??
@ -1028,7 +1080,6 @@ export class DiscordCodexBridge {
}
async #flushDelegationResults(args: Record<string, unknown>): Promise<unknown> {
await this.#reconcileDelegations();
const groupId = stringValue(args.groupId);
const delegations = groupId
? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId)
@ -1037,7 +1088,7 @@ export class DiscordCodexBridge {
: this.#gatewayDelegations();
const flushed: DiscordGatewayDelegation[] = [];
for (const delegation of delegations) {
if (!delegation.lastFinal || !isTerminalDelegation(delegation)) {
if (!isTerminalDelegation(delegation)) {
continue;
}
await this.#recordDelegationResult(delegation);
@ -1086,93 +1137,157 @@ export class DiscordCodexBridge {
}));
}
#startGatewayReconciler(): void {
if (!this.config.gateway || this.#gatewayReconcileTimer) {
async #startGatewayStopHookSpool(): Promise<void> {
if (!this.config.gateway || this.#gatewayStopHookWatcher) {
return;
}
const intervalMs = this.config.reconcileIntervalMs ?? defaultGatewayReconcileIntervalMs;
this.#gatewayReconcileTimer = setInterval(() => {
void this.#reconcileDelegations().catch((error) => {
this.#debug("gateway.delegations.reconcile.failed", {
error: errorMessage(error),
});
});
}, intervalMs);
void this.#reconcileDelegations().catch((error) => {
this.#debug("gateway.delegations.reconcile.failed", {
const spoolDir = this.#gatewayStopHookSpoolDir();
await ensureStopHookSpool(spoolDir);
const pendingDir = stopHookSpoolPaths(spoolDir).pending;
this.#gatewayStopHookWatcher = watch(pendingDir, { persistent: false }, () => {
this.#scheduleGatewayStopHookDrain();
});
this.#gatewayStopHookWatcher.on("error", (error) => {
this.#debug("gateway.stopHook.watch.failed", {
error: errorMessage(error),
});
});
await this.#drainGatewayStopHookSpool();
}
async #reconcileDelegations(): Promise<void> {
const delegations = this.#gatewayDelegations();
let changed = false;
for (const delegation of delegations) {
if (delegation.status !== "active") {
continue;
}
const response = await this.client.readThread({
threadId: delegation.codexThreadId,
includeTurns: true,
});
const turns = Array.isArray(response.thread.turns) ? response.thread.turns : [];
const latest = record(turns[turns.length - 1]);
const latestStatus = stringValue(latest.status);
if (!latestStatus || !isTerminalTurnStatus(latestStatus)) {
continue;
}
const snapshot = threadSnapshotFromThread(response.thread);
delegation.status = latestStatus === "completed" ? "complete" : "failed";
delegation.lastTurnId = stringValue(latest.id) ?? snapshot.lastFinal?.turnId ??
delegation.lastTurnId;
delegation.lastStatus = latestStatus;
delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal;
delegation.completedAt = this.#now().toISOString();
delegation.updatedAt = delegation.completedAt;
changed = true;
#scheduleGatewayStopHookDrain(delayMs = stopHookDrainDebounceMs): void {
if (!this.config.gateway) {
return;
}
for (const delegation of delegations) {
if (!isTerminalDelegation(delegation) || !delegation.lastFinal) {
if (this.#gatewayStopHookDrainTimer) {
clearTimeout(this.#gatewayStopHookDrainTimer);
}
this.#gatewayStopHookDrainTimer = setTimeout(() => {
this.#gatewayStopHookDrainTimer = undefined;
void this.#drainGatewayStopHookSpool().catch((error) => {
this.#debug("gateway.stopHook.drain.failed", {
error: errorMessage(error),
});
});
}, delayMs);
this.#gatewayStopHookDrainTimer.unref?.();
}
async #drainGatewayStopHookSpool(): Promise<void> {
const drain = this.#gatewayStopHookDrainChain
.catch(() => undefined)
.then(() => this.#drainGatewayStopHookSpoolOnce());
this.#gatewayStopHookDrainChain = drain.catch(() => undefined);
await drain;
}
async #drainGatewayStopHookSpoolOnce(): Promise<void> {
if (!this.config.gateway) {
return;
}
const spoolDir = this.#gatewayStopHookSpoolDir();
const files = await readPendingStopHookSpoolFiles(spoolDir);
let shouldRetry = false;
for (const file of files) {
if ("error" in file) {
this.#debug("gateway.stopHook.file.invalid", {
fileName: file.fileName,
error: file.error.message,
});
await archiveStopHookSpoolFile(file, spoolDir, "failed");
continue;
}
const mode = delegation.returnMode ?? "manual";
if (mode === "detached" || mode === "manual") {
const processedIds = this.#gatewayProcessedStopHookEventIds();
if (processedIds.includes(file.event.id)) {
await archiveStopHookSpoolFile(file, spoolDir, "ignored");
continue;
}
await this.#recordDelegationResult(delegation);
await this.#mirrorDelegationResult(delegation);
changed = true;
if (mode === "wake_on_done") {
const result = await this.#handleGatewayStopHookEvent(file.event);
if (result === "retry") {
shouldRetry = true;
continue;
}
processedIds.push(file.event.id);
await this.#persist();
await archiveStopHookSpoolFile(
file,
spoolDir,
result === "processed" ? "processed" : "ignored",
);
}
if (shouldRetry) {
this.#scheduleGatewayStopHookDrain(stopHookRetryMs);
}
}
async #handleGatewayStopHookEvent(
event: DiscordGatewayStopHookEvent,
): Promise<"processed" | "ignored" | "retry"> {
if (event.eventName !== "Stop") {
return "ignored";
}
if (this.#isGatewayMainThread(event.sessionId)) {
const started = await this.#processPendingWakes({
completedThreadId: event.sessionId,
completedTurnId: event.turnId,
});
return started || !this.#gatewayPendingWakes().some((wake) => !wake.startedAt)
? "processed"
: "retry";
}
const delegation = this.#delegationForThread(event.sessionId);
if (!delegation) {
return "ignored";
}
const completedAt = this.#now().toISOString();
delegation.status = "complete";
delegation.lastTurnId = event.turnId ?? delegation.lastTurnId;
delegation.lastStatus = "completed";
delegation.lastFinal = event.lastAssistantMessage ?? delegation.lastFinal;
delegation.completedAt = completedAt;
delegation.updatedAt = completedAt;
await this.#applyDelegationReturnPolicy(delegation);
await this.#processPendingWakes();
return "processed";
}
async #applyDelegationReturnPolicy(
delegation: DiscordGatewayDelegation,
): Promise<void> {
if (!isTerminalDelegation(delegation)) {
return;
}
const mode = delegation.returnMode ?? "manual";
if (mode === "detached" || mode === "manual") {
return;
}
await this.#recordDelegationResult(delegation);
await this.#mirrorDelegationResult(delegation);
if (mode === "wake_on_done") {
this.#enqueueWake({
kind: "delegation",
delegationIds: [delegation.id],
reason: `Delegation ${delegation.title} completed.`,
});
}
if (mode === "wake_on_group" && delegation.groupId) {
const group = this.#gatewayDelegations().filter((candidate) =>
candidate.groupId === delegation.groupId
);
if (group.length > 0 && group.every(isTerminalDelegation)) {
this.#enqueueWake({
kind: "delegation",
delegationIds: [delegation.id],
reason: `Delegation ${delegation.title} completed.`,
kind: "group",
groupId: delegation.groupId,
delegationIds: group.map((candidate) => candidate.id),
reason: `Delegation group ${delegation.groupId} completed.`,
});
}
if (mode === "wake_on_group" && delegation.groupId) {
const group = delegations.filter((candidate) =>
candidate.groupId === delegation.groupId
);
if (group.length > 0 && group.every(isTerminalDelegation)) {
this.#enqueueWake({
kind: "group",
groupId: delegation.groupId,
delegationIds: group.map((candidate) => candidate.id),
reason: `Delegation group ${delegation.groupId} completed.`,
});
}
}
}
await this.#processPendingWakes();
if (changed) {
await this.#persist();
}
}
async #recordDelegationResult(delegation: DiscordGatewayDelegation): Promise<void> {
const gatewaySession = this.#gatewaySession();
if (!gatewaySession || delegation.injectedAt || !delegation.lastFinal) {
if (!gatewaySession || delegation.injectedAt) {
return;
}
await this.client.injectThreadItems({
@ -1196,7 +1311,7 @@ export class DiscordCodexBridge {
async #mirrorDelegationResult(delegation: DiscordGatewayDelegation): Promise<void> {
const homeChannelId = this.config.gateway?.homeChannelId;
if (!homeChannelId || delegation.mirroredAt || !delegation.lastFinal) {
if (!homeChannelId || delegation.mirroredAt) {
return;
}
await this.transport.sendMessage(homeChannelId, delegationResultText(delegation));
@ -1232,28 +1347,46 @@ export class DiscordCodexBridge {
});
}
async #processPendingWakes(): Promise<void> {
async #processPendingWakes(options: {
completedThreadId?: string;
completedTurnId?: string;
} = {}): Promise<boolean> {
const gatewaySession = this.#gatewaySession();
if (!gatewaySession || this.#isSessionRunning(gatewaySession, this.#requireState())) {
return;
if (
!gatewaySession ||
this.#isSessionRunning(gatewaySession, this.#requireState(), options)
) {
return false;
}
const wake = this.#gatewayPendingWakes().find((candidate) => !candidate.startedAt);
if (!wake) {
return;
return false;
}
const prompt = wakePrompt(wake, this.#gatewayDelegations());
const turn = await this.client.startTurn({
threadId: gatewaySession.codexThreadId,
input: [{ type: "text", text: prompt, text_elements: [] }],
cwd: gatewaySession.cwd ?? this.config.cwd ?? null,
model: this.config.model ?? null,
serviceTier: this.config.serviceTier ?? null,
effort: this.config.effort ?? null,
summary: this.config.summary ?? null,
approvalPolicy: this.config.approvalPolicy ?? null,
permissions: this.config.permissions ?? null,
outputSchema: null,
});
let turn: v2.TurnStartResponse;
try {
turn = await this.client.startTurn({
threadId: gatewaySession.codexThreadId,
input: [{ type: "text", text: prompt, text_elements: [] }],
cwd: gatewaySession.cwd ?? this.config.cwd ?? null,
model: this.config.model ?? null,
serviceTier: this.config.serviceTier ?? null,
effort: this.config.effort ?? null,
summary: this.config.summary ?? null,
approvalPolicy: this.config.approvalPolicy ?? null,
permissions: this.config.permissions ?? null,
outputSchema: null,
});
} catch (error) {
if (errorMessage(error).includes("already has an active turn")) {
this.#debug("gateway.wake.deferred.activeTurn", {
wakeId: wake.id,
error: errorMessage(error),
});
return false;
}
throw error;
}
wake.startedAt = this.#now().toISOString();
for (const delegation of this.#gatewayDelegations()) {
if (wake.delegationIds.includes(delegation.id)) {
@ -1267,6 +1400,7 @@ export class DiscordCodexBridge {
kind: wake.kind,
groupId: wake.groupId,
});
return true;
}
async #flowBackendGet(
@ -1326,11 +1460,19 @@ export class DiscordCodexBridge {
#isSessionRunning(
session: DiscordBridgeSession,
state: DiscordBridgeState,
options: {
completedThreadId?: string;
completedTurnId?: string;
} = {},
): boolean {
const hasActiveTurn = state.activeTurns.some(
(active) =>
active.discordThreadId === session.discordThreadId &&
active.codexThreadId === session.codexThreadId,
active.codexThreadId === session.codexThreadId &&
!(
active.codexThreadId === options.completedThreadId &&
active.turnId === options.completedTurnId
),
);
if (hasActiveTurn) {
return true;
@ -1339,7 +1481,11 @@ export class DiscordCodexBridge {
(item) =>
item.discordThreadId === session.discordThreadId &&
item.codexThreadId === session.codexThreadId &&
item.status !== "failed",
item.status !== "failed" &&
!(
item.codexThreadId === options.completedThreadId &&
item.turnId === options.completedTurnId
),
);
}

View file

@ -166,6 +166,10 @@ export function parseConfig(argv: string[], env: NodeJS.ProcessEnv): ParsedConfi
permissions: permissionsProfile
? { type: "profile", id: permissionsProfile }
: undefined,
hookSpoolDir: resolveHomeDir(
stringFlag(args, "hook-spool-dir") ??
env.CODEX_DISCORD_HOOK_SPOOL_DIR,
),
debug,
},
};
@ -366,6 +370,7 @@ Options:
--home-channel-id <id> Enable gateway mode for one Discord home channel
--main-thread-id <id> Resume an existing Codex operator thread for gateway mode
--flow-backend-url <url> Optional codex-flow-systemd-local backend URL
--hook-spool-dir <path> Directory drained for Codex Stop hook events
[dir] Optional Codex thread directory, resolved from home
--dir <path> Codex thread directory, resolved from home
--cwd <path> Alias for --dir

View file

@ -0,0 +1,291 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { writeStopHookSpoolEvent } from "./stop-hook-spool.ts";
const defaultHookCommand = "codex-discord-bridge hook stop";
const defaultBunxPackage = "codex-discord-bridge";
const hookStatusMessage = "Recording Discord gateway Stop event";
export type HookInstallOptions = {
command?: string;
useBunx?: boolean;
bunxPackage?: string;
configPath?: string;
hooksPath?: string;
dryRun?: boolean;
};
export type HookInstallResult = {
command: string;
configPath: string;
hooksPath: string;
dryRun: boolean;
};
export async function handleHookCommand(argv: string[]): Promise<boolean> {
if (argv[0] !== "hook") {
return false;
}
const subcommand = argv[1] ?? "help";
if (subcommand === "stop") {
await runStopHook();
return true;
}
if (subcommand === "install") {
const result = await installStopHook(parseInstallArgs(argv.slice(2)));
process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
return true;
}
if (subcommand === "help" || subcommand === "--help" || subcommand === "-h") {
process.stdout.write(hookHelpText());
return true;
}
throw new Error(`Unknown hook subcommand: ${subcommand}`);
}
export async function runStopHook(): Promise<void> {
try {
const input = await new Response(Bun.stdin.stream()).text();
await writeStopHookSpoolEvent(JSON.parse(input));
process.stdout.write(`${JSON.stringify({ continue: true })}\n`);
} catch (error) {
process.stderr.write(`discord gateway stop hook failed: ${errorMessage(error)}\n`);
process.stdout.write(`${JSON.stringify({ continue: true })}\n`);
}
}
export async function installStopHook(
options: HookInstallOptions = {},
): Promise<HookInstallResult> {
const configPath = path.resolve(
expandHome(options.configPath ?? path.join(os.homedir(), ".codex", "config.toml")),
);
const hooksPath = path.resolve(
expandHome(options.hooksPath ?? path.join(os.homedir(), ".codex", "hooks.json")),
);
const command = hookCommand(options);
if (!options.dryRun) {
const configText = await readTextIfExists(configPath);
const hooksText = await readTextIfExists(hooksPath);
await mkdir(path.dirname(configPath), { recursive: true });
await mkdir(path.dirname(hooksPath), { recursive: true });
await writeFile(configPath, enableHooksFeature(configText));
await writeFile(hooksPath, `${JSON.stringify(upsertStopHookConfig(hooksText, command), null, 2)}\n`);
}
return {
command,
configPath,
hooksPath,
dryRun: Boolean(options.dryRun),
};
}
export function enableHooksFeature(configText: string): string {
const lines = configText.replace(/\s*$/, "").split(/\r?\n/);
if (lines.length === 1 && lines[0] === "") {
return "[features]\nhooks = true\n";
}
const featureHeaderIndex = lines.findIndex((line) => line.trim() === "[features]");
if (featureHeaderIndex < 0) {
return `${lines.join("\n")}\n\n[features]\nhooks = true\n`;
}
let insertIndex = featureHeaderIndex + 1;
while (insertIndex < lines.length && !lines[insertIndex]?.trim().startsWith("[")) {
const line = lines[insertIndex] ?? "";
if (/^\s*hooks\s*=/.test(line)) {
lines[insertIndex] = "hooks = true";
return `${lines.join("\n")}\n`;
}
insertIndex += 1;
}
lines.splice(featureHeaderIndex + 1, 0, "hooks = true");
return `${lines.join("\n")}\n`;
}
export function upsertStopHookConfig(
hooksText: string,
command: string,
): Record<string, unknown> {
const config = parseHooksJson(hooksText);
const hooks = record(config.hooks);
const stopGroups = Array.isArray(hooks.Stop) ? hooks.Stop : [];
hooks.Stop = [
stopHookGroup(command),
...stopGroups
.map(removeGatewayStopHookHandlers)
.filter((group): group is Record<string, unknown> => group !== undefined),
];
config.hooks = hooks;
return config;
}
function parseInstallArgs(argv: string[]): HookInstallOptions {
const options: HookInstallOptions = {};
for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
if (!arg) {
continue;
}
if (arg === "--dry-run") {
options.dryRun = true;
continue;
}
if (arg === "--bunx") {
options.useBunx = true;
continue;
}
if (arg === "--command") {
options.command = requiredNext(argv, ++index, arg);
continue;
}
if (arg.startsWith("--command=")) {
options.command = arg.slice("--command=".length);
continue;
}
if (arg === "--bunx-package") {
options.useBunx = true;
options.bunxPackage = requiredNext(argv, ++index, arg);
continue;
}
if (arg.startsWith("--bunx-package=")) {
options.useBunx = true;
options.bunxPackage = arg.slice("--bunx-package=".length);
continue;
}
if (arg === "--config-path") {
options.configPath = requiredNext(argv, ++index, arg);
continue;
}
if (arg.startsWith("--config-path=")) {
options.configPath = arg.slice("--config-path=".length);
continue;
}
if (arg === "--hooks-path") {
options.hooksPath = requiredNext(argv, ++index, arg);
continue;
}
if (arg.startsWith("--hooks-path=")) {
options.hooksPath = arg.slice("--hooks-path=".length);
continue;
}
throw new Error(`Unknown hook install option: ${arg}`);
}
return options;
}
function hookCommand(options: HookInstallOptions): string {
if (options.command && options.useBunx) {
throw new Error("Cannot set both --command and --bunx.");
}
if (options.command) {
return options.command;
}
if (options.useBunx || options.bunxPackage) {
return `bunx --package ${options.bunxPackage ?? defaultBunxPackage} ${defaultHookCommand}`;
}
return defaultHookCommand;
}
function stopHookGroup(command: string): Record<string, unknown> {
return {
hooks: [
{
type: "command",
command,
timeout: 10,
statusMessage: hookStatusMessage,
},
],
};
}
function removeGatewayStopHookHandlers(input: unknown): Record<string, unknown> | undefined {
const group = record(input);
const handlers = Array.isArray(group.hooks)
? group.hooks.filter((handler) => !isGatewayStopHookHandler(handler))
: [];
if (handlers.length === 0) {
return undefined;
}
return { ...group, hooks: handlers };
}
function isGatewayStopHookHandler(input: unknown): boolean {
const handler = record(input);
const command = typeof handler.command === "string" ? handler.command : "";
return command.includes("codex-discord-bridge hook stop") ||
command.includes("codex-discord-gateway-stop-hook") ||
command.includes("apps/discord-bridge/src/stop-hook.ts");
}
async function readTextIfExists(filePath: string): Promise<string> {
try {
return await readFile(filePath, "utf8");
} catch (error) {
const code = error instanceof Error && "code" in error
? String((error as NodeJS.ErrnoException).code)
: "";
if (code === "ENOENT") {
return "";
}
throw error;
}
}
function parseHooksJson(text: string): Record<string, unknown> {
if (!text.trim()) {
return {};
}
try {
return record(JSON.parse(text));
} catch (error) {
throw new Error(`Failed to parse hooks.json: ${errorMessage(error)}`);
}
}
function expandHome(value: string): string {
if (value === "~") {
return os.homedir();
}
if (value.startsWith("~/")) {
return path.join(os.homedir(), value.slice(2));
}
return value;
}
function requiredNext(argv: string[], index: number, flag: string): string {
const value = argv[index];
if (!value) {
throw new Error(`${flag} requires a value.`);
}
return value;
}
function hookHelpText(): string {
return `codex-discord-bridge hook manages the global Codex Stop hook.
Usage:
codex-discord-bridge hook install [options]
codex-discord-bridge hook stop
Options:
--command <cmd> Hook command to write. Defaults to "codex-discord-bridge hook stop".
--bunx Write a bunx command instead of the global binary command.
--bunx-package <pkg> Package for bunx --package. Defaults to codex-discord-bridge.
--config-path <path> Codex config.toml path.
--hooks-path <path> Codex hooks.json path.
--dry-run Print the planned install result without writing files.
`;
}
function record(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value)
? value as Record<string, unknown>
: {};
}
function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

24
apps/discord-bridge/src/index.ts Normal file → Executable file
View file

@ -1,20 +1,17 @@
#!/usr/bin/env bun
import {
CodexAppServerClient,
CodexStdioTransport,
} from "@peezy.tech/codex-flows";
import { DiscordCodexBridge } from "./bridge.ts";
import { createDiscordConsoleOutput } from "./console-output.ts";
import type { DiscordCodexBridge } from "./bridge.ts";
import { handleHookCommand } from "./hook-cli.ts";
import { parseConfig } from "./config.ts";
import { DiscordJsBridgeTransport } from "./discord-transport.ts";
import { createDiscordBridgeLogger } from "./logger.ts";
import { JsonFileStateStore } from "./state.ts";
async function main(): Promise<void> {
let logger = createDiscordBridgeLogger();
try {
const parsed = parseConfig(Bun.argv.slice(2), process.env);
const argv = Bun.argv.slice(2);
if (await handleHookCommand(argv)) {
return;
}
const parsed = parseConfig(argv, process.env);
if (parsed.type === "help") {
process.stdout.write(parsed.text);
return;
@ -23,6 +20,13 @@ async function main(): Promise<void> {
debug: parsed.config.debug,
logLevel: parsed.config.logLevel,
});
const { CodexAppServerClient, CodexStdioTransport } = await import(
"@peezy.tech/codex-flows"
);
const { DiscordCodexBridge } = await import("./bridge.ts");
const { createDiscordConsoleOutput } = await import("./console-output.ts");
const { DiscordJsBridgeTransport } = await import("./discord-transport.ts");
const { JsonFileStateStore } = await import("./state.ts");
const consoleOutput = parsed.config.consoleOutput === "messages"
? createDiscordConsoleOutput()
: undefined;

View file

@ -15,6 +15,7 @@ import type {
const maxProcessedMessageIds = 1000;
const maxDeliveries = 500;
const maxProcessedStopHookEventIds = 2000;
export class JsonFileStateStore implements DiscordBridgeStateStore {
readonly path: string;
@ -74,6 +75,12 @@ export function trimState(state: DiscordBridgeState): void {
-maxProcessedMessageIds,
);
state.deliveries = state.deliveries.slice(-maxDeliveries);
if (state.gateway?.processedStopHookEventIds) {
state.gateway.processedStopHookEventIds =
state.gateway.processedStopHookEventIds.slice(
-maxProcessedStopHookEventIds,
);
}
}
function parseState(value: unknown): DiscordBridgeState {
@ -120,6 +127,9 @@ function parseGateway(value: unknown): DiscordGatewayState | undefined {
pendingWakes: Array.isArray(value.pendingWakes)
? value.pendingWakes.map(parseGatewayPendingWake)
: [],
processedStopHookEventIds: Array.isArray(value.processedStopHookEventIds)
? uniqueStrings(value.processedStopHookEventIds)
: [],
};
}

View file

@ -0,0 +1,239 @@
import { createHash, randomUUID } from "node:crypto";
import {
mkdir,
readdir,
readFile,
rename,
rm,
writeFile,
} from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { DiscordGatewayStopHookEvent } from "./types.ts";
export type StopHookSpoolDisposition = "processed" | "ignored" | "failed";
export type PendingStopHookSpoolFile =
| {
filePath: string;
fileName: string;
event: DiscordGatewayStopHookEvent;
}
| {
filePath: string;
fileName: string;
error: Error;
};
export function defaultStopHookSpoolDir(): string {
return path.join(os.homedir(), ".codex", "discord-bridge", "stop-hooks");
}
export function stopHookSpoolDirFromEnv(
env: NodeJS.ProcessEnv = process.env,
): string {
return env.CODEX_DISCORD_HOOK_SPOOL_DIR || defaultStopHookSpoolDir();
}
export function stopHookSpoolPaths(spoolDir: string): Record<
"pending" | StopHookSpoolDisposition,
string
> {
const root = path.resolve(spoolDir);
return {
pending: path.join(root, "pending"),
processed: path.join(root, "processed"),
ignored: path.join(root, "ignored"),
failed: path.join(root, "failed"),
};
}
export async function ensureStopHookSpool(spoolDir: string): Promise<void> {
const paths = stopHookSpoolPaths(spoolDir);
await Promise.all(Object.values(paths).map((dir) => mkdir(dir, { recursive: true })));
}
export async function writeStopHookSpoolEvent(
input: unknown,
options: {
spoolDir?: string;
now?: () => Date;
} = {},
): Promise<DiscordGatewayStopHookEvent> {
const spoolDir = options.spoolDir ?? stopHookSpoolDirFromEnv();
const event = stopHookEventFromInput(input, options.now ?? (() => new Date()));
const paths = stopHookSpoolPaths(spoolDir);
await mkdir(paths.pending, { recursive: true });
const fileName = `${event.id}.json`;
const finalPath = path.join(paths.pending, fileName);
const tempPath = path.join(
paths.pending,
`.${fileName}.${process.pid}.${Date.now()}.${randomUUID()}.tmp`,
);
await writeFile(tempPath, `${JSON.stringify(event, null, 2)}\n`);
await rename(tempPath, finalPath);
return event;
}
export async function readPendingStopHookSpoolFiles(
spoolDir: string,
): Promise<PendingStopHookSpoolFile[]> {
const paths = stopHookSpoolPaths(spoolDir);
await ensureStopHookSpool(spoolDir);
const fileNames = (await readdir(paths.pending))
.filter((fileName) => fileName.endsWith(".json"))
.sort();
const files: PendingStopHookSpoolFile[] = [];
for (const fileName of fileNames) {
const filePath = path.join(paths.pending, fileName);
try {
const parsed = JSON.parse(await readFile(filePath, "utf8")) as unknown;
files.push({
filePath,
fileName,
event: parseStopHookSpoolEvent(parsed),
});
} catch (error) {
files.push({
filePath,
fileName,
error: error instanceof Error ? error : new Error(String(error)),
});
}
}
return files;
}
export async function archiveStopHookSpoolFile(
file: Pick<PendingStopHookSpoolFile, "filePath" | "fileName">,
spoolDir: string,
disposition: StopHookSpoolDisposition,
): Promise<void> {
const paths = stopHookSpoolPaths(spoolDir);
await mkdir(paths[disposition], { recursive: true });
const target = path.join(
paths[disposition],
`${Date.now()}-${randomUUID()}-${file.fileName}`,
);
try {
await rename(file.filePath, target);
} catch (error) {
const code = error instanceof Error && "code" in error
? String((error as NodeJS.ErrnoException).code)
: "";
if (code === "ENOENT") {
return;
}
throw error;
}
}
export async function removeStopHookSpool(spoolDir: string): Promise<void> {
await rm(path.resolve(spoolDir), { recursive: true, force: true });
}
function stopHookEventFromInput(
input: unknown,
now: () => Date,
): DiscordGatewayStopHookEvent {
const parsed = record(input);
const eventName = stringValue(parsed.hook_event_name) ?? stringValue(parsed.eventName);
if (eventName && eventName !== "Stop") {
throw new Error(`Unsupported hook event: ${eventName}`);
}
const sessionId = stringValue(parsed.session_id) ?? stringValue(parsed.sessionId);
if (!sessionId) {
throw new Error("Stop hook input is missing session_id");
}
const turnId = stringValue(parsed.turn_id) ?? stringValue(parsed.turnId);
const transcriptPath =
stringValue(parsed.transcript_path) ?? stringValue(parsed.transcriptPath);
const cwd = stringValue(parsed.cwd);
const lastAssistantMessage =
nullableString(parsed.last_assistant_message) ??
nullableString(parsed.lastAssistantMessage);
const stopHookActive =
typeof parsed.stop_hook_active === "boolean"
? parsed.stop_hook_active
: typeof parsed.stopHookActive === "boolean"
? parsed.stopHookActive
: undefined;
const id = stopHookEventId({
sessionId,
turnId,
transcriptPath,
cwd,
});
return {
version: 1,
id,
eventName: "Stop",
sessionId,
turnId,
cwd,
transcriptPath,
lastAssistantMessage,
stopHookActive,
createdAt: now().toISOString(),
};
}
function parseStopHookSpoolEvent(input: unknown): DiscordGatewayStopHookEvent {
const parsed = record(input);
if (parsed.version !== 1) {
throw new Error("Invalid stop hook event version");
}
const eventName = stringValue(parsed.eventName);
const id = stringValue(parsed.id);
const sessionId = stringValue(parsed.sessionId);
const createdAt = stringValue(parsed.createdAt);
if (eventName !== "Stop" || !id || !sessionId || !createdAt) {
throw new Error("Invalid stop hook event");
}
return {
version: 1,
id,
eventName,
sessionId,
turnId: stringValue(parsed.turnId),
cwd: stringValue(parsed.cwd),
transcriptPath: stringValue(parsed.transcriptPath),
lastAssistantMessage: nullableString(parsed.lastAssistantMessage),
stopHookActive: typeof parsed.stopHookActive === "boolean"
? parsed.stopHookActive
: undefined,
createdAt,
};
}
function stopHookEventId(input: {
sessionId: string;
turnId?: string;
transcriptPath?: string;
cwd?: string;
}): string {
const identity = input.turnId
? { eventName: "Stop", sessionId: input.sessionId, turnId: input.turnId }
: {
eventName: "Stop",
sessionId: input.sessionId,
transcriptPath: input.transcriptPath,
cwd: input.cwd,
};
return `stop-${createHash("sha256").update(JSON.stringify(identity)).digest("hex").slice(0, 24)}`;
}
function record(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value)
? value as Record<string, unknown>
: {};
}
function stringValue(value: unknown): string | undefined {
return typeof value === "string" && value.length > 0 ? value : undefined;
}
function nullableString(value: unknown): string | undefined {
return typeof value === "string" && value.length > 0 ? value : undefined;
}

View file

@ -0,0 +1,4 @@
#!/usr/bin/env bun
import { runStopHook } from "./hook-cli.ts";
await runStopHook();

View file

@ -23,6 +23,7 @@ export type DiscordBridgeConfig = {
permissions?: v2.PermissionProfileSelectionParams;
typingIntervalMs?: number;
reconcileIntervalMs?: number;
hookSpoolDir?: string;
progressMode?: DiscordProgressMode;
consoleOutput?: DiscordConsoleOutputMode;
logLevel?: DiscordBridgeLogLevelSetting;
@ -153,6 +154,7 @@ export type DiscordGatewayState = {
toolsVersion?: number;
delegations: DiscordGatewayDelegation[];
pendingWakes?: DiscordGatewayPendingWake[];
processedStopHookEventIds?: string[];
};
export type DiscordGatewayDelegationReturnMode =
@ -193,6 +195,19 @@ export type DiscordGatewayPendingWake = {
startedAt?: string;
};
export type DiscordGatewayStopHookEvent = {
version: 1;
id: string;
eventName: "Stop";
sessionId: string;
turnId?: string;
cwd?: string;
transcriptPath?: string;
lastAssistantMessage?: string;
stopHookActive?: boolean;
createdAt: string;
};
export type DiscordBridgeSession = {
discordThreadId: string;
parentChannelId: string;

View file

@ -1,3 +1,4 @@
import { mkdtemp, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "bun:test";
@ -10,6 +11,7 @@ import type {
DiscordConsoleOutput,
} from "../src/console-output.ts";
import { MemoryStateStore, emptyState } from "../src/state.ts";
import { writeStopHookSpoolEvent } from "../src/stop-hook-spool.ts";
import type {
CodexBridgeClient,
DiscordBridgeConfig,
@ -225,6 +227,7 @@ describe("DiscordCodexBridge", () => {
});
test("gateway records group delegation results and wakes after the group finishes", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
@ -233,116 +236,134 @@ describe("DiscordCodexBridge", () => {
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
for (const [index, title] of ["Workspace A", "Workspace B"].entries()) {
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
for (const [index, title] of ["Workspace A", "Workspace B"].entries()) {
client.emitRequest({
id: `tool-${index}`,
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: `/workspace/${index}`,
title,
prompt: `Inspect ${title}.`,
groupId: "fanout",
},
},
});
await waitFor(() => client.responses.length === index + 1);
}
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Result A.",
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(client.readThreadCalls).toEqual([]);
expect(transport.messages.some((message) =>
message.channelId === "home-channel" &&
message.text.includes("Result A.")
)).toBe(true);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-3",
turnId: "turn-2",
lastAssistantMessage: "Result B.",
});
await waitFor(() => client.startTurnCalls.length === 3);
expect(client.injectThreadItemsCalls).toHaveLength(2);
expect(client.startTurnCalls[2]).toEqual(
expect.objectContaining({
threadId: "codex-thread-1",
}),
);
expect(inputText(client.startTurnCalls[2]?.input[0])).toContain(
"Delegation group fanout completed.",
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "group",
groupId: "fanout",
startedAt: "2026-05-14T12:00:00.000Z",
}),
);
await sleep(30);
expect(client.startTurnCalls).toHaveLength(3);
expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("gateway detached delegations complete without injecting or waking the main thread", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: `tool-${index}`,
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: `/workspace/${index}`,
title,
prompt: `Inspect ${title}.`,
groupId: "fanout",
cwd: "/workspace/detached",
title: "Detached workspace",
prompt: "Prepare this for a human.",
returnMode: "detached",
},
},
});
await waitFor(() => client.responses.length === index + 1);
await waitFor(() => client.responses.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Detached result.",
});
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
expect(client.startTurnCalls).toHaveLength(1);
expect(client.readThreadCalls).toEqual([]);
expect(transport.messages.some((message) =>
message.text.includes("Detached result.")
)).toBe(false);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Result A."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(transport.messages.some((message) =>
message.channelId === "home-channel" &&
message.text.includes("Result A.")
)).toBe(true);
client.threadTurns.set("codex-thread-3", [
completedTurn("turn-2", "Result B."),
]);
await waitFor(() => client.startTurnCalls.length === 3);
expect(client.injectThreadItemsCalls).toHaveLength(2);
expect(client.startTurnCalls[2]).toEqual(
expect.objectContaining({
threadId: "codex-thread-1",
}),
);
expect(inputText(client.startTurnCalls[2]?.input[0])).toContain(
"Delegation group fanout completed.",
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "group",
groupId: "fanout",
startedAt: "2026-05-14T12:00:00.000Z",
}),
);
await sleep(30);
expect(client.startTurnCalls).toHaveLength(3);
expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1);
await bridge.stop();
});
test("gateway detached delegations complete without injecting or waking the main thread", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/detached",
title: "Detached workspace",
prompt: "Prepare this for a human.",
returnMode: "detached",
},
},
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Detached result."),
]);
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
expect(client.startTurnCalls).toHaveLength(1);
expect(transport.messages.some((message) =>
message.text.includes("Detached result.")
)).toBe(false);
await bridge.stop();
});
test("gateway queues delegation wake while the main operator thread is busy", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
@ -351,59 +372,77 @@ describe("DiscordCodexBridge", () => {
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
transport.emit({
kind: "message",
channelId: "home-channel",
messageId: "home-message-1",
author: { id: "user-1", name: "Peezy", isBot: false },
content: "work on a long-running main task",
createdAt: "2026-05-14T12:00:00.000Z",
});
await waitFor(() => bridge.stateForTest().activeTurns.length === 1);
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
transport.emit({
kind: "message",
channelId: "home-channel",
messageId: "home-message-1",
author: { id: "user-1", name: "Peezy", isBot: false },
content: "work on a long-running main task",
createdAt: "2026-05-14T12:00:00.000Z",
});
await waitFor(() => bridge.stateForTest().activeTurns.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/side",
title: "Side task",
prompt: "Finish this side task.",
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/side",
title: "Side task",
prompt: "Finish this side task.",
},
},
},
});
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-2", "Side task result."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "delegation",
}),
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty(
"startedAt",
);
await sleep(30);
expect(client.startTurnCalls).toHaveLength(2);
expect(bridge.stateForTest().gateway?.pendingWakes).toHaveLength(1);
await bridge.stop();
await waitFor(() => client.responses.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-2",
lastAssistantMessage: "Side task result.",
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "delegation",
}),
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty(
"startedAt",
);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-1",
turnId: "turn-1",
lastAssistantMessage: "Main task paused.",
});
await waitFor(() => client.startTurnCalls.length === 3);
expect(inputText(client.startTurnCalls[2]?.input[0])).toContain(
"Delegation Side task completed.",
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
startedAt: "2026-05-14T12:00:00.000Z",
}),
);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("gateway record-only delegations inject and mirror without waking", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
@ -412,43 +451,184 @@ describe("DiscordCodexBridge", () => {
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/record",
title: "Record only task",
prompt: "Record this result.",
returnMode: "record_only",
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/record",
title: "Record only task",
prompt: "Record this result.",
returnMode: "record_only",
},
},
});
await waitFor(() => client.responses.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Record-only result.",
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]);
expect(transport.messages.some((message) =>
message.text.includes("Record-only result.")
)).toBe(true);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("gateway drains queued stop hook events on startup", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const store = new MemoryStateStore({
...emptyState(),
gateway: {
homeChannelId: "home-channel",
mainThreadId: "codex-thread-1",
toolsVersion: 1,
delegations: [
{
id: "delegation-queued",
codexThreadId: "codex-thread-2",
title: "Queued event task",
status: "active",
cwd: "/workspace/queued",
returnMode: "record_only",
createdAt: "2026-05-14T11:59:00.000Z",
updatedAt: "2026-05-14T11:59:00.000Z",
},
],
pendingWakes: [],
processedStopHookEventIds: [],
},
sessions: [
{
discordThreadId: "home-channel",
parentChannelId: "home-channel",
codexThreadId: "codex-thread-1",
title: "Codex Gateway",
createdAt: "2026-05-14T11:59:00.000Z",
cwd: "/workspace",
mode: "gateway",
},
],
});
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-queued",
lastAssistantMessage: "Queued result.",
cwd: "/workspace/queued",
});
const bridge = new DiscordCodexBridge({
client,
transport,
store,
config: testConfig({
gateway: { homeChannelId: "home-channel" },
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Record-only result."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]);
expect(transport.messages.some((message) =>
message.text.includes("Record-only result.")
)).toBe(true);
await bridge.stop();
try {
await bridge.start();
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.readThreadCalls).toEqual([]);
expect(bridge.stateForTest().gateway?.delegations[0]).toEqual(
expect.objectContaining({
status: "complete",
lastTurnId: "turn-queued",
lastFinal: "Queued result.",
injectedAt: "2026-05-14T12:00:00.000Z",
}),
);
expect(transport.messages.some((message) =>
message.text.includes("Queued result.")
)).toBe(true);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("gateway stop hook events are idempotent", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/idempotent",
title: "Idempotent task",
prompt: "Return once.",
returnMode: "record_only",
},
},
});
await waitFor(() => client.responses.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Exactly once.",
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Duplicate with changed text.",
});
await sleep(200);
expect(client.injectThreadItemsCalls).toHaveLength(1);
expect(transport.messages.filter((message) =>
message.text.includes("Exactly once.")
)).toHaveLength(1);
expect(
bridge.stateForTest().gateway?.processedStopHookEventIds,
).toHaveLength(1);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("gateway manually flushes completed manual delegation results", async () => {
const hookSpoolDir = await testHookSpoolDir();
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
@ -457,57 +637,63 @@ describe("DiscordCodexBridge", () => {
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
hookSpoolDir,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/manual",
title: "Manual task",
prompt: "Finish manually.",
returnMode: "manual",
try {
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/manual",
title: "Manual task",
prompt: "Finish manually.",
returnMode: "manual",
},
},
},
});
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Manual result."),
]);
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
client.emitRequest({
id: "tool-2",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "flush_delegation_results",
arguments: {
delegationId: bridge.stateForTest().gateway?.delegations[0]?.id,
wake: "false",
await waitFor(() => client.responses.length === 1);
await emitStopHook(hookSpoolDir, {
sessionId: "codex-thread-2",
turnId: "turn-1",
lastAssistantMessage: "Manual result.",
});
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
client.emitRequest({
id: "tool-2",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "flush_delegation_results",
arguments: {
delegationId: bridge.stateForTest().gateway?.delegations[0]?.id,
wake: "false",
},
},
},
});
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(transport.messages.some((message) =>
message.text.includes("Manual result.")
)).toBe(true);
await bridge.stop();
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(transport.messages.some((message) =>
message.text.includes("Manual result.")
)).toBe(true);
} finally {
await bridge.stop();
await rm(hookSpoolDir, { recursive: true, force: true });
}
});
test("answers gateway status in the home channel without starting a turn", async () => {
@ -2914,6 +3100,35 @@ describe("DiscordCodexBridge", () => {
});
});
async function testHookSpoolDir(): Promise<string> {
return await mkdtemp(path.join(os.tmpdir(), "discord-bridge-hooks-"));
}
async function emitStopHook(
spoolDir: string,
input: {
sessionId: string;
turnId: string;
lastAssistantMessage?: string;
cwd?: string;
},
): Promise<void> {
await writeStopHookSpoolEvent(
{
hook_event_name: "Stop",
session_id: input.sessionId,
turn_id: input.turnId,
cwd: input.cwd ?? "/workspace",
transcript_path: `/tmp/${input.sessionId}.jsonl`,
last_assistant_message: input.lastAssistantMessage ?? null,
},
{
spoolDir,
now: () => new Date("2026-05-14T12:00:00.000Z"),
},
);
}
function testConfig(
overrides: Partial<DiscordBridgeConfig> = {},
): DiscordBridgeConfig {
@ -3255,22 +3470,6 @@ function gatewayToolResult(value: unknown): unknown {
return text ? JSON.parse(text) : undefined;
}
function completedTurn(id: string, text: string): v2.Turn {
return {
id,
status: "completed",
items: [
{
type: "agentMessage",
id: `${id}-message`,
text,
phase: "final_answer",
memoryCitation: null,
},
],
} as unknown as v2.Turn;
}
function statusMessageText(transport: FakeDiscordTransport): string {
return transport.messages.find((message) => message.id === "message-out-1")
?.text ?? "";

View file

@ -0,0 +1,122 @@
import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "bun:test";
import {
enableHooksFeature,
installStopHook,
upsertStopHookConfig,
} from "../src/hook-cli.ts";
describe("discord gateway hook CLI", () => {
test("enables the current hooks feature in config.toml", () => {
expect(enableHooksFeature("model = \"gpt-5\"\n")).toBe(
"model = \"gpt-5\"\n\n[features]\nhooks = true\n",
);
expect(enableHooksFeature("[features]\ngoals = true\n")).toBe(
"[features]\nhooks = true\ngoals = true\n",
);
expect(enableHooksFeature("[features]\nhooks = false\ngoals = true\n")).toBe(
"[features]\nhooks = true\ngoals = true\n",
);
});
test("upserts package-bin Stop hook while preserving unrelated hooks", () => {
const updated = upsertStopHookConfig(
JSON.stringify({
hooks: {
PreToolUse: [
{
matcher: "Bash",
hooks: [{ type: "command", command: "echo pre" }],
},
],
Stop: [
{
hooks: [
{
type: "command",
command:
"bun /home/peezy/codex-fork-workspace/codex-flows/apps/discord-bridge/src/stop-hook.ts",
},
{ type: "command", command: "echo other-stop" },
],
},
],
},
}),
"codex-discord-bridge hook stop",
);
expect(updated).toEqual({
hooks: {
PreToolUse: [
{
matcher: "Bash",
hooks: [{ type: "command", command: "echo pre" }],
},
],
Stop: [
{
hooks: [
{
type: "command",
command: "codex-discord-bridge hook stop",
timeout: 10,
statusMessage: "Recording Discord gateway Stop event",
},
],
},
{
hooks: [{ type: "command", command: "echo other-stop" }],
},
],
},
});
});
test("install writes config and hooks files", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "discord-hook-cli-"));
try {
const configPath = path.join(dir, "config.toml");
const hooksPath = path.join(dir, "hooks.json");
await writeFile(configPath, "[features]\ngoals = true\n");
const result = await installStopHook({
configPath,
hooksPath,
useBunx: true,
bunxPackage: "@peezy.tech/codex-flows",
});
expect(result).toEqual({
command:
"bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop",
configPath,
hooksPath,
dryRun: false,
});
expect(await readFile(configPath, "utf8")).toBe(
"[features]\nhooks = true\ngoals = true\n",
);
expect(JSON.parse(await readFile(hooksPath, "utf8"))).toEqual(
expect.objectContaining({
hooks: expect.objectContaining({
Stop: [
{
hooks: [
expect.objectContaining({
command:
"bunx --package @peezy.tech/codex-flows codex-discord-bridge hook stop",
}),
],
},
],
}),
}),
);
} finally {
await rm(dir, { recursive: true, force: true });
}
});
});

View file

@ -43,6 +43,12 @@ describe("JsonFileStateStore", () => {
createdAt: "2026-05-11T00:00:03.000Z",
},
],
processedStopHookEventIds: [
"stop-1",
"",
"stop-1",
"stop-2",
],
},
sessions: [
{
@ -121,6 +127,7 @@ describe("JsonFileStateStore", () => {
createdAt: "2026-05-11T00:00:03.000Z",
},
],
processedStopHookEventIds: ["stop-1", "stop-2"],
});
expect(state.sessions).toHaveLength(2);
expect(state.sessions[0]?.ownerUserId).toBe("user-1");

View file

@ -0,0 +1,83 @@
import { mkdtemp, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "bun:test";
import {
archiveStopHookSpoolFile,
readPendingStopHookSpoolFiles,
writeStopHookSpoolEvent,
} from "../src/stop-hook-spool.ts";
describe("stop hook spool", () => {
test("writes stable Stop events into the pending spool", async () => {
const spoolDir = await mkdtemp(path.join(os.tmpdir(), "stop-hook-spool-"));
try {
const input = {
hook_event_name: "Stop",
session_id: "session-1",
turn_id: "turn-1",
cwd: "/workspace",
transcript_path: "/tmp/session.jsonl",
last_assistant_message: "Finished.",
stop_hook_active: false,
};
const first = await writeStopHookSpoolEvent(input, {
spoolDir,
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
const second = await writeStopHookSpoolEvent(input, {
spoolDir,
now: () => new Date("2026-05-14T12:01:00.000Z"),
});
const third = await writeStopHookSpoolEvent(
{ ...input, last_assistant_message: "Finished again." },
{
spoolDir,
now: () => new Date("2026-05-14T12:02:00.000Z"),
},
);
expect(second.id).toBe(first.id);
expect(third.id).toBe(first.id);
const pending = await readPendingStopHookSpoolFiles(spoolDir);
expect(pending).toHaveLength(1);
expect(pending[0]).toEqual(
expect.objectContaining({
event: expect.objectContaining({
id: first.id,
eventName: "Stop",
sessionId: "session-1",
turnId: "turn-1",
lastAssistantMessage: "Finished again.",
stopHookActive: false,
}),
}),
);
} finally {
await rm(spoolDir, { recursive: true, force: true });
}
});
test("archives processed files out of pending", async () => {
const spoolDir = await mkdtemp(path.join(os.tmpdir(), "stop-hook-spool-"));
try {
await writeStopHookSpoolEvent(
{
hook_event_name: "Stop",
session_id: "session-1",
turn_id: "turn-1",
},
{ spoolDir },
);
const [file] = await readPendingStopHookSpoolFiles(spoolDir);
expect(file).toBeDefined();
await archiveStopHookSpoolFile(file!, spoolDir, "processed");
expect(await readPendingStopHookSpoolFiles(spoolDir)).toEqual([]);
} finally {
await rm(spoolDir, { recursive: true, force: true });
}
});
});