Dispatch feed releases as flow events
All checks were successful
check / check (push) Successful in 35s

This commit is contained in:
matamune 2026-05-13 02:42:02 +00:00
parent 7261b48fa1
commit 65e6ca8bac
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
8 changed files with 329 additions and 17 deletions

View file

@ -21,6 +21,8 @@ GITHUB_WEBHOOK_SECRET=...
DISCORD_WEBHOOK_URL=
DISCORD_NOTIFY_EVENTS=push,pull_request,release
FEED_SOURCES_PATH=./feed-sources.json
PATCHBAY_FLOW_DISPATCH_URL=
PATCHBAY_FLOW_DISPATCH_SECRET=
```
Discord notifications are optional. When `DISCORD_WEBHOOK_URL` is unset, the
@ -40,5 +42,8 @@ items are appended to `DATA_DIR/jobs.jsonl`.
Feed watcher events are configured in `feed-sources.json`. The first poll primes
`DATA_DIR/feed-state.json`; later polls append upstream activity to
`DATA_DIR/feed-events.jsonl` and release-triggered fork sync work to
`DATA_DIR/feed-jobs.jsonl`.
`DATA_DIR/feed-events.jsonl`. Targets using `mode: "fork_sync"` append legacy
work to `DATA_DIR/feed-jobs.jsonl`. Targets using `mode: "flow_dispatch"`
append generic flow events to `DATA_DIR/flow-events.jsonl`, POST them to the
configured dispatch URL, and record dispatch outcomes in
`DATA_DIR/flow-dispatches.jsonl`.

View file

@ -73,10 +73,14 @@
"defaultBranch": "main"
},
"target": {
"provider": "github",
"repoFullName": "peezy-tech/codex",
"branch": "main",
"mode": "fork_sync"
"mode": "flow_dispatch",
"eventType": "upstream.release",
"dispatchUrlEnv": "PATCHBAY_FLOW_DISPATCH_URL",
"dispatchSecretEnv": "PATCHBAY_FLOW_DISPATCH_SECRET",
"payload": {
"provider": "github",
"repo": "openai/codex"
}
},
"pollIntervalSeconds": 300
}

View file

@ -2,6 +2,7 @@ import { readFile, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { mkdir } from "node:fs/promises";
import { notifyDiscord, type DiscordConfig } from "./discord";
import { dispatchFlowEventForFeedSignal, type FlowDispatchConfig } from "./flow";
import { EventStore, jobForFeedSignal } from "./queue";
import type { FeedEventName, FeedSignal, FeedSourceConfig } from "./types";
@ -23,6 +24,7 @@ type FeedPollerConfig = {
dataDir: string;
sourcesPath: string;
discord?: DiscordConfig;
flowDispatch?: FlowDispatchConfig;
};
type FetchLike = (url: string, init?: RequestInit) => Promise<Response>;
@ -162,8 +164,9 @@ export async function pollFeedSource(input: {
statePath: string;
store: EventStore;
discord?: DiscordConfig;
flowDispatch?: FlowDispatchConfig;
fetchImpl?: FetchLike;
}): Promise<{ signals: FeedSignal[]; jobs: number; primed: boolean }> {
}): Promise<{ signals: FeedSignal[]; jobs: number; flowDispatches: number; primed: boolean }> {
const response = await (input.fetchImpl ?? fetch)(input.source.url, {
headers: { accept: "application/atom+xml, application/rss+xml, application/xml, text/xml;q=0.9" },
});
@ -178,6 +181,7 @@ export async function pollFeedSource(input: {
const selectedEntries = primed && input.source.primeOnly !== false ? [] : unseenEntries(entries, previous?.lastSeenId);
const signals: FeedSignal[] = [];
let jobs = 0;
let flowDispatches = 0;
for (const entry of selectedEntries) {
const signal = signalFromEntry(input.source, entry);
@ -187,9 +191,28 @@ export async function pollFeedSource(input: {
await input.store.appendFeedJob(job);
jobs += 1;
}
const flowDispatch = await dispatchFlowEventForFeedSignal(signal, input.flowDispatch);
if (flowDispatch.event) {
await input.store.appendFlowEvent(flowDispatch.event);
}
if (flowDispatch.record) {
await input.store.appendFlowDispatch(flowDispatch.record);
if (flowDispatch.record.status === "dispatched") {
flowDispatches += 1;
}
}
await notifyDiscord(input.discord ?? { notifyEvents: new Set() }, { signal, job });
signals.push(signal);
console.log(JSON.stringify({ type: "feed.accepted", sourceId: signal.sourceId, provider: signal.provider, event: signal.event, entryId: signal.entryId, job: job?.id }));
console.log(JSON.stringify({
type: "feed.accepted",
sourceId: signal.sourceId,
provider: signal.provider,
event: signal.event,
entryId: signal.entryId,
job: job?.id,
flowEvent: flowDispatch.event?.id,
flowDispatch: flowDispatch.record?.status,
}));
}
if (newestId) {
@ -200,7 +223,7 @@ export async function pollFeedSource(input: {
await saveState(input.statePath, input.state);
}
return { signals, jobs, primed };
return { signals, jobs, flowDispatches, primed };
}
export async function pollFeedsOnce(config: FeedPollerConfig, fetchImpl?: FetchLike): Promise<void> {
@ -211,7 +234,15 @@ export async function pollFeedsOnce(config: FeedPollerConfig, fetchImpl?: FetchL
for (const source of sources) {
try {
await pollFeedSource({ source, state, statePath, store, discord: config.discord, fetchImpl });
await pollFeedSource({
source,
state,
statePath,
store,
discord: config.discord,
flowDispatch: config.flowDispatch,
fetchImpl,
});
} catch (error) {
console.error(JSON.stringify({
type: "feed.poll_failed",

167
src/flow.ts Normal file
View file

@ -0,0 +1,167 @@
import { hmacSha256Hex } from "./signatures";
import type {
FeedFlowDispatchTarget,
FeedSignal,
FlowDispatchRecord,
FlowEvent,
} from "./types";
// Fetch-compatible function used to POST flow events; injectable so tests can stub the network.
type FetchLike = (url: string, init: RequestInit) => Promise<Response>;

export type FlowDispatchConfig = {
  // Environment map used to resolve the dispatch URL and signing secret; defaults to process.env.
  env?: Record<string, string | undefined>;
  // Override for the global fetch (e.g. a capture stub in unit tests).
  fetchImpl?: FetchLike;
};
/** Narrows an arbitrary feed-target value to a flow-dispatch target via its `mode` tag. */
function isFlowDispatchTarget(value: unknown): value is FeedFlowDispatchTarget {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  return (value as { mode?: unknown }).mode === "flow_dispatch";
}
/**
 * Derives a release tag from a feed signal. Only `release` events carry a
 * tag; the git ref is preferred, with the entry title as a fallback. Returns
 * undefined when neither yields a non-empty string.
 */
function tagFromSignal(signal: FeedSignal): string | undefined {
  if (signal.event !== "release") {
    return undefined;
  }
  const candidates = [signal.ref?.trim(), signal.title.trim()];
  return candidates.find((candidate) => Boolean(candidate)) || undefined;
}
/**
 * Flattens a feed signal into the generic payload shape shared by all flow
 * events. Key order is kept stable because the event is later JSON-serialized
 * and HMAC-signed byte-for-byte.
 */
function flowPayloadFromSignal(signal: FeedSignal): Record<string, unknown> {
  const { provider, event, sourceId, entryId, title, url, author, publishedAt, ref, sha, raw } = signal;
  return {
    provider,
    event,
    sourceId,
    entryId,
    title,
    url,
    author,
    publishedAt,
    repo: signal.repo.fullName,
    repoOwner: signal.repo.owner,
    repoName: signal.repo.name,
    ref,
    sha,
    tag: tagFromSignal(signal),
    raw,
  };
}
/**
 * Builds the flow event for a feed signal, or undefined when the source's
 * target is not configured for flow dispatch. Entries from the target's
 * static `payload` override generated signal fields on key collisions.
 *
 * @param signal - The upstream feed signal to convert.
 * @param receivedAt - ISO timestamp recorded on the event; defaults to now.
 */
export function flowEventForFeedSignal(
  signal: FeedSignal,
  receivedAt = new Date().toISOString(),
): FlowEvent<Record<string, unknown>> | undefined {
  const target = signal.target;
  if (!isFlowDispatchTarget(target)) {
    return undefined;
  }
  // Merge order matters: configured payload entries win over generated ones.
  const payload = {
    ...flowPayloadFromSignal(signal),
    ...(target.payload ?? {}),
  };
  return {
    id: `patchbay:${signal.sourceId}:${signal.entryId}:${target.eventType}`,
    type: target.eventType,
    source: "patchbay",
    occurredAt: signal.publishedAt,
    receivedAt,
    payload,
  };
}
/**
 * Resolves the dispatch endpoint URL: an explicit `dispatchUrl` on the target
 * wins; otherwise the env var named by `dispatchUrlEnv` (default
 * PATCHBAY_FLOW_DISPATCH_URL) is consulted. Empty/whitespace values count as unset.
 */
function targetDispatchUrl(
  target: FeedFlowDispatchTarget,
  env: Record<string, string | undefined>,
): string | undefined {
  const configured = target.dispatchUrl?.trim();
  if (configured) {
    return configured;
  }
  const key = target.dispatchUrlEnv?.trim() || "PATCHBAY_FLOW_DISPATCH_URL";
  const fromEnv = env[key]?.trim();
  return fromEnv ? fromEnv : undefined;
}
/**
 * Resolves the HMAC signing secret from the env var named by
 * `dispatchSecretEnv` (default PATCHBAY_FLOW_DISPATCH_SECRET).
 * Undefined disables request signing.
 */
function targetDispatchSecret(
  target: FeedFlowDispatchTarget,
  env: Record<string, string | undefined>,
): string | undefined {
  const key = target.dispatchSecretEnv?.trim() || "PATCHBAY_FLOW_DISPATCH_SECRET";
  const secret = env[key]?.trim();
  return secret ? secret : undefined;
}
/**
 * Builds and POSTs the flow event for a feed signal.
 *
 * Returns an empty result for non-flow-dispatch targets. Otherwise the event
 * is always returned together with a dispatch record describing the outcome:
 * "skipped" when no URL is configured, "dispatched" on a 2xx response, and
 * "failed" on a non-2xx response or a thrown fetch error. When a secret is
 * configured the JSON body is signed with HMAC-SHA256 and the hex digest is
 * sent in the `x-patchbay-flow-signature-256` header.
 */
export async function dispatchFlowEventForFeedSignal(
  signal: FeedSignal,
  config: FlowDispatchConfig = {},
): Promise<{ event?: FlowEvent<Record<string, unknown>>; record?: FlowDispatchRecord }> {
  // Narrow once so target fields are typed for the URL/secret lookups below.
  const target = signal.target;
  if (!isFlowDispatchTarget(target)) {
    return {};
  }
  const event = flowEventForFeedSignal(signal);
  if (!event) {
    return {};
  }
  const env = config.env ?? process.env;
  const url = targetDispatchUrl(target, env);
  if (!url) {
    // Nothing to POST to: still surface the event, but record the skip.
    return {
      event,
      record: {
        eventId: event.id,
        eventType: event.type,
        status: "skipped",
        error: "flow dispatch URL is not configured",
        createdAt: new Date().toISOString(),
      },
    };
  }
  const body = JSON.stringify(event);
  const headers: Record<string, string> = {
    "content-type": "application/json",
    "x-patchbay-flow-event": event.type,
    "x-patchbay-flow-delivery": event.id,
  };
  const secret = targetDispatchSecret(target, env);
  if (secret) {
    // Sign the exact serialized body so receivers can verify byte-for-byte.
    const digest = await hmacSha256Hex(secret, body);
    headers["x-patchbay-flow-signature-256"] = `sha256=${digest}`;
  }
  try {
    const post = config.fetchImpl ?? fetch;
    const response = await post(url, { method: "POST", headers, body });
    const delivered = response.ok;
    return {
      event,
      record: {
        eventId: event.id,
        eventType: event.type,
        url,
        status: delivered ? "dispatched" : "failed",
        httpStatus: response.status,
        error: delivered ? undefined : `flow dispatch returned ${response.status}`,
        createdAt: new Date().toISOString(),
      },
    };
  } catch (error) {
    // Network-level failure: no HTTP status is available, only the error text.
    const message = error instanceof Error ? error.message : String(error);
    return {
      event,
      record: {
        eventId: event.id,
        eventType: event.type,
        url,
        status: "failed",
        error: message,
        createdAt: new Date().toISOString(),
      },
    };
  }
}

View file

@ -1,6 +1,6 @@
import { appendFile, mkdir } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { FeedJob, FeedSignal, GitWebhookEvent, QueuedJob } from "./types";
import type { FeedJob, FeedSignal, FlowDispatchRecord, FlowEvent, GitWebhookEvent, QueuedJob } from "./types";
async function appendJsonLine(path: string, value: unknown): Promise<void> {
await mkdir(dirname(path), { recursive: true });
@ -12,12 +12,16 @@ export class EventStore {
readonly jobsPath: string;
readonly feedEventsPath: string;
readonly feedJobsPath: string;
readonly flowEventsPath: string;
readonly flowDispatchesPath: string;
constructor(dataDir: string) {
this.eventsPath = join(dataDir, "events.jsonl");
this.jobsPath = join(dataDir, "jobs.jsonl");
this.feedEventsPath = join(dataDir, "feed-events.jsonl");
this.feedJobsPath = join(dataDir, "feed-jobs.jsonl");
this.flowEventsPath = join(dataDir, "flow-events.jsonl");
this.flowDispatchesPath = join(dataDir, "flow-dispatches.jsonl");
}
async appendEvent(event: GitWebhookEvent): Promise<void> {
@ -35,6 +39,14 @@ export class EventStore {
async appendFeedJob(job: FeedJob): Promise<void> {
await appendJsonLine(this.feedJobsPath, job);
}
  // Appends a generic flow event to DATA_DIR/flow-events.jsonl.
  async appendFlowEvent(event: FlowEvent): Promise<void> {
    await appendJsonLine(this.flowEventsPath, event);
  }

  // Appends a dispatch-outcome record to DATA_DIR/flow-dispatches.jsonl.
  async appendFlowDispatch(record: FlowDispatchRecord): Promise<void> {
    await appendJsonLine(this.flowDispatchesPath, record);
  }
}
export function jobForEvent(event: GitWebhookEvent): QueuedJob | null {

View file

@ -129,6 +129,9 @@ if (import.meta.main) {
dataDir: config.dataDir,
sourcesPath: process.env.FEED_SOURCES_PATH,
discord: config.discord,
flowDispatch: {
env: process.env,
},
}).catch((error) => {
console.error(JSON.stringify({ type: "feed.start_failed", error: error instanceof Error ? error.message : String(error) }));
});

View file

@ -48,6 +48,22 @@ export type FeedProvider = "codeberg" | "github" | "jojo";
export type FeedEventName = "push" | "release";
// Legacy feed target: upstream activity either only notifies or enqueues
// fork-sync work to DATA_DIR/feed-jobs.jsonl, depending on `mode`.
export type FeedForkSyncTarget = {
  provider: FeedProvider;
  repoFullName: string;
  branch: string;
  mode: "notify_only" | "fork_sync";
};
// Feed target that converts upstream activity into generic flow events.
export type FeedFlowDispatchTarget = {
  mode: "flow_dispatch";
  // Event type stamped on emitted flow events, e.g. "upstream.release".
  eventType: string;
  // Explicit dispatch endpoint; takes precedence over dispatchUrlEnv.
  dispatchUrl?: string;
  // Env var holding the dispatch URL (defaults to PATCHBAY_FLOW_DISPATCH_URL).
  dispatchUrlEnv?: string;
  // Env var holding the HMAC secret (defaults to PATCHBAY_FLOW_DISPATCH_SECRET).
  dispatchSecretEnv?: string;
  // Static payload entries merged over the generated signal payload.
  payload?: Record<string, unknown>;
};
export type FeedSourceConfig = {
id: string;
provider: FeedProvider;
@ -60,12 +76,7 @@ export type FeedSourceConfig = {
webUrl: string;
defaultBranch?: string;
};
target?: {
provider: FeedProvider;
repoFullName: string;
branch: string;
mode: "notify_only" | "fork_sync";
};
target?: FeedForkSyncTarget | FeedFlowDispatchTarget;
pollIntervalSeconds?: number;
primeOnly?: boolean;
};
@ -100,3 +111,22 @@ export type FeedJob = {
url?: string;
createdAt: string;
};
// Generic event envelope appended to DATA_DIR/flow-events.jsonl and POSTed
// to the configured dispatch endpoint.
export type FlowEvent<T = unknown> = {
  // Stable identifier, also sent as the x-patchbay-flow-delivery header.
  id: string;
  type: string;
  source?: string;
  // When the upstream activity happened (feed publish time), if known.
  occurredAt?: string;
  // When patchbay constructed the event.
  receivedAt: string;
  payload: T;
};
// Outcome of one flow-event dispatch attempt, appended to
// DATA_DIR/flow-dispatches.jsonl.
export type FlowDispatchRecord = {
  eventId: string;
  eventType: string;
  // Absent when dispatch was skipped because no URL was configured.
  url?: string;
  status: "dispatched" | "failed" | "skipped";
  // HTTP status of the dispatch response; absent on skip or network failure.
  httpStatus?: number;
  error?: string;
  createdAt: string;
};

View file

@ -137,4 +137,64 @@ describe("feed watcher", () => {
expect(await readFile(join(dataDir, "feed-jobs.jsonl"), "utf8")).toContain("\"kind\":\"fork_sync\"");
expect(feedCalls).toBe(1);
});
test("later polls dispatch generic flow events", async () => {
  const dataDir = await mkdtemp(join(tmpdir(), "patchbay-feed-"));
  const sourcesPath = join(dataDir, "sources.json");
  // Clone the shared fixture source as a release feed with a flow_dispatch target.
  const releaseSource: FeedSourceConfig = {
    ...source,
    id: "github-openai-codex-releases",
    url: "https://github.com/openai/codex/releases.atom",
    event: "release",
    target: {
      mode: "flow_dispatch",
      eventType: "upstream.release",
      dispatchUrlEnv: "FLOW_URL",
      dispatchSecretEnv: "FLOW_SECRET",
      payload: {
        repo: "openai/codex",
        provider: "github",
      },
    },
  };
  await writeFile(sourcesPath, JSON.stringify({ sources: [releaseSource] }), "utf8");
  // Pre-seed feed state so the poll is NOT a priming run and entries are processed.
  await writeFile(join(dataDir, "feed-state.json"), JSON.stringify({
    "github-openai-codex-releases": {
      lastSeenId: "older-release",
      lastCheckedAt: "2026-05-12T09:00:00.000Z",
    },
  }), "utf8");
  // Capture what the dispatcher POSTs so the body/signature can be asserted.
  let dispatchedBody = "";
  let dispatchedSignature = "";
  await pollFeedsOnce({
    dataDir,
    sourcesPath,
    discord: { notifyEvents: new Set(["release"]) },
    flowDispatch: {
      env: {
        FLOW_URL: "https://flow.example/events",
        FLOW_SECRET: "secret",
      },
      // Stub dispatch endpoint: record the request, reply 202 Accepted.
      fetchImpl: async (_url, init) => {
        dispatchedBody = String(init.body);
        const headers = init.headers as Record<string, string>;
        dispatchedSignature = String(headers["x-patchbay-flow-signature-256"]);
        return new Response(null, { status: 202 });
      },
    },
  }, async () => {
    // Stub feed fetch: serve the shared rss fixture for the source URL.
    return new Response(rss, { status: 200 });
  });
  const flowEventText = await readFile(join(dataDir, "flow-events.jsonl"), "utf8");
  const flowEvent = JSON.parse(flowEventText.trim()) as Record<string, any>;
  expect(flowEvent.type).toBe("upstream.release");
  expect(flowEvent.source).toBe("patchbay");
  expect(flowEvent.payload.repo).toBe("openai/codex");
  expect(flowEvent.payload.tag).toBe("v1.2.3");
  // The POSTed body must be the exact persisted event, signed with HMAC-SHA256.
  expect(JSON.parse(dispatchedBody).id).toBe(flowEvent.id);
  expect(dispatchedSignature).toMatch(/^sha256=[0-9a-f]{64}$/);
  expect(await readFile(join(dataDir, "flow-dispatches.jsonl"), "utf8")).toContain("\"status\":\"dispatched\"");
});
});