Split Discord gateway backend from transport

This commit is contained in:
matamune 2026-05-15 22:49:37 +00:00
parent e7b7b1b342
commit 94046e6216
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
5 changed files with 4639 additions and 4350 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,45 @@
import type {
DiscordBridgeCommandRegistration,
DiscordBridgeState,
DiscordInbound,
} from "./types.ts";
export type CodexGatewayBackend = {
start(): Promise<void>;
startTransportDependentWork?(): Promise<void>;
startBackgroundWork?(): Promise<void>;
stop(): Promise<void>;
handleInbound(inbound: DiscordInbound): Promise<void>;
commandRegistration(): DiscordBridgeCommandRegistration;
stateForTest?(): DiscordBridgeState;
flushSummariesForTest?(): Promise<void>;
};
export type CodexGatewayPresenter = {
createWorkspacePost?(
locationId: string,
title: string,
body: string,
): Promise<{ threadId: string; messageId?: string }>;
createThread(
locationId: string,
title: string,
sourceMessageId?: string,
): Promise<string>;
sendMessage(locationId: string, text: string): Promise<string[]>;
updateMessage?(
locationId: string,
messageId: string,
text: string,
): Promise<void>;
deleteMessage(locationId: string, messageId: string): Promise<void>;
deleteWebhookMessages?(
locationId: string,
options?: { webhookUrl?: string },
): Promise<{ deleted: number; failed: number }>;
deleteThread?(locationId: string): Promise<void>;
addThreadMembers?(threadId: string, userIds: string[]): Promise<void>;
addReactions?(locationId: string, messageId: string, reactions: string[]): Promise<void>;
pinMessage?(locationId: string, messageId: string): Promise<void>;
sendTyping(locationId: string): Promise<void>;
};

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,6 @@
import type { JsonRpcNotification } from "@peezy.tech/codex-flows/rpc";
import type { v2 } from "@peezy.tech/codex-flows/generated";
import type { CodexGatewayPresenter } from "./gateway-backend.ts";
import type {
DiscordConsoleMessageKind,
@ -13,7 +14,6 @@ import type {
DiscordBridgeQueueItem,
DiscordBridgeSession,
DiscordBridgeState,
DiscordBridgeTransport,
DiscordMessageInbound,
} from "./types.ts";
@ -25,7 +25,7 @@ const runningCommandStatusDelayMs = 5_000;
export type ThreadRunnerContext = {
client: CodexBridgeClient;
transport: DiscordBridgeTransport;
presenter: CodexGatewayPresenter;
config: DiscordBridgeConfig;
getState(): DiscordBridgeState;
persist(): Promise<void>;
@ -860,7 +860,7 @@ export class DiscordThreadRunner {
): Promise<void> {
const active = this.#activeTurnForTurn(turnId) ??
this.#upsertActiveTurn({ turnId, origin: "external" });
const outboundMessageIds = await this.#context.transport.sendMessage(
const outboundMessageIds = await this.#context.presenter.sendMessage(
active.discordThreadId,
text,
);
@ -919,7 +919,7 @@ export class DiscordThreadRunner {
});
return;
}
const outboundMessageIds = await this.#context.transport.sendMessage(
const outboundMessageIds = await this.#context.presenter.sendMessage(
active.discordThreadId,
text,
);
@ -1070,7 +1070,7 @@ export class DiscordThreadRunner {
finalTextFromTurn(completedTurn).trim() ||
(await this.#readFinalTurnText(turnId)).trim();
if (finalText && !this.#hasDelivery(turnId, "final")) {
const outboundMessageIds = await this.#context.transport.sendMessage(
const outboundMessageIds = await this.#context.presenter.sendMessage(
active.discordThreadId,
finalText,
);
@ -1232,7 +1232,7 @@ export class DiscordThreadRunner {
): Promise<void> {
const active = isActiveTurn(target) ? target : undefined;
const item = active ? undefined : target as DiscordBridgeQueueItem;
const outboundMessageIds = await this.#context.transport.sendMessage(
const outboundMessageIds = await this.#context.presenter.sendMessage(
target.discordThreadId,
`Codex turn failed: ${message}`,
);
@ -1293,7 +1293,7 @@ export class DiscordThreadRunner {
const deletedMessageIds = new Set<string>();
for (const messageId of messageIds) {
try {
await this.#context.transport.deleteMessage(
await this.#context.presenter.deleteMessage(
active.discordThreadId,
messageId,
);
@ -1327,7 +1327,7 @@ export class DiscordThreadRunner {
async #startTypingHeartbeat(active: DiscordBridgeActiveTurn): Promise<void> {
this.#stopTypingHeartbeat();
this.#typingTurnKey = turnKey(active.codexThreadId, active.turnId);
await this.#context.transport.sendTyping(active.discordThreadId);
await this.#context.presenter.sendTyping(active.discordThreadId);
const intervalMs =
this.#context.config.typingIntervalMs ?? defaultTypingIntervalMs;
this.#debug("typing.start", {
@ -1338,7 +1338,7 @@ export class DiscordThreadRunner {
});
const timer = setInterval(() => {
void this.#enqueue("typing.tick", async () => {
await this.#context.transport.sendTyping(active.discordThreadId);
await this.#context.presenter.sendTyping(active.discordThreadId);
this.#debug("typing.tick", {
turnId: active.turnId,
});
@ -1422,7 +1422,7 @@ export class DiscordThreadRunner {
await this.#pinStatusMessage(this.session.statusMessageId);
return;
}
const [messageId] = await this.#context.transport.sendMessage(
const [messageId] = await this.#context.presenter.sendMessage(
this.session.discordThreadId,
text,
);
@ -1444,12 +1444,12 @@ export class DiscordThreadRunner {
await this.#ensureStatusMessage();
return;
}
if (!this.#context.transport.updateMessage) {
if (!this.#context.presenter.updateMessage) {
return;
}
try {
const text = this.#renderStatusMessage();
await this.#context.transport.updateMessage(
await this.#context.presenter.updateMessage(
this.session.discordThreadId,
messageId,
text,
@ -1470,14 +1470,14 @@ export class DiscordThreadRunner {
}
async #pinStatusMessage(messageId: string): Promise<void> {
if (!this.#context.transport.pinMessage) {
if (!this.#context.presenter.pinMessage) {
return;
}
if (this.#pinnedStatusMessageId === messageId) {
return;
}
try {
await this.#context.transport.pinMessage(
await this.#context.presenter.pinMessage(
this.session.discordThreadId,
messageId,
);

View file

@ -7,13 +7,21 @@ import type { JsonRpcNotification, JsonRpcRequest } from "@peezy.tech/codex-flow
import type { v2 } from "@peezy.tech/codex-flows/generated";
import type { FlowBackendClient } from "@peezy.tech/flow-runtime/backend-client";
import { DiscordCodexBridge, parseThreadStartIntent } from "../src/bridge.ts";
import {
DiscordCodexBridge,
LocalCodexGatewayBackend,
parseThreadStartIntent,
} from "../src/bridge.ts";
import type {
DiscordConsoleMessage,
DiscordConsoleOutput,
} from "../src/console-output.ts";
import { MemoryStateStore, emptyState } from "../src/state.ts";
import { writeStopHookSpoolEvent } from "../src/stop-hook-spool.ts";
import type {
CodexGatewayBackend,
CodexGatewayPresenter,
} from "../src/gateway-backend.ts";
import type {
CodexBridgeClient,
DiscordBridgeConfig,
@ -42,6 +50,118 @@ describe("DiscordCodexBridge", () => {
});
});
test("can run Discord as a transport over a gateway backend", async () => {
const transport = new FakeDiscordTransport();
const calls: string[] = [];
const inboundEvents: DiscordInbound[] = [];
const backend: CodexGatewayBackend = {
async start() {
calls.push("backend.start");
},
async startTransportDependentWork() {
calls.push("backend.transportWork");
},
async startBackgroundWork() {
calls.push("backend.backgroundWork");
},
async stop() {
calls.push("backend.stop");
},
async handleInbound(inbound) {
inboundEvents.push(inbound);
},
commandRegistration() {
return { channelIds: ["home-channel"] };
},
stateForTest() {
return emptyState();
},
};
const bridge = new DiscordCodexBridge({
backend,
transport,
});
await bridge.start();
expect(calls).toEqual([
"backend.start",
"backend.transportWork",
"backend.backgroundWork",
]);
expect(transport.registeredCommands).toEqual([
{ channelIds: ["home-channel"] },
]);
transport.emit({
kind: "message",
channelId: "home-channel",
messageId: "message-1",
author: { id: "user-1", name: "Peezy", isBot: false },
content: "status",
createdAt: "2026-05-15T00:00:00.000Z",
});
await waitFor(() => inboundEvents.length === 1);
expect(inboundEvents[0]?.kind).toBe("message");
await bridge.stop();
expect(calls).toContain("backend.stop");
});
test("local gateway backend runs against a presenter without Discord transport lifecycle", async () => {
const client = new FakeCodexClient();
const sentMessages: Array<{ locationId: string; text: string }> = [];
const typingLocations: string[] = [];
const presenter: CodexGatewayPresenter = {
async createThread(locationId, title, sourceMessageId) {
expect(locationId).toBe("parent-channel");
expect(title).toBe("Existing thread");
expect(sourceMessageId).toBe("source-message-1");
return "presenter-thread-1";
},
async sendMessage(locationId, text) {
sentMessages.push({ locationId, text });
return [`presenter-message-${sentMessages.length}`];
},
async deleteMessage() {},
async sendTyping(locationId) {
typingLocations.push(locationId);
},
};
const backend = new LocalCodexGatewayBackend({
client,
presenter,
store: new MemoryStateStore(),
config: testConfig({
gateway: { homeChannelId: "home-channel" },
allowedChannelIds: new Set(["parent-channel"]),
}),
});
await backend.start();
expect(client.startThreadCalls).toHaveLength(1);
expect(backend.commandRegistration()).toEqual({
channelIds: ["parent-channel", "home-channel"],
});
await backend.startTransportDependentWork();
await backend.startBackgroundWork();
await backend.handleInbound({
kind: "message",
channelId: "home-channel",
messageId: "home-message-1",
author: { id: "user-1", name: "Peezy", isBot: false },
content: "status across the workspaces",
createdAt: "2026-05-15T00:00:00.000Z",
});
await waitFor(() => client.startTurnCalls.length === 1);
expect(inputText(client.startTurnCalls[0]?.input[0])).toContain(
"status across the workspaces",
);
expect(typingLocations).toContain("home-channel");
await backend.stop();
});
test("starts a gateway main thread and routes home channel messages to it", async () => {
const client = new FakeCodexClient();
const transport = new FakeDiscordTransport();