Use workspace backend for patch execution

This commit is contained in:
matamune 2026-05-16 05:40:34 +00:00
parent 5b7477dfa8
commit 43fc5183c7
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
24 changed files with 1009 additions and 225 deletions

View file

@ -10,7 +10,7 @@ Canonical public host: `https://patch.moi`.
This is a Bun monorepo:
- `apps/patch`: the Patch service, feed poller, JSONL store, Discord output,
and flow dispatch adapter.
and workspace backend adapter.
- `docs`: Tome documentation site for patch.moi.
patch.moi treats Git as the source of truth for maintained projects. Upstream
@ -26,7 +26,10 @@ GET /flow-events
GET /flow-events/:id
POST /flow-events/:id/retry
POST /flow-events/:id/replay
GET /flow-dispatches
GET /maintenance-attempts
GET /workspace-dispatches
GET /workspace-events
GET /workspace-runs
```
## Environment
@ -39,6 +42,9 @@ DISCORD_OUTPUT_ENABLED=false
DISCORD_WEBHOOK_URL=
DISCORD_NOTIFY_EVENTS=push,release
FEED_SOURCES_PATH=./feed-sources.json
PATCH_WORKSPACE_BACKEND_URL=
PATCH_WORKSPACE_BACKEND_SECRET=
PATCH_FLOW_BACKEND_URL=
PATCH_FLOW_DISPATCH_URL=
PATCH_FLOW_DISPATCH_SECRET=
PATCH_ADMIN_TOKEN=
@ -59,10 +65,12 @@ bun run dev
Feed watcher events are configured in `apps/patch/feed-sources.json`. The first
poll primes `DATA_DIR/feed-state.json`; later polls append upstream activity to
`DATA_DIR/feed-events.jsonl`. Targets using `mode: "fork_sync"` append legacy
work to `DATA_DIR/feed-jobs.jsonl`. Targets using `mode: "flow_dispatch"`
append generic flow events to `DATA_DIR/flow-events.jsonl`, POST them to the
configured dispatch URL, and record dispatch outcomes in
`DATA_DIR/flow-dispatches.jsonl`.
work to `DATA_DIR/feed-jobs.jsonl`. Targets using `mode: "workspace_flow"`
append generic flow events to `DATA_DIR/flow-events.jsonl`, send them to the
configured workspace backend or local adapter, and record dispatch outcomes in
`DATA_DIR/workspace-dispatches.jsonl`. Each dispatch also creates a
patch.moi-owned `DATA_DIR/maintenance-attempts.jsonl` entry that links the
upstream update to workspace run ids and future candidate refs.
## Documentation

View file

@ -73,10 +73,10 @@
"defaultBranch": "main"
},
"target": {
"mode": "flow_dispatch",
"mode": "workspace_flow",
"eventType": "upstream.release",
"dispatchUrlEnv": "PATCH_FLOW_DISPATCH_URL",
"dispatchSecretEnv": "PATCH_FLOW_DISPATCH_SECRET",
"workspaceUrlEnv": "PATCH_WORKSPACE_BACKEND_URL",
"workspaceSecretEnv": "PATCH_WORKSPACE_BACKEND_SECRET",
"payload": {
"provider": "github",
"repo": "openai/codex"

View file

@ -2,7 +2,11 @@ import { readFile, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { mkdir } from "node:fs/promises";
import { notifyDiscord, type DiscordConfig } from "./discord";
import { dispatchFlowEventForFeedSignal, type FlowDispatchConfig } from "./flow";
import {
dispatchWorkspaceEventForFeedSignal,
maintenanceAttemptForWorkspaceDispatch,
type WorkspaceDispatchConfig,
} from "./flow";
import { EventStore, jobForFeedSignal } from "./queue";
import type { FeedEventName, FeedSignal, FeedSourceConfig } from "./types";
@ -24,7 +28,8 @@ type FeedPollerConfig = {
dataDir: string;
sourcesPath: string;
discord?: DiscordConfig;
flowDispatch?: FlowDispatchConfig;
workspaceBackend?: WorkspaceDispatchConfig;
flowDispatch?: WorkspaceDispatchConfig;
};
type FetchLike = (url: string, init?: RequestInit) => Promise<Response>;
@ -164,7 +169,8 @@ export async function pollFeedSource(input: {
statePath: string;
store: EventStore;
discord?: DiscordConfig;
flowDispatch?: FlowDispatchConfig;
workspaceBackend?: WorkspaceDispatchConfig;
flowDispatch?: WorkspaceDispatchConfig;
fetchImpl?: FetchLike;
}): Promise<{ signals: FeedSignal[]; jobs: number; flowDispatches: number; primed: boolean }> {
const response = await (input.fetchImpl ?? fetch)(input.source.url, {
@ -191,13 +197,21 @@ export async function pollFeedSource(input: {
await input.store.appendFeedJob(job);
jobs += 1;
}
const flowDispatch = await dispatchFlowEventForFeedSignal(signal, input.flowDispatch);
if (flowDispatch.event) {
await input.store.appendFlowEvent(flowDispatch.event);
const workspaceDispatch = await dispatchWorkspaceEventForFeedSignal(
signal,
input.workspaceBackend ?? input.flowDispatch,
);
if (workspaceDispatch.event) {
await input.store.appendFlowEvent(workspaceDispatch.event);
}
if (flowDispatch.record) {
await input.store.appendFlowDispatch(flowDispatch.record);
if (flowDispatch.record.status === "dispatched") {
if (workspaceDispatch.record) {
await input.store.appendWorkspaceDispatch(workspaceDispatch.record);
if (workspaceDispatch.event) {
await input.store.appendMaintenanceAttempt(
maintenanceAttemptForWorkspaceDispatch(workspaceDispatch.event, workspaceDispatch.record),
);
}
if (workspaceDispatch.record.status === "dispatched") {
flowDispatches += 1;
}
}
@ -210,8 +224,8 @@ export async function pollFeedSource(input: {
event: signal.event,
entryId: signal.entryId,
job: job?.id,
flowEvent: flowDispatch.event?.id,
flowDispatch: flowDispatch.record?.status,
flowEvent: workspaceDispatch.event?.id,
workspaceDispatch: workspaceDispatch.record?.status,
}));
}
@ -240,6 +254,7 @@ export async function pollFeedsOnce(config: FeedPollerConfig, fetchImpl?: FetchL
statePath,
store,
discord: config.discord,
workspaceBackend: config.workspaceBackend,
flowDispatch: config.flowDispatch,
fetchImpl,
});

View file

@ -1,26 +1,27 @@
import { createFlowClient, type FlowClient } from "@peezy.tech/flow-runtime/client";
import type {
FeedFlowDispatchTarget,
FeedWorkspaceFlowTarget,
FeedSignal,
FlowDispatchRecord,
FlowEvent,
MaintenanceAttemptRecord,
} from "./types";
type FetchLike = (url: string, init: RequestInit) => Promise<Response>;
import {
createPatchWorkspaceBackend,
targetWorkspaceBackendUrl,
type WorkspaceBackendConfig,
} from "./workspace-backend";
const serviceSource = "patch";
export type FlowDispatchConfig = {
env?: Record<string, string | undefined>;
fetchImpl?: FetchLike;
cwd?: string;
};
export type FlowDispatchConfig = WorkspaceBackendConfig;
export type WorkspaceDispatchConfig = WorkspaceBackendConfig;
function isFlowDispatchTarget(value: unknown): value is FeedFlowDispatchTarget {
function isWorkspaceFlowTarget(value: unknown): value is FeedWorkspaceFlowTarget {
return (
typeof value === "object" &&
value !== null &&
(value as { mode?: unknown }).mode === "flow_dispatch"
((value as { mode?: unknown }).mode === "workspace_flow" ||
(value as { mode?: unknown }).mode === "flow_dispatch")
);
}
@ -59,7 +60,7 @@ export function flowEventForFeedSignal(
signal: FeedSignal,
receivedAt = new Date().toISOString(),
): FlowEvent<Record<string, unknown>> | undefined {
if (!isFlowDispatchTarget(signal.target)) {
if (!isWorkspaceFlowTarget(signal.target)) {
return undefined;
}
@ -76,63 +77,6 @@ export function flowEventForFeedSignal(
};
}
function targetFlowUrl(
target: FeedFlowDispatchTarget,
env: Record<string, string | undefined>,
): string | undefined {
const explicit = target.dispatchUrl?.trim();
if (explicit) {
return explicit;
}
const envName = target.dispatchUrlEnv?.trim();
if (envName) {
return env[envName]?.trim() || undefined;
}
const backendUrl = env.PATCH_FLOW_BACKEND_URL?.trim();
if (backendUrl) {
return backendUrl;
}
return env.PATCH_FLOW_DISPATCH_URL?.trim() || undefined;
}
function targetDispatchSecret(
target: FeedFlowDispatchTarget,
env: Record<string, string | undefined>,
): string | undefined {
const envName = target.dispatchSecretEnv?.trim();
if (envName) {
return env[envName]?.trim() || undefined;
}
return env.PATCH_FLOW_DISPATCH_SECRET?.trim() || undefined;
}
export function createFlowClientFromPatchConfig(
target: Partial<FeedFlowDispatchTarget> = {},
config: FlowDispatchConfig = {},
): FlowClient {
const env = config.env ?? process.env;
const flowTarget = { mode: "flow_dispatch" as const, eventType: target.eventType ?? "flow.event", ...target };
const url = targetFlowUrl(flowTarget, env);
if (url) {
return createFlowClient({
mode: "http",
baseUrl: flowBackendBaseUrl(url),
hmacSecret: targetDispatchSecret(flowTarget, env),
...(config.fetchImpl ? { fetch: patchFetch(config.fetchImpl) } : {}),
});
}
return createFlowClient({
mode: "local",
cwd: config.cwd ?? process.cwd(),
env,
codex: {
command: env.CODEX_APP_SERVER_CODEX_COMMAND,
codexHome: env.CODEX_HOME,
stream: true,
},
});
}
export function patchUpstreamReleaseEvent(input: {
repo: string;
tag: string;
@ -152,21 +96,34 @@ export function patchUpstreamReleaseEvent(input: {
export async function dispatchFlowEvent(
event: FlowEvent,
target: Partial<FeedFlowDispatchTarget> = {},
target: Partial<FeedWorkspaceFlowTarget> = {},
config: FlowDispatchConfig = {},
): Promise<FlowDispatchRecord> {
const env = config.env ?? process.env;
const flowTarget = { mode: "flow_dispatch" as const, eventType: event.type, ...target };
const url = targetFlowUrl(flowTarget, env);
const client = createFlowClientFromPatchConfig(flowTarget, config);
return dispatchWorkspaceEvent(event, target, config);
}
export async function dispatchWorkspaceEvent(
event: FlowEvent,
target: Partial<FeedWorkspaceFlowTarget> = {},
config: WorkspaceDispatchConfig = {},
): Promise<FlowDispatchRecord> {
const workspaceTarget = { mode: "workspace_flow" as const, eventType: event.type, ...target };
const backend = createPatchWorkspaceBackend(workspaceTarget, config);
try {
await client.dispatchEvent(event);
const result = await backend.client.dispatchEvent(event);
return {
eventId: event.id,
eventType: event.type,
url: url ? flowEventsUrl(url) : undefined,
operation: "dispatch",
target: backend.mode === "local" ? "local" : "workspace-backend",
transport: backend.mode,
workspaceBackendUrl: backend.url,
url: backend.eventsUrl,
status: "dispatched",
runIds: result.runIds,
matched: result.matched,
idempotent: result.idempotent,
createdAt: new Date().toISOString(),
};
} catch (error) {
@ -174,7 +131,11 @@ export async function dispatchFlowEvent(
return {
eventId: event.id,
eventType: event.type,
url,
operation: "dispatch",
target: backend.mode === "local" ? "local" : "workspace-backend",
transport: backend.mode,
workspaceBackendUrl: backend.url,
url: backend.eventsUrl,
status: "failed",
...(httpStatus ? { httpStatus } : {}),
error: error instanceof Error ? error.message : String(error),
@ -185,25 +146,38 @@ export async function dispatchFlowEvent(
export async function replayFlowEvent(
event: FlowEvent,
target: Partial<FeedFlowDispatchTarget> = {},
target: Partial<FeedWorkspaceFlowTarget> = {},
config: FlowDispatchConfig = {},
): Promise<FlowDispatchRecord> {
return replayWorkspaceEvent(event, target, config);
}
export async function replayWorkspaceEvent(
event: FlowEvent,
target: Partial<FeedWorkspaceFlowTarget> = {},
config: WorkspaceDispatchConfig = {},
): Promise<FlowDispatchRecord> {
const env = config.env ?? process.env;
const flowTarget = { mode: "flow_dispatch" as const, eventType: event.type, ...target };
const url = targetFlowUrl(flowTarget, env);
const client = createFlowClientFromPatchConfig(flowTarget, config);
const workspaceTarget = { mode: "workspace_flow" as const, eventType: event.type, ...target };
const backend = createPatchWorkspaceBackend(workspaceTarget, config);
const configuredUrl = targetWorkspaceBackendUrl(workspaceTarget, env);
try {
if (url) {
await client.replayEvent(event.id, { wait: false });
} else {
await client.dispatchEvent(event);
}
const result = configuredUrl
? await backend.client.replayEvent(event.id, { wait: false })
: await backend.client.dispatchEvent(event);
return {
eventId: event.id,
eventType: event.type,
url: url ? `${flowBackendBaseUrl(url)}/events/${encodeURIComponent(event.id)}/replay` : undefined,
operation: "replay",
target: backend.mode === "local" ? "local" : "workspace-backend",
transport: backend.mode,
workspaceBackendUrl: backend.url,
url: backend.eventsUrl ? `${backend.eventsUrl}/${encodeURIComponent(event.id)}/replay` : undefined,
status: "dispatched",
runIds: result.runIds,
matched: result.matched,
idempotent: result.idempotent,
createdAt: new Date().toISOString(),
};
} catch (error) {
@ -211,7 +185,11 @@ export async function replayFlowEvent(
return {
eventId: event.id,
eventType: event.type,
url,
operation: "replay",
target: backend.mode === "local" ? "local" : "workspace-backend",
transport: backend.mode,
workspaceBackendUrl: backend.url,
url: backend.eventsUrl,
status: "failed",
...(httpStatus ? { httpStatus } : {}),
error: error instanceof Error ? error.message : String(error),
@ -220,11 +198,34 @@ export async function replayFlowEvent(
}
}
export async function dispatchFlowEventForFeedSignal(
export async function listWorkspaceRuns(config: WorkspaceDispatchConfig = {}, options: {
eventId?: string;
status?: string;
limit?: number;
} = {}) {
return await createPatchWorkspaceBackend({}, config).client.listRuns(options);
}
export async function getWorkspaceRun(runId: string, config: WorkspaceDispatchConfig = {}) {
return await createPatchWorkspaceBackend({}, config).client.getRun(runId);
}
export async function getWorkspaceEvent(eventId: string, config: WorkspaceDispatchConfig = {}) {
return await createPatchWorkspaceBackend({}, config).client.getEvent(eventId);
}
export async function listWorkspaceEvents(config: WorkspaceDispatchConfig = {}, options: {
type?: string;
limit?: number;
} = {}) {
return await createPatchWorkspaceBackend({}, config).client.listEvents(options);
}
export async function dispatchWorkspaceEventForFeedSignal(
signal: FeedSignal,
config: FlowDispatchConfig = {},
config: WorkspaceDispatchConfig = {},
): Promise<{ event?: FlowEvent<Record<string, unknown>>; record?: FlowDispatchRecord }> {
if (!isFlowDispatchTarget(signal.target)) {
if (!isWorkspaceFlowTarget(signal.target)) {
return {};
}
@ -235,21 +236,41 @@ export async function dispatchFlowEventForFeedSignal(
return {
event,
record: await dispatchFlowEvent(event, signal.target, config),
record: await dispatchWorkspaceEvent(event, signal.target, config),
};
}
function flowBackendBaseUrl(url: string): string {
return url.replace(/\/(?:events|flow-events)\/?$/, "").replace(/\/+$/, "");
export async function dispatchFlowEventForFeedSignal(
signal: FeedSignal,
config: FlowDispatchConfig = {},
): Promise<{ event?: FlowEvent<Record<string, unknown>>; record?: FlowDispatchRecord }> {
return dispatchWorkspaceEventForFeedSignal(signal, config);
}
function flowEventsUrl(url: string): string {
return `${flowBackendBaseUrl(url)}/events`;
}
export function maintenanceAttemptForWorkspaceDispatch(
event: FlowEvent,
record: FlowDispatchRecord,
): MaintenanceAttemptRecord {
const payload = typeof event.payload === "object" && event.payload !== null
? event.payload as Record<string, unknown>
: {};
const operation = record.operation ?? "dispatch";
function patchFetch(fetchImpl: FetchLike) {
return async (input: string | URL | Request, init?: RequestInit): Promise<Response> => {
return fetchImpl(String(input), init ?? {});
return {
id: `${event.id}:${operation}:${record.createdAt}`,
eventId: event.id,
eventType: event.type,
operation,
status: record.status === "dispatched" ? "started" : record.status,
upstreamRepo: stringValue(payload.repo),
upstreamRef: stringValue(payload.ref),
upstreamSha: stringValue(payload.sha),
upstreamTag: stringValue(payload.tag),
workspaceBackendUrl: record.workspaceBackendUrl,
workspaceRunIds: record.runIds ?? [],
candidateRefs: [],
error: record.error,
createdAt: record.createdAt,
};
}
@ -258,3 +279,7 @@ function httpStatusFromError(error: unknown): number | undefined {
const match = message.match(/\bfailed with (\d{3})\b/);
return match?.[1] ? Number(match[1]) : undefined;
}
function stringValue(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value : undefined;
}

View file

@ -1,6 +1,13 @@
import { appendFile, mkdir, readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { FeedJob, FeedSignal, FlowDispatchRecord, FlowEvent } from "./types";
import type {
FeedJob,
FeedSignal,
FlowDispatchRecord,
FlowEvent,
MaintenanceAttemptRecord,
WorkspaceDispatchRecord,
} from "./types";
async function appendJsonLine(path: string, value: unknown): Promise<void> {
await mkdir(dirname(path), { recursive: true });
@ -34,12 +41,16 @@ export class EventStore {
readonly feedJobsPath: string;
readonly flowEventsPath: string;
readonly flowDispatchesPath: string;
readonly workspaceDispatchesPath: string;
readonly maintenanceAttemptsPath: string;
constructor(dataDir: string) {
this.feedEventsPath = join(dataDir, "feed-events.jsonl");
this.feedJobsPath = join(dataDir, "feed-jobs.jsonl");
this.flowEventsPath = join(dataDir, "flow-events.jsonl");
this.flowDispatchesPath = join(dataDir, "flow-dispatches.jsonl");
this.workspaceDispatchesPath = join(dataDir, "workspace-dispatches.jsonl");
this.maintenanceAttemptsPath = join(dataDir, "maintenance-attempts.jsonl");
}
async appendFeedSignal(signal: FeedSignal): Promise<void> {
@ -55,7 +66,15 @@ export class EventStore {
}
async appendFlowDispatch(record: FlowDispatchRecord): Promise<void> {
await appendJsonLine(this.flowDispatchesPath, record);
await this.appendWorkspaceDispatch(record);
}
async appendWorkspaceDispatch(record: WorkspaceDispatchRecord): Promise<void> {
await appendJsonLine(this.workspaceDispatchesPath, record);
}
async appendMaintenanceAttempt(record: MaintenanceAttemptRecord): Promise<void> {
await appendJsonLine(this.maintenanceAttemptsPath, record);
}
async listFlowEvents(options: { limit?: number; type?: string } = {}): Promise<FlowEvent[]> {
@ -77,7 +96,29 @@ export class EventStore {
}
async listFlowDispatches(options: { limit?: number; eventId?: string; status?: FlowDispatchRecord["status"] } = {}): Promise<FlowDispatchRecord[]> {
const records = await readJsonLines<FlowDispatchRecord>(this.flowDispatchesPath);
return this.listWorkspaceDispatches(options);
}
async listWorkspaceDispatches(options: { limit?: number; eventId?: string; status?: WorkspaceDispatchRecord["status"] } = {}): Promise<WorkspaceDispatchRecord[]> {
const records = [
...await readJsonLines<WorkspaceDispatchRecord>(this.flowDispatchesPath),
...await readJsonLines<WorkspaceDispatchRecord>(this.workspaceDispatchesPath),
];
return limitNewest(
records.filter((record) =>
(!options.eventId || record.eventId === options.eventId) &&
(!options.status || record.status === options.status),
),
options.limit,
);
}
async listMaintenanceAttempts(options: {
limit?: number;
eventId?: string;
status?: MaintenanceAttemptRecord["status"];
} = {}): Promise<MaintenanceAttemptRecord[]> {
const records = await readJsonLines<MaintenanceAttemptRecord>(this.maintenanceAttemptsPath);
return limitNewest(
records.filter((record) =>
(!options.eventId || record.eventId === options.eventId) &&

View file

@ -1,6 +1,14 @@
import { parseDiscordConfig, type DiscordConfig } from "./discord";
import { startFeedPolling } from "./feed";
import { dispatchFlowEvent, replayFlowEvent } from "./flow";
import {
dispatchWorkspaceEvent,
getWorkspaceEvent,
getWorkspaceRun,
listWorkspaceEvents,
listWorkspaceRuns,
maintenanceAttemptForWorkspaceDispatch,
replayWorkspaceEvent,
} from "./flow";
import { jsonResponse, methodNotAllowed, textResponse } from "./http";
import { EventStore } from "./queue";
@ -32,7 +40,13 @@ function numberParam(value: string | null): number | undefined {
function dispatchStatus(value: string | null) {
if (!value) return undefined;
if (value === "dispatched" || value === "failed" || value === "skipped") return value;
throw new Error("flow dispatch status must be dispatched, failed, or skipped");
throw new Error("workspace dispatch status must be dispatched, failed, or skipped");
}
function maintenanceStatus(value: string | null) {
if (!value) return undefined;
if (value === "started" || value === "failed" || value === "skipped") return value;
throw new Error("maintenance attempt status must be started, failed, or skipped");
}
async function handleFlowEvents(request: Request, config: ServerConfig, store: EventStore): Promise<Response> {
@ -64,13 +78,15 @@ async function handleFlowEvents(request: Request, config: ServerConfig, store: E
});
}
if (request.method === "POST" && eventMatch[2] === "retry") {
const record = await dispatchFlowEvent(event, {}, { env: process.env });
await store.appendFlowDispatch(record);
const record = await dispatchWorkspaceEvent(event, {}, { env: process.env });
await store.appendWorkspaceDispatch(record);
await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record));
return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 });
}
if (request.method === "POST" && eventMatch[2] === "replay") {
const record = await replayFlowEvent(event, {}, { env: process.env });
await store.appendFlowDispatch(record);
const record = await replayWorkspaceEvent(event, {}, { env: process.env });
await store.appendWorkspaceDispatch(record);
await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record));
return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 });
}
return methodNotAllowed();
@ -82,7 +98,7 @@ async function handleFlowDispatches(request: Request, config: ServerConfig, stor
if (request.method !== "GET") return methodNotAllowed();
const url = new URL(request.url);
return jsonResponse({
dispatches: await store.listFlowDispatches({
dispatches: await store.listWorkspaceDispatches({
eventId: url.searchParams.get("eventId") ?? undefined,
status: dispatchStatus(url.searchParams.get("status")),
limit: numberParam(url.searchParams.get("limit")),
@ -90,6 +106,61 @@ async function handleFlowDispatches(request: Request, config: ServerConfig, stor
});
}
async function handleMaintenanceAttempts(request: Request, config: ServerConfig, store: EventStore): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;
if (request.method !== "GET") return methodNotAllowed();
const url = new URL(request.url);
return jsonResponse({
attempts: await store.listMaintenanceAttempts({
eventId: url.searchParams.get("eventId") ?? undefined,
status: maintenanceStatus(url.searchParams.get("status")),
limit: numberParam(url.searchParams.get("limit")),
}),
});
}
async function handleWorkspaceRuns(request: Request, config: ServerConfig): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;
const url = new URL(request.url);
if (request.method === "GET" && url.pathname === "/workspace-runs") {
return jsonResponse(await listWorkspaceRuns({ env: process.env }, {
eventId: url.searchParams.get("eventId") ?? undefined,
status: url.searchParams.get("status") ?? undefined,
limit: numberParam(url.searchParams.get("limit")),
}));
}
const runMatch = url.pathname.match(/^\/workspace-runs\/([^/]+)$/);
if (request.method === "GET" && runMatch?.[1]) {
return jsonResponse({ run: await getWorkspaceRun(decodeURIComponent(runMatch[1]), { env: process.env }) });
}
return methodNotAllowed();
}
async function handleWorkspaceEvents(request: Request, config: ServerConfig): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;
const url = new URL(request.url);
if (request.method === "GET" && url.pathname === "/workspace-events") {
return jsonResponse(await listWorkspaceEvents({ env: process.env }, {
type: url.searchParams.get("type") ?? undefined,
limit: numberParam(url.searchParams.get("limit")),
}));
}
const eventMatch = url.pathname.match(/^\/workspace-events\/([^/]+)$/);
if (request.method === "GET" && eventMatch?.[1]) {
return jsonResponse({ event: await getWorkspaceEvent(decodeURIComponent(eventMatch[1]), { env: process.env }) });
}
return methodNotAllowed();
}
export function createHandler(config: ServerConfig): (request: Request) => Promise<Response> | Response {
const store = new EventStore(config.dataDir);
@ -101,9 +172,18 @@ export function createHandler(config: ServerConfig): (request: Request) => Promi
if (url.pathname === "/flow-events" || url.pathname.startsWith("/flow-events/")) {
return handleFlowEvents(request, config, store);
}
if (url.pathname === "/flow-dispatches") {
if (url.pathname === "/workspace-dispatches" || url.pathname === "/flow-dispatches") {
return handleFlowDispatches(request, config, store);
}
if (url.pathname === "/maintenance-attempts") {
return handleMaintenanceAttempts(request, config, store);
}
if (url.pathname === "/workspace-runs" || url.pathname.startsWith("/workspace-runs/")) {
return handleWorkspaceRuns(request, config);
}
if (url.pathname === "/workspace-events" || url.pathname.startsWith("/workspace-events/")) {
return handleWorkspaceEvents(request, config);
}
return jsonResponse({ error: "not_found" }, { status: 404 });
};
}
@ -126,7 +206,7 @@ if (import.meta.main) {
dataDir: config.dataDir,
sourcesPath: process.env.FEED_SOURCES_PATH,
discord: config.discord,
flowDispatch: {
workspaceBackend: {
env: process.env,
},
}).catch((error) => {

View file

@ -9,9 +9,12 @@ export type FeedForkSyncTarget = {
mode: "notify_only" | "fork_sync";
};
export type FeedFlowDispatchTarget = {
mode: "flow_dispatch";
export type FeedWorkspaceFlowTarget = {
mode: "workspace_flow" | "flow_dispatch";
eventType: string;
workspaceUrl?: string;
workspaceUrlEnv?: string;
workspaceSecretEnv?: string;
dispatchUrl?: string;
dispatchUrlEnv?: string;
dispatchSecretEnv?: string;
@ -30,7 +33,7 @@ export type FeedSourceConfig = {
webUrl: string;
defaultBranch?: string;
};
target?: FeedForkSyncTarget | FeedFlowDispatchTarget;
target?: FeedForkSyncTarget | FeedWorkspaceFlowTarget;
pollIntervalSeconds?: number;
primeOnly?: boolean;
};
@ -78,9 +81,35 @@ export type FlowEvent<T = unknown> = {
export type FlowDispatchRecord = {
eventId: string;
eventType: string;
operation?: "dispatch" | "replay";
target?: "local" | "workspace-backend";
transport?: "local" | "workspace-http" | "workspace-ws";
workspaceBackendUrl?: string;
url?: string;
status: "dispatched" | "failed" | "skipped";
runIds?: string[];
matched?: number;
idempotent?: boolean;
httpStatus?: number;
error?: string;
createdAt: string;
};
export type WorkspaceDispatchRecord = FlowDispatchRecord;
export type MaintenanceAttemptRecord = {
id: string;
eventId: string;
eventType: string;
operation: "dispatch" | "replay";
status: "started" | "failed" | "skipped";
upstreamRepo?: string;
upstreamRef?: string;
upstreamSha?: string;
upstreamTag?: string;
workspaceBackendUrl?: string;
workspaceRunIds: string[];
candidateRefs: string[];
error?: string;
createdAt: string;
};

View file

@ -0,0 +1,295 @@
import {
createFlowClient,
type FlowCancelResult,
type FlowClient,
type FlowDispatchOptions,
type FlowDispatchResult,
type FlowEventList,
type FlowListEventsOptions,
type FlowListRunsOptions,
type FlowReplayOptions,
type FlowReplayResult,
type FlowRunList,
type FlowRunView,
} from "@peezy.tech/flow-runtime/client";
import {
normalizeDispatchResult,
normalizeEvent,
normalizeEventList,
normalizeRun,
normalizeRunList,
} from "@peezy.tech/flow-runtime/backend-client";
import type { FeedWorkspaceFlowTarget, FlowEvent } from "./types";
export type WorkspaceBackendFetch = (url: string, init: RequestInit) => Promise<Response>;
export type WorkspaceBackendConfig = {
env?: Record<string, string | undefined>;
fetchImpl?: WorkspaceBackendFetch;
cwd?: string;
};
export type PatchWorkspaceBackend = {
mode: "local" | "workspace-http" | "workspace-ws";
url?: string;
eventsUrl?: string;
client: FlowClient;
};
type WorkspaceJsonRpcResponse = {
jsonrpc: "2.0";
id: string | number | null;
result?: unknown;
error?: {
code?: number;
message?: string;
data?: unknown;
};
};
export function createPatchWorkspaceBackend(
target: Partial<FeedWorkspaceFlowTarget> = {},
config: WorkspaceBackendConfig = {},
): PatchWorkspaceBackend {
const env = config.env ?? process.env;
const url = targetWorkspaceBackendUrl(target, env);
if (url) {
if (isWebSocketUrl(url)) {
return {
mode: "workspace-ws",
url,
client: new WorkspaceBackendWebSocketFlowClient(url),
};
}
const baseUrl = workspaceBackendHttpBaseUrl(url);
return {
mode: "workspace-http",
url: baseUrl,
eventsUrl: workspaceBackendEventsUrl(url),
client: createFlowClient({
mode: "http",
baseUrl,
hmacSecret: targetWorkspaceSecret(target, env),
...(config.fetchImpl ? { fetch: patchFetch(config.fetchImpl) } : {}),
}),
};
}
return {
mode: "local",
client: createFlowClient({
mode: "local",
cwd: config.cwd ?? process.cwd(),
env,
codex: {
command: env.CODEX_APP_SERVER_CODEX_COMMAND,
codexHome: env.CODEX_HOME,
stream: true,
},
}),
};
}
export function targetWorkspaceBackendUrl(
target: Partial<FeedWorkspaceFlowTarget>,
env: Record<string, string | undefined>,
): string | undefined {
const explicit = target.workspaceUrl?.trim() || target.dispatchUrl?.trim();
if (explicit) {
return explicit;
}
for (const envName of [target.workspaceUrlEnv, target.dispatchUrlEnv]) {
const value = envName?.trim() ? env[envName.trim()]?.trim() : undefined;
if (value) {
return value;
}
}
return env.PATCH_WORKSPACE_BACKEND_URL?.trim() ||
env.PATCH_FLOW_BACKEND_URL?.trim() ||
env.PATCH_FLOW_DISPATCH_URL?.trim() ||
undefined;
}
export function targetWorkspaceSecret(
target: Partial<FeedWorkspaceFlowTarget>,
env: Record<string, string | undefined>,
): string | undefined {
for (const envName of [target.workspaceSecretEnv, target.dispatchSecretEnv]) {
const value = envName?.trim() ? env[envName.trim()]?.trim() : undefined;
if (value) {
return value;
}
}
return env.PATCH_WORKSPACE_BACKEND_SECRET?.trim() ||
env.PATCH_FLOW_DISPATCH_SECRET?.trim() ||
undefined;
}
export function workspaceBackendHttpBaseUrl(url: string): string {
return url.replace(/\/(?:events|flow-events)\/?$/, "").replace(/\/+$/, "");
}
export function workspaceBackendEventsUrl(url: string): string {
return `${workspaceBackendHttpBaseUrl(url)}/events`;
}
function isWebSocketUrl(url: string): boolean {
return url.startsWith("ws://") || url.startsWith("wss://");
}
function patchFetch(fetchImpl: WorkspaceBackendFetch) {
return async (input: string | URL | Request, init?: RequestInit): Promise<Response> => {
return fetchImpl(String(input), init ?? {});
};
}
class WorkspaceBackendWebSocketFlowClient implements FlowClient {
#url: string;
constructor(url: string) {
this.#url = url;
}
async listRuns(options: FlowListRunsOptions = {}): Promise<FlowRunList> {
return normalizeRunList(await this.#request("flow.listRuns", options));
}
async getRun(runId: string): Promise<FlowRunView> {
const raw = await this.#request("flow.getRun", { runId });
return normalizeRun(record(raw).run ?? raw);
}
async listEvents(options: FlowListEventsOptions = {}): Promise<FlowEventList> {
return normalizeEventList(await this.#request("flow.listEvents", options));
}
async getEvent(eventId: string) {
const raw = await this.#request("flow.getEvent", { eventId });
return normalizeEvent(record(raw).event ?? raw, record(raw).runs);
}
async dispatchEvent(event: FlowEvent, options: FlowDispatchOptions = {}): Promise<FlowDispatchResult> {
return normalizeDispatchResult(await this.#request("flow.dispatch", {
event,
...options,
}));
}
async replayEvent(eventId: string, options: FlowReplayOptions = {}): Promise<FlowReplayResult> {
return normalizeDispatchResult(await this.#request("flow.replay", {
eventId,
...options,
}));
}
async cancelRun(runId: string): Promise<FlowCancelResult> {
const raw = await this.#request("flow.cancelRun", { runId });
return {
run: normalizeRun(record(raw).run ?? raw),
raw,
};
}
async #request(method: string, params?: unknown): Promise<unknown> {
return workspaceBackendWebSocketRequest(this.#url, method, params);
}
}
function workspaceBackendWebSocketRequest(url: string, method: string, params?: unknown): Promise<unknown> {
return new Promise((resolve, reject) => {
const socket = new WebSocket(url);
let nextId = 1;
let initId = 0;
let callId = 0;
let settled = false;
const timer = setTimeout(() => fail(new Error(`Workspace backend ${method} timed out`)), 90_000);
function requestId(): number {
const id = nextId;
nextId += 1;
return id;
}
function sendRequest(requestMethod: string, requestParams?: unknown): number {
const id = requestId();
socket.send(JSON.stringify({
jsonrpc: "2.0",
id,
method: requestMethod,
params: requestParams,
}));
return id;
}
function finish(value: unknown): void {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
socket.close();
resolve(value);
}
function fail(error: unknown): void {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
socket.close();
reject(error);
}
socket.addEventListener("open", () => {
initId = sendRequest("workspace.initialize", {
clientInfo: {
name: "patch-moi",
title: "patch.moi",
version: "0.1.0",
},
capabilities: {
appServerPassThrough: true,
},
});
});
socket.addEventListener("message", (event) => {
let response: WorkspaceJsonRpcResponse;
try {
response = JSON.parse(String(event.data)) as WorkspaceJsonRpcResponse;
} catch {
fail(new Error("Workspace backend returned invalid JSON-RPC"));
return;
}
if (response.id === undefined || response.id === null) {
return;
}
if (response.error) {
fail(new Error(response.error.message ?? `Workspace backend ${method} failed`));
return;
}
if (response.id === initId) {
callId = sendRequest(method, params);
return;
}
if (response.id === callId) {
finish(response.result);
}
});
socket.addEventListener("error", () => {
fail(new Error(`Workspace backend WebSocket request failed: ${method}`));
});
socket.addEventListener("close", () => {
if (!settled) {
fail(new Error(`Workspace backend WebSocket closed before ${method} completed`));
}
});
});
}
function record(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value)
? value as Record<string, unknown>
: {};
}

View file

@ -3,7 +3,7 @@ import { join } from "node:path";
import { tmpdir } from "node:os";
import { describe, expect, test } from "bun:test";
import { loadSources, parseFeedEntries, pollFeedsOnce, signalFromEntry } from "../src/feed";
import { dispatchFlowEvent, patchUpstreamReleaseEvent } from "../src/flow";
import { dispatchWorkspaceEvent, patchUpstreamReleaseEvent } from "../src/flow";
import type { FeedSourceConfig } from "../src/types";
const atom = `<?xml version="1.0"?>
@ -139,7 +139,7 @@ describe("feed watcher", () => {
expect(feedCalls).toBe(1);
});
test("later polls dispatch generic flow events", async () => {
test("later polls dispatch generic flow events through the workspace backend adapter", async () => {
const dataDir = await mkdtemp(join(tmpdir(), "patch-feed-"));
const sourcesPath = join(dataDir, "sources.json");
const releaseSource: FeedSourceConfig = {
@ -148,10 +148,10 @@ describe("feed watcher", () => {
url: "https://github.com/openai/codex/releases.atom",
event: "release",
target: {
mode: "flow_dispatch",
mode: "workspace_flow",
eventType: "upstream.release",
dispatchUrlEnv: "FLOW_URL",
dispatchSecretEnv: "FLOW_SECRET",
workspaceUrlEnv: "WORKSPACE_URL",
workspaceSecretEnv: "WORKSPACE_SECRET",
payload: {
repo: "openai/codex",
provider: "github",
@ -174,8 +174,8 @@ describe("feed watcher", () => {
discord: { enabled: false, notifyEvents: new Set(["release"]) },
flowDispatch: {
env: {
FLOW_URL: "https://flow.example/events",
FLOW_SECRET: "secret",
WORKSPACE_URL: "https://workspace.example/events",
WORKSPACE_SECRET: "secret",
},
fetchImpl: async (_url, init) => {
dispatchedBody = String(init.body);
@ -195,14 +195,26 @@ describe("feed watcher", () => {
expect(flowEvent.payload.tag).toBe("v1.2.3");
expect(JSON.parse(dispatchedBody).id).toBe(flowEvent.id);
expect(dispatchedSignature).toMatch(/^sha256=[0-9a-f]{64}$/);
expect(await readFile(join(dataDir, "flow-dispatches.jsonl"), "utf8")).toContain("\"status\":\"dispatched\"");
expect(await readFile(join(dataDir, "workspace-dispatches.jsonl"), "utf8")).toContain("\"transport\":\"workspace-http\"");
const attempt = JSON.parse((await readFile(join(dataDir, "maintenance-attempts.jsonl"), "utf8")).trim());
expect(attempt).toMatchObject({
eventId: flowEvent.id,
eventType: "upstream.release",
operation: "dispatch",
status: "started",
upstreamRepo: "openai/codex",
upstreamTag: "v1.2.3",
workspaceBackendUrl: "https://workspace.example",
workspaceRunIds: [],
candidateRefs: [],
});
});
test("flow dispatch uses default Patch env names", async () => {
test("workspace dispatch uses default workspace backend env names", async () => {
let dispatchedUrl = "";
let dispatchedSignature = "";
const record = await dispatchFlowEvent({
const record = await dispatchWorkspaceEvent({
id: "patch:source:entry:upstream.release",
type: "upstream.release",
source: "patch",
@ -210,8 +222,8 @@ describe("feed watcher", () => {
payload: { repo: "openai/codex", tag: "v1.2.3" },
}, {}, {
env: {
PATCH_FLOW_DISPATCH_URL: "https://flow.example/events",
PATCH_FLOW_DISPATCH_SECRET: "secret",
PATCH_WORKSPACE_BACKEND_URL: "https://workspace.example",
PATCH_WORKSPACE_BACKEND_SECRET: "secret",
},
fetchImpl: async (url, init) => {
dispatchedUrl = url;
@ -221,12 +233,130 @@ describe("feed watcher", () => {
});
expect(record.status).toBe("dispatched");
expect(dispatchedUrl).toBe("https://flow.example/events");
expect(record).toMatchObject({ target: "workspace-backend", transport: "workspace-http" });
expect(dispatchedUrl).toBe("https://workspace.example/events");
expect(dispatchedSignature).toMatch(/^sha256=[0-9a-f]{64}$/);
});
test("flow dispatch records backend HTTP failures", async () => {
const record = await dispatchFlowEvent({
test("workspace dispatch accepts legacy Patch dispatch URL env name", async () => {
let dispatchedUrl = "";
const record = await dispatchWorkspaceEvent({
id: "patch:source:entry:upstream.release",
type: "upstream.release",
source: "patch",
receivedAt: "2026-05-13T00:00:00.000Z",
payload: { repo: "openai/codex", tag: "v1.2.3" },
}, {}, {
env: {
PATCH_FLOW_DISPATCH_URL: "https://flow.example/events",
},
fetchImpl: async (url) => {
dispatchedUrl = url;
return Response.json({ status: "accepted", eventId: "event-1", runIds: [], matched: 0 }, { status: 202 });
},
});
expect(record.status).toBe("dispatched");
expect(dispatchedUrl).toBe("https://flow.example/events");
});
test("workspace dispatch accepts legacy Patch backend URL env name", async () => {
let dispatchedUrl = "";
const record = await dispatchWorkspaceEvent({
id: "patch:source:entry:upstream.release",
type: "upstream.release",
source: "patch",
receivedAt: "2026-05-13T00:00:00.000Z",
payload: { repo: "openai/codex", tag: "v1.2.3" },
}, {}, {
env: {
PATCH_FLOW_BACKEND_URL: "https://flow.example",
},
fetchImpl: async (url) => {
dispatchedUrl = url;
return Response.json({ status: "accepted", eventId: "event-1", runIds: [], matched: 0 }, { status: 202 });
},
});
expect(record.status).toBe("dispatched");
expect(dispatchedUrl).toBe("https://flow.example/events");
});
test("workspace dispatch can use the workspace backend websocket flow method", async () => {
const methods: string[] = [];
const server = Bun.serve({
hostname: "127.0.0.1",
port: 0,
fetch(request, bunServer) {
if (bunServer.upgrade(request)) {
return undefined;
}
return new Response("upgrade required", { status: 426 });
},
websocket: {
message(socket, message) {
const request = JSON.parse(String(message)) as {
id: number;
method: string;
params?: { capabilities?: { appServerPassThrough?: boolean }; event?: { id?: string } };
};
methods.push(request.method);
if (request.method === "workspace.initialize") {
expect(request.params?.capabilities?.appServerPassThrough).toBe(true);
socket.send(JSON.stringify({
jsonrpc: "2.0",
id: request.id,
result: {
ok: true,
serverInfo: { name: "test", version: "0.0.0" },
capabilities: { appServerPassThrough: true, workspaceMethods: ["flow.dispatch"], flowInspection: true },
},
}));
return;
}
socket.send(JSON.stringify({
jsonrpc: "2.0",
id: request.id,
result: {
status: "accepted",
eventId: request.params?.event?.id,
runIds: ["run-1"],
matched: 1,
},
}));
},
},
});
try {
const record = await dispatchWorkspaceEvent({
id: "patch:source:entry:upstream.release",
type: "upstream.release",
source: "patch",
receivedAt: "2026-05-13T00:00:00.000Z",
payload: { repo: "openai/codex", tag: "v1.2.3" },
}, {}, {
env: {
PATCH_WORKSPACE_BACKEND_URL: `ws://127.0.0.1:${server.port}`,
},
});
expect(methods).toEqual(["workspace.initialize", "flow.dispatch"]);
expect(record).toMatchObject({
status: "dispatched",
transport: "workspace-ws",
runIds: ["run-1"],
matched: 1,
});
} finally {
server.stop(true);
}
});
test("workspace dispatch records backend HTTP failures", async () => {
const record = await dispatchWorkspaceEvent({
id: "patch:source:entry:upstream.release",
type: "upstream.release",
source: "patch",
@ -246,11 +376,11 @@ describe("feed watcher", () => {
});
});
test("flow dispatch uses local mode when no backend URL is configured", async () => {
test("workspace dispatch uses local mode when no backend URL is configured", async () => {
const dataDir = await mkdtemp(join(tmpdir(), "patch-flow-local-"));
await writeDemoFlow(dataDir);
const record = await dispatchFlowEvent({
const record = await dispatchWorkspaceEvent({
id: "patch:local:demo",
type: "demo.event",
source: "patch",
@ -265,6 +395,8 @@ describe("feed watcher", () => {
eventId: "patch:local:demo",
eventType: "demo.event",
status: "dispatched",
target: "local",
transport: "local",
});
expect(record.url).toBeUndefined();
expect(JSON.parse(await readFile(join(dataDir, "local-flow-output.json"), "utf8"))).toEqual({

View file

@ -27,6 +27,9 @@ describe("server", () => {
test("lists, retries, and replays stored flow events behind admin auth", async () => {
const originalFetch = globalThis.fetch;
const originalWorkspaceUrl = process.env.PATCH_WORKSPACE_BACKEND_URL;
const originalWorkspaceSecret = process.env.PATCH_WORKSPACE_BACKEND_SECRET;
const originalBackendUrl = process.env.PATCH_FLOW_BACKEND_URL;
const originalDispatchUrl = process.env.PATCH_FLOW_DISPATCH_URL;
const originalDispatchSecret = process.env.PATCH_FLOW_DISPATCH_SECRET;
const dataDir = await mkdtemp(join(tmpdir(), "patch-"));
@ -39,7 +42,7 @@ describe("server", () => {
payload: { repo: "openai/codex", tag: "v1.2.3" },
};
await store.appendFlowEvent(event);
await store.appendFlowDispatch({
await store.appendWorkspaceDispatch({
eventId: event.id,
eventType: event.type,
status: "failed",
@ -48,8 +51,11 @@ describe("server", () => {
});
const calls: Array<{ url: string; body: string; headers: Headers }> = [];
process.env.PATCH_FLOW_DISPATCH_URL = "http://172.20.0.1:7345/events";
process.env.PATCH_FLOW_DISPATCH_SECRET = "secret";
process.env.PATCH_WORKSPACE_BACKEND_URL = "http://172.20.0.1:3586/events";
process.env.PATCH_WORKSPACE_BACKEND_SECRET = "secret";
delete process.env.PATCH_FLOW_BACKEND_URL;
delete process.env.PATCH_FLOW_DISPATCH_URL;
delete process.env.PATCH_FLOW_DISPATCH_SECRET;
globalThis.fetch = (async (url: string | URL | Request, init?: RequestInit) => {
calls.push({
url: String(url),
@ -74,7 +80,7 @@ describe("server", () => {
expect(list.status).toBe(200);
expect(await list.json()).toMatchObject({ events: [{ id: event.id, type: event.type }] });
const dispatches = await handler(new Request("http://localhost/flow-dispatches?status=failed", {
const dispatches = await handler(new Request("http://localhost/workspace-dispatches?status=failed", {
headers: { "x-patch-admin-token": "admin" },
}));
expect(dispatches.status).toBe(200);
@ -85,7 +91,7 @@ describe("server", () => {
headers: { authorization: "Bearer admin" },
}));
expect(retry.status).toBe(202);
expect(calls.at(-1)?.url).toBe("http://172.20.0.1:7345/events");
expect(calls.at(-1)?.url).toBe("http://172.20.0.1:3586/events");
expect(JSON.parse(calls.at(-1)?.body ?? "{}")).toMatchObject({ id: event.id });
expect(calls.at(-1)?.headers.get("x-flow-signature-256")).toMatch(/^sha256=[0-9a-f]{64}$/);
@ -94,10 +100,36 @@ describe("server", () => {
headers: { authorization: "Bearer admin" },
}));
expect(replay.status).toBe(202);
expect(calls.at(-1)?.url).toBe(`http://172.20.0.1:7345/events/${encodeURIComponent(event.id)}/replay`);
expect(calls.at(-1)?.url).toBe(`http://172.20.0.1:3586/events/${encodeURIComponent(event.id)}/replay`);
expect(JSON.parse(calls.at(-1)?.body ?? "{}")).toEqual({ wait: false });
const attempts = await handler(new Request(`http://localhost/maintenance-attempts?eventId=${encodeURIComponent(event.id)}`, {
headers: { authorization: "Bearer admin" },
}));
expect(attempts.status).toBe(200);
expect(await attempts.json()).toMatchObject({
attempts: [
{ eventId: event.id, operation: "replay", status: "started", upstreamRepo: "openai/codex" },
{ eventId: event.id, operation: "dispatch", status: "started", upstreamRepo: "openai/codex" },
],
});
} finally {
globalThis.fetch = originalFetch;
if (originalWorkspaceUrl === undefined) {
delete process.env.PATCH_WORKSPACE_BACKEND_URL;
} else {
process.env.PATCH_WORKSPACE_BACKEND_URL = originalWorkspaceUrl;
}
if (originalWorkspaceSecret === undefined) {
delete process.env.PATCH_WORKSPACE_BACKEND_SECRET;
} else {
process.env.PATCH_WORKSPACE_BACKEND_SECRET = originalWorkspaceSecret;
}
if (originalBackendUrl === undefined) {
delete process.env.PATCH_FLOW_BACKEND_URL;
} else {
process.env.PATCH_FLOW_BACKEND_URL = originalBackendUrl;
}
if (originalDispatchUrl === undefined) {
delete process.env.PATCH_FLOW_DISPATCH_URL;
} else {
@ -110,4 +142,65 @@ describe("server", () => {
}
}
});
test("inspects workspace backend runs and events behind admin auth", async () => {
const originalFetch = globalThis.fetch;
const originalWorkspaceUrl = process.env.PATCH_WORKSPACE_BACKEND_URL;
const calls: Array<{ url: string; method?: string }> = [];
process.env.PATCH_WORKSPACE_BACKEND_URL = "http://127.0.0.1:3586";
globalThis.fetch = (async (url: string | URL | Request, init?: RequestInit) => {
calls.push({ url: String(url), method: init?.method });
if (String(url).includes("/runs/run-1")) {
return Response.json({ run: { id: "run-1", eventId: "event-1", status: "completed" } });
}
if (String(url).includes("/events/event-1")) {
return Response.json({ event: { id: "event-1", type: "upstream.release", runIds: ["run-1"] }, runs: [] });
}
if (String(url).includes("/events")) {
return Response.json({ events: [{ id: "event-1", type: "upstream.release", runIds: ["run-1"] }] });
}
return Response.json({ runs: [{ id: "run-1", eventId: "event-1", status: "completed" }] });
}) as unknown as typeof fetch;
try {
const handler = createHandler({
dataDir: await mkdtemp(join(tmpdir(), "patch-")),
adminToken: "admin",
});
const unauthorized = await handler(new Request("http://localhost/workspace-runs"));
expect(unauthorized.status).toBe(401);
const runs = await handler(new Request("http://localhost/workspace-runs?eventId=event-1", {
headers: { authorization: "Bearer admin" },
}));
expect(runs.status).toBe(200);
expect(await runs.json()).toMatchObject({ runs: [{ id: "run-1", eventId: "event-1" }] });
const run = await handler(new Request("http://localhost/workspace-runs/run-1", {
headers: { authorization: "Bearer admin" },
}));
expect(run.status).toBe(200);
expect(await run.json()).toMatchObject({ run: { id: "run-1", eventId: "event-1" } });
const event = await handler(new Request("http://localhost/workspace-events/event-1", {
headers: { authorization: "Bearer admin" },
}));
expect(event.status).toBe(200);
expect(await event.json()).toMatchObject({ event: { id: "event-1", type: "upstream.release" } });
expect(calls.map((call) => new URL(call.url).pathname)).toEqual([
"/runs",
"/runs/run-1",
"/events/event-1",
]);
} finally {
globalThis.fetch = originalFetch;
if (originalWorkspaceUrl === undefined) {
delete process.env.PATCH_WORKSPACE_BACKEND_URL;
} else {
process.env.PATCH_WORKSPACE_BACKEND_URL = originalWorkspaceUrl;
}
}
});
});

View file

@ -1,8 +1,8 @@
import { describe, expect, test } from "bun:test";
import { hmacSha256Hex } from "../src/signatures";
describe("flow signatures", () => {
test("builds HMAC-SHA256 digests for flow dispatch signing", async () => {
describe("workspace signatures", () => {
test("builds HMAC-SHA256 digests for workspace HTTP signing", async () => {
expect(await hmacSha256Hex("secret", "payload")).toBe(
"b82fcb791acec57859b989b430a826488ce2e479fdf92326bd0a2e8375a42ba4",
);

View file

@ -22,9 +22,10 @@ The current service has these runtime pieces:
- The HTTP server exposes health and admin flow endpoints.
- The feed poller reads configured upstream feeds on an interval.
- The JSONL store keeps feed signals, flow events, and dispatch outcomes under
`DATA_DIR`.
- The flow client can execute locally or dispatch to an HTTP flow backend.
- The JSONL store keeps feed signals, flow events, maintenance attempts, and
workspace dispatch records under `DATA_DIR`.
- The workspace backend adapter can execute locally or call a configured Codex
workspace backend, such as `codex-workspace-backend-local`.
Those pieces are the intake layer. The patch-stack layer can run in local mode
against a checkout, or in service mode through a remote forge workflow and

View file

@ -1,6 +1,6 @@
---
title: Flow boundary
description: What patch.moi owns, what codex-flow owns, and where gateway-style workspace orchestration belongs.
description: What patch.moi owns, what codex-flow owns, and where workspace backend orchestration belongs.
---
# Flow boundary
@ -12,6 +12,11 @@ workflow runs, dispatch attempts, and operator-visible state.
codex-flow owns portable event execution. Flow packages match `FlowEvent.type`
and payload schema, run Bun or Code Mode steps, and emit `FLOW_RESULT`.
The Codex workspace backend owns workspace control surfaces: app-server
pass-through, delegation, flow execution transport, workbench state, and the
HTTP/WebSocket protocol around those capabilities. patch.moi should use that
surface instead of redefining it.
The boundary should stay narrow:
| Layer | Owns |
@ -19,6 +24,7 @@ The boundary should stay narrow:
| patch.moi intake | feeds, source ids, feed state, update records |
| patch.moi orchestration | maintained repo selection, remote branch policy, workflow triggers, retry and review state |
| codex-flow | generic event matching, step execution, run state, `FLOW_RESULT` |
| codex workspace backend | workspace transport, app-server bridge, delegation, flow HTTP routes |
| local workspace or forge runner | git operations, conflict resolution, checks, candidate refs |
| release channel | deploy, publish, trusted publishing, rollback policy |
@ -31,13 +37,13 @@ patch.moi should be able to say: this upstream release produced this workflow
run, which produced this candidate branch, which was used by this internal build
or public release. A single flow event cannot hold that lifecycle cleanly.
## Service Backend
## Service State
A patch.moi service backend is useful when patch.moi needs to coordinate a
remote forge, human intervention, and operator surfaces. That backend can own
patch.moi-specific service state while still using codex-flow for generic event
execution where it fits.
patch.moi-specific service state while still dispatching generic flow events to
the Codex workspace backend where it fits.
The rule is simple: use flow events for portable automation triggers, and use a
patch.moi backend for patch-stack product state: remote refs, workflow runs,
patch.moi service for patch-stack product state: remote refs, workflow runs,
pull requests, issues, checks, artifacts, and review status.

View file

@ -29,21 +29,22 @@ feeds. Patch normalizes both into `FeedSignal` records.
}
```
`flow_dispatch` creates a generic `FlowEvent` and dispatches it:
`workspace_flow` creates a generic `FlowEvent` and submits it to the workspace
backend adapter:
```json
{
"mode": "flow_dispatch",
"mode": "workspace_flow",
"eventType": "upstream.release",
"dispatchUrlEnv": "PATCH_FLOW_DISPATCH_URL",
"dispatchSecretEnv": "PATCH_FLOW_DISPATCH_SECRET",
"workspaceUrlEnv": "PATCH_WORKSPACE_BACKEND_URL",
"workspaceSecretEnv": "PATCH_WORKSPACE_BACKEND_SECRET",
"payload": {
"repo": "openai/codex"
}
}
```
For patch-stack maintenance, prefer `flow_dispatch` to create an
For patch-stack maintenance, prefer `workspace_flow` to create an
`upstream.release` or `upstream.update` trigger. Let the receiving workspace read
Git to discover the maintained patch branch and candidate refs.

View file

@ -57,9 +57,10 @@ DATA_DIR=./data FEED_SOURCES_PATH=./feed-sources.json bun run dev
`GET /healthz` returns `ok` when the server is running.
Local flow dispatch runs from the process working directory when
`PATCH_FLOW_DISPATCH_URL` is unset. That makes local mode useful for testing a
patch application workspace before sending the same event to a service backend.
Local workspace execution runs from the process working directory when no
workspace backend URL is set. That makes local mode useful for testing a patch
application workspace before sending the same event to a configured workspace
backend or service runner.
Local mode is checkout-oriented. Service mode is forge-oriented: patch.moi
should talk to the remote forge, trigger a runner, and let that runner perform

View file

@ -16,8 +16,8 @@ The product boundary is Git-first:
- Maintained forks stay fork remotes and patch branches.
- Patch stacks are commits, branches, and tags, not a second Patch-specific
project file.
- patch.moi records observations, dispatch attempts, workflow runs, and review
state around those Git facts.
- patch.moi records observations, maintenance attempts, workspace run ids, and
review state around those Git facts.
- Local Codex workspaces or forge runners do the maintenance work: rebase patch
commits, resolve conflicts, build candidates, and leave human intervention
points when needed.
@ -50,8 +50,8 @@ flowchart LR
## What is in this repo
- `apps/patch`: the Patch Bun service, feed poller, JSONL store, and flow
dispatch adapter.
- `apps/patch`: the Patch Bun service, feed poller, JSONL store, and workspace
backend adapter.
- `docs`: this Tome documentation site, organized with the Diataxis framework.
- `Dockerfile`: container image for the Patch service app.

View file

@ -1,6 +1,6 @@
---
title: Flow event retry and replay
description: Reference for local or HTTP flow execution and retrying stored update triggers.
description: Reference for local or workspace backend execution and retrying stored update triggers.
---
# Flow Event Retry And Replay
@ -11,27 +11,29 @@ Patch creates deterministic event ids:
patch:<sourceId>:<entryId>:<eventType>
```
Dispatch is idempotent at the flow backend by event id. Replay intentionally
asks the backend to create another attempt for the stored event.
Dispatch is idempotent at the workspace backend flow capability by event id.
Replay intentionally asks that capability to create another attempt for the
stored event.
For patch-stack maintenance, treat the event as an update trigger. The
authoritative patch state still comes from Git when the local workspace or forge
runner runs.
## Select HTTP dispatch
## Select a workspace backend
```bash
PATCH_FLOW_DISPATCH_URL=http://127.0.0.1:7345/events
PATCH_FLOW_DISPATCH_SECRET=dev-secret
PATCH_WORKSPACE_BACKEND_URL=http://127.0.0.1:3586
PATCH_WORKSPACE_BACKEND_SECRET=dev-secret
```
Patch signs HTTP dispatches with the shared flow HMAC header when a secret is
configured.
Patch signs HTTP workspace dispatches with the shared flow HMAC header when a
secret is configured. The backend URL can be a base URL or `/events` URL.
## Use local dispatch
Leave `PATCH_FLOW_DISPATCH_URL` unset. Patch creates a local flow client rooted
at the app working directory and runs matching flows synchronously.
Leave `PATCH_WORKSPACE_BACKEND_URL`, `PATCH_FLOW_BACKEND_URL`, and
`PATCH_FLOW_DISPATCH_URL` unset. Patch creates a local flow client rooted at the
app working directory and runs matching flows synchronously.
## Retry or replay
@ -40,9 +42,22 @@ curl -X POST http://127.0.0.1:3000/flow-events/<event-id>/retry
curl -X POST http://127.0.0.1:3000/flow-events/<event-id>/replay
```
`retry` dispatches the stored event again. `replay` calls the backend replay
endpoint when HTTP mode is configured, or dispatches locally when no backend URL
is configured.
`retry` dispatches the stored event again. `replay` calls the configured
workspace backend replay operation when a backend URL is configured, or
dispatches locally when no backend URL is configured.
Each retry or replay writes a maintenance attempt record:
```bash
curl http://127.0.0.1:3000/maintenance-attempts?eventId=<event-id>
```
Use workspace inspection endpoints to read backend-owned run state:
```bash
curl http://127.0.0.1:3000/workspace-runs?eventId=<event-id>
curl http://127.0.0.1:3000/workspace-events/<event-id>
```
Retrying or replaying an event should not rewrite patch branches by itself. The
workspace or flow package decides whether to push candidate refs after it checks

View file

@ -12,17 +12,20 @@ description: Runtime environment variables used by patch.moi.
| `DATA_DIR` | `./data` | Directory for JSONL state files. |
| `FEED_SOURCES_PATH` | unset | Enables feed polling from the configured JSON file. |
| `PATCH_ADMIN_TOKEN` | unset | Protects admin flow endpoints when set. |
| `PATCH_FLOW_DISPATCH_URL` | unset | Default flow backend URL for dispatch targets. |
| `PATCH_FLOW_BACKEND_URL` | unset | Alternate default backend base URL. |
| `PATCH_FLOW_DISPATCH_SECRET` | unset | HMAC secret for HTTP flow dispatch. |
| `PATCH_WORKSPACE_BACKEND_URL` | unset | Preferred workspace backend URL for execution; accepts an HTTP base URL, `/events` URL, or workspace WebSocket URL. |
| `PATCH_WORKSPACE_BACKEND_SECRET` | unset | HMAC secret for HTTP workspace flow dispatch. |
| `PATCH_FLOW_BACKEND_URL` | unset | Legacy workspace flow HTTP surface URL fallback. |
| `PATCH_FLOW_DISPATCH_URL` | unset | Legacy or explicit flow dispatch URL fallback. |
| `PATCH_FLOW_DISPATCH_SECRET` | unset | Legacy HMAC secret fallback. |
| `CODEX_APP_SERVER_CODEX_COMMAND` | unset | Passed to local code-mode flow execution. |
| `CODEX_HOME` | unset | Passed to local code-mode flow execution. |
Feed target fields can override backend settings with `dispatchUrl`,
`dispatchUrlEnv`, and `dispatchSecretEnv`.
Feed target fields can override backend settings with `workspaceUrl`,
`workspaceUrlEnv`, and `workspaceSecretEnv`. Older `dispatchUrl`,
`dispatchUrlEnv`, and `dispatchSecretEnv` fields remain accepted.
Git topology is intentionally not represented here. Local mode should read
upstream, fork, branch, and tag state from Git. Service mode should read remote
repository, branch, workflow, and review state from the forge. Environment
variables should stay limited to runtime concerns such as dispatch URLs, data
directories, and Codex execution settings.
variables should stay limited to runtime concerns such as workspace backend
URLs, data directories, and Codex execution settings.

View file

@ -24,18 +24,21 @@ type FeedSourceConfig = {
webUrl: string;
defaultBranch?: string;
};
target?: FeedFlowDispatchTarget;
target?: FeedWorkspaceFlowTarget;
pollIntervalSeconds?: number;
primeOnly?: boolean;
};
```
## Flow dispatch target
## Workspace flow target
```ts
type FeedFlowDispatchTarget = {
mode: "flow_dispatch";
type FeedWorkspaceFlowTarget = {
mode: "workspace_flow" | "flow_dispatch";
eventType: string;
workspaceUrl?: string;
workspaceUrlEnv?: string;
workspaceSecretEnv?: string;
dispatchUrl?: string;
dispatchUrlEnv?: string;
dispatchSecretEnv?: string;
@ -43,9 +46,10 @@ type FeedFlowDispatchTarget = {
};
```
The flow payload includes provider, event, source id, entry id, title, URL,
author, published time, repository fields, ref, SHA, tag, and raw feed metadata.
Values from `target.payload` are merged last.
The target creates a generic `FlowEvent` and hands it to the workspace backend
adapter. The flow payload includes provider, event, source id, entry id, title,
URL, author, published time, repository fields, ref, SHA, tag, and raw feed
metadata. Values from `target.payload` are merged last.
For release maintenance, use a stable event type such as `upstream.release` and
include only routing hints in `payload`. Avoid copying branch topology into the

View file

@ -28,15 +28,41 @@ returns the event and matching dispatch records.
These endpoints inspect update triggers and dispatch attempts. They do not
inspect or modify Git patch stacks directly.
## Maintenance attempts
```text
GET /maintenance-attempts?eventId=<id>&status=started|failed|skipped&limit=<n>
```
Maintenance attempts are patch.moi-owned product records. They link an
upstream update trigger to workspace run ids and candidate refs without copying
workspace backend run state into patch.moi.
## Dispatches
```text
GET /workspace-dispatches?eventId=<id>&status=dispatched|failed|skipped&limit=<n>
GET /flow-dispatches?eventId=<id>&status=dispatched|failed|skipped&limit=<n>
```
`/flow-dispatches` is a compatibility alias for older operators.
## Workspace inspection
```text
GET /workspace-events?type=<type>&limit=<n>
GET /workspace-events/:id
GET /workspace-runs?eventId=<id>&status=<status>&limit=<n>
GET /workspace-runs/:id
```
These endpoints proxy the configured workspace backend flow capability. They
inspect backend-owned event and run state; patch.moi still owns update intake
and maintenance attempt records.
## Admin auth
When `PATCH_ADMIN_TOKEN` is set, flow endpoints require one of:
When `PATCH_ADMIN_TOKEN` is set, flow and workspace endpoints require one of:
```text
Authorization: Bearer <token>

View file

@ -13,12 +13,15 @@ operational state. They are not the patch stack.
| `feed-state.json` | Per-source last seen entry and last checked timestamp. |
| `feed-events.jsonl` | Normalized `FeedSignal` records. |
| `flow-events.jsonl` | Generic `FlowEvent` records emitted by flow targets. |
| `flow-dispatches.jsonl` | Dispatch, retry, replay, and failure records. |
| `maintenance-attempts.jsonl` | patch.moi-owned attempt records linking update events to workspace run ids and candidate refs. |
| `workspace-dispatches.jsonl` | Workspace dispatch, retry, replay, and failure records. |
| `flow-dispatches.jsonl` | Legacy dispatch record file read for compatibility. |
Admin endpoints read `flow-events.jsonl` and `flow-dispatches.jsonl`. The feed
poller appends to all relevant files as it accepts new feed entries.
Admin endpoints read `flow-events.jsonl`, `maintenance-attempts.jsonl`,
`workspace-dispatches.jsonl`, and the legacy `flow-dispatches.jsonl` file. The
feed poller appends to all relevant files as it accepts new feed entries.
If a runner checkout is lost, patch.moi should be able to recreate the
maintenance context from remote Git refs and forge records. JSONL state explains
feed and dispatch history; Git and the forge remain the source of truth for
patch contents and review state.
feed, attempt, and dispatch history; Git and the forge remain the source of
truth for patch contents and review state.

View file

@ -15,7 +15,8 @@ Responsibilities:
- Poll configured feeds.
- Normalize entries into `FeedSignal` records.
- Store JSONL state.
- Dispatch generic codex-flow events through `@peezy.tech/flow-runtime/client`.
- Submit generic `FlowEvent` triggers through the patch.moi workspace backend
adapter.
- Serve admin inspection, retry, and replay endpoints.
The service package does not store patch contents. Maintained patch stacks live

View file

@ -33,18 +33,20 @@ git status --short --branch
If `git status` shows local changes or untracked files, resolve them before an
automated rebase.
## 2. Point Patch at a backend
## 2. Point Patch at a workspace backend
```bash
PATCH_FLOW_DISPATCH_URL=http://127.0.0.1:7345/events \
PATCH_FLOW_DISPATCH_SECRET=dev-secret \
PATCH_WORKSPACE_BACKEND_URL=http://127.0.0.1:3586 \
PATCH_WORKSPACE_BACKEND_SECRET=dev-secret \
DATA_DIR=./data \
FEED_SOURCES_PATH=./feed-sources.json \
bun run --filter @peezy.tech/patch start
```
`PATCH_FLOW_DISPATCH_URL` can point at the `/events` endpoint or at the backend
base URL. Patch normalizes either form before it creates the shared flow client.
`PATCH_WORKSPACE_BACKEND_URL` can point at the Codex workspace backend base URL
or its `/events` endpoint. Patch normalizes either HTTP form before calling the
workspace flow capability. `PATCH_FLOW_BACKEND_URL` and
`PATCH_FLOW_DISPATCH_URL` remain accepted for older feed targets.
## 3. Inspect the stored event

View file

@ -38,10 +38,10 @@ Create or edit `apps/patch/feed-sources.json`:
"defaultBranch": "main"
},
"target": {
"mode": "flow_dispatch",
"mode": "workspace_flow",
"eventType": "upstream.release",
"dispatchUrlEnv": "PATCH_FLOW_DISPATCH_URL",
"dispatchSecretEnv": "PATCH_FLOW_DISPATCH_SECRET",
"workspaceUrlEnv": "PATCH_WORKSPACE_BACKEND_URL",
"workspaceSecretEnv": "PATCH_WORKSPACE_BACKEND_SECRET",
"payload": {
"provider": "github",
"repo": "openai/codex"
@ -72,10 +72,13 @@ When the feed later contains an unseen release entry, Patch appends:
- `data/feed-events.jsonl` for the normalized signal.
- `data/flow-events.jsonl` for the generic flow event.
- `data/flow-dispatches.jsonl` for the dispatch outcome.
- `data/maintenance-attempts.jsonl` for the patch.moi maintenance attempt.
- `data/workspace-dispatches.jsonl` for the workspace dispatch outcome.
If `PATCH_FLOW_DISPATCH_URL` is not set, Patch uses local flow execution from
the working directory. If it is set, Patch sends the event to the HTTP backend.
If no workspace backend URL is set, Patch uses local flow execution from the
working directory. If `PATCH_WORKSPACE_BACKEND_URL` is set, Patch sends the
event to that workspace backend's flow capability. Legacy
`PATCH_FLOW_BACKEND_URL` and `PATCH_FLOW_DISPATCH_URL` values remain accepted.
## 4. Connect patch work