diff --git a/apps/discord-bridge/README.md b/apps/discord-bridge/README.md index 4582446..71000f2 100644 --- a/apps/discord-bridge/README.md +++ b/apps/discord-bridge/README.md @@ -46,6 +46,9 @@ are attached only to that main thread and expose: - `resume_delegation` - `send_delegation` - `read_delegation` +- `set_delegation_policy` +- `flush_delegation_results` +- `list_delegation_groups` - `list_flow_runs` - `list_flow_events` @@ -56,8 +59,23 @@ Those tools can: - resume a delegated Codex session by thread id - send a turn to a delegated session - observe or summarize delegated session state +- group delegations for fan-out/fan-in coordination +- record completed delegation results into the main operator thread - inspect flow backend state through `CODEX_FLOW_BACKEND_URL` Gateway state stores delegation records, including optional Discord detail thread ids for noisy work. Delegated Codex sessions do not receive the privileged gateway tools; only the main operator thread can manage delegation. + +Delegations support return modes: + +- `wake_on_done`: inject and mirror the result, then wake the main operator when idle +- `wake_on_group`: inject and mirror each result, then wake once the whole group is terminal +- `record_only`: inject and mirror results without waking the main operator +- `manual`: keep results in gateway state until `flush_delegation_results` +- `detached`: do not loop results back to the main thread; useful for human-continued threads + +Automatic result return uses `thread/inject_items` to append structured +delegation results to the main operator thread's model-visible history. Starting +a main-thread turn is a separate wake step, so long-running main goals are not +interrupted; wakes are queued until the main operator thread is idle. diff --git a/apps/discord-bridge/src/bridge.ts b/apps/discord-bridge/src/bridge.ts index 94c4238..5469ccc 100644 --- a/apps/discord-bridge/src/bridge.ts +++ b/apps/discord-bridge/src/bridge.ts @@ -16,6 +16,8 @@ import type { CodexBridgeClient, DiscordBridgeConfig, DiscordGatewayDelegation, + DiscordGatewayDelegationReturnMode, + DiscordGatewayPendingWake, DiscordBridgeSession, DiscordBridgeState, DiscordBridgeStateStore, @@ -29,6 +31,7 @@ import type { const maxDiscordMessageLength = 2000; const gatewayToolsVersion = 1; +const defaultGatewayReconcileIntervalMs = 10_000; type ThreadSnapshot = { terminalTurnIds: string[]; @@ -51,6 +54,7 @@ export class DiscordCodexBridge { #dedupe: MessageDeduplicator; #logger: DiscordBridgeLogger; #consoleOutput: DiscordConsoleOutput | undefined; + #gatewayReconcileTimer: Timer | undefined; constructor(options: { client: CodexBridgeClient; @@ -128,12 +132,17 @@ export class DiscordCodexBridge { for (const runner of this.#runnersByDiscordThread.values()) { runner.start(); } + this.#startGatewayReconciler(); } async stop(): Promise { this.#debug("bridge.stop", { runners: this.#runnersByDiscordThread.size, }); + if (this.#gatewayReconcileTimer) { + clearInterval(this.#gatewayReconcileTimer); + this.#gatewayReconcileTimer = undefined; + } await Promise.all( [...this.#runnersByDiscordThread.values()].map((runner) => runner.stop()), ); @@ -666,6 +675,17 @@ export class DiscordCodexBridge { if (tool === "read_delegation") { return await this.#readDelegation(args); } + if (tool === "set_delegation_policy") { + return await this.#setDelegationPolicy(args); + } + if (tool === "flush_delegation_results") { + return await this.#flushDelegationResults(args); + } + if (tool === "list_delegation_groups") { + return { + groups: this.#delegationGroups(), + }; + } if (tool === "list_flow_runs") { return await this.#flowBackendGet("/runs", args); } @@ -721,6 +741,7 @@ export class DiscordCodexBridge { createdAt: existing.createdAt, toolsVersion: state.gateway?.toolsVersion, delegations: state.gateway?.delegations ?? [], + pendingWakes: state.gateway?.pendingWakes ?? [], }; this.#registerRunner(existing); await this.#persist(); @@ -783,6 +804,7 @@ export class DiscordCodexBridge { ? state.gateway?.toolsVersion : gatewayToolsVersion, delegations: state.gateway?.delegations ?? [], + pendingWakes: state.gateway?.pendingWakes ?? [], }; state.sessions.push(session); this.#registerRunner(session); @@ -819,17 +841,37 @@ export class DiscordCodexBridge { homeChannelId: this.config.gateway?.homeChannelId ?? "", mainThreadId: this.#gatewaySession()?.codexThreadId, delegations: [], + pendingWakes: [], }; } state.gateway.delegations ??= []; return state.gateway.delegations; } + #gatewayPendingWakes(): DiscordGatewayPendingWake[] { + const state = this.#requireState(); + if (!state.gateway) { + state.gateway = { + homeChannelId: this.config.gateway?.homeChannelId ?? "", + mainThreadId: this.#gatewaySession()?.codexThreadId, + delegations: [], + pendingWakes: [], + }; + } + state.gateway.pendingWakes ??= []; + return state.gateway.pendingWakes; + } + async #startDelegation(args: Record): Promise { const cwd = requiredArg(args, "cwd"); const title = stringValue(args.title) ?? firstLine(stringValue(args.prompt)) ?? `Delegated ${compactId(cwd)}`; const prompt = stringValue(args.prompt); + const groupId = stringValue(args.groupId); + const returnMode = returnModeFromArgs( + args, + groupId ? "wake_on_group" : "wake_on_done", + ); const started = await this.client.startThread(this.#threadStartParams(cwd)); const codexThreadId = started.thread.id; await this.client.setThreadName({ @@ -843,6 +885,8 @@ export class DiscordCodexBridge { title, status: prompt ? "active" : "idle", cwd, + groupId, + returnMode, discordDetailThreadId: stringValue(args.discordDetailThreadId), parentDiscordMessageId: stringValue(args.parentDiscordMessageId), createdAt: now, @@ -863,6 +907,7 @@ export class DiscordCodexBridge { outputSchema: null, }); turnId = turn.turn.id; + delegation.lastTurnId = turnId; } await this.#persist(); return { delegation, turnId }; @@ -871,6 +916,7 @@ export class DiscordCodexBridge { async #resumeDelegation(args: Record): Promise { const codexThreadId = requiredArg(args, "threadId"); const cwd = stringValue(args.cwd); + const groupId = stringValue(args.groupId); const resumed = await this.client.resumeThread(this.#threadResumeParams(codexThreadId, cwd)); const now = this.#now().toISOString(); const delegation = this.#upsertDelegation({ @@ -879,6 +925,8 @@ export class DiscordCodexBridge { title: stringValue(args.title) ?? `Delegated ${compactId(codexThreadId)}`, status: "idle", cwd: cwd ?? resumeResponseCwd(resumed), + groupId, + returnMode: returnModeFromArgs(args, "manual"), discordDetailThreadId: stringValue(args.discordDetailThreadId), parentDiscordMessageId: stringValue(args.parentDiscordMessageId), createdAt: this.#delegationForThread(codexThreadId)?.createdAt ?? now, @@ -891,6 +939,14 @@ export class DiscordCodexBridge { async #sendDelegation(args: Record): Promise { const delegation = this.#requireDelegation(args); const prompt = requiredArg(args, "prompt"); + const groupId = stringValue(args.groupId); + if (groupId) { + delegation.groupId = groupId; + } + delegation.returnMode = returnModeFromArgs( + args, + delegation.returnMode ?? (delegation.groupId ? "wake_on_group" : "wake_on_done"), + ); const turn = await this.client.startTurn({ threadId: delegation.codexThreadId, input: [{ type: "text", text: prompt, text_elements: [] }], @@ -904,6 +960,13 @@ export class DiscordCodexBridge { outputSchema: null, }); delegation.status = "active"; + delegation.lastTurnId = turn.turn.id; + delegation.lastStatus = undefined; + delegation.lastFinal = undefined; + delegation.completedAt = undefined; + delegation.injectedAt = undefined; + delegation.mirroredAt = undefined; + delegation.reportedAt = undefined; delegation.updatedAt = this.#now().toISOString(); await this.#persist(); return { delegation, turnId: turn.turn.id }; @@ -920,12 +983,18 @@ export class DiscordCodexBridge { const latest = record(turns[turns.length - 1]); const latestStatus = stringValue(latest.status); if (latestStatus === "completed") { - delegation.status = "idle"; + delegation.status = "complete"; } else if (latestStatus === "failed" || latestStatus === "interrupted") { delegation.status = "failed"; } else if (latestStatus) { delegation.status = "active"; } + delegation.lastTurnId = stringValue(latest.id) ?? delegation.lastTurnId; + delegation.lastStatus = latestStatus ?? delegation.lastStatus; + delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal; + if (latestStatus && isTerminalTurnStatus(latestStatus)) { + delegation.completedAt ??= this.#now().toISOString(); + } delegation.updatedAt = this.#now().toISOString(); await this.#persist(); return { @@ -937,6 +1006,270 @@ export class DiscordCodexBridge { }; } + async #setDelegationPolicy(args: Record): Promise { + const groupId = stringValue(args.groupId); + const mode = returnModeFromArgs(args, undefined); + if (!mode) { + throw new Error("Missing required argument: returnMode"); + } + const delegations = groupId + ? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId) + : [this.#requireDelegation(args)]; + if (delegations.length === 0) { + throw new Error("No matching gateway delegations."); + } + const now = this.#now().toISOString(); + for (const delegation of delegations) { + delegation.returnMode = mode; + delegation.updatedAt = now; + } + await this.#persist(); + return { delegations }; + } + + async #flushDelegationResults(args: Record): Promise { + await this.#reconcileDelegations(); + const groupId = stringValue(args.groupId); + const delegations = groupId + ? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId) + : stringValue(args.delegationId) || stringValue(args.threadId) || stringValue(args.id) + ? [this.#requireDelegation(args)] + : this.#gatewayDelegations(); + const flushed: DiscordGatewayDelegation[] = []; + for (const delegation of delegations) { + if (!delegation.lastFinal || !isTerminalDelegation(delegation)) { + continue; + } + await this.#recordDelegationResult(delegation); + await this.#mirrorDelegationResult(delegation); + flushed.push(delegation); + } + if (flushed.length > 0 && stringValue(args.wake) !== "false") { + this.#enqueueWake({ + kind: groupId ? "group" : "delegation", + groupId, + delegationIds: flushed.map((delegation) => delegation.id), + reason: groupId + ? `Delegation group ${groupId} was manually flushed.` + : "Delegation results were manually flushed.", + }); + await this.#processPendingWakes(); + } + await this.#persist(); + return { flushed }; + } + + #delegationGroups(): Array<{ + groupId: string; + total: number; + active: number; + terminal: number; + pendingWake: boolean; + }> { + const groups = new Map(); + for (const delegation of this.#gatewayDelegations()) { + if (!delegation.groupId) { + continue; + } + const existing = groups.get(delegation.groupId) ?? []; + existing.push(delegation); + groups.set(delegation.groupId, existing); + } + return [...groups.entries()].map(([groupId, delegations]) => ({ + groupId, + total: delegations.length, + active: delegations.filter((delegation) => delegation.status === "active").length, + terminal: delegations.filter(isTerminalDelegation).length, + pendingWake: this.#gatewayPendingWakes().some((wake) => + wake.groupId === groupId && !wake.startedAt + ), + })); + } + + #startGatewayReconciler(): void { + if (!this.config.gateway || this.#gatewayReconcileTimer) { + return; + } + const intervalMs = this.config.reconcileIntervalMs ?? defaultGatewayReconcileIntervalMs; + this.#gatewayReconcileTimer = setInterval(() => { + void this.#reconcileDelegations().catch((error) => { + this.#debug("gateway.delegations.reconcile.failed", { + error: errorMessage(error), + }); + }); + }, intervalMs); + void this.#reconcileDelegations().catch((error) => { + this.#debug("gateway.delegations.reconcile.failed", { + error: errorMessage(error), + }); + }); + } + + async #reconcileDelegations(): Promise { + const delegations = this.#gatewayDelegations(); + let changed = false; + for (const delegation of delegations) { + if (delegation.status !== "active") { + continue; + } + const response = await this.client.readThread({ + threadId: delegation.codexThreadId, + includeTurns: true, + }); + const turns = Array.isArray(response.thread.turns) ? response.thread.turns : []; + const latest = record(turns[turns.length - 1]); + const latestStatus = stringValue(latest.status); + if (!latestStatus || !isTerminalTurnStatus(latestStatus)) { + continue; + } + const snapshot = threadSnapshotFromThread(response.thread); + delegation.status = latestStatus === "completed" ? "complete" : "failed"; + delegation.lastTurnId = stringValue(latest.id) ?? snapshot.lastFinal?.turnId ?? + delegation.lastTurnId; + delegation.lastStatus = latestStatus; + delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal; + delegation.completedAt = this.#now().toISOString(); + delegation.updatedAt = delegation.completedAt; + changed = true; + } + for (const delegation of delegations) { + if (!isTerminalDelegation(delegation) || !delegation.lastFinal) { + continue; + } + const mode = delegation.returnMode ?? "manual"; + if (mode === "detached" || mode === "manual") { + continue; + } + await this.#recordDelegationResult(delegation); + await this.#mirrorDelegationResult(delegation); + changed = true; + if (mode === "wake_on_done") { + this.#enqueueWake({ + kind: "delegation", + delegationIds: [delegation.id], + reason: `Delegation ${delegation.title} completed.`, + }); + } + if (mode === "wake_on_group" && delegation.groupId) { + const group = delegations.filter((candidate) => + candidate.groupId === delegation.groupId + ); + if (group.length > 0 && group.every(isTerminalDelegation)) { + this.#enqueueWake({ + kind: "group", + groupId: delegation.groupId, + delegationIds: group.map((candidate) => candidate.id), + reason: `Delegation group ${delegation.groupId} completed.`, + }); + } + } + } + await this.#processPendingWakes(); + if (changed) { + await this.#persist(); + } + } + + async #recordDelegationResult(delegation: DiscordGatewayDelegation): Promise { + const gatewaySession = this.#gatewaySession(); + if (!gatewaySession || delegation.injectedAt || !delegation.lastFinal) { + return; + } + await this.client.injectThreadItems({ + threadId: gatewaySession.codexThreadId, + items: [ + { + type: "message", + role: "user", + content: [ + { + type: "input_text", + text: delegationResultText(delegation), + }, + ], + }, + ], + }); + delegation.injectedAt = this.#now().toISOString(); + delegation.updatedAt = delegation.injectedAt; + } + + async #mirrorDelegationResult(delegation: DiscordGatewayDelegation): Promise { + const homeChannelId = this.config.gateway?.homeChannelId; + if (!homeChannelId || delegation.mirroredAt || !delegation.lastFinal) { + return; + } + await this.transport.sendMessage(homeChannelId, delegationResultText(delegation)); + delegation.mirroredAt = this.#now().toISOString(); + delegation.updatedAt = delegation.mirroredAt; + } + + #enqueueWake(input: { + kind: DiscordGatewayPendingWake["kind"]; + delegationIds: string[]; + groupId?: string; + reason: string; + }): void { + const delegationIds = [...new Set(input.delegationIds)].sort(); + if (delegationIds.length === 0) { + return; + } + const wakes = this.#gatewayPendingWakes(); + if (wakes.some((wake) => + !wake.startedAt && + wake.kind === input.kind && + wake.groupId === input.groupId && + sameStringSet(wake.delegationIds, delegationIds) + )) { + return; + } + wakes.push({ + id: wakeId(input.kind, input.groupId, delegationIds), + kind: input.kind, + groupId: input.groupId, + delegationIds, + reason: input.reason, + createdAt: this.#now().toISOString(), + }); + } + + async #processPendingWakes(): Promise { + const gatewaySession = this.#gatewaySession(); + if (!gatewaySession || this.#isSessionRunning(gatewaySession, this.#requireState())) { + return; + } + const wake = this.#gatewayPendingWakes().find((candidate) => !candidate.startedAt); + if (!wake) { + return; + } + const prompt = wakePrompt(wake, this.#gatewayDelegations()); + const turn = await this.client.startTurn({ + threadId: gatewaySession.codexThreadId, + input: [{ type: "text", text: prompt, text_elements: [] }], + cwd: gatewaySession.cwd ?? this.config.cwd ?? null, + model: this.config.model ?? null, + serviceTier: this.config.serviceTier ?? null, + effort: this.config.effort ?? null, + summary: this.config.summary ?? null, + approvalPolicy: this.config.approvalPolicy ?? null, + permissions: this.config.permissions ?? null, + outputSchema: null, + }); + wake.startedAt = this.#now().toISOString(); + for (const delegation of this.#gatewayDelegations()) { + if (wake.delegationIds.includes(delegation.id)) { + delegation.reportedAt = wake.startedAt; + delegation.updatedAt = wake.startedAt; + } + } + this.#debug("gateway.wake.started", { + wakeId: wake.id, + turnId: turn.turn.id, + kind: wake.kind, + groupId: wake.groupId, + }); + } + async #flowBackendGet( pathname: string, args: Record, @@ -1431,6 +1764,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] { cwd: stringSchema("Workspace cwd for the delegated Codex session."), title: optionalStringSchema("Human title for the delegated work."), prompt: optionalStringSchema("Optional first prompt to send to the delegated session."), + groupId: optionalStringSchema("Optional delegation group id for fan-out/fan-in orchestration."), + returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."), discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."), parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."), }, ["cwd"]), @@ -1443,6 +1778,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] { threadId: stringSchema("Existing Codex thread id to resume and track."), cwd: optionalStringSchema("Optional cwd override for the resumed thread."), title: optionalStringSchema("Human title for the delegated work."), + groupId: optionalStringSchema("Optional delegation group id for fan-out/fan-in orchestration."), + returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."), discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."), parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."), }, ["threadId"]), @@ -1455,6 +1792,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] { delegationId: optionalStringSchema("Tracked delegation id."), threadId: optionalStringSchema("Tracked delegated Codex thread id."), prompt: stringSchema("Prompt to send to the delegated session."), + groupId: optionalStringSchema("Optional delegation group id to assign for this turn."), + returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."), }, ["prompt"]), }, { @@ -1466,6 +1805,34 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] { threadId: optionalStringSchema("Tracked delegated Codex thread id."), }), }, + { + namespace: "codex_gateway", + name: "set_delegation_policy", + description: "Update return policy for one delegation or every delegation in a group.", + inputSchema: objectSchema({ + delegationId: optionalStringSchema("Tracked delegation id."), + threadId: optionalStringSchema("Tracked delegated Codex thread id."), + groupId: optionalStringSchema("Delegation group id."), + returnMode: stringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."), + }, ["returnMode"]), + }, + { + namespace: "codex_gateway", + name: "flush_delegation_results", + description: "Manually inject and mirror completed delegation results, optionally waking the main operator.", + inputSchema: objectSchema({ + delegationId: optionalStringSchema("Tracked delegation id."), + threadId: optionalStringSchema("Tracked delegated Codex thread id."), + groupId: optionalStringSchema("Delegation group id."), + wake: optionalStringSchema("Set to false to avoid starting a main operator turn."), + }), + }, + { + namespace: "codex_gateway", + name: "list_delegation_groups", + description: "List delegation groups and their terminal/active counts.", + inputSchema: objectSchema({}), + }, { namespace: "codex_gateway", name: "list_flow_runs", @@ -1516,6 +1883,93 @@ function requiredArg(args: Record, name: string): string { return value; } +function returnModeFromArgs( + args: Record, + fallback: DiscordGatewayDelegationReturnMode | undefined, +): DiscordGatewayDelegationReturnMode | undefined { + const value = stringValue(args.returnMode) ?? stringValue(args.returnPolicy); + if (!value) { + return fallback; + } + if (value === "immediate") { + return "wake_on_done"; + } + if (value === "group_barrier") { + return "wake_on_group"; + } + if ( + value === "detached" || + value === "record_only" || + value === "wake_on_done" || + value === "wake_on_group" || + value === "manual" + ) { + return value; + } + throw new Error(`Invalid returnMode: ${value}`); +} + +function isTerminalDelegation(delegation: DiscordGatewayDelegation): boolean { + return delegation.status === "complete" || + delegation.status === "failed" || + delegation.status === "reported"; +} + +function delegationResultText(delegation: DiscordGatewayDelegation): string { + return [ + "[discord-gateway delegation result]", + `Delegation: ${delegation.title}`, + `Delegation ID: ${delegation.id}`, + `Thread: ${delegation.codexThreadId}`, + delegation.groupId ? `Group: ${delegation.groupId}` : undefined, + delegation.cwd ? `Dir: ${delegation.cwd}` : undefined, + `Status: ${delegation.lastStatus ?? delegation.status}`, + delegation.lastTurnId ? `Turn: ${delegation.lastTurnId}` : undefined, + "", + "Result:", + delegation.lastFinal ?? "(no final assistant message captured)", + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function wakePrompt( + wake: DiscordGatewayPendingWake, + delegations: DiscordGatewayDelegation[], +): string { + const matching = delegations.filter((delegation) => + wake.delegationIds.includes(delegation.id) + ); + const summary = matching.map((delegation) => + `- ${delegation.title} (${delegation.id}): ${delegation.lastStatus ?? delegation.status}` + ).join("\n"); + return [ + "[discord-gateway wake]", + wake.reason, + wake.groupId ? `Group: ${wake.groupId}` : undefined, + "", + "Delegation results have already been injected into this thread history.", + "Review them and decide the next step.", + summary ? ["", "Delegations:", summary].join("\n") : undefined, + ].filter((line): line is string => line !== undefined).join("\n"); +} + +function sameStringSet(left: string[], right: string[]): boolean { + if (left.length !== right.length) { + return false; + } + const rightSet = new Set(right); + return left.every((value) => rightSet.has(value)); +} + +function wakeId( + kind: DiscordGatewayPendingWake["kind"], + groupId: string | undefined, + delegationIds: string[], +): string { + return `wake-${createHash("sha256").update( + JSON.stringify({ kind, groupId, delegationIds }), + ).digest("hex").slice(0, 12)}`; +} + function parseGatewayCommand(content: string): "status" | undefined { const normalized = content.trim().toLowerCase(); return normalized === "status" || normalized === "/status" diff --git a/apps/discord-bridge/src/state.ts b/apps/discord-bridge/src/state.ts index 16873b3..d3f0cd4 100644 --- a/apps/discord-bridge/src/state.ts +++ b/apps/discord-bridge/src/state.ts @@ -117,6 +117,9 @@ function parseGateway(value: unknown): DiscordGatewayState | undefined { delegations: Array.isArray(value.delegations) ? value.delegations.map(parseGatewayDelegation) : [], + pendingWakes: Array.isArray(value.pendingWakes) + ? value.pendingWakes.map(parseGatewayPendingWake) + : [], }; } @@ -129,7 +132,8 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation { status !== "active" && status !== "idle" && status !== "failed" && - status !== "complete" + status !== "complete" && + status !== "reported" ) { throw new Error("Invalid Discord bridge gateway delegation status"); } @@ -142,13 +146,57 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation { title: requiredString(value.title, "gateway.delegations.title"), status, cwd: optionalString(value.cwd), + groupId: optionalString(value.groupId), + returnMode: parseReturnMode(value.returnMode), discordDetailThreadId: optionalString(value.discordDetailThreadId), parentDiscordMessageId: optionalString(value.parentDiscordMessageId), + lastTurnId: optionalString(value.lastTurnId), + lastStatus: optionalString(value.lastStatus), + lastFinal: optionalString(value.lastFinal), + completedAt: optionalString(value.completedAt), + injectedAt: optionalString(value.injectedAt), + mirroredAt: optionalString(value.mirroredAt), + reportedAt: optionalString(value.reportedAt), createdAt: requiredString(value.createdAt, "gateway.delegations.createdAt"), updatedAt: requiredString(value.updatedAt, "gateway.delegations.updatedAt"), }; } +function parseGatewayPendingWake( + value: unknown, +): NonNullable[number] { + if (!isRecord(value)) { + throw new Error("Invalid Discord bridge gateway pending wake"); + } + const kind = value.kind === "delegation" || value.kind === "group" + ? value.kind + : undefined; + if (!kind) { + throw new Error("Invalid Discord bridge gateway pending wake kind"); + } + return { + id: requiredString(value.id, "gateway.pendingWakes.id"), + kind, + delegationIds: Array.isArray(value.delegationIds) + ? uniqueStrings(value.delegationIds) + : [], + groupId: optionalString(value.groupId), + reason: requiredString(value.reason, "gateway.pendingWakes.reason"), + createdAt: requiredString(value.createdAt, "gateway.pendingWakes.createdAt"), + startedAt: optionalString(value.startedAt), + }; +} + +function parseReturnMode(value: unknown): DiscordGatewayDelegation["returnMode"] { + return value === "detached" || + value === "record_only" || + value === "wake_on_done" || + value === "wake_on_group" || + value === "manual" + ? value + : undefined; +} + function parseActiveTurn(value: unknown): DiscordBridgeActiveTurn { if (!isRecord(value)) { throw new Error("Invalid Discord bridge active turn"); diff --git a/apps/discord-bridge/src/types.ts b/apps/discord-bridge/src/types.ts index d2044ca..5037b9d 100644 --- a/apps/discord-bridge/src/types.ts +++ b/apps/discord-bridge/src/types.ts @@ -128,6 +128,7 @@ export type CodexBridgeClient = { startTurn(params: v2.TurnStartParams): Promise; steerTurn(params: v2.TurnSteerParams): Promise; readThread(params: v2.ThreadReadParams): Promise; + injectThreadItems(params: v2.ThreadInjectItemsParams): Promise; listThreads(params: v2.ThreadListParams): Promise; getThreadGoal(params: v2.ThreadGoalGetParams): Promise; respond(id: string | number, result: unknown): void; @@ -151,20 +152,47 @@ export type DiscordGatewayState = { createdAt?: string; toolsVersion?: number; delegations: DiscordGatewayDelegation[]; + pendingWakes?: DiscordGatewayPendingWake[]; }; +export type DiscordGatewayDelegationReturnMode = + | "detached" + | "record_only" + | "wake_on_done" + | "wake_on_group" + | "manual"; + export type DiscordGatewayDelegation = { id: string; codexThreadId: string; title: string; - status: "active" | "idle" | "failed" | "complete"; + status: "active" | "idle" | "failed" | "complete" | "reported"; cwd?: string; + groupId?: string; + returnMode?: DiscordGatewayDelegationReturnMode; discordDetailThreadId?: string; parentDiscordMessageId?: string; + lastTurnId?: string; + lastStatus?: string; + lastFinal?: string; + completedAt?: string; + injectedAt?: string; + mirroredAt?: string; + reportedAt?: string; createdAt: string; updatedAt: string; }; +export type DiscordGatewayPendingWake = { + id: string; + kind: "delegation" | "group"; + delegationIds: string[]; + groupId?: string; + reason: string; + createdAt: string; + startedAt?: string; +}; + export type DiscordBridgeSession = { discordThreadId: string; parentChannelId: string; diff --git a/apps/discord-bridge/test/bridge.test.ts b/apps/discord-bridge/test/bridge.test.ts index e8dbb70..a62498b 100644 --- a/apps/discord-bridge/test/bridge.test.ts +++ b/apps/discord-bridge/test/bridge.test.ts @@ -224,6 +224,286 @@ describe("DiscordCodexBridge", () => { await bridge.stop(); }); + test("gateway records group delegation results and wakes after the group finishes", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + reconcileIntervalMs: 10, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + for (const [index, title] of ["Workspace A", "Workspace B"].entries()) { + client.emitRequest({ + id: `tool-${index}`, + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: `/workspace/${index}`, + title, + prompt: `Inspect ${title}.`, + groupId: "fanout", + }, + }, + }); + await waitFor(() => client.responses.length === index + 1); + } + + client.threadTurns.set("codex-thread-2", [ + completedTurn("turn-1", "Result A."), + ]); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(2); + expect(transport.messages.some((message) => + message.channelId === "home-channel" && + message.text.includes("Result A.") + )).toBe(true); + + client.threadTurns.set("codex-thread-3", [ + completedTurn("turn-2", "Result B."), + ]); + await waitFor(() => client.startTurnCalls.length === 3); + expect(client.injectThreadItemsCalls).toHaveLength(2); + expect(client.startTurnCalls[2]).toEqual( + expect.objectContaining({ + threadId: "codex-thread-1", + }), + ); + expect(inputText(client.startTurnCalls[2]?.input[0])).toContain( + "Delegation group fanout completed.", + ); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( + expect.objectContaining({ + kind: "group", + groupId: "fanout", + startedAt: "2026-05-14T12:00:00.000Z", + }), + ); + await bridge.stop(); + }); + + test("gateway detached delegations complete without injecting or waking the main thread", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + reconcileIntervalMs: 10, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/detached", + title: "Detached workspace", + prompt: "Prepare this for a human.", + returnMode: "detached", + }, + }, + }); + + await waitFor(() => client.responses.length === 1); + client.threadTurns.set("codex-thread-2", [ + completedTurn("turn-1", "Detached result."), + ]); + await waitFor(() => + bridge.stateForTest().gateway?.delegations[0]?.status === "complete" + ); + expect(client.injectThreadItemsCalls).toEqual([]); + expect(client.startTurnCalls).toHaveLength(1); + expect(transport.messages.some((message) => + message.text.includes("Detached result.") + )).toBe(false); + await bridge.stop(); + }); + + test("gateway queues delegation wake while the main operator thread is busy", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + reconcileIntervalMs: 10, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + transport.emit({ + kind: "message", + channelId: "home-channel", + messageId: "home-message-1", + author: { id: "user-1", name: "Peezy", isBot: false }, + content: "work on a long-running main task", + createdAt: "2026-05-14T12:00:00.000Z", + }); + await waitFor(() => bridge.stateForTest().activeTurns.length === 1); + + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/side", + title: "Side task", + prompt: "Finish this side task.", + }, + }, + }); + + await waitFor(() => client.responses.length === 1); + client.threadTurns.set("codex-thread-2", [ + completedTurn("turn-2", "Side task result."), + ]); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(2); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual( + expect.objectContaining({ + kind: "delegation", + }), + ); + expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty( + "startedAt", + ); + await bridge.stop(); + }); + + test("gateway record-only delegations inject and mirror without waking", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + reconcileIntervalMs: 10, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/record", + title: "Record only task", + prompt: "Record this result.", + returnMode: "record_only", + }, + }, + }); + + await waitFor(() => client.responses.length === 1); + client.threadTurns.set("codex-thread-2", [ + completedTurn("turn-1", "Record-only result."), + ]); + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(1); + expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]); + expect(transport.messages.some((message) => + message.text.includes("Record-only result.") + )).toBe(true); + await bridge.stop(); + }); + + test("gateway manually flushes completed manual delegation results", async () => { + const client = new FakeCodexClient(); + const transport = new FakeDiscordTransport(); + const bridge = new DiscordCodexBridge({ + client, + transport, + store: new MemoryStateStore(), + config: testConfig({ + gateway: { homeChannelId: "home-channel" }, + reconcileIntervalMs: 10, + }), + now: () => new Date("2026-05-14T12:00:00.000Z"), + }); + + await bridge.start(); + await waitFor(() => bridge.stateForTest().sessions.length === 1); + client.emitRequest({ + id: "tool-1", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "start_delegation", + arguments: { + cwd: "/workspace/manual", + title: "Manual task", + prompt: "Finish manually.", + returnMode: "manual", + }, + }, + }); + + await waitFor(() => client.responses.length === 1); + client.threadTurns.set("codex-thread-2", [ + completedTurn("turn-1", "Manual result."), + ]); + await waitFor(() => + bridge.stateForTest().gateway?.delegations[0]?.status === "complete" + ); + expect(client.injectThreadItemsCalls).toEqual([]); + client.emitRequest({ + id: "tool-2", + method: "item/tool/call", + params: { + threadId: "codex-thread-1", + namespace: "codex_gateway", + tool: "flush_delegation_results", + arguments: { + delegationId: bridge.stateForTest().gateway?.delegations[0]?.id, + wake: "false", + }, + }, + }); + + await waitFor(() => client.injectThreadItemsCalls.length === 1); + expect(client.startTurnCalls).toHaveLength(1); + expect(transport.messages.some((message) => + message.text.includes("Manual result.") + )).toBe(true); + await bridge.stop(); + }); + test("answers gateway status in the home channel without starting a turn", async () => { const client = new FakeCodexClient(); const transport = new FakeDiscordTransport(); @@ -2650,6 +2930,7 @@ class FakeCodexClient implements CodexBridgeClient { startTurnCalls: v2.TurnStartParams[] = []; steerTurnCalls: v2.TurnSteerParams[] = []; readThreadCalls: v2.ThreadReadParams[] = []; + injectThreadItemsCalls: v2.ThreadInjectItemsParams[] = []; listThreadsCalls: v2.ThreadListParams[] = []; getThreadGoalCalls: v2.ThreadGoalGetParams[] = []; responses: Array<{ id: string | number; result: unknown }> = []; @@ -2750,6 +3031,13 @@ class FakeCodexClient implements CodexBridgeClient { } as unknown as v2.ThreadReadResponse; } + async injectThreadItems( + params: v2.ThreadInjectItemsParams, + ): Promise { + this.injectThreadItemsCalls.push(params); + return {}; + } + async listThreads(params: v2.ThreadListParams): Promise { this.listThreadsCalls.push(params); return { @@ -2961,6 +3249,22 @@ function gatewayToolResult(value: unknown): unknown { return text ? JSON.parse(text) : undefined; } +function completedTurn(id: string, text: string): v2.Turn { + return { + id, + status: "completed", + items: [ + { + type: "agentMessage", + id: `${id}-message`, + text, + phase: "final_answer", + memoryCitation: null, + }, + ], + } as unknown as v2.Turn; +} + function statusMessageText(transport: FakeDiscordTransport): string { return transport.messages.find((message) => message.id === "message-out-1") ?.text ?? ""; diff --git a/apps/discord-bridge/test/state.test.ts b/apps/discord-bridge/test/state.test.ts index 69ce86a..17857f0 100644 --- a/apps/discord-bridge/test/state.test.ts +++ b/apps/discord-bridge/test/state.test.ts @@ -33,6 +33,16 @@ describe("JsonFileStateStore", () => { updatedAt: "2026-05-11T00:00:02.000Z", }, ], + pendingWakes: [ + { + id: "wake-1", + kind: "group", + delegationIds: ["delegation-1"], + groupId: "patchbay", + reason: "Group patchbay completed.", + createdAt: "2026-05-11T00:00:03.000Z", + }, + ], }, sessions: [ { @@ -101,6 +111,16 @@ describe("JsonFileStateStore", () => { updatedAt: "2026-05-11T00:00:02.000Z", }, ], + pendingWakes: [ + { + id: "wake-1", + kind: "group", + delegationIds: ["delegation-1"], + groupId: "patchbay", + reason: "Group patchbay completed.", + createdAt: "2026-05-11T00:00:03.000Z", + }, + ], }); expect(state.sessions).toHaveLength(2); expect(state.sessions[0]?.ownerUserId).toBe("user-1"); diff --git a/packages/codex-client/src/app-server/browser-client.ts b/packages/codex-client/src/app-server/browser-client.ts index 5892daf..f52b349 100644 --- a/packages/codex-client/src/app-server/browser-client.ts +++ b/packages/codex-client/src/app-server/browser-client.ts @@ -111,6 +111,12 @@ export class CodexBrowserAppServerClient extends CodexEventEmitter { return this.request("thread/read", params); } + injectThreadItems( + params: v2.ThreadInjectItemsParams, + ): Promise { + return this.request("thread/inject_items", params); + } + startTurn(params: v2.TurnStartParams): Promise { return this.request("turn/start", params); } diff --git a/packages/codex-client/src/app-server/client.ts b/packages/codex-client/src/app-server/client.ts index b4baedd..720969b 100644 --- a/packages/codex-client/src/app-server/client.ts +++ b/packages/codex-client/src/app-server/client.ts @@ -117,6 +117,12 @@ export class CodexAppServerClient extends CodexEventEmitter { return this.request("thread/read", params); } + injectThreadItems( + params: v2.ThreadInjectItemsParams, + ): Promise { + return this.request("thread/inject_items", params); + } + listThreadTurns( params: v2.ThreadTurnsListParams, ): Promise {