From 8bb60720062fc3a12dcb4524d6ba2a875b007a6f Mon Sep 17 00:00:00 2001 From: matamune Date: Wed, 13 May 2026 03:49:06 +0000 Subject: [PATCH] Add flow backend inspection and replay --- README.md | 7 ++ .../flow-backend-systemd-local/src/backend.ts | 39 +++++--- apps/flow-backend-systemd-local/src/config.ts | 87 +++++++++++++++- apps/flow-backend-systemd-local/src/index.ts | 76 +++++++++++--- apps/flow-backend-systemd-local/src/server.ts | 88 +++++++++++++++-- apps/flow-backend-systemd-local/src/store.ts | 99 ++++++++++++++++++- .../test/backend.test.ts | 52 +++++++++- docs/flows.md | 43 +++++++- 8 files changed, 457 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index c0d1570..dd1daf1 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ the Codex release flows. ```bash bun run flow list bun run flow:backend serve --cwd "$(pwd)" +bun run flow:backend list-events --limit 20 +bun run flow:backend list-runs --status failed ``` Code Mode flow steps are present on `main` behind one mode flag: @@ -86,6 +88,11 @@ That mode enables `runner = "code-mode"` steps and makes stdio app-server launches default to `bunx @peezy.tech/codex`. `CODEX_APP_SERVER_CODEX_COMMAND` still wins when a specific local binary should be used. +For release readiness, inspect and replay stored events with +`bun run flow:backend show-event`, `show-run`, and `replay-event`. Do not run a +fabricated full `openai/codex` release lifecycle; the first full lifecycle test +should happen on the next real upstream release. + ## Development Flow Development happens on jojo at `jojo.build`. Codeberg is configured as a push mirror, and GitHub is kept for npm trusted publishing only. diff --git a/apps/flow-backend-systemd-local/src/backend.ts b/apps/flow-backend-systemd-local/src/backend.ts index d268468..28e4338 100644 --- a/apps/flow-backend-systemd-local/src/backend.ts +++ b/apps/flow-backend-systemd-local/src/backend.ts @@ -18,6 +18,7 @@ export type DispatchFlowEventOptions = { event: FlowEvent; wait?: boolean; env?: Record; + replay?: boolean; }; export type DispatchFlowEventResult = { @@ -28,8 +29,8 @@ export type DispatchFlowEventResult = { }; export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Promise { - const inserted = options.store.insertEvent(options.event); - if (!inserted) { + const inserted = options.replay ? false : options.store.insertEvent(options.event); + if (!inserted && !options.replay) { return { status: "duplicate", eventId: options.event.id, @@ -38,12 +39,13 @@ export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Prom }; } - const eventPath = await writeEventFile(options.config.dataDir, options.event); + const eventPath = await writeEventFile(options.config.dataDir, options.event, options.replay ? "replay" : undefined); const flows = await discoverFlows({ cwd: options.config.cwd }); const matches = await matchingSteps(flows, options.event); const promises: Array> = []; + const replayNonce = options.replay ? `${Date.now()}:${Math.random()}` : undefined; for (const match of matches) { - const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath); + const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath, replayNonce); options.store.createRun(run); const promise = executeAndRecord({ config: options.config, @@ -70,11 +72,25 @@ export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Prom return { status: "accepted", eventId: options.event.id, - runIds: matches.map((match) => runId(options.event.id, match.flow.manifest.name, match.step.name)), + runIds: matches.map((match) => runId(options.event.id, match.flow.manifest.name, match.step.name, replayNonce)), matched: matches.length, }; } +export async function replayFlowEvent(options: Omit & { + eventId: string; +}): Promise { + const event = options.store.getEvent(options.eventId); + if (!event) { + throw new Error(`Unknown event: ${options.eventId}`); + } + return dispatchFlowEvent({ + ...options, + event: event.raw, + replay: true, + }); +} + export async function readFlowEvent(pathValue: string): Promise { return normalizeFlowEvent(JSON.parse(await Bun.file(path.resolve(pathValue)).text()) as unknown); } @@ -133,9 +149,10 @@ function createRunRecord( flow: LoadedFlow, step: FlowStep, eventPath: string, + replayNonce?: string, ): FlowRunRecord { return { - id: runId(event.id, flow.manifest.name, step.name), + id: runId(event.id, flow.manifest.name, step.name, replayNonce), eventId: event.id, flowName: flow.manifest.name, stepName: step.name, @@ -147,18 +164,18 @@ function createRunRecord( }; } -function runId(eventId: string, flowName: string, stepName: string): string { +function runId(eventId: string, flowName: string, stepName: string, replayNonce?: string): string { const hash = createHash("sha256") - .update(`${eventId}\0${flowName}\0${stepName}`) + .update(`${eventId}\0${flowName}\0${stepName}${replayNonce ? `\0${replayNonce}` : ""}`) .digest("hex") .slice(0, 12); - return `run_${hash}`; + return replayNonce ? `run_${hash}_replay` : `run_${hash}`; } -async function writeEventFile(dataDir: string, event: FlowEvent): Promise { +async function writeEventFile(dataDir: string, event: FlowEvent, suffix?: string): Promise { const directory = path.join(dataDir, "events"); await mkdir(directory, { recursive: true }); - const filePath = path.join(directory, `${safeFileName(event.id)}.json`); + const filePath = path.join(directory, `${safeFileName(suffix ? `${event.id}:${suffix}:${Date.now()}` : event.id)}.json`); await Bun.write(filePath, JSON.stringify(event, null, 2)); return filePath; } diff --git a/apps/flow-backend-systemd-local/src/config.ts b/apps/flow-backend-systemd-local/src/config.ts index d494af1..b03f92c 100644 --- a/apps/flow-backend-systemd-local/src/config.ts +++ b/apps/flow-backend-systemd-local/src/config.ts @@ -17,7 +17,12 @@ export type FlowBackendConfig = { export type FlowBackendCli = | { kind: "help" } | { kind: "serve"; config: FlowBackendConfig } - | { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean }; + | { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean } + | { kind: "list-events"; config: FlowBackendConfig; limit?: number; type?: string } + | { kind: "show-event"; config: FlowBackendConfig; eventId: string } + | { kind: "replay-event"; config: FlowBackendConfig; eventId: string; wait: boolean } + | { kind: "list-runs"; config: FlowBackendConfig; eventId?: string; status?: string; limit?: number } + | { kind: "show-run"; config: FlowBackendConfig; runId: string }; export function readConfig( env: Record = process.env, @@ -58,6 +63,11 @@ export function parseCli(argv: string[], env: Record let flowRunnerPath: string | undefined; let wait = false; let eventPath: string | undefined; + let eventId: string | undefined; + let runId: string | undefined; + let status: string | undefined; + let limit: number | undefined; + let type: string | undefined; const rest = argv.slice(1); for (let index = 0; index < rest.length; index += 1) { const arg = rest[index]; @@ -104,6 +114,54 @@ export function parseCli(argv: string[], env: Record wait = true; continue; } + if (arg === "--event-id") { + eventId = required(rest, ++index, arg); + continue; + } + if (arg.startsWith("--event-id=")) { + eventId = arg.slice("--event-id=".length); + continue; + } + if (arg === "--run-id") { + runId = required(rest, ++index, arg); + continue; + } + if (arg.startsWith("--run-id=")) { + runId = arg.slice("--run-id=".length); + continue; + } + if (arg === "--status") { + status = required(rest, ++index, arg); + continue; + } + if (arg.startsWith("--status=")) { + status = arg.slice("--status=".length); + continue; + } + if (arg === "--limit") { + limit = Number(required(rest, ++index, arg)); + continue; + } + if (arg.startsWith("--limit=")) { + limit = Number(arg.slice("--limit=".length)); + continue; + } + if (arg === "--type") { + type = required(rest, ++index, arg); + continue; + } + if (arg.startsWith("--type=")) { + type = arg.slice("--type=".length); + continue; + } + if (!arg.startsWith("-") && !eventId && (command === "show-event" || command === "replay-event")) { + eventId = arg; + continue; + } + if (!arg.startsWith("-") && !runId && command === "show-run") { + runId = arg; + continue; + } throw new Error(`Unknown option: ${arg}`); } @@ -126,6 +184,21 @@ export function parseCli(argv: string[], env: Record } return { kind: "dispatch", config, eventPath, wait }; } + if (command === "list-events" || command === "events") { + return { kind: "list-events", config, limit, type }; + } + if (command === "show-event" || command === "event") { + return { kind: "show-event", config, eventId: requireValue(eventId, "show-event requires ") }; + } + if (command === "replay-event" || command === "replay") { + return { kind: "replay-event", config, eventId: requireValue(eventId, "replay-event requires "), wait }; + } + if (command === "list-runs" || command === "runs") { + return { kind: "list-runs", config, eventId, status, limit }; + } + if (command === "show-run" || command === "run") { + return { kind: "show-run", config, runId: requireValue(runId, "show-run requires ") }; + } throw new Error(`Unknown command: ${command}`); } @@ -138,6 +211,11 @@ export function helpText(): string { "Usage:", " codex-flow-systemd-local serve [--cwd ] [--data-dir ] [--host ] [--port ]", " codex-flow-systemd-local dispatch --event [--cwd ] [--data-dir ] [--wait]", + " codex-flow-systemd-local list-events [--type ] [--limit ]", + " codex-flow-systemd-local show-event ", + " codex-flow-systemd-local replay-event [--wait]", + " codex-flow-systemd-local list-runs [--event-id ] [--status ] [--limit ]", + " codex-flow-systemd-local show-run ", "", "Environment:", " CODEX_FLOW_BACKEND_SECRET Optional HMAC secret for HTTP dispatches", @@ -202,3 +280,10 @@ function required(args: string[], index: number, flag: string): string { } return value; } + +function requireValue(value: T | undefined, message: string): T { + if (value === undefined) { + throw new Error(message); + } + return value; +} diff --git a/apps/flow-backend-systemd-local/src/index.ts b/apps/flow-backend-systemd-local/src/index.ts index 916a5e0..1a9ef96 100644 --- a/apps/flow-backend-systemd-local/src/index.ts +++ b/apps/flow-backend-systemd-local/src/index.ts @@ -1,9 +1,9 @@ #!/usr/bin/env bun import path from "node:path"; -import { dispatchFlowEvent, readFlowEvent } from "./backend.ts"; +import { dispatchFlowEvent, readFlowEvent, replayFlowEvent } from "./backend.ts"; import { helpText, parseCli } from "./config.ts"; import { serveFlowBackend } from "./server.ts"; -import { FlowBackendStore } from "./store.ts"; +import { FlowBackendStore, type FlowRunStatus } from "./store.ts"; await main().catch((error) => { process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); @@ -23,16 +23,70 @@ async function main(): Promise { } const store = new FlowBackendStore(path.join(cli.config.dataDir, "flow-backend.sqlite")); try { - const event = await readFlowEvent(cli.eventPath); - const result = await dispatchFlowEvent({ - config: cli.config, - store, - event, - wait: cli.wait, - env: process.env, - }); - process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + if (cli.kind === "dispatch") { + const event = await readFlowEvent(cli.eventPath); + const result = await dispatchFlowEvent({ + config: cli.config, + store, + event, + wait: cli.wait, + env: process.env, + }); + writeJson(result); + return; + } + if (cli.kind === "list-events") { + writeJson({ events: store.listEvents({ type: cli.type, limit: cli.limit }) }); + return; + } + if (cli.kind === "show-event") { + const event = store.getEvent(cli.eventId); + if (!event) { + throw new Error(`Unknown event: ${cli.eventId}`); + } + writeJson({ event, runs: store.listRunsByEvent(cli.eventId) }); + return; + } + if (cli.kind === "replay-event") { + writeJson(await replayFlowEvent({ + config: cli.config, + store, + eventId: cli.eventId, + wait: cli.wait, + env: process.env, + })); + return; + } + if (cli.kind === "list-runs") { + writeJson({ + runs: store.listRuns({ + eventId: cli.eventId, + status: cli.status ? requireRunStatus(cli.status) : undefined, + limit: cli.limit, + }), + }); + return; + } + if (cli.kind === "show-run") { + const run = store.getRun(cli.runId); + if (!run) { + throw new Error(`Unknown run: ${cli.runId}`); + } + writeJson({ run }); + return; + } } finally { store.close(); } } + +function writeJson(value: unknown): void { + process.stdout.write(`${JSON.stringify(value, null, 2)}\n`); +} + +function requireRunStatus(value: string): FlowRunStatus { + if (value === "queued" || value === "running" || value === "completed" || value === "failed") { + return value; + } + throw new Error("run status must be queued, running, completed, or failed"); +} diff --git a/apps/flow-backend-systemd-local/src/server.ts b/apps/flow-backend-systemd-local/src/server.ts index 10d2dbc..1981d35 100644 --- a/apps/flow-backend-systemd-local/src/server.ts +++ b/apps/flow-backend-systemd-local/src/server.ts @@ -1,8 +1,8 @@ import path from "node:path"; import type { FlowBackendConfig } from "./config.ts"; -import { dispatchFlowEvent, normalizeFlowEvent } from "./backend.ts"; +import { dispatchFlowEvent, normalizeFlowEvent, replayFlowEvent } from "./backend.ts"; import { requestSignature, verifyBodySignature } from "./signature.ts"; -import { FlowBackendStore } from "./store.ts"; +import { FlowBackendStore, type FlowRunStatus } from "./store.ts"; export function serveFlowBackend(config: FlowBackendConfig): ReturnType { const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite")); @@ -16,28 +16,104 @@ export function serveFlowBackend(config: FlowBackendConfig): ReturnType { + if (!body.trim()) { + return {}; + } + const parsed = JSON.parse(body) as unknown; + return isRecord(parsed) ? parsed : {}; +} + +function requireRunStatus(value: string): FlowRunStatus { + if (value === "queued" || value === "running" || value === "completed" || value === "failed") { + return value; + } + throw new Error("run status must be queued, running, completed, or failed"); +} + function json(value: unknown, status = 200): Response { return new Response(JSON.stringify(value, null, 2), { status, headers: { "content-type": "application/json" }, }); } + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/apps/flow-backend-systemd-local/src/store.ts b/apps/flow-backend-systemd-local/src/store.ts index d0e5a93..a2e2dc0 100644 --- a/apps/flow-backend-systemd-local/src/store.ts +++ b/apps/flow-backend-systemd-local/src/store.ts @@ -25,6 +25,28 @@ export type FlowRunRecord = { completedAt?: string; }; +export type FlowEventRecord = { + id: string; + type: string; + source?: string; + occurredAt?: string; + receivedAt: string; + payload: Record; + raw: FlowEvent; + createdAt: string; +}; + +export type ListRunsOptions = { + eventId?: string; + status?: FlowRunStatus; + limit?: number; +}; + +export type ListEventsOptions = { + type?: string; + limit?: number; +}; + export class FlowBackendStore { readonly dbPath: string; #db: Database; @@ -64,6 +86,8 @@ export class FlowBackendStore { completed_at text ); create index if not exists flow_runs_event_id_idx on flow_runs(event_id); + create index if not exists flow_runs_status_idx on flow_runs(status); + create index if not exists flow_events_type_idx on flow_events(type); `); } @@ -135,17 +159,67 @@ export class FlowBackendStore { } listRunsByEvent(eventId: string): FlowRunRecord[] { + return this.listRuns({ eventId, limit: 1_000 }); + } + + listRuns(options: ListRunsOptions = {}): FlowRunRecord[] { + const clauses: string[] = []; + const params: Record = { + $limit: clampLimit(options.limit), + }; + if (options.eventId) { + clauses.push("event_id = $eventId"); + params.$eventId = options.eventId; + } + if (options.status) { + clauses.push("status = $status"); + params.$status = options.status; + } + const where = clauses.length > 0 ? `where ${clauses.join(" and ")}` : ""; return this.#db - .query("select * from flow_runs where event_id = $eventId order by created_at, id") - .all({ $eventId: eventId }) + .query(`select * from flow_runs ${where} order by created_at desc, id desc limit $limit`) + .all(params) .map(rowToRunRecord); } + getRun(id: string): FlowRunRecord | undefined { + const row = this.#db.query("select * from flow_runs where id = $id").get({ $id: id }); + return row ? rowToRunRecord(row) : undefined; + } + + listEvents(options: ListEventsOptions = {}): FlowEventRecord[] { + const clauses: string[] = []; + const params: Record = { + $limit: clampLimit(options.limit), + }; + if (options.type) { + clauses.push("type = $type"); + params.$type = options.type; + } + const where = clauses.length > 0 ? `where ${clauses.join(" and ")}` : ""; + return this.#db + .query(`select * from flow_events ${where} order by created_at desc, id desc limit $limit`) + .all(params) + .map(rowToEventRecord); + } + + getEvent(id: string): FlowEventRecord | undefined { + const row = this.#db.query("select * from flow_events where id = $id").get({ $id: id }); + return row ? rowToEventRecord(row) : undefined; + } + close(): void { this.#db.close(); } } +function clampLimit(value: number | undefined): number { + if (!value || !Number.isFinite(value)) { + return 50; + } + return Math.max(1, Math.min(500, Math.trunc(value))); +} + function runParams(record: FlowRunRecord): Record { return { $id: record.id, @@ -193,6 +267,27 @@ function rowToRunRecord(row: unknown): FlowRunRecord { }; } +function rowToEventRecord(row: unknown): FlowEventRecord { + if (!isRecord(row)) { + throw new Error("invalid event row"); + } + const payload = JSON.parse(String(row.payload_json)) as unknown; + const raw = JSON.parse(String(row.raw_json)) as unknown; + if (!isRecord(payload) || !isRecord(raw)) { + throw new Error("invalid event json"); + } + return { + id: String(row.id), + type: String(row.type), + ...(typeof row.source === "string" ? { source: row.source } : {}), + ...(typeof row.occurred_at === "string" ? { occurredAt: row.occurred_at } : {}), + receivedAt: String(row.received_at), + payload, + raw: raw as FlowEvent, + createdAt: String(row.created_at), + }; +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); } diff --git a/apps/flow-backend-systemd-local/test/backend.test.ts b/apps/flow-backend-systemd-local/test/backend.test.ts index 0284547..a9f542f 100644 --- a/apps/flow-backend-systemd-local/test/backend.test.ts +++ b/apps/flow-backend-systemd-local/test/backend.test.ts @@ -2,8 +2,8 @@ import { expect, test } from "bun:test"; import { mkdir, mkdtemp, rm } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { dispatchFlowEvent } from "../src/backend.ts"; -import { readConfig } from "../src/config.ts"; +import { dispatchFlowEvent, replayFlowEvent } from "../src/backend.ts"; +import { parseCli, readConfig } from "../src/config.ts"; import { flowCommand } from "../src/executor.ts"; import { signBody, verifyBodySignature } from "../src/signature.ts"; import { FlowBackendStore } from "../src/store.ts"; @@ -53,6 +53,28 @@ test("dispatches matching flow steps and records runs", async () => { status: "completed", }); expect(runs[0]?.stdout).toContain("hello Ada"); + expect(store.listEvents()).toHaveLength(1); + expect(store.getEvent("event-1")).toMatchObject({ + id: "event-1", + type: "demo.event", + payload: { name: "Ada" }, + }); + + const replay = await replayFlowEvent({ + config, + store, + eventId: "event-1", + wait: true, + env: {}, + }); + + expect(replay).toMatchObject({ status: "accepted", eventId: "event-1", matched: 1 }); + const replayRuns = store.listRuns({ eventId: "event-1" }); + expect(replayRuns).toHaveLength(2); + expect(replayRuns.map((run) => run.status).sort()).toEqual(["completed", "completed"]); + expect(replayRuns.some((run) => run.id.endsWith("_replay"))).toBe(true); + const replayRun = replayRuns.find((run) => run.id.endsWith("_replay")); + expect(replayRun ? store.getRun(replayRun.id)?.stdout : "").toContain("hello Ada"); } finally { store.close(); } @@ -61,6 +83,32 @@ test("dispatches matching flow steps and records runs", async () => { } }); +test("parses inspection and replay commands", () => { + expect(parseCli(["list-events", "--type", "demo.event", "--limit", "10"], {})).toMatchObject({ + kind: "list-events", + type: "demo.event", + limit: 10, + }); + expect(parseCli(["show-event", "event-1"], {})).toMatchObject({ + kind: "show-event", + eventId: "event-1", + }); + expect(parseCli(["replay-event", "event-1", "--wait"], {})).toMatchObject({ + kind: "replay-event", + eventId: "event-1", + wait: true, + }); + expect(parseCli(["list-runs", "--event-id", "event-1", "--status", "failed"], {})).toMatchObject({ + kind: "list-runs", + eventId: "event-1", + status: "failed", + }); + expect(parseCli(["show-run", "run_123"], {})).toMatchObject({ + kind: "show-run", + runId: "run_123", + }); +}); + test("builds systemd-run commands without executing them", () => { const config = readConfig({}, { cwd: "/tmp/project", executor: "systemd-run", bunCommand: "/usr/bin/bun" }); const command = flowCommand({ diff --git a/docs/flows.md b/docs/flows.md index 41e8bf4..60f3f9b 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -134,9 +134,50 @@ still suitable when the backend service itself is managed by systemd. Endpoints: - `POST /events` or `POST /flow-events`: accept one `FlowEvent` -- `GET /runs?eventId=`: inspect recorded runs for an event +- `GET /events?limit=`: list stored events +- `GET /events/`: inspect a stored event and its runs +- `POST /events//replay`: start a new run attempt for a stored event +- `GET /runs?eventId=&status=&limit=`: inspect recorded runs +- `GET /runs/`: inspect one recorded run - `GET /healthz`: health check +The CLI exposes the same operational surface: + +```bash +bun run flow:backend list-events --limit 20 +bun run flow:backend show-event 'patchbay:source:entry:upstream.release' +bun run flow:backend list-runs --status failed --limit 20 +bun run flow:backend show-run run_abc123 +bun run flow:backend replay-event 'patchbay:source:entry:upstream.release' --wait +``` + +Normal dispatch is idempotent by `event.id`: a duplicate `POST /events` returns +the existing run ids and does not start another attempt. `replay-event` and +`POST /events//replay` intentionally create a new run attempt for the +stored event, which is the recovery path for accepted events whose flow step +failed or blocked. + +The live backend state is a SQLite database under +`CODEX_FLOW_BACKEND_DATA_DIR` plus per-event JSON files under +`CODEX_FLOW_BACKEND_DATA_DIR/events`. Back up both by copying the whole data +directory while the service is stopped, or by using SQLite online backup plus a +copy of the `events/` directory. For the current host deployment the intended +values are: + +```text +CODEX_FLOW_BACKEND_CWD=/home/peezy/codex-flows-public +CODEX_FLOW_BACKEND_DATA_DIR=/home/peezy/.local/state/codex-flow-systemd-local +CODEX_FLOWS_MODE=code-mode +CODEX_FLOW_PUSH=1 +CODEX_FLOW_PUBLISH=1 +PEEZY_CODEX_REPO=/home/peezy/codex-flow-worktrees/codex +PEEZY_CODEX_TARGET_BRANCH=code-mode-exec-hooks +``` + +Do not fabricate an upstream Codex release lifecycle test. Until the next real +`openai/codex` release, use health checks, non-release smoke events, and stored +event inspection/replay tooling only. + ## Convex Backend Direction Convex should be a durable orchestration backend, not the place where long