Add Discord gateway delegation tools
This commit is contained in:
parent
625c9c85b8
commit
bec3641b1c
6 changed files with 572 additions and 17 deletions
|
|
@ -15,10 +15,14 @@ CODEX_DISCORD_HOME_CHANNEL_ID=1502107617512919220
|
|||
CODEX_DISCORD_MAIN_THREAD_ID=019e2509-ddbb-7380-b97b-41575092d86b
|
||||
CODEX_DISCORD_ALLOWED_CHANNEL_IDS=1502107617512919220
|
||||
CODEX_DISCORD_DIR=/home/peezy/codex-fork-workspace/codex-flows
|
||||
CODEX_FLOW_BACKEND_URL=http://127.0.0.1:8090
|
||||
```
|
||||
|
||||
`CODEX_DISCORD_MAIN_THREAD_ID` is optional. If omitted, the bridge creates a new
|
||||
main operator thread and stores it in the bridge state file.
|
||||
main operator thread, attaches the privileged gateway tools to it, and stores it
|
||||
in the bridge state file. Existing configured main threads are resumed as-is;
|
||||
recreate the main operator thread if you need to attach gateway tools to a
|
||||
thread that predates gateway mode.
|
||||
|
||||
In the home channel:
|
||||
|
||||
|
|
@ -31,19 +35,29 @@ The prompt sent to the main thread uses `[discord-gateway]` framing so the model
|
|||
knows it is operating as the gateway over the codex-flows backend, not as a
|
||||
single task thread.
|
||||
|
||||
## Delegation Direction
|
||||
## Delegation Tools
|
||||
|
||||
Discord should not become a workspace registry. The main operator thread is the
|
||||
place where routing decisions happen. Future privileged backend or MCP tools
|
||||
should be attached only to that main thread and expose operations such as:
|
||||
place where routing decisions happen. Privileged `codex_gateway` dynamic tools
|
||||
are attached only to that main thread and expose:
|
||||
|
||||
- list active Codex sessions or backend runs
|
||||
- `list_delegations`
|
||||
- `start_delegation`
|
||||
- `resume_delegation`
|
||||
- `send_delegation`
|
||||
- `read_delegation`
|
||||
- `list_flow_runs`
|
||||
- `list_flow_events`
|
||||
|
||||
Those tools can:
|
||||
|
||||
- list tracked delegated Codex sessions and backend runs/events
|
||||
- start a delegated Codex session in a requested cwd
|
||||
- resume a delegated Codex session by thread id
|
||||
- send a turn to a delegated session
|
||||
- observe or summarize delegated session state
|
||||
- dispatch, inspect, or replay flow backend events
|
||||
- inspect flow backend state through `CODEX_FLOW_BACKEND_URL`
|
||||
|
||||
Gateway state already has delegation records for those future tools, including
|
||||
optional Discord detail thread ids for noisy work. Final results should return
|
||||
to the home channel even when detail threads are used.
|
||||
Gateway state stores delegation records, including optional Discord detail
|
||||
thread ids for noisy work. Delegated Codex sessions do not receive the privileged
|
||||
gateway tools; only the main operator thread can manage delegation.
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { createHash } from "node:crypto";
|
||||
|
||||
import type { JsonRpcNotification, JsonRpcRequest } from "@peezy.tech/codex-flows/rpc";
|
||||
import type { JsonValue } from "@peezy.tech/codex-flows/generated/serde_json/JsonValue";
|
||||
import type { v2 } from "@peezy.tech/codex-flows/generated";
|
||||
|
||||
import type { DiscordConsoleOutput } from "./console-output.ts";
|
||||
|
|
@ -13,6 +15,7 @@ import {
|
|||
import type {
|
||||
CodexBridgeClient,
|
||||
DiscordBridgeConfig,
|
||||
DiscordGatewayDelegation,
|
||||
DiscordBridgeSession,
|
||||
DiscordBridgeState,
|
||||
DiscordBridgeStateStore,
|
||||
|
|
@ -569,7 +572,8 @@ export class DiscordCodexBridge {
|
|||
`Delegations: ${delegations.length} tracked, ${activeDelegations.length} active`,
|
||||
"",
|
||||
"**Delegation Backend**",
|
||||
"Status: prepared for privileged backend/MCP tools; no delegation tool is attached yet.",
|
||||
`Status: ${session ? "privileged gateway tools available to the main Codex operator thread" : "waiting for main Codex operator thread"}.`,
|
||||
`Flow backend: \`${this.config.flowBackendUrl ?? "not configured"}\``,
|
||||
"",
|
||||
"**Detail Threads**",
|
||||
"Status: optional detail-thread records are supported in state; automatic detail thread mirroring is not enabled yet.",
|
||||
|
|
@ -597,11 +601,77 @@ export class DiscordCodexBridge {
|
|||
}
|
||||
|
||||
#handleServerRequest(message: JsonRpcRequest): void {
|
||||
this.client.respondError(
|
||||
message.id,
|
||||
-32603,
|
||||
"codex-discord-bridge does not handle app-server requests yet",
|
||||
);
|
||||
if (message.method === "item/tool/call") {
|
||||
void this.#handleDynamicToolCall(message).catch((error) => {
|
||||
this.client.respondError(
|
||||
message.id,
|
||||
-32603,
|
||||
errorMessage(error),
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
this.client.respondError(message.id, -32603, "Unsupported app-server request");
|
||||
}
|
||||
|
||||
async #handleDynamicToolCall(message: JsonRpcRequest): Promise<void> {
|
||||
const params = record(message.params);
|
||||
const threadId = stringValue(params.threadId);
|
||||
const namespace = stringValue(params.namespace);
|
||||
const tool = stringValue(params.tool);
|
||||
if (
|
||||
!threadId ||
|
||||
threadId !== this.#gatewaySession()?.codexThreadId ||
|
||||
namespace !== "codex_gateway" ||
|
||||
!tool
|
||||
) {
|
||||
this.client.respondError(
|
||||
message.id,
|
||||
-32601,
|
||||
"Unknown dynamic tool request",
|
||||
);
|
||||
return;
|
||||
}
|
||||
const result = await this.#callGatewayTool(tool, record(params.arguments));
|
||||
this.client.respond(message.id, {
|
||||
contentItems: [
|
||||
{
|
||||
type: "inputText",
|
||||
text: JSON.stringify(result, null, 2),
|
||||
},
|
||||
],
|
||||
success: true,
|
||||
});
|
||||
}
|
||||
|
||||
async #callGatewayTool(
|
||||
tool: string,
|
||||
args: Record<string, unknown>,
|
||||
): Promise<unknown> {
|
||||
if (tool === "list_delegations") {
|
||||
return {
|
||||
delegations: this.#gatewayDelegations(),
|
||||
};
|
||||
}
|
||||
if (tool === "start_delegation") {
|
||||
return await this.#startDelegation(args);
|
||||
}
|
||||
if (tool === "resume_delegation") {
|
||||
return await this.#resumeDelegation(args);
|
||||
}
|
||||
if (tool === "send_delegation") {
|
||||
return await this.#sendDelegation(args);
|
||||
}
|
||||
if (tool === "read_delegation") {
|
||||
return await this.#readDelegation(args);
|
||||
}
|
||||
if (tool === "list_flow_runs") {
|
||||
return await this.#flowBackendGet("/runs", args);
|
||||
}
|
||||
if (tool === "list_flow_events") {
|
||||
return await this.#flowBackendGet("/events", args);
|
||||
}
|
||||
throw new Error(`Unknown gateway tool: ${tool}`);
|
||||
}
|
||||
|
||||
#registerRunner(session: DiscordBridgeSession): DiscordThreadRunner {
|
||||
|
|
@ -653,7 +723,10 @@ export class DiscordCodexBridge {
|
|||
configuredThreadId,
|
||||
this.config.cwd,
|
||||
))
|
||||
: await this.client.startThread(this.#threadStartParams(this.config.cwd));
|
||||
: await this.client.startThread({
|
||||
...this.#threadStartParams(this.config.cwd),
|
||||
dynamicTools: gatewayToolSpecs(),
|
||||
});
|
||||
const codexThreadId = started.thread.id;
|
||||
if (!configuredThreadId) {
|
||||
await this.client.setThreadName({
|
||||
|
|
@ -704,6 +777,185 @@ export class DiscordCodexBridge {
|
|||
: undefined;
|
||||
}
|
||||
|
||||
#gatewayDelegations(): DiscordGatewayDelegation[] {
|
||||
const state = this.#requireState();
|
||||
if (!state.gateway) {
|
||||
state.gateway = {
|
||||
homeChannelId: this.config.gateway?.homeChannelId ?? "",
|
||||
mainThreadId: this.#gatewaySession()?.codexThreadId,
|
||||
delegations: [],
|
||||
};
|
||||
}
|
||||
state.gateway.delegations ??= [];
|
||||
return state.gateway.delegations;
|
||||
}
|
||||
|
||||
async #startDelegation(args: Record<string, unknown>): Promise<unknown> {
|
||||
const cwd = requiredArg(args, "cwd");
|
||||
const title = stringValue(args.title) ?? firstLine(stringValue(args.prompt)) ??
|
||||
`Delegated ${compactId(cwd)}`;
|
||||
const prompt = stringValue(args.prompt);
|
||||
const started = await this.client.startThread(this.#threadStartParams(cwd));
|
||||
const codexThreadId = started.thread.id;
|
||||
await this.client.setThreadName({
|
||||
threadId: codexThreadId,
|
||||
name: `[delegated] ${title}`,
|
||||
});
|
||||
const now = this.#now().toISOString();
|
||||
const delegation = this.#upsertDelegation({
|
||||
id: delegationId(codexThreadId),
|
||||
codexThreadId,
|
||||
title,
|
||||
status: prompt ? "active" : "idle",
|
||||
cwd,
|
||||
discordDetailThreadId: stringValue(args.discordDetailThreadId),
|
||||
parentDiscordMessageId: stringValue(args.parentDiscordMessageId),
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
let turnId: string | undefined;
|
||||
if (prompt) {
|
||||
const turn = await this.client.startTurn({
|
||||
threadId: codexThreadId,
|
||||
input: [{ type: "text", text: prompt, text_elements: [] }],
|
||||
cwd,
|
||||
model: this.config.model ?? null,
|
||||
serviceTier: this.config.serviceTier ?? null,
|
||||
effort: this.config.effort ?? null,
|
||||
summary: this.config.summary ?? null,
|
||||
approvalPolicy: this.config.approvalPolicy ?? null,
|
||||
permissions: this.config.permissions ?? null,
|
||||
outputSchema: null,
|
||||
});
|
||||
turnId = turn.turn.id;
|
||||
}
|
||||
await this.#persist();
|
||||
return { delegation, turnId };
|
||||
}
|
||||
|
||||
async #resumeDelegation(args: Record<string, unknown>): Promise<unknown> {
|
||||
const codexThreadId = requiredArg(args, "threadId");
|
||||
const cwd = stringValue(args.cwd);
|
||||
const resumed = await this.client.resumeThread(this.#threadResumeParams(codexThreadId, cwd));
|
||||
const now = this.#now().toISOString();
|
||||
const delegation = this.#upsertDelegation({
|
||||
id: stringValue(args.id) ?? delegationId(codexThreadId),
|
||||
codexThreadId,
|
||||
title: stringValue(args.title) ?? `Delegated ${compactId(codexThreadId)}`,
|
||||
status: "idle",
|
||||
cwd: cwd ?? resumeResponseCwd(resumed),
|
||||
discordDetailThreadId: stringValue(args.discordDetailThreadId),
|
||||
parentDiscordMessageId: stringValue(args.parentDiscordMessageId),
|
||||
createdAt: this.#delegationForThread(codexThreadId)?.createdAt ?? now,
|
||||
updatedAt: now,
|
||||
});
|
||||
await this.#persist();
|
||||
return { delegation };
|
||||
}
|
||||
|
||||
async #sendDelegation(args: Record<string, unknown>): Promise<unknown> {
|
||||
const delegation = this.#requireDelegation(args);
|
||||
const prompt = requiredArg(args, "prompt");
|
||||
const turn = await this.client.startTurn({
|
||||
threadId: delegation.codexThreadId,
|
||||
input: [{ type: "text", text: prompt, text_elements: [] }],
|
||||
cwd: delegation.cwd ?? null,
|
||||
model: this.config.model ?? null,
|
||||
serviceTier: this.config.serviceTier ?? null,
|
||||
effort: this.config.effort ?? null,
|
||||
summary: this.config.summary ?? null,
|
||||
approvalPolicy: this.config.approvalPolicy ?? null,
|
||||
permissions: this.config.permissions ?? null,
|
||||
outputSchema: null,
|
||||
});
|
||||
delegation.status = "active";
|
||||
delegation.updatedAt = this.#now().toISOString();
|
||||
await this.#persist();
|
||||
return { delegation, turnId: turn.turn.id };
|
||||
}
|
||||
|
||||
async #readDelegation(args: Record<string, unknown>): Promise<unknown> {
|
||||
const delegation = this.#requireDelegation(args);
|
||||
const response = await this.client.readThread({
|
||||
threadId: delegation.codexThreadId,
|
||||
includeTurns: true,
|
||||
});
|
||||
const snapshot = threadSnapshotFromThread(response.thread);
|
||||
const turns = Array.isArray(response.thread.turns) ? response.thread.turns : [];
|
||||
const latest = record(turns[turns.length - 1]);
|
||||
const latestStatus = stringValue(latest.status);
|
||||
if (latestStatus === "completed") {
|
||||
delegation.status = "idle";
|
||||
} else if (latestStatus === "failed" || latestStatus === "interrupted") {
|
||||
delegation.status = "failed";
|
||||
} else if (latestStatus) {
|
||||
delegation.status = "active";
|
||||
}
|
||||
delegation.updatedAt = this.#now().toISOString();
|
||||
await this.#persist();
|
||||
return {
|
||||
delegation,
|
||||
latestTurnId: stringValue(latest.id),
|
||||
latestStatus,
|
||||
lastFinal: snapshot.lastFinal,
|
||||
terminalTurnIds: snapshot.terminalTurnIds,
|
||||
};
|
||||
}
|
||||
|
||||
async #flowBackendGet(
|
||||
pathname: string,
|
||||
args: Record<string, unknown>,
|
||||
): Promise<unknown> {
|
||||
const baseUrl = this.config.flowBackendUrl;
|
||||
if (!baseUrl) {
|
||||
throw new Error("No flow backend URL configured.");
|
||||
}
|
||||
const url = new URL(pathname, baseUrl.endsWith("/") ? baseUrl : `${baseUrl}/`);
|
||||
for (const [key, value] of Object.entries(args)) {
|
||||
if (value !== undefined && value !== null) {
|
||||
url.searchParams.set(key, String(value));
|
||||
}
|
||||
}
|
||||
const response = await fetch(url);
|
||||
if (!response.ok) {
|
||||
throw new Error(`Flow backend ${url.pathname} failed with ${response.status}`);
|
||||
}
|
||||
return await response.json();
|
||||
}
|
||||
|
||||
#upsertDelegation(input: DiscordGatewayDelegation): DiscordGatewayDelegation {
|
||||
const delegations = this.#gatewayDelegations();
|
||||
const index = delegations.findIndex((delegation) =>
|
||||
delegation.id === input.id ||
|
||||
delegation.codexThreadId === input.codexThreadId
|
||||
);
|
||||
if (index >= 0) {
|
||||
delegations[index] = { ...delegations[index], ...input };
|
||||
return delegations[index] as DiscordGatewayDelegation;
|
||||
}
|
||||
delegations.push(input);
|
||||
return input;
|
||||
}
|
||||
|
||||
#requireDelegation(args: Record<string, unknown>): DiscordGatewayDelegation {
|
||||
const id = stringValue(args.delegationId) ?? stringValue(args.id);
|
||||
const threadId = stringValue(args.threadId);
|
||||
const delegation = this.#gatewayDelegations().find((candidate) =>
|
||||
(id && candidate.id === id) ||
|
||||
(threadId && candidate.codexThreadId === threadId)
|
||||
);
|
||||
if (!delegation) {
|
||||
throw new Error("Unknown gateway delegation.");
|
||||
}
|
||||
return delegation;
|
||||
}
|
||||
|
||||
#delegationForThread(threadId: string): DiscordGatewayDelegation | undefined {
|
||||
return this.#gatewayDelegations().find((delegation) =>
|
||||
delegation.codexThreadId === threadId
|
||||
);
|
||||
}
|
||||
|
||||
#isSessionRunning(
|
||||
session: DiscordBridgeSession,
|
||||
state: DiscordBridgeState,
|
||||
|
|
@ -1128,6 +1380,107 @@ function isDuplicate(state: DiscordBridgeState, messageId: string): boolean {
|
|||
);
|
||||
}
|
||||
|
||||
function gatewayToolSpecs(): v2.DynamicToolSpec[] {
|
||||
return [
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "list_delegations",
|
||||
description: "List delegated Codex sessions tracked by the Discord gateway.",
|
||||
inputSchema: objectSchema({}),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "start_delegation",
|
||||
description: "Start a delegated Codex session in a cwd and optionally start its first turn.",
|
||||
inputSchema: objectSchema({
|
||||
cwd: stringSchema("Workspace cwd for the delegated Codex session."),
|
||||
title: optionalStringSchema("Human title for the delegated work."),
|
||||
prompt: optionalStringSchema("Optional first prompt to send to the delegated session."),
|
||||
discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."),
|
||||
parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."),
|
||||
}, ["cwd"]),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "resume_delegation",
|
||||
description: "Register an existing Codex thread as delegated work.",
|
||||
inputSchema: objectSchema({
|
||||
threadId: stringSchema("Existing Codex thread id to resume and track."),
|
||||
cwd: optionalStringSchema("Optional cwd override for the resumed thread."),
|
||||
title: optionalStringSchema("Human title for the delegated work."),
|
||||
discordDetailThreadId: optionalStringSchema("Optional Discord detail thread id for noisy work."),
|
||||
parentDiscordMessageId: optionalStringSchema("Optional Discord message id that requested the delegation."),
|
||||
}, ["threadId"]),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "send_delegation",
|
||||
description: "Send a prompt as a new turn to a tracked delegated Codex session.",
|
||||
inputSchema: objectSchema({
|
||||
delegationId: optionalStringSchema("Tracked delegation id."),
|
||||
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
|
||||
prompt: stringSchema("Prompt to send to the delegated session."),
|
||||
}, ["prompt"]),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "read_delegation",
|
||||
description: "Read and summarize a tracked delegated Codex session.",
|
||||
inputSchema: objectSchema({
|
||||
delegationId: optionalStringSchema("Tracked delegation id."),
|
||||
threadId: optionalStringSchema("Tracked delegated Codex thread id."),
|
||||
}),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "list_flow_runs",
|
||||
description: "List runs from the configured codex-flow-systemd-local backend.",
|
||||
inputSchema: objectSchema({
|
||||
eventId: optionalStringSchema("Optional event id filter."),
|
||||
status: optionalStringSchema("Optional run status filter."),
|
||||
limit: optionalStringSchema("Optional max result count."),
|
||||
}),
|
||||
},
|
||||
{
|
||||
namespace: "codex_gateway",
|
||||
name: "list_flow_events",
|
||||
description: "List events from the configured codex-flow-systemd-local backend.",
|
||||
inputSchema: objectSchema({
|
||||
type: optionalStringSchema("Optional event type filter."),
|
||||
limit: optionalStringSchema("Optional max result count."),
|
||||
}),
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
function objectSchema(
|
||||
properties: Record<string, JsonValue>,
|
||||
required: string[] = [],
|
||||
): JsonValue {
|
||||
return {
|
||||
type: "object",
|
||||
properties,
|
||||
required,
|
||||
additionalProperties: false,
|
||||
};
|
||||
}
|
||||
|
||||
function stringSchema(description: string): JsonValue {
|
||||
return { type: "string", description };
|
||||
}
|
||||
|
||||
function optionalStringSchema(description: string): JsonValue {
|
||||
return stringSchema(description);
|
||||
}
|
||||
|
||||
function requiredArg(args: Record<string, unknown>, name: string): string {
|
||||
const value = stringValue(args[name]);
|
||||
if (!value) {
|
||||
throw new Error(`Missing required argument: ${name}`);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function parseGatewayCommand(content: string): "status" | undefined {
|
||||
const normalized = content.trim().toLowerCase();
|
||||
return normalized === "status" || normalized === "/status"
|
||||
|
|
@ -1149,6 +1502,10 @@ function compactId(value: string): string {
|
|||
return value.length > 14 ? `${value.slice(0, 6)}...${value.slice(-6)}` : value;
|
||||
}
|
||||
|
||||
function delegationId(threadId: string): string {
|
||||
return `delegation-${createHash("sha256").update(threadId).digest("hex").slice(0, 12)}`;
|
||||
}
|
||||
|
||||
function clearSummary(input: {
|
||||
deleted: number;
|
||||
running: number;
|
||||
|
|
|
|||
|
|
@ -126,6 +126,10 @@ export function parseConfig(argv: string[], env: NodeJS.ProcessEnv): ParsedConfi
|
|||
),
|
||||
statePath,
|
||||
gateway: gatewayConfig(args, env),
|
||||
flowBackendUrl:
|
||||
stringFlag(args, "flow-backend-url") ??
|
||||
env.CODEX_FLOW_BACKEND_URL ??
|
||||
env.CODEX_GATEWAY_BACKEND_URL,
|
||||
cwd: resolveHomeDir(
|
||||
stringFlag(args, "dir") ??
|
||||
stringFlag(args, "positional-dir") ??
|
||||
|
|
@ -361,6 +365,7 @@ Options:
|
|||
--allowed-channel-ids <ids> Comma-separated parent channel ids
|
||||
--home-channel-id <id> Enable gateway mode for one Discord home channel
|
||||
--main-thread-id <id> Resume an existing Codex operator thread for gateway mode
|
||||
--flow-backend-url <url> Optional codex-flow-systemd-local backend URL
|
||||
[dir] Optional Codex thread directory, resolved from home
|
||||
--dir <path> Codex thread directory, resolved from home
|
||||
--cwd <path> Alias for --dir
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ export type DiscordBridgeConfig = {
|
|||
allowedChannelIds: Set<string>;
|
||||
statePath: string;
|
||||
gateway?: DiscordGatewayConfig;
|
||||
flowBackendUrl?: string;
|
||||
cwd?: string;
|
||||
model?: string;
|
||||
modelProvider?: string;
|
||||
|
|
@ -127,7 +128,9 @@ export type CodexBridgeClient = {
|
|||
startTurn(params: v2.TurnStartParams): Promise<v2.TurnStartResponse>;
|
||||
steerTurn(params: v2.TurnSteerParams): Promise<v2.TurnSteerResponse>;
|
||||
readThread(params: v2.ThreadReadParams): Promise<v2.ThreadReadResponse>;
|
||||
listThreads(params: v2.ThreadListParams): Promise<v2.ThreadListResponse>;
|
||||
getThreadGoal(params: v2.ThreadGoalGetParams): Promise<v2.ThreadGoalGetResponse>;
|
||||
respond(id: string | number, result: unknown): void;
|
||||
respondError(id: string | number, code: number, message: string, data?: unknown): void;
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -52,6 +52,18 @@ describe("DiscordCodexBridge", () => {
|
|||
await bridge.start();
|
||||
await waitFor(() => bridge.stateForTest().sessions.length === 1);
|
||||
expect(client.startThreadCalls).toHaveLength(1);
|
||||
expect(client.startThreadCalls[0]?.dynamicTools).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
namespace: "codex_gateway",
|
||||
name: "start_delegation",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
namespace: "codex_gateway",
|
||||
name: "list_flow_runs",
|
||||
}),
|
||||
]),
|
||||
);
|
||||
expect(client.setThreadNameCalls[0]).toEqual({
|
||||
threadId: "codex-thread-1",
|
||||
name: "[discord-gateway] Codex Gateway",
|
||||
|
|
@ -98,6 +110,119 @@ describe("DiscordCodexBridge", () => {
|
|||
await bridge.stop();
|
||||
});
|
||||
|
||||
test("gateway tool starts and tracks delegated Codex sessions without privileged tools", async () => {
|
||||
const client = new FakeCodexClient();
|
||||
const transport = new FakeDiscordTransport();
|
||||
const store = new MemoryStateStore();
|
||||
const bridge = new DiscordCodexBridge({
|
||||
client,
|
||||
transport,
|
||||
store,
|
||||
config: testConfig({
|
||||
gateway: { homeChannelId: "home-channel" },
|
||||
}),
|
||||
now: () => new Date("2026-05-14T12:00:00.000Z"),
|
||||
});
|
||||
|
||||
await bridge.start();
|
||||
await waitFor(() => bridge.stateForTest().sessions.length === 1);
|
||||
client.emitRequest({
|
||||
id: "tool-1",
|
||||
method: "item/tool/call",
|
||||
params: {
|
||||
threadId: "codex-thread-1",
|
||||
turnId: "turn-main",
|
||||
callId: "call-1",
|
||||
namespace: "codex_gateway",
|
||||
tool: "start_delegation",
|
||||
arguments: {
|
||||
cwd: "/workspace/other",
|
||||
title: "Other workspace",
|
||||
prompt: "Inspect the remaining gateway work.",
|
||||
discordDetailThreadId: "detail-thread",
|
||||
parentDiscordMessageId: "home-message",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await waitFor(() => client.responses.length === 1);
|
||||
expect(client.responseErrors).toEqual([]);
|
||||
expect(client.startThreadCalls).toHaveLength(2);
|
||||
expect(client.startThreadCalls[1]).toEqual(
|
||||
expect.objectContaining({ cwd: "/workspace/other" }),
|
||||
);
|
||||
expect(client.startThreadCalls[1]?.dynamicTools).toBeUndefined();
|
||||
expect(client.setThreadNameCalls[1]).toEqual({
|
||||
threadId: "codex-thread-2",
|
||||
name: "[delegated] Other workspace",
|
||||
});
|
||||
expect(client.startTurnCalls[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
threadId: "codex-thread-2",
|
||||
cwd: "/workspace/other",
|
||||
}),
|
||||
);
|
||||
expect(inputText(client.startTurnCalls[0]?.input[0])).toBe(
|
||||
"Inspect the remaining gateway work.",
|
||||
);
|
||||
expect(bridge.stateForTest().gateway?.delegations).toEqual([
|
||||
expect.objectContaining({
|
||||
codexThreadId: "codex-thread-2",
|
||||
title: "Other workspace",
|
||||
status: "active",
|
||||
cwd: "/workspace/other",
|
||||
discordDetailThreadId: "detail-thread",
|
||||
parentDiscordMessageId: "home-message",
|
||||
}),
|
||||
]);
|
||||
expect(gatewayToolResult(client.responses[0]?.result)).toEqual(
|
||||
expect.objectContaining({
|
||||
turnId: "turn-1",
|
||||
delegation: expect.objectContaining({
|
||||
codexThreadId: "codex-thread-2",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
await bridge.stop();
|
||||
});
|
||||
|
||||
test("gateway rejects dynamic tool calls outside the main operator thread", async () => {
|
||||
const client = new FakeCodexClient();
|
||||
const transport = new FakeDiscordTransport();
|
||||
const bridge = new DiscordCodexBridge({
|
||||
client,
|
||||
transport,
|
||||
store: new MemoryStateStore(),
|
||||
config: testConfig({
|
||||
gateway: { homeChannelId: "home-channel" },
|
||||
}),
|
||||
});
|
||||
|
||||
await bridge.start();
|
||||
await waitFor(() => bridge.stateForTest().sessions.length === 1);
|
||||
client.emitRequest({
|
||||
id: "tool-1",
|
||||
method: "item/tool/call",
|
||||
params: {
|
||||
threadId: "codex-thread-elsewhere",
|
||||
namespace: "codex_gateway",
|
||||
tool: "list_delegations",
|
||||
arguments: {},
|
||||
},
|
||||
});
|
||||
|
||||
await waitFor(() => client.responseErrors.length === 1);
|
||||
expect(client.responseErrors[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
id: "tool-1",
|
||||
code: -32601,
|
||||
message: "Unknown dynamic tool request",
|
||||
}),
|
||||
);
|
||||
expect(client.responses).toEqual([]);
|
||||
await bridge.stop();
|
||||
});
|
||||
|
||||
test("answers gateway status in the home channel without starting a turn", async () => {
|
||||
const client = new FakeCodexClient();
|
||||
const transport = new FakeDiscordTransport();
|
||||
|
|
@ -2404,7 +2529,15 @@ class FakeCodexClient implements CodexBridgeClient {
|
|||
startTurnCalls: v2.TurnStartParams[] = [];
|
||||
steerTurnCalls: v2.TurnSteerParams[] = [];
|
||||
readThreadCalls: v2.ThreadReadParams[] = [];
|
||||
listThreadsCalls: v2.ThreadListParams[] = [];
|
||||
getThreadGoalCalls: v2.ThreadGoalGetParams[] = [];
|
||||
responses: Array<{ id: string | number; result: unknown }> = [];
|
||||
responseErrors: Array<{
|
||||
id: string | number;
|
||||
code: number;
|
||||
message: string;
|
||||
data?: unknown;
|
||||
}> = [];
|
||||
threadTurns = new Map<string, v2.Turn[]>();
|
||||
threadCwds = new Map<string, string>();
|
||||
threadGoals = new Map<string, v2.ThreadGoal | null>();
|
||||
|
|
@ -2492,6 +2625,15 @@ class FakeCodexClient implements CodexBridgeClient {
|
|||
} as unknown as v2.ThreadReadResponse;
|
||||
}
|
||||
|
||||
async listThreads(params: v2.ThreadListParams): Promise<v2.ThreadListResponse> {
|
||||
this.listThreadsCalls.push(params);
|
||||
return {
|
||||
data: [],
|
||||
nextCursor: null,
|
||||
backwardsCursor: null,
|
||||
};
|
||||
}
|
||||
|
||||
async getThreadGoal(
|
||||
params: v2.ThreadGoalGetParams,
|
||||
): Promise<v2.ThreadGoalGetResponse> {
|
||||
|
|
@ -2501,7 +2643,18 @@ class FakeCodexClient implements CodexBridgeClient {
|
|||
};
|
||||
}
|
||||
|
||||
respondError(): void {}
|
||||
respond(id: string | number, result: unknown): void {
|
||||
this.responses.push({ id, result });
|
||||
}
|
||||
|
||||
respondError(
|
||||
id: string | number,
|
||||
code: number,
|
||||
message: string,
|
||||
data?: unknown,
|
||||
): void {
|
||||
this.responseErrors.push({ id, code, message, data });
|
||||
}
|
||||
|
||||
resolveAllStartTurns(): void {
|
||||
for (const resolve of this.#startTurnResolvers.splice(0)) {
|
||||
|
|
@ -2514,6 +2667,12 @@ class FakeCodexClient implements CodexBridgeClient {
|
|||
listener(message);
|
||||
}
|
||||
}
|
||||
|
||||
emitRequest(message: JsonRpcRequest): void {
|
||||
for (const listener of this.#requestListeners) {
|
||||
listener(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FakeDiscordTransport implements DiscordBridgeTransport {
|
||||
|
|
@ -2665,6 +2824,18 @@ function inputText(value: unknown): string {
|
|||
return typeof text === "string" ? text : "";
|
||||
}
|
||||
|
||||
function gatewayToolResult(value: unknown): unknown {
|
||||
if (typeof value !== "object" || value === null || !("contentItems" in value)) {
|
||||
return undefined;
|
||||
}
|
||||
const items = (value as { contentItems?: unknown }).contentItems;
|
||||
if (!Array.isArray(items)) {
|
||||
return undefined;
|
||||
}
|
||||
const text = inputText(items[0]);
|
||||
return text ? JSON.parse(text) : undefined;
|
||||
}
|
||||
|
||||
function statusMessageText(transport: FakeDiscordTransport): string {
|
||||
return transport.messages.find((message) => message.id === "message-out-1")
|
||||
?.text ?? "";
|
||||
|
|
|
|||
|
|
@ -196,6 +196,8 @@ describe("parseConfig", () => {
|
|||
"home-channel",
|
||||
"--main-thread-id",
|
||||
"main-thread",
|
||||
"--flow-backend-url",
|
||||
"http://127.0.0.1:8089",
|
||||
],
|
||||
{},
|
||||
);
|
||||
|
|
@ -204,6 +206,7 @@ describe("parseConfig", () => {
|
|||
{
|
||||
CODEX_DISCORD_GATEWAY_HOME_CHANNEL_ID: "env-home",
|
||||
CODEX_DISCORD_GATEWAY_MAIN_THREAD_ID: "env-thread",
|
||||
CODEX_FLOW_BACKEND_URL: "http://127.0.0.1:8090",
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -214,10 +217,12 @@ describe("parseConfig", () => {
|
|||
homeChannelId: "home-channel",
|
||||
mainThreadId: "main-thread",
|
||||
});
|
||||
expect(fromFlag.config.flowBackendUrl).toBe("http://127.0.0.1:8089");
|
||||
expect(fromEnv.config.gateway).toEqual({
|
||||
homeChannelId: "env-home",
|
||||
mainThreadId: "env-thread",
|
||||
});
|
||||
expect(fromEnv.config.flowBackendUrl).toBe("http://127.0.0.1:8090");
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue