diff --git a/README.md b/README.md index 5f0e173..3b44bc0 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,14 @@ Thin browser UI plus TypeScript client for `codex app-server`. -This branch intentionally drops the workspace service, runtime, gateways, jobs, -delegation, and host setup layer. The remaining source is: +The current source is: - `apps/web`: React/Vite UI that connects directly to a Codex app-server WebSocket. - `apps/cli`: Bun CLI that sends JSON-RPC actions to a listening Codex app-server. +- `apps/flow-runner`: CLI for discovering and firing packaged flows. +- `apps/flow-backend-systemd-local`: local HTTP backend for executing flows from dispatch events. - `packages/codex-client`: JSON-RPC client, app-server transports, flow helpers, and generated protocol types. +- `packages/flow-runtime`: flow manifest loading, event matching, and runner primitives. - `packages/ui`: small shared UI primitives and styling. ## Run @@ -59,7 +61,23 @@ bun run build bun run test ``` -`bun run test` currently runs the `@peezy.tech/codex-flows` transport tests. +`bun run test` runs the client, flow runtime, local flow backend, CLI, and +Discord bridge tests. + +## Flow Automation + +Flow packages live under `flows/*` and installed copies can live under +`.codex/flows/*`. See [docs/flows.md](docs/flows.md) for `flow.toml`, generic +`FlowEvent` dispatch, Bun and Code Mode runners, the systemd-local backend, and +the Codex release flows. + +```bash +bun run flow list +bun run flow:backend serve --cwd "$(pwd)" +``` + +Code Mode flow steps are present on `main` but require +`CODEX_FLOWS_ENABLE_CODE_MODE=1` before execution. ## Development Flow @@ -112,6 +130,23 @@ The low-level app-server client package. It exports: - `@peezy.tech/codex-flows/rpc`: JSON-RPC helpers and types. - `@peezy.tech/codex-flows/generated`: generated Codex app-server protocol types. +### `flow-runner` + +CLI package for listing flow packages, firing every step that matches a +`FlowEvent`, or running one explicit flow step. + +### `flow-backend-systemd-local` + +HTTP and CLI backend that persists dispatched flow events/runs to SQLite and +starts matching steps locally. It is intended to run as a small systemd-managed +service, with optional transient `systemd-run` units per step. + +### `@peezy.tech/flow-runtime` + +Shared runtime package for loading `flow.toml`, validating payload JSON Schema, +matching steps to generic events, and invoking Bun or feature-flagged Code Mode +steps. + ### `web` The browser app imports `@peezy.tech/codex-flows/browser`, opens a direct WebSocket diff --git a/apps/flow-backend-systemd-local/package.json b/apps/flow-backend-systemd-local/package.json new file mode 100644 index 0000000..d62a228 --- /dev/null +++ b/apps/flow-backend-systemd-local/package.json @@ -0,0 +1,25 @@ +{ + "name": "codex-flow-systemd-local", + "version": "0.1.0", + "description": "Local flow execution backend suitable for a systemd-managed service.", + "type": "module", + "private": true, + "license": "Apache-2.0", + "bin": { + "codex-flow-systemd-local": "./src/index.ts" + }, + "scripts": { + "build": "tsc --noEmit", + "check:types": "tsc --noEmit", + "start": "bun ./src/index.ts serve", + "test": "bun test test/*.test.ts" + }, + "dependencies": { + "@peezy.tech/flow-runtime": "workspace:*" + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:" + } +} diff --git a/apps/flow-backend-systemd-local/src/backend.ts b/apps/flow-backend-systemd-local/src/backend.ts new file mode 100644 index 0000000..d268468 --- /dev/null +++ b/apps/flow-backend-systemd-local/src/backend.ts @@ -0,0 +1,174 @@ +import { createHash } from "node:crypto"; +import { mkdir } from "node:fs/promises"; +import path from "node:path"; +import { + discoverFlows, + matchingSteps, + type FlowEvent, + type FlowStep, + type LoadedFlow, +} from "@peezy.tech/flow-runtime"; +import type { FlowBackendConfig } from "./config.ts"; +import { executeCommand, flowCommand, parseRunnerResult } from "./executor.ts"; +import { FlowBackendStore, type FlowRunRecord } from "./store.ts"; + +export type DispatchFlowEventOptions = { + config: FlowBackendConfig; + store: FlowBackendStore; + event: FlowEvent; + wait?: boolean; + env?: Record; +}; + +export type DispatchFlowEventResult = { + status: "accepted" | "duplicate"; + eventId: string; + runIds: string[]; + matched: number; +}; + +export async function dispatchFlowEvent(options: DispatchFlowEventOptions): Promise { + const inserted = options.store.insertEvent(options.event); + if (!inserted) { + return { + status: "duplicate", + eventId: options.event.id, + runIds: options.store.listRunsByEvent(options.event.id).map((run) => run.id), + matched: 0, + }; + } + + const eventPath = await writeEventFile(options.config.dataDir, options.event); + const flows = await discoverFlows({ cwd: options.config.cwd }); + const matches = await matchingSteps(flows, options.event); + const promises: Array> = []; + for (const match of matches) { + const run = createRunRecord(options.config, options.event, match.flow, match.step, eventPath); + options.store.createRun(run); + const promise = executeAndRecord({ + config: options.config, + store: options.store, + run, + env: options.env, + }); + if (options.wait) { + promises.push(promise); + } else { + promise.catch((error) => { + options.store.markRunCompleted(run.id, { + status: "failed", + stdout: "", + stderr: "", + error: error instanceof Error ? error.message : String(error), + }); + }); + } + } + if (promises.length > 0) { + await Promise.all(promises); + } + return { + status: "accepted", + eventId: options.event.id, + runIds: matches.map((match) => runId(options.event.id, match.flow.manifest.name, match.step.name)), + matched: matches.length, + }; +} + +export async function readFlowEvent(pathValue: string): Promise { + return normalizeFlowEvent(JSON.parse(await Bun.file(path.resolve(pathValue)).text()) as unknown); +} + +export function normalizeFlowEvent(value: unknown): FlowEvent { + if (!isRecord(value) || typeof value.id !== "string" || typeof value.type !== "string") { + throw new Error("FlowEvent requires string id and type"); + } + return { + receivedAt: typeof value.receivedAt === "string" ? value.receivedAt : new Date().toISOString(), + payload: isRecord(value.payload) ? value.payload : {}, + ...value, + } as FlowEvent; +} + +async function executeAndRecord(options: { + config: FlowBackendConfig; + store: FlowBackendStore; + run: FlowRunRecord; + env?: Record; +}): Promise { + const command = flowCommand({ + config: options.config, + runId: options.run.id, + eventPath: options.run.eventPath, + flowName: options.run.flowName, + stepName: options.run.stepName, + env: options.env, + }); + options.store.markRunRunning(options.run.id, JSON.stringify(command), command.unit); + let result: Awaited>; + try { + result = await executeCommand(command, options.config, options.env); + } catch (error) { + options.store.markRunCompleted(options.run.id, { + status: "failed", + stdout: "", + stderr: "", + error: error instanceof Error ? error.message : String(error), + }); + return; + } + const status = result.exitCode === 0 ? "completed" : "failed"; + options.store.markRunCompleted(options.run.id, { + status, + resultJson: parseRunnerResult(result.stdout), + stdout: result.stdout, + stderr: result.stderr, + ...(status === "failed" ? { error: `flow runner exited with ${result.exitCode ?? "unknown"}` } : {}), + }); +} + +function createRunRecord( + config: FlowBackendConfig, + event: FlowEvent, + flow: LoadedFlow, + step: FlowStep, + eventPath: string, +): FlowRunRecord { + return { + id: runId(event.id, flow.manifest.name, step.name), + eventId: event.id, + flowName: flow.manifest.name, + stepName: step.name, + status: "queued", + backend: "systemd-local", + executor: config.executor, + eventPath, + createdAt: new Date().toISOString(), + }; +} + +function runId(eventId: string, flowName: string, stepName: string): string { + const hash = createHash("sha256") + .update(`${eventId}\0${flowName}\0${stepName}`) + .digest("hex") + .slice(0, 12); + return `run_${hash}`; +} + +async function writeEventFile(dataDir: string, event: FlowEvent): Promise { + const directory = path.join(dataDir, "events"); + await mkdir(directory, { recursive: true }); + const filePath = path.join(directory, `${safeFileName(event.id)}.json`); + await Bun.write(filePath, JSON.stringify(event, null, 2)); + return filePath; +} + +function safeFileName(value: string): string { + const hash = createHash("sha256").update(value).digest("hex").slice(0, 12); + const base = value.toLowerCase().replace(/[^a-z0-9._-]+/g, "-").replace(/^-+|-+$/g, "").slice(0, 64); + return `${base || "event"}-${hash}`; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/apps/flow-backend-systemd-local/src/config.ts b/apps/flow-backend-systemd-local/src/config.ts new file mode 100644 index 0000000..38de0be --- /dev/null +++ b/apps/flow-backend-systemd-local/src/config.ts @@ -0,0 +1,191 @@ +import path from "node:path"; + +export type FlowBackendExecutor = "direct" | "systemd-run"; + +export type FlowBackendConfig = { + cwd: string; + dataDir: string; + host: string; + port: number; + secret?: string; + executor: FlowBackendExecutor; + bunCommand: string; + flowRunnerPath: string; + forwardEnv: string[]; +}; + +export type FlowBackendCli = + | { kind: "help" } + | { kind: "serve"; config: FlowBackendConfig } + | { kind: "dispatch"; config: FlowBackendConfig; eventPath: string; wait: boolean }; + +export function readConfig( + env: Record = process.env, + overrides: Partial = {}, +): FlowBackendConfig { + const cwd = path.resolve(overrides.cwd ?? env.CODEX_FLOW_BACKEND_CWD ?? process.cwd()); + const dataDir = path.resolve(overrides.dataDir ?? env.CODEX_FLOW_BACKEND_DATA_DIR ?? path.join(cwd, ".codex", "flow-backend")); + return { + cwd, + dataDir, + host: overrides.host ?? env.CODEX_FLOW_BACKEND_HOST ?? "127.0.0.1", + port: overrides.port ?? numberEnv(env.CODEX_FLOW_BACKEND_PORT, 7345), + ...(overrides.secret ?? env.CODEX_FLOW_BACKEND_SECRET + ? { secret: overrides.secret ?? env.CODEX_FLOW_BACKEND_SECRET } + : {}), + executor: overrides.executor ?? executorEnv(env.CODEX_FLOW_BACKEND_EXECUTOR), + bunCommand: overrides.bunCommand ?? env.CODEX_FLOW_BACKEND_BUN ?? process.execPath, + flowRunnerPath: path.resolve( + overrides.flowRunnerPath ?? env.CODEX_FLOW_RUNNER_PATH ?? defaultFlowRunnerPath(), + ), + forwardEnv: overrides.forwardEnv ?? forwardEnv(env.CODEX_FLOW_BACKEND_FORWARD_ENV), + }; +} + +export function parseCli(argv: string[], env: Record = process.env): FlowBackendCli { + const command = argv[0]; + if (!command || command === "help" || command === "-h" || command === "--help") { + return { kind: "help" }; + } + + let cwd: string | undefined; + let dataDir: string | undefined; + let host: string | undefined; + let port: number | undefined; + let secret: string | undefined; + let executor: FlowBackendExecutor | undefined; + let bunCommand: string | undefined; + let flowRunnerPath: string | undefined; + let wait = false; + let eventPath: string | undefined; + const rest = argv.slice(1); + for (let index = 0; index < rest.length; index += 1) { + const arg = rest[index]; + if (!arg) { + continue; + } + if (arg === "--cwd") { + cwd = required(rest, ++index, arg); + continue; + } + if (arg === "--data-dir") { + dataDir = required(rest, ++index, arg); + continue; + } + if (arg === "--host") { + host = required(rest, ++index, arg); + continue; + } + if (arg === "--port") { + port = Number(required(rest, ++index, arg)); + continue; + } + if (arg === "--secret") { + secret = required(rest, ++index, arg); + continue; + } + if (arg === "--executor") { + executor = executorEnv(required(rest, ++index, arg)); + continue; + } + if (arg === "--bun") { + bunCommand = required(rest, ++index, arg); + continue; + } + if (arg === "--flow-runner") { + flowRunnerPath = required(rest, ++index, arg); + continue; + } + if (arg === "--event") { + eventPath = required(rest, ++index, arg); + continue; + } + if (arg === "--wait") { + wait = true; + continue; + } + throw new Error(`Unknown option: ${arg}`); + } + + const config = readConfig(env, { + ...(cwd ? { cwd } : {}), + ...(dataDir ? { dataDir } : {}), + ...(host ? { host } : {}), + ...(port !== undefined ? { port } : {}), + ...(secret ? { secret } : {}), + ...(executor ? { executor } : {}), + ...(bunCommand ? { bunCommand } : {}), + ...(flowRunnerPath ? { flowRunnerPath } : {}), + }); + if (command === "serve") { + return { kind: "serve", config }; + } + if (command === "dispatch") { + if (!eventPath) { + throw new Error("dispatch requires --event "); + } + return { kind: "dispatch", config, eventPath, wait }; + } + throw new Error(`Unknown command: ${command}`); +} + +export function defaultFlowRunnerPath(): string { + return path.resolve(import.meta.dir, "..", "..", "flow-runner", "src", "index.ts"); +} + +export function helpText(): string { + return [ + "Usage:", + " codex-flow-systemd-local serve [--cwd ] [--data-dir ] [--host ] [--port ]", + " codex-flow-systemd-local dispatch --event [--cwd ] [--data-dir ] [--wait]", + "", + "Environment:", + " CODEX_FLOW_BACKEND_SECRET Optional HMAC secret for HTTP dispatches", + " CODEX_FLOW_BACKEND_EXECUTOR direct or systemd-run", + " CODEX_FLOWS_ENABLE_CODE_MODE Enables runner = \"code-mode\" steps", + "", + ].join("\n"); +} + +function numberEnv(value: string | undefined, fallback: number): number { + if (!value) { + return fallback; + } + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : fallback; +} + +function executorEnv(value: string | undefined): FlowBackendExecutor { + if (value === "systemd-run") { + return "systemd-run"; + } + if (!value || value === "direct") { + return "direct"; + } + throw new Error("executor must be direct or systemd-run"); +} + +function forwardEnv(value: string | undefined): string[] { + const defaults = [ + "CODEX_FLOWS_ENABLE_CODE_MODE", + "CODEX_APP_SERVER_CODEX_COMMAND", + "CODEX_HOME", + "HOME", + "PATH", + ]; + if (!value) { + return defaults; + } + return value + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean); +} + +function required(args: string[], index: number, flag: string): string { + const value = args[index]; + if (!value) { + throw new Error(`${flag} requires a value`); + } + return value; +} diff --git a/apps/flow-backend-systemd-local/src/executor.ts b/apps/flow-backend-systemd-local/src/executor.ts new file mode 100644 index 0000000..c5de6e6 --- /dev/null +++ b/apps/flow-backend-systemd-local/src/executor.ts @@ -0,0 +1,113 @@ +import { createHash } from "node:crypto"; +import type { FlowBackendConfig } from "./config.ts"; + +export type FlowCommandSpec = { + command: string; + args: string[]; + unit?: string; +}; + +export type ExecuteFlowRunOptions = { + config: FlowBackendConfig; + runId: string; + eventPath: string; + flowName: string; + stepName: string; + env?: Record; +}; + +export type ExecuteFlowRunResult = { + command: FlowCommandSpec; + exitCode: number | null; + stdout: string; + stderr: string; +}; + +export async function executeFlowRun(options: ExecuteFlowRunOptions): Promise { + const command = flowCommand(options); + const result = await executeCommand(command, options.config, options.env); + return { command, ...result }; +} + +export async function executeCommand( + command: FlowCommandSpec, + config: FlowBackendConfig, + env: Record = process.env, +): Promise> { + const child = Bun.spawn([command.command, ...command.args], { + cwd: config.cwd, + env: forwardedEnv(config, env), + stdout: "pipe", + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([ + new Response(child.stdout).text(), + new Response(child.stderr).text(), + child.exited, + ]); + return { exitCode, stdout, stderr }; +} + +export function flowCommand(options: ExecuteFlowRunOptions): FlowCommandSpec { + const runnerArgs = [ + options.config.flowRunnerPath, + "--cwd", + options.config.cwd, + "run", + options.flowName, + options.stepName, + "--event", + options.eventPath, + ]; + if (options.config.executor === "direct") { + return { command: options.config.bunCommand, args: runnerArgs }; + } + const unit = `codex-flow-${safeUnit(options.runId)}`; + return { + command: "systemd-run", + unit, + args: [ + "--user", + "--collect", + "--wait", + `--unit=${unit}`, + `--working-directory=${options.config.cwd}`, + ...systemdSetEnvArgs(options.config, options.env ?? process.env), + options.config.bunCommand, + ...runnerArgs, + ], + }; +} + +export function parseRunnerResult(stdout: string): string | undefined { + const trimmed = stdout.trim(); + if (!trimmed.startsWith("{")) { + return undefined; + } + try { + return JSON.stringify(JSON.parse(trimmed)); + } catch { + return undefined; + } +} + +function forwardedEnv(config: FlowBackendConfig, env: Record): Record { + const next: Record = {}; + const source: Record = { ...process.env, ...env }; + for (const name of config.forwardEnv) { + const value = source[name]; + if (value !== undefined) { + next[name] = value; + } + } + return next; +} + +function systemdSetEnvArgs(config: FlowBackendConfig, env: Record): string[] { + return Object.entries(forwardedEnv(config, env)).map(([key, value]) => `--setenv=${key}=${value}`); +} + +function safeUnit(value: string): string { + const hash = createHash("sha256").update(value).digest("hex").slice(0, 10); + return `${value.toLowerCase().replace(/[^a-z0-9-]+/g, "-").replace(/^-+|-+$/g, "").slice(0, 48)}-${hash}`; +} diff --git a/apps/flow-backend-systemd-local/src/index.ts b/apps/flow-backend-systemd-local/src/index.ts new file mode 100644 index 0000000..916a5e0 --- /dev/null +++ b/apps/flow-backend-systemd-local/src/index.ts @@ -0,0 +1,38 @@ +#!/usr/bin/env bun +import path from "node:path"; +import { dispatchFlowEvent, readFlowEvent } from "./backend.ts"; +import { helpText, parseCli } from "./config.ts"; +import { serveFlowBackend } from "./server.ts"; +import { FlowBackendStore } from "./store.ts"; + +await main().catch((error) => { + process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + process.exitCode = 1; +}); + +async function main(): Promise { + const cli = parseCli(Bun.argv.slice(2)); + if (cli.kind === "help") { + process.stdout.write(helpText()); + return; + } + if (cli.kind === "serve") { + const server = serveFlowBackend(cli.config); + process.stdout.write(`codex-flow-systemd-local listening on http://${server.hostname}:${server.port}\n`); + return new Promise(() => undefined); + } + const store = new FlowBackendStore(path.join(cli.config.dataDir, "flow-backend.sqlite")); + try { + const event = await readFlowEvent(cli.eventPath); + const result = await dispatchFlowEvent({ + config: cli.config, + store, + event, + wait: cli.wait, + env: process.env, + }); + process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + } finally { + store.close(); + } +} diff --git a/apps/flow-backend-systemd-local/src/server.ts b/apps/flow-backend-systemd-local/src/server.ts new file mode 100644 index 0000000..10d2dbc --- /dev/null +++ b/apps/flow-backend-systemd-local/src/server.ts @@ -0,0 +1,43 @@ +import path from "node:path"; +import type { FlowBackendConfig } from "./config.ts"; +import { dispatchFlowEvent, normalizeFlowEvent } from "./backend.ts"; +import { requestSignature, verifyBodySignature } from "./signature.ts"; +import { FlowBackendStore } from "./store.ts"; + +export function serveFlowBackend(config: FlowBackendConfig): ReturnType { + const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite")); + return Bun.serve({ + hostname: config.host, + port: config.port, + async fetch(request) { + const url = new URL(request.url); + if (request.method === "GET" && url.pathname === "/healthz") { + return json({ ok: true }); + } + if (request.method === "POST" && (url.pathname === "/events" || url.pathname === "/flow-events")) { + const body = await request.text(); + if (config.secret && !verifyBodySignature(config.secret, body, requestSignature(request.headers))) { + return json({ error: "invalid signature" }, 401); + } + const event = normalizeFlowEvent(JSON.parse(body) as unknown); + const result = await dispatchFlowEvent({ config, store, event }); + return json(result, 202); + } + if (request.method === "GET" && url.pathname === "/runs") { + const eventId = url.searchParams.get("eventId"); + if (!eventId) { + return json({ error: "missing eventId" }, 400); + } + return json({ eventId, runs: store.listRunsByEvent(eventId) }); + } + return json({ error: "not found" }, 404); + }, + }); +} + +function json(value: unknown, status = 200): Response { + return new Response(JSON.stringify(value, null, 2), { + status, + headers: { "content-type": "application/json" }, + }); +} diff --git a/apps/flow-backend-systemd-local/src/signature.ts b/apps/flow-backend-systemd-local/src/signature.ts new file mode 100644 index 0000000..8e9de19 --- /dev/null +++ b/apps/flow-backend-systemd-local/src/signature.ts @@ -0,0 +1,18 @@ +import { createHmac, timingSafeEqual } from "node:crypto"; + +export function signBody(secret: string, body: string): string { + return `sha256=${createHmac("sha256", secret).update(body).digest("hex")}`; +} + +export function verifyBodySignature(secret: string, body: string, signature: string | null): boolean { + if (!signature?.startsWith("sha256=")) { + return false; + } + const expected = Buffer.from(signBody(secret, body)); + const actual = Buffer.from(signature); + return expected.length === actual.length && timingSafeEqual(expected, actual); +} + +export function requestSignature(headers: Headers): string | null { + return headers.get("x-flow-signature-256") ?? headers.get("x-patchbay-flow-signature-256"); +} diff --git a/apps/flow-backend-systemd-local/src/store.ts b/apps/flow-backend-systemd-local/src/store.ts new file mode 100644 index 0000000..d0e5a93 --- /dev/null +++ b/apps/flow-backend-systemd-local/src/store.ts @@ -0,0 +1,198 @@ +import { mkdirSync } from "node:fs"; +import path from "node:path"; +import { Database } from "bun:sqlite"; +import type { FlowEvent } from "@peezy.tech/flow-runtime"; + +export type FlowRunStatus = "queued" | "running" | "completed" | "failed"; + +export type FlowRunRecord = { + id: string; + eventId: string; + flowName: string; + stepName: string; + status: FlowRunStatus; + backend: "systemd-local"; + executor: string; + unit?: string; + eventPath: string; + commandJson?: string; + resultJson?: string; + stdout?: string; + stderr?: string; + error?: string; + createdAt: string; + startedAt?: string; + completedAt?: string; +}; + +export class FlowBackendStore { + readonly dbPath: string; + #db: Database; + + constructor(dbPath: string) { + this.dbPath = dbPath; + mkdirSync(path.dirname(dbPath), { recursive: true }); + this.#db = new Database(dbPath); + this.#db.exec(` + create table if not exists flow_events ( + id text primary key, + type text not null, + source text, + occurred_at text, + received_at text not null, + payload_json text not null, + raw_json text not null, + created_at text not null + ); + create table if not exists flow_runs ( + id text primary key, + event_id text not null, + flow_name text not null, + step_name text not null, + status text not null, + backend text not null, + executor text not null, + unit text, + event_path text not null, + command_json text, + result_json text, + stdout text, + stderr text, + error text, + created_at text not null, + started_at text, + completed_at text + ); + create index if not exists flow_runs_event_id_idx on flow_runs(event_id); + `); + } + + insertEvent(event: FlowEvent): boolean { + const result = this.#db + .query( + `insert or ignore into flow_events + (id, type, source, occurred_at, received_at, payload_json, raw_json, created_at) + values ($id, $type, $source, $occurredAt, $receivedAt, $payloadJson, $rawJson, $createdAt)`, + ) + .run({ + $id: event.id, + $type: event.type, + $source: event.source ?? null, + $occurredAt: event.occurredAt ?? null, + $receivedAt: event.receivedAt, + $payloadJson: JSON.stringify(event.payload), + $rawJson: JSON.stringify(event), + $createdAt: new Date().toISOString(), + }); + return result.changes > 0; + } + + createRun(record: FlowRunRecord): void { + this.#db + .query( + `insert into flow_runs + (id, event_id, flow_name, step_name, status, backend, executor, unit, event_path, + command_json, result_json, stdout, stderr, error, created_at, started_at, completed_at) + values + ($id, $eventId, $flowName, $stepName, $status, $backend, $executor, $unit, $eventPath, + $commandJson, $resultJson, $stdout, $stderr, $error, $createdAt, $startedAt, $completedAt)`, + ) + .run(runParams(record)); + } + + markRunRunning(id: string, commandJson: string, unit?: string): void { + this.#db + .query( + `update flow_runs + set status = 'running', started_at = $startedAt, command_json = $commandJson, unit = $unit + where id = $id`, + ) + .run({ + $id: id, + $startedAt: new Date().toISOString(), + $commandJson: commandJson, + $unit: unit ?? null, + }); + } + + markRunCompleted(id: string, values: { status: FlowRunStatus; resultJson?: string; stdout: string; stderr: string; error?: string }): void { + this.#db + .query( + `update flow_runs + set status = $status, completed_at = $completedAt, result_json = $resultJson, + stdout = $stdout, stderr = $stderr, error = $error + where id = $id`, + ) + .run({ + $id: id, + $status: values.status, + $completedAt: new Date().toISOString(), + $resultJson: values.resultJson ?? null, + $stdout: values.stdout, + $stderr: values.stderr, + $error: values.error ?? null, + }); + } + + listRunsByEvent(eventId: string): FlowRunRecord[] { + return this.#db + .query("select * from flow_runs where event_id = $eventId order by created_at, id") + .all({ $eventId: eventId }) + .map(rowToRunRecord); + } + + close(): void { + this.#db.close(); + } +} + +function runParams(record: FlowRunRecord): Record { + return { + $id: record.id, + $eventId: record.eventId, + $flowName: record.flowName, + $stepName: record.stepName, + $status: record.status, + $backend: record.backend, + $executor: record.executor, + $unit: record.unit ?? null, + $eventPath: record.eventPath, + $commandJson: record.commandJson ?? null, + $resultJson: record.resultJson ?? null, + $stdout: record.stdout ?? null, + $stderr: record.stderr ?? null, + $error: record.error ?? null, + $createdAt: record.createdAt, + $startedAt: record.startedAt ?? null, + $completedAt: record.completedAt ?? null, + }; +} + +function rowToRunRecord(row: unknown): FlowRunRecord { + if (!isRecord(row)) { + throw new Error("invalid run row"); + } + return { + id: String(row.id), + eventId: String(row.event_id), + flowName: String(row.flow_name), + stepName: String(row.step_name), + status: String(row.status) as FlowRunStatus, + backend: "systemd-local", + executor: String(row.executor), + ...(typeof row.unit === "string" ? { unit: row.unit } : {}), + eventPath: String(row.event_path), + ...(typeof row.command_json === "string" ? { commandJson: row.command_json } : {}), + ...(typeof row.result_json === "string" ? { resultJson: row.result_json } : {}), + ...(typeof row.stdout === "string" ? { stdout: row.stdout } : {}), + ...(typeof row.stderr === "string" ? { stderr: row.stderr } : {}), + ...(typeof row.error === "string" ? { error: row.error } : {}), + createdAt: String(row.created_at), + ...(typeof row.started_at === "string" ? { startedAt: row.started_at } : {}), + ...(typeof row.completed_at === "string" ? { completedAt: row.completed_at } : {}), + }; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/apps/flow-backend-systemd-local/test/backend.test.ts b/apps/flow-backend-systemd-local/test/backend.test.ts new file mode 100644 index 0000000..956633f --- /dev/null +++ b/apps/flow-backend-systemd-local/test/backend.test.ts @@ -0,0 +1,121 @@ +import { expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { dispatchFlowEvent } from "../src/backend.ts"; +import { readConfig } from "../src/config.ts"; +import { flowCommand } from "../src/executor.ts"; +import { signBody, verifyBodySignature } from "../src/signature.ts"; +import { FlowBackendStore } from "../src/store.ts"; + +test("signs and verifies dispatch bodies", () => { + const body = JSON.stringify({ id: "event-1" }); + const signature = signBody("secret", body); + + expect(verifyBodySignature("secret", body, signature)).toBe(true); + expect(verifyBodySignature("secret", `${body}\n`, signature)).toBe(false); +}); + +test("dispatches matching flow steps and records runs", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-backend-")); + try { + await writeFlow(directory); + const config = readConfig( + {}, + { + cwd: directory, + dataDir: path.join(directory, ".codex", "flow-backend"), + executor: "direct", + bunCommand: process.execPath, + }, + ); + const store = new FlowBackendStore(path.join(config.dataDir, "flow-backend.sqlite")); + try { + const result = await dispatchFlowEvent({ + config, + store, + wait: true, + env: {}, + event: { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { name: "Ada" }, + }, + }); + + expect(result).toMatchObject({ status: "accepted", eventId: "event-1", matched: 1 }); + const runs = store.listRunsByEvent("event-1"); + expect(runs).toHaveLength(1); + expect(runs[0]).toMatchObject({ + flowName: "demo", + stepName: "hello", + status: "completed", + }); + expect(runs[0]?.stdout).toContain("hello Ada"); + } finally { + store.close(); + } + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("builds systemd-run commands without executing them", () => { + const config = readConfig({}, { cwd: "/tmp/project", executor: "systemd-run", bunCommand: "/usr/bin/bun" }); + const command = flowCommand({ + config, + runId: "run_123", + eventPath: "/tmp/event.json", + flowName: "demo", + stepName: "hello", + env: { CODEX_FLOWS_ENABLE_CODE_MODE: "1" }, + }); + + expect(command.command).toBe("systemd-run"); + expect(command.args).toContain("--user"); + expect(command.args).toContain("--wait"); + expect(command.args).toContain("--setenv=CODEX_FLOWS_ENABLE_CODE_MODE=1"); + expect(command.args).toContain("/usr/bin/bun"); +}); + +async function writeFlow(root: string): Promise { + const flowRoot = path.join(root, "flows", "demo"); + await mkdir(path.join(flowRoot, "exec"), { recursive: true }); + await mkdir(path.join(flowRoot, "schemas"), { recursive: true }); + await Bun.write( + path.join(flowRoot, "flow.toml"), + [ + 'name = "demo"', + "version = 1", + "", + "[[steps]]", + 'name = "hello"', + 'runner = "bun"', + 'script = "exec/hello.ts"', + "timeout_ms = 30000", + "", + "[steps.trigger]", + 'type = "demo.event"', + 'schema = "schemas/demo-event.schema.json"', + "", + ].join("\n"), + ); + await Bun.write( + path.join(flowRoot, "schemas/demo-event.schema.json"), + JSON.stringify({ + type: "object", + required: ["name"], + properties: { name: { type: "string" } }, + }), + ); + await Bun.write( + path.join(flowRoot, "exec/hello.ts"), + [ + "const context = JSON.parse(await Bun.stdin.text());", + "const name = context.flow.event.payload.name;", + "console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);", + "", + ].join("\n"), + ); +} diff --git a/apps/flow-backend-systemd-local/tsconfig.json b/apps/flow-backend-systemd-local/tsconfig.json new file mode 100644 index 0000000..37f0b51 --- /dev/null +++ b/apps/flow-backend-systemd-local/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "module": "ESNext", + "moduleResolution": "Bundler", + "allowImportingTsExtensions": true, + "esModuleInterop": true, + "resolveJsonModule": true, + "target": "ES2022", + "lib": ["ESNext"], + "strict": true, + "skipLibCheck": true, + "noUncheckedIndexedAccess": true, + "isolatedModules": true, + "verbatimModuleSyntax": true, + "forceConsistentCasingInFileNames": true, + "noEmit": true, + "types": ["node", "bun"], + "rootDir": "." + }, + "include": ["src/**/*.ts", "test/**/*.ts"] +} diff --git a/apps/flow-runner/package.json b/apps/flow-runner/package.json new file mode 100644 index 0000000..3f6254e --- /dev/null +++ b/apps/flow-runner/package.json @@ -0,0 +1,24 @@ +{ + "name": "codex-flow-runner", + "version": "0.1.0", + "description": "CLI for listing, firing, and running Codex flow packages.", + "type": "module", + "private": true, + "license": "Apache-2.0", + "bin": { + "codex-flow-runner": "./src/index.ts" + }, + "scripts": { + "build": "tsc --noEmit", + "check:types": "tsc --noEmit", + "test": "bun test test/*.test.ts" + }, + "dependencies": { + "@peezy.tech/flow-runtime": "workspace:*" + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:" + } +} diff --git a/apps/flow-runner/src/index.ts b/apps/flow-runner/src/index.ts new file mode 100644 index 0000000..87078b1 --- /dev/null +++ b/apps/flow-runner/src/index.ts @@ -0,0 +1,172 @@ +#!/usr/bin/env bun +import path from "node:path"; +import { + discoverFlows, + matchingSteps, + runFlowStep, + type FlowEvent, + type LoadedFlow, + type FlowStep, +} from "@peezy.tech/flow-runtime"; + +type Cli = + | { kind: "help" } + | { kind: "list"; cwd: string } + | { kind: "fire"; cwd: string; eventPath: string } + | { kind: "run"; cwd: string; flow: string; step: string; eventPath: string }; + +await main().catch((error) => { + process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + process.exitCode = 1; +}); + +async function main(): Promise { + const cli = parseArgs(Bun.argv.slice(2)); + if (cli.kind === "help") { + process.stdout.write(helpText()); + return; + } + const flows = await discoverFlows({ cwd: cli.cwd }); + if (cli.kind === "list") { + for (const flow of flows) { + process.stdout.write(`${flow.manifest.name}\t${flow.root}\n`); + } + return; + } + const event = await readEvent(cli.eventPath); + if (cli.kind === "fire") { + const matches = await matchingSteps(flows, event); + const results = []; + for (const match of matches) { + results.push(await runAndReport(match.flow, match.step, event)); + } + process.stdout.write(`${JSON.stringify({ eventId: event.id, results }, null, 2)}\n`); + return; + } + const flow = requireFlow(flows, cli.flow); + const step = requireStep(flow, cli.step); + const result = await runAndReport(flow, step, event); + process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); +} + +async function runAndReport(flow: LoadedFlow, step: FlowStep, event: FlowEvent): Promise> { + const result = await runFlowStep({ + flow, + step, + event, + env: process.env, + codeMode: { + codexCommand: process.env.CODEX_APP_SERVER_CODEX_COMMAND, + codexHome: process.env.CODEX_HOME, + stream: true, + }, + }); + return { + flow: flow.manifest.name, + step: step.name, + result, + }; +} + +async function readEvent(eventPath: string): Promise { + const parsed = JSON.parse(await Bun.file(path.resolve(eventPath)).text()) as unknown; + if (!isRecord(parsed) || typeof parsed.id !== "string" || typeof parsed.type !== "string") { + throw new Error("event file must contain at least string id and type"); + } + return { + receivedAt: new Date().toISOString(), + payload: {}, + ...parsed, + } as FlowEvent; +} + +function requireFlow(flows: LoadedFlow[], name: string): LoadedFlow { + const flow = flows.find((entry) => entry.manifest.name === name); + if (!flow) { + throw new Error(`Unknown flow: ${name}`); + } + return flow; +} + +function requireStep(flow: LoadedFlow, name: string): FlowStep { + const step = flow.manifest.steps.find((entry) => entry.name === name); + if (!step) { + throw new Error(`Unknown step ${name} in flow ${flow.manifest.name}`); + } + return step; +} + +function parseArgs(argv: string[]): Cli { + let cwd = process.cwd(); + const args: string[] = []; + for (let index = 0; index < argv.length; index += 1) { + const arg = argv[index]; + if (!arg) { + continue; + } + if (arg === "--cwd") { + cwd = path.resolve(required(argv, ++index, arg)); + continue; + } + if (arg.startsWith("--cwd=")) { + cwd = path.resolve(arg.slice("--cwd=".length)); + continue; + } + args.push(arg); + } + + const command = args[0]; + if (!command || command === "-h" || command === "--help" || command === "help") { + return { kind: "help" }; + } + if (command === "list") { + return { kind: "list", cwd }; + } + if (command === "fire") { + return { kind: "fire", cwd, eventPath: eventPathArg(args, 1) }; + } + if (command === "run") { + const flow = args[1]; + const step = args[2]; + if (!flow || !step) { + throw new Error("run requires "); + } + return { kind: "run", cwd, flow, step, eventPath: eventPathArg(args, 3) }; + } + throw new Error(`Unknown command: ${command}`); +} + +function eventPathArg(args: string[], start: number): string { + for (let index = start; index < args.length; index += 1) { + const arg = args[index]; + if (arg === "--event") { + return required(args, index + 1, "--event"); + } + if (arg?.startsWith("--event=")) { + return arg.slice("--event=".length); + } + } + throw new Error("missing --event "); +} + +function required(args: string[], index: number, flag: string): string { + const value = args[index]; + if (!value) { + throw new Error(`${flag} requires a value`); + } + return value; +} + +function helpText(): string { + return [ + "Usage:", + " codex-flow-runner [--cwd ] list", + " codex-flow-runner [--cwd ] fire --event ", + " codex-flow-runner [--cwd ] run --event ", + "", + ].join("\n"); +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/apps/flow-runner/tsconfig.json b/apps/flow-runner/tsconfig.json new file mode 100644 index 0000000..37f0b51 --- /dev/null +++ b/apps/flow-runner/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "module": "ESNext", + "moduleResolution": "Bundler", + "allowImportingTsExtensions": true, + "esModuleInterop": true, + "resolveJsonModule": true, + "target": "ES2022", + "lib": ["ESNext"], + "strict": true, + "skipLibCheck": true, + "noUncheckedIndexedAccess": true, + "isolatedModules": true, + "verbatimModuleSyntax": true, + "forceConsistentCasingInFileNames": true, + "noEmit": true, + "types": ["node", "bun"], + "rootDir": "." + }, + "include": ["src/**/*.ts", "test/**/*.ts"] +} diff --git a/bun.lock b/bun.lock index 36d618a..ca6b232 100644 --- a/bun.lock +++ b/bun.lock @@ -36,6 +36,36 @@ "typescript": "catalog:", }, }, + "apps/flow-backend-systemd-local": { + "name": "codex-flow-systemd-local", + "version": "0.1.0", + "bin": { + "codex-flow-systemd-local": "./src/index.ts", + }, + "dependencies": { + "@peezy.tech/flow-runtime": "workspace:*", + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:", + }, + }, + "apps/flow-runner": { + "name": "codex-flow-runner", + "version": "0.1.0", + "bin": { + "codex-flow-runner": "./src/index.ts", + }, + "dependencies": { + "@peezy.tech/flow-runtime": "workspace:*", + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:", + }, + }, "apps/web": { "name": "web", "version": "0.0.1", @@ -58,13 +88,25 @@ }, "packages/codex-client": { "name": "@peezy.tech/codex-flows", - "version": "0.1.0", + "version": "0.1.1", "devDependencies": { "@types/bun": "^1.3.13", "@types/node": "^22.10.10", "typescript": "^5.9.2", }, }, + "packages/flow-runtime": { + "name": "@peezy.tech/flow-runtime", + "version": "0.1.0", + "dependencies": { + "@peezy.tech/codex-flows": "workspace:*", + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:", + }, + }, "packages/ui": { "name": "@workspace/ui", "version": "0.0.0", @@ -297,6 +339,8 @@ "@peezy.tech/codex-flows": ["@peezy.tech/codex-flows@workspace:packages/codex-client"], + "@peezy.tech/flow-runtime": ["@peezy.tech/flow-runtime@workspace:packages/flow-runtime"], + "@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-rc.3", "", {}, "sha512-eybk3TjzzzV97Dlj5c+XrBFW57eTNhzod66y9HrBlzJ6NsCrWCp/2kaPS3K9wJmurBC0Tdw4yPjXKZqlznim3Q=="], "@rollup/rollup-android-arm-eabi": ["@rollup/rollup-android-arm-eabi@4.60.3", "", { "os": "android", "cpu": "arm" }, "sha512-x35CNW/ANXG3hE/EZpRU8MXX1JDN86hBb2wMGAtltkz7pc6cxgjpy1OMMfDosOQ+2hWqIkag/fGok1Yady9nGw=="], @@ -485,6 +529,10 @@ "codex-discord-bridge": ["codex-discord-bridge@workspace:apps/discord-bridge"], + "codex-flow-runner": ["codex-flow-runner@workspace:apps/flow-runner"], + + "codex-flow-systemd-local": ["codex-flow-systemd-local@workspace:apps/flow-backend-systemd-local"], + "color-convert": ["color-convert@2.0.1", "", { "dependencies": { "color-name": "~1.1.4" } }, "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ=="], "color-name": ["color-name@1.1.4", "", {}, "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="], diff --git a/docs/flows.md b/docs/flows.md new file mode 100644 index 0000000..d953c17 --- /dev/null +++ b/docs/flows.md @@ -0,0 +1,164 @@ +# Flows + +Flows are packaged automation units. They are discovered from `.codex/flows/*` +first and then `flows/*`, with the installed `.codex` copy taking precedence. + +Each flow has: + +```text +flow.toml +schemas/*.schema.json +exec/* +``` + +`flow.toml` is the manifest. Use `flow` naming consistently: + +```toml +name = "example-flow" +version = 1 +description = "Short operational purpose." + +[config] +commit = true + +[[steps]] +name = "do-work" +runner = "bun" +script = "exec/do-work.ts" +timeout_ms = 300000 + +[steps.trigger] +type = "upstream.release" +schema = "schemas/upstream-release.schema.json" +``` + +The runtime passes a generic event to every step: + +```ts +type FlowEvent = { + id: string; + type: string; + source?: string; + occurredAt?: string; + receivedAt: string; + payload: T; +}; +``` + +Domain payload types live in each flow package as JSON Schema files and are +referenced by `steps.trigger.schema`. + +## Runners + +`runner = "bun"` executes the script directly with Bun. The step receives JSON +on stdin: + +```json +{ + "flow": { + "name": "example-flow", + "version": 1, + "root": "/repo/flows/example-flow", + "step": "do-work", + "config": {}, + "event": {} + } +} +``` + +The script must print a final line beginning with `FLOW_RESULT ` followed by +JSON. + +`runner = "code-mode"` starts a Codex app-server and calls the fork-only +`thread/codeMode/execute` method through a raw JSON-RPC request. Code Mode code +is present on `main`, but execution is disabled unless: + +```bash +CODEX_FLOWS_ENABLE_CODE_MODE=1 +``` + +Set `CODEX_APP_SERVER_CODEX_COMMAND` when Code Mode should run against the +Peezy fork instead of the default `codex` binary. + +## Commands + +List flows: + +```bash +bun run flow list +``` + +Fire all matching steps for an event: + +```bash +bun run flow fire --event event.json +``` + +Run one step: + +```bash +bun run flow run openai-codex-bindings regenerate-bindings --event event.json +``` + +## Systemd-Local Backend + +`codex-flow-systemd-local` is the first execution backend. Patchbay posts +generic `FlowEvent` JSON to this service; the service persists events and runs +to SQLite, discovers matching flow steps, and starts each step locally. + +Run it directly: + +```bash +bun run flow:backend serve --cwd /home/peezy/codex-flows-public +``` + +Useful environment: + +```bash +CODEX_FLOW_BACKEND_HOST=127.0.0.1 +CODEX_FLOW_BACKEND_PORT=7345 +CODEX_FLOW_BACKEND_DATA_DIR=/var/lib/codex-flow-systemd-local +CODEX_FLOW_BACKEND_SECRET=shared-hmac-secret +CODEX_FLOW_BACKEND_EXECUTOR=direct +``` + +`CODEX_FLOW_BACKEND_EXECUTOR=systemd-run` wraps each step in a transient +`systemd-run --user --wait --collect` unit. The default `direct` executor is +still suitable when the backend service itself is managed by systemd. + +Endpoints: + +- `POST /events` or `POST /flow-events`: accept one `FlowEvent` +- `GET /runs?eventId=`: inspect recorded runs for an event +- `GET /healthz`: health check + +## Convex Backend Direction + +Convex should be a durable orchestration backend, not the place where long +running Codex or shell work executes. A future Convex backend should: + +- accept the same generic `FlowEvent` shape +- persist event, run, step, retry, and result records durably +- choose matching flow steps from a stored or installed flow manifest +- lease work to an external worker or remote app-server +- receive heartbeats and final `FLOW_RESULT` records from that worker +- expose programmatic fire/retry/cancel APIs + +This keeps Patchbay dispatch-only, keeps Convex durable, and keeps process-heavy +work on infrastructure that can run Codex, Bun, Git, Cargo, and system tools. + +## Codex Release Flows + +The upstream `openai/codex` release event fans out to two flow packages: + +- `openai-codex-bindings`: Bun runner. Uses canonical `@openai/codex@version`, + regenerates `@peezy.tech/codex-flows` app-server bindings, runs checks, + commits when changed, and can push/trigger trusted publishing when configured. +- `peezy-codex-fork`: Code Mode runner. Rebases the Peezy fork patch stack onto + the upstream release tag, optionally squashes the patch stack, verifies the + fork, and can push/tag to trigger the fork release workflow when configured. + +Publishing is controlled by flow config and environment. The packaged defaults +commit local changes when appropriate but do not push or publish until +`push = true`, `publish = true`, or matching `CODEX_FLOW_PUSH=1` / +`CODEX_FLOW_PUBLISH=1` deployment configuration is set. diff --git a/flows/openai-codex-bindings/exec/update-bindings.ts b/flows/openai-codex-bindings/exec/update-bindings.ts new file mode 100644 index 0000000..f67bc00 --- /dev/null +++ b/flows/openai-codex-bindings/exec/update-bindings.ts @@ -0,0 +1,196 @@ +import { readFile, writeFile } from "node:fs/promises"; +import path from "node:path"; + +type FlowContext = { + flow: { + config?: Record; + event: { + id: string; + type: string; + payload: Record; + }; + }; +}; + +type CommandResult = { + label: string; + cmd: string[]; + cwd: string; + exitCode: number | null; + stdout: string; + stderr: string; +}; + +const context = JSON.parse(await Bun.stdin.text()) as FlowContext; +const config = context.flow.config ?? {}; +const repoRoot = process.cwd(); +const commands: CommandResult[] = []; + +try { + const tag = stringValue(context.flow.event.payload.tag, "payload.tag"); + const version = versionFromTag(tag); + const packageName = stringConfig("package_name", "@peezy.tech/codex-flows"); + const generatedDir = path.resolve(repoRoot, stringConfig("generated_dir", "packages/codex-client/src/app-server/generated")); + const packageJsonPath = path.resolve(repoRoot, stringConfig("package_json", "packages/codex-client/package.json")); + + const published = await npmPackageExists(packageName, version); + if (published && !enabled("force", false)) { + finish("skipped", `${packageName}@${version} is already published.`, { version, tag }); + } + + await requireCleanWorktree(); + await run("regenerate app-server TypeScript bindings", [ + "npx", + "-y", + `@openai/codex@${version}`, + "app-server", + "generate-ts", + "--experimental", + "--out", + generatedDir, + ]); + + await updatePackageVersion(packageJsonPath, version); + await run("refresh Bun lockfile", ["bun", "install"]); + await run("codex-flows package release check", ["bun", "run", "--filter", packageName, "release:check"]); + await run("workspace typecheck", ["bun", "run", "check:types"]); + await run("workspace tests", ["bun", "run", "test"]); + await run("git diff check", ["git", "diff", "--check"]); + + const status = await run("final git status", ["git", "status", "--short"]); + if (!status.stdout.trim()) { + finish("skipped", `No generated binding changes for ${tag}.`, { version, tag }); + } + + if (enabled("commit", true)) { + await run("stage binding update", ["git", "add", "--", generatedDir, packageJsonPath, path.join(repoRoot, "bun.lock")]); + await run("commit binding update", [ + "git", + "commit", + "-m", + `flow: update codex-flows for openai codex ${version}`, + ]); + } + + if (enabled("push", false)) { + await run("push jojo main", ["git", "push", "origin", "HEAD:main"]); + } + + if (enabled("publish", false)) { + await run("push GitHub main", ["git", "push", "github", "HEAD:main"]); + await run("trigger GitHub trusted publish", [ + "gh", + "workflow", + "run", + stringConfig("github_publish_workflow", "publish-codex-flows.yml"), + "--repo", + stringConfig("github_repo", "peezy-tech/codex-flows"), + "-f", + `confirm_package=${packageName}`, + ]); + } + + finish("changed", `${packageName} regenerated for openai/codex ${tag}.`, { + version, + tag, + committed: enabled("commit", true), + pushed: enabled("push", false), + published: enabled("publish", false), + }); +} catch (error) { + finish("failed", error instanceof Error ? error.message : String(error)); +} + +async function requireCleanWorktree(): Promise { + const status = await run("dirty worktree check", ["git", "status", "--porcelain=v1"]); + if (status.stdout.trim()) { + finish("blocked", "codex-flows checkout has local changes before the release update.", { + dirtyStatus: status.stdout, + }); + } +} + +async function npmPackageExists(packageName: string, version: string): Promise { + const result = await run("published package check", [ + "npm", + "view", + `${packageName}@${version}`, + "version", + "--json", + ], { allowFailure: true }); + return result.exitCode === 0 && result.stdout.includes(version); +} + +async function updatePackageVersion(packageJsonPath: string, version: string): Promise { + const parsed = JSON.parse(await readFile(packageJsonPath, "utf8")) as Record; + parsed.version = version; + await writeFile(packageJsonPath, `${JSON.stringify(parsed, null, "\t")}\n`); +} + +async function run( + label: string, + cmd: string[], + options: { allowFailure?: boolean; cwd?: string } = {}, +): Promise { + const child = Bun.spawn(cmd, { + cwd: options.cwd ?? repoRoot, + env: process.env, + stdout: "pipe", + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([ + child.stdout.text(), + child.stderr.text(), + child.exited, + ]); + const result = { label, cmd, cwd: options.cwd ?? repoRoot, exitCode, stdout, stderr }; + commands.push(result); + if (exitCode !== 0 && !options.allowFailure) { + throw new Error(`${label} failed with exit ${exitCode}:\n${stderr || stdout}`); + } + return result; +} + +function finish(status: string, message: string, artifacts: Record = {}): never { + const trimmedCommands = commands.map((command) => ({ + ...command, + stdout: truncate(command.stdout), + stderr: truncate(command.stderr), + })); + console.log(`FLOW_RESULT ${JSON.stringify({ status, message, artifacts: { ...artifacts, commands: trimmedCommands } })}`); + process.exit(0); +} + +function enabled(name: string, fallback: boolean): boolean { + const envName = `CODEX_FLOW_${name.toUpperCase()}`; + const envValue = process.env[envName]; + if (envValue !== undefined) { + return ["1", "true", "yes", "on"].includes(envValue.trim().toLowerCase()); + } + const value = config[name]; + return typeof value === "boolean" ? value : fallback; +} + +function stringConfig(name: string, fallback: string): string { + const value = config[name]; + return typeof value === "string" && value.trim() ? value : fallback; +} + +function stringValue(value: unknown, name: string): string { + if (typeof value !== "string" || !value.trim()) { + throw new Error(`${name} must be a non-empty string`); + } + return value; +} + +function versionFromTag(tag: string): string { + const match = tag.match(/[0-9]+\.[0-9]+\.[0-9]+(?:-[0-9A-Za-z.-]+)?/); + if (!match) { + throw new Error(`Could not infer semantic version from release tag ${tag}`); + } + return match[0]; +} + +function truncate(value: string, max = 4000): string { + return value.length <= max ? value : `${value.slice(0, max)}\n...[truncated ${value.length - max} chars]`; +} diff --git a/flows/openai-codex-bindings/flow.toml b/flows/openai-codex-bindings/flow.toml new file mode 100644 index 0000000..a81250a --- /dev/null +++ b/flows/openai-codex-bindings/flow.toml @@ -0,0 +1,24 @@ +name = "openai-codex-bindings" +version = 1 +description = "Regenerate @peezy.tech/codex-flows bindings from a canonical openai/codex release." + +[config] +package_name = "@peezy.tech/codex-flows" +generated_dir = "packages/codex-client/src/app-server/generated" +package_json = "packages/codex-client/package.json" +commit = true +push = false +publish = false +github_repo = "peezy-tech/codex-flows" +github_publish_workflow = "publish-codex-flows.yml" + +[[steps]] +name = "regenerate-bindings" +runner = "bun" +script = "exec/update-bindings.ts" +cwd = "../.." +timeout_ms = 1200000 + +[steps.trigger] +type = "upstream.release" +schema = "schemas/upstream-release.schema.json" diff --git a/flows/openai-codex-bindings/schemas/upstream-release.schema.json b/flows/openai-codex-bindings/schemas/upstream-release.schema.json new file mode 100644 index 0000000..da9eea5 --- /dev/null +++ b/flows/openai-codex-bindings/schemas/upstream-release.schema.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "required": ["repo", "tag"], + "properties": { + "provider": { "type": "string" }, + "repo": { "type": "string", "enum": ["openai/codex"] }, + "tag": { "type": "string" }, + "url": { "type": "string" }, + "publishedAt": { "type": "string" } + } +} diff --git a/flows/peezy-codex-fork/exec/update-fork.code-mode.js b/flows/peezy-codex-fork/exec/update-fork.code-mode.js new file mode 100644 index 0000000..5eb1b3c --- /dev/null +++ b/flows/peezy-codex-fork/exec/update-fork.code-mode.js @@ -0,0 +1,374 @@ +const config = flow.config || {}; +const payload = flow.event.payload || {}; +const commands = []; + +function q(value) { + return "'" + String(value).replaceAll("'", "'\\''") + "'"; +} + +function trim(value) { + return String(value || "").trim(); +} + +function truncate(value, max) { + const textValue = String(value || ""); + if (textValue.length <= max) { + return textValue; + } + return textValue.slice(0, max) + "\n...[truncated " + String(textValue.length - max) + " chars]"; +} + +function outputOf(result) { + if (typeof result?.output === "string") { + return result.output; + } + return JSON.stringify(result ?? {}); +} + +function exitCodeOf(result) { + if (typeof result?.exit_code === "number") { + return result.exit_code; + } + if (typeof result?.exitCode === "number") { + return result.exitCode; + } + return null; +} + +function ok(result) { + return result.exit_code === 0; +} + +function cfg(name, fallback) { + const value = config[name]; + return typeof value === "string" && value.trim() ? value : fallback; +} + +function enabled(name, fallback) { + const value = config[name]; + return typeof value === "boolean" ? value : fallback; +} + +function versionFromTag(tag) { + const match = String(tag).match(/[0-9]+\.[0-9]+\.[0-9]+(?:-[0-9A-Za-z.-]+)?/); + return match ? match[0] : ""; +} + +async function env(name) { + if (!name) { + return ""; + } + const result = await tools.exec_command({ + cmd: "printf %s \"${" + name + ":-}\"", + workdir: flow.root, + yield_time_ms: 1000, + max_output_tokens: 2000 + }); + return trim(outputOf(result)); +} + +async function run(label, cmd, options = {}) { + const workdir = options.workdir || codexRepo; + text("\n### " + label + "\n$ " + cmd + "\n"); + const raw = await tools.exec_command({ + cmd, + workdir, + yield_time_ms: options.yield_time_ms || 1000, + max_output_tokens: options.max_output_tokens || 12000 + }); + const result = { + label, + cmd, + workdir, + exit_code: exitCodeOf(raw), + output: outputOf(raw) + }; + commands.push({ ...result, output: truncate(result.output, 4000) }); + text("exit_code=" + String(result.exit_code) + "\n" + truncate(result.output, options.textLimit || 12000) + "\n"); + return result; +} + +function finish(status, message, artifacts = {}) { + result({ + status, + message, + artifacts: { + releaseTag, + version, + codexRepo, + targetBranch, + commands, + ...artifacts + } + }); +} + +async function collectRebaseContext(rebaseOutput, beforeSha) { + const status = await run("rebase conflict status", "git status --short --branch", { max_output_tokens: 12000 }); + const unmerged = await run("unmerged files", "git diff --name-only --diff-filter=U", { max_output_tokens: 12000 }); + const diffStat = await run("conflict diff stat", "git diff --cc --stat", { max_output_tokens: 12000 }); + const conflictDiff = await run("conflict diff", "git diff --cc", { max_output_tokens: 30000, textLimit: 20000 }); + const currentPatch = await run("current rebase patch", "git rebase --show-current-patch", { max_output_tokens: 20000, textLimit: 12000 }); + return { + beforeSha, + rebaseOutput, + statusOutput: status.output, + unmergedFiles: unmerged.output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean), + diffStat: diffStat.output, + conflictDiff: truncate(conflictDiff.output, 20000), + currentPatch: truncate(currentPatch.output, 12000), + interventionPrompt: "Continue this same Code Mode thread to resolve the paused rebase. Preserve the fork patch stack, do not abort or reset unless explicitly instructed, then run the configured verification commands." + }; +} + +const releaseTag = String(payload.tag || ""); +const version = versionFromTag(releaseTag); +const packageName = cfg("package_name", "@peezy.tech/codex"); +const targetBranch = cfg("target_branch", "main"); +const upstreamRemote = cfg("upstream_remote", "upstream"); +const upstreamRepoUrl = cfg("upstream_repo_url", "https://github.com/openai/codex.git"); +const cargoTargetDir = (await env(cfg("cargo_target_dir_env", ""))) || cfg("cargo_target_dir", "/tmp/peezy-codex-flow-target"); +const codexRepo = (await env(cfg("codex_repo_env", ""))) || cfg("codex_repo", ""); +const codexRustDir = codexRepo + "/codex-rs"; +const codexBinary = cargoTargetDir + "/debug/codex"; + +if (!releaseTag) { + finish("failed", "Release payload is missing tag."); +} +if (!version) { + finish("failed", "Could not infer semantic version from release tag " + releaseTag); +} +if (!codexRepo) { + finish("blocked", "No Codex fork checkout configured. Set codex_repo or codex_repo_env in flow.toml."); +} + +text([ + "Peezy Codex fork update flow", + "", + "Release: " + releaseTag, + "Version: " + version, + "Target branch: " + targetBranch, + "Codex repo: " + codexRepo, + "Upstream remote: " + upstreamRemote + " -> " + upstreamRepoUrl, + "Cargo target dir: " + cargoTargetDir +].join("\n") + "\n"); + +const published = await run("published fork package check", "npm view " + q(packageName + "@" + version) + " version --json", { + max_output_tokens: 4000 +}); +if (ok(published) && !enabled("force", false)) { + finish("skipped", packageName + "@" + version + " is already published."); +} + +const repoCheck = await run("verify codex repo", "git rev-parse --show-toplevel"); +if (!ok(repoCheck)) { + finish("failed", "codex repo is not a git checkout", { repoCheck: repoCheck.output }); +} + +const rustWorkspaceCheck = await run("verify codex Rust workspace", "test -f " + q(codexRustDir + "/Cargo.toml"), { + max_output_tokens: 4000 +}); +if (!ok(rustWorkspaceCheck)) { + finish("failed", "codex Rust workspace was not found at the expected codex-rs path.", { + codexRustDir, + rustWorkspaceCheck: rustWorkspaceCheck.output + }); +} + +const existingRebase = await run( + "check existing rebase state", + "test -d \"$(git rev-parse --git-path rebase-merge)\" -o -d \"$(git rev-parse --git-path rebase-apply)\"", + { max_output_tokens: 4000 } +); +if (existingRebase.exit_code === 0) { + const context = await collectRebaseContext("A rebase was already in progress before this flow started.", undefined); + finish("blocked", "A rebase is already in progress in the Codex checkout.", context); +} + +await run("codex status before update", "git status --short --branch", { max_output_tokens: 12000 }); +const branch = await run("current branch", "git rev-parse --abbrev-ref HEAD", { max_output_tokens: 4000 }); +if (!ok(branch)) { + finish("failed", "could not read current branch", { branchOutput: branch.output }); +} + +if (trim(branch.output) !== targetBranch) { + const dirtyBeforeSwitch = await run("dirty check before branch switch", "git status --porcelain=v1", { max_output_tokens: 12000 }); + if (trim(dirtyBeforeSwitch.output)) { + finish("blocked", "codex checkout has local changes before switching branches.", { + dirtyStatus: dirtyBeforeSwitch.output + }); + } + const switched = await run("switch target branch", "git switch " + q(targetBranch), { max_output_tokens: 12000 }); + if (!ok(switched)) { + finish("failed", "could not switch to target branch", { switchOutput: switched.output }); + } +} + +const dirty = await run("dirty check on target branch", "git status --porcelain=v1", { max_output_tokens: 12000 }); +if (trim(dirty.output)) { + finish("blocked", "codex target branch has local changes. Resolve or stash them before updating.", { + dirtyStatus: dirty.output + }); +} + +const remote = await run( + "ensure upstream openai/codex remote", + "git remote get-url " + q(upstreamRemote) + " >/dev/null 2>&1 && git remote set-url " + q(upstreamRemote) + " " + q(upstreamRepoUrl) + " || git remote add " + q(upstreamRemote) + " " + q(upstreamRepoUrl), + { max_output_tokens: 12000 } +); +if (!ok(remote)) { + finish("failed", "could not configure upstream remote", { remoteOutput: remote.output }); +} + +const fetch = await run("fetch upstream tags", "git fetch " + q(upstreamRemote) + " --tags --prune", { + max_output_tokens: 20000 +}); +if (!ok(fetch)) { + finish("failed", "could not fetch upstream release tags", { fetchOutput: fetch.output }); +} + +const releaseCommit = await run("resolve release tag", "git rev-parse --verify " + q("refs/tags/" + releaseTag + "^{commit}"), { + max_output_tokens: 4000 +}); +if (!ok(releaseCommit)) { + finish("failed", "could not resolve upstream release tag after fetch", { + releaseTag, + resolveOutput: releaseCommit.output + }); +} + +const beforeHead = await run("codex head before rebase", "git rev-parse HEAD", { max_output_tokens: 4000 }); +const rebase = await run("rebase target branch onto upstream release", "git rebase " + q(releaseTag), { + max_output_tokens: 30000, + textLimit: 20000 +}); +if (!ok(rebase)) { + const context = await collectRebaseContext(rebase.output, trim(beforeHead.output)); + finish("needs_intervention", "Rebase paused with conflicts.", context); +} + +if (enabled("squash_patch_stack", true)) { + const count = await run("count fork patch commits", "git rev-list --count " + q(releaseTag) + "..HEAD", { + max_output_tokens: 4000 + }); + const commitCount = Number(trim(count.output)); + if (Number.isFinite(commitCount) && commitCount > 1) { + const reset = await run("squash patch stack reset", "git reset --soft " + q(releaseTag), { max_output_tokens: 12000 }); + if (!ok(reset)) { + finish("failed", "could not soft reset patch stack for squashing", { resetOutput: reset.output }); + } + const commit = await run("squash patch stack commit", "git commit -m " + q("peezy: codex fork patches for " + releaseTag), { + max_output_tokens: 20000 + }); + if (!ok(commit)) { + finish("failed", "could not commit squashed patch stack", { commitOutput: commit.output }); + } + } +} + +const afterHead = await run("codex head after rebase", "git rev-parse HEAD", { max_output_tokens: 4000 }); +await run("codex status after rebase", "git status --short --branch", { max_output_tokens: 12000 }); + +const build = await run( + "build fork binary", + "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo build -p codex-cli --bin codex", + { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 } +); +if (!ok(build)) { + finish("failed", "fork binary build failed", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + buildOutput: build.output + }); +} + +const versionCheck = await run("verify fork binary", q(codexBinary) + " --version", { max_output_tokens: 4000 }); +if (!ok(versionCheck)) { + finish("failed", "built fork binary did not run", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + versionOutput: versionCheck.output + }); +} + +const cargoCheck = await run( + "cargo check code mode packages", + "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo check -p codex-app-server -p codex-core -p codex-app-server-protocol", + { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 } +); +if (!ok(cargoCheck)) { + finish("failed", "cargo check failed after rebase", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + cargoCheckOutput: cargoCheck.output + }); +} + +const protocolTest = await run( + "protocol code mode execute test", + "CARGO_TARGET_DIR=" + q(cargoTargetDir) + " cargo test -p codex-app-server-protocol thread_code_mode_execute -- --nocapture", + { workdir: codexRustDir, max_output_tokens: 30000, textLimit: 20000 } +); +if (!ok(protocolTest)) { + finish("failed", "protocol Code Mode API test failed after rebase", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + protocolTestOutput: protocolTest.output + }); +} + +const fmt = await run("cargo fmt check", "cargo fmt --check", { + workdir: codexRustDir, + max_output_tokens: 20000 +}); +if (!ok(fmt)) { + finish("failed", "cargo fmt --check failed after rebase", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + fmtOutput: fmt.output + }); +} + +const diffCheck = await run("codex diff whitespace check", "git diff --check", { max_output_tokens: 12000 }); +if (!ok(diffCheck)) { + finish("failed", "codex git diff --check failed after rebase", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + diffCheckOutput: diffCheck.output + }); +} + +if (enabled("push", false)) { + const push = await run("push fork branch", "git push origin HEAD:" + q(targetBranch) + " --force-with-lease", { + max_output_tokens: 20000 + }); + if (!ok(push)) { + finish("failed", "could not push rebased fork branch", { pushOutput: push.output }); + } +} + +if (enabled("publish", false)) { + const tagCommand = "git tag -a " + q("rust-v" + version) + " -m " + q("Release " + version); + const tag = await run("create release tag", tagCommand, { max_output_tokens: 12000 }); + if (!ok(tag)) { + finish("failed", "could not create release tag", { tagOutput: tag.output }); + } + const pushTag = await run("push release tag", "git push origin " + q("rust-v" + version), { + max_output_tokens: 20000 + }); + if (!ok(pushTag)) { + finish("failed", "could not push release tag", { pushTagOutput: pushTag.output }); + } +} + +const finalStatus = await run("final codex status", "git status --short --branch", { max_output_tokens: 12000 }); +finish("changed", "Peezy Codex fork rebased onto upstream release and verified.", { + beforeSha: trim(beforeHead.output), + afterSha: trim(afterHead.output), + codexHead: trim(afterHead.output), + codexBinary, + codexVersion: trim(versionCheck.output), + finalStatus: finalStatus.output, + pushed: enabled("push", false), + published: enabled("publish", false) +}); diff --git a/flows/peezy-codex-fork/flow.toml b/flows/peezy-codex-fork/flow.toml new file mode 100644 index 0000000..7032236 --- /dev/null +++ b/flows/peezy-codex-fork/flow.toml @@ -0,0 +1,29 @@ +name = "peezy-codex-fork" +version = 1 +description = "Rebase the Peezy Codex fork patch stack onto a canonical openai/codex release." + +[config] +package_name = "@peezy.tech/codex" +codex_repo_env = "PEEZY_CODEX_REPO" +codex_repo = "/home/peezy/codex-fork-workspace/codex" +target_branch = "main" +upstream_remote = "upstream" +upstream_repo_url = "https://github.com/openai/codex.git" +cargo_target_dir_env = "PEEZY_CODEX_CARGO_TARGET_DIR" +cargo_target_dir = "/tmp/peezy-codex-flow-target" +squash_patch_stack = true +push = false +publish = false + +[guidance] +skills = ["jojo-development-flow"] + +[[steps]] +name = "rebase-patch-stack" +runner = "code-mode" +script = "exec/update-fork.code-mode.js" +timeout_ms = 3600000 + +[steps.trigger] +type = "upstream.release" +schema = "schemas/upstream-release.schema.json" diff --git a/flows/peezy-codex-fork/schemas/upstream-release.schema.json b/flows/peezy-codex-fork/schemas/upstream-release.schema.json new file mode 100644 index 0000000..da9eea5 --- /dev/null +++ b/flows/peezy-codex-fork/schemas/upstream-release.schema.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "required": ["repo", "tag"], + "properties": { + "provider": { "type": "string" }, + "repo": { "type": "string", "enum": ["openai/codex"] }, + "tag": { "type": "string" }, + "url": { "type": "string" }, + "publishedAt": { "type": "string" } + } +} diff --git a/package.json b/package.json index dfad556..9468dcb 100644 --- a/package.json +++ b/package.json @@ -40,8 +40,10 @@ "check:types": "bun run --workspaces check:types", "dev": "bun run --filter web dev", "dev:web": "bun run --filter web dev", + "flow": "bun apps/flow-runner/src/index.ts", + "flow:backend": "bun apps/flow-backend-systemd-local/src/index.ts", "start": "bun run --filter web preview", "start:discord:debug:commentary": "bun run --filter codex-discord-bridge start:debug:commentary", - "test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test" + "test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test" } } diff --git a/packages/flow-runtime/package.json b/packages/flow-runtime/package.json new file mode 100644 index 0000000..6dc2a3a --- /dev/null +++ b/packages/flow-runtime/package.json @@ -0,0 +1,24 @@ +{ + "name": "@peezy.tech/flow-runtime", + "version": "0.1.0", + "description": "Generic flow package loader and runner primitives.", + "type": "module", + "private": true, + "license": "Apache-2.0", + "exports": { + ".": "./src/index.ts" + }, + "scripts": { + "build": "tsc --noEmit", + "check:types": "tsc --noEmit", + "test": "bun test test/*.test.ts" + }, + "dependencies": { + "@peezy.tech/codex-flows": "workspace:*" + }, + "devDependencies": { + "@types/bun": "catalog:", + "@types/node": "catalog:", + "typescript": "catalog:" + } +} diff --git a/packages/flow-runtime/src/index.ts b/packages/flow-runtime/src/index.ts new file mode 100644 index 0000000..6aa4271 --- /dev/null +++ b/packages/flow-runtime/src/index.ts @@ -0,0 +1,18 @@ +export { discoverFlows, loadFlow, stepSchemaPath, stepScriptPath } from "./manifest.ts"; +export { parseFlowResult, stringifyFlowResult } from "./result.ts"; +export { runFlowStep } from "./run.ts"; +export { runBunStep } from "./runners/bun.ts"; +export { runCodeModeStep } from "./runners/code-mode.ts"; +export { readJsonSchema, validateJsonSchema } from "./schema.ts"; +export { matchingSteps, stepMatchesEvent } from "./triggers.ts"; +export type { + FlowEvent, + FlowManifest, + FlowResult, + FlowResultStatus, + FlowRunContext, + FlowStep, + FlowStepRunner, + FlowStepTrigger, + LoadedFlow, +} from "./types.ts"; diff --git a/packages/flow-runtime/src/manifest.ts b/packages/flow-runtime/src/manifest.ts new file mode 100644 index 0000000..65f1abd --- /dev/null +++ b/packages/flow-runtime/src/manifest.ts @@ -0,0 +1,137 @@ +import { readdir } from "node:fs/promises"; +import path from "node:path"; +import type { FlowManifest, FlowStep, LoadedFlow } from "./types.ts"; + +export type DiscoverFlowsOptions = { + cwd: string; + roots?: string[]; +}; + +export async function loadFlow(root: string): Promise { + const manifestPath = path.join(root, "flow.toml"); + const parsed = Bun.TOML.parse(await Bun.file(manifestPath).text()) as unknown; + const manifest = normalizeManifest(parsed, manifestPath); + return { root, manifestPath, manifest }; +} + +export async function discoverFlows(options: DiscoverFlowsOptions): Promise { + const roots = options.roots ?? [ + path.join(options.cwd, ".codex", "flows"), + path.join(options.cwd, "flows"), + ]; + const flows: LoadedFlow[] = []; + const seen = new Set(); + for (const root of roots) { + for (const directory of await childDirectories(root)) { + const manifestPath = path.join(directory, "flow.toml"); + if (!(await Bun.file(manifestPath).exists())) { + continue; + } + const flow = await loadFlow(directory); + if (seen.has(flow.manifest.name)) { + continue; + } + seen.add(flow.manifest.name); + flows.push(flow); + } + } + return flows; +} + +export function stepScriptPath(flow: LoadedFlow, step: FlowStep): string { + return path.resolve(flow.root, step.script); +} + +export function stepSchemaPath(flow: LoadedFlow, step: FlowStep): string | undefined { + return step.trigger?.schema ? path.resolve(flow.root, step.trigger.schema) : undefined; +} + +async function childDirectories(root: string): Promise { + try { + const entries = await readdir(root, { withFileTypes: true }); + return entries + .filter((entry) => entry.isDirectory()) + .map((entry) => path.join(root, entry.name)) + .sort(); + } catch (error) { + if (isErrno(error, "ENOENT")) { + return []; + } + throw error; + } +} + +function normalizeManifest(value: unknown, manifestPath: string): FlowManifest { + if (!isRecord(value)) { + throw new Error(`flow.toml must contain a table: ${manifestPath}`); + } + const name = requiredString(value.name, "name", manifestPath); + const version = requiredNumber(value.version, "version", manifestPath); + const rawSteps = Array.isArray(value.steps) ? value.steps : undefined; + if (!rawSteps || rawSteps.length === 0) { + throw new Error(`flow.toml requires at least one [[steps]] entry: ${manifestPath}`); + } + return { + name, + version, + ...(typeof value.description === "string" ? { description: value.description } : {}), + ...(isRecord(value.config) ? { config: value.config } : {}), + ...(isRecord(value.guidance) ? { guidance: normalizeGuidance(value.guidance) } : {}), + steps: rawSteps.map((step, index) => normalizeStep(step, index, manifestPath)), + }; +} + +function normalizeGuidance(value: Record): FlowManifest["guidance"] { + return { + ...(Array.isArray(value.skills) + ? { skills: value.skills.filter((entry): entry is string => typeof entry === "string") } + : {}), + }; +} + +function normalizeStep(value: unknown, index: number, manifestPath: string): FlowStep { + if (!isRecord(value)) { + throw new Error(`steps[${index}] must be a table: ${manifestPath}`); + } + const runner = requiredString(value.runner, `steps[${index}].runner`, manifestPath); + if (runner !== "bun" && runner !== "code-mode") { + throw new Error(`steps[${index}].runner must be bun or code-mode: ${manifestPath}`); + } + return { + name: requiredString(value.name, `steps[${index}].name`, manifestPath), + runner, + script: requiredString(value.script, `steps[${index}].script`, manifestPath), + timeoutMs: typeof value.timeout_ms === "number" ? value.timeout_ms : 300_000, + ...(typeof value.cwd === "string" ? { cwd: value.cwd } : {}), + ...(isRecord(value.trigger) ? { trigger: normalizeTrigger(value.trigger, index, manifestPath) } : {}), + }; +} + +function normalizeTrigger(value: Record, index: number, manifestPath: string): FlowStep["trigger"] { + return { + type: requiredString(value.type, `steps[${index}].trigger.type`, manifestPath), + ...(typeof value.schema === "string" ? { schema: value.schema } : {}), + }; +} + +function requiredString(value: unknown, name: string, pathValue: string): string { + if (typeof value !== "string" || !value.trim()) { + throw new Error(`flow.toml requires ${name}: ${pathValue}`); + } + return value; +} + +function requiredNumber(value: unknown, name: string, pathValue: string): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + throw new Error(`flow.toml requires numeric ${name}: ${pathValue}`); + } + return value; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function isErrno(error: unknown, code: string): boolean { + return isRecord(error) && error.code === code; +} diff --git a/packages/flow-runtime/src/result.ts b/packages/flow-runtime/src/result.ts new file mode 100644 index 0000000..5c2db6f --- /dev/null +++ b/packages/flow-runtime/src/result.ts @@ -0,0 +1,37 @@ +import type { FlowResult, FlowResultStatus } from "./types.ts"; + +const validStatuses = new Set([ + "skipped", + "completed", + "changed", + "needs_intervention", + "blocked", + "failed", +]); + +export function parseFlowResult(stdout: string): FlowResult { + for (const line of stdout.split(/\r?\n/).reverse()) { + const index = line.indexOf("FLOW_RESULT "); + if (index === -1) { + continue; + } + const text = line.slice(index + "FLOW_RESULT ".length).trim(); + const parsed = JSON.parse(text) as unknown; + if (!isRecord(parsed)) { + throw new Error("FLOW_RESULT must be a JSON object"); + } + if (typeof parsed.status !== "string" || !validStatuses.has(parsed.status as FlowResultStatus)) { + throw new Error("FLOW_RESULT status is invalid"); + } + return parsed as FlowResult; + } + throw new Error("Step did not emit FLOW_RESULT"); +} + +export function stringifyFlowResult(value: FlowResult): string { + return `FLOW_RESULT ${JSON.stringify(value)}\n`; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/flow-runtime/src/run.ts b/packages/flow-runtime/src/run.ts new file mode 100644 index 0000000..f8517d1 --- /dev/null +++ b/packages/flow-runtime/src/run.ts @@ -0,0 +1,33 @@ +import { runBunStep } from "./runners/bun.ts"; +import { runCodeModeStep, type RunCodeModeStepOptions } from "./runners/code-mode.ts"; +import type { FlowEvent, FlowResult, FlowStep, LoadedFlow } from "./types.ts"; + +export type RunFlowStepOptions = { + flow: LoadedFlow; + step: FlowStep; + event: FlowEvent; + env?: Record; + codeMode?: Pick; +}; + +export async function runFlowStep(options: RunFlowStepOptions): Promise { + if (options.step.runner === "bun") { + return runBunStep(options); + } + if (!codeModeEnabled(options.env ?? process.env)) { + throw new Error( + `Code Mode flow step ${options.flow.manifest.name}/${options.step.name} requires CODEX_FLOWS_ENABLE_CODE_MODE=1`, + ); + } + return runCodeModeStep({ + flow: options.flow, + step: options.step, + event: options.event, + ...options.codeMode, + }); +} + +export function codeModeEnabled(env: Record): boolean { + const value = env.CODEX_FLOWS_ENABLE_CODE_MODE?.trim().toLowerCase(); + return value === "1" || value === "true" || value === "yes" || value === "on"; +} diff --git a/packages/flow-runtime/src/runners/bun.ts b/packages/flow-runtime/src/runners/bun.ts new file mode 100644 index 0000000..5bfc80f --- /dev/null +++ b/packages/flow-runtime/src/runners/bun.ts @@ -0,0 +1,54 @@ +import path from "node:path"; +import { stepScriptPath } from "../manifest.ts"; +import { parseFlowResult } from "../result.ts"; +import type { FlowEvent, FlowResult, FlowRunContext, FlowStep, LoadedFlow } from "../types.ts"; + +export type RunBunStepOptions = { + flow: LoadedFlow; + step: FlowStep; + event: FlowEvent; + env?: Record; +}; + +export async function runBunStep(options: RunBunStepOptions): Promise { + const scriptPath = stepScriptPath(options.flow, options.step); + const cwd = options.step.cwd + ? path.resolve(options.flow.root, options.step.cwd) + : options.flow.root; + const subprocess = Bun.spawn({ + cmd: [process.execPath, scriptPath], + cwd, + env: { + ...process.env, + ...options.env, + }, + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }); + subprocess.stdin.write(`${JSON.stringify(runContext(options), null, 2)}\n`); + subprocess.stdin.end(); + const timer = setTimeout(() => subprocess.kill("SIGTERM"), options.step.timeoutMs); + const [stdout, stderr, exitCode] = await Promise.all([ + subprocess.stdout.text(), + subprocess.stderr.text(), + subprocess.exited, + ]).finally(() => clearTimeout(timer)); + if (exitCode !== 0) { + throw new Error(`Bun flow step ${options.flow.manifest.name}/${options.step.name} failed:\n${stderr || stdout}`); + } + return parseFlowResult(stdout); +} + +function runContext(options: RunBunStepOptions): FlowRunContext { + return { + flow: { + name: options.flow.manifest.name, + version: options.flow.manifest.version, + root: options.flow.root, + step: options.step.name, + ...(options.flow.manifest.config ? { config: options.flow.manifest.config } : {}), + event: options.event, + }, + }; +} diff --git a/packages/flow-runtime/src/runners/code-mode.ts b/packages/flow-runtime/src/runners/code-mode.ts new file mode 100644 index 0000000..1b48359 --- /dev/null +++ b/packages/flow-runtime/src/runners/code-mode.ts @@ -0,0 +1,176 @@ +import path from "node:path"; +import { CodexAppServerClient } from "@peezy.tech/codex-flows"; +import { stepScriptPath } from "../manifest.ts"; +import { parseFlowResult } from "../result.ts"; +import type { FlowEvent, FlowResult, FlowStep, LoadedFlow } from "../types.ts"; + +export type RunCodeModeStepOptions = { + flow: LoadedFlow; + step: FlowStep; + event: FlowEvent; + codexCommand?: string; + codexHome?: string; + stream?: boolean; +}; + +export async function runCodeModeStep(options: RunCodeModeStepOptions): Promise { + const source = await codeModeSource(options); + const client = new CodexAppServerClient({ + transportOptions: { + codexCommand: options.codexCommand, + args: appServerArgs(), + env: options.codexHome ? { CODEX_HOME: path.resolve(options.codexHome) } : undefined, + requestTimeoutMs: options.step.timeoutMs, + }, + clientName: "codex-flow-runner", + clientTitle: "Codex Flow Runner", + clientVersion: "0.1.0", + }); + const output: string[] = []; + let threadId = ""; + let resolveTurnCompleted: (value: unknown) => void = () => undefined; + const turnCompleted = new Promise((resolve) => { + resolveTurnCompleted = resolve; + }); + + client.on("request", (message) => { + client.respondError(message.id, -32603, "flow runner does not handle server requests"); + }); + client.on("notification", (message) => { + if (message.method === "item/commandExecution/outputDelta" || message.method === "item/agentMessage/delta") { + const delta = stringField(message.params, "delta"); + if (delta) { + output.push(delta); + if (options.stream) { + process.stdout.write(delta); + } + } + } + if ( + message.method === "turn/completed" && + (!threadId || stringField(message.params, "threadId") === threadId) + ) { + resolveTurnCompleted(message.params); + } + }); + + try { + await client.connect(); + const started = await client.startThread({ + cwd: options.step.cwd ? path.resolve(options.flow.root, options.step.cwd) : options.flow.root, + approvalPolicy: "never", + sandbox: "danger-full-access", + ephemeral: false, + experimentalRawEvents: false, + persistExtendedHistory: true, + }); + threadId = started.thread.id; + await client.request("thread/codeMode/execute", { + threadId, + source, + }); + await withTimeout( + turnCompleted, + options.step.timeoutMs, + `timed out waiting for Code Mode flow step ${options.flow.manifest.name}/${options.step.name}`, + ); + const read = await client.request("thread/read", { + threadId, + includeTurns: true, + }); + return parseFlowResult(allAgentMessageText(read).join("\n") || output.join("")); + } finally { + client.close(); + } +} + +async function codeModeSource(options: RunCodeModeStepOptions): Promise { + const body = await Bun.file(stepScriptPath(options.flow, options.step)).text(); + const flow = { + name: options.flow.manifest.name, + version: options.flow.manifest.version, + root: options.flow.root, + step: options.step.name, + ...(options.flow.manifest.config ? { config: options.flow.manifest.config } : {}), + event: options.event, + }; + return [ + `const flow = ${JSON.stringify(flow, null, 2)};`, + "function result(value) {", + " text('\\nFLOW_RESULT ' + JSON.stringify(value) + '\\n');", + " exit();", + "}", + body, + ].join("\n"); +} + +function appServerArgs(): string[] { + return [ + "app-server", + "--listen", + "stdio://", + "--enable", + "apps", + "--enable", + "hooks", + "--enable", + "code_mode", + "--enable", + "code_mode_only", + ]; +} + +async function withTimeout(promise: Promise, timeoutMs: number, message: string): Promise { + let timer: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), timeoutMs); + }), + ]); + } finally { + if (timer) { + clearTimeout(timer); + } + } +} + +function allAgentMessageText(value: unknown): string[] { + const thread = recordField(value, "thread"); + const turns = Array.isArray(thread?.turns) ? thread.turns : []; + const texts: string[] = []; + for (const turn of turns) { + const turnRecord = isRecord(turn) ? turn : undefined; + const items = Array.isArray(turnRecord?.items) ? turnRecord.items : []; + for (const item of items) { + if (!isRecord(item) || stringField(item, "type") !== "agentMessage") { + continue; + } + const text = stringField(item, "text"); + if (text !== undefined) { + texts.push(text); + } + } + } + return texts; +} + +function recordField(value: unknown, field: string): Record | undefined { + if (!isRecord(value)) { + return undefined; + } + return isRecord(value[field]) ? value[field] : undefined; +} + +function stringField(value: unknown, field: string): string | undefined { + if (!isRecord(value)) { + return undefined; + } + const fieldValue = value[field]; + return typeof fieldValue === "string" ? fieldValue : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/flow-runtime/src/schema.ts b/packages/flow-runtime/src/schema.ts new file mode 100644 index 0000000..68a3cef --- /dev/null +++ b/packages/flow-runtime/src/schema.ts @@ -0,0 +1,81 @@ +type JsonSchema = { + type?: string | string[]; + required?: string[]; + properties?: Record; + enum?: unknown[]; +}; + +export type SchemaValidationResult = + | { ok: true } + | { ok: false; errors: string[] }; + +export async function readJsonSchema(path: string): Promise { + const parsed = JSON.parse(await Bun.file(path).text()) as unknown; + if (!isRecord(parsed)) { + throw new Error(`Schema must be a JSON object: ${path}`); + } + return parsed as JsonSchema; +} + +export function validateJsonSchema(value: unknown, schema: JsonSchema): SchemaValidationResult { + const errors: string[] = []; + validateValue(value, schema, "$", errors); + return errors.length === 0 ? { ok: true } : { ok: false, errors }; +} + +function validateValue( + value: unknown, + schema: JsonSchema, + path: string, + errors: string[], +): void { + if (schema.enum && !schema.enum.some((entry) => Object.is(entry, value))) { + errors.push(`${path} must be one of ${schema.enum.map(String).join(", ")}`); + return; + } + + if (schema.type && !typeMatches(value, schema.type)) { + errors.push(`${path} must be ${Array.isArray(schema.type) ? schema.type.join(" or ") : schema.type}`); + return; + } + + if (schema.type === "object" || (schema.properties && isRecord(value))) { + if (!isRecord(value)) { + errors.push(`${path} must be object`); + return; + } + for (const key of schema.required ?? []) { + if (!(key in value)) { + errors.push(`${path}.${key} is required`); + } + } + for (const [key, childSchema] of Object.entries(schema.properties ?? {})) { + if (key in value) { + validateValue(value[key], childSchema, `${path}.${key}`, errors); + } + } + } +} + +function typeMatches(value: unknown, type: string | string[]): boolean { + const types = Array.isArray(type) ? type : [type]; + return types.some((entry) => { + if (entry === "array") { + return Array.isArray(value); + } + if (entry === "null") { + return value === null; + } + if (entry === "integer") { + return Number.isInteger(value); + } + if (entry === "object") { + return isRecord(value); + } + return typeof value === entry; + }); +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/flow-runtime/src/triggers.ts b/packages/flow-runtime/src/triggers.ts new file mode 100644 index 0000000..99dcab7 --- /dev/null +++ b/packages/flow-runtime/src/triggers.ts @@ -0,0 +1,41 @@ +import { stepSchemaPath } from "./manifest.ts"; +import { readJsonSchema, validateJsonSchema } from "./schema.ts"; +import type { FlowEvent, FlowStep, LoadedFlow } from "./types.ts"; + +export type TriggerMatch = + | { ok: true } + | { ok: false; reason: string }; + +export async function stepMatchesEvent( + flow: LoadedFlow, + step: FlowStep, + event: FlowEvent, +): Promise { + if (!step.trigger) { + return { ok: false, reason: "step has no trigger" }; + } + if (step.trigger.type !== event.type) { + return { ok: false, reason: `event type ${event.type} does not match ${step.trigger.type}` }; + } + const schemaPath = stepSchemaPath(flow, step); + if (!schemaPath) { + return { ok: true }; + } + const result = validateJsonSchema(event.payload, await readJsonSchema(schemaPath)); + return result.ok ? { ok: true } : { ok: false, reason: result.errors.join("; ") }; +} + +export async function matchingSteps( + flows: LoadedFlow[], + event: FlowEvent, +): Promise> { + const matches: Array<{ flow: LoadedFlow; step: FlowStep }> = []; + for (const flow of flows) { + for (const step of flow.manifest.steps) { + if ((await stepMatchesEvent(flow, step, event)).ok) { + matches.push({ flow, step }); + } + } + } + return matches; +} diff --git a/packages/flow-runtime/src/types.ts b/packages/flow-runtime/src/types.ts new file mode 100644 index 0000000..734b9c6 --- /dev/null +++ b/packages/flow-runtime/src/types.ts @@ -0,0 +1,68 @@ +export type FlowEvent = { + id: string; + type: string; + source?: string; + occurredAt?: string; + receivedAt: string; + payload: TPayload; +}; + +export type FlowResultStatus = + | "skipped" + | "completed" + | "changed" + | "needs_intervention" + | "blocked" + | "failed"; + +export type FlowResult = { + status: FlowResultStatus; + message?: string; + artifacts?: Record; + next?: Array>>; + [key: string]: unknown; +}; + +export type FlowStepRunner = "bun" | "code-mode"; + +export type FlowStepTrigger = { + type: string; + schema?: string; +}; + +export type FlowStep = { + name: string; + runner: FlowStepRunner; + script: string; + timeoutMs: number; + cwd?: string; + trigger?: FlowStepTrigger; +}; + +export type FlowManifest = { + name: string; + version: number; + description?: string; + config?: Record; + guidance?: { + skills?: string[]; + }; + steps: FlowStep[]; +}; + +export type LoadedFlow = { + root: string; + manifestPath: string; + manifest: FlowManifest; +}; + +export type FlowRunContext = { + flow: { + name: string; + version: number; + root: string; + step: string; + config?: Record; + event: FlowEvent; + }; +}; diff --git a/packages/flow-runtime/test/flow-runtime.test.ts b/packages/flow-runtime/test/flow-runtime.test.ts new file mode 100644 index 0000000..43ea6e2 --- /dev/null +++ b/packages/flow-runtime/test/flow-runtime.test.ts @@ -0,0 +1,209 @@ +import { expect, test } from "bun:test"; +import { mkdtemp, rm, mkdir } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { + discoverFlows, + matchingSteps, + runBunStep, + runFlowStep, + validateJsonSchema, +} from "../src/index.ts"; +import type { FlowEvent } from "../src/index.ts"; + +test("discovers installed flows before source flows", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-")); + try { + await writeFlow(directory, ".codex/flows/demo", "installed"); + await writeFlow(directory, "flows/demo", "source"); + + const flows = await discoverFlows({ cwd: directory }); + + expect(flows.map((flow) => flow.manifest.name)).toEqual(["demo"]); + expect(flows[0]?.manifest.description).toBe("installed"); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("matches flow steps by event type and payload schema", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-")); + try { + await writeFlow(directory, "flows/demo", "source"); + const flows = await discoverFlows({ cwd: directory }); + const event: FlowEvent = { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { name: "Ada" }, + }; + + expect((await matchingSteps(flows, event)).map(({ step }) => step.name)).toEqual([ + "hello", + ]); + expect(await matchingSteps(flows, { ...event, payload: {} })).toEqual([]); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("bundled Codex release flows match one generic upstream release event", async () => { + const root = path.resolve(import.meta.dir, "..", "..", ".."); + const flows = await discoverFlows({ cwd: root }); + const event: FlowEvent = { + id: "event-1", + type: "upstream.release", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { repo: "openai/codex", tag: "rust-v1.2.3" }, + }; + + const matches = await matchingSteps(flows, event); + + expect(matches.map(({ flow, step }) => `${flow.manifest.name}/${step.name}`)).toEqual([ + "openai-codex-bindings/regenerate-bindings", + "peezy-codex-fork/rebase-patch-stack", + ]); +}); + +test("bundled Code Mode flow remains gated by the feature flag", async () => { + const root = path.resolve(import.meta.dir, "..", "..", ".."); + const flows = await discoverFlows({ cwd: root }); + const flow = flows.find((entry) => entry.manifest.name === "peezy-codex-fork"); + const step = flow?.manifest.steps.find((entry) => entry.name === "rebase-patch-stack"); + if (!flow || !step) { + throw new Error("expected bundled peezy-codex-fork flow"); + } + + await expect( + runFlowStep({ + flow, + step, + event: { + id: "event-1", + type: "upstream.release", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { repo: "openai/codex", tag: "rust-v1.2.3" }, + }, + env: {}, + }), + ).rejects.toThrow("requires CODEX_FLOWS_ENABLE_CODE_MODE=1"); +}); + +test("validates simple JSON schema constraints", () => { + const schema = { + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + kind: { enum: ["demo"] }, + }, + }; + + expect(validateJsonSchema({ name: "Ada", kind: "demo" }, schema)).toEqual({ ok: true }); + expect(validateJsonSchema({ kind: "other" }, schema)).toEqual({ + ok: false, + errors: ["$.name is required", "$.kind must be one of demo"], + }); +}); + +test("runs Bun flow steps and parses FLOW_RESULT", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-")); + try { + await writeFlow(directory, "flows/demo", "source"); + const [flow] = await discoverFlows({ cwd: directory }); + const step = flow?.manifest.steps[0]; + if (!flow || !step) { + throw new Error("expected fixture flow"); + } + + const result = await runBunStep({ + flow, + step, + event: { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { name: "Ada" }, + }, + }); + + expect(result).toEqual({ + status: "completed", + message: "hello Ada", + }); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("requires a feature flag before running Code Mode flow steps", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runtime-")); + try { + await writeFlow(directory, "flows/demo", "source"); + const [flow] = await discoverFlows({ cwd: directory }); + const step = flow?.manifest.steps[0]; + if (!flow || !step) { + throw new Error("expected fixture flow"); + } + + await expect( + runFlowStep({ + flow, + step: { ...step, runner: "code-mode" }, + event: { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-13T00:00:00.000Z", + payload: { name: "Ada" }, + }, + env: {}, + }), + ).rejects.toThrow("requires CODEX_FLOWS_ENABLE_CODE_MODE=1"); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +async function writeFlow(root: string, relative: string, description: string): Promise { + const flowRoot = path.join(root, relative); + await mkdir(path.join(flowRoot, "exec"), { recursive: true }); + await mkdir(path.join(flowRoot, "schemas"), { recursive: true }); + await Bun.write( + path.join(flowRoot, "flow.toml"), + [ + 'name = "demo"', + "version = 1", + `description = "${description}"`, + "", + "[[steps]]", + 'name = "hello"', + 'runner = "bun"', + 'script = "exec/hello.ts"', + "timeout_ms = 30000", + "", + "[steps.trigger]", + 'type = "demo.event"', + 'schema = "schemas/demo-event.schema.json"', + "", + ].join("\n"), + ); + await Bun.write( + path.join(flowRoot, "schemas/demo-event.schema.json"), + JSON.stringify({ + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + }, + }), + ); + await Bun.write( + path.join(flowRoot, "exec/hello.ts"), + [ + "const context = JSON.parse(await Bun.stdin.text());", + "const name = context.flow.event.payload.name;", + "console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);", + "", + ].join("\n"), + ); +} diff --git a/packages/flow-runtime/tsconfig.json b/packages/flow-runtime/tsconfig.json new file mode 100644 index 0000000..37f0b51 --- /dev/null +++ b/packages/flow-runtime/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "module": "ESNext", + "moduleResolution": "Bundler", + "allowImportingTsExtensions": true, + "esModuleInterop": true, + "resolveJsonModule": true, + "target": "ES2022", + "lib": ["ESNext"], + "strict": true, + "skipLibCheck": true, + "noUncheckedIndexedAccess": true, + "isolatedModules": true, + "verbatimModuleSyntax": true, + "forceConsistentCasingInFileNames": true, + "noEmit": true, + "types": ["node", "bun"], + "rootDir": "." + }, + "include": ["src/**/*.ts", "test/**/*.ts"] +}