Add flow runtime and Codex release flows
Some checks failed
ci / check (push) Failing after 16s

This commit is contained in:
matamune 2026-05-13 02:42:13 +00:00
parent 5241b634e2
commit e68b8adfb9
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
35 changed files with 2957 additions and 5 deletions

View file

@ -2,12 +2,14 @@
Thin browser UI plus TypeScript client for `codex app-server`.
This branch intentionally drops the workspace service, runtime, gateways, jobs,
delegation, and host setup layer. The remaining source is:
The current source is:
- `apps/web`: React/Vite UI that connects directly to a Codex app-server WebSocket.
- `apps/cli`: Bun CLI that sends JSON-RPC actions to a listening Codex app-server.
- `apps/flow-runner`: CLI for discovering and firing packaged flows.
- `apps/flow-backend-systemd-local`: local HTTP backend for executing flows from dispatch events.
- `packages/codex-client`: JSON-RPC client, app-server transports, flow helpers, and generated protocol types.
- `packages/flow-runtime`: flow manifest loading, event matching, and runner primitives.
- `packages/ui`: small shared UI primitives and styling.
## Run
@ -59,7 +61,23 @@ bun run build
bun run test
```
`bun run test` currently runs the `@peezy.tech/codex-flows` transport tests.
`bun run test` runs the client, flow runtime, local flow backend, CLI, and
Discord bridge tests.
## Flow Automation
Flow packages live under `flows/*` and installed copies can live under
`.codex/flows/*`. See [docs/flows.md](docs/flows.md) for `flow.toml`, generic
`FlowEvent` dispatch, Bun and Code Mode runners, the systemd-local backend, and
the Codex release flows.
```bash
bun run flow list
bun run flow:backend serve --cwd "$(pwd)"
```
Code Mode flow steps are present on `main` but require
`CODEX_FLOWS_ENABLE_CODE_MODE=1` before execution.
## Development Flow
@ -112,6 +130,23 @@ The low-level app-server client package. It exports:
- `@peezy.tech/codex-flows/rpc`: JSON-RPC helpers and types.
- `@peezy.tech/codex-flows/generated`: generated Codex app-server protocol types.
### `flow-runner`
CLI package for listing flow packages, firing every step that matches a
`FlowEvent`, or running one explicit flow step.
### `flow-backend-systemd-local`
HTTP and CLI backend that persists dispatched flow events/runs to SQLite and
starts matching steps locally. It is intended to run as a small systemd-managed
service, with optional transient `systemd-run` units per step.
### `@peezy.tech/flow-runtime`
Shared runtime package for loading `flow.toml`, validating payload JSON Schema,
matching steps to generic events, and invoking Bun or feature-flagged Code Mode
steps.
### `web`
The browser app imports `@peezy.tech/codex-flows/browser`, opens a direct WebSocket

View file

@ -0,0 +1,25 @@
{
"name": "codex-flow-systemd-local",
"version": "0.1.0",
"description": "Local flow execution backend suitable for a systemd-managed service.",
"type": "module",
"private": true,
"license": "Apache-2.0",
"bin": {
"codex-flow-systemd-local": "./src/index.ts"
},
"scripts": {
"build": "tsc --noEmit",
"check:types": "tsc --noEmit",
"start": "bun ./src/index.ts serve",
"test": "bun test test/*.test.ts"
},
"dependencies": {
"@peezy.tech/flow-runtime": "workspace:*"
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:"
}
}

View file

@ -0,0 +1,174 @@
import { createHash } from "node:crypto";
import { mkdir } from "node:fs/promises";
import path from "node:path";
import {
discoverFlows,
matchingSteps,
type FlowEvent,
type FlowStep,
type LoadedFlow,
} from "@peezy.tech/flow-runtime";
import type { FlowBackendConfig } from "./config.ts";
import { executeCommand, flowCommand, parseRunnerResult } from "./executor.ts";
import { FlowBackendStore, type FlowRunRecord } from "./store.ts";
// Inputs for dispatchFlowEvent: resolved backend config, the persistence
// store, the event to dispatch, whether to await step completion, and an
// optional environment forwarded to the flow runner processes.
export type DispatchFlowEventOptions = {
  config: FlowBackendConfig;
  store: FlowBackendStore;
  event: FlowEvent;
  wait?: boolean;
  env?: Record<string, string | undefined>;
};
// Outcome of a dispatch. "duplicate" means the event id was already stored;
// runIds then lists the previously recorded runs and matched is 0.
export type DispatchFlowEventResult = {
  status: "accepted" | "duplicate";
  eventId: string;
  runIds: string[];
  matched: number;
};
/**
 * Persists a FlowEvent, discovers flow packages, and starts every step whose
 * trigger matches the event.
 *
 * Duplicate events (same id already stored) are not re-dispatched; the
 * previously recorded run ids are returned instead. When `wait` is set the
 * promise resolves only after all matched steps have finished; otherwise
 * steps run in the background and failures are recorded on their run rows.
 */
export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Promise<DispatchFlowEventResult> {
  const inserted = options.store.insertEvent(options.event);
  if (!inserted) {
    return {
      status: "duplicate",
      eventId: options.event.id,
      runIds: options.store.listRunsByEvent(options.event.id).map((run) => run.id),
      matched: 0,
    };
  }
  const eventPath = await writeEventFile(options.config.dataDir, options.event);
  const flows = await discoverFlows({ cwd: options.config.cwd });
  const matches = await matchingSteps(flows, options.event);
  const promises: Array<Promise<void>> = [];
  const runIds: string[] = [];
  for (const match of matches) {
    const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath);
    options.store.createRun(run);
    // Collect the ids of the runs actually created instead of recomputing
    // them with runId() in the return value below.
    runIds.push(run.id);
    const promise = executeAndRecord({
      config: options.config,
      store: options.store,
      run,
      env: options.env,
    });
    if (options.wait) {
      promises.push(promise);
    } else {
      // Fire-and-forget: surface unexpected executor failures on the run row
      // instead of leaving it stuck in "running".
      promise.catch((error) => {
        options.store.markRunCompleted(run.id, {
          status: "failed",
          stdout: "",
          stderr: "",
          error: error instanceof Error ? error.message : String(error),
        });
      });
    }
  }
  if (promises.length > 0) {
    await Promise.all(promises);
  }
  return {
    status: "accepted",
    eventId: options.event.id,
    runIds,
    matched: matches.length,
  };
}
/** Reads a FlowEvent from a JSON file on disk and normalizes it. */
export async function readFlowEvent(pathValue: string): Promise<FlowEvent> {
  const raw = await Bun.file(path.resolve(pathValue)).text();
  return normalizeFlowEvent(JSON.parse(raw) as unknown);
}
/**
 * Validates and normalizes a decoded FlowEvent value.
 *
 * Requires string `id` and `type`; fills `receivedAt` with the current time
 * when missing or not a string, and coerces a non-object `payload` to `{}`.
 *
 * @throws Error when the value is not an object or `id`/`type` is missing or
 *   not a string.
 */
export function normalizeFlowEvent(value: unknown): FlowEvent {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    throw new Error("FlowEvent requires string id and type");
  }
  const record = value as Record<string, unknown>;
  if (typeof record.id !== "string" || typeof record.type !== "string") {
    throw new Error("FlowEvent requires string id and type");
  }
  const payloadIsRecord =
    typeof record.payload === "object" && record.payload !== null && !Array.isArray(record.payload);
  // Spread first, then apply the sanitized fields. The previous order
  // (`defaults` first, `...value` last) let a present-but-wrongly-typed
  // receivedAt or payload override the sanitized value.
  return {
    ...record,
    receivedAt: typeof record.receivedAt === "string" ? record.receivedAt : new Date().toISOString(),
    payload: payloadIsRecord ? record.payload : {},
  } as FlowEvent;
}
// Builds and launches the runner command for one run row, recording state
// transitions (queued -> running -> completed/failed) in the store. Never
// rejects for executor failures; those are persisted on the run row.
async function executeAndRecord(options: {
  config: FlowBackendConfig;
  store: FlowBackendStore;
  run: FlowRunRecord;
  env?: Record<string, string | undefined>;
}): Promise<void> {
  const command = flowCommand({
    config: options.config,
    runId: options.run.id,
    eventPath: options.run.eventPath,
    flowName: options.run.flowName,
    stepName: options.run.stepName,
    env: options.env,
  });
  // Persist the exact command (and systemd unit name, if any) before launch.
  options.store.markRunRunning(options.run.id, JSON.stringify(command), command.unit);
  let result: Awaited<ReturnType<typeof executeCommand>>;
  try {
    result = await executeCommand(command, options.config, options.env);
  } catch (error) {
    // Spawn failures (e.g. missing binary) are recorded, not rethrown.
    options.store.markRunCompleted(options.run.id, {
      status: "failed",
      stdout: "",
      stderr: "",
      error: error instanceof Error ? error.message : String(error),
    });
    return;
  }
  // A non-zero exit marks the run failed but still stores captured output.
  const status = result.exitCode === 0 ? "completed" : "failed";
  options.store.markRunCompleted(options.run.id, {
    status,
    resultJson: parseRunnerResult(result.stdout),
    stdout: result.stdout,
    stderr: result.stderr,
    ...(status === "failed" ? { error: `flow runner exited with ${result.exitCode ?? "unknown"}` } : {}),
  });
}
// Builds the initial "queued" run row for one matched flow step.
function createRunRecord(
  config: FlowBackendConfig,
  event: FlowEvent,
  flow: LoadedFlow,
  step: FlowStep,
  eventPath: string,
): FlowRunRecord {
  const flowName = flow.manifest.name;
  const stepName = step.name;
  return {
    id: runId(event.id, flowName, stepName),
    eventId: event.id,
    flowName,
    stepName,
    status: "queued",
    backend: "systemd-local",
    executor: config.executor,
    eventPath,
    createdAt: new Date().toISOString(),
  };
}
// Deterministic short run identifier derived from (event, flow, step).
// NUL separators keep distinct triples from colliding after concatenation.
function runId(eventId: string, flowName: string, stepName: string): string {
  const digest = createHash("sha256")
    .update([eventId, flowName, stepName].join("\0"))
    .digest("hex");
  return `run_${digest.slice(0, 12)}`;
}
// Serializes the event to <dataDir>/events/<safe-name>.json, creating the
// directory if needed, and returns the absolute path written.
async function writeEventFile(dataDir: string, event: FlowEvent): Promise<string> {
  const eventsDir = path.join(dataDir, "events");
  await mkdir(eventsDir, { recursive: true });
  const target = path.join(eventsDir, `${safeFileName(event.id)}.json`);
  await Bun.write(target, JSON.stringify(event, null, 2));
  return target;
}
// Collapses an arbitrary event id into a filesystem-safe name, suffixed with
// a short content hash so distinct ids that sanitize identically cannot
// collide on disk.
function safeFileName(value: string): string {
  const suffix = createHash("sha256").update(value).digest("hex").slice(0, 12);
  let base = value.toLowerCase().replace(/[^a-z0-9._-]+/g, "-");
  base = base.replace(/^-+|-+$/g, "").slice(0, 64);
  return `${base === "" ? "event" : base}-${suffix}`;
}
// Narrowing guard for plain JSON objects (excludes null and arrays).
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}

View file

@ -0,0 +1,191 @@
import path from "node:path";
// Which process launcher executes matched flow steps.
export type FlowBackendExecutor = "direct" | "systemd-run";
// Fully resolved backend configuration. Precedence when building one:
// explicit overrides, then environment variables, then built-in defaults.
export type FlowBackendConfig = {
  cwd: string;            // project root scanned for flow packages
  dataDir: string;        // holds the SQLite database and persisted event files
  host: string;
  port: number;
  secret?: string;        // optional HMAC secret for HTTP dispatches
  executor: FlowBackendExecutor;
  bunCommand: string;     // executable used to launch the flow runner
  flowRunnerPath: string; // entry point of the flow-runner CLI
  forwardEnv: string[];   // environment variable names forwarded to steps
};
// Parsed CLI invocation.
export type FlowBackendCli =
  | { kind: "help" }
  | { kind: "serve"; config: FlowBackendConfig }
  | { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean };
/**
 * Resolves the backend configuration from explicit overrides, environment
 * variables, and built-in defaults (in that precedence order).
 */
export function readConfig(
  env: Record<string, string | undefined> = process.env,
  overrides: Partial<FlowBackendConfig> = {},
): FlowBackendConfig {
  const cwd = path.resolve(overrides.cwd ?? env.CODEX_FLOW_BACKEND_CWD ?? process.cwd());
  const dataDir = path.resolve(overrides.dataDir ?? env.CODEX_FLOW_BACKEND_DATA_DIR ?? path.join(cwd, ".codex", "flow-backend"));
  return {
    cwd,
    dataDir,
    host: overrides.host ?? env.CODEX_FLOW_BACKEND_HOST ?? "127.0.0.1",
    port: overrides.port ?? numberEnv(env.CODEX_FLOW_BACKEND_PORT, 7345),
    // `??` binds tighter than `?:`, so this reads as
    // `(overrides.secret ?? env...) ? { secret: ... } : {}` — the optional
    // property is only present when a non-empty secret was supplied.
    ...(overrides.secret ?? env.CODEX_FLOW_BACKEND_SECRET
      ? { secret: overrides.secret ?? env.CODEX_FLOW_BACKEND_SECRET }
      : {}),
    executor: overrides.executor ?? executorEnv(env.CODEX_FLOW_BACKEND_EXECUTOR),
    bunCommand: overrides.bunCommand ?? env.CODEX_FLOW_BACKEND_BUN ?? process.execPath,
    flowRunnerPath: path.resolve(
      overrides.flowRunnerPath ?? env.CODEX_FLOW_RUNNER_PATH ?? defaultFlowRunnerPath(),
    ),
    forwardEnv: overrides.forwardEnv ?? forwardEnv(env.CODEX_FLOW_BACKEND_FORWARD_ENV),
  };
}
/**
 * Parses `serve`/`dispatch`/`help` CLI invocations into a FlowBackendCli.
 *
 * Option values are consumed with a look-ahead (`required`); parsed options
 * are applied as overrides on top of the environment-derived configuration.
 *
 * @throws Error on unknown commands or options, missing option values, a
 *   non-numeric `--port`, or `dispatch` without `--event`.
 */
export function parseCli(argv: string[], env: Record<string, string | undefined> = process.env): FlowBackendCli {
  const command = argv[0];
  if (!command || command === "help" || command === "-h" || command === "--help") {
    return { kind: "help" };
  }
  let cwd: string | undefined;
  let dataDir: string | undefined;
  let host: string | undefined;
  let port: number | undefined;
  let secret: string | undefined;
  let executor: FlowBackendExecutor | undefined;
  let bunCommand: string | undefined;
  let flowRunnerPath: string | undefined;
  let wait = false;
  let eventPath: string | undefined;
  const rest = argv.slice(1);
  for (let index = 0; index < rest.length; index += 1) {
    const arg = rest[index];
    if (!arg) {
      continue;
    }
    if (arg === "--cwd") {
      cwd = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--data-dir") {
      dataDir = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--host") {
      host = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--port") {
      const value = required(rest, ++index, arg);
      port = Number(value);
      // Reject non-numeric ports up front; previously NaN slipped through the
      // `port !== undefined` override below and reached Bun.serve.
      if (!Number.isFinite(port)) {
        throw new Error(`--port requires a number, got: ${value}`);
      }
      continue;
    }
    if (arg === "--secret") {
      secret = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--executor") {
      executor = executorEnv(required(rest, ++index, arg));
      continue;
    }
    if (arg === "--bun") {
      bunCommand = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--flow-runner") {
      flowRunnerPath = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--event") {
      eventPath = required(rest, ++index, arg);
      continue;
    }
    if (arg === "--wait") {
      wait = true;
      continue;
    }
    throw new Error(`Unknown option: ${arg}`);
  }
  const config = readConfig(env, {
    ...(cwd ? { cwd } : {}),
    ...(dataDir ? { dataDir } : {}),
    ...(host ? { host } : {}),
    ...(port !== undefined ? { port } : {}),
    ...(secret ? { secret } : {}),
    ...(executor ? { executor } : {}),
    ...(bunCommand ? { bunCommand } : {}),
    ...(flowRunnerPath ? { flowRunnerPath } : {}),
  });
  if (command === "serve") {
    return { kind: "serve", config };
  }
  if (command === "dispatch") {
    if (!eventPath) {
      throw new Error("dispatch requires --event <path>");
    }
    return { kind: "dispatch", config, eventPath, wait };
  }
  throw new Error(`Unknown command: ${command}`);
}
// Default flow-runner entry point: the sibling `flow-runner` package's source,
// resolved relative to this file. `import.meta.dir` is Bun-specific.
export function defaultFlowRunnerPath(): string {
  return path.resolve(import.meta.dir, "..", "..", "flow-runner", "src", "index.ts");
}
// Usage text printed for the `help` command; ends with a trailing newline.
export function helpText(): string {
  return [
    "Usage:",
    " codex-flow-systemd-local serve [--cwd <dir>] [--data-dir <dir>] [--host <host>] [--port <port>]",
    " codex-flow-systemd-local dispatch --event <event.json> [--cwd <dir>] [--data-dir <dir>] [--wait]",
    "",
    "Environment:",
    " CODEX_FLOW_BACKEND_SECRET Optional HMAC secret for HTTP dispatches",
    " CODEX_FLOW_BACKEND_EXECUTOR direct or systemd-run",
    " CODEX_FLOWS_ENABLE_CODE_MODE Enables runner = \"code-mode\" steps",
    "",
  ].join("\n");
}
// Parses a numeric environment value; empty, missing, or non-finite input
// falls back to `fallback`.
function numberEnv(value: string | undefined, fallback: number): number {
  if (!value) {
    return fallback;
  }
  const parsed = Number(value);
  if (Number.isFinite(parsed)) {
    return parsed;
  }
  return fallback;
}
// Maps an executor name onto the FlowBackendExecutor union; an unset or empty
// value defaults to "direct", anything else unrecognized is rejected.
function executorEnv(value: string | undefined): FlowBackendExecutor {
  switch (value) {
    case "systemd-run":
      return "systemd-run";
    case undefined:
    case "":
    case "direct":
      return "direct";
    default:
      throw new Error("executor must be direct or systemd-run");
  }
}
// Resolves the names of environment variables forwarded to flow runs. A
// comma-separated override replaces the defaults entirely; blank entries are
// dropped.
function forwardEnv(value: string | undefined): string[] {
  if (!value) {
    return [
      "CODEX_FLOWS_ENABLE_CODE_MODE",
      "CODEX_APP_SERVER_CODEX_COMMAND",
      "CODEX_HOME",
      "HOME",
      "PATH",
    ];
  }
  const names: string[] = [];
  for (const entry of value.split(",")) {
    const trimmed = entry.trim();
    if (trimmed) {
      names.push(trimmed);
    }
  }
  return names;
}
// Returns the option value at `index`, or throws when the flag has no value
// (missing or empty).
function required(args: string[], index: number, flag: string): string {
  const value = args[index];
  if (value) {
    return value;
  }
  throw new Error(`${flag} requires a value`);
}

View file

@ -0,0 +1,113 @@
import { createHash } from "node:crypto";
import type { FlowBackendConfig } from "./config.ts";
// Fully resolved process invocation: executable, argument vector, and (for
// systemd-run) the transient unit name the step runs under.
export type FlowCommandSpec = {
  command: string;
  args: string[];
  unit?: string;
};
// Everything needed to launch one flow step for one event.
export type ExecuteFlowRunOptions = {
  config: FlowBackendConfig;
  runId: string;
  eventPath: string; // on-disk JSON copy of the triggering FlowEvent
  flowName: string;
  stepName: string;
  env?: Record<string, string | undefined>; // extra env merged over process.env
};
// Outcome of a step execution: the command that ran plus captured output.
// exitCode is null when no normal exit code was reported for the process.
export type ExecuteFlowRunResult = {
  command: FlowCommandSpec;
  exitCode: number | null;
  stdout: string;
  stderr: string;
};
// Builds the runner command for one flow step, executes it, and returns the
// command alongside the captured output and exit code.
export async function executeFlowRun(options: ExecuteFlowRunOptions): Promise<ExecuteFlowRunResult> {
  const command = flowCommand(options);
  const { exitCode, stdout, stderr } = await executeCommand(command, options.config, options.env);
  return { command, exitCode, stdout, stderr };
}
/**
 * Spawns the given command with a filtered environment and captures its
 * output. Only variables named in `config.forwardEnv` reach the child
 * process (see forwardedEnv).
 */
export async function executeCommand(
  command: FlowCommandSpec,
  config: FlowBackendConfig,
  env: Record<string, string | undefined> = process.env,
): Promise<Omit<ExecuteFlowRunResult, "command">> {
  const child = Bun.spawn([command.command, ...command.args], {
    cwd: config.cwd,
    env: forwardedEnv(config, env),
    stdout: "pipe",
    stderr: "pipe",
  });
  // Drain both pipes while awaiting exit so a full pipe cannot stall the child.
  const [stdout, stderr, exitCode] = await Promise.all([
    new Response(child.stdout).text(),
    new Response(child.stderr).text(),
    child.exited,
  ]);
  return { exitCode, stdout, stderr };
}
/**
 * Builds the process invocation for one flow step.
 *
 * "direct" runs the flow-runner under the configured bun executable;
 * "systemd-run" wraps the same invocation in a transient per-run user unit
 * with the forwarded environment passed via --setenv.
 */
export function flowCommand(options: ExecuteFlowRunOptions): FlowCommandSpec {
  const { config } = options;
  const runnerArgs = [
    config.flowRunnerPath,
    "--cwd",
    config.cwd,
    "run",
    options.flowName,
    options.stepName,
    "--event",
    options.eventPath,
  ];
  if (config.executor !== "systemd-run") {
    return { command: config.bunCommand, args: runnerArgs };
  }
  const unit = `codex-flow-${safeUnit(options.runId)}`;
  const args = [
    "--user",
    "--collect",
    "--wait",
    `--unit=${unit}`,
    `--working-directory=${config.cwd}`,
    ...systemdSetEnvArgs(config, options.env ?? process.env),
    config.bunCommand,
    ...runnerArgs,
  ];
  return { command: "systemd-run", unit, args };
}
// Extracts a compact JSON result from runner stdout: returns the
// re-serialized JSON when stdout (trimmed) is a parseable JSON object,
// otherwise undefined.
export function parseRunnerResult(stdout: string): string | undefined {
  const trimmed = stdout.trim();
  if (!trimmed.startsWith("{")) {
    return undefined;
  }
  try {
    const parsed = JSON.parse(trimmed) as unknown;
    return JSON.stringify(parsed);
  } catch {
    return undefined;
  }
}
// Builds the child environment: only names listed in config.forwardEnv are
// forwarded, with explicit `env` entries layered over the current process.env.
function forwardedEnv(config: FlowBackendConfig, env: Record<string, string | undefined>): Record<string, string> {
  const merged: Record<string, string | undefined> = { ...process.env, ...env };
  const selected: Record<string, string> = {};
  for (const name of config.forwardEnv) {
    const value = merged[name];
    if (typeof value === "string") {
      selected[name] = value;
    }
  }
  return selected;
}
// Renders the forwarded environment entries as systemd-run --setenv arguments.
function systemdSetEnvArgs(config: FlowBackendConfig, env: Record<string, string | undefined>): string[] {
  const args: string[] = [];
  for (const [key, value] of Object.entries(forwardedEnv(config, env))) {
    args.push(`--setenv=${key}=${value}`);
  }
  return args;
}
// Converts a run id into a valid systemd unit fragment: lowercase, dashes
// only, length-capped, plus a short hash so truncation cannot collide.
function safeUnit(value: string): string {
  const digest = createHash("sha256").update(value).digest("hex").slice(0, 10);
  const cleaned = value
    .toLowerCase()
    .replace(/[^a-z0-9-]+/g, "-")
    .replace(/^-+|-+$/g, "")
    .slice(0, 48);
  return `${cleaned}-${digest}`;
}

View file

@ -0,0 +1,38 @@
#!/usr/bin/env bun
import path from "node:path";
import { dispatchFlowEvent, readFlowEvent } from "./backend.ts";
import { helpText, parseCli } from "./config.ts";
import { serveFlowBackend } from "./server.ts";
import { FlowBackendStore } from "./store.ts";
// Entry point: report errors on stderr and exit non-zero without a stack trace.
await main().catch((error) => {
  process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
  process.exitCode = 1;
});
// Parses the CLI, then either prints help, runs the HTTP server forever, or
// dispatches a single event file and prints the result as JSON.
async function main(): Promise<void> {
  const cli = parseCli(Bun.argv.slice(2));
  if (cli.kind === "help") {
    process.stdout.write(helpText());
    return;
  }
  if (cli.kind === "serve") {
    const server = serveFlowBackend(cli.config);
    process.stdout.write(`codex-flow-systemd-local listening on http://${server.hostname}:${server.port}\n`);
    // Never resolves: keeps the process alive while Bun.serve handles requests.
    return new Promise(() => undefined);
  }
  // kind === "dispatch": one-shot dispatch with its own short-lived store.
  const store = new FlowBackendStore(path.join(cli.config.dataDir, "flow-backend.sqlite"));
  try {
    const event = await readFlowEvent(cli.eventPath);
    const result = await dispatchFlowEvent({
      config: cli.config,
      store,
      event,
      wait: cli.wait,
      env: process.env,
    });
    process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
  } finally {
    store.close();
  }
}

View file

@ -0,0 +1,43 @@
import path from "node:path";
import type { FlowBackendConfig } from "./config.ts";
import { dispatchFlowEvent, normalizeFlowEvent } from "./backend.ts";
import { requestSignature, verifyBodySignature } from "./signature.ts";
import { FlowBackendStore } from "./store.ts";
export function serveFlowBackend(config: FlowBackendConfig): ReturnType<typeof Bun.serve> {
const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite"));
return Bun.serve({
hostname: config.host,
port: config.port,
async fetch(request) {
const url = new URL(request.url);
if (request.method === "GET" && url.pathname === "/healthz") {
return json({ ok: true });
}
if (request.method === "POST" && (url.pathname === "/events" || url.pathname === "/flow-events")) {
const body = await request.text();
if (config.secret && !verifyBodySignature(config.secret, body, requestSignature(request.headers))) {
return json({ error: "invalid signature" }, 401);
}
const event = normalizeFlowEvent(JSON.parse(body) as unknown);
const result = await dispatchFlowEvent({ config, store, event });
return json(result, 202);
}
if (request.method === "GET" && url.pathname === "/runs") {
const eventId = url.searchParams.get("eventId");
if (!eventId) {
return json({ error: "missing eventId" }, 400);
}
return json({ eventId, runs: store.listRunsByEvent(eventId) });
}
return json({ error: "not found" }, 404);
},
});
}
// Serializes `value` as a pretty-printed application/json response.
function json(value: unknown, status = 200): Response {
  const headers = { "content-type": "application/json" };
  const body = JSON.stringify(value, null, 2);
  return new Response(body, { status, headers });
}

View file

@ -0,0 +1,18 @@
import { createHmac, timingSafeEqual } from "node:crypto";
// HMAC-SHA256 of the raw request body, webhook style: "sha256=<hex digest>".
export function signBody(secret: string, body: string): string {
  const digest = createHmac("sha256", secret).update(body).digest("hex");
  return `sha256=${digest}`;
}
// Constant-time comparison of a presented signature against the signature we
// compute for the body. A missing or wrongly-prefixed header fails fast.
export function verifyBodySignature(secret: string, body: string, signature: string | null): boolean {
  if (!signature?.startsWith("sha256=")) {
    return false;
  }
  const expected = Buffer.from(signBody(secret, body));
  const presented = Buffer.from(signature);
  if (expected.length !== presented.length) {
    return false;
  }
  return timingSafeEqual(expected, presented);
}
// Reads the webhook signature header, accepting either the generic or the
// patchbay-prefixed header name (generic wins when both are present).
export function requestSignature(headers: Headers): string | null {
  const primary = headers.get("x-flow-signature-256");
  if (primary !== null) {
    return primary;
  }
  return headers.get("x-patchbay-flow-signature-256");
}

View file

@ -0,0 +1,198 @@
import { mkdirSync } from "node:fs";
import path from "node:path";
import { Database } from "bun:sqlite";
import type { FlowEvent } from "@peezy.tech/flow-runtime";
// Lifecycle of a flow run row: queued -> running -> completed | failed.
export type FlowRunStatus = "queued" | "running" | "completed" | "failed";
// One row of the flow_runs table. Optional fields are filled in as the run
// progresses (unit/commandJson at launch; output and timestamps on finish).
export type FlowRunRecord = {
  id: string;           // stable run identifier
  eventId: string;      // FlowEvent that triggered this run
  flowName: string;
  stepName: string;
  status: FlowRunStatus;
  backend: "systemd-local";
  executor: string;     // "direct" or "systemd-run"
  unit?: string;        // transient systemd unit name, when applicable
  eventPath: string;    // on-disk JSON copy of the event
  commandJson?: string; // serialized FlowCommandSpec that was launched
  resultJson?: string;  // compact JSON result parsed from runner stdout
  stdout?: string;
  stderr?: string;
  error?: string;
  createdAt: string;
  startedAt?: string;
  completedAt?: string;
};
// SQLite-backed persistence for flow events and run state. Opens (creating
// parent directories and tables if needed) the database at `dbPath`.
export class FlowBackendStore {
  readonly dbPath: string;
  #db: Database;
  constructor(dbPath: string) {
    this.dbPath = dbPath;
    mkdirSync(path.dirname(dbPath), { recursive: true });
    this.#db = new Database(dbPath);
    // Idempotent schema: one row per event, one row per (event, flow, step) run.
    this.#db.exec(`
      create table if not exists flow_events (
        id text primary key,
        type text not null,
        source text,
        occurred_at text,
        received_at text not null,
        payload_json text not null,
        raw_json text not null,
        created_at text not null
      );
      create table if not exists flow_runs (
        id text primary key,
        event_id text not null,
        flow_name text not null,
        step_name text not null,
        status text not null,
        backend text not null,
        executor text not null,
        unit text,
        event_path text not null,
        command_json text,
        result_json text,
        stdout text,
        stderr text,
        error text,
        created_at text not null,
        started_at text,
        completed_at text
      );
      create index if not exists flow_runs_event_id_idx on flow_runs(event_id);
    `);
  }
  // Records an event; returns false when the id was already stored (the
  // `insert or ignore` dedupes, which is what makes dispatch idempotent).
  insertEvent(event: FlowEvent): boolean {
    const result = this.#db
      .query(
        `insert or ignore into flow_events
          (id, type, source, occurred_at, received_at, payload_json, raw_json, created_at)
          values ($id, $type, $source, $occurredAt, $receivedAt, $payloadJson, $rawJson, $createdAt)`,
      )
      .run({
        $id: event.id,
        $type: event.type,
        $source: event.source ?? null,
        $occurredAt: event.occurredAt ?? null,
        $receivedAt: event.receivedAt,
        $payloadJson: JSON.stringify(event.payload),
        $rawJson: JSON.stringify(event),
        $createdAt: new Date().toISOString(),
      });
    return result.changes > 0;
  }
  // Inserts a freshly created ("queued") run row.
  createRun(record: FlowRunRecord): void {
    this.#db
      .query(
        `insert into flow_runs
          (id, event_id, flow_name, step_name, status, backend, executor, unit, event_path,
          command_json, result_json, stdout, stderr, error, created_at, started_at, completed_at)
          values
          ($id, $eventId, $flowName, $stepName, $status, $backend, $executor, $unit, $eventPath,
          $commandJson, $resultJson, $stdout, $stderr, $error, $createdAt, $startedAt, $completedAt)`,
      )
      .run(runParams(record));
  }
  // Transitions a run to "running", recording the launched command and the
  // systemd unit name (null for the direct executor).
  markRunRunning(id: string, commandJson: string, unit?: string): void {
    this.#db
      .query(
        `update flow_runs
          set status = 'running', started_at = $startedAt, command_json = $commandJson, unit = $unit
          where id = $id`,
      )
      .run({
        $id: id,
        $startedAt: new Date().toISOString(),
        $commandJson: commandJson,
        $unit: unit ?? null,
      });
  }
  // Finalizes a run with its terminal status, captured output, and optional
  // parsed result / error message.
  markRunCompleted(id: string, values: { status: FlowRunStatus; resultJson?: string; stdout: string; stderr: string; error?: string }): void {
    this.#db
      .query(
        `update flow_runs
          set status = $status, completed_at = $completedAt, result_json = $resultJson,
          stdout = $stdout, stderr = $stderr, error = $error
          where id = $id`,
      )
      .run({
        $id: id,
        $status: values.status,
        $completedAt: new Date().toISOString(),
        $resultJson: values.resultJson ?? null,
        $stdout: values.stdout,
        $stderr: values.stderr,
        $error: values.error ?? null,
      });
  }
  // All runs for one event, in stable (created_at, id) order.
  listRunsByEvent(eventId: string): FlowRunRecord[] {
    return this.#db
      .query("select * from flow_runs where event_id = $eventId order by created_at, id")
      .all({ $eventId: eventId })
      .map(rowToRunRecord);
  }
  // Closes the underlying database handle.
  close(): void {
    this.#db.close();
  }
}
// Maps a FlowRunRecord onto the named bind parameters used by createRun,
// substituting SQL NULL for absent optional fields.
function runParams(record: FlowRunRecord): Record<string, string | null> {
  const orNull = (value?: string): string | null => value ?? null;
  return {
    $id: record.id,
    $eventId: record.eventId,
    $flowName: record.flowName,
    $stepName: record.stepName,
    $status: record.status,
    $backend: record.backend,
    $executor: record.executor,
    $unit: orNull(record.unit),
    $eventPath: record.eventPath,
    $commandJson: orNull(record.commandJson),
    $resultJson: orNull(record.resultJson),
    $stdout: orNull(record.stdout),
    $stderr: orNull(record.stderr),
    $error: orNull(record.error),
    $createdAt: record.createdAt,
    $startedAt: orNull(record.startedAt),
    $completedAt: orNull(record.completedAt),
  };
}
function rowToRunRecord(row: unknown): FlowRunRecord {
if (!isRecord(row)) {
throw new Error("invalid run row");
}
return {
id: String(row.id),
eventId: String(row.event_id),
flowName: String(row.flow_name),
stepName: String(row.step_name),
status: String(row.status) as FlowRunStatus,
backend: "systemd-local",
executor: String(row.executor),
...(typeof row.unit === "string" ? { unit: row.unit } : {}),
eventPath: String(row.event_path),
...(typeof row.command_json === "string" ? { commandJson: row.command_json } : {}),
...(typeof row.result_json === "string" ? { resultJson: row.result_json } : {}),
...(typeof row.stdout === "string" ? { stdout: row.stdout } : {}),
...(typeof row.stderr === "string" ? { stderr: row.stderr } : {}),
...(typeof row.error === "string" ? { error: row.error } : {}),
createdAt: String(row.created_at),
...(typeof row.started_at === "string" ? { startedAt: row.started_at } : {}),
...(typeof row.completed_at === "string" ? { completedAt: row.completed_at } : {}),
};
}
// True for plain objects usable as string-keyed records; rejects null/arrays.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && !Array.isArray(value) && value !== null;
}

View file

@ -0,0 +1,121 @@
import { expect, test } from "bun:test";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { dispatchFlowEvent } from "../src/backend.ts";
import { readConfig } from "../src/config.ts";
import { flowCommand } from "../src/executor.ts";
import { signBody, verifyBodySignature } from "../src/signature.ts";
import { FlowBackendStore } from "../src/store.ts";
// Round-trip: a signature verifies against the exact body it signed and
// fails for any mutated body.
test("signs and verifies dispatch bodies", () => {
  const body = JSON.stringify({ id: "event-1" });
  const signature = signBody("secret", body);
  expect(verifyBodySignature("secret", body, signature)).toBe(true);
  expect(verifyBodySignature("secret", `${body}\n`, signature)).toBe(false);
});
// End-to-end dispatch against a temporary project: writes a demo flow, fires
// a matching event with wait=true, and checks the persisted run row.
test("dispatches matching flow steps and records runs", async () => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "flow-backend-"));
  try {
    await writeFlow(directory);
    const config = readConfig(
      {},
      {
        cwd: directory,
        dataDir: path.join(directory, ".codex", "flow-backend"),
        executor: "direct",
        bunCommand: process.execPath,
      },
    );
    const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite"));
    try {
      const result = await dispatchFlowEvent({
        config,
        store,
        wait: true,
        env: {},
        event: {
          id: "event-1",
          type: "demo.event",
          receivedAt: "2026-05-13T00:00:00.000Z",
          payload: { name: "Ada" },
        },
      });
      expect(result).toMatchObject({ status: "accepted", eventId: "event-1", matched: 1 });
      const runs = store.listRunsByEvent("event-1");
      expect(runs).toHaveLength(1);
      expect(runs[0]).toMatchObject({
        flowName: "demo",
        stepName: "hello",
        status: "completed",
      });
      // The demo step echoes the payload name back through its FLOW_RESULT.
      expect(runs[0]?.stdout).toContain("hello Ada");
    } finally {
      store.close();
    }
  } finally {
    await rm(directory, { recursive: true, force: true });
  }
});
// Pure command construction: verifies the systemd-run wrapper shape without
// spawning anything.
test("builds systemd-run commands without executing them", () => {
  const config = readConfig({}, { cwd: "/tmp/project", executor: "systemd-run", bunCommand: "/usr/bin/bun" });
  const command = flowCommand({
    config,
    runId: "run_123",
    eventPath: "/tmp/event.json",
    flowName: "demo",
    stepName: "hello",
    env: { CODEX_FLOWS_ENABLE_CODE_MODE: "1" },
  });
  expect(command.command).toBe("systemd-run");
  expect(command.args).toContain("--user");
  expect(command.args).toContain("--wait");
  expect(command.args).toContain("--setenv=CODEX_FLOWS_ENABLE_CODE_MODE=1");
  expect(command.args).toContain("/usr/bin/bun");
});
// Writes a minimal on-disk flow package ("demo" with one bun step) whose
// trigger matches type "demo.event" and whose schema requires a string name.
async function writeFlow(root: string): Promise<void> {
  const flowRoot = path.join(root, "flows", "demo");
  await mkdir(path.join(flowRoot, "exec"), { recursive: true });
  await mkdir(path.join(flowRoot, "schemas"), { recursive: true });
  await Bun.write(
    path.join(flowRoot, "flow.toml"),
    [
      'name = "demo"',
      "version = 1",
      "",
      "[[steps]]",
      'name = "hello"',
      'runner = "bun"',
      'script = "exec/hello.ts"',
      "timeout_ms = 30000",
      "",
      "[steps.trigger]",
      'type = "demo.event"',
      'schema = "schemas/demo-event.schema.json"',
      "",
    ].join("\n"),
  );
  await Bun.write(
    path.join(flowRoot, "schemas/demo-event.schema.json"),
    JSON.stringify({
      type: "object",
      required: ["name"],
      properties: { name: { type: "string" } },
    }),
  );
  // The step reads the run context from stdin and prints a FLOW_RESULT line.
  await Bun.write(
    path.join(flowRoot, "exec/hello.ts"),
    [
      "const context = JSON.parse(await Bun.stdin.text());",
      "const name = context.flow.event.payload.name;",
      "console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);",
      "",
    ].join("\n"),
  );
}

View file

@ -0,0 +1,21 @@
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "Bundler",
"allowImportingTsExtensions": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"target": "ES2022",
"lib": ["ESNext"],
"strict": true,
"skipLibCheck": true,
"noUncheckedIndexedAccess": true,
"isolatedModules": true,
"verbatimModuleSyntax": true,
"forceConsistentCasingInFileNames": true,
"noEmit": true,
"types": ["node", "bun"],
"rootDir": "."
},
"include": ["src/**/*.ts", "test/**/*.ts"]
}

View file

@ -0,0 +1,24 @@
{
"name": "codex-flow-runner",
"version": "0.1.0",
"description": "CLI for listing, firing, and running Codex flow packages.",
"type": "module",
"private": true,
"license": "Apache-2.0",
"bin": {
"codex-flow-runner": "./src/index.ts"
},
"scripts": {
"build": "tsc --noEmit",
"check:types": "tsc --noEmit",
"test": "bun test test/*.test.ts"
},
"dependencies": {
"@peezy.tech/flow-runtime": "workspace:*"
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:"
}
}

View file

@ -0,0 +1,172 @@
#!/usr/bin/env bun
import path from "node:path";
import {
discoverFlows,
matchingSteps,
runFlowStep,
type FlowEvent,
type LoadedFlow,
type FlowStep,
} from "@peezy.tech/flow-runtime";
// Parsed CLI invocation for the flow-runner.
type Cli =
  | { kind: "help" }
  | { kind: "list"; cwd: string }                                   // list discovered flows
  | { kind: "fire"; cwd: string; eventPath: string }                // run every matching step
  | { kind: "run"; cwd: string; flow: string; step: string; eventPath: string }; // run one explicit step
// Entry point: report errors on stderr and exit non-zero without a stack trace.
await main().catch((error) => {
  process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
  process.exitCode = 1;
});
// Dispatches the parsed CLI command against flows discovered under --cwd.
async function main(): Promise<void> {
  const cli = parseArgs(Bun.argv.slice(2));
  if (cli.kind === "help") {
    process.stdout.write(helpText());
    return;
  }
  const flows = await discoverFlows({ cwd: cli.cwd });
  if (cli.kind === "list") {
    // One "<name>\t<root>" line per discovered flow package.
    for (const flow of flows) {
      process.stdout.write(`${flow.manifest.name}\t${flow.root}\n`);
    }
    return;
  }
  const event = await readEvent(cli.eventPath);
  if (cli.kind === "fire") {
    // Run every step whose trigger matches the event, sequentially.
    const matches = await matchingSteps(flows, event);
    const results = [];
    for (const match of matches) {
      results.push(await runAndReport(match.flow, match.step, event));
    }
    process.stdout.write(`${JSON.stringify({ eventId: event.id, results }, null, 2)}\n`);
    return;
  }
  // kind === "run": execute one explicitly named flow/step pair.
  const flow = requireFlow(flows, cli.flow);
  const step = requireStep(flow, cli.step);
  const result = await runAndReport(flow, step, event);
  process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
}
// Runs one step and wraps its result with the flow/step names for reporting.
async function runAndReport(flow: LoadedFlow, step: FlowStep, event: FlowEvent): Promise<Record<string, unknown>> {
  const result = await runFlowStep({
    flow,
    step,
    event,
    env: process.env,
    // Options forwarded to feature-flagged Code Mode steps; the Codex command
    // and home directory come from the environment.
    codeMode: {
      codexCommand: process.env.CODEX_APP_SERVER_CODEX_COMMAND,
      codexHome: process.env.CODEX_HOME,
      stream: true,
    },
  });
  return {
    flow: flow.manifest.name,
    step: step.name,
    result,
  };
}
// Loads a FlowEvent from a JSON file. Only `id` and `type` are validated;
// `receivedAt` and `payload` get defaults when absent, and present values win
// via the spread whatever their type. NOTE(review): a non-object `payload`
// is kept as-is here — confirm downstream schema validation rejects it.
async function readEvent(eventPath: string): Promise<FlowEvent> {
  const parsed = JSON.parse(await Bun.file(path.resolve(eventPath)).text()) as unknown;
  if (!isRecord(parsed) || typeof parsed.id !== "string" || typeof parsed.type !== "string") {
    throw new Error("event file must contain at least string id and type");
  }
  return {
    receivedAt: new Date().toISOString(),
    payload: {},
    ...parsed,
  } as FlowEvent;
}
/** Look up a flow by manifest name; throw when no flow matches. */
function requireFlow(flows: LoadedFlow[], name: string): LoadedFlow {
  for (const candidate of flows) {
    if (candidate.manifest.name === name) {
      return candidate;
    }
  }
  throw new Error(`Unknown flow: ${name}`);
}
/** Look up a step by name inside one flow's manifest; throw when absent. */
function requireStep(flow: LoadedFlow, name: string): FlowStep {
  const match = flow.manifest.steps.find((candidate) => candidate.name === name);
  if (match) {
    return match;
  }
  throw new Error(`Unknown step ${name} in flow ${flow.manifest.name}`);
}
/**
 * Parse CLI arguments into a `Cli` variant. `--cwd <dir>` / `--cwd=<dir>`
 * may appear anywhere; everything else is treated positionally.
 */
function parseArgs(argv: string[]): Cli {
  let cwd = process.cwd();
  const positional: string[] = [];
  let index = 0;
  while (index < argv.length) {
    const token = argv[index];
    index += 1;
    if (!token) {
      continue;
    }
    if (token === "--cwd") {
      // Value follows as the next token; consume it as well.
      cwd = path.resolve(required(argv, index, token));
      index += 1;
      continue;
    }
    if (token.startsWith("--cwd=")) {
      cwd = path.resolve(token.slice("--cwd=".length));
      continue;
    }
    positional.push(token);
  }
  const command = positional[0];
  if (!command || command === "-h" || command === "--help" || command === "help") {
    return { kind: "help" };
  }
  switch (command) {
    case "list":
      return { kind: "list", cwd };
    case "fire":
      return { kind: "fire", cwd, eventPath: eventPathArg(positional, 1) };
    case "run": {
      const flow = positional[1];
      const step = positional[2];
      if (!flow || !step) {
        throw new Error("run requires <flow> <step>");
      }
      return { kind: "run", cwd, flow, step, eventPath: eventPathArg(positional, 3) };
    }
    default:
      throw new Error(`Unknown command: ${command}`);
  }
}
/**
 * Find the `--event <path>` or `--event=<path>` value among positional args,
 * scanning from `start` so `run <flow> <step>` operands are skipped.
 * @throws when no event path is supplied.
 */
function eventPathArg(args: string[], start: number): string {
  for (let index = start; index < args.length; index += 1) {
    const arg = args[index];
    if (arg === "--event") {
      return required(args, index + 1, "--event");
    }
    if (arg?.startsWith("--event=")) {
      return arg.slice("--event=".length);
    }
  }
  throw new Error("missing --event <path>");
}
/** Return the argument at `index`, throwing when it is missing or empty. */
function required(args: string[], index: number, flag: string): string {
  const value = args[index];
  if (value) {
    return value;
  }
  throw new Error(`${flag} requires a value`);
}
/**
 * Usage text for `-h`/`--help`/no command. The trailing "" entry makes
 * join("\n") end the string with a final newline.
 */
function helpText(): string {
  return [
    "Usage:",
    " codex-flow-runner [--cwd <dir>] list",
    " codex-flow-runner [--cwd <dir>] fire --event <event.json>",
    " codex-flow-runner [--cwd <dir>] run <flow> <step> --event <event.json>",
    "",
  ].join("\n");
}
/** Narrow `value` to a plain object (excludes null and arrays). */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}

View file

@ -0,0 +1,21 @@
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "Bundler",
"allowImportingTsExtensions": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"target": "ES2022",
"lib": ["ESNext"],
"strict": true,
"skipLibCheck": true,
"noUncheckedIndexedAccess": true,
"isolatedModules": true,
"verbatimModuleSyntax": true,
"forceConsistentCasingInFileNames": true,
"noEmit": true,
"types": ["node", "bun"],
"rootDir": "."
},
"include": ["src/**/*.ts", "test/**/*.ts"]
}

View file

@ -36,6 +36,36 @@
"typescript": "catalog:",
},
},
"apps/flow-backend-systemd-local": {
"name": "codex-flow-systemd-local",
"version": "0.1.0",
"bin": {
"codex-flow-systemd-local": "./src/index.ts",
},
"dependencies": {
"@peezy.tech/flow-runtime": "workspace:*",
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:",
},
},
"apps/flow-runner": {
"name": "codex-flow-runner",
"version": "0.1.0",
"bin": {
"codex-flow-runner": "./src/index.ts",
},
"dependencies": {
"@peezy.tech/flow-runtime": "workspace:*",
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:",
},
},
"apps/web": {
"name": "web",
"version": "0.0.1",
@ -58,13 +88,25 @@
},
"packages/codex-client": {
"name": "@peezy.tech/codex-flows",
"version": "0.1.0",
"version": "0.1.1",
"devDependencies": {
"@types/bun": "^1.3.13",
"@types/node": "^22.10.10",
"typescript": "^5.9.2",
},
},
"packages/flow-runtime": {
"name": "@peezy.tech/flow-runtime",
"version": "0.1.0",
"dependencies": {
"@peezy.tech/codex-flows": "workspace:*",
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:",
},
},
"packages/ui": {
"name": "@workspace/ui",
"version": "0.0.0",
@ -297,6 +339,8 @@
"@peezy.tech/codex-flows": ["@peezy.tech/codex-flows@workspace:packages/codex-client"],
"@peezy.tech/flow-runtime": ["@peezy.tech/flow-runtime@workspace:packages/flow-runtime"],
"@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-rc.3", "", {}, "sha512-eybk3TjzzzV97Dlj5c+XrBFW57eTNhzod66y9HrBlzJ6NsCrWCp/2kaPS3K9wJmurBC0Tdw4yPjXKZqlznim3Q=="],
"@rollup/rollup-android-arm-eabi": ["@rollup/rollup-android-arm-eabi@4.60.3", "", { "os": "android", "cpu": "arm" }, "sha512-x35CNW/ANXG3hE/EZpRU8MXX1JDN86hBb2wMGAtltkz7pc6cxgjpy1OMMfDosOQ+2hWqIkag/fGok1Yady9nGw=="],
@ -485,6 +529,10 @@
"codex-discord-bridge": ["codex-discord-bridge@workspace:apps/discord-bridge"],
"codex-flow-runner": ["codex-flow-runner@workspace:apps/flow-runner"],
"codex-flow-systemd-local": ["codex-flow-systemd-local@workspace:apps/flow-backend-systemd-local"],
"color-convert": ["color-convert@2.0.1", "", { "dependencies": { "color-name": "~1.1.4" } }, "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ=="],
"color-name": ["color-name@1.1.4", "", {}, "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="],

164
docs/flows.md Normal file
View file

@ -0,0 +1,164 @@
# Flows
Flows are packaged automation units. They are discovered from `.codex/flows/*`
and then `flows/*`; when both locations contain a flow with the same name, the
installed `.codex` copy takes precedence.
Each flow has:
```text
flow.toml
schemas/*.schema.json
exec/*
```
`flow.toml` is the manifest. Use `flow` naming consistently:
```toml
name = "example-flow"
version = 1
description = "Short operational purpose."
[config]
commit = true
[[steps]]
name = "do-work"
runner = "bun"
script = "exec/do-work.ts"
timeout_ms = 300000
[steps.trigger]
type = "upstream.release"
schema = "schemas/upstream-release.schema.json"
```
The runtime passes a generic event to every step:
```ts
type FlowEvent<T = unknown> = {
id: string;
type: string;
source?: string;
occurredAt?: string;
receivedAt: string;
payload: T;
};
```
Domain payload types live in each flow package as JSON Schema files and are
referenced by `steps.trigger.schema`.
## Runners
`runner = "bun"` executes the script directly with Bun. The step receives JSON
on stdin:
```json
{
"flow": {
"name": "example-flow",
"version": 1,
"root": "/repo/flows/example-flow",
"step": "do-work",
"config": {},
"event": {}
}
}
```
The script must print a final line beginning with `FLOW_RESULT ` followed by
JSON.
`runner = "code-mode"` starts a Codex app-server and calls the fork-only
`thread/codeMode/execute` method through a raw JSON-RPC request. Code Mode code
is present on `main`, but execution is disabled unless:
```bash
CODEX_FLOWS_ENABLE_CODE_MODE=1
```
Set `CODEX_APP_SERVER_CODEX_COMMAND` when Code Mode should run against the
Peezy fork instead of the default `codex` binary.
## Commands
List flows:
```bash
bun run flow list
```
Fire all matching steps for an event:
```bash
bun run flow fire --event event.json
```
Run one step:
```bash
bun run flow run openai-codex-bindings regenerate-bindings --event event.json
```
## Systemd-Local Backend
`codex-flow-systemd-local` is the first execution backend. Patchbay posts
generic `FlowEvent` JSON to this service; the service persists events and run
records to SQLite, discovers matching flow steps, and starts each step locally.
Run it directly:
```bash
bun run flow:backend serve --cwd /home/peezy/codex-flows-public
```
Useful environment:
```bash
CODEX_FLOW_BACKEND_HOST=127.0.0.1
CODEX_FLOW_BACKEND_PORT=7345
CODEX_FLOW_BACKEND_DATA_DIR=/var/lib/codex-flow-systemd-local
CODEX_FLOW_BACKEND_SECRET=shared-hmac-secret
CODEX_FLOW_BACKEND_EXECUTOR=direct
```
`CODEX_FLOW_BACKEND_EXECUTOR=systemd-run` wraps each step in a transient
`systemd-run --user --wait --collect` unit. The default `direct` executor is
still suitable when the backend service itself is managed by systemd.
Endpoints:
- `POST /events` or `POST /flow-events`: accept one `FlowEvent`
- `GET /runs?eventId=<id>`: inspect recorded runs for an event
- `GET /healthz`: health check
## Convex Backend Direction
Convex should be a durable orchestration backend, not the place where long
running Codex or shell work executes. A future Convex backend should:
- accept the same generic `FlowEvent` shape
- persist event, run, step, retry, and result records durably
- choose matching flow steps from a stored or installed flow manifest
- lease work to an external worker or remote app-server
- receive heartbeats and final `FLOW_RESULT` records from that worker
- expose programmatic fire/retry/cancel APIs
This keeps Patchbay dispatch-only, keeps Convex durable, and keeps process-heavy
work on infrastructure that can run Codex, Bun, Git, Cargo, and system tools.
## Codex Release Flows
The upstream `openai/codex` release event fans out to two flow packages:
- `openai-codex-bindings`: Bun runner. Uses canonical `@openai/codex@version`,
regenerates `@peezy.tech/codex-flows` app-server bindings, runs checks,
commits when changed, and can push/trigger trusted publishing when configured.
- `peezy-codex-fork`: Code Mode runner. Rebases the Peezy fork patch stack onto
the upstream release tag, optionally squashes the patch stack, verifies the
fork, and can push/tag to trigger the fork release workflow when configured.
Publishing is controlled by flow config and environment. The packaged defaults
commit local changes when appropriate but do not push or publish until
`push = true`, `publish = true`, or matching `CODEX_FLOW_PUSH=1` /
`CODEX_FLOW_PUBLISH=1` deployment configuration is set.

View file

@ -0,0 +1,196 @@
import { readFile, writeFile } from "node:fs/promises";
import path from "node:path";

// Subset of the flow-runtime stdin payload that this step actually reads.
type FlowContext = {
  flow: {
    config?: Record<string, unknown>;
    event: {
      id: string;
      type: string;
      payload: Record<string, unknown>;
    };
  };
};

// One executed subprocess, recorded for the final FLOW_RESULT artifacts.
type CommandResult = {
  label: string;
  cmd: string[];
  cwd: string;
  exitCode: number | null;
  stdout: string;
  stderr: string;
};

// The flow runtime pipes the step context as JSON on stdin.
const context = JSON.parse(await Bun.stdin.text()) as FlowContext;
const config = context.flow.config ?? {};
// Assumes the runner launches this script with the repo root as cwd
// (flow.toml sets cwd = "../..") — TODO confirm against the runtime.
const repoRoot = process.cwd();
const commands: CommandResult[] = [];
// Main release pipeline. Every exit path goes through finish(), which prints
// the FLOW_RESULT line and terminates the process, so code after a finish()
// call on the same path never runs.
try {
  const tag = stringValue(context.flow.event.payload.tag, "payload.tag");
  const version = versionFromTag(tag);
  const packageName = stringConfig("package_name", "@peezy.tech/codex-flows");
  const generatedDir = path.resolve(repoRoot, stringConfig("generated_dir", "packages/codex-client/src/app-server/generated"));
  const packageJsonPath = path.resolve(repoRoot, stringConfig("package_json", "packages/codex-client/package.json"));
  // Idempotence: skip when this version is already on npm (unless forced).
  const published = await npmPackageExists(packageName, version);
  if (published && !enabled("force", false)) {
    finish("skipped", `${packageName}@${version} is already published.`, { version, tag });
  }
  await requireCleanWorktree();
  // Regenerate bindings with the canonical upstream CLI at the released version.
  await run("regenerate app-server TypeScript bindings", [
    "npx",
    "-y",
    `@openai/codex@${version}`,
    "app-server",
    "generate-ts",
    "--experimental",
    "--out",
    generatedDir,
  ]);
  await updatePackageVersion(packageJsonPath, version);
  await run("refresh Bun lockfile", ["bun", "install"]);
  // Verification gauntlet: package check, workspace typecheck, tests, diff hygiene.
  await run("codex-flows package release check", ["bun", "run", "--filter", packageName, "release:check"]);
  await run("workspace typecheck", ["bun", "run", "check:types"]);
  await run("workspace tests", ["bun", "run", "test"]);
  await run("git diff check", ["git", "diff", "--check"]);
  const status = await run("final git status", ["git", "status", "--short"]);
  // No diff after regeneration means upstream changed nothing we consume.
  if (!status.stdout.trim()) {
    finish("skipped", `No generated binding changes for ${tag}.`, { version, tag });
  }
  if (enabled("commit", true)) {
    await run("stage binding update", ["git", "add", "--", generatedDir, packageJsonPath, path.join(repoRoot, "bun.lock")]);
    await run("commit binding update", [
      "git",
      "commit",
      "-m",
      `flow: update codex-flows for openai codex ${version}`,
    ]);
  }
  // Push and publish are opt-in (flow config or CODEX_FLOW_* env overrides).
  if (enabled("push", false)) {
    await run("push jojo main", ["git", "push", "origin", "HEAD:main"]);
  }
  if (enabled("publish", false)) {
    await run("push GitHub main", ["git", "push", "github", "HEAD:main"]);
    await run("trigger GitHub trusted publish", [
      "gh",
      "workflow",
      "run",
      stringConfig("github_publish_workflow", "publish-codex-flows.yml"),
      "--repo",
      stringConfig("github_repo", "peezy-tech/codex-flows"),
      "-f",
      `confirm_package=${packageName}`,
    ]);
  }
  finish("changed", `${packageName} regenerated for openai/codex ${tag}.`, {
    version,
    tag,
    committed: enabled("commit", true),
    pushed: enabled("push", false),
    published: enabled("publish", false),
  });
} catch (error) {
  // Failures are still reported as a structured FLOW_RESULT, not a crash.
  finish("failed", error instanceof Error ? error.message : String(error));
}
/**
 * Abort (status "blocked") when the checkout has any uncommitted changes,
 * so the flow never mixes its commits with unrelated local work.
 */
async function requireCleanWorktree(): Promise<void> {
  const status = await run("dirty worktree check", ["git", "status", "--porcelain=v1"]);
  if (status.stdout.trim()) {
    finish("blocked", "codex-flows checkout has local changes before the release update.", {
      dirtyStatus: status.stdout,
    });
  }
}
/**
 * True when `packageName@version` resolves on the npm registry.
 * `npm view` exits non-zero for unknown versions, so failure is allowed and
 * treated as "not published".
 */
async function npmPackageExists(packageName: string, version: string): Promise<boolean> {
  const result = await run("published package check", [
    "npm",
    "view",
    `${packageName}@${version}`,
    "version",
    "--json",
  ], { allowFailure: true });
  // Substring check on the JSON output; exact enough for a release gate.
  return result.exitCode === 0 && result.stdout.includes(version);
}
/**
 * Rewrite the `version` field of a package.json in place, preserving the
 * tab-indented serialization plus trailing newline used elsewhere.
 */
async function updatePackageVersion(packageJsonPath: string, version: string): Promise<void> {
  const source = await readFile(packageJsonPath, "utf8");
  const manifest = JSON.parse(source) as Record<string, unknown>;
  manifest.version = version;
  const serialized = JSON.stringify(manifest, null, "\t");
  await writeFile(packageJsonPath, `${serialized}\n`);
}
/**
 * Spawn `cmd`, capture stdout/stderr, and record the invocation in the
 * module-level `commands` transcript. Throws on non-zero exit unless
 * `options.allowFailure` is set.
 */
async function run(
  label: string,
  cmd: string[],
  options: { allowFailure?: boolean; cwd?: string } = {},
): Promise<CommandResult> {
  const child = Bun.spawn(cmd, {
    cwd: options.cwd ?? repoRoot,
    env: process.env,
    stdout: "pipe",
    stderr: "pipe",
  });
  // Drain both pipes while awaiting exit so large output cannot deadlock.
  const [stdout, stderr, exitCode] = await Promise.all([
    child.stdout.text(),
    child.stderr.text(),
    child.exited,
  ]);
  const result = { label, cmd, cwd: options.cwd ?? repoRoot, exitCode, stdout, stderr };
  commands.push(result);
  if (exitCode !== 0 && !options.allowFailure) {
    throw new Error(`${label} failed with exit ${exitCode}:\n${stderr || stdout}`);
  }
  return result;
}
/**
 * Emit the `FLOW_RESULT <json>` line the flow runtime parses, then exit.
 * The exit code is always 0: `status` (not the exit code) carries the
 * outcome. Command output is truncated so the result line stays bounded.
 */
function finish(status: string, message: string, artifacts: Record<string, unknown> = {}): never {
  const trimmedCommands = commands.map((command) => ({
    ...command,
    stdout: truncate(command.stdout),
    stderr: truncate(command.stderr),
  }));
  console.log(`FLOW_RESULT ${JSON.stringify({ status, message, artifacts: { ...artifacts, commands: trimmedCommands } })}`);
  process.exit(0);
}
/**
 * Resolve a boolean flag: a CODEX_FLOW_<NAME> environment variable wins,
 * then a boolean in flow config, then `fallback`.
 */
function enabled(name: string, fallback: boolean): boolean {
  const override = process.env[`CODEX_FLOW_${name.toUpperCase()}`];
  if (override !== undefined) {
    const normalized = override.trim().toLowerCase();
    return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on";
  }
  const configured = config[name];
  if (typeof configured === "boolean") {
    return configured;
  }
  return fallback;
}
// Read a string config value; fall back when missing, blank, or not a string.
function stringConfig(name: string, fallback: string): string {
  const value = config[name];
  return typeof value === "string" && value.trim() ? value : fallback;
}
/** Assert `value` is a non-blank string, naming the field in the error. */
function stringValue(value: unknown, name: string): string {
  if (typeof value === "string" && value.trim()) {
    return value;
  }
  throw new Error(`${name} must be a non-empty string`);
}
/**
 * Extract the first semver-shaped substring (optionally with a prerelease
 * suffix) from a release tag such as "rust-v1.2.3".
 * @throws when the tag contains no recognizable version.
 *
 * Uses exec() + an explicit undefined check so the function typechecks under
 * the workspace's `noUncheckedIndexedAccess` setting, where a bare `match[0]`
 * is `string | undefined`.
 */
function versionFromTag(tag: string): string {
  const match = /[0-9]+\.[0-9]+\.[0-9]+(?:-[0-9A-Za-z.-]+)?/.exec(tag);
  const version = match?.[0];
  if (!version) {
    throw new Error(`Could not infer semantic version from release tag ${tag}`);
  }
  return version;
}
/** Cap `value` at `max` characters, appending a truncation marker. */
function truncate(value: string, max = 4000): string {
  if (value.length <= max) {
    return value;
  }
  const overflow = value.length - max;
  return `${value.slice(0, max)}\n...[truncated ${overflow} chars]`;
}

View file

@ -0,0 +1,24 @@
# Flow package: regenerate @peezy.tech/codex-flows bindings whenever a
# canonical openai/codex release event arrives.
name = "openai-codex-bindings"
version = 1
description = "Regenerate @peezy.tech/codex-flows bindings from a canonical openai/codex release."

[config]
# npm package whose generated bindings this flow refreshes.
package_name = "@peezy.tech/codex-flows"
# Output directory for the generated app-server TypeScript bindings.
generated_dir = "packages/codex-client/src/app-server/generated"
package_json = "packages/codex-client/package.json"
# Safe defaults: commit locally, never push or publish until enabled
# (overridable via CODEX_FLOW_PUSH / CODEX_FLOW_PUBLISH in the bun runner).
commit = true
push = false
publish = false
github_repo = "peezy-tech/codex-flows"
github_publish_workflow = "publish-codex-flows.yml"

[[steps]]
name = "regenerate-bindings"
runner = "bun"
script = "exec/update-bindings.ts"
# Run from the repository root, two levels above this flow package.
cwd = "../.."
# 20 minutes: regeneration plus workspace typecheck and tests.
timeout_ms = 1200000

[steps.trigger]
type = "upstream.release"
schema = "schemas/upstream-release.schema.json"

View file

@ -0,0 +1,11 @@
{
"type": "object",
"required": ["repo", "tag"],
"properties": {
"provider": { "type": "string" },
"repo": { "type": "string", "enum": ["openai/codex"] },
"tag": { "type": "string" },
"url": { "type": "string" },
"publishedAt": { "type": "string" }
}
}

View file

@ -0,0 +1,374 @@
// Code Mode flow step: runs inside a Codex app-server Code Mode thread where
// `flow`, `tools`, `text`, and `result` are provided as ambient globals.
const config = flow.config || {};
const payload = flow.event.payload || {};
// Every executed command is recorded here and attached to the final result.
const commands = [];
// Single-quote a value for POSIX shells, escaping embedded quotes.
function q(value) {
  const escaped = String(value).replaceAll("'", "'\\''");
  return "'" + escaped + "'";
}
// Null-safe trim: falsy inputs become the empty string.
function trim(value) {
  if (!value) {
    return "";
  }
  return String(value).trim();
}
// Cap a (possibly falsy) value at `max` characters with a truncation marker.
function truncate(value, max) {
  const normalized = String(value || "");
  if (normalized.length <= max) {
    return normalized;
  }
  const dropped = normalized.length - max;
  return normalized.slice(0, max) + "\n...[truncated " + String(dropped) + " chars]";
}
// Extract textual output from a tool result, serializing non-string shapes.
function outputOf(result) {
  const output = result?.output;
  if (typeof output === "string") {
    return output;
  }
  return JSON.stringify(result ?? {});
}
// Normalize both exit-code spellings seen in tool results; null when absent.
function exitCodeOf(result) {
  for (const key of ["exit_code", "exitCode"]) {
    const candidate = result?.[key];
    if (typeof candidate === "number") {
      return candidate;
    }
  }
  return null;
}
// True when the command result reports exit code 0. Normalizes both
// `exit_code` and `exitCode` spellings (matching exitCodeOf) so raw tool
// results are not silently treated as failures, and tolerates nullish input
// instead of throwing.
function ok(result) {
  const code = typeof result?.exit_code === "number" ? result.exit_code : result?.exitCode;
  return code === 0;
}
// Read a string config value from flow.toml, falling back when absent/blank.
function cfg(name, fallback) {
  const value = config[name];
  return typeof value === "string" && value.trim() ? value : fallback;
}
// Read a boolean flag from flow config. NOTE(review): unlike the bun-runner
// steps, this does not consult CODEX_FLOW_* environment overrides — confirm
// whether the backend folds those into flow.config before dispatch.
function enabled(name, fallback) {
  const value = config[name];
  return typeof value === "boolean" ? value : fallback;
}
// Pull the first semver-shaped substring out of a release tag; "" when none.
function versionFromTag(tag) {
  const match = /[0-9]+\.[0-9]+\.[0-9]+(?:-[0-9A-Za-z.-]+)?/.exec(String(tag));
  return match ? match[0] : "";
}
// Resolve an environment variable on the execution host by shelling out via
// tools.exec_command (Code Mode has no direct process.env access).
// `name` is interpolated into the shell line, so it must only come from
// trusted flow config, never from event payloads.
async function env(name) {
  if (!name) {
    return "";
  }
  const result = await tools.exec_command({
    cmd: "printf %s \"${" + name + ":-}\"",
    workdir: flow.root,
    yield_time_ms: 1000,
    max_output_tokens: 2000
  });
  return trim(outputOf(result));
}
// Execute one shell command in the fork checkout (or options.workdir), echo a
// transcript into the thread via text(), and record a truncated copy in the
// `commands` log for the final result.
async function run(label, cmd, options = {}) {
  const workdir = options.workdir || codexRepo;
  text("\n### " + label + "\n$ " + cmd + "\n");
  const raw = await tools.exec_command({
    cmd,
    workdir,
    yield_time_ms: options.yield_time_ms || 1000,
    max_output_tokens: options.max_output_tokens || 12000
  });
  // Normalize exit code and output so callers can use ok()/trim() uniformly.
  const result = {
    label,
    cmd,
    workdir,
    exit_code: exitCodeOf(raw),
    output: outputOf(raw)
  };
  commands.push({ ...result, output: truncate(result.output, 4000) });
  text("exit_code=" + String(result.exit_code) + "\n" + truncate(result.output, options.textLimit || 12000) + "\n");
  return result;
}
// Report the step outcome through the Code Mode result() channel, always
// attaching release metadata plus the recorded command transcript.
// NOTE(review): callers treat finish() as terminal (guards fall through after
// it), which relies on result() ending the thread — confirm in the runtime.
function finish(status, message, artifacts = {}) {
  result({
    status,
    message,
    artifacts: {
      releaseTag,
      version,
      codexRepo,
      targetBranch,
      commands,
      ...artifacts
    }
  });
}
// Gather a full snapshot of a paused/conflicted rebase (status, unmerged
// files, combined diffs, current patch) so a human or follow-up Code Mode
// session can resume with complete context.
async function collectRebaseContext(rebaseOutput, beforeSha) {
  const status = await run("rebase conflict status", "git status --short --branch", { max_output_tokens: 12000 });
  const unmerged = await run("unmerged files", "git diff --name-only --diff-filter=U", { max_output_tokens: 12000 });
  const diffStat = await run("conflict diff stat", "git diff --cc --stat", { max_output_tokens: 12000 });
  const conflictDiff = await run("conflict diff", "git diff --cc", { max_output_tokens: 30000, textLimit: 20000 });
  const currentPatch = await run("current rebase patch", "git rebase --show-current-patch", { max_output_tokens: 20000, textLimit: 12000 });
  return {
    beforeSha,
    rebaseOutput,
    statusOutput: status.output,
    unmergedFiles: unmerged.output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean),
    diffStat: diffStat.output,
    conflictDiff: truncate(conflictDiff.output, 20000),
    currentPatch: truncate(currentPatch.output, 12000),
    interventionPrompt: "Continue this same Code Mode thread to resolve the paused rebase. Preserve the fork patch stack, do not abort or reset unless explicitly instructed, then run the configured verification commands."
  };
}
// --- Derived configuration: release identity and fork checkout locations ---
const releaseTag = String(payload.tag || "");
const version = versionFromTag(releaseTag);
const packageName = cfg("package_name", "@peezy.tech/codex");
const targetBranch = cfg("target_branch", "main");
const upstreamRemote = cfg("upstream_remote", "upstream");
const upstreamRepoUrl = cfg("upstream_repo_url", "https://github.com/openai/codex.git");
// Env-var indirection (resolved via env()) wins over literal config paths.
const cargoTargetDir = (await env(cfg("cargo_target_dir_env", ""))) || cfg("cargo_target_dir", "/tmp/peezy-codex-flow-target");
const codexRepo = (await env(cfg("codex_repo_env", ""))) || cfg("codex_repo", "");
const codexRustDir = codexRepo + "/codex-rs";
const codexBinary = cargoTargetDir + "/debug/codex";
// Preconditions; each finish() is terminal for its path.
if (!releaseTag) {
  finish("failed", "Release payload is missing tag.");
}
if (!version) {
  finish("failed", "Could not infer semantic version from release tag " + releaseTag);
}
if (!codexRepo) {
  finish("blocked", "No Codex fork checkout configured. Set codex_repo or codex_repo_env in flow.toml.");
}
// Banner so the thread transcript shows the resolved inputs up front.
text([
  "Peezy Codex fork update flow",
  "",
  "Release: " + releaseTag,
  "Version: " + version,
  "Target branch: " + targetBranch,
  "Codex repo: " + codexRepo,
  "Upstream remote: " + upstreamRemote + " -> " + upstreamRepoUrl,
  "Cargo target dir: " + cargoTargetDir
].join("\n") + "\n");
// Idempotence gate: skip if this fork version already exists on npm.
const published = await run("published fork package check", "npm view " + q(packageName + "@" + version) + " version --json", {
  max_output_tokens: 4000
});
if (ok(published) && !enabled("force", false)) {
  finish("skipped", packageName + "@" + version + " is already published.");
}
// Sanity: the configured path must be a git checkout with a codex-rs workspace.
const repoCheck = await run("verify codex repo", "git rev-parse --show-toplevel");
if (!ok(repoCheck)) {
  finish("failed", "codex repo is not a git checkout", { repoCheck: repoCheck.output });
}
const rustWorkspaceCheck = await run("verify codex Rust workspace", "test -f " + q(codexRustDir + "/Cargo.toml"), {
  max_output_tokens: 4000
});
if (!ok(rustWorkspaceCheck)) {
  finish("failed", "codex Rust workspace was not found at the expected codex-rs path.", {
    codexRustDir,
    rustWorkspaceCheck: rustWorkspaceCheck.output
  });
}
// Refuse to start if a rebase is already in progress (either git state dir
// exists -> `test` exits 0).
const existingRebase = await run(
  "check existing rebase state",
  "test -d \"$(git rev-parse --git-path rebase-merge)\" -o -d \"$(git rev-parse --git-path rebase-apply)\"",
  { max_output_tokens: 4000 }
);
if (existingRebase.exit_code === 0) {
  const context = await collectRebaseContext("A rebase was already in progress before this flow started.", undefined);
  finish("blocked", "A rebase is already in progress in the Codex checkout.", context);
}
// --- Branch hygiene: get onto a clean target branch before touching history ---
await run("codex status before update", "git status --short --branch", { max_output_tokens: 12000 });
const branch = await run("current branch", "git rev-parse --abbrev-ref HEAD", { max_output_tokens: 4000 });
if (!ok(branch)) {
  finish("failed", "could not read current branch", { branchOutput: branch.output });
}
if (trim(branch.output) !== targetBranch) {
  // Never switch branches over uncommitted work.
  const dirtyBeforeSwitch = await run("dirty check before branch switch", "git status --porcelain=v1", { max_output_tokens: 12000 });
  if (trim(dirtyBeforeSwitch.output)) {
    finish("blocked", "codex checkout has local changes before switching branches.", {
      dirtyStatus: dirtyBeforeSwitch.output
    });
  }
  const switched = await run("switch target branch", "git switch " + q(targetBranch), { max_output_tokens: 12000 });
  if (!ok(switched)) {
    finish("failed", "could not switch to target branch", { switchOutput: switched.output });
  }
}
const dirty = await run("dirty check on target branch", "git status --porcelain=v1", { max_output_tokens: 12000 });
if (trim(dirty.output)) {
  finish("blocked", "codex target branch has local changes. Resolve or stash them before updating.", {
    dirtyStatus: dirty.output
  });
}
// Upsert the upstream remote (set-url when present, add when missing).
const remote = await run(
  "ensure upstream openai/codex remote",
  "git remote get-url " + q(upstreamRemote) + " >/dev/null 2>&1 && git remote set-url " + q(upstreamRemote) + " " + q(upstreamRepoUrl) + " || git remote add " + q(upstreamRemote) + " " + q(upstreamRepoUrl),
  { max_output_tokens: 12000 }
);
if (!ok(remote)) {
  finish("failed", "could not configure upstream remote", { remoteOutput: remote.output });
}
const fetch = await run("fetch upstream tags", "git fetch " + q(upstreamRemote) + " --tags --prune", {
  max_output_tokens: 20000
});
if (!ok(fetch)) {
  finish("failed", "could not fetch upstream release tags", { fetchOutput: fetch.output });
}
// ^{commit} dereferences an annotated tag to its commit.
const releaseCommit = await run("resolve release tag", "git rev-parse --verify " + q("refs/tags/" + releaseTag + "^{commit}"), {
  max_output_tokens: 4000
});
if (!ok(releaseCommit)) {
  finish("failed", "could not resolve upstream release tag after fetch", {
    releaseTag,
    resolveOutput: releaseCommit.output
  });
}
// Record the pre-rebase head so every later artifact can report before/after.
const beforeHead = await run("codex head before rebase", "git rev-parse HEAD", { max_output_tokens: 4000 });
const rebase = await run("rebase target branch onto upstream release", "git rebase " + q(releaseTag), {
  max_output_tokens: 30000,
  textLimit: 20000
});
if (!ok(rebase)) {
  // Leave the paused rebase in place and hand full context to a human /
  // follow-up session instead of aborting.
  const context = await collectRebaseContext(rebase.output, trim(beforeHead.output));
  finish("needs_intervention", "Rebase paused with conflicts.", context);
}
// Optionally collapse the rebased patch stack into one commit on top of the
// release tag; skipped when there is at most one patch commit already.
if (enabled("squash_patch_stack", true)) {
  const count = await run("count fork patch commits", "git rev-list --count " + q(releaseTag) + "..HEAD", {
    max_output_tokens: 4000
  });
  const commitCount = Number(trim(count.output));
  if (Number.isFinite(commitCount) && commitCount > 1) {
    // Soft reset keeps the combined tree staged for a single commit.
    const reset = await run("squash patch stack reset", "git reset --soft " + q(releaseTag), { max_output_tokens: 12000 });
    if (!ok(reset)) {
      finish("failed", "could not soft reset patch stack for squashing", { resetOutput: reset.output });
    }
    const commit = await run("squash patch stack commit", "git commit -m " + q("peezy: codex fork patches for " + releaseTag), {
      max_output_tokens: 20000
    });
    if (!ok(commit)) {
      finish("failed", "could not commit squashed patch stack", { commitOutput: commit.output });
    }
  }
}
// --- Verification gauntlet: build, smoke-run, check, test, fmt, diff hygiene ---
const afterHead = await run("codex head after rebase", "git rev-parse HEAD", { max_output_tokens: 4000 });
await run("codex status after rebase", "git status --short --branch", { max_output_tokens: 12000 });
// CARGO_TARGET_DIR keeps build artifacts outside the checkout.
const build = await run(
  "build fork binary",
  "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo build -p codex-cli --bin codex",
  { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(build)) {
  finish("failed", "fork binary build failed", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    buildOutput: build.output
  });
}
const versionCheck = await run("verify fork binary", q(codexBinary) + " --version", { max_output_tokens: 4000 });
if (!ok(versionCheck)) {
  finish("failed", "built fork binary did not run", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    versionOutput: versionCheck.output
  });
}
const cargoCheck = await run(
  "cargo check code mode packages",
  "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo check -p codex-app-server -p codex-core -p codex-app-server-protocol",
  { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(cargoCheck)) {
  finish("failed", "cargo check failed after rebase", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    cargoCheckOutput: cargoCheck.output
  });
}
// The fork-only Code Mode API is the part most likely to break on rebase.
const protocolTest = await run(
  "protocol code mode execute test",
  "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo test -p codex-app-server-protocol thread_code_mode_execute -- --nocapture",
  { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(protocolTest)) {
  finish("failed", "protocol Code Mode API test failed after rebase", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    protocolTestOutput: protocolTest.output
  });
}
const fmt = await run("cargo fmt check", "cargo fmt --check", {
  workdir: codexRustDir,
  max_output_tokens: 20000
});
if (!ok(fmt)) {
  finish("failed", "cargo fmt --check failed after rebase", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    fmtOutput: fmt.output
  });
}
const diffCheck = await run("codex diff whitespace check", "git diff --check", { max_output_tokens: 12000 });
if (!ok(diffCheck)) {
  finish("failed", "codex git diff --check failed after rebase", {
    beforeSha: trim(beforeHead.output),
    afterSha: trim(afterHead.output),
    diffCheckOutput: diffCheck.output
  });
}
// --- Opt-in push/publish, then the final success report ---
if (enabled("push", false)) {
  // --force-with-lease: rebases rewrite history, but never clobber unseen
  // remote commits.
  const push = await run("push fork branch", "git push origin HEAD:" + q(targetBranch) + " --force-with-lease", {
    max_output_tokens: 20000
  });
  if (!ok(push)) {
    finish("failed", "could not push rebased fork branch", { pushOutput: push.output });
  }
}
if (enabled("publish", false)) {
  // Pushing the rust-v<version> tag is what triggers the fork release workflow.
  const tagCommand = "git tag -a " + q("rust-v" + version) + " -m " + q("Release " + version);
  const tag = await run("create release tag", tagCommand, { max_output_tokens: 12000 });
  if (!ok(tag)) {
    finish("failed", "could not create release tag", { tagOutput: tag.output });
  }
  const pushTag = await run("push release tag", "git push origin " + q("rust-v" + version), {
    max_output_tokens: 20000
  });
  if (!ok(pushTag)) {
    finish("failed", "could not push release tag", { pushTagOutput: pushTag.output });
  }
}
const finalStatus = await run("final codex status", "git status --short --branch", { max_output_tokens: 12000 });
finish("changed", "Peezy Codex fork rebased onto upstream release and verified.", {
  beforeSha: trim(beforeHead.output),
  afterSha: trim(afterHead.output),
  codexHead: trim(afterHead.output),
  codexBinary,
  codexVersion: trim(versionCheck.output),
  finalStatus: finalStatus.output,
  pushed: enabled("push", false),
  published: enabled("publish", false)
});

View file

@ -0,0 +1,29 @@
# Flow package: rebase the Peezy Codex fork patch stack onto each canonical
# upstream openai/codex release.
name = "peezy-codex-fork"
version = 1
description = "Rebase the Peezy Codex fork patch stack onto a canonical openai/codex release."

[config]
package_name = "@peezy.tech/codex"
# Environment variable consulted first; the literal path below is the fallback.
codex_repo_env = "PEEZY_CODEX_REPO"
codex_repo = "/home/peezy/codex-fork-workspace/codex"
target_branch = "main"
upstream_remote = "upstream"
upstream_repo_url = "https://github.com/openai/codex.git"
cargo_target_dir_env = "PEEZY_CODEX_CARGO_TARGET_DIR"
cargo_target_dir = "/tmp/peezy-codex-flow-target"
# Collapse the rebased patch stack into a single commit per release.
squash_patch_stack = true
# Safe defaults: never push the branch or tag a release until enabled.
push = false
publish = false

[guidance]
skills = ["jojo-development-flow"]

[[steps]]
name = "rebase-patch-stack"
runner = "code-mode"
script = "exec/update-fork.code-mode.js"
# 1 hour: fetch, rebase, cargo build, and test verification.
timeout_ms = 3600000

[steps.trigger]
type = "upstream.release"
schema = "schemas/upstream-release.schema.json"

View file

@ -0,0 +1,11 @@
{
"type": "object",
"required": ["repo", "tag"],
"properties": {
"provider": { "type": "string" },
"repo": { "type": "string", "enum": ["openai/codex"] },
"tag": { "type": "string" },
"url": { "type": "string" },
"publishedAt": { "type": "string" }
}
}

View file

@ -40,8 +40,10 @@
"check:types": "bun run --workspaces check:types",
"dev": "bun run --filter web dev",
"dev:web": "bun run --filter web dev",
"flow": "bun apps/flow-runner/src/index.ts",
"flow:backend": "bun apps/flow-backend-systemd-local/src/index.ts",
"start": "bun run --filter web preview",
"start:discord:debug:commentary": "bun run --filter codex-discord-bridge start:debug:commentary",
"test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test"
"test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test"
}
}

View file

@ -0,0 +1,24 @@
{
"name": "@peezy.tech/flow-runtime",
"version": "0.1.0",
"description": "Generic flow package loader and runner primitives.",
"type": "module",
"private": true,
"license": "Apache-2.0",
"exports": {
".": "./src/index.ts"
},
"scripts": {
"build": "tsc --noEmit",
"check:types": "tsc --noEmit",
"test": "bun test test/*.test.ts"
},
"dependencies": {
"@peezy.tech/codex-flows": "workspace:*"
},
"devDependencies": {
"@types/bun": "catalog:",
"@types/node": "catalog:",
"typescript": "catalog:"
}
}

View file

@ -0,0 +1,18 @@
// Public entry point for @peezy.tech/flow-runtime: flow discovery/loading,
// FLOW_RESULT handling, step runners, schema validation, and trigger matching.
export { discoverFlows, loadFlow, stepSchemaPath, stepScriptPath } from "./manifest.ts";
export { parseFlowResult, stringifyFlowResult } from "./result.ts";
export { runFlowStep } from "./run.ts";
export { runBunStep } from "./runners/bun.ts";
export { runCodeModeStep } from "./runners/code-mode.ts";
export { readJsonSchema, validateJsonSchema } from "./schema.ts";
export { matchingSteps, stepMatchesEvent } from "./triggers.ts";
// Shared types re-exported so consumers avoid deep module imports.
export type {
  FlowEvent,
  FlowManifest,
  FlowResult,
  FlowResultStatus,
  FlowRunContext,
  FlowStep,
  FlowStepRunner,
  FlowStepTrigger,
  LoadedFlow,
} from "./types.ts";

View file

@ -0,0 +1,137 @@
import { readdir } from "node:fs/promises";
import path from "node:path";
import type { FlowManifest, FlowStep, LoadedFlow } from "./types.ts";
/** Options for {@link discoverFlows}. */
export type DiscoverFlowsOptions = {
  /** Workspace root used to derive the default search roots. */
  cwd: string;
  /** Explicit flow roots; overrides the default `.codex/flows` + `flows` pair. */
  roots?: string[];
};
/**
 * Loads and validates a single flow package rooted at `root`.
 * Expects a `flow.toml` manifest directly inside the root directory.
 */
export async function loadFlow(root: string): Promise<LoadedFlow> {
  const manifestPath = path.join(root, "flow.toml");
  const rawToml = await Bun.file(manifestPath).text();
  const manifest = normalizeManifest(Bun.TOML.parse(rawToml) as unknown, manifestPath);
  return { root, manifestPath, manifest };
}
/**
 * Discovers flow packages under the given roots (default: `.codex/flows`
 * then `flows` under `cwd`). When two packages share a manifest name, the
 * one found first wins — so installed copies shadow source copies.
 */
export async function discoverFlows(options: DiscoverFlowsOptions): Promise<LoadedFlow[]> {
  const searchRoots = options.roots ?? [
    path.join(options.cwd, ".codex", "flows"),
    path.join(options.cwd, "flows"),
  ];
  // Map keyed by manifest name: first insertion wins, insertion order kept.
  const byName = new Map<string, LoadedFlow>();
  for (const searchRoot of searchRoots) {
    for (const directory of await childDirectories(searchRoot)) {
      const manifestPath = path.join(directory, "flow.toml");
      if (!(await Bun.file(manifestPath).exists())) {
        continue;
      }
      const flow = await loadFlow(directory);
      if (!byName.has(flow.manifest.name)) {
        byName.set(flow.manifest.name, flow);
      }
    }
  }
  return [...byName.values()];
}
/** Absolute path of a step's script, resolved against the flow root. */
export function stepScriptPath(flow: LoadedFlow, step: FlowStep): string {
  const { root } = flow;
  const { script } = step;
  return path.resolve(root, script);
}
/**
 * Absolute path of a step trigger's JSON schema, or undefined when the step
 * has no trigger or the trigger declares no schema.
 */
export function stepSchemaPath(flow: LoadedFlow, step: FlowStep): string | undefined {
  const schema = step.trigger?.schema;
  if (!schema) {
    return undefined;
  }
  return path.resolve(flow.root, schema);
}
/**
 * Sorted absolute paths of the immediate subdirectories of `root`.
 * A missing root is treated as "no flows" rather than an error.
 */
async function childDirectories(root: string): Promise<string[]> {
  let entries;
  try {
    entries = await readdir(root, { withFileTypes: true });
  } catch (error) {
    if (isErrno(error, "ENOENT")) {
      return [];
    }
    throw error;
  }
  const directories: string[] = [];
  for (const entry of entries) {
    if (entry.isDirectory()) {
      directories.push(path.join(root, entry.name));
    }
  }
  return directories.sort();
}
/**
 * Validates a parsed `flow.toml` table and converts it into a typed
 * FlowManifest. Every error message includes `manifestPath` for context.
 * Requires a non-empty `name`, a finite numeric `version`, and at least one
 * `[[steps]]` entry.
 */
function normalizeManifest(value: unknown, manifestPath: string): FlowManifest {
  if (!isRecord(value)) {
    throw new Error(`flow.toml must contain a table: ${manifestPath}`);
  }
  const name = requiredString(value.name, "name", manifestPath);
  const version = requiredNumber(value.version, "version", manifestPath);
  const rawSteps = Array.isArray(value.steps) ? value.steps : undefined;
  if (!rawSteps || rawSteps.length === 0) {
    throw new Error(`flow.toml requires at least one [[steps]] entry: ${manifestPath}`);
  }
  return {
    name,
    version,
    // Optional sections are spread in only when present, so the result
    // never carries explicit `undefined` properties.
    ...(typeof value.description === "string" ? { description: value.description } : {}),
    ...(isRecord(value.config) ? { config: value.config } : {}),
    ...(isRecord(value.guidance) ? { guidance: normalizeGuidance(value.guidance) } : {}),
    steps: rawSteps.map((step, index) => normalizeStep(step, index, manifestPath)),
  };
}
/**
 * Normalizes the optional `[guidance]` table. Only `skills` is understood;
 * non-string entries are dropped and a non-array `skills` is ignored.
 */
function normalizeGuidance(value: Record<string, unknown>): FlowManifest["guidance"] {
  const guidance: NonNullable<FlowManifest["guidance"]> = {};
  if (Array.isArray(value.skills)) {
    guidance.skills = value.skills.filter((entry): entry is string => typeof entry === "string");
  }
  return guidance;
}
/**
 * Validates one `[[steps]]` table: requires name/runner/script, constrains
 * the runner to "bun" | "code-mode", and defaults timeout_ms to 300000.
 */
function normalizeStep(value: unknown, index: number, manifestPath: string): FlowStep {
  if (!isRecord(value)) {
    throw new Error(`steps[${index}] must be a table: ${manifestPath}`);
  }
  const runner = requiredString(value.runner, `steps[${index}].runner`, manifestPath);
  if (runner !== "bun" && runner !== "code-mode") {
    throw new Error(`steps[${index}].runner must be bun or code-mode: ${manifestPath}`);
  }
  const step: FlowStep = {
    name: requiredString(value.name, `steps[${index}].name`, manifestPath),
    runner,
    script: requiredString(value.script, `steps[${index}].script`, manifestPath),
    timeoutMs: typeof value.timeout_ms === "number" ? value.timeout_ms : 300_000,
  };
  if (typeof value.cwd === "string") {
    step.cwd = value.cwd;
  }
  if (isRecord(value.trigger)) {
    step.trigger = normalizeTrigger(value.trigger, index, manifestPath);
  }
  return step;
}
/** Validates a `[steps.trigger]` table: `type` is required, `schema` optional. */
function normalizeTrigger(value: Record<string, unknown>, index: number, manifestPath: string): FlowStep["trigger"] {
  const trigger: NonNullable<FlowStep["trigger"]> = {
    type: requiredString(value.type, `steps[${index}].trigger.type`, manifestPath),
  };
  if (typeof value.schema === "string") {
    trigger.schema = value.schema;
  }
  return trigger;
}
/**
 * Returns `value` when it is a non-blank string; otherwise throws an error
 * naming the missing field and the manifest path. The original (untrimmed)
 * string is returned.
 */
function requiredString(value: unknown, name: string, pathValue: string): string {
  if (typeof value === "string" && value.trim()) {
    return value;
  }
  throw new Error(`flow.toml requires ${name}: ${pathValue}`);
}
/** Returns `value` when it is a finite number; otherwise throws a descriptive error. */
function requiredNumber(value: unknown, name: string, pathValue: string): number {
  if (typeof value === "number" && Number.isFinite(value)) {
    return value;
  }
  throw new Error(`flow.toml requires numeric ${name}: ${pathValue}`);
}
/** Narrows to a plain object (non-null, non-array). */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}
/** True when `error` looks like a Node errno error carrying the given code. */
function isErrno(error: unknown, code: string): boolean {
  if (!isRecord(error)) {
    return false;
  }
  return error.code === code;
}

View file

@ -0,0 +1,37 @@
import type { FlowResult, FlowResultStatus } from "./types.ts";
// Marker each flow step must print on stdout, followed by a JSON object.
const MARKER = "FLOW_RESULT ";

// Every status a step is allowed to report.
const KNOWN_STATUSES = new Set<string>([
  "skipped",
  "completed",
  "changed",
  "needs_intervention",
  "blocked",
  "failed",
]);

/**
 * Extracts the last `FLOW_RESULT {...}` line from a step's stdout and
 * returns the parsed object. Throws when no marker line exists, when the
 * payload is not a JSON object, or when its status is not a known status.
 */
export function parseFlowResult(stdout: string): FlowResult {
  const lines = stdout.split(/\r?\n/);
  // Scan from the end so the most recent FLOW_RESULT line wins.
  for (let i = lines.length - 1; i >= 0; i -= 1) {
    const line = lines[i] ?? "";
    const markerIndex = line.indexOf(MARKER);
    if (markerIndex < 0) {
      continue;
    }
    const payload = JSON.parse(line.slice(markerIndex + MARKER.length).trim()) as unknown;
    if (!isPlainObject(payload)) {
      throw new Error("FLOW_RESULT must be a JSON object");
    }
    if (typeof payload.status !== "string" || !KNOWN_STATUSES.has(payload.status)) {
      throw new Error("FLOW_RESULT status is invalid");
    }
    return payload as FlowResult;
  }
  throw new Error("Step did not emit FLOW_RESULT");
}

/** Serializes a result as the marker line a runner will later parse back. */
export function stringifyFlowResult(value: FlowResult): string {
  return `${MARKER}${JSON.stringify(value)}\n`;
}

/** Narrows to a plain object (non-null, non-array). */
function isPlainObject(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}

View file

@ -0,0 +1,33 @@
import { runBunStep } from "./runners/bun.ts";
import { runCodeModeStep, type RunCodeModeStepOptions } from "./runners/code-mode.ts";
import type { FlowEvent, FlowResult, FlowStep, LoadedFlow } from "./types.ts";
/** Options for {@link runFlowStep}. */
export type RunFlowStepOptions = {
  flow: LoadedFlow;
  step: FlowStep;
  /** Event that triggered the step; passed through to the runner. */
  event: FlowEvent;
  /**
   * Extra environment for Bun steps; also consulted for the Code Mode
   * feature flag. Defaults to `process.env` for the flag check.
   */
  env?: Record<string, string | undefined>;
  /** Transport/streaming overrides forwarded to the Code Mode runner. */
  codeMode?: Pick<RunCodeModeStepOptions, "codexCommand" | "codexHome" | "stream">;
};
/**
 * Dispatches a step to its runner. Bun steps run unconditionally; Code Mode
 * steps require the CODEX_FLOWS_ENABLE_CODE_MODE feature flag.
 */
export async function runFlowStep(options: RunFlowStepOptions): Promise<FlowResult> {
  const { flow, step, event, env, codeMode } = options;
  if (step.runner === "bun") {
    return runBunStep(options);
  }
  const flagEnv = env ?? process.env;
  if (!codeModeEnabled(flagEnv)) {
    throw new Error(
      `Code Mode flow step ${flow.manifest.name}/${step.name} requires CODEX_FLOWS_ENABLE_CODE_MODE=1`,
    );
  }
  return runCodeModeStep({
    flow,
    step,
    event,
    ...codeMode,
  });
}
/** True when the Code Mode feature flag is set to a truthy marker (1/true/yes/on). */
export function codeModeEnabled(env: Record<string, string | undefined>): boolean {
  const raw = env.CODEX_FLOWS_ENABLE_CODE_MODE;
  if (raw === undefined) {
    return false;
  }
  switch (raw.trim().toLowerCase()) {
    case "1":
    case "true":
    case "yes":
    case "on":
      return true;
    default:
      return false;
  }
}

View file

@ -0,0 +1,54 @@
import path from "node:path";
import { stepScriptPath } from "../manifest.ts";
import { parseFlowResult } from "../result.ts";
import type { FlowEvent, FlowResult, FlowRunContext, FlowStep, LoadedFlow } from "../types.ts";
/** Options for {@link runBunStep}. */
export type RunBunStepOptions = {
  flow: LoadedFlow;
  step: FlowStep;
  /** Event that triggered the step; delivered to the script via stdin JSON. */
  event: FlowEvent;
  /** Extra environment entries layered over process.env for the child process. */
  env?: Record<string, string | undefined>;
};
/**
 * Runs a `runner = "bun"` flow step as a child Bun process.
 *
 * The run context (flow metadata plus triggering event) is written to the
 * child's stdin as JSON; the script must print a `FLOW_RESULT {...}` line
 * on stdout. The child receives SIGTERM after `step.timeoutMs`; a non-zero
 * exit becomes an error carrying stderr (or stdout as a fallback).
 */
export async function runBunStep(options: RunBunStepOptions): Promise<FlowResult> {
  const scriptPath = stepScriptPath(options.flow, options.step);
  // Step cwd is resolved relative to the flow root; defaults to the root.
  const cwd = options.step.cwd
    ? path.resolve(options.flow.root, options.step.cwd)
    : options.flow.root;
  // process.execPath is the currently running Bun binary, so the child uses
  // the same runtime version as the runner.
  const subprocess = Bun.spawn({
    cmd: [process.execPath, scriptPath],
    cwd,
    env: {
      ...process.env,
      ...options.env,
    },
    stdin: "pipe",
    stdout: "pipe",
    stderr: "pipe",
  });
  // Hand the run context to the script, then close stdin so a read-to-end
  // in the child does not block forever.
  subprocess.stdin.write(`${JSON.stringify(runContext(options), null, 2)}\n`);
  subprocess.stdin.end();
  // Timeout fires SIGTERM; the kill surfaces below as a non-zero exit code.
  const timer = setTimeout(() => subprocess.kill("SIGTERM"), options.step.timeoutMs);
  const [stdout, stderr, exitCode] = await Promise.all([
    subprocess.stdout.text(),
    subprocess.stderr.text(),
    subprocess.exited,
  ]).finally(() => clearTimeout(timer));
  if (exitCode !== 0) {
    throw new Error(`Bun flow step ${options.flow.manifest.name}/${options.step.name} failed:\n${stderr || stdout}`);
  }
  return parseFlowResult(stdout);
}
/**
 * Builds the JSON context written to the step's stdin. `config` is included
 * only when the manifest declares one, keeping the serialized key order and
 * payload identical for config-less flows.
 */
function runContext(options: RunBunStepOptions): FlowRunContext {
  const { flow, step, event } = options;
  const { manifest } = flow;
  return {
    flow: {
      name: manifest.name,
      version: manifest.version,
      root: flow.root,
      step: step.name,
      ...(manifest.config ? { config: manifest.config } : {}),
      event,
    },
  };
}

View file

@ -0,0 +1,176 @@
import path from "node:path";
import { CodexAppServerClient } from "@peezy.tech/codex-flows";
import { stepScriptPath } from "../manifest.ts";
import { parseFlowResult } from "../result.ts";
import type { FlowEvent, FlowResult, FlowStep, LoadedFlow } from "../types.ts";
/** Options for {@link runCodeModeStep}. */
export type RunCodeModeStepOptions = {
  flow: LoadedFlow;
  step: FlowStep;
  event: FlowEvent;
  /** Override for the codex executable; the transport default applies when unset. */
  codexCommand?: string;
  /** When set, exported to the app-server as CODEX_HOME (resolved to absolute). */
  codexHome?: string;
  /** Mirror output deltas to this process's stdout while the step runs. */
  stream?: boolean;
};
/**
 * Runs a `runner = "code-mode"` flow step inside a Codex app-server thread.
 *
 * Spawns `codex app-server` over stdio, starts a thread rooted at the step
 * cwd, submits the generated Code Mode source, waits for a `turn/completed`
 * notification, then parses the FLOW_RESULT line out of the thread's agent
 * message text (falling back to the streamed output deltas).
 */
export async function runCodeModeStep(options: RunCodeModeStepOptions): Promise<FlowResult> {
  const source = await codeModeSource(options);
  const client = new CodexAppServerClient({
    transportOptions: {
      codexCommand: options.codexCommand,
      args: appServerArgs(),
      env: options.codexHome ? { CODEX_HOME: path.resolve(options.codexHome) } : undefined,
      requestTimeoutMs: options.step.timeoutMs,
    },
    clientName: "codex-flow-runner",
    clientTitle: "Codex Flow Runner",
    clientVersion: "0.1.0",
  });
  const output: string[] = [];
  let threadId = "";
  // The notification handler below needs to settle this promise, so the
  // resolver is captured out of the executor; the placeholder is replaced
  // synchronously when the Promise constructor runs.
  let resolveTurnCompleted: (value: unknown) => void = () => undefined;
  const turnCompleted = new Promise((resolve) => {
    resolveTurnCompleted = resolve;
  });
  client.on("request", (message) => {
    // The runner is headless: reject approvals and all other server requests.
    client.respondError(message.id, -32603, "flow runner does not handle server requests");
  });
  client.on("notification", (message) => {
    if (message.method === "item/commandExecution/outputDelta" || message.method === "item/agentMessage/delta") {
      const delta = stringField(message.params, "delta");
      if (delta) {
        output.push(delta);
        if (options.stream) {
          process.stdout.write(delta);
        }
      }
    }
    // NOTE(review): until startThread resolves, threadId is "" and any
    // turn/completed is accepted — assumes this client runs a single thread.
    if (
      message.method === "turn/completed" &&
      (!threadId || stringField(message.params, "threadId") === threadId)
    ) {
      resolveTurnCompleted(message.params);
    }
  });
  try {
    await client.connect();
    // never-approve + full-access sandbox: flow steps must run unattended.
    const started = await client.startThread({
      cwd: options.step.cwd ? path.resolve(options.flow.root, options.step.cwd) : options.flow.root,
      approvalPolicy: "never",
      sandbox: "danger-full-access",
      ephemeral: false,
      experimentalRawEvents: false,
      persistExtendedHistory: true,
    });
    threadId = started.thread.id;
    await client.request("thread/codeMode/execute", {
      threadId,
      source,
    });
    await withTimeout(
      turnCompleted,
      options.step.timeoutMs,
      `timed out waiting for Code Mode flow step ${options.flow.manifest.name}/${options.step.name}`,
    );
    // Prefer the persisted agent messages; fall back to streamed deltas.
    const read = await client.request("thread/read", {
      threadId,
      includeTurns: true,
    });
    return parseFlowResult(allAgentMessageText(read).join("\n") || output.join(""));
  } finally {
    client.close();
  }
}
/**
 * Builds the Code Mode source: the step script is prefixed with a `flow`
 * context object and a `result(value)` helper that prints the FLOW_RESULT
 * line and terminates the program. `text` and `exit` are presumably
 * primitives supplied by the Code Mode environment — TODO confirm.
 */
async function codeModeSource(options: RunCodeModeStepOptions): Promise<string> {
  const body = await Bun.file(stepScriptPath(options.flow, options.step)).text();
  // Mirrors the Bun runner's stdin context: flow metadata plus the event.
  const flow = {
    name: options.flow.manifest.name,
    version: options.flow.manifest.version,
    root: options.flow.root,
    step: options.step.name,
    ...(options.flow.manifest.config ? { config: options.flow.manifest.config } : {}),
    event: options.event,
  };
  return [
    `const flow = ${JSON.stringify(flow, null, 2)};`,
    "function result(value) {",
    " text('\\nFLOW_RESULT ' + JSON.stringify(value) + '\\n');",
    " exit();",
    "}",
    body,
  ].join("\n");
}
/**
 * Argument list for launching `codex app-server` over stdio with the
 * feature set Code Mode steps rely on.
 */
function appServerArgs(): string[] {
  const features = ["apps", "hooks", "code_mode", "code_mode_only"];
  const args = ["app-server", "--listen", "stdio://"];
  for (const feature of features) {
    args.push("--enable", feature);
  }
  return args;
}
/**
 * Races `promise` against a timer. Rejects with `message` after `timeoutMs`;
 * the timer is always cleared once either side settles.
 */
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
  try {
    return await Promise.race([promise, expiry]);
  } finally {
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
/**
 * Collects the text of every agentMessage item across all turns of a
 * `thread/read` response, in order. Unshaped data yields an empty list.
 */
function allAgentMessageText(value: unknown): string[] {
  const texts: string[] = [];
  const thread = recordField(value, "thread");
  if (!thread) {
    return texts;
  }
  const turns = Array.isArray(thread.turns) ? thread.turns : [];
  for (const turn of turns) {
    if (!isRecord(turn)) {
      continue;
    }
    const items = Array.isArray(turn.items) ? turn.items : [];
    for (const item of items) {
      if (!isRecord(item) || stringField(item, "type") !== "agentMessage") {
        continue;
      }
      const text = stringField(item, "text");
      if (text !== undefined) {
        texts.push(text);
      }
    }
  }
  return texts;
}
/** Reads `value[field]` when both the value and the field are plain objects. */
function recordField(value: unknown, field: string): Record<string, unknown> | undefined {
  if (!isRecord(value)) {
    return undefined;
  }
  const candidate = value[field];
  return isRecord(candidate) ? candidate : undefined;
}

/** Reads `value[field]` when the value is a plain object and the field a string. */
function stringField(value: unknown, field: string): string | undefined {
  if (!isRecord(value)) {
    return undefined;
  }
  const candidate = value[field];
  return typeof candidate === "string" ? candidate : undefined;
}

/** Narrows to a plain object (non-null, non-array). */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}

View file

@ -0,0 +1,81 @@
/**
 * Minimal JSON Schema subset understood by the validator in this module:
 * only `type`, `required`, `properties`, and `enum` are interpreted.
 */
type JsonSchema = {
  type?: string | string[];
  required?: string[];
  properties?: Record<string, JsonSchema>;
  enum?: unknown[];
};
/** Validation outcome: ok, or a list of human-readable error strings. */
export type SchemaValidationResult =
  | { ok: true }
  | { ok: false; errors: string[] };
/** Reads and parses a JSON schema file, requiring a top-level object. */
export async function readJsonSchema(path: string): Promise<JsonSchema> {
  const raw = await Bun.file(path).text();
  const schema = JSON.parse(raw) as unknown;
  if (!isRecord(schema)) {
    throw new Error(`Schema must be a JSON object: ${path}`);
  }
  return schema as JsonSchema;
}
/**
 * Validates `value` against the minimal JSON Schema subset. Returns ok or a
 * list of `$`-rooted error paths with messages, in schema-declaration order.
 */
export function validateJsonSchema(value: unknown, schema: JsonSchema): SchemaValidationResult {
  const errors: string[] = [];
  validateValue(value, schema, "$", errors);
  return errors.length === 0 ? { ok: true } : { ok: false, errors };
}

/**
 * Recursive worker: checks `enum`, then `type`, then — when the instance is
 * an object — `required` and `properties`. Per JSON Schema semantics,
 * `required`/`properties` constrain only object instances, so they now apply
 * whenever the value is an object even if the schema omits `type: "object"`
 * (previously `required` was silently skipped in that case).
 */
function validateValue(
  value: unknown,
  schema: JsonSchema,
  path: string,
  errors: string[],
): void {
  if (schema.enum && !schema.enum.some((entry) => Object.is(entry, value))) {
    errors.push(`${path} must be one of ${schema.enum.map(String).join(", ")}`);
    return;
  }
  if (schema.type && !typeMatches(value, schema.type)) {
    errors.push(`${path} must be ${Array.isArray(schema.type) ? schema.type.join(" or ") : schema.type}`);
    return;
  }
  if (!isRecord(value)) {
    // Non-object instances are not constrained by required/properties.
    return;
  }
  for (const key of schema.required ?? []) {
    if (!(key in value)) {
      errors.push(`${path}.${key} is required`);
    }
  }
  for (const [key, childSchema] of Object.entries(schema.properties ?? {})) {
    if (key in value) {
      validateValue(value[key], childSchema, `${path}.${key}`, errors);
    }
  }
}

/** True when `value` matches any of the schema `type` names. */
function typeMatches(value: unknown, type: string | string[]): boolean {
  const types = Array.isArray(type) ? type : [type];
  return types.some((entry) => {
    if (entry === "array") {
      return Array.isArray(value);
    }
    if (entry === "null") {
      return value === null;
    }
    if (entry === "integer") {
      return Number.isInteger(value);
    }
    if (entry === "object") {
      return isRecord(value);
    }
    // Remaining names ("string", "number", "boolean") map onto typeof.
    return typeof value === entry;
  });
}

/** Narrows to a plain object (non-null, non-array). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

View file

@ -0,0 +1,41 @@
import { stepSchemaPath } from "./manifest.ts";
import { readJsonSchema, validateJsonSchema } from "./schema.ts";
import type { FlowEvent, FlowStep, LoadedFlow } from "./types.ts";
/** Outcome of matching one step's trigger against an event. */
export type TriggerMatch =
  | { ok: true }
  | { ok: false; reason: string };
/**
 * Decides whether an event fires a step: the step must declare a trigger,
 * trigger type must equal the event type, and — when the trigger names a
 * schema — the event payload must validate against it.
 */
export async function stepMatchesEvent(
  flow: LoadedFlow,
  step: FlowStep,
  event: FlowEvent,
): Promise<TriggerMatch> {
  const trigger = step.trigger;
  if (!trigger) {
    return { ok: false, reason: "step has no trigger" };
  }
  if (trigger.type !== event.type) {
    return { ok: false, reason: `event type ${event.type} does not match ${trigger.type}` };
  }
  const schemaPath = stepSchemaPath(flow, step);
  if (schemaPath === undefined) {
    return { ok: true };
  }
  const schema = await readJsonSchema(schemaPath);
  const validation = validateJsonSchema(event.payload, schema);
  if (validation.ok) {
    return { ok: true };
  }
  return { ok: false, reason: validation.errors.join("; ") };
}
/**
 * Returns every (flow, step) pair whose trigger matches the event, in flow
 * then step declaration order.
 */
export async function matchingSteps(
  flows: LoadedFlow[],
  event: FlowEvent,
): Promise<Array<{ flow: LoadedFlow; step: FlowStep }>> {
  const matches: Array<{ flow: LoadedFlow; step: FlowStep }> = [];
  for (const flow of flows) {
    for (const step of flow.manifest.steps) {
      const match = await stepMatchesEvent(flow, step, event);
      if (match.ok) {
        matches.push({ flow, step });
      }
    }
  }
  return matches;
}

View file

@ -0,0 +1,68 @@
/**
 * Generic event dispatched to flows. `type` is matched against step triggers
 * and `payload` may be validated against a trigger's JSON schema.
 */
export type FlowEvent<TPayload = unknown> = {
  /** Producer-assigned unique event id. */
  id: string;
  /** Event type, e.g. "upstream.release"; compared with trigger types. */
  type: string;
  /** Optional origin of the event; informational only. */
  source?: string;
  /** When the event happened at its source, if known (ISO 8601). */
  occurredAt?: string;
  /** When this system accepted the event (ISO 8601). */
  receivedAt: string;
  payload: TPayload;
};
/** Terminal status a step reports in its FLOW_RESULT line. */
export type FlowResultStatus =
  | "skipped"
  | "completed"
  | "changed"
  | "needs_intervention"
  | "blocked"
  | "failed";
/**
 * Parsed FLOW_RESULT payload. Steps may attach arbitrary extra fields via
 * the index signature.
 */
export type FlowResult = {
  status: FlowResultStatus;
  /** Optional human-readable summary. */
  message?: string;
  /** Optional named outputs produced by the step. */
  artifacts?: Record<string, unknown>;
  /** Follow-up events the step asks the dispatcher to emit. */
  next?: Array<FlowEvent<Record<string, unknown>>>;
  [key: string]: unknown;
};
/** Supported step runners. */
export type FlowStepRunner = "bun" | "code-mode";
/** Trigger declaration: event type plus an optional payload schema path. */
export type FlowStepTrigger = {
  type: string;
  /** JSON schema path, relative to the flow root. */
  schema?: string;
};
/** One [[steps]] entry from flow.toml, normalized. */
export type FlowStep = {
  name: string;
  runner: FlowStepRunner;
  script: string;
  /** Step timeout in milliseconds (300000 when timeout_ms is omitted). */
  timeoutMs: number;
  /** Working directory relative to the flow root; defaults to the root. */
  cwd?: string;
  trigger?: FlowStepTrigger;
};
/** Normalized contents of a flow.toml manifest. */
export type FlowManifest = {
  name: string;
  version: number;
  description?: string;
  /** Free-form [config] table passed through to steps. */
  config?: Record<string, unknown>;
  /** Optional [guidance] table; currently only a skills list. */
  guidance?: {
    skills?: string[];
  };
  steps: FlowStep[];
};
/** A flow manifest together with where it was loaded from. */
export type LoadedFlow = {
  root: string;
  manifestPath: string;
  manifest: FlowManifest;
};
/** JSON context handed to a step (stdin for Bun steps, inlined for Code Mode). */
export type FlowRunContext = {
  flow: {
    name: string;
    version: number;
    root: string;
    step: string;
    config?: Record<string, unknown>;
    event: FlowEvent;
  };
};

View file

@ -0,0 +1,209 @@
import { expect, test } from "bun:test";
import { mkdtemp, rm, mkdir } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import {
discoverFlows,
matchingSteps,
runBunStep,
runFlowStep,
validateJsonSchema,
} from "../src/index.ts";
import type { FlowEvent } from "../src/index.ts";
// Installed flows under .codex/flows shadow source flows with the same name.
test("discovers installed flows before source flows", async () => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-"));
  try {
    await writeFlow(directory, ".codex/flows/demo", "installed");
    await writeFlow(directory, "flows/demo", "source");
    const flows = await discoverFlows({ cwd: directory });
    expect(flows.map((flow) => flow.manifest.name)).toEqual(["demo"]);
    // The installed copy wins, so its description is the one visible.
    expect(flows[0]?.manifest.description).toBe("installed");
  } finally {
    await rm(directory, { recursive: true, force: true });
  }
});
// Payload schema filters matching: same event type with an invalid payload is skipped.
test("matches flow steps by event type and payload schema", async () => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-"));
  try {
    await writeFlow(directory, "flows/demo", "source");
    const flows = await discoverFlows({ cwd: directory });
    const event: FlowEvent = {
      id: "event-1",
      type: "demo.event",
      receivedAt: "2026-05-13T00:00:00.000Z",
      payload: { name: "Ada" },
    };
    expect((await matchingSteps(flows, event)).map(({ step }) => step.name)).toEqual([
      "hello",
    ]);
    // Empty payload fails the fixture schema (name is required).
    expect(await matchingSteps(flows, { ...event, payload: {} })).toEqual([]);
  } finally {
    await rm(directory, { recursive: true, force: true });
  }
});
// One generic upstream.release event fans out to both bundled release flows.
test("bundled Codex release flows match one generic upstream release event", async () => {
  // Repo root, three levels above this test file.
  const root = path.resolve(import.meta.dir, "..", "..", "..");
  const flows = await discoverFlows({ cwd: root });
  const event: FlowEvent = {
    id: "event-1",
    type: "upstream.release",
    receivedAt: "2026-05-13T00:00:00.000Z",
    payload: { repo: "openai/codex", tag: "rust-v1.2.3" },
  };
  const matches = await matchingSteps(flows, event);
  expect(matches.map(({ flow, step }) => `${flow.manifest.name}/${step.name}`)).toEqual([
    "openai-codex-bindings/regenerate-bindings",
    "peezy-codex-fork/rebase-patch-stack",
  ]);
});
// Guards against accidentally enabling Code Mode execution for the bundled flow.
test("bundled Code Mode flow remains gated by the feature flag", async () => {
  const root = path.resolve(import.meta.dir, "..", "..", "..");
  const flows = await discoverFlows({ cwd: root });
  const flow = flows.find((entry) => entry.manifest.name === "peezy-codex-fork");
  const step = flow?.manifest.steps.find((entry) => entry.name === "rebase-patch-stack");
  if (!flow || !step) {
    throw new Error("expected bundled peezy-codex-fork flow");
  }
  await expect(
    runFlowStep({
      flow,
      step,
      event: {
        id: "event-1",
        type: "upstream.release",
        receivedAt: "2026-05-13T00:00:00.000Z",
        payload: { repo: "openai/codex", tag: "rust-v1.2.3" },
      },
      // Empty env (not process.env) so an exported flag cannot leak in.
      env: {},
    }),
  ).rejects.toThrow("requires CODEX_FLOWS_ENABLE_CODE_MODE=1");
});
// Exercises required-property and enum errors from the minimal validator.
test("validates simple JSON schema constraints", () => {
  const schema = {
    type: "object",
    required: ["name"],
    properties: {
      name: { type: "string" },
      kind: { enum: ["demo"] },
    },
  };
  expect(validateJsonSchema({ name: "Ada", kind: "demo" }, schema)).toEqual({ ok: true });
  expect(validateJsonSchema({ kind: "other" }, schema)).toEqual({
    ok: false,
    errors: ["$.name is required", "$.kind must be one of demo"],
  });
});
// End-to-end: spawn the fixture step in a child Bun process and parse its
// FLOW_RESULT line from stdout.
test("runs Bun flow steps and parses FLOW_RESULT", async () => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-"));
  try {
    await writeFlow(directory, "flows/demo", "source");
    const [flow] = await discoverFlows({ cwd: directory });
    const step = flow?.manifest.steps[0];
    if (!flow || !step) {
      throw new Error("expected fixture flow");
    }
    const result = await runBunStep({
      flow,
      step,
      event: {
        id: "event-1",
        type: "demo.event",
        receivedAt: "2026-05-13T00:00:00.000Z",
        payload: { name: "Ada" },
      },
    });
    // The fixture script echoes the payload name back in its message.
    expect(result).toEqual({
      status: "completed",
      message: "hello Ada",
    });
  } finally {
    await rm(directory, { recursive: true, force: true });
  }
});
// Forcing the fixture step's runner to "code-mode" without the flag must fail fast.
test("requires a feature flag before running Code Mode flow steps", async () => {
  const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-"));
  try {
    await writeFlow(directory, "flows/demo", "source");
    const [flow] = await discoverFlows({ cwd: directory });
    const step = flow?.manifest.steps[0];
    if (!flow || !step) {
      throw new Error("expected fixture flow");
    }
    await expect(
      runFlowStep({
        flow,
        step: { ...step, runner: "code-mode" },
        event: {
          id: "event-1",
          type: "demo.event",
          receivedAt: "2026-05-13T00:00:00.000Z",
          payload: { name: "Ada" },
        },
        // Empty env keeps a developer's exported flag from leaking in.
        env: {},
      }),
    ).rejects.toThrow("requires CODEX_FLOWS_ENABLE_CODE_MODE=1");
  } finally {
    await rm(directory, { recursive: true, force: true });
  }
});
/**
 * Writes a minimal flow package fixture (flow.toml + trigger schema + Bun
 * step script) under `root/relative`. `description` distinguishes copies of
 * the same flow placed at different roots.
 */
async function writeFlow(root: string, relative: string, description: string): Promise<void> {
  const flowRoot = path.join(root, relative);
  await mkdir(path.join(flowRoot, "exec"), { recursive: true });
  await mkdir(path.join(flowRoot, "schemas"), { recursive: true });
  await Bun.write(
    path.join(flowRoot, "flow.toml"),
    [
      'name = "demo"',
      "version = 1",
      `description = "${description}"`,
      "",
      "[[steps]]",
      'name = "hello"',
      'runner = "bun"',
      'script = "exec/hello.ts"',
      "timeout_ms = 30000",
      "",
      "[steps.trigger]",
      'type = "demo.event"',
      'schema = "schemas/demo-event.schema.json"',
      "",
    ].join("\n"),
  );
  // Trigger schema: the event payload must carry a string `name`.
  await Bun.write(
    path.join(flowRoot, "schemas/demo-event.schema.json"),
    JSON.stringify({
      type: "object",
      required: ["name"],
      properties: {
        name: { type: "string" },
      },
    }),
  );
  // Step script: reads its run context from stdin, prints a FLOW_RESULT line.
  await Bun.write(
    path.join(flowRoot, "exec/hello.ts"),
    [
      "const context = JSON.parse(await Bun.stdin.text());",
      "const name = context.flow.event.payload.name;",
      "console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);",
      "",
    ].join("\n"),
  );
}

View file

@ -0,0 +1,21 @@
{
"compilerOptions": {
"module": "ESNext",
"moduleResolution": "Bundler",
"allowImportingTsExtensions": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"target": "ES2022",
"lib": ["ESNext"],
"strict": true,
"skipLibCheck": true,
"noUncheckedIndexedAccess": true,
"isolatedModules": true,
"verbatimModuleSyntax": true,
"forceConsistentCasingInFileNames": true,
"noEmit": true,
"types": ["node", "bun"],
"rootDir": "."
},
"include": ["src/**/*.ts", "test/**/*.ts"]
}