Add flow dispatch admin operations
All checks were successful
check / check (push) Successful in 38s

This commit is contained in:
matamune 2026-05-13 03:49:11 +00:00
parent 65e6ca8bac
commit 34802ac712
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
4 changed files with 316 additions and 42 deletions

View file

@ -93,36 +93,25 @@ function targetDispatchSecret(
return env[envName]?.trim() || undefined;
}
export async function dispatchFlowEventForFeedSignal(
signal: FeedSignal,
export async function dispatchFlowEvent(
event: FlowEvent,
target: Partial<FeedFlowDispatchTarget> = {},
config: FlowDispatchConfig = {},
): Promise<{ event?: FlowEvent<Record<string, unknown>>; record?: FlowDispatchRecord }> {
if (!isFlowDispatchTarget(signal.target)) {
return {};
}
const event = flowEventForFeedSignal(signal);
if (!event) {
return {};
}
): Promise<FlowDispatchRecord> {
const env = config.env ?? process.env;
const url = targetDispatchUrl(signal.target, env);
const url = targetDispatchUrl({ mode: "flow_dispatch", eventType: event.type, ...target }, env);
if (!url) {
return {
event,
record: {
eventId: event.id,
eventType: event.type,
status: "skipped",
error: "flow dispatch URL is not configured",
createdAt: new Date().toISOString(),
},
eventId: event.id,
eventType: event.type,
status: "skipped",
error: "flow dispatch URL is not configured",
createdAt: new Date().toISOString(),
};
}
const body = JSON.stringify(event);
const secret = targetDispatchSecret(signal.target, env);
const secret = targetDispatchSecret({ mode: "flow_dispatch", eventType: event.type, ...target }, env);
const headers: Record<string, string> = {
"content-type": "application/json",
"x-patchbay-flow-event": event.type,
@ -140,28 +129,97 @@ export async function dispatchFlowEventForFeedSignal(
body,
});
return {
event,
record: {
eventId: event.id,
eventType: event.type,
url,
status: response.ok ? "dispatched" : "failed",
httpStatus: response.status,
error: response.ok ? undefined : `flow dispatch returned ${response.status}`,
createdAt: new Date().toISOString(),
},
eventId: event.id,
eventType: event.type,
url,
status: response.ok ? "dispatched" : "failed",
httpStatus: response.status,
error: response.ok ? undefined : `flow dispatch returned ${response.status}`,
createdAt: new Date().toISOString(),
};
} catch (error) {
return {
event,
record: {
eventId: event.id,
eventType: event.type,
url,
status: "failed",
error: error instanceof Error ? error.message : String(error),
createdAt: new Date().toISOString(),
},
eventId: event.id,
eventType: event.type,
url,
status: "failed",
error: error instanceof Error ? error.message : String(error),
createdAt: new Date().toISOString(),
};
}
}
export async function replayFlowEvent(
event: FlowEvent,
target: Partial<FeedFlowDispatchTarget> = {},
config: FlowDispatchConfig = {},
): Promise<FlowDispatchRecord> {
const env = config.env ?? process.env;
const dispatchUrl = targetDispatchUrl({ mode: "flow_dispatch", eventType: event.type, ...target }, env);
if (!dispatchUrl) {
return {
eventId: event.id,
eventType: event.type,
status: "skipped",
error: "flow dispatch URL is not configured",
createdAt: new Date().toISOString(),
};
}
const url = `${dispatchUrl.replace(/\/(?:events|flow-events)\/?$/, "")}/events/${encodeURIComponent(event.id)}/replay`;
const body = JSON.stringify({ wait: false });
const secret = targetDispatchSecret({ mode: "flow_dispatch", eventType: event.type, ...target }, env);
const headers: Record<string, string> = {
"content-type": "application/json",
"x-patchbay-flow-event": event.type,
"x-patchbay-flow-delivery": event.id,
};
if (secret) {
headers["x-patchbay-flow-signature-256"] =
`sha256=${await hmacSha256Hex(secret, body)}`;
}
try {
const response = await (config.fetchImpl ?? fetch)(url, {
method: "POST",
headers,
body,
});
return {
eventId: event.id,
eventType: event.type,
url,
status: response.ok ? "dispatched" : "failed",
httpStatus: response.status,
error: response.ok ? undefined : `flow replay returned ${response.status}`,
createdAt: new Date().toISOString(),
};
} catch (error) {
return {
eventId: event.id,
eventType: event.type,
url,
status: "failed",
error: error instanceof Error ? error.message : String(error),
createdAt: new Date().toISOString(),
};
}
}
export async function dispatchFlowEventForFeedSignal(
signal: FeedSignal,
config: FlowDispatchConfig = {},
): Promise<{ event?: FlowEvent<Record<string, unknown>>; record?: FlowDispatchRecord }> {
if (!isFlowDispatchTarget(signal.target)) {
return {};
}
const event = flowEventForFeedSignal(signal);
if (!event) {
return {};
}
return {
event,
record: await dispatchFlowEvent(event, signal.target, config),
};
}

View file

@ -1,4 +1,4 @@
import { appendFile, mkdir } from "node:fs/promises";
import { appendFile, mkdir, readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { FeedJob, FeedSignal, FlowDispatchRecord, FlowEvent, GitWebhookEvent, QueuedJob } from "./types";
@ -7,6 +7,28 @@ async function appendJsonLine(path: string, value: unknown): Promise<void> {
await appendFile(path, `${JSON.stringify(value)}\n`, "utf8");
}
async function readJsonLines<T>(path: string): Promise<T[]> {
let text = "";
try {
text = await readFile(path, "utf8");
} catch (error) {
if (typeof error === "object" && error !== null && (error as { code?: unknown }).code === "ENOENT") {
return [];
}
throw error;
}
return text
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean)
.map((line) => JSON.parse(line) as T);
}
function limitNewest<T>(items: T[], limit = 50): T[] {
const safeLimit = Math.max(1, Math.min(500, Math.trunc(limit)));
return items.slice(-safeLimit).reverse();
}
export class EventStore {
readonly eventsPath: string;
readonly jobsPath: string;
@ -47,6 +69,35 @@ export class EventStore {
async appendFlowDispatch(record: FlowDispatchRecord): Promise<void> {
await appendJsonLine(this.flowDispatchesPath, record);
}
async listFlowEvents(options: { limit?: number; type?: string } = {}): Promise<FlowEvent[]> {
const events = await readJsonLines<FlowEvent>(this.flowEventsPath);
return limitNewest(
options.type ? events.filter((event) => event.type === options.type) : events,
options.limit,
);
}
async getFlowEvent(eventId: string): Promise<FlowEvent | undefined> {
const events = await readJsonLines<FlowEvent>(this.flowEventsPath);
for (let index = events.length - 1; index >= 0; index -= 1) {
if (events[index]?.id === eventId) {
return events[index];
}
}
return undefined;
}
async listFlowDispatches(options: { limit?: number; eventId?: string; status?: FlowDispatchRecord["status"] } = {}): Promise<FlowDispatchRecord[]> {
const records = await readJsonLines<FlowDispatchRecord>(this.flowDispatchesPath);
return limitNewest(
records.filter((record) =>
(!options.eventId || record.eventId === options.eventId) &&
(!options.status || record.status === options.status),
),
options.limit,
);
}
}
export function jobForEvent(event: GitWebhookEvent): QueuedJob | null {

View file

@ -1,6 +1,7 @@
import { randomUUID } from "node:crypto";
import { notifyDiscord, parseDiscordConfig, type DiscordConfig } from "./discord";
import { startFeedPolling } from "./feed";
import { dispatchFlowEvent, replayFlowEvent } from "./flow";
import { jsonResponse, methodNotAllowed, textResponse } from "./http";
import { normalizeGithubEvent } from "./providers/github";
import { normalizeJojoEvent } from "./providers/jojo";
@ -15,6 +16,7 @@ export type ServerConfig = {
jojoSecret: string;
dataDir: string;
discord?: DiscordConfig;
adminToken?: string;
};
function getHeader(headers: Headers, name: string, fallback: string): string {
@ -59,6 +61,86 @@ async function persistAcceptedEvent(store: EventStore, event: GitWebhookEvent, d
});
}
function adminAuthorized(request: Request, config: ServerConfig): boolean {
if (!config.adminToken) {
return true;
}
const bearer = request.headers.get("authorization")?.match(/^Bearer\s+(.+)$/i)?.[1];
const header = request.headers.get("x-patchbay-admin-token");
return bearer === config.adminToken || header === config.adminToken;
}
function requireAdmin(request: Request, config: ServerConfig): Response | undefined {
return adminAuthorized(request, config) ? undefined : jsonResponse({ error: "unauthorized" }, { status: 401 });
}
function numberParam(value: string | null): number | undefined {
if (!value) return undefined;
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : undefined;
}
function dispatchStatus(value: string | null) {
if (!value) return undefined;
if (value === "dispatched" || value === "failed" || value === "skipped") return value;
throw new Error("flow dispatch status must be dispatched, failed, or skipped");
}
async function handleFlowEvents(request: Request, config: ServerConfig, store: EventStore): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;
const url = new URL(request.url);
if (request.method === "GET" && url.pathname === "/flow-events") {
return jsonResponse({
events: await store.listFlowEvents({
type: url.searchParams.get("type") ?? undefined,
limit: numberParam(url.searchParams.get("limit")),
}),
});
}
const eventMatch = url.pathname.match(/^\/flow-events\/([^/]+)(?:\/(retry|replay))?$/);
if (!eventMatch?.[1]) return jsonResponse({ error: "not_found" }, { status: 404 });
const eventId = decodeURIComponent(eventMatch[1]);
const event = await store.getFlowEvent(eventId);
if (!event) {
return jsonResponse({ error: "flow_event_not_found" }, { status: 404 });
}
if (request.method === "GET" && !eventMatch[2]) {
return jsonResponse({
event,
dispatches: await store.listFlowDispatches({ eventId, limit: numberParam(url.searchParams.get("limit")) }),
});
}
if (request.method === "POST" && eventMatch[2] === "retry") {
const record = await dispatchFlowEvent(event, {}, { env: process.env });
await store.appendFlowDispatch(record);
return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 });
}
if (request.method === "POST" && eventMatch[2] === "replay") {
const record = await replayFlowEvent(event, {}, { env: process.env });
await store.appendFlowDispatch(record);
return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 });
}
return methodNotAllowed();
}
async function handleFlowDispatches(request: Request, config: ServerConfig, store: EventStore): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;
if (request.method !== "GET") return methodNotAllowed();
const url = new URL(request.url);
return jsonResponse({
dispatches: await store.listFlowDispatches({
eventId: url.searchParams.get("eventId") ?? undefined,
status: dispatchStatus(url.searchParams.get("status")),
limit: numberParam(url.searchParams.get("limit")),
}),
});
}
async function handleGithub(request: Request, config: ServerConfig, store: EventStore): Promise<Response> {
if (request.method !== "POST") return methodNotAllowed();
const parsed = await parseJsonBody(request);
@ -107,6 +189,12 @@ export function createHandler(config: ServerConfig): (request: Request) => Promi
if (url.pathname === "/jojo") {
return handleJojo(request, config, store);
}
if (url.pathname === "/flow-events" || url.pathname.startsWith("/flow-events/")) {
return handleFlowEvents(request, config, store);
}
if (url.pathname === "/flow-dispatches") {
return handleFlowDispatches(request, config, store);
}
return jsonResponse({ error: "not_found" }, { status: 404 });
};
}
@ -118,6 +206,7 @@ if (import.meta.main) {
githubSecret: process.env.GITHUB_WEBHOOK_SECRET ?? "",
jojoSecret: process.env.JOJO_WEBHOOK_SECRET ?? "",
dataDir: process.env.DATA_DIR ?? "./data",
adminToken: process.env.PATCHBAY_ADMIN_TOKEN,
discord: parseDiscordConfig({
webhookUrl: process.env.DISCORD_WEBHOOK_URL,
notifyEvents: process.env.DISCORD_NOTIFY_EVENTS,

View file

@ -2,6 +2,7 @@ import { mkdtemp, readFile } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { describe, expect, test } from "bun:test";
import { EventStore } from "../src/queue";
import { createHandler } from "../src/server";
import { hmacSha256Hex } from "../src/signatures";
@ -99,4 +100,79 @@ describe("server", () => {
globalThis.fetch = originalFetch;
}
});
test("lists, retries, and replays stored flow events behind admin auth", async () => {
const originalFetch = globalThis.fetch;
const originalDispatchUrl = process.env.PATCHBAY_FLOW_DISPATCH_URL;
const dataDir = await mkdtemp(join(tmpdir(), "patchbay-"));
const store = new EventStore(dataDir);
const event = {
id: "patchbay:source:entry:upstream.release",
type: "upstream.release",
source: "patchbay",
receivedAt: "2026-05-13T00:00:00.000Z",
payload: { repo: "openai/codex", tag: "v1.2.3" },
};
await store.appendFlowEvent(event);
await store.appendFlowDispatch({
eventId: event.id,
eventType: event.type,
status: "failed",
error: "network",
createdAt: "2026-05-13T00:00:01.000Z",
});
const calls: Array<{ url: string; body: string }> = [];
process.env.PATCHBAY_FLOW_DISPATCH_URL = "http://172.20.0.1:7345/events";
globalThis.fetch = (async (url: string | URL | Request, init?: RequestInit) => {
calls.push({ url: String(url), body: String(init?.body ?? "") });
return new Response("accepted", { status: 202 });
}) as unknown as typeof fetch;
try {
const handler = createHandler({
githubSecret: "gh",
jojoSecret: "jojo",
dataDir,
adminToken: "admin",
});
const unauthorized = await handler(new Request("http://localhost/flow-events"));
expect(unauthorized.status).toBe(401);
const list = await handler(new Request("http://localhost/flow-events", {
headers: { authorization: "Bearer admin" },
}));
expect(list.status).toBe(200);
expect(await list.json()).toMatchObject({ events: [{ id: event.id, type: event.type }] });
const dispatches = await handler(new Request("http://localhost/flow-dispatches?status=failed", {
headers: { "x-patchbay-admin-token": "admin" },
}));
expect(dispatches.status).toBe(200);
expect(await dispatches.json()).toMatchObject({ dispatches: [{ status: "failed", eventId: event.id }] });
const retry = await handler(new Request(`http://localhost/flow-events/${encodeURIComponent(event.id)}/retry`, {
method: "POST",
headers: { authorization: "Bearer admin" },
}));
expect(retry.status).toBe(202);
expect(calls.at(-1)?.url).toBe("http://172.20.0.1:7345/events");
expect(JSON.parse(calls.at(-1)?.body ?? "{}")).toMatchObject({ id: event.id });
const replay = await handler(new Request(`http://localhost/flow-events/${encodeURIComponent(event.id)}/replay`, {
method: "POST",
headers: { authorization: "Bearer admin" },
}));
expect(replay.status).toBe(202);
expect(calls.at(-1)?.url).toBe(`http://172.20.0.1:7345/events/${encodeURIComponent(event.id)}/replay`);
} finally {
globalThis.fetch = originalFetch;
if (originalDispatchUrl === undefined) {
delete process.env.PATCHBAY_FLOW_DISPATCH_URL;
} else {
process.env.PATCHBAY_FLOW_DISPATCH_URL = originalDispatchUrl;
}
}
});
});