Add reusable flow client runtime
Some checks failed
ci / check (push) Failing after 18s

This commit is contained in:
matamune 2026-05-15 20:38:09 +00:00
parent ae637e0b5d
commit e7b7b1b342
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
17 changed files with 1796 additions and 142 deletions

View file

@ -2,7 +2,7 @@
import path from "node:path";
import {
discoverFlows,
matchingSteps,
createLocalFlowClient,
runFlowStep,
type FlowEvent,
type LoadedFlow,
@ -35,11 +35,26 @@ async function main(): Promise<void> {
}
const event = await readEvent(cli.eventPath);
if (cli.kind === "fire") {
const matches = await matchingSteps(flows, event);
const results = [];
for (const match of matches) {
results.push(await runAndReport(match.flow, match.step, event));
}
const client = createLocalFlowClient({
cwd: cli.cwd,
env: process.env,
codex: {
command: process.env.CODEX_APP_SERVER_CODEX_COMMAND,
codexHome: process.env.CODEX_HOME,
stream: true,
},
});
const dispatch = await client.dispatchEvent(event);
const results = dispatch.runs.map((run) => {
if (!run.resultPayload && run.error) {
throw new Error(run.error);
}
return {
flow: run.flowName,
step: run.stepName,
result: run.resultPayload,
};
});
process.stdout.write(`${JSON.stringify({ eventId: event.id, results }, null, 2)}\n`);
return;
}

View file

@ -0,0 +1,95 @@
import { expect, test } from "bun:test";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
test("fire preserves the existing event/results payload shape", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runner-"));
try {
await writeFlow(directory);
const eventPath = path.join(directory, "event.json");
await Bun.write(
eventPath,
JSON.stringify({
id: "event-1",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada" },
}),
);
const runner = path.resolve(import.meta.dir, "..", "src", "index.ts");
const process = Bun.spawn({
cmd: ["bun", runner, "--cwd", directory, "fire", "--event", eventPath],
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([
new Response(process.stdout).text(),
new Response(process.stderr).text(),
process.exited,
]);
expect(stderr).toBe("");
expect(exitCode).toBe(0);
expect(JSON.parse(stdout)).toEqual({
eventId: "event-1",
results: [
{
flow: "demo",
step: "hello",
result: {
status: "completed",
message: "hello Ada",
},
},
],
});
} finally {
await rm(directory, { recursive: true, force: true });
}
});
async function writeFlow(root: string): Promise<void> {
const flowRoot = path.join(root, "flows/demo");
await mkdir(path.join(flowRoot, "exec"), { recursive: true });
await mkdir(path.join(flowRoot, "schemas"), { recursive: true });
await Bun.write(
path.join(flowRoot, "flow.toml"),
[
'name = "demo"',
"version = 1",
'description = "demo"',
"",
"[[steps]]",
'name = "hello"',
'runner = "bun"',
'script = "exec/hello.ts"',
"timeout_ms = 30000",
"",
"[steps.trigger]",
'type = "demo.event"',
'schema = "schemas/demo-event.schema.json"',
"",
].join("\n"),
);
await Bun.write(
path.join(flowRoot, "schemas/demo-event.schema.json"),
JSON.stringify({
type: "object",
required: ["name"],
properties: {
name: { type: "string" },
},
}),
);
await Bun.write(
path.join(flowRoot, "exec/hello.ts"),
[
"const context = JSON.parse(await Bun.stdin.text());",
"const name = context.flow.event.payload.name;",
"console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);",
"",
].join("\n"),
);
}

View file

@ -89,7 +89,7 @@
},
"packages/codex-client": {
"name": "@peezy.tech/codex-flows",
"version": "0.3.0",
"version": "0.3.1",
"devDependencies": {
"@types/bun": "^1.3.13",
"@types/node": "^22.10.10",
@ -105,7 +105,7 @@
},
"packages/flow-backend-convex": {
"name": "@peezy.tech/flow-backend-convex",
"version": "0.3.0",
"version": "0.3.1",
"dependencies": {
"@peezy.tech/flow-runtime": "workspace:*",
"convex": "^1.38.0",
@ -118,9 +118,9 @@
},
"packages/flow-runtime": {
"name": "@peezy.tech/flow-runtime",
"version": "0.3.0",
"version": "0.3.1",
"dependencies": {
"@peezy.tech/codex-flows": "^0.3.0",
"@peezy.tech/codex-flows": "^0.3.1",
},
"devDependencies": {
"@types/bun": "catalog:",

View file

@ -0,0 +1,399 @@
# Flow Client Design - 2026-05-15
This note designs the next reusable client layer for Codex flows. It builds on
the current split between `@peezy.tech/flow-runtime`,
`@peezy.tech/flow-runtime/backend-client`, `codex-flow-runner`, and
`codex-flow-systemd-local`.
## Intent
Consumers such as `patch.moi` should be able to use Codex flows as a product
capability without caring whether execution is backendless in the current
workspace or delegated to a flow backend.
The event-shaped contract remains the stable ABI. The client hides ceremony,
not semantics:
- flow packages still receive `FlowEvent` through the existing runner context
- steps still emit `FLOW_RESULT`
- backend dispatch still uses `event.id` for idempotency
- app-owned domain completion stays outside generic flow clients and backends
The product-facing experience can be direct:
```bash
patch upstream release openai/codex rust-v1.2.3
```
Patch can translate that into:
```ts
await flows.dispatchEvent({
id: "patch:upstream.release:openai/codex:rust-v1.2.3",
type: "upstream.release",
source: "patch",
receivedAt: new Date().toISOString(),
payload: {
repo: "openai/codex",
tag: "rust-v1.2.3",
},
});
```
## Existing Surfaces
`@peezy.tech/flow-runtime` already provides the local building blocks:
- `discoverFlows({ cwd, roots })`
- `matchingSteps(flows, event)`
- `runFlowStep({ flow, step, event, env, codeMode })`
- Bun and gated Code Mode runners
- JSON Schema trigger validation
`codex-flow-runner` proves backendless execution works, but only as a CLI over
event JSON files. It does not expose a reusable client object, normalized
run/event views, local idempotency, or replay/list APIs.
`codex-flow-systemd-local` provides durable backend behavior:
- `POST /events`
- `GET /events`
- `GET /runs`
- `POST /events/:id/replay`
- process status in backend records
- semantic result status in `FLOW_RESULT`
- idempotency by `event.id`
`@peezy.tech/flow-runtime/backend-client` normalizes HTTP backend responses and
already covers backend-native list/get/dispatch/replay/cancel operations.
`patch.moi` currently builds `FlowEvent` objects from feed signals and POSTs
them to a configured backend URL. The same product should also support a local
CLI mode where flows run in the current repository without a daemon.
## Proposed Packages
Add two subpath exports to `@peezy.tech/flow-runtime`:
- `@peezy.tech/flow-runtime/local-client`
- `@peezy.tech/flow-runtime/client`
Keep the existing:
- `@peezy.tech/flow-runtime/backend-client`
`local-client` owns backendless execution. `backend-client` owns remote backend
HTTP access. `client` is a tiny factory and shared type surface over both.
No new package named `sdk` should be introduced.
## Client API
The common client should expose flow-native operations:
```ts
export type FlowClient = {
listRuns(options?: FlowListRunsOptions): Promise<FlowRunList>;
getRun(runId: string): Promise<FlowRunView>;
listEvents(options?: FlowListEventsOptions): Promise<FlowEventList>;
getEvent(eventId: string): Promise<FlowEventView>;
dispatchEvent(event: FlowEvent, options?: FlowDispatchOptions): Promise<FlowDispatchResult>;
replayEvent(eventId: string, options?: FlowReplayOptions): Promise<FlowDispatchResult>;
cancelRun(runId: string): Promise<FlowCancelResult>;
};
```
The view models should be neutral aliases or successors of the current backend
views:
- `FlowRunView`
- `FlowEventView`
- `FlowDispatchResult`
- `FlowRunList`
- `FlowEventList`
They should preserve the fields proven by `backend-client`:
- process status
- semantic `FLOW_RESULT` status
- `effectiveStatus`
- `needsAttention`
- attempts
- output
- latest output
- result payload
- raw backend/local record
The term `backend` can remain as a field value, but common client names should
not force local execution to pretend it is a backend.
## Factory
The factory should be discriminated by mode:
```ts
const flows = createFlowClient({
mode: "local",
cwd: process.cwd(),
});
```
```ts
const flows = createFlowClient({
mode: "http",
baseUrl: process.env.PATCH_FLOW_BACKEND_URL,
hmacSecret: process.env.PATCH_FLOW_DISPATCH_SECRET,
});
```
`mode: "http"` should wrap `createFlowBackendHttpClient()`.
`mode: "local"` should wrap `createLocalFlowClient()`.
## Local Client
Local mode should execute matching flow steps directly in the selected
workspace:
```ts
export type LocalFlowClientOptions = {
cwd: string;
roots?: string[];
env?: Record<string, string | undefined>;
state?: false | "memory" | {
kind: "file";
dataDir?: string;
};
codex?: LocalFlowCodexOptions;
};
```
Discovery follows the runtime default unless roots are provided:
- `.codex/flows/*`
- `flows/*`
The first implementation should support `state: "memory"` by default. That is
enough for direct CLI runs and tests. A file-backed state mode can be added next
under `.codex/flow-runs` or `.codex/flow-client` without introducing a daemon.
Local dispatch behavior:
- normalize the incoming `FlowEvent`
- discover flows
- match trigger type and schema
- create one run view per matching step
- execute steps using `runFlowStep`
- return a normalized dispatch result
- mark semantic `blocked` and `needs_intervention` as attention states
For local mode, `dispatchEvent` should run synchronously by default. A future
`wait: false` local option can queue to file state, but it should not fake async
behavior before there is a worker loop.
## Idempotency And Replay
The client must not silently generate random ids for normal dispatch. A product
can hide event construction, but it must still provide a deterministic event id
or idempotency key when duplicate suppression matters.
Recommended product helper pattern:
```ts
const event = patchUpstreamReleaseEvent({
repo: "openai/codex",
tag: "rust-v1.2.3",
});
await flows.dispatchEvent(event);
```
HTTP mode inherits backend idempotency and replay semantics:
- duplicate `event.id` dispatch returns the backend's duplicate/idempotent
response
- `replayEvent(eventId)` creates a new backend attempt
Local memory mode can dedupe duplicate event ids only for the lifetime of the
client process. File-backed local state should make idempotency durable and make
`replayEvent(eventId)` create a new local attempt.
If local state is disabled, `listEvents`, `getEvent`, `listRuns`, `getRun`, and
`replayEvent` should fail with a clear unsupported-state error rather than
returning misleading empty data.
## Code Mode Configuration
Flow orchestration location and Codex execution location are separate axes.
Local flow execution can still use a local or remote Codex executor for
`runner = "code-mode"` steps.
Initial local Code Mode support should preserve current behavior:
```ts
codex: {
mode: "stdio",
command: process.env.CODEX_APP_SERVER_CODEX_COMMAND,
codexHome: process.env.CODEX_HOME,
stream: true,
}
```
The design should leave room for:
```ts
codex: {
mode: "remote",
url: "https://codex-worker.internal",
headers: { authorization: "Bearer ..." },
}
```
Remote Code Mode should not be added until `runCodeModeStep` has a real remote
app-server transport. Do not simulate remote Codex by shelling out through
unrelated HTTP APIs.
Code Mode remains gated by `CODEX_FLOWS_MODE=code-mode` or
`CODEX_FLOWS_ENABLE_CODE_MODE=1`.
## Workspace Configuration
The existing `.codex/workspace.toml` is already used for Discord gateway
surfaces. Flow client configuration can use the same file, but should stay in a
separate table:
```toml
[flows]
mode = "local"
roots = ["flows"]
state = "memory"
[flows.codex]
mode = "stdio"
stream = true
```
Patch can resolve configuration in this order:
1. CLI flags
2. environment variables
3. `.codex/workspace.toml`
4. defaults
The Discord-specific `[[discord.gateway.surfaces]]` table must remain unrelated
to flow execution.
## Patch Integration
Patch should be able to use one abstraction in both roles:
- CLI utility run from a fork checkout
- feed watching service that dispatches upstream activity
Suggested Patch adapter:
```ts
const flows = createFlowClientFromPatchConfig({
cwd,
env: process.env,
});
```
Resolution:
- if a backend URL is configured, use `mode: "http"`
- otherwise use `mode: "local"` in the current workspace
The existing Patch server can replace its hand-rolled fetch/HMAC dispatch logic
with the HTTP client mode. The future Patch CLI can use local mode by default.
Patch-specific commands should create domain events but not bypass the generic
flow ABI:
```bash
patch upstream release openai/codex rust-v1.2.3
patch flow fire upstream.release --repo openai/codex --tag rust-v1.2.3
patch flow runs
```
## Boundaries
This client is not an app-server SDK. It must not expose convenience wrappers
for app-server thread methods such as `setGoal`, `readThread`, `startTurn`, or
`renameThread`.
The client owns generic flow concerns:
- event dispatch
- flow discovery
- local step execution
- backend dispatch
- run/event normalization
- replay/cancel when supported by the selected mode
It does not own:
- Patch fork policy
- organization release rules
- pet-game asset registration
- payment state
- minting
- Discord gateway write tools
- arbitrary app-server command wrappers
Domain completion remains app-owned. A flow result can say work completed or
needs intervention; the consuming app decides what that means in its product.
## Implementation Plan
1. Extract neutral view model names from `backend-client` without breaking the
existing backend-client export.
2. Add `local-client` with in-memory state and synchronous dispatch.
3. Add `client` factory that returns local or HTTP clients.
4. Refactor `codex-flow-runner` to use the local client internally.
5. Add optional file-backed local state for durable idempotency, list/get, and
replay.
6. Update Patch to consume the common client instead of hand-rolled HTTP
dispatch.
7. Add `.codex/workspace.toml` flow configuration support only after the direct
constructor API is stable.
## Test Plan
Local client tests:
- discovers `.codex/flows/*` before `flows/*`
- dispatches one event to all matching steps
- returns normalized run/event/result views
- preserves `FLOW_RESULT.status` as semantic result status
- marks `blocked` and `needs_intervention` as `needsAttention`
- forwards Code Mode configuration to `runFlowStep`
- rejects replay/list/get when durable or memory state does not contain the
requested event/run
- dedupes duplicate event ids in memory mode
HTTP factory tests:
- delegates to `FlowBackendHttpClient`
- preserves HMAC/header/auth construction
- returns the same normalized payload shape as direct backend-client use
CLI and Patch-oriented tests:
- `codex-flow-runner fire` continues to produce the existing payload shape
- Patch can select local mode when no backend URL is configured
- Patch can select HTTP mode when a backend URL is configured
- Patch product helpers create deterministic event ids for fork-maintenance
commands
## Open Decisions
The first implementation can proceed with conservative defaults, but these
should be decided before exposing a stable CLI promise:
- whether local mode should default to in-memory state or file state
- exact default local state directory if file state is enabled
- whether local dispatch should ever support `wait: false`
- final remote Code Mode transport shape
- whether `cancelRun` in local synchronous mode should be unsupported or tied to
a future worker process

View file

@ -206,6 +206,34 @@ completion logic stays outside generic backends. For example, a pet-game worker
may upload generated assets, update payment state, and mint before completing a
generic flow run.
## Flow Client
Reusable product code can use `@peezy.tech/flow-runtime/client` when it should
dispatch, inspect, or replay generic flow events without hard-coding whether
execution is local or delegated to an HTTP backend.
```ts
import { createFlowClient } from "@peezy.tech/flow-runtime/client";
const flows = createFlowClient({
mode: "local",
cwd: process.cwd(),
});
```
`mode: "local"` wraps `@peezy.tech/flow-runtime/local-client` and runs matching
steps synchronously in the selected workspace with in-memory run/event state by
default. Set `state: { kind: "file" }` to persist local state under
`.codex/flow-client`. `mode: "http"` wraps the existing backend HTTP client and
inherits the backend's durable idempotency, replay, and cancellation semantics.
Both modes preserve the same `FlowEvent` and `FLOW_RESULT` contracts. The client
hides dispatch ceremony, not event semantics: products still provide stable
event ids when duplicate suppression matters, and product-specific completion
logic stays outside the generic client. See
`docs/2026-05-15-flow-client-design.md` for the design rationale, Patch
integration path, local state model, and remaining open decisions.
## Convex Backend Direction
Convex should be a durable orchestration backend, not the place where long

View file

@ -46,7 +46,7 @@
"replay:thread": "bun scripts/run-code-mode-in-new-thread.ts",
"start": "bun run --filter web preview",
"start:discord:debug:commentary": "bun run --filter codex-discord-bridge start:debug:commentary",
"test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter @peezy.tech/flow-backend-convex test && bun run --filter @peezy.tech/codex-opencode-go-router test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test",
"test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter @peezy.tech/flow-backend-convex test && bun run --filter @peezy.tech/codex-opencode-go-router test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-flow-runner test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test",
"release:check": "bun run --filter @peezy.tech/codex-flows release:check && bun run --filter @peezy.tech/flow-runtime release:check && bun run --filter @peezy.tech/flow-backend-convex release:check"
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@peezy.tech/codex-flows",
"version": "0.3.0",
"version": "0.3.1",
"description": "Codex app-server JSON-RPC client, flow helpers, and generated protocol types.",
"type": "module",
"license": "Apache-2.0",

View file

@ -1,6 +1,6 @@
{
"name": "@peezy.tech/flow-backend-convex",
"version": "0.3.0",
"version": "0.3.1",
"description": "Reusable Convex component for durable codex-flow event, run, lease, and result state.",
"type": "module",
"license": "Apache-2.0",
@ -52,7 +52,7 @@
"test": "bun test test/*.test.ts"
},
"dependencies": {
"@peezy.tech/flow-runtime": "^0.3.0",
"@peezy.tech/flow-runtime": "^0.3.1",
"convex": "^1.38.0"
},
"devDependencies": {

View file

@ -10,6 +10,46 @@ Code Mode runners.
import { discoverFlows, matchingSteps, runFlowStep } from "@peezy.tech/flow-runtime";
```
## Flow Client
`@peezy.tech/flow-runtime/client` exposes a small flow-native client factory for
product code that should not care whether flows run locally or through an HTTP
backend:
```ts
import { createFlowClient } from "@peezy.tech/flow-runtime/client";
const flows = createFlowClient({
mode: "local",
cwd: process.cwd(),
});
await flows.dispatchEvent({
id: "patch:upstream.release:openai/codex:rust-v1.2.3",
type: "upstream.release",
source: "patch",
receivedAt: new Date().toISOString(),
payload: { repo: "openai/codex", tag: "rust-v1.2.3" },
});
```
Use `mode: "http"` to wrap the existing backend HTTP client:
```ts
const flows = createFlowClient({
mode: "http",
baseUrl: "http://127.0.0.1:7345",
hmacSecret: process.env.PATCH_FLOW_DISPATCH_SECRET,
});
```
`@peezy.tech/flow-runtime/local-client` runs matching steps synchronously in the
selected workspace and keeps in-memory run/event state by default. Set
`state: { kind: "file" }` to persist local run/event state under
`.codex/flow-client`. It preserves the generic `FlowEvent` and `FLOW_RESULT`
contracts; callers still provide deterministic event ids when idempotency
matters.
## Backend Client
`@peezy.tech/flow-runtime/backend-client` exposes backend-native inspection and

View file

@ -1,6 +1,6 @@
{
"name": "@peezy.tech/flow-runtime",
"version": "0.3.0",
"version": "0.3.1",
"description": "Generic flow package loader and runner primitives.",
"type": "module",
"license": "Apache-2.0",
@ -32,6 +32,14 @@
"./backend-client": {
"types": "./dist/backend-client.d.ts",
"import": "./dist/backend-client.js"
},
"./local-client": {
"types": "./dist/local-client.d.ts",
"import": "./dist/local-client.js"
},
"./client": {
"types": "./dist/client.d.ts",
"import": "./dist/client.js"
}
},
"scripts": {
@ -44,7 +52,7 @@
"test": "bun test test/*.test.ts"
},
"dependencies": {
"@peezy.tech/codex-flows": "^0.3.0"
"@peezy.tech/codex-flows": "^0.3.1"
},
"devDependencies": {
"@types/bun": "catalog:",

View file

@ -1,36 +1,30 @@
import { createHmac } from "node:crypto";
import type {
FlowAttemptView,
FlowCancelResult,
FlowClient,
FlowDispatchOptions,
FlowDispatchResult,
FlowEffectiveStatus,
FlowEventList,
FlowEventView,
FlowListEventsOptions,
FlowListRunsOptions,
FlowOutputView,
FlowProcessStatus,
FlowReplayOptions,
FlowReplayResult,
FlowRunList,
FlowRunView,
} from "./client-types.ts";
import type { FlowEvent, FlowResultStatus } from "./types.ts";
export type FlowBackendProcessStatus =
| "queued"
| "running"
| "completed"
| "failed"
| "canceled"
| string;
export type FlowBackendEffectiveStatus =
| FlowBackendProcessStatus
| FlowResultStatus;
export type FlowBackendListRunsOptions = {
eventId?: string;
status?: string;
limit?: number;
};
export type FlowBackendListEventsOptions = {
type?: string;
limit?: number;
};
export type FlowBackendDispatchOptions = {
wait?: boolean;
};
export type FlowBackendReplayOptions = {
wait?: boolean;
};
export type FlowBackendProcessStatus = FlowProcessStatus;
export type FlowBackendEffectiveStatus = FlowEffectiveStatus;
export type FlowBackendListRunsOptions = FlowListRunsOptions;
export type FlowBackendListEventsOptions = FlowListEventsOptions;
export type FlowBackendDispatchOptions = FlowDispatchOptions;
export type FlowBackendReplayOptions = FlowReplayOptions;
export type FlowBackendHttpHeaders =
| Headers
@ -42,102 +36,16 @@ export type FlowBackendFetch = (
init?: RequestInit,
) => Promise<Response>;
export type FlowBackendOutputView = {
kind: string;
text: string;
createdAt?: string;
raw: unknown;
};
export type FlowBackendAttemptView = {
id: string;
status?: string;
attemptNumber?: number;
workerId?: string;
leaseExpiresAt?: number;
startedAt?: string;
completedAt?: string;
error?: string;
raw: unknown;
};
export type FlowBackendRunView = {
id: string;
eventId?: string;
flowName?: string;
flowVersion?: number;
stepName?: string;
runner?: string;
backend?: string;
processStatus?: FlowBackendProcessStatus;
resultStatus?: FlowResultStatus;
status: FlowBackendEffectiveStatus;
effectiveStatus: FlowBackendEffectiveStatus;
needsAttention: boolean;
attemptCount: number;
attempts: FlowBackendAttemptView[];
output: FlowBackendOutputView[];
latestOutput?: FlowBackendOutputView;
resultPayload?: unknown;
error?: string;
createdAt?: string;
startedAt?: string;
completedAt?: string;
updatedAt?: string;
raw: unknown;
};
export type FlowBackendEventView = {
id: string;
type?: string;
source?: string;
occurredAt?: string;
receivedAt?: string;
payload?: unknown;
runIds: string[];
runs: FlowBackendRunView[];
createdAt?: string;
raw: unknown;
};
export type FlowBackendRunList = {
runs: FlowBackendRunView[];
eventId?: string;
raw: unknown;
};
export type FlowBackendEventList = {
events: FlowBackendEventView[];
raw: unknown;
};
export type FlowBackendDispatchResult = {
status?: string;
eventId?: string;
runIds: string[];
matched?: number;
idempotent?: boolean;
event?: FlowBackendEventView;
runs: FlowBackendRunView[];
raw: unknown;
};
export type FlowBackendReplayResult = FlowBackendDispatchResult;
export type FlowBackendCancelResult = {
run: FlowBackendRunView;
raw: unknown;
};
export type FlowBackendClient = {
listRuns(options?: FlowBackendListRunsOptions): Promise<FlowBackendRunList>;
getRun(runId: string): Promise<FlowBackendRunView>;
listEvents(options?: FlowBackendListEventsOptions): Promise<FlowBackendEventList>;
getEvent(eventId: string): Promise<FlowBackendEventView>;
dispatchEvent(event: FlowEvent, options?: FlowBackendDispatchOptions): Promise<FlowBackendDispatchResult>;
replayEvent(eventId: string, options?: FlowBackendReplayOptions): Promise<FlowBackendReplayResult>;
cancelRun(runId: string): Promise<FlowBackendCancelResult>;
};
export type FlowBackendOutputView = FlowOutputView;
export type FlowBackendAttemptView = FlowAttemptView;
export type FlowBackendRunView = FlowRunView;
export type FlowBackendEventView = FlowEventView;
export type FlowBackendRunList = FlowRunList;
export type FlowBackendEventList = FlowEventList;
export type FlowBackendDispatchResult = FlowDispatchResult;
export type FlowBackendReplayResult = FlowReplayResult;
export type FlowBackendCancelResult = FlowCancelResult;
export type FlowBackendClient = FlowClient;
export type FlowBackendHttpClientOptions = {
baseUrl: string;

View file

@ -0,0 +1,127 @@
import type { FlowEvent, FlowResultStatus } from "./types.ts";
export type FlowProcessStatus =
| "queued"
| "running"
| "completed"
| "failed"
| "canceled"
| string;
export type FlowEffectiveStatus = FlowProcessStatus | FlowResultStatus;
export type FlowListRunsOptions = {
eventId?: string;
status?: string;
limit?: number;
};
export type FlowListEventsOptions = {
type?: string;
limit?: number;
};
export type FlowDispatchOptions = {
wait?: boolean;
};
export type FlowReplayOptions = {
wait?: boolean;
};
export type FlowOutputView = {
kind: string;
text: string;
createdAt?: string;
raw: unknown;
};
export type FlowAttemptView = {
id: string;
status?: string;
attemptNumber?: number;
workerId?: string;
leaseExpiresAt?: number;
startedAt?: string;
completedAt?: string;
error?: string;
raw: unknown;
};
export type FlowRunView = {
id: string;
eventId?: string;
flowName?: string;
flowVersion?: number;
stepName?: string;
runner?: string;
backend?: string;
processStatus?: FlowProcessStatus;
resultStatus?: FlowResultStatus;
status: FlowEffectiveStatus;
effectiveStatus: FlowEffectiveStatus;
needsAttention: boolean;
attemptCount: number;
attempts: FlowAttemptView[];
output: FlowOutputView[];
latestOutput?: FlowOutputView;
resultPayload?: unknown;
error?: string;
createdAt?: string;
startedAt?: string;
completedAt?: string;
updatedAt?: string;
raw: unknown;
};
export type FlowEventView = {
id: string;
type?: string;
source?: string;
occurredAt?: string;
receivedAt?: string;
payload?: unknown;
runIds: string[];
runs: FlowRunView[];
createdAt?: string;
raw: unknown;
};
export type FlowRunList = {
runs: FlowRunView[];
eventId?: string;
raw: unknown;
};
export type FlowEventList = {
events: FlowEventView[];
raw: unknown;
};
export type FlowDispatchResult = {
status?: string;
eventId?: string;
runIds: string[];
matched?: number;
idempotent?: boolean;
event?: FlowEventView;
runs: FlowRunView[];
raw: unknown;
};
export type FlowReplayResult = FlowDispatchResult;
export type FlowCancelResult = {
run: FlowRunView;
raw: unknown;
};
export type FlowClient = {
listRuns(options?: FlowListRunsOptions): Promise<FlowRunList>;
getRun(runId: string): Promise<FlowRunView>;
listEvents(options?: FlowListEventsOptions): Promise<FlowEventList>;
getEvent(eventId: string): Promise<FlowEventView>;
dispatchEvent(event: FlowEvent, options?: FlowDispatchOptions): Promise<FlowDispatchResult>;
replayEvent(eventId: string, options?: FlowReplayOptions): Promise<FlowReplayResult>;
cancelRun(runId: string): Promise<FlowCancelResult>;
};

View file

@ -0,0 +1,40 @@
import {
createFlowBackendHttpClient,
type FlowBackendHttpClientOptions,
} from "./backend-client.ts";
import {
createLocalFlowClient,
type LocalFlowClientOptions,
} from "./local-client.ts";
import type { FlowClient } from "./client-types.ts";
export type {
FlowAttemptView,
FlowCancelResult,
FlowClient,
FlowDispatchOptions,
FlowDispatchResult,
FlowEffectiveStatus,
FlowEventList,
FlowEventView,
FlowListEventsOptions,
FlowListRunsOptions,
FlowOutputView,
FlowProcessStatus,
FlowReplayOptions,
FlowReplayResult,
FlowRunList,
FlowRunView,
} from "./client-types.ts";
export type FlowClientOptions =
| ({ mode: "local" } & LocalFlowClientOptions)
| ({ mode: "http" } & FlowBackendHttpClientOptions);
export function createFlowClient(options: FlowClientOptions): FlowClient {
if (options.mode === "local") {
const { mode: _mode, ...localOptions } = options;
return createLocalFlowClient(localOptions);
}
const { mode: _mode, ...httpOptions } = options;
return createFlowBackendHttpClient(httpOptions);
}

View file

@ -5,6 +5,33 @@ export { runBunStep } from "./runners/bun.ts";
export { runCodeModeStep } from "./runners/code-mode.ts";
export { readJsonSchema, validateJsonSchema } from "./schema.ts";
export { matchingSteps, stepMatchesEvent } from "./triggers.ts";
export { createFlowClient } from "./client.ts";
export { createLocalFlowClient, LocalFlowClient } from "./local-client.ts";
export type {
FlowAttemptView,
FlowCancelResult,
FlowClient,
FlowDispatchOptions,
FlowDispatchResult,
FlowEffectiveStatus,
FlowEventList,
FlowEventView,
FlowListEventsOptions,
FlowListRunsOptions,
FlowOutputView,
FlowProcessStatus,
FlowReplayOptions,
FlowReplayResult,
FlowRunList,
FlowRunView,
} from "./client-types.ts";
export type {
FlowClientOptions,
} from "./client.ts";
export type {
LocalFlowClientOptions,
LocalFlowCodexOptions,
} from "./local-client.ts";
export type {
FlowEvent,
FlowManifest,

View file

@ -0,0 +1,649 @@
import { createHash } from "node:crypto";
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
import path from "node:path";
import { discoverFlows } from "./manifest.ts";
import { runFlowStep } from "./run.ts";
import { matchingSteps } from "./triggers.ts";
import type {
FlowAttemptView,
FlowCancelResult,
FlowClient,
FlowDispatchOptions,
FlowDispatchResult,
FlowEffectiveStatus,
FlowEventList,
FlowEventView,
FlowListEventsOptions,
FlowListRunsOptions,
FlowOutputView,
FlowProcessStatus,
FlowReplayOptions,
FlowReplayResult,
FlowRunList,
FlowRunView,
} from "./client-types.ts";
import type {
FlowEvent,
FlowResult,
FlowResultStatus,
FlowStep,
LoadedFlow,
} from "./types.ts";
export type LocalFlowCodexOptions = {
mode?: "stdio";
command?: string;
codexHome?: string;
stream?: boolean;
};
export type LocalFlowClientOptions = {
cwd: string;
roots?: string[];
env?: Record<string, string | undefined>;
state?: false | "memory" | {
kind: "file";
dataDir?: string;
};
codex?: LocalFlowCodexOptions;
};
type StoredEvent = {
event: FlowEvent;
createdAt: string;
runIds: string[];
};
type LocalFlowStateSnapshot = {
events: StoredEvent[];
runs: FlowRunView[];
};
const resultStatuses = new Set<FlowResultStatus>([
"skipped",
"completed",
"changed",
"needs_intervention",
"blocked",
"failed",
]);
const attentionStatuses = new Set(["blocked", "needs_intervention"]);
export class LocalFlowClientUnsupportedStateError extends Error {
constructor(operation: string) {
super(`Local flow client ${operation} requires local state`);
this.name = "LocalFlowClientUnsupportedStateError";
}
}
export class LocalFlowClient implements FlowClient {
#cwd: string;
#roots: string[] | undefined;
#env: Record<string, string | undefined>;
#codex: LocalFlowCodexOptions | undefined;
#state: LocalFlowMemoryState | undefined;
constructor(options: LocalFlowClientOptions) {
this.#cwd = path.resolve(options.cwd);
this.#roots = options.roots?.map((root) =>
path.isAbsolute(root) ? root : path.resolve(this.#cwd, root),
);
this.#env = options.env ?? process.env;
this.#codex = options.codex;
this.#state = localState(options.state, this.#cwd);
}
async listRuns(options: FlowListRunsOptions = {}): Promise<FlowRunList> {
return this.#requireState("listRuns").listRuns(options);
}
async getRun(runId: string): Promise<FlowRunView> {
return this.#requireState("getRun").getRun(runId);
}
async listEvents(options: FlowListEventsOptions = {}): Promise<FlowEventList> {
return this.#requireState("listEvents").listEvents(options);
}
async getEvent(eventId: string): Promise<FlowEventView> {
return this.#requireState("getEvent").getEvent(eventId);
}
async dispatchEvent(
input: FlowEvent,
options: FlowDispatchOptions = {},
): Promise<FlowDispatchResult> {
ensureSynchronousLocalDispatch(options);
const event = normalizeFlowEvent(input);
const duplicate = this.#state?.duplicateDispatch(event.id);
if (duplicate) {
return duplicate;
}
const createdAt = new Date().toISOString();
const flows = await discoverFlows({
cwd: this.#cwd,
...(this.#roots ? { roots: this.#roots } : {}),
});
const matches = await matchingSteps(flows, event);
const runs: FlowRunView[] = [];
for (const match of matches) {
const run = await this.#executeMatch({
event,
flow: match.flow,
step: match.step,
});
runs.push(run);
}
const result = dispatchResult({
status: "accepted",
event,
runs,
matched: matches.length,
raw: {
status: "accepted",
eventId: event.id,
runIds: runs.map((run) => run.id),
matched: matches.length,
},
});
this.#state?.recordDispatch(event, createdAt, runs);
return result;
}
async replayEvent(
eventId: string,
options: FlowReplayOptions = {},
): Promise<FlowReplayResult> {
ensureSynchronousLocalDispatch(options);
const state = this.#requireState("replayEvent");
const event = state.rawEvent(eventId);
const replayNonce = `${Date.now()}:${Math.random()}`;
const flows = await discoverFlows({
cwd: this.#cwd,
...(this.#roots ? { roots: this.#roots } : {}),
});
const matches = await matchingSteps(flows, event);
const runs: FlowRunView[] = [];
for (const match of matches) {
const run = await this.#executeMatch({
event,
flow: match.flow,
step: match.step,
replayNonce,
});
runs.push(run);
}
state.recordReplay(event.id, runs);
return dispatchResult({
status: "accepted",
event,
runs,
matched: matches.length,
raw: {
status: "accepted",
eventId: event.id,
runIds: runs.map((run) => run.id),
matched: matches.length,
replay: true,
},
});
}
async cancelRun(_runId: string): Promise<FlowCancelResult> {
throw new LocalFlowClientUnsupportedStateError("cancelRun");
}
async #executeMatch(options: {
event: FlowEvent;
flow: LoadedFlow;
step: FlowStep;
replayNonce?: string;
}): Promise<FlowRunView> {
const runId = localRunId(
options.event.id,
options.flow.manifest.name,
options.step.name,
options.replayNonce,
);
const startedAt = new Date().toISOString();
try {
const result = await runFlowStep({
flow: options.flow,
step: options.step,
event: options.event,
env: this.#env,
codeMode: {
codexCommand: this.#codex?.command ?? this.#env.CODEX_APP_SERVER_CODEX_COMMAND,
codexHome: this.#codex?.codexHome ?? this.#env.CODEX_HOME,
stream: this.#codex?.stream ?? true,
},
});
const completedAt = new Date().toISOString();
return localRunView({
runId,
event: options.event,
flow: options.flow,
step: options.step,
processStatus: "completed",
result,
startedAt,
completedAt,
});
} catch (error) {
const completedAt = new Date().toISOString();
return localRunView({
runId,
event: options.event,
flow: options.flow,
step: options.step,
processStatus: "failed",
error: error instanceof Error ? error.message : String(error),
startedAt,
completedAt,
});
}
}
#requireState(operation: string): LocalFlowMemoryState {
if (!this.#state) {
throw new LocalFlowClientUnsupportedStateError(operation);
}
return this.#state;
}
}
export function createLocalFlowClient(options: LocalFlowClientOptions): LocalFlowClient {
return new LocalFlowClient(options);
}
class LocalFlowMemoryState {
#events = new Map<string, StoredEvent>();
#runs = new Map<string, FlowRunView>();
constructor(snapshot?: LocalFlowStateSnapshot) {
for (const event of snapshot?.events ?? []) {
this.#events.set(event.event.id, event);
}
for (const run of snapshot?.runs ?? []) {
this.#runs.set(run.id, run);
}
}
duplicateDispatch(eventId: string): FlowDispatchResult | undefined {
const stored = this.#events.get(eventId);
if (!stored) {
return undefined;
}
const runs = stored.runIds.map((runId) => this.#runs.get(runId)).filter(isDefined);
return {
status: "duplicate",
eventId,
runIds: stored.runIds,
matched: 0,
idempotent: true,
event: this.eventView(eventId),
runs,
raw: {
status: "duplicate",
eventId,
runIds: stored.runIds,
matched: 0,
idempotent: true,
},
};
}
recordDispatch(event: FlowEvent, createdAt: string, runs: FlowRunView[]): void {
for (const run of runs) {
this.#runs.set(run.id, run);
}
this.#events.set(event.id, {
event,
createdAt,
runIds: runs.map((run) => run.id),
});
}
recordReplay(eventId: string, runs: FlowRunView[]): void {
const stored = this.#events.get(eventId);
if (!stored) {
throw new Error(`Unknown event: ${eventId}`);
}
for (const run of runs) {
this.#runs.set(run.id, run);
stored.runIds.push(run.id);
}
}
listRuns(options: FlowListRunsOptions): FlowRunList {
let runs = Array.from(this.#runs.values());
if (options.eventId) {
runs = runs.filter((run) => run.eventId === options.eventId);
}
if (options.status) {
runs = runs.filter((run) =>
run.processStatus === options.status || run.effectiveStatus === options.status,
);
}
runs = runs.slice(-clampLimit(options.limit)).reverse();
return {
runs,
...(options.eventId ? { eventId: options.eventId } : {}),
raw: { runs },
};
}
getRun(runId: string): FlowRunView {
const run = this.#runs.get(runId);
if (!run) {
throw new Error(`Unknown run: ${runId}`);
}
return run;
}
listEvents(options: FlowListEventsOptions): FlowEventList {
let events = Array.from(this.#events.values());
if (options.type) {
events = events.filter((event) => event.event.type === options.type);
}
events = events.slice(-clampLimit(options.limit)).reverse();
const views = events.map((event) => this.eventView(event.event.id));
return {
events: views,
raw: { events: views },
};
}
getEvent(eventId: string): FlowEventView {
return this.eventView(eventId);
}
rawEvent(eventId: string): FlowEvent {
const stored = this.#events.get(eventId);
if (!stored) {
throw new Error(`Unknown event: ${eventId}`);
}
return stored.event;
}
eventView(eventId: string): FlowEventView {
const stored = this.#events.get(eventId);
if (!stored) {
throw new Error(`Unknown event: ${eventId}`);
}
const runs = stored.runIds.map((runId) => this.#runs.get(runId)).filter(isDefined);
return {
id: stored.event.id,
type: stored.event.type,
...(stored.event.source ? { source: stored.event.source } : {}),
...(stored.event.occurredAt ? { occurredAt: stored.event.occurredAt } : {}),
receivedAt: stored.event.receivedAt,
payload: stored.event.payload,
runIds: stored.runIds,
runs,
createdAt: stored.createdAt,
raw: {
kind: "local-event",
event: stored.event,
createdAt: stored.createdAt,
runIds: stored.runIds,
},
};
}
snapshot(): LocalFlowStateSnapshot {
return {
events: Array.from(this.#events.values()),
runs: Array.from(this.#runs.values()),
};
}
}
class LocalFlowFileState extends LocalFlowMemoryState {
#statePath: string;
constructor(dataDir: string) {
const statePath = path.join(dataDir, "state.json");
super(readStateSnapshot(statePath));
this.#statePath = statePath;
mkdirSync(path.dirname(this.#statePath), { recursive: true });
}
override recordDispatch(event: FlowEvent, createdAt: string, runs: FlowRunView[]): void {
super.recordDispatch(event, createdAt, runs);
this.#save();
}
override recordReplay(eventId: string, runs: FlowRunView[]): void {
super.recordReplay(eventId, runs);
this.#save();
}
#save(): void {
writeFileSync(this.#statePath, JSON.stringify(this.snapshot(), null, 2));
}
}
function dispatchResult(options: {
status: string;
event: FlowEvent;
runs: FlowRunView[];
matched: number;
raw: unknown;
}): FlowDispatchResult {
const runIds = options.runs.map((run) => run.id);
return {
status: options.status,
eventId: options.event.id,
runIds,
matched: options.matched,
event: {
id: options.event.id,
type: options.event.type,
...(options.event.source ? { source: options.event.source } : {}),
...(options.event.occurredAt ? { occurredAt: options.event.occurredAt } : {}),
receivedAt: options.event.receivedAt,
payload: options.event.payload,
runIds,
runs: options.runs,
raw: { kind: "local-event", event: options.event, runIds },
},
runs: options.runs,
raw: options.raw,
};
}
function localRunView(options: {
runId: string;
event: FlowEvent;
flow: LoadedFlow;
step: FlowStep;
processStatus: FlowProcessStatus;
result?: FlowResult;
error?: string;
startedAt: string;
completedAt: string;
}): FlowRunView {
const resultStatus = resultStatusFrom(options.result);
const effectiveStatus: FlowEffectiveStatus =
resultStatus ?? options.processStatus;
const attempt = localAttemptView({
runId: options.runId,
status: options.processStatus,
startedAt: options.startedAt,
completedAt: options.completedAt,
error: options.error,
});
const output: FlowOutputView[] = [];
return {
id: options.runId,
eventId: options.event.id,
flowName: options.flow.manifest.name,
flowVersion: options.flow.manifest.version,
stepName: options.step.name,
runner: options.step.runner,
backend: "local",
processStatus: options.processStatus,
...(resultStatus ? { resultStatus } : {}),
status: effectiveStatus,
effectiveStatus,
needsAttention: attentionStatuses.has(effectiveStatus),
attemptCount: 1,
attempts: [attempt],
output,
...(options.result ? { resultPayload: options.result } : {}),
...(options.error ? { error: options.error } : {}),
createdAt: options.startedAt,
startedAt: options.startedAt,
completedAt: options.completedAt,
updatedAt: options.completedAt,
raw: {
kind: "local-run",
event: options.event,
flowRoot: options.flow.root,
flowName: options.flow.manifest.name,
stepName: options.step.name,
result: options.result,
error: options.error,
},
};
}
function localAttemptView(options: {
runId: string;
status: FlowProcessStatus;
startedAt: string;
completedAt: string;
error?: string;
}): FlowAttemptView {
return {
id: `${options.runId}:attempt:1`,
status: options.status,
attemptNumber: 1,
startedAt: options.startedAt,
completedAt: options.completedAt,
...(options.error ? { error: options.error } : {}),
raw: {
kind: "local-attempt",
runId: options.runId,
},
};
}
function normalizeFlowEvent(value: unknown): FlowEvent {
const record = isRecord(value) ? value : {};
if (typeof record.id !== "string" || typeof record.type !== "string") {
throw new Error("FlowEvent requires string id and type");
}
return {
...record,
id: record.id,
type: record.type,
receivedAt: typeof record.receivedAt === "string" && record.receivedAt
? record.receivedAt
: new Date().toISOString(),
payload: "payload" in record ? record.payload : {},
} as FlowEvent;
}
function localState(
state: LocalFlowClientOptions["state"],
cwd: string,
): LocalFlowMemoryState | undefined {
if (state === false) {
return undefined;
}
if (typeof state === "object") {
return new LocalFlowFileState(state.dataDir ?? path.join(cwd, ".codex", "flow-client"));
}
return new LocalFlowMemoryState();
}
function readStateSnapshot(statePath: string): LocalFlowStateSnapshot | undefined {
try {
const parsed = JSON.parse(readFileSync(statePath, "utf8")) as unknown;
if (!isRecord(parsed) || !Array.isArray(parsed.events) || !Array.isArray(parsed.runs)) {
return undefined;
}
return {
events: parsed.events.filter(isStoredEvent),
runs: parsed.runs.filter(isFlowRunView),
};
} catch (error) {
if (isErrno(error, "ENOENT")) {
return undefined;
}
throw error;
}
}
function resultStatusFrom(result: FlowResult | undefined): FlowResultStatus | undefined {
return result && resultStatuses.has(result.status) ? result.status : undefined;
}
function localRunId(
eventId: string,
flowName: string,
stepName: string,
replayNonce?: string,
): string {
const hash = createHash("sha256")
.update(`${eventId}\0${flowName}\0${stepName}${replayNonce ? `\0${replayNonce}` : ""}`)
.digest("hex")
.slice(0, 12);
return replayNonce ? `run_${hash}_replay` : `run_${hash}`;
}
function ensureSynchronousLocalDispatch(
options: FlowDispatchOptions | FlowReplayOptions,
): void {
if (options.wait === false) {
throw new Error("Local flow dispatch does not support wait: false without a worker loop");
}
}
function clampLimit(value: number | undefined): number {
if (!value || !Number.isFinite(value)) {
return 50;
}
return Math.max(1, Math.min(500, Math.trunc(value)));
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function isStoredEvent(value: unknown): value is StoredEvent {
if (!isRecord(value) || !isRecord(value.event)) {
return false;
}
return typeof value.event.id === "string" &&
typeof value.event.type === "string" &&
typeof value.event.receivedAt === "string" &&
Array.isArray(value.runIds) &&
value.runIds.every((runId) => typeof runId === "string") &&
typeof value.createdAt === "string";
}
function isFlowRunView(value: unknown): value is FlowRunView {
return isRecord(value) &&
typeof value.id === "string" &&
typeof value.status === "string" &&
typeof value.effectiveStatus === "string" &&
typeof value.needsAttention === "boolean" &&
typeof value.attemptCount === "number" &&
Array.isArray(value.attempts) &&
Array.isArray(value.output) &&
"raw" in value;
}
function isErrno(error: unknown, code: string): boolean {
return isRecord(error) && error.code === code;
}
function isDefined<T>(value: T | undefined): value is T {
return value !== undefined;
}

View file

@ -0,0 +1,48 @@
import { expect, test } from "bun:test";
import { createFlowClient } from "../src/client.ts";
test("client factory creates an HTTP backend client with auth handling", async () => {
const requests: Request[] = [];
const client = createFlowClient({
mode: "http",
baseUrl: "https://flow.example",
hmacSecret: "secret",
fetch: async (request, init) => {
const normalized = request instanceof Request
? request
: new Request(String(request), init);
requests.push(normalized);
return Response.json({
status: "accepted",
eventId: "event-1",
runIds: ["run-1"],
matched: 1,
});
},
});
const result = await client.dispatchEvent({
id: "event-1",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: {},
});
expect(result).toMatchObject({
status: "accepted",
eventId: "event-1",
runIds: ["run-1"],
matched: 1,
});
expect(requests[0]?.headers.get("x-flow-signature-256")).toStartWith("sha256=");
});
test("client factory creates a local client", async () => {
const client = createFlowClient({
mode: "local",
cwd: process.cwd(),
state: false,
});
await expect(client.listEvents()).rejects.toThrow("requires local state");
});

View file

@ -0,0 +1,270 @@
import { expect, test } from "bun:test";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { createLocalFlowClient } from "../src/local-client.ts";
import type { FlowEvent } from "../src/index.ts";
test("local client dispatches matching steps and returns normalized views", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-"));
try {
await writeFlow(directory, "flows/demo", "source");
await writeFlow(directory, ".codex/flows/demo", "installed");
const client = createLocalFlowClient({ cwd: directory, env: {} });
const event: FlowEvent = {
id: "event-1",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada" },
};
const result = await client.dispatchEvent(event);
expect(result).toMatchObject({
status: "accepted",
eventId: "event-1",
matched: 1,
runs: [
{
eventId: "event-1",
flowName: "demo",
stepName: "hello",
backend: "local",
processStatus: "completed",
resultStatus: "completed",
effectiveStatus: "completed",
needsAttention: false,
resultPayload: {
status: "completed",
message: "installed Ada",
},
},
],
});
const run = result.runs[0];
if (!run) {
throw new Error("expected one local run");
}
expect(result.runIds).toEqual([run.id]);
const eventView = await client.getEvent("event-1");
expect(eventView).toMatchObject({
id: "event-1",
type: "demo.event",
runIds: result.runIds,
runs: [{ id: result.runIds[0] }],
});
const runs = await client.listRuns({ eventId: "event-1" });
expect(runs.runs.map((run) => run.id)).toEqual(result.runIds);
} finally {
await rm(directory, { recursive: true, force: true });
}
});
test("local memory state dedupes normal dispatch and replays new attempts", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-"));
try {
await writeFlow(directory, "flows/demo", "demo");
const client = createLocalFlowClient({ cwd: directory, env: {} });
const event: FlowEvent = {
id: "event-1",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada" },
};
const first = await client.dispatchEvent(event);
const duplicate = await client.dispatchEvent(event);
expect(duplicate).toMatchObject({
status: "duplicate",
eventId: "event-1",
matched: 0,
idempotent: true,
runIds: first.runIds,
});
const replay = await client.replayEvent("event-1");
expect(replay.status).toBe("accepted");
expect(replay.runIds).toHaveLength(1);
expect(replay.runIds[0]).not.toBe(first.runIds[0]);
expect(replay.runIds[0]).toEndWith("_replay");
const eventView = await client.getEvent("event-1");
expect(eventView.runIds).toEqual([...first.runIds, ...replay.runIds]);
} finally {
await rm(directory, { recursive: true, force: true });
}
});
test("local file state persists events and runs across client instances", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-"));
try {
await writeFlow(directory, "flows/demo", "demo");
const dataDir = path.join(directory, ".codex", "flow-client");
const event: FlowEvent = {
id: "event-1",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada" },
};
const firstClient = createLocalFlowClient({
cwd: directory,
env: {},
state: { kind: "file", dataDir },
});
const first = await firstClient.dispatchEvent(event);
const secondClient = createLocalFlowClient({
cwd: directory,
env: {},
state: { kind: "file", dataDir },
});
const eventView = await secondClient.getEvent("event-1");
expect(eventView.runIds).toEqual(first.runIds);
const duplicate = await secondClient.dispatchEvent(event);
expect(duplicate).toMatchObject({
status: "duplicate",
idempotent: true,
runIds: first.runIds,
});
const replay = await secondClient.replayEvent("event-1");
expect(replay.runIds[0]).toEndWith("_replay");
const thirdClient = createLocalFlowClient({
cwd: directory,
env: {},
state: { kind: "file", dataDir },
});
expect((await thirdClient.getEvent("event-1")).runIds).toEqual([
...first.runIds,
...replay.runIds,
]);
} finally {
await rm(directory, { recursive: true, force: true });
}
});
test("local client marks semantic attention statuses from FLOW_RESULT", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-"));
try {
await writeFlow(directory, "flows/demo", "demo");
const client = createLocalFlowClient({ cwd: directory, env: {} });
const result = await client.dispatchEvent({
id: "event-blocked",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada", status: "blocked" },
});
expect(result.runs[0]).toMatchObject({
processStatus: "completed",
resultStatus: "blocked",
effectiveStatus: "blocked",
needsAttention: true,
});
} finally {
await rm(directory, { recursive: true, force: true });
}
});
test("local client keeps Code Mode flow steps gated", async () => {
const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-"));
try {
await writeFlow(directory, "flows/demo", "demo", "code-mode");
const client = createLocalFlowClient({ cwd: directory, env: {} });
const result = await client.dispatchEvent({
id: "event-code-mode",
type: "demo.event",
receivedAt: "2026-05-15T00:00:00.000Z",
payload: { name: "Ada" },
});
expect(result.runs[0]).toMatchObject({
processStatus: "failed",
effectiveStatus: "failed",
needsAttention: false,
});
expect(result.runs[0]?.error).toContain("requires CODEX_FLOWS_ENABLE_CODE_MODE=1");
} finally {
await rm(directory, { recursive: true, force: true });
}
});
test("local client reports unsupported operations when state is disabled", async () => {
const client = createLocalFlowClient({
cwd: await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")),
state: false,
});
await expect(client.listEvents()).rejects.toThrow("requires local state");
await expect(client.getRun("missing")).rejects.toThrow("requires local state");
await expect(client.replayEvent("missing")).rejects.toThrow("requires local state");
});
test("local memory state rejects unknown events and runs", async () => {
const client = createLocalFlowClient({
cwd: await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")),
});
await expect(client.getEvent("missing")).rejects.toThrow("Unknown event");
await expect(client.getRun("missing")).rejects.toThrow("Unknown run");
await expect(client.replayEvent("missing")).rejects.toThrow("Unknown event");
});
async function writeFlow(
root: string,
relative: string,
label: string,
runner: "bun" | "code-mode" = "bun",
): Promise<void> {
const flowRoot = path.join(root, relative);
await mkdir(path.join(flowRoot, "exec"), { recursive: true });
await mkdir(path.join(flowRoot, "schemas"), { recursive: true });
await Bun.write(
path.join(flowRoot, "flow.toml"),
[
'name = "demo"',
"version = 1",
`description = ${JSON.stringify(label)}`,
"",
"[[steps]]",
'name = "hello"',
`runner = "${runner}"`,
'script = "exec/hello.ts"',
"timeout_ms = 30000",
"",
"[steps.trigger]",
'type = "demo.event"',
'schema = "schemas/demo-event.schema.json"',
"",
].join("\n"),
);
await Bun.write(
path.join(flowRoot, "schemas/demo-event.schema.json"),
JSON.stringify({
type: "object",
required: ["name"],
properties: {
name: { type: "string" },
status: { enum: ["completed", "changed", "blocked", "needs_intervention", "failed"] },
},
}),
);
await Bun.write(
path.join(flowRoot, "exec/hello.ts"),
[
"const context = JSON.parse(await Bun.stdin.text());",
"const payload = context.flow.event.payload;",
`const label = ${JSON.stringify(label)};`,
"const status = payload.status ?? 'completed';",
"console.log(`FLOW_RESULT ${JSON.stringify({ status, message: `${label} ${payload.name}` })}`);",
"",
].join("\n"),
);
}