Add Discord gateway delegation orchestration

This commit is contained in:
matamune 2026-05-14 15:29:28 +00:00
parent d48fd258fa
commit c43b6925bd
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
8 changed files with 887 additions and 3 deletions

View file

@ -46,6 +46,9 @@ are attached only to that main thread and expose:
- `resume_delegation`
- `send_delegation`
- `read_delegation`
- `set_delegation_policy`
- `flush_delegation_results`
- `list_delegation_groups`
- `list_flow_runs`
- `list_flow_events`
@ -56,8 +59,23 @@ Those tools can:
- resume a delegated Codex session by thread id
- send a turn to a delegated session
- observe or summarize delegated session state
- group delegations for fan-out/fan-in coordination
- record completed delegation results into the main operator thread
- inspect flow backend state through `CODEX_FLOW_BACKEND_URL`
Gateway state stores delegation records, including optional Discord detail
thread ids for noisy work. Delegated Codex sessions do not receive the privileged
gateway tools; only the main operator thread can manage delegation.
Delegations support return modes:
- `wake_on_done`: inject and mirror the result, then wake the main operator when idle
- `wake_on_group`: inject and mirror each result, then wake once the whole group is terminal
- `record_only`: inject and mirror results without waking the main operator
- `manual`: keep results in gateway state until `flush_delegation_results`
- `detached`: do not loop results back to the main thread; useful for human-continued threads
Automatic result return uses `thread/inject_items` to append structured
delegation results to the main operator thread's model-visible history. Starting
a main-thread turn is a separate wake step, so long-running main goals are not
interrupted; wakes are queued until the main operator thread is idle.

View file

