Add flow backend inspection and replay
All checks were successful
ci / check (push) Successful in 29s
All checks were successful
ci / check (push) Successful in 29s
This commit is contained in:
parent
0d3c2e2ab6
commit
8bb6072006
8 changed files with 457 additions and 34 deletions
|
|
@ -74,6 +74,8 @@ the Codex release flows.
|
|||
```bash
|
||||
bun run flow list
|
||||
bun run flow:backend serve --cwd "$(pwd)"
|
||||
bun run flow:backend list-events --limit 20
|
||||
bun run flow:backend list-runs --status failed
|
||||
```
|
||||
|
||||
Code Mode flow steps are present on `main` behind one mode flag:
|
||||
|
|
@ -86,6 +88,11 @@ That mode enables `runner = "code-mode"` steps and makes stdio app-server
|
|||
launches default to `bunx @peezy.tech/codex`. `CODEX_APP_SERVER_CODEX_COMMAND`
|
||||
still wins when a specific local binary should be used.
|
||||
|
||||
For release readiness, inspect and replay stored events with
|
||||
`bun run flow:backend show-event`, `show-run`, and `replay-event`. Do not run a
|
||||
fabricated full `openai/codex` release lifecycle; the first full lifecycle test
|
||||
should happen on the next real upstream release.
|
||||
|
||||
## Development Flow
|
||||
|
||||
Development happens on jojo at `jojo.build`. Codeberg is configured as a push mirror, and GitHub is kept for npm trusted publishing only.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ export type DispatchFlowEventOptions = {
|
|||
event: FlowEvent;
|
||||
wait?: boolean;
|
||||
env?: Record<string, string | undefined>;
|
||||
replay?: boolean;
|
||||
};
|
||||
|
||||
export type DispatchFlowEventResult = {
|
||||
|
|
@ -28,8 +29,8 @@ export type DispatchFlowEventResult = {
|
|||
};
|
||||
|
||||
export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Promise<DispatchFlowEventResult> {
|
||||
const inserted = options.store.insertEvent(options.event);
|
||||
if (!inserted) {
|
||||
const inserted = options.replay ? false : options.store.insertEvent(options.event);
|
||||
if (!inserted && !options.replay) {
|
||||
return {
|
||||
status: "duplicate",
|
||||
eventId: options.event.id,
|
||||
|
|
@ -38,12 +39,13 @@ export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Prom
|
|||
};
|
||||
}
|
||||
|
||||
const eventPath = await writeEventFile(options.config.dataDir, options.event);
|
||||
const eventPath = await writeEventFile(options.config.dataDir, options.event, options.replay ? "replay" : undefined);
|
||||
const flows = await discoverFlows({ cwd: options.config.cwd });
|
||||
const matches = await matchingSteps(flows, options.event);
|
||||
const promises: Array<Promise<void>> = [];
|
||||
const replayNonce = options.replay ? `${Date.now()}:${Math.random()}` : undefined;
|
||||
for (const match of matches) {
|
||||
const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath);
|
||||
const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath, replayNonce);
|
||||
options.store.createRun(run);
|
||||
const promise = executeAndRecord({
|
||||
config: options.config,
|
||||
|
|
@ -70,11 +72,25 @@ export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Prom
|
|||
return {
|
||||
status: "accepted",
|
||||
eventId: options.event.id,
|
||||
runIds: matches.map((match) => runId(options.event.id, match.flow.manifest.name, match.step.name)),
|
||||
runIds: matches.map((match) => runId(options.event.id, match.flow.manifest.name, match.step.name, replayNonce)),
|
||||
matched: matches.length,
|
||||
};
|
||||
}
|
||||
|
||||
export async function replayFlowEvent(options: Omit<DispatchFlowEventOptions, "event" | "replay"> & {
|
||||
eventId: string;
|
||||
}): Promise<DispatchFlowEventResult> {
|
||||
const event = options.store.getEvent(options.eventId);
|
||||
if (!event) {
|
||||
throw new Error(`Unknown event: ${options.eventId}`);
|
||||
}
|
||||
return dispatchFlowEvent({
|
||||
...options,
|
||||
event: event.raw,
|
||||
replay: true,
|
||||
});
|
||||
}
|
||||
|
||||
export async function readFlowEvent(pathValue: string): Promise<FlowEvent> {
|
||||
return normalizeFlowEvent(JSON.parse(await Bun.file(path.resolve(pathValue)).text()) as unknown);
|
||||
}
|
||||
|
|
@ -133,9 +149,10 @@ function createRunRecord(
|
|||
flow: LoadedFlow,
|
||||
step: FlowStep,
|
||||
eventPath: string,
|
||||
replayNonce?: string,
|
||||
): FlowRunRecord {
|
||||
return {
|
||||
id: runId(event.id, flow.manifest.name, step.name),
|
||||
id: runId(event.id, flow.manifest.name, step.name, replayNonce),
|
||||
eventId: event.id,
|
||||
flowName: flow.manifest.name,
|
||||
stepName: step.name,
|
||||
|
|
@ -147,18 +164,18 @@ function createRunRecord(
|
|||
};
|
||||
}
|
||||
|
||||
function runId(eventId: string, flowName: string, stepName: string): string {
|
||||
function runId(eventId: string, flowName: string, stepName: string, replayNonce?: string): string {
|
||||
const hash = createHash("sha256")
|
||||
.update(`${eventId}\0${flowName}\0${stepName}`)
|
||||
.update(`${eventId}\0${flowName}\0${stepName}${replayNonce ? `\0${replayNonce}` : ""}`)
|
||||
.digest("hex")
|
||||
.slice(0, 12);
|
||||
return `run_${hash}`;
|
||||
return replayNonce ? `run_${hash}_replay` : `run_${hash}`;
|
||||
}
|
||||
|
||||
async function writeEventFile(dataDir: string, event: FlowEvent): Promise<string> {
|
||||
async function writeEventFile(dataDir: string, event: FlowEvent, suffix?: string): Promise<string> {
|
||||
const directory = path.join(dataDir, "events");
|
||||
await mkdir(directory, { recursive: true });
|
||||
const filePath = path.join(directory, `${safeFileName(event.id)}.json`);
|
||||
const filePath = path.join(directory, `${safeFileName(suffix ? `${event.id}:${suffix}:${Date.now()}` : event.id)}.json`);
|
||||
await Bun.write(filePath, JSON.stringify(event, null, 2));
|
||||
return filePath;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,12 @@ export type FlowBackendConfig = {
|
|||
export type FlowBackendCli =
|
||||
| { kind: "help" }
|
||||
| { kind: "serve"; config: FlowBackendConfig }
|
||||
| { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean };
|
||||
| { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean }
|
||||
| { kind: "list-events"; config: FlowBackendConfig; limit?: number; type?: string }
|
||||
| { kind: "show-event"; config: FlowBackendConfig; eventId: string }
|
||||
| { kind: "replay-event"; config: FlowBackendConfig; eventId: string; wait: boolean }
|
||||
| { kind: "list-runs"; config: FlowBackendConfig; eventId?: string; status?: string; limit?: number }
|
||||
| { kind: "show-run"; config: FlowBackendConfig; runId: string };
|
||||
|
||||
export function readConfig(
|
||||
env: Record<string, string | undefined> = process.env,
|
||||
|
|
@ -58,6 +63,11 @@ export function parseCli(argv: string[], env: Record<string, string | undefined>
|
|||
let flowRunnerPath: string | undefined;
|
||||
let wait = false;
|
||||
let eventPath: string | undefined;
|
||||
let eventId: string | undefined;
|
||||
let runId: string | undefined;
|
||||
let status: string | undefined;
|
||||
let limit: number | undefined;
|
||||
let type: string | undefined;
|
||||
const rest = argv.slice(1);
|
||||
for (let index = 0; index < rest.length; index += 1) {
|
||||
const arg = rest[index];
|
||||
|
|
@ -104,6 +114,54 @@ export function parseCli(argv: string[], env: Record<string, string | undefined>
|
|||
wait = true;
|
||||
continue;
|
||||
}
|
||||
if (arg === "--event-id") {
|
||||
eventId = required(rest, ++index, arg);
|
||||
continue;
|
||||
}
|
||||
if (arg.startsWith("--event-id=")) {
|
||||
eventId = arg.slice("--event-id=".length);
|
||||
continue;
|
||||
}
|
||||
if (arg === "--run-id") {
|
||||
runId = required(rest, ++index, arg);
|
||||
continue;
|
||||
}
|
||||
if (arg.startsWith("--run-id=")) {
|
||||
runId = arg.slice("--run-id=".length);
|
||||
continue;
|
||||
}
|
||||
if (arg === "--status") {
|
||||
status = required(rest, ++index, arg);
|
||||
continue;
|
||||
}
|
||||
if (arg.startsWith("--status=")) {
|
||||
status = arg.slice("--status=".length);
|
||||
continue;
|
||||
}
|
||||
if (arg === "--limit") {
|
||||
limit = Number(required(rest, ++index, arg));
|
||||
continue;
|
||||
}
|
||||
if (arg.startsWith("--limit=")) {
|
||||
limit = Number(arg.slice("--limit=".length));
|
||||
continue;
|
||||
}
|
||||
if (arg === "--type") {
|
||||
type = required(rest, ++index, arg);
|
||||
continue;
|
||||
}
|
||||
if (arg.startsWith("--type=")) {
|
||||
type = arg.slice("--type=".length);
|
||||
continue;
|
||||
}
|
||||
if (!arg.startsWith("-") && !eventId && (command === "show-event" || command === "replay-event")) {
|
||||
eventId = arg;
|
||||
continue;
|
||||
}
|
||||
if (!arg.startsWith("-") && !runId && command === "show-run") {
|
||||
runId = arg;
|
||||
continue;
|
||||
}
|
||||
throw new Error(`Unknown option: ${arg}`);
|
||||
}
|
||||
|
||||
|
|
@ -126,6 +184,21 @@ export function parseCli(argv: string[], env: Record<string, string | undefined>
|
|||
}
|
||||
return { kind: "dispatch", config, eventPath, wait };
|
||||
}
|
||||
if (command === "list-events" || command === "events") {
|
||||
return { kind: "list-events", config, limit, type };
|
||||
}
|
||||
if (command === "show-event" || command === "event") {
|
||||
return { kind: "show-event", config, eventId: requireValue(eventId, "show-event requires <event-id>") };
|
||||
}
|
||||
if (command === "replay-event" || command === "replay") {
|
||||
return { kind: "replay-event", config, eventId: requireValue(eventId, "replay-event requires <event-id>"), wait };
|
||||
}
|
||||
if (command === "list-runs" || command === "runs") {
|
||||
return { kind: "list-runs", config, eventId, status, limit };
|
||||
}
|
||||
if (command === "show-run" || command === "run") {
|
||||
return { kind: "show-run", config, runId: requireValue(runId, "show-run requires <run-id>") };
|
||||
}
|
||||
throw new Error(`Unknown command: ${command}`);
|
||||
}
|
||||
|
||||
|
|
@ -138,6 +211,11 @@ export function helpText(): string {
|
|||
"Usage:",
|
||||
" codex-flow-systemd-local serve [--cwd <dir>] [--data-dir <dir>] [--host <host>] [--port <port>]",
|
||||
" codex-flow-systemd-local dispatch --event <event.json> [--cwd <dir>] [--data-dir <dir>] [--wait]",
|
||||
" codex-flow-systemd-local list-events [--type <type>] [--limit <n>]",
|
||||
" codex-flow-systemd-local show-event <event-id>",
|
||||
" codex-flow-systemd-local replay-event <event-id> [--wait]",
|
||||
" codex-flow-systemd-local list-runs [--event-id <event-id>] [--status <status>] [--limit <n>]",
|
||||
" codex-flow-systemd-local show-run <run-id>",
|
||||
"",
|
||||
"Environment:",
|
||||
" CODEX_FLOW_BACKEND_SECRET Optional HMAC secret for HTTP dispatches",
|
||||
|
|
@ -202,3 +280,10 @@ function required(args: string[], index: number, flag: string): string {
|
|||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function requireValue<T>(value: T | undefined, message: string): T {
|
||||
if (value === undefined) {
|
||||
throw new Error(message);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
#!/usr/bin/env bun
|
||||
import path from "node:path";
|
||||
import { dispatchFlowEvent, readFlowEvent } from "./backend.ts";
|
||||
import { dispatchFlowEvent, readFlowEvent, replayFlowEvent } from "./backend.ts";
|
||||
import { helpText, parseCli } from "./config.ts";
|
||||
import { serveFlowBackend } from "./server.ts";
|
||||
import { FlowBackendStore } from "./store.ts";
|
||||
import { FlowBackendStore, type FlowRunStatus } from "./store.ts";
|
||||
|
||||
await main().catch((error) => {
|
||||
process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
|
||||
|
|
@ -23,16 +23,70 @@ async function main(): Promise<void> {
|
|||
}
|
||||
const store = new FlowBackendStore(path.join(cli.config.dataDir, "flow-backend.sqlite"));
|
||||
try {
|
||||
const event = await readFlowEvent(cli.eventPath);
|
||||
const result = await dispatchFlowEvent({
|
||||
config: cli.config,
|
||||
store,
|
||||
event,
|
||||
wait: cli.wait,
|
||||
env: process.env,
|
||||
});
|
||||
process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
|
||||
if (cli.kind === "dispatch") {
|
||||
const event = await readFlowEvent(cli.eventPath);
|
||||
const result = await dispatchFlowEvent({
|
||||
config: cli.config,
|
||||
store,
|
||||
event,
|
||||
wait: cli.wait,
|
||||
env: process.env,
|
||||
});
|
||||
writeJson(result);
|
||||
return;
|
||||
}
|
||||
if (cli.kind === "list-events") {
|
||||
writeJson({ events: store.listEvents({ type: cli.type, limit: cli.limit }) });
|
||||
return;
|
||||
}
|
||||
if (cli.kind === "show-event") {
|
||||
const event = store.getEvent(cli.eventId);
|
||||
if (!event) {
|
||||
throw new Error(`Unknown event: ${cli.eventId}`);
|
||||
}
|
||||
writeJson({ event, runs: store.listRunsByEvent(cli.eventId) });
|
||||
return;
|
||||
}
|
||||
if (cli.kind === "replay-event") {
|
||||
writeJson(await replayFlowEvent({
|
||||
config: cli.config,
|
||||
store,
|
||||
eventId: cli.eventId,
|
||||
wait: cli.wait,
|
||||
env: process.env,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
if (cli.kind === "list-runs") {
|
||||
writeJson({
|
||||
runs: store.listRuns({
|
||||
eventId: cli.eventId,
|
||||
status: cli.status ? requireRunStatus(cli.status) : undefined,
|
||||
limit: cli.limit,
|
||||
}),
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (cli.kind === "show-run") {
|
||||
const run = store.getRun(cli.runId);
|
||||
if (!run) {
|
||||
throw new Error(`Unknown run: ${cli.runId}`);
|
||||
}
|
||||
writeJson({ run });
|
||||
return;
|
||||
}
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
||||
function writeJson(value: unknown): void {
|
||||
process.stdout.write(`${JSON.stringify(value, null, 2)}\n`);
|
||||
}
|
||||
|
||||
function requireRunStatus(value: string): FlowRunStatus {
|
||||
if (value === "queued" || value === "running" || value === "completed" || value === "failed") {
|
||||
return value;
|
||||
}
|
||||
throw new Error("run status must be queued, running, completed, or failed");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import path from "node:path";
|
||||
import type { FlowBackendConfig } from "./config.ts";
|
||||
import { dispatchFlowEvent, normalizeFlowEvent } from "./backend.ts";
|
||||
import { dispatchFlowEvent, normalizeFlowEvent, replayFlowEvent } from "./backend.ts";
|
||||
import { requestSignature, verifyBodySignature } from "./signature.ts";
|
||||
import { FlowBackendStore } from "./store.ts";
|
||||
import { FlowBackendStore, type FlowRunStatus } from "./store.ts";
|
||||
|
||||
export function serveFlowBackend(config: FlowBackendConfig): ReturnType<typeof Bun.serve> {
|
||||
const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite"));
|
||||
|
|
@ -16,28 +16,104 @@ export function serveFlowBackend(config: FlowBackendConfig): ReturnType<typeof B
|
|||
}
|
||||
if (request.method === "POST" && (url.pathname === "/events" || url.pathname === "/flow-events")) {
|
||||
const body = await request.text();
|
||||
if (config.secret && !verifyBodySignature(config.secret, body, requestSignature(request.headers))) {
|
||||
if (!validSignature(config, body, request.headers)) {
|
||||
return json({ error: "invalid signature" }, 401);
|
||||
}
|
||||
const event = normalizeFlowEvent(JSON.parse(body) as unknown);
|
||||
const result = await dispatchFlowEvent({ config, store, event });
|
||||
return json(result, 202);
|
||||
}
|
||||
if (request.method === "GET" && url.pathname === "/events") {
|
||||
return json({
|
||||
events: store.listEvents({
|
||||
type: url.searchParams.get("type") ?? undefined,
|
||||
limit: numberParam(url.searchParams.get("limit")),
|
||||
}),
|
||||
});
|
||||
}
|
||||
const eventMatch = url.pathname.match(/^\/events\/([^/]+)(?:\/(replay))?$/);
|
||||
if (eventMatch?.[1] && request.method === "GET" && !eventMatch[2]) {
|
||||
const eventId = decodeURIComponent(eventMatch[1]);
|
||||
const event = store.getEvent(eventId);
|
||||
if (!event) {
|
||||
return json({ error: "event not found" }, 404);
|
||||
}
|
||||
return json({ event, runs: store.listRunsByEvent(eventId) });
|
||||
}
|
||||
if (eventMatch?.[1] && eventMatch[2] === "replay" && request.method === "POST") {
|
||||
const body = await request.text();
|
||||
if (!validSignature(config, body, request.headers)) {
|
||||
return json({ error: "invalid signature" }, 401);
|
||||
}
|
||||
const params = parseBody(body);
|
||||
const result = await replayFlowEvent({
|
||||
config,
|
||||
store,
|
||||
eventId: decodeURIComponent(eventMatch[1]),
|
||||
wait: Boolean(params.wait),
|
||||
env: process.env,
|
||||
});
|
||||
return json(result, 202);
|
||||
}
|
||||
if (request.method === "GET" && url.pathname === "/runs") {
|
||||
const eventId = url.searchParams.get("eventId");
|
||||
if (!eventId) {
|
||||
return json({ error: "missing eventId" }, 400);
|
||||
const status = url.searchParams.get("status");
|
||||
return json({
|
||||
...(eventId ? { eventId } : {}),
|
||||
runs: store.listRuns({
|
||||
eventId: eventId ?? undefined,
|
||||
status: status ? requireRunStatus(status) : undefined,
|
||||
limit: numberParam(url.searchParams.get("limit")),
|
||||
}),
|
||||
});
|
||||
}
|
||||
const runMatch = url.pathname.match(/^\/runs\/([^/]+)$/);
|
||||
if (runMatch?.[1] && request.method === "GET") {
|
||||
const run = store.getRun(decodeURIComponent(runMatch[1]));
|
||||
if (!run) {
|
||||
return json({ error: "run not found" }, 404);
|
||||
}
|
||||
return json({ eventId, runs: store.listRunsByEvent(eventId) });
|
||||
return json({ run });
|
||||
}
|
||||
return json({ error: "not found" }, 404);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function validSignature(config: FlowBackendConfig, body: string, headers: Headers): boolean {
|
||||
return !config.secret || verifyBodySignature(config.secret, body, requestSignature(headers));
|
||||
}
|
||||
|
||||
function numberParam(value: string | null): number | undefined {
|
||||
if (!value) {
|
||||
return undefined;
|
||||
}
|
||||
const parsed = Number(value);
|
||||
return Number.isFinite(parsed) ? parsed : undefined;
|
||||
}
|
||||
|
||||
function parseBody(body: string): Record<string, unknown> {
|
||||
if (!body.trim()) {
|
||||
return {};
|
||||
}
|
||||
const parsed = JSON.parse(body) as unknown;
|
||||
return isRecord(parsed) ? parsed : {};
|
||||
}
|
||||
|
||||
function requireRunStatus(value: string): FlowRunStatus {
|
||||
if (value === "queued" || value === "running" || value === "completed" || value === "failed") {
|
||||
return value;
|
||||
}
|
||||
throw new Error("run status must be queued, running, completed, or failed");
|
||||
}
|
||||
|
||||
function json(value: unknown, status = 200): Response {
|
||||
return new Response(JSON.stringify(value, null, 2), {
|
||||
status,
|
||||
headers: { "content-type": "application/json" },
|
||||
});
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,28 @@ export type FlowRunRecord = {
|
|||
completedAt?: string;
|
||||
};
|
||||
|
||||
export type FlowEventRecord = {
|
||||
id: string;
|
||||
type: string;
|
||||
source?: string;
|
||||
occurredAt?: string;
|
||||
receivedAt: string;
|
||||
payload: Record<string, unknown>;
|
||||
raw: FlowEvent;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
export type ListRunsOptions = {
|
||||
eventId?: string;
|
||||
status?: FlowRunStatus;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
export type ListEventsOptions = {
|
||||
type?: string;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
export class FlowBackendStore {
|
||||
readonly dbPath: string;
|
||||
#db: Database;
|
||||
|
|
@ -64,6 +86,8 @@ export class FlowBackendStore {
|
|||
completed_at text
|
||||
);
|
||||
create index if not exists flow_runs_event_id_idx on flow_runs(event_id);
|
||||
create index if not exists flow_runs_status_idx on flow_runs(status);
|
||||
create index if not exists flow_events_type_idx on flow_events(type);
|
||||
`);
|
||||
}
|
||||
|
||||
|
|
@ -135,17 +159,67 @@ export class FlowBackendStore {
|
|||
}
|
||||
|
||||
listRunsByEvent(eventId: string): FlowRunRecord[] {
|
||||
return this.listRuns({ eventId, limit: 1_000 });
|
||||
}
|
||||
|
||||
listRuns(options: ListRunsOptions = {}): FlowRunRecord[] {
|
||||
const clauses: string[] = [];
|
||||
const params: Record<string, string | number> = {
|
||||
$limit: clampLimit(options.limit),
|
||||
};
|
||||
if (options.eventId) {
|
||||
clauses.push("event_id = $eventId");
|
||||
params.$eventId = options.eventId;
|
||||
}
|
||||
if (options.status) {
|
||||
clauses.push("status = $status");
|
||||
params.$status = options.status;
|
||||
}
|
||||
const where = clauses.length > 0 ? `where ${clauses.join(" and ")}` : "";
|
||||
return this.#db
|
||||
.query("select * from flow_runs where event_id = $eventId order by created_at, id")
|
||||
.all({ $eventId: eventId })
|
||||
.query(`select * from flow_runs ${where} order by created_at desc, id desc limit $limit`)
|
||||
.all(params)
|
||||
.map(rowToRunRecord);
|
||||
}
|
||||
|
||||
getRun(id: string): FlowRunRecord | undefined {
|
||||
const row = this.#db.query("select * from flow_runs where id = $id").get({ $id: id });
|
||||
return row ? rowToRunRecord(row) : undefined;
|
||||
}
|
||||
|
||||
listEvents(options: ListEventsOptions = {}): FlowEventRecord[] {
|
||||
const clauses: string[] = [];
|
||||
const params: Record<string, string | number> = {
|
||||
$limit: clampLimit(options.limit),
|
||||
};
|
||||
if (options.type) {
|
||||
clauses.push("type = $type");
|
||||
params.$type = options.type;
|
||||
}
|
||||
const where = clauses.length > 0 ? `where ${clauses.join(" and ")}` : "";
|
||||
return this.#db
|
||||
.query(`select * from flow_events ${where} order by created_at desc, id desc limit $limit`)
|
||||
.all(params)
|
||||
.map(rowToEventRecord);
|
||||
}
|
||||
|
||||
getEvent(id: string): FlowEventRecord | undefined {
|
||||
const row = this.#db.query("select * from flow_events where id = $id").get({ $id: id });
|
||||
return row ? rowToEventRecord(row) : undefined;
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this.#db.close();
|
||||
}
|
||||
}
|
||||
|
||||
function clampLimit(value: number | undefined): number {
|
||||
if (!value || !Number.isFinite(value)) {
|
||||
return 50;
|
||||
}
|
||||
return Math.max(1, Math.min(500, Math.trunc(value)));
|
||||
}
|
||||
|
||||
function runParams(record: FlowRunRecord): Record<string, string | null> {
|
||||
return {
|
||||
$id: record.id,
|
||||
|
|
@ -193,6 +267,27 @@ function rowToRunRecord(row: unknown): FlowRunRecord {
|
|||
};
|
||||
}
|
||||
|
||||
function rowToEventRecord(row: unknown): FlowEventRecord {
|
||||
if (!isRecord(row)) {
|
||||
throw new Error("invalid event row");
|
||||
}
|
||||
const payload = JSON.parse(String(row.payload_json)) as unknown;
|
||||
const raw = JSON.parse(String(row.raw_json)) as unknown;
|
||||
if (!isRecord(payload) || !isRecord(raw)) {
|
||||
throw new Error("invalid event json");
|
||||
}
|
||||
return {
|
||||
id: String(row.id),
|
||||
type: String(row.type),
|
||||
...(typeof row.source === "string" ? { source: row.source } : {}),
|
||||
...(typeof row.occurred_at === "string" ? { occurredAt: row.occurred_at } : {}),
|
||||
receivedAt: String(row.received_at),
|
||||
payload,
|
||||
raw: raw as FlowEvent,
|
||||
createdAt: String(row.created_at),
|
||||
};
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ import { expect, test } from "bun:test";
|
|||
import { mkdir, mkdtemp, rm } from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { dispatchFlowEvent } from "../src/backend.ts";
|
||||
import { readConfig } from "../src/config.ts";
|
||||
import { dispatchFlowEvent, replayFlowEvent } from "../src/backend.ts";
|
||||
import { parseCli, readConfig } from "../src/config.ts";
|
||||
import { flowCommand } from "../src/executor.ts";
|
||||
import { signBody, verifyBodySignature } from "../src/signature.ts";
|
||||
import { FlowBackendStore } from "../src/store.ts";
|
||||
|
|
@ -53,6 +53,28 @@ test("dispatches matching flow steps and records runs", async () => {
|
|||
status: "completed",
|
||||
});
|
||||
expect(runs[0]?.stdout).toContain("hello Ada");
|
||||
expect(store.listEvents()).toHaveLength(1);
|
||||
expect(store.getEvent("event-1")).toMatchObject({
|
||||
id: "event-1",
|
||||
type: "demo.event",
|
||||
payload: { name: "Ada" },
|
||||
});
|
||||
|
||||
const replay = await replayFlowEvent({
|
||||
config,
|
||||
store,
|
||||
eventId: "event-1",
|
||||
wait: true,
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(replay).toMatchObject({ status: "accepted", eventId: "event-1", matched: 1 });
|
||||
const replayRuns = store.listRuns({ eventId: "event-1" });
|
||||
expect(replayRuns).toHaveLength(2);
|
||||
expect(replayRuns.map((run) => run.status).sort()).toEqual(["completed", "completed"]);
|
||||
expect(replayRuns.some((run) => run.id.endsWith("_replay"))).toBe(true);
|
||||
const replayRun = replayRuns.find((run) => run.id.endsWith("_replay"));
|
||||
expect(replayRun ? store.getRun(replayRun.id)?.stdout : "").toContain("hello Ada");
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
|
|
@ -61,6 +83,32 @@ test("dispatches matching flow steps and records runs", async () => {
|
|||
}
|
||||
});
|
||||
|
||||
test("parses inspection and replay commands", () => {
|
||||
expect(parseCli(["list-events", "--type", "demo.event", "--limit", "10"], {})).toMatchObject({
|
||||
kind: "list-events",
|
||||
type: "demo.event",
|
||||
limit: 10,
|
||||
});
|
||||
expect(parseCli(["show-event", "event-1"], {})).toMatchObject({
|
||||
kind: "show-event",
|
||||
eventId: "event-1",
|
||||
});
|
||||
expect(parseCli(["replay-event", "event-1", "--wait"], {})).toMatchObject({
|
||||
kind: "replay-event",
|
||||
eventId: "event-1",
|
||||
wait: true,
|
||||
});
|
||||
expect(parseCli(["list-runs", "--event-id", "event-1", "--status", "failed"], {})).toMatchObject({
|
||||
kind: "list-runs",
|
||||
eventId: "event-1",
|
||||
status: "failed",
|
||||
});
|
||||
expect(parseCli(["show-run", "run_123"], {})).toMatchObject({
|
||||
kind: "show-run",
|
||||
runId: "run_123",
|
||||
});
|
||||
});
|
||||
|
||||
test("builds systemd-run commands without executing them", () => {
|
||||
const config = readConfig({}, { cwd: "/tmp/project", executor: "systemd-run", bunCommand: "/usr/bin/bun" });
|
||||
const command = flowCommand({
|
||||
|
|
|
|||
|
|
@ -134,9 +134,50 @@ still suitable when the backend service itself is managed by systemd.
|
|||
Endpoints:
|
||||
|
||||
- `POST /events` or `POST /flow-events`: accept one `FlowEvent`
|
||||
- `GET /runs?eventId=<id>`: inspect recorded runs for an event
|
||||
- `GET /events?limit=<n>`: list stored events
|
||||
- `GET /events/<event-id>`: inspect a stored event and its runs
|
||||
- `POST /events/<event-id>/replay`: start a new run attempt for a stored event
|
||||
- `GET /runs?eventId=<id>&status=<status>&limit=<n>`: inspect recorded runs
|
||||
- `GET /runs/<run-id>`: inspect one recorded run
|
||||
- `GET /healthz`: health check
|
||||
|
||||
The CLI exposes the same operational surface:
|
||||
|
||||
```bash
|
||||
bun run flow:backend list-events --limit 20
|
||||
bun run flow:backend show-event 'patchbay:source:entry:upstream.release'
|
||||
bun run flow:backend list-runs --status failed --limit 20
|
||||
bun run flow:backend show-run run_abc123
|
||||
bun run flow:backend replay-event 'patchbay:source:entry:upstream.release' --wait
|
||||
```
|
||||
|
||||
Normal dispatch is idempotent by `event.id`: a duplicate `POST /events` returns
|
||||
the existing run ids and does not start another attempt. `replay-event` and
|
||||
`POST /events/<event-id>/replay` intentionally create a new run attempt for the
|
||||
stored event, which is the recovery path for accepted events whose flow step
|
||||
failed or blocked.
|
||||
|
||||
The live backend state is a SQLite database under
|
||||
`CODEX_FLOW_BACKEND_DATA_DIR` plus per-event JSON files under
|
||||
`CODEX_FLOW_BACKEND_DATA_DIR/events`. Back up both by copying the whole data
|
||||
directory while the service is stopped, or by using SQLite online backup plus a
|
||||
copy of the `events/` directory. For the current host deployment the intended
|
||||
values are:
|
||||
|
||||
```text
|
||||
CODEX_FLOW_BACKEND_CWD=/home/peezy/codex-flows-public
|
||||
CODEX_FLOW_BACKEND_DATA_DIR=/home/peezy/.local/state/codex-flow-systemd-local
|
||||
CODEX_FLOWS_MODE=code-mode
|
||||
CODEX_FLOW_PUSH=1
|
||||
CODEX_FLOW_PUBLISH=1
|
||||
PEEZY_CODEX_REPO=/home/peezy/codex-flow-worktrees/codex
|
||||
PEEZY_CODEX_TARGET_BRANCH=code-mode-exec-hooks
|
||||
```
|
||||
|
||||
Do not fabricate an upstream Codex release lifecycle test. Until the next real
|
||||
`openai/codex` release, use health checks, non-release smoke events, and stored
|
||||
event inspection/replay tooling only.
|
||||
|
||||
## Convex Backend Direction
|
||||
|
||||
Convex should be a durable orchestration backend, not the place where long
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue