From 34802ac71230058286f6e93c5f97f973a9342df1 Mon Sep 17 00:00:00 2001 From: matamune Date: Wed, 13 May 2026 03:49:11 +0000 Subject: [PATCH] Add flow dispatch admin operations --- src/flow.ts | 140 +++++++++++++++++++++++++++++++------------- src/queue.ts | 53 ++++++++++++++++- src/server.ts | 89 ++++++++++++++++++++++++++++ test/server.test.ts | 76 ++++++++++++++++++++++++ 4 files changed, 316 insertions(+), 42 deletions(-) diff --git a/src/flow.ts b/src/flow.ts index d7e1e36..e01d1c7 100644 --- a/src/flow.ts +++ b/src/flow.ts @@ -93,36 +93,25 @@ function targetDispatchSecret( return env[envName]?.trim() || undefined; } -export async function dispatchFlowEventForFeedSignal( - signal: FeedSignal, +export async function dispatchFlowEvent( + event: FlowEvent, + target: Partial = {}, config: FlowDispatchConfig = {}, -): Promise<{ event?: FlowEvent>; record?: FlowDispatchRecord }> { - if (!isFlowDispatchTarget(signal.target)) { - return {}; - } - - const event = flowEventForFeedSignal(signal); - if (!event) { - return {}; - } - +): Promise { const env = config.env ?? process.env; - const url = targetDispatchUrl(signal.target, env); + const url = targetDispatchUrl({ mode: "flow_dispatch", eventType: event.type, ...target }, env); if (!url) { return { - event, - record: { - eventId: event.id, - eventType: event.type, - status: "skipped", - error: "flow dispatch URL is not configured", - createdAt: new Date().toISOString(), - }, + eventId: event.id, + eventType: event.type, + status: "skipped", + error: "flow dispatch URL is not configured", + createdAt: new Date().toISOString(), }; } const body = JSON.stringify(event); - const secret = targetDispatchSecret(signal.target, env); + const secret = targetDispatchSecret({ mode: "flow_dispatch", eventType: event.type, ...target }, env); const headers: Record = { "content-type": "application/json", "x-patchbay-flow-event": event.type, @@ -140,28 +129,97 @@ export async function dispatchFlowEventForFeedSignal( body, }); return { - event, - record: { - eventId: event.id, - eventType: event.type, - url, - status: response.ok ? "dispatched" : "failed", - httpStatus: response.status, - error: response.ok ? undefined : `flow dispatch returned ${response.status}`, - createdAt: new Date().toISOString(), - }, + eventId: event.id, + eventType: event.type, + url, + status: response.ok ? "dispatched" : "failed", + httpStatus: response.status, + error: response.ok ? undefined : `flow dispatch returned ${response.status}`, + createdAt: new Date().toISOString(), }; } catch (error) { return { - event, - record: { - eventId: event.id, - eventType: event.type, - url, - status: "failed", - error: error instanceof Error ? error.message : String(error), - createdAt: new Date().toISOString(), - }, + eventId: event.id, + eventType: event.type, + url, + status: "failed", + error: error instanceof Error ? error.message : String(error), + createdAt: new Date().toISOString(), }; } } + +export async function replayFlowEvent( + event: FlowEvent, + target: Partial = {}, + config: FlowDispatchConfig = {}, +): Promise { + const env = config.env ?? process.env; + const dispatchUrl = targetDispatchUrl({ mode: "flow_dispatch", eventType: event.type, ...target }, env); + if (!dispatchUrl) { + return { + eventId: event.id, + eventType: event.type, + status: "skipped", + error: "flow dispatch URL is not configured", + createdAt: new Date().toISOString(), + }; + } + const url = `${dispatchUrl.replace(/\/(?:events|flow-events)\/?$/, "")}/events/${encodeURIComponent(event.id)}/replay`; + const body = JSON.stringify({ wait: false }); + const secret = targetDispatchSecret({ mode: "flow_dispatch", eventType: event.type, ...target }, env); + const headers: Record = { + "content-type": "application/json", + "x-patchbay-flow-event": event.type, + "x-patchbay-flow-delivery": event.id, + }; + if (secret) { + headers["x-patchbay-flow-signature-256"] = + `sha256=${await hmacSha256Hex(secret, body)}`; + } + + try { + const response = await (config.fetchImpl ?? fetch)(url, { + method: "POST", + headers, + body, + }); + return { + eventId: event.id, + eventType: event.type, + url, + status: response.ok ? "dispatched" : "failed", + httpStatus: response.status, + error: response.ok ? undefined : `flow replay returned ${response.status}`, + createdAt: new Date().toISOString(), + }; + } catch (error) { + return { + eventId: event.id, + eventType: event.type, + url, + status: "failed", + error: error instanceof Error ? error.message : String(error), + createdAt: new Date().toISOString(), + }; + } +} + +export async function dispatchFlowEventForFeedSignal( + signal: FeedSignal, + config: FlowDispatchConfig = {}, +): Promise<{ event?: FlowEvent>; record?: FlowDispatchRecord }> { + if (!isFlowDispatchTarget(signal.target)) { + return {}; + } + + const event = flowEventForFeedSignal(signal); + if (!event) { + return {}; + } + + return { + event, + record: await dispatchFlowEvent(event, signal.target, config), + }; +} diff --git a/src/queue.ts b/src/queue.ts index 75e872e..e7d654b 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,4 +1,4 @@ -import { appendFile, mkdir } from "node:fs/promises"; +import { appendFile, mkdir, readFile } from "node:fs/promises"; import { dirname, join } from "node:path"; import type { FeedJob, FeedSignal, FlowDispatchRecord, FlowEvent, GitWebhookEvent, QueuedJob } from "./types"; @@ -7,6 +7,28 @@ async function appendJsonLine(path: string, value: unknown): Promise { await appendFile(path, `${JSON.stringify(value)}\n`, "utf8"); } +async function readJsonLines(path: string): Promise { + let text = ""; + try { + text = await readFile(path, "utf8"); + } catch (error) { + if (typeof error === "object" && error !== null && (error as { code?: unknown }).code === "ENOENT") { + return []; + } + throw error; + } + return text + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => JSON.parse(line) as T); +} + +function limitNewest(items: T[], limit = 50): T[] { + const safeLimit = Math.max(1, Math.min(500, Math.trunc(limit))); + return items.slice(-safeLimit).reverse(); +} + export class EventStore { readonly eventsPath: string; readonly jobsPath: string; @@ -47,6 +69,35 @@ export class EventStore { async appendFlowDispatch(record: FlowDispatchRecord): Promise { await appendJsonLine(this.flowDispatchesPath, record); } + + async listFlowEvents(options: { limit?: number; type?: string } = {}): Promise { + const events = await readJsonLines(this.flowEventsPath); + return limitNewest( + options.type ? events.filter((event) => event.type === options.type) : events, + options.limit, + ); + } + + async getFlowEvent(eventId: string): Promise { + const events = await readJsonLines(this.flowEventsPath); + for (let index = events.length - 1; index >= 0; index -= 1) { + if (events[index]?.id === eventId) { + return events[index]; + } + } + return undefined; + } + + async listFlowDispatches(options: { limit?: number; eventId?: string; status?: FlowDispatchRecord["status"] } = {}): Promise { + const records = await readJsonLines(this.flowDispatchesPath); + return limitNewest( + records.filter((record) => + (!options.eventId || record.eventId === options.eventId) && + (!options.status || record.status === options.status), + ), + options.limit, + ); + } } export function jobForEvent(event: GitWebhookEvent): QueuedJob | null { diff --git a/src/server.ts b/src/server.ts index 075f8b2..063db4d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import { notifyDiscord, parseDiscordConfig, type DiscordConfig } from "./discord"; import { startFeedPolling } from "./feed"; +import { dispatchFlowEvent, replayFlowEvent } from "./flow"; import { jsonResponse, methodNotAllowed, textResponse } from "./http"; import { normalizeGithubEvent } from "./providers/github"; import { normalizeJojoEvent } from "./providers/jojo"; @@ -15,6 +16,7 @@ export type ServerConfig = { jojoSecret: string; dataDir: string; discord?: DiscordConfig; + adminToken?: string; }; function getHeader(headers: Headers, name: string, fallback: string): string { @@ -59,6 +61,86 @@ async function persistAcceptedEvent(store: EventStore, event: GitWebhookEvent, d }); } +function adminAuthorized(request: Request, config: ServerConfig): boolean { + if (!config.adminToken) { + return true; + } + const bearer = request.headers.get("authorization")?.match(/^Bearer\s+(.+)$/i)?.[1]; + const header = request.headers.get("x-patchbay-admin-token"); + return bearer === config.adminToken || header === config.adminToken; +} + +function requireAdmin(request: Request, config: ServerConfig): Response | undefined { + return adminAuthorized(request, config) ? undefined : jsonResponse({ error: "unauthorized" }, { status: 401 }); +} + +function numberParam(value: string | null): number | undefined { + if (!value) return undefined; + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : undefined; +} + +function dispatchStatus(value: string | null) { + if (!value) return undefined; + if (value === "dispatched" || value === "failed" || value === "skipped") return value; + throw new Error("flow dispatch status must be dispatched, failed, or skipped"); +} + +async function handleFlowEvents(request: Request, config: ServerConfig, store: EventStore): Promise { + const unauthorized = requireAdmin(request, config); + if (unauthorized) return unauthorized; + + const url = new URL(request.url); + if (request.method === "GET" && url.pathname === "/flow-events") { + return jsonResponse({ + events: await store.listFlowEvents({ + type: url.searchParams.get("type") ?? undefined, + limit: numberParam(url.searchParams.get("limit")), + }), + }); + } + + const eventMatch = url.pathname.match(/^\/flow-events\/([^/]+)(?:\/(retry|replay))?$/); + if (!eventMatch?.[1]) return jsonResponse({ error: "not_found" }, { status: 404 }); + + const eventId = decodeURIComponent(eventMatch[1]); + const event = await store.getFlowEvent(eventId); + if (!event) { + return jsonResponse({ error: "flow_event_not_found" }, { status: 404 }); + } + if (request.method === "GET" && !eventMatch[2]) { + return jsonResponse({ + event, + dispatches: await store.listFlowDispatches({ eventId, limit: numberParam(url.searchParams.get("limit")) }), + }); + } + if (request.method === "POST" && eventMatch[2] === "retry") { + const record = await dispatchFlowEvent(event, {}, { env: process.env }); + await store.appendFlowDispatch(record); + return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 }); + } + if (request.method === "POST" && eventMatch[2] === "replay") { + const record = await replayFlowEvent(event, {}, { env: process.env }); + await store.appendFlowDispatch(record); + return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 }); + } + return methodNotAllowed(); +} + +async function handleFlowDispatches(request: Request, config: ServerConfig, store: EventStore): Promise { + const unauthorized = requireAdmin(request, config); + if (unauthorized) return unauthorized; + if (request.method !== "GET") return methodNotAllowed(); + const url = new URL(request.url); + return jsonResponse({ + dispatches: await store.listFlowDispatches({ + eventId: url.searchParams.get("eventId") ?? undefined, + status: dispatchStatus(url.searchParams.get("status")), + limit: numberParam(url.searchParams.get("limit")), + }), + }); +} + async function handleGithub(request: Request, config: ServerConfig, store: EventStore): Promise { if (request.method !== "POST") return methodNotAllowed(); const parsed = await parseJsonBody(request); @@ -107,6 +189,12 @@ export function createHandler(config: ServerConfig): (request: Request) => Promi if (url.pathname === "/jojo") { return handleJojo(request, config, store); } + if (url.pathname === "/flow-events" || url.pathname.startsWith("/flow-events/")) { + return handleFlowEvents(request, config, store); + } + if (url.pathname === "/flow-dispatches") { + return handleFlowDispatches(request, config, store); + } return jsonResponse({ error: "not_found" }, { status: 404 }); }; } @@ -118,6 +206,7 @@ if (import.meta.main) { githubSecret: process.env.GITHUB_WEBHOOK_SECRET ?? "", jojoSecret: process.env.JOJO_WEBHOOK_SECRET ?? "", dataDir: process.env.DATA_DIR ?? "./data", + adminToken: process.env.PATCHBAY_ADMIN_TOKEN, discord: parseDiscordConfig({ webhookUrl: process.env.DISCORD_WEBHOOK_URL, notifyEvents: process.env.DISCORD_NOTIFY_EVENTS, diff --git a/test/server.test.ts b/test/server.test.ts index 55b6060..c95f729 100644 --- a/test/server.test.ts +++ b/test/server.test.ts @@ -2,6 +2,7 @@ import { mkdtemp, readFile } from "node:fs/promises"; import { join } from "node:path"; import { tmpdir } from "node:os"; import { describe, expect, test } from "bun:test"; +import { EventStore } from "../src/queue"; import { createHandler } from "../src/server"; import { hmacSha256Hex } from "../src/signatures"; @@ -99,4 +100,79 @@ describe("server", () => { globalThis.fetch = originalFetch; } }); + + test("lists, retries, and replays stored flow events behind admin auth", async () => { + const originalFetch = globalThis.fetch; + const originalDispatchUrl = process.env.PATCHBAY_FLOW_DISPATCH_URL; + const dataDir = await mkdtemp(join(tmpdir(), "patchbay-")); + const store = new EventStore(dataDir); + const event = { + id: "patchbay:source:entry:upstream.release", + type: "upstream.release", + source: "patchbay", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { repo: "openai/codex", tag: "v1.2.3" }, + }; + await store.appendFlowEvent(event); + await store.appendFlowDispatch({ + eventId: event.id, + eventType: event.type, + status: "failed", + error: "network", + createdAt: "2026-05-13T00:00:01.000Z", + }); + + const calls: Array<{ url: string; body: string }> = []; + process.env.PATCHBAY_FLOW_DISPATCH_URL = "http://172.20.0.1:7345/events"; + globalThis.fetch = (async (url: string | URL | Request, init?: RequestInit) => { + calls.push({ url: String(url), body: String(init?.body ?? "") }); + return new Response("accepted", { status: 202 }); + }) as unknown as typeof fetch; + + try { + const handler = createHandler({ + githubSecret: "gh", + jojoSecret: "jojo", + dataDir, + adminToken: "admin", + }); + + const unauthorized = await handler(new Request("http://localhost/flow-events")); + expect(unauthorized.status).toBe(401); + + const list = await handler(new Request("http://localhost/flow-events", { + headers: { authorization: "Bearer admin" }, + })); + expect(list.status).toBe(200); + expect(await list.json()).toMatchObject({ events: [{ id: event.id, type: event.type }] }); + + const dispatches = await handler(new Request("http://localhost/flow-dispatches?status=failed", { + headers: { "x-patchbay-admin-token": "admin" }, + })); + expect(dispatches.status).toBe(200); + expect(await dispatches.json()).toMatchObject({ dispatches: [{ status: "failed", eventId: event.id }] }); + + const retry = await handler(new Request(`http://localhost/flow-events/${encodeURIComponent(event.id)}/retry`, { + method: "POST", + headers: { authorization: "Bearer admin" }, + })); + expect(retry.status).toBe(202); + expect(calls.at(-1)?.url).toBe("http://172.20.0.1:7345/events"); + expect(JSON.parse(calls.at(-1)?.body ?? "{}")).toMatchObject({ id: event.id }); + + const replay = await handler(new Request(`http://localhost/flow-events/${encodeURIComponent(event.id)}/replay`, { + method: "POST", + headers: { authorization: "Bearer admin" }, + })); + expect(replay.status).toBe(202); + expect(calls.at(-1)?.url).toBe(`http://172.20.0.1:7345/events/${encodeURIComponent(event.id)}/replay`); + } finally { + globalThis.fetch = originalFetch; + if (originalDispatchUrl === undefined) { + delete process.env.PATCHBAY_FLOW_DISPATCH_URL; + } else { + process.env.PATCHBAY_FLOW_DISPATCH_URL = originalDispatchUrl; + } + } + }); });