@ -16,6 +16,8 @@ import type {
CodexBridgeClient,
DiscordBridgeConfig,
DiscordGatewayDelegation,
DiscordGatewayDelegationReturnMode,
DiscordGatewayPendingWake,
DiscordBridgeSession,
DiscordBridgeState,
DiscordBridgeStateStore,
@ -29,6 +31,7 @@ import type {
const maxDiscordMessageLength = 2000;
const gatewayToolsVersion = 1;
const defaultGatewayReconcileIntervalMs = 10_000;
type ThreadSnapshot = {
terminalTurnIds: string[];
@ -51,6 +54,7 @@ export class DiscordCodexBridge {
#dedupe: MessageDeduplicator;
#logger: DiscordBridgeLogger;
#consoleOutput: DiscordConsoleOutput | undefined;
#gatewayReconcileTimer: Timer | undefined;
constructor(options: {
client: CodexBridgeClient;
@ -128,12 +132,17 @@ export class DiscordCodexBridge {
for (const runner of this.#runnersByDiscordThread.values()) {
runner.start();
}
this.#startGatewayReconciler();
}
async stop(): Promise<void> {
this.#debug("bridge.stop", {
runners: this.#runnersByDiscordThread.size,
});
if (this.#gatewayReconcileTimer) {
clearInterval(this.#gatewayReconcileTimer);
this.#gatewayReconcileTimer = undefined;
}
await Promise.all(
[...this.#runnersByDiscordThread.values()].map((runner) => runner.stop()),
);
@ -666,6 +675,17 @@ export class DiscordCodexBridge {
if (tool === "read_delegation") {
return await this.#readDelegation(args);
}
if (tool === "set_delegation_policy") {
return await this.#setDelegationPolicy(args);
}
if (tool === "flush_delegation_results") {
return await this.#flushDelegationResults(args);
}
if (tool === "list_delegation_groups") {
return {
groups: this.#delegationGroups(),
};
}
if (tool === "list_flow_runs") {
return await this.#flowBackendGet("/runs", args);
}
@ -721,6 +741,7 @@ export class DiscordCodexBridge {
createdAt: existing.createdAt,
toolsVersion: state.gateway?.toolsVersion,
delegations: state.gateway?.delegations ?? [],
pendingWakes: state.gateway?.pendingWakes ?? [],
};
this.#registerRunner(existing);
await this.#persist();
@ -783,6 +804,7 @@ export class DiscordCodexBridge {
? state.gateway?.toolsVersion
: gatewayToolsVersion,
delegations: state.gateway?.delegations ?? [],
pendingWakes: state.gateway?.pendingWakes ?? [],
};
state.sessions.push(session);
this.#registerRunner(session);
@ -819,17 +841,37 @@ export class DiscordCodexBridge {
homeChannelId: this.config.gateway?.homeChannelId ?? "",
mainThreadId: this.#gatewaySession()?.codexThreadId,
delegations: [],
pendingWakes: [],
};
}
state.gateway.delegations ??= [];
return state.gateway.delegations;
}
#gatewayPendingWakes(): DiscordGatewayPendingWake[] {
const state = this.#requireState();
if (!state.gateway) {
state.gateway = {
homeChannelId: this.config.gateway?.homeChannelId ?? "",
mainThreadId: this.#gatewaySession()?.codexThreadId,
delegations: [],
pendingWakes: [],
};
}
state.gateway.pendingWakes ??= [];
return state.gateway.pendingWakes;
}
async #startDelegation(args: Record<string, unknown>): Promise<unknown> {
const cwd = requiredArg(args, "cwd");
const title = stringValue(args.title) ?? firstLine(stringValue(args.prompt)) ??
`Delegated ${compactId(cwd)}`;
const prompt = stringValue(args.prompt);
const groupId = stringValue(args.groupId);
const returnMode = returnModeFromArgs(
args,
groupId ? "wake_on_group" : "wake_on_done",
);
const started = await this.client.startThread(this.#threadStartParams(cwd));
const codexThreadId = started.thread.id;
await this.client.setThreadName({
@ -843,6 +885,8 @@ export class DiscordCodexBridge {
title,
status: prompt ? "active" : "idle",
cwd,
groupId,
returnMode,
discordDetailThreadId: stringValue(args.discordDetailThreadId),
parentDiscordMessageId: stringValue(args.parentDiscordMessageId),
createdAt: now,
@ -863,6 +907,7 @@ export class DiscordCodexBridge {
outputSchema: null,
});
turnId = turn.turn.id;
delegation.lastTurnId = turnId;
}
await this.#persist();
return { delegation, turnId };
@ -871,6 +916,7 @@ export class DiscordCodexBridge {
async #resumeDelegation(args: Record<string, unknown>): Promise<unknown> {
const codexThreadId = requiredArg(args, "threadId");
const cwd = stringValue(args.cwd);
const groupId = stringValue(args.groupId);
const resumed = await this.client.resumeThread(this.#threadResumeParams(codexThreadId, cwd));
const now = this.#now().toISOString();
const delegation = this.#upsertDelegation({
@ -879,6 +925,8 @@ export class DiscordCodexBridge {
title: stringValue(args.title) ?? `Delegated ${compactId(codexThreadId)}`,
status: "idle",
cwd: cwd ?? resumeResponseCwd(resumed),
groupId,
returnMode: returnModeFromArgs(args, "manual"),
discordDetailThreadId: stringValue(args.discordDetailThreadId),
parentDiscordMessageId: stringValue(args.parentDiscordMessageId),
createdAt: this.#delegationForThread(codexThreadId)?.createdAt ?? now,
@ -891,6 +939,14 @@ export class DiscordCodexBridge {
async #sendDelegation(args: Record<string, unknown>): Promise<unknown> {
const delegation = this.#requireDelegation(args);
const prompt = requiredArg(args, "prompt");
const groupId = stringValue(args.groupId);
if (groupId) {
delegation.groupId = groupId;
}
delegation.returnMode = returnModeFromArgs(
args,
delegation.returnMode ?? (delegation.groupId ? "wake_on_group" : "wake_on_done"),
);
const turn = await this.client.startTurn({
threadId: delegation.codexThreadId,
input: [{ type: "text", text: prompt, text_elements: [] }],
@ -904,6 +960,13 @@ export class DiscordCodexBridge {
outputSchema: null,
});
delegation.status = "active";
delegation.lastTurnId = turn.turn.id;
delegation.lastStatus = undefined;
delegation.lastFinal = undefined;
delegation.completedAt = undefined;
delegation.injectedAt = undefined;
delegation.mirroredAt = undefined;
delegation.reportedAt = undefined;
delegation.updatedAt = this.#now().toISOString();
await this.#persist();
return { delegation, turnId: turn.turn.id };
@ -920,12 +983,18 @@ export class DiscordCodexBridge {
const latest = record(turns[turns.length - 1]);
const latestStatus = stringValue(latest.status);
if (latestStatus === "completed") {
delegation.status = "idle";
delegation.status = "complete";
} else if (latestStatus === "failed" || latestStatus === "interrupted") {
delegation.status = "failed";
} else if (latestStatus) {
delegation.status = "active";
}
delegation.lastTurnId = stringValue(latest.id) ?? delegation.lastTurnId;
delegation.lastStatus = latestStatus ?? delegation.lastStatus;
delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal;
if (latestStatus && isTerminalTurnStatus(latestStatus)) {
delegation.completedAt ??= this.#now().toISOString();
}
delegation.updatedAt = this.#now().toISOString();
await this.#persist();
return {
@ -937,6 +1006,270 @@ export class DiscordCodexBridge {
};
}
async #setDelegationPolicy(args: Record<string, unknown>): Promise<unknown> {
const groupId = stringValue(args.groupId);
const mode = returnModeFromArgs(args, undefined);
if (!mode) {
throw new Error("Missing required argument: returnMode");
}
const delegations = groupId
? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId)
: [this.#requireDelegation(args)];
if (delegations.length === 0) {
throw new Error("No matching gateway delegations.");
}
const now = this.#now().toISOString();
for (const delegation of delegations) {
delegation.returnMode = mode;
delegation.updatedAt = now;
}
await this.#persist();
return { delegations };
}
async #flushDelegationResults(args: Record<string, unknown>): Promise<unknown> {
await this.#reconcileDelegations();
const groupId = stringValue(args.groupId);
const delegations = groupId
? this.#gatewayDelegations().filter((delegation) => delegation.groupId === groupId)
: stringValue(args.delegationId) || stringValue(args.threadId) || stringValue(args.id)
? [this.#requireDelegation(args)]
: this.#gatewayDelegations();
const flushed: DiscordGatewayDelegation[] = [];
for (const delegation of delegations) {
if (!delegation.lastFinal || !isTerminalDelegation(delegation)) {
continue;
}
await this.#recordDelegationResult(delegation);
await this.#mirrorDelegationResult(delegation);
flushed.push(delegation);
}
if (flushed.length > 0 && stringValue(args.wake) !== "false") {
this.#enqueueWake({
kind: groupId ? "group" : "delegation",
groupId,
delegationIds: flushed.map((delegation) => delegation.id),
reason: groupId
? `Delegation group ${groupId} was manually flushed.`
: "Delegation results were manually flushed.",
});
await this.#processPendingWakes();
}
await this.#persist();
return { flushed };
}
#delegationGroups(): Array<{
groupId: string;
total: number;
active: number;
terminal: number;
pendingWake: boolean;
}> {
const groups = new Map<string, DiscordGatewayDelegation[]>();
for (const delegation of this.#gatewayDelegations()) {
if (!delegation.groupId) {
continue;
}
const existing = groups.get(delegation.groupId) ?? [];
existing.push(delegation);
groups.set(delegation.groupId, existing);
}
return [...groups.entries()].map(([groupId, delegations]) => ({
groupId,
total: delegations.length,
active: delegations.filter((delegation) => delegation.status === "active").length,
terminal: delegations.filter(isTerminalDelegation).length,
pendingWake: this.#gatewayPendingWakes().some((wake) =>
wake.groupId === groupId && !wake.startedAt
),
}));
}
#startGatewayReconciler(): void {
if (!this.config.gateway || this.#gatewayReconcileTimer) {
return;
}
const intervalMs = this.config.reconcileIntervalMs ?? defaultGatewayReconcileIntervalMs;
this.#gatewayReconcileTimer = setInterval(() => {
void this.#reconcileDelegations().catch((error) => {
this.#debug("gateway.delegations.reconcile.failed", {
error: errorMessage(error),
});
});
}, intervalMs);
void this.#reconcileDelegations().catch((error) => {
this.#debug("gateway.delegations.reconcile.failed", {
error: errorMessage(error),
});
});
}
async #reconcileDelegations(): Promise<void> {
const delegations = this.#gatewayDelegations();
let changed = false;
for (const delegation of delegations) {
if (delegation.status !== "active") {
continue;
}
const response = await this.client.readThread({
threadId: delegation.codexThreadId,
includeTurns: true,
});
const turns = Array.isArray(response.thread.turns) ? response.thread.turns : [];
const latest = record(turns[turns.length - 1]);
const latestStatus = stringValue(latest.status);
if (!latestStatus || !isTerminalTurnStatus(latestStatus)) {
continue;
}
const snapshot = threadSnapshotFromThread(response.thread);
delegation.status = latestStatus === "completed" ? "complete" : "failed";
delegation.lastTurnId = stringValue(latest.id) ?? snapshot.lastFinal?.turnId ??
delegation.lastTurnId;
delegation.lastStatus = latestStatus;
delegation.lastFinal = snapshot.lastFinal?.text ?? delegation.lastFinal;
delegation.completedAt = this.#now().toISOString();
delegation.updatedAt = delegation.completedAt;
changed = true;
}
for (const delegation of delegations) {
if (!isTerminalDelegation(delegation) || !delegation.lastFinal) {
continue;
}
const mode = delegation.returnMode ?? "manual";
if (mode === "detached" || mode === "manual") {
continue;
}
await this.#recordDelegationResult(delegation);
await this.#mirrorDelegationResult(delegation);
changed = true;
if (mode === "wake_on_done") {
this.#enqueueWake({
kind: "delegation",
delegationIds: [delegation.id],
reason: `Delegation ${delegation.title} completed.`,
});
}
if (mode === "wake_on_group" && delegation.groupId) {
const group = delegations.filter((candidate) =>
candidate.groupId === delegation.groupId
);
if (group.length > 0 && group.every(isTerminalDelegation)) {
this.#enqueueWake({
kind: "group",
groupId: delegation.groupId,
delegationIds: group.map((candidate) => candidate.id),
reason: `Delegation group ${delegation.groupId} completed.`,
});
}
}
}
await this.#processPendingWakes();
if (changed) {
await this.#persist();
}
}
async #recordDelegationResult(delegation: DiscordGatewayDelegation): Promise<void> {
const gatewaySession = this.#gatewaySession();
if (!gatewaySession || delegation.injectedAt || !delegation.lastFinal) {
return;
}
await this.client.injectThreadItems({
threadId: gatewaySession.codexThreadId,
items: [
{
type: "message",
role: "user",
content: [
{
type: "input_text",
text: delegationResultText(delegation),
},
],
},
],
});
delegation.injectedAt = this.#now().toISOString();
delegation.updatedAt = delegation.injectedAt;
}
async #mirrorDelegationResult(delegation: DiscordGatewayDelegation): Promise<void> {
const homeChannelId = this.config.gateway?.homeChannelId;
if (!homeChannelId || delegation.mirroredAt || !delegation.lastFinal) {
return;
}
await this.transport.sendMessage(homeChannelId, delegationResultText(delegation));
delegation.mirroredAt = this.#now().toISOString();
delegation.updatedAt = delegation.mirroredAt;
}
#enqueueWake(input: {
kind: DiscordGatewayPendingWake["kind"];
delegationIds: string[];
groupId?: string;
reason: string;
}): void {
const delegationIds = [...new Set(input.delegationIds)].sort();
if (delegationIds.length === 0) {
return;
}
const wakes = this.#gatewayPendingWakes();
if (wakes.some((wake) =>
!wake.startedAt &&
wake.kind === input.kind &&
wake.groupId === input.groupId &&
sameStringSet(wake.delegationIds, delegationIds)
)) {
return;
}
wakes.push({
id: wakeId(input.kind, input.groupId, delegationIds),
kind: input.kind,
groupId: input.groupId,
delegationIds,
reason: input.reason,
createdAt: this.#now().toISOString(),
});
}
async #processPendingWakes(): Promise<void> {
const gatewaySession = this.#gatewaySession();
if (!gatewaySession || this.#isSessionRunning(gatewaySession, this.#requireState())) {
return;
}
const wake = this.#gatewayPendingWakes().find((candidate) => !candidate.startedAt);
if (!wake) {
return;
}
const prompt = wakePrompt(wake, this.#gatewayDelegations());
const turn = await this.client.startTurn({
threadId: gatewaySession.codexThreadId,
input: [{ type: "text", text: prompt, text_elements: [] }],
cwd: gatewaySession.cwd ?? this.config.cwd ?? null,
model: this.config.model ?? null,
serviceTier: this.config.serviceTier ?? null,
effort: this.config.effort ?? null,
summary: this.config.summary ?? null,
approvalPolicy: this.config.approvalPolicy ?? null,
permissions: this.config.permissions ?? null,
outputSchema: null,
});
wake.startedAt = this.#now().toISOString();
for (const delegation of this.#gatewayDelegations()) {
if (wake.delegationIds.includes(delegation.id)) {
delegation.reportedAt = wake.startedAt;
delegation.updatedAt = wake.startedAt;
}
}
this.#debug("gateway.wake.started", {
wakeId: wake.id,
turnId: turn.turn.id,
kind: wake.kind,
groupId: wake.groupId,
});
}
async #flowBackendGet(
pathname: string,
args: Record<string, unknown>,
@ -1431,6 +1764,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] {
cwd: stringSchema("Workspace cwd for the delegated Codex session."),
title: optionalStringSchema("Human title for the delegated work."),
prompt: optionalStringSchema("Optional first prompt to send to the delegated session."),
groupId: optionalStringSchema("Optional delegation group id for fan-out/fan-in orchestration."),
returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."),
discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."),
parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."),
}, ["cwd"]),
@ -1443,6 +1778,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] {
threadId: stringSchema("Existing Codex thread id to resume and track."),
cwd: optionalStringSchema("Optional cwd override for the resumed thread."),
title: optionalStringSchema("Human title for the delegated work."),
groupId: optionalStringSchema("Optional delegation group id for fan-out/fan-in orchestration."),
returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."),
discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."),
parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."),
}, ["threadId"]),
@ -1455,6 +1792,8 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] {
delegationId: optionalStringSchema("Tracked delegation id."),
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
prompt: stringSchema("Prompt to send to the delegated session."),
groupId: optionalStringSchema("Optional delegation group id to assign for this turn."),
returnMode: optionalStringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."),
}, ["prompt"]),
},
{
@ -1466,6 +1805,34 @@ function gatewayToolSpecs(): v2.DynamicToolSpec[] {
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
}),
},
{
namespace: "codex_gateway",
name: "set_delegation_policy",
description: "Update return policy for one delegation or every delegation in a group.",
inputSchema: objectSchema({
delegationId: optionalStringSchema("Tracked delegation id."),
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
groupId: optionalStringSchema("Delegation group id."),
returnMode: stringSchema("Return policy: detached, record_only, wake_on_done, wake_on_group, or manual."),
}, ["returnMode"]),
},
{
namespace: "codex_gateway",
name: "flush_delegation_results",
description: "Manually inject and mirror completed delegation results, optionally waking the main operator.",
inputSchema: objectSchema({
delegationId: optionalStringSchema("Tracked delegation id."),
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
groupId: optionalStringSchema("Delegation group id."),
wake: optionalStringSchema("Set to false to avoid starting a main operator turn."),
}),
},
{
namespace: "codex_gateway",
name: "list_delegation_groups",
description: "List delegation groups and their terminal/active counts.",
inputSchema: objectSchema({}),
},
{
namespace: "codex_gateway",
name: "list_flow_runs",
@ -1516,6 +1883,93 @@ function requiredArg(args: Record<string, unknown>, name: string): string {
return value;
}
function returnModeFromArgs(
args: Record<string, unknown>,
fallback: DiscordGatewayDelegationReturnMode | undefined,
): DiscordGatewayDelegationReturnMode | undefined {
const value = stringValue(args.returnMode) ?? stringValue(args.returnPolicy);
if (!value) {
return fallback;
}
if (value === "immediate") {
return "wake_on_done";
}
if (value === "group_barrier") {
return "wake_on_group";
}
if (
value === "detached" ||
value === "record_only" ||
value === "wake_on_done" ||
value === "wake_on_group" ||
value === "manual"
) {
return value;
}
throw new Error(`Invalid returnMode: ${value}`);
}
function isTerminalDelegation(delegation: DiscordGatewayDelegation): boolean {
return delegation.status === "complete" ||
delegation.status === "failed" ||
delegation.status === "reported";
}
function delegationResultText(delegation: DiscordGatewayDelegation): string {
return [
"[discord-gateway delegation result]",
`Delegation: ${delegation.title}`,
`Delegation ID: ${delegation.id}`,
`Thread: ${delegation.codexThreadId}`,
delegation.groupId ? `Group: ${delegation.groupId}` : undefined,
delegation.cwd ? `Dir: ${delegation.cwd}` : undefined,
`Status: ${delegation.lastStatus ?? delegation.status}`,
delegation.lastTurnId ? `Turn: ${delegation.lastTurnId}` : undefined,
"",
"Result:",
delegation.lastFinal ?? "(no final assistant message captured)",
].filter((line): line is string => line !== undefined).join("\n");
}
function wakePrompt(
wake: DiscordGatewayPendingWake,
delegations: DiscordGatewayDelegation[],
): string {
const matching = delegations.filter((delegation) =>
wake.delegationIds.includes(delegation.id)
);
const summary = matching.map((delegation) =>
`- ${delegation.title} (${delegation.id}): ${delegation.lastStatus ?? delegation.status}`
).join("\n");
return [
"[discord-gateway wake]",
wake.reason,
wake.groupId ? `Group: ${wake.groupId}` : undefined,
"",
"Delegation results have already been injected into this thread history.",
"Review them and decide the next step.",
summary ? ["", "Delegations:", summary].join("\n") : undefined,
].filter((line): line is string => line !== undefined).join("\n");
}
function sameStringSet(left: string[], right: string[]): boolean {
if (left.length !== right.length) {
return false;
}
const rightSet = new Set(right);
return left.every((value) => rightSet.has(value));
}
function wakeId(
kind: DiscordGatewayPendingWake["kind"],
groupId: string | undefined,
delegationIds: string[],
): string {
return `wake-${createHash("sha256").update(
JSON.stringify({ kind, groupId, delegationIds }),
).digest("hex").slice(0, 12)}`;
}
function parseGatewayCommand(content: string): "status" | undefined {
const normalized = content.trim().toLowerCase();
return normalized === "status" || normalized === "/status"

View file

@ -117,6 +117,9 @@ function parseGateway(value: unknown): DiscordGatewayState | undefined {
delegations: Array.isArray(value.delegations)
? value.delegations.map(parseGatewayDelegation)
: [],
pendingWakes: Array.isArray(value.pendingWakes)
? value.pendingWakes.map(parseGatewayPendingWake)
: [],
};
}
@ -129,7 +132,8 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation {
status !== "active" &&
status !== "idle" &&
status !== "failed" &&
status !== "complete"
status !== "complete" &&
status !== "reported"
) {
throw new Error("Invalid Discord bridge gateway delegation status");
}
@ -142,13 +146,57 @@ function parseGatewayDelegation(value: unknown): DiscordGatewayDelegation {
title: requiredString(value.title, "gateway.delegations.title"),
status,
cwd: optionalString(value.cwd),
groupId: optionalString(value.groupId),
returnMode: parseReturnMode(value.returnMode),
discordDetailThreadId: optionalString(value.discordDetailThreadId),
parentDiscordMessageId: optionalString(value.parentDiscordMessageId),
lastTurnId: optionalString(value.lastTurnId),
lastStatus: optionalString(value.lastStatus),
lastFinal: optionalString(value.lastFinal),
completedAt: optionalString(value.completedAt),
injectedAt: optionalString(value.injectedAt),
mirroredAt: optionalString(value.mirroredAt),
reportedAt: optionalString(value.reportedAt),
createdAt: requiredString(value.createdAt, "gateway.delegations.createdAt"),
updatedAt: requiredString(value.updatedAt, "gateway.delegations.updatedAt"),
};
}
function parseGatewayPendingWake(
value: unknown,
): NonNullable<DiscordGatewayState["pendingWakes"]>[number] {
if (!isRecord(value)) {
throw new Error("Invalid Discord bridge gateway pending wake");
}
const kind = value.kind === "delegation" || value.kind === "group"
? value.kind
: undefined;
if (!kind) {
throw new Error("Invalid Discord bridge gateway pending wake kind");
}
return {
id: requiredString(value.id, "gateway.pendingWakes.id"),
kind,
delegationIds: Array.isArray(value.delegationIds)
? uniqueStrings(value.delegationIds)
: [],
groupId: optionalString(value.groupId),
reason: requiredString(value.reason, "gateway.pendingWakes.reason"),
createdAt: requiredString(value.createdAt, "gateway.pendingWakes.createdAt"),
startedAt: optionalString(value.startedAt),
};
}
function parseReturnMode(value: unknown): DiscordGatewayDelegation["returnMode"] {
return value === "detached" ||
value === "record_only" ||
value === "wake_on_done" ||
value === "wake_on_group" ||
value === "manual"
? value
: undefined;
}
function parseActiveTurn(value: unknown): DiscordBridgeActiveTurn {
if (!isRecord(value)) {
throw new Error("Invalid Discord bridge active turn");

View file

@ -128,6 +128,7 @@ export type CodexBridgeClient = {
startTurn(params: v2.TurnStartParams): Promise<v2.TurnStartResponse>;
steerTurn(params: v2.TurnSteerParams): Promise<v2.TurnSteerResponse>;
readThread(params: v2.ThreadReadParams): Promise<v2.ThreadReadResponse>;
injectThreadItems(params: v2.ThreadInjectItemsParams): Promise<v2.ThreadInjectItemsResponse>;
listThreads(params: v2.ThreadListParams): Promise<v2.ThreadListResponse>;
getThreadGoal(params: v2.ThreadGoalGetParams): Promise<v2.ThreadGoalGetResponse>;
respond(id: string | number, result: unknown): void;
@ -151,20 +152,47 @@ export type DiscordGatewayState = {
createdAt?: string;
toolsVersion?: number;
delegations: DiscordGatewayDelegation[];
pendingWakes?: DiscordGatewayPendingWake[];
};
export type DiscordGatewayDelegationReturnMode =
| "detached"
| "record_only"
| "wake_on_done"
| "wake_on_group"
| "manual";
export type DiscordGatewayDelegation = {
id: string;
codexThreadId: string;
title: string;
status: "active" | "idle" | "failed" | "complete";
status: "active" | "idle" | "failed" | "complete" | "reported";
cwd?: string;
groupId?: string;
returnMode?: DiscordGatewayDelegationReturnMode;
discordDetailThreadId?: string;
parentDiscordMessageId?: string;
lastTurnId?: string;
lastStatus?: string;
lastFinal?: string;
completedAt?: string;
injectedAt?: string;
mirroredAt?: string;
reportedAt?: string;
createdAt: string;
updatedAt: string;
};
export type DiscordGatewayPendingWake = {
id: string;
kind: "delegation" | "group";
delegationIds: string[];
groupId?: string;
reason: string;
createdAt: string;
startedAt?: string;
};
export type DiscordBridgeSession = {
discordThreadId: string;
parentChannelId: string;

View file

@ -224,6 +224,286 @@ describe("DiscordCodexBridge", () => {
await bridge.stop();
});
test("gateway records group delegation results and wakes after the group finishes", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
for (const [index, title] of ["Workspace A", "Workspace B"].entries()) {
client.emitRequest({
id: `tool-${index}`,
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: `/workspace/${index}`,
title,
prompt: `Inspect ${title}.`,
groupId: "fanout",
},
},
});
await waitFor(() => client.responses.length === index + 1);
}
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Result A."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(transport.messages.some((message) =>
message.channelId === "home-channel" &&
message.text.includes("Result A.")
)).toBe(true);
client.threadTurns.set("codex-thread-3", [
completedTurn("turn-2", "Result B."),
]);
await waitFor(() => client.startTurnCalls.length === 3);
expect(client.injectThreadItemsCalls).toHaveLength(2);
expect(client.startTurnCalls[2]).toEqual(
expect.objectContaining({
threadId: "codex-thread-1",
}),
);
expect(inputText(client.startTurnCalls[2]?.input[0])).toContain(
"Delegation group fanout completed.",
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "group",
groupId: "fanout",
startedAt: "2026-05-14T12:00:00.000Z",
}),
);
await bridge.stop();
});
test("gateway detached delegations complete without injecting or waking the main thread", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/detached",
title: "Detached workspace",
prompt: "Prepare this for a human.",
returnMode: "detached",
},
},
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Detached result."),
]);
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
expect(client.startTurnCalls).toHaveLength(1);
expect(transport.messages.some((message) =>
message.text.includes("Detached result.")
)).toBe(false);
await bridge.stop();
});
test("gateway queues delegation wake while the main operator thread is busy", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
transport.emit({
kind: "message",
channelId: "home-channel",
messageId: "home-message-1",
author: { id: "user-1", name: "Peezy", isBot: false },
content: "work on a long-running main task",
createdAt: "2026-05-14T12:00:00.000Z",
});
await waitFor(() => bridge.stateForTest().activeTurns.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/side",
title: "Side task",
prompt: "Finish this side task.",
},
},
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-2", "Side task result."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(2);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).toEqual(
expect.objectContaining({
kind: "delegation",
}),
);
expect(bridge.stateForTest().gateway?.pendingWakes?.[0]).not.toHaveProperty(
"startedAt",
);
await bridge.stop();
});
test("gateway record-only delegations inject and mirror without waking", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/record",
title: "Record only task",
prompt: "Record this result.",
returnMode: "record_only",
},
},
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Record-only result."),
]);
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(bridge.stateForTest().gateway?.pendingWakes ?? []).toEqual([]);
expect(transport.messages.some((message) =>
message.text.includes("Record-only result.")
)).toBe(true);
await bridge.stop();
});
test("gateway manually flushes completed manual delegation results", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
const bridge = new DiscordCodexBridge({
client,
transport,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
reconcileIntervalMs: 10,
}),
now: () => new Date("2026-05-14T12:00:00.000Z"),
});
await bridge.start();
await waitFor(() => bridge.stateForTest().sessions.length === 1);
client.emitRequest({
id: "tool-1",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "start_delegation",
arguments: {
cwd: "/workspace/manual",
title: "Manual task",
prompt: "Finish manually.",
returnMode: "manual",
},
},
});
await waitFor(() => client.responses.length === 1);
client.threadTurns.set("codex-thread-2", [
completedTurn("turn-1", "Manual result."),
]);
await waitFor(() =>
bridge.stateForTest().gateway?.delegations[0]?.status === "complete"
);
expect(client.injectThreadItemsCalls).toEqual([]);
client.emitRequest({
id: "tool-2",
method: "item/tool/call",
params: {
threadId: "codex-thread-1",
namespace: "codex_gateway",
tool: "flush_delegation_results",
arguments: {
delegationId: bridge.stateForTest().gateway?.delegations[0]?.id,
wake: "false",
},
},
});
await waitFor(() => client.injectThreadItemsCalls.length === 1);
expect(client.startTurnCalls).toHaveLength(1);
expect(transport.messages.some((message) =>
message.text.includes("Manual result.")
)).toBe(true);
await bridge.stop();
});
test("answers gateway status in the home channel without starting a turn", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();
@ -2650,6 +2930,7 @@ class FakeCodexClient implements CodexBridgeClient {
startTurnCalls: v2.TurnStartParams[] = [];
steerTurnCalls: v2.TurnSteerParams[] = [];
readThreadCalls: v2.ThreadReadParams[] = [];
injectThreadItemsCalls: v2.ThreadInjectItemsParams[] = [];
listThreadsCalls: v2.ThreadListParams[] = [];
getThreadGoalCalls: v2.ThreadGoalGetParams[] = [];
responses: Array<{ id: string | number; result: unknown }> = [];
@ -2750,6 +3031,13 @@ class FakeCodexClient implements CodexBridgeClient {
} as unknown as v2.ThreadReadResponse;
}
async injectThreadItems(
params: v2.ThreadInjectItemsParams,
): Promise<v2.ThreadInjectItemsResponse> {
this.injectThreadItemsCalls.push(params);
return {};
}
async listThreads(params: v2.ThreadListParams): Promise<v2.ThreadListResponse> {
this.listThreadsCalls.push(params);
return {
@ -2961,6 +3249,22 @@ function gatewayToolResult(value: unknown): unknown {
return text ? JSON.parse(text) : undefined;
}
function completedTurn(id: string, text: string): v2.Turn {
return {
id,
status: "completed",
items: [
{
type: "agentMessage",
id: `${id}-message`,
text,
phase: "final_answer",
memoryCitation: null,
},
],
} as unknown as v2.Turn;
}
function statusMessageText(transport: FakeDiscordTransport): string {
return transport.messages.find((message) => message.id === "message-out-1")
?.text ?? "";

View file

@ -33,6 +33,16 @@ describe("JsonFileStateStore", () => {
updatedAt: "2026-05-11T00:00:02.000Z",
},
],
pendingWakes: [
{
id: "wake-1",
kind: "group",
delegationIds: ["delegation-1"],
groupId: "patchbay",
reason: "Group patchbay completed.",
createdAt: "2026-05-11T00:00:03.000Z",
},
],
},
sessions: [
{
@ -101,6 +111,16 @@ describe("JsonFileStateStore", () => {
updatedAt: "2026-05-11T00:00:02.000Z",
},
],
pendingWakes: [
{
id: "wake-1",
kind: "group",
delegationIds: ["delegation-1"],
groupId: "patchbay",
reason: "Group patchbay completed.",
createdAt: "2026-05-11T00:00:03.000Z",
},
],
});
expect(state.sessions).toHaveLength(2);
expect(state.sessions[0]?.ownerUserId).toBe("user-1");

View file

@ -111,6 +111,12 @@ export class CodexBrowserAppServerClient extends CodexEventEmitter {
return this.request<v2.ThreadReadResponse>("thread/read", params);
}
injectThreadItems(
params: v2.ThreadInjectItemsParams,
): Promise<v2.ThreadInjectItemsResponse> {
return this.request<v2.ThreadInjectItemsResponse>("thread/inject_items", params);
}
startTurn(params: v2.TurnStartParams): Promise<v2.TurnStartResponse> {
return this.request<v2.TurnStartResponse>("turn/start", params);
}

View file

@ -117,6 +117,12 @@ export class CodexAppServerClient extends CodexEventEmitter {
return this.request<v2.ThreadReadResponse>("thread/read", params);
}
injectThreadItems(
params: v2.ThreadInjectItemsParams,
): Promise<v2.ThreadInjectItemsResponse> {
return this.request<v2.ThreadInjectItemsResponse>("thread/inject_items", params);
}
listThreadTurns(
params: v2.ThreadTurnsListParams,
): Promise<v2.ThreadTurnsListResponse> {