From 65e6ca8bac7c8c30162bce3e306d03625872116f Mon Sep 17 00:00:00 2001 From: matamune Date: Wed, 13 May 2026 02:42:02 +0000 Subject: [PATCH] Dispatch feed releases as flow events --- README.md | 9 ++- feed-sources.json | 12 ++-- src/feed.ts | 39 +++++++++-- src/flow.ts | 167 ++++++++++++++++++++++++++++++++++++++++++++++ src/queue.ts | 14 +++- src/server.ts | 3 + src/types.ts | 42 ++++++++++-- test/feed.test.ts | 60 +++++++++++++++++ 8 files changed, 329 insertions(+), 17 deletions(-) create mode 100644 src/flow.ts diff --git a/README.md b/README.md index 651edfd..5b97df4 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,8 @@ GITHUB_WEBHOOK_SECRET=... DISCORD_WEBHOOK_URL= DISCORD_NOTIFY_EVENTS=push,pull_request,release FEED_SOURCES_PATH=./feed-sources.json +PATCHBAY_FLOW_DISPATCH_URL= +PATCHBAY_FLOW_DISPATCH_SECRET= ``` Discord notifications are optional. When `DISCORD_WEBHOOK_URL` is unset, the @@ -40,5 +42,8 @@ items are appended to `DATA_DIR/jobs.jsonl`. Feed watcher events are configured in `feed-sources.json`. The first poll primes `DATA_DIR/feed-state.json`; later polls append upstream activity to -`DATA_DIR/feed-events.jsonl` and release-triggered fork sync work to -`DATA_DIR/feed-jobs.jsonl`. +`DATA_DIR/feed-events.jsonl`. Targets using `mode: "fork_sync"` append legacy +work to `DATA_DIR/feed-jobs.jsonl`. Targets using `mode: "flow_dispatch"` +append generic flow events to `DATA_DIR/flow-events.jsonl`, POST them to the +configured dispatch URL, and record dispatch outcomes in +`DATA_DIR/flow-dispatches.jsonl`. diff --git a/feed-sources.json b/feed-sources.json index 7e17892..0b180e2 100644 --- a/feed-sources.json +++ b/feed-sources.json @@ -73,10 +73,14 @@ "defaultBranch": "main" }, "target": { - "provider": "github", - "repoFullName": "peezy-tech/codex", - "branch": "main", - "mode": "fork_sync" + "mode": "flow_dispatch", + "eventType": "upstream.release", + "dispatchUrlEnv": "PATCHBAY_FLOW_DISPATCH_URL", + "dispatchSecretEnv": "PATCHBAY_FLOW_DISPATCH_SECRET", + "payload": { + "provider": "github", + "repo": "openai/codex" + } }, "pollIntervalSeconds": 300 } diff --git a/src/feed.ts b/src/feed.ts index 7b5d481..9daf589 100644 --- a/src/feed.ts +++ b/src/feed.ts @@ -2,6 +2,7 @@ import { readFile, writeFile } from "node:fs/promises"; import { dirname, join } from "node:path"; import { mkdir } from "node:fs/promises"; import { notifyDiscord, type DiscordConfig } from "./discord"; +import { dispatchFlowEventForFeedSignal, type FlowDispatchConfig } from "./flow"; import { EventStore, jobForFeedSignal } from "./queue"; import type { FeedEventName, FeedSignal, FeedSourceConfig } from "./types"; @@ -23,6 +24,7 @@ type FeedPollerConfig = { dataDir: string; sourcesPath: string; discord?: DiscordConfig; + flowDispatch?: FlowDispatchConfig; }; type FetchLike = (url: string, init?: RequestInit) => Promise; @@ -162,8 +164,9 @@ export async function pollFeedSource(input: { statePath: string; store: EventStore; discord?: DiscordConfig; + flowDispatch?: FlowDispatchConfig; fetchImpl?: FetchLike; -}): Promise<{ signals: FeedSignal[]; jobs: number; primed: boolean }> { +}): Promise<{ signals: FeedSignal[]; jobs: number; flowDispatches: number; primed: boolean }> { const response = await (input.fetchImpl ?? fetch)(input.source.url, { headers: { accept: "application/atom+xml, application/rss+xml, application/xml, text/xml;q=0.9" }, }); @@ -178,6 +181,7 @@ export async function pollFeedSource(input: { const selectedEntries = primed && input.source.primeOnly !== false ? [] : unseenEntries(entries, previous?.lastSeenId); const signals: FeedSignal[] = []; let jobs = 0; + let flowDispatches = 0; for (const entry of selectedEntries) { const signal = signalFromEntry(input.source, entry); @@ -187,9 +191,28 @@ export async function pollFeedSource(input: { await input.store.appendFeedJob(job); jobs += 1; } + const flowDispatch = await dispatchFlowEventForFeedSignal(signal, input.flowDispatch); + if (flowDispatch.event) { + await input.store.appendFlowEvent(flowDispatch.event); + } + if (flowDispatch.record) { + await input.store.appendFlowDispatch(flowDispatch.record); + if (flowDispatch.record.status === "dispatched") { + flowDispatches += 1; + } + } await notifyDiscord(input.discord ?? { notifyEvents: new Set() }, { signal, job }); signals.push(signal); - console.log(JSON.stringify({ type: "feed.accepted", sourceId: signal.sourceId, provider: signal.provider, event: signal.event, entryId: signal.entryId, job: job?.id })); + console.log(JSON.stringify({ + type: "feed.accepted", + sourceId: signal.sourceId, + provider: signal.provider, + event: signal.event, + entryId: signal.entryId, + job: job?.id, + flowEvent: flowDispatch.event?.id, + flowDispatch: flowDispatch.record?.status, + })); } if (newestId) { @@ -200,7 +223,7 @@ export async function pollFeedSource(input: { await saveState(input.statePath, input.state); } - return { signals, jobs, primed }; + return { signals, jobs, flowDispatches, primed }; } export async function pollFeedsOnce(config: FeedPollerConfig, fetchImpl?: FetchLike): Promise { @@ -211,7 +234,15 @@ export async function pollFeedsOnce(config: FeedPollerConfig, fetchImpl?: FetchL for (const source of sources) { try { - await pollFeedSource({ source, state, statePath, store, discord: config.discord, fetchImpl }); + await pollFeedSource({ + source, + state, + statePath, + store, + discord: config.discord, + flowDispatch: config.flowDispatch, + fetchImpl, + }); } catch (error) { console.error(JSON.stringify({ type: "feed.poll_failed", diff --git a/src/flow.ts b/src/flow.ts new file mode 100644 index 0000000..d7e1e36 --- /dev/null +++ b/src/flow.ts @@ -0,0 +1,167 @@ +import { hmacSha256Hex } from "./signatures"; +import type { + FeedFlowDispatchTarget, + FeedSignal, + FlowDispatchRecord, + FlowEvent, +} from "./types"; + +type FetchLike = (url: string, init: RequestInit) => Promise; + +export type FlowDispatchConfig = { + env?: Record; + fetchImpl?: FetchLike; +}; + +function isFlowDispatchTarget(value: unknown): value is FeedFlowDispatchTarget { + return ( + typeof value === "object" && + value !== null && + (value as { mode?: unknown }).mode === "flow_dispatch" + ); +} + +function tagFromSignal(signal: FeedSignal): string | undefined { + if (signal.event !== "release") { + return undefined; + } + const ref = signal.ref?.trim(); + if (ref) { + return ref; + } + return signal.title.trim() || undefined; +} + +function flowPayloadFromSignal(signal: FeedSignal): Record { + return { + provider: signal.provider, + event: signal.event, + sourceId: signal.sourceId, + entryId: signal.entryId, + title: signal.title, + url: signal.url, + author: signal.author, + publishedAt: signal.publishedAt, + repo: signal.repo.fullName, + repoOwner: signal.repo.owner, + repoName: signal.repo.name, + ref: signal.ref, + sha: signal.sha, + tag: tagFromSignal(signal), + raw: signal.raw, + }; +} + +export function flowEventForFeedSignal( + signal: FeedSignal, + receivedAt = new Date().toISOString(), +): FlowEvent> | undefined { + if (!isFlowDispatchTarget(signal.target)) { + return undefined; + } + + return { + id: `patchbay:${signal.sourceId}:${signal.entryId}:${signal.target.eventType}`, + type: signal.target.eventType, + source: "patchbay", + occurredAt: signal.publishedAt, + receivedAt, + payload: { + ...flowPayloadFromSignal(signal), + ...(signal.target.payload ?? {}), + }, + }; +} + +function targetDispatchUrl( + target: FeedFlowDispatchTarget, + env: Record, +): string | undefined { + const explicit = target.dispatchUrl?.trim(); + if (explicit) { + return explicit; + } + const envName = target.dispatchUrlEnv?.trim() || "PATCHBAY_FLOW_DISPATCH_URL"; + return env[envName]?.trim() || undefined; +} + +function targetDispatchSecret( + target: FeedFlowDispatchTarget, + env: Record, +): string | undefined { + const envName = target.dispatchSecretEnv?.trim() || "PATCHBAY_FLOW_DISPATCH_SECRET"; + return env[envName]?.trim() || undefined; +} + +export async function dispatchFlowEventForFeedSignal( + signal: FeedSignal, + config: FlowDispatchConfig = {}, +): Promise<{ event?: FlowEvent>; record?: FlowDispatchRecord }> { + if (!isFlowDispatchTarget(signal.target)) { + return {}; + } + + const event = flowEventForFeedSignal(signal); + if (!event) { + return {}; + } + + const env = config.env ?? process.env; + const url = targetDispatchUrl(signal.target, env); + if (!url) { + return { + event, + record: { + eventId: event.id, + eventType: event.type, + status: "skipped", + error: "flow dispatch URL is not configured", + createdAt: new Date().toISOString(), + }, + }; + } + + const body = JSON.stringify(event); + const secret = targetDispatchSecret(signal.target, env); + const headers: Record = { + "content-type": "application/json", + "x-patchbay-flow-event": event.type, + "x-patchbay-flow-delivery": event.id, + }; + if (secret) { + headers["x-patchbay-flow-signature-256"] = + `sha256=${await hmacSha256Hex(secret, body)}`; + } + + try { + const response = await (config.fetchImpl ?? fetch)(url, { + method: "POST", + headers, + body, + }); + return { + event, + record: { + eventId: event.id, + eventType: event.type, + url, + status: response.ok ? "dispatched" : "failed", + httpStatus: response.status, + error: response.ok ? undefined : `flow dispatch returned ${response.status}`, + createdAt: new Date().toISOString(), + }, + }; + } catch (error) { + return { + event, + record: { + eventId: event.id, + eventType: event.type, + url, + status: "failed", + error: error instanceof Error ? error.message : String(error), + createdAt: new Date().toISOString(), + }, + }; + } +} diff --git a/src/queue.ts b/src/queue.ts index ebca5b4..75e872e 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,6 +1,6 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname, join } from "node:path"; -import type { FeedJob, FeedSignal, GitWebhookEvent, QueuedJob } from "./types"; +import type { FeedJob, FeedSignal, FlowDispatchRecord, FlowEvent, GitWebhookEvent, QueuedJob } from "./types"; async function appendJsonLine(path: string, value: unknown): Promise { await mkdir(dirname(path), { recursive: true }); @@ -12,12 +12,16 @@ export class EventStore { readonly jobsPath: string; readonly feedEventsPath: string; readonly feedJobsPath: string; + readonly flowEventsPath: string; + readonly flowDispatchesPath: string; constructor(dataDir: string) { this.eventsPath = join(dataDir, "events.jsonl"); this.jobsPath = join(dataDir, "jobs.jsonl"); this.feedEventsPath = join(dataDir, "feed-events.jsonl"); this.feedJobsPath = join(dataDir, "feed-jobs.jsonl"); + this.flowEventsPath = join(dataDir, "flow-events.jsonl"); + this.flowDispatchesPath = join(dataDir, "flow-dispatches.jsonl"); } async appendEvent(event: GitWebhookEvent): Promise { @@ -35,6 +39,14 @@ export class EventStore { async appendFeedJob(job: FeedJob): Promise { await appendJsonLine(this.feedJobsPath, job); } + + async appendFlowEvent(event: FlowEvent): Promise { + await appendJsonLine(this.flowEventsPath, event); + } + + async appendFlowDispatch(record: FlowDispatchRecord): Promise { + await appendJsonLine(this.flowDispatchesPath, record); + } } export function jobForEvent(event: GitWebhookEvent): QueuedJob | null { diff --git a/src/server.ts b/src/server.ts index 6526518..075f8b2 100644 --- a/src/server.ts +++ b/src/server.ts @@ -129,6 +129,9 @@ if (import.meta.main) { dataDir: config.dataDir, sourcesPath: process.env.FEED_SOURCES_PATH, discord: config.discord, + flowDispatch: { + env: process.env, + }, }).catch((error) => { console.error(JSON.stringify({ type: "feed.start_failed", error: error instanceof Error ? error.message : String(error) })); }); diff --git a/src/types.ts b/src/types.ts index 3b8ee27..0fb1db3 100644 --- a/src/types.ts +++ b/src/types.ts @@ -48,6 +48,22 @@ export type FeedProvider = "codeberg" | "github" | "jojo"; export type FeedEventName = "push" | "release"; +export type FeedForkSyncTarget = { + provider: FeedProvider; + repoFullName: string; + branch: string; + mode: "notify_only" | "fork_sync"; +}; + +export type FeedFlowDispatchTarget = { + mode: "flow_dispatch"; + eventType: string; + dispatchUrl?: string; + dispatchUrlEnv?: string; + dispatchSecretEnv?: string; + payload?: Record; +}; + export type FeedSourceConfig = { id: string; provider: FeedProvider; @@ -60,12 +76,7 @@ export type FeedSourceConfig = { webUrl: string; defaultBranch?: string; }; - target?: { - provider: FeedProvider; - repoFullName: string; - branch: string; - mode: "notify_only" | "fork_sync"; - }; + target?: FeedForkSyncTarget | FeedFlowDispatchTarget; pollIntervalSeconds?: number; primeOnly?: boolean; }; @@ -100,3 +111,22 @@ export type FeedJob = { url?: string; createdAt: string; }; + +export type FlowEvent = { + id: string; + type: string; + source?: string; + occurredAt?: string; + receivedAt: string; + payload: T; +}; + +export type FlowDispatchRecord = { + eventId: string; + eventType: string; + url?: string; + status: "dispatched" | "failed" | "skipped"; + httpStatus?: number; + error?: string; + createdAt: string; +}; diff --git a/test/feed.test.ts b/test/feed.test.ts index b4ff84b..4826b95 100644 --- a/test/feed.test.ts +++ b/test/feed.test.ts @@ -137,4 +137,64 @@ describe("feed watcher", () => { expect(await readFile(join(dataDir, "feed-jobs.jsonl"), "utf8")).toContain("\"kind\":\"fork_sync\""); expect(feedCalls).toBe(1); }); + + test("later polls dispatch generic flow events", async () => { + const dataDir = await mkdtemp(join(tmpdir(), "patchbay-feed-")); + const sourcesPath = join(dataDir, "sources.json"); + const releaseSource: FeedSourceConfig = { + ...source, + id: "github-openai-codex-releases", + url: "https://github.com/openai/codex/releases.atom", + event: "release", + target: { + mode: "flow_dispatch", + eventType: "upstream.release", + dispatchUrlEnv: "FLOW_URL", + dispatchSecretEnv: "FLOW_SECRET", + payload: { + repo: "openai/codex", + provider: "github", + }, + }, + }; + await writeFile(sourcesPath, JSON.stringify({ sources: [releaseSource] }), "utf8"); + await writeFile(join(dataDir, "feed-state.json"), JSON.stringify({ + "github-openai-codex-releases": { + lastSeenId: "older-release", + lastCheckedAt: "2026-05-12T09:00:00.000Z", + }, + }), "utf8"); + + let dispatchedBody = ""; + let dispatchedSignature = ""; + await pollFeedsOnce({ + dataDir, + sourcesPath, + discord: { notifyEvents: new Set(["release"]) }, + flowDispatch: { + env: { + FLOW_URL: "https://flow.example/events", + FLOW_SECRET: "secret", + }, + fetchImpl: async (_url, init) => { + dispatchedBody = String(init.body); + const headers = init.headers as Record; + dispatchedSignature = String(headers["x-patchbay-flow-signature-256"]); + return new Response(null, { status: 202 }); + }, + }, + }, async () => { + return new Response(rss, { status: 200 }); + }); + + const flowEventText = await readFile(join(dataDir, "flow-events.jsonl"), "utf8"); + const flowEvent = JSON.parse(flowEventText.trim()) as Record; + expect(flowEvent.type).toBe("upstream.release"); + expect(flowEvent.source).toBe("patchbay"); + expect(flowEvent.payload.repo).toBe("openai/codex"); + expect(flowEvent.payload.tag).toBe("v1.2.3"); + expect(JSON.parse(dispatchedBody).id).toBe(flowEvent.id); + expect(dispatchedSignature).toMatch(/^sha256=[0-9a-f]{64}$/); + expect(await readFile(join(dataDir, "flow-dispatches.jsonl"), "utf8")).toContain("\"status\":\"dispatched\""); + }); });