From e7b7b1b34269bfcdbb87cd913666497d235cc176 Mon Sep 17 00:00:00 2001 From: matamune Date: Fri, 15 May 2026 20:38:09 +0000 Subject: [PATCH] Add reusable flow client runtime --- apps/flow-runner/src/index.ts | 27 +- apps/flow-runner/test/flow-runner.test.ts | 95 +++ bun.lock | 8 +- docs/2026-05-15-flow-client-design.md | 399 +++++++++++ docs/flows.md | 28 + package.json | 2 +- packages/codex-client/package.json | 2 +- packages/flow-backend-convex/package.json | 4 +- packages/flow-runtime/README.md | 40 ++ packages/flow-runtime/package.json | 12 +- packages/flow-runtime/src/backend-client.ts | 160 +---- packages/flow-runtime/src/client-types.ts | 127 ++++ packages/flow-runtime/src/client.ts | 40 ++ packages/flow-runtime/src/index.ts | 27 + packages/flow-runtime/src/local-client.ts | 649 ++++++++++++++++++ packages/flow-runtime/test/client.test.ts | 48 ++ .../flow-runtime/test/local-client.test.ts | 270 ++++++++ 17 files changed, 1796 insertions(+), 142 deletions(-) create mode 100644 apps/flow-runner/test/flow-runner.test.ts create mode 100644 docs/2026-05-15-flow-client-design.md create mode 100644 packages/flow-runtime/src/client-types.ts create mode 100644 packages/flow-runtime/src/client.ts create mode 100644 packages/flow-runtime/src/local-client.ts create mode 100644 packages/flow-runtime/test/client.test.ts create mode 100644 packages/flow-runtime/test/local-client.test.ts diff --git a/apps/flow-runner/src/index.ts b/apps/flow-runner/src/index.ts index 87078b1..fb234d0 100644 --- a/apps/flow-runner/src/index.ts +++ b/apps/flow-runner/src/index.ts @@ -2,7 +2,7 @@ import path from "node:path"; import { discoverFlows, - matchingSteps, + createLocalFlowClient, runFlowStep, type FlowEvent, type LoadedFlow, @@ -35,11 +35,26 @@ async function main(): Promise { } const event = await readEvent(cli.eventPath); if (cli.kind === "fire") { - const matches = await matchingSteps(flows, event); - const results = []; - for (const match of matches) { - results.push(await runAndReport(match.flow, match.step, event)); - } + const client = createLocalFlowClient({ + cwd: cli.cwd, + env: process.env, + codex: { + command: process.env.CODEX_APP_SERVER_CODEX_COMMAND, + codexHome: process.env.CODEX_HOME, + stream: true, + }, + }); + const dispatch = await client.dispatchEvent(event); + const results = dispatch.runs.map((run) => { + if (!run.resultPayload && run.error) { + throw new Error(run.error); + } + return { + flow: run.flowName, + step: run.stepName, + result: run.resultPayload, + }; + }); process.stdout.write(`${JSON.stringify({ eventId: event.id, results }, null, 2)}\n`); return; } diff --git a/apps/flow-runner/test/flow-runner.test.ts b/apps/flow-runner/test/flow-runner.test.ts new file mode 100644 index 0000000..43fcecc --- /dev/null +++ b/apps/flow-runner/test/flow-runner.test.ts @@ -0,0 +1,95 @@ +import { expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +test("fire preserves the existing event/results payload shape", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-runner-")); + try { + await writeFlow(directory); + const eventPath = path.join(directory, "event.json"); + await Bun.write( + eventPath, + JSON.stringify({ + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada" }, + }), + ); + + const runner = path.resolve(import.meta.dir, "..", "src", "index.ts"); + const process = Bun.spawn({ + cmd: ["bun", runner, "--cwd", directory, "fire", "--event", eventPath], + stdout: "pipe", + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([ + new Response(process.stdout).text(), + new Response(process.stderr).text(), + process.exited, + ]); + + expect(stderr).toBe(""); + expect(exitCode).toBe(0); + expect(JSON.parse(stdout)).toEqual({ + eventId: "event-1", + results: [ + { + flow: "demo", + step: "hello", + result: { + status: "completed", + message: "hello Ada", + }, + }, + ], + }); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +async function writeFlow(root: string): Promise { + const flowRoot = path.join(root, "flows/demo"); + await mkdir(path.join(flowRoot, "exec"), { recursive: true }); + await mkdir(path.join(flowRoot, "schemas"), { recursive: true }); + await Bun.write( + path.join(flowRoot, "flow.toml"), + [ + 'name = "demo"', + "version = 1", + 'description = "demo"', + "", + "[[steps]]", + 'name = "hello"', + 'runner = "bun"', + 'script = "exec/hello.ts"', + "timeout_ms = 30000", + "", + "[steps.trigger]", + 'type = "demo.event"', + 'schema = "schemas/demo-event.schema.json"', + "", + ].join("\n"), + ); + await Bun.write( + path.join(flowRoot, "schemas/demo-event.schema.json"), + JSON.stringify({ + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + }, + }), + ); + await Bun.write( + path.join(flowRoot, "exec/hello.ts"), + [ + "const context = JSON.parse(await Bun.stdin.text());", + "const name = context.flow.event.payload.name;", + "console.log(`FLOW_RESULT ${JSON.stringify({ status: 'completed', message: `hello ${name}` })}`);", + "", + ].join("\n"), + ); +} diff --git a/bun.lock b/bun.lock index 0f7d1bd..b10accc 100644 --- a/bun.lock +++ b/bun.lock @@ -89,7 +89,7 @@ }, "packages/codex-client": { "name": "@peezy.tech/codex-flows", - "version": "0.3.0", + "version": "0.3.1", "devDependencies": { "@types/bun": "^1.3.13", "@types/node": "^22.10.10", @@ -105,7 +105,7 @@ }, "packages/flow-backend-convex": { "name": "@peezy.tech/flow-backend-convex", - "version": "0.3.0", + "version": "0.3.1", "dependencies": { "@peezy.tech/flow-runtime": "workspace:*", "convex": "^1.38.0", @@ -118,9 +118,9 @@ }, "packages/flow-runtime": { "name": "@peezy.tech/flow-runtime", - "version": "0.3.0", + "version": "0.3.1", "dependencies": { - "@peezy.tech/codex-flows": "^0.3.0", + "@peezy.tech/codex-flows": "^0.3.1", }, "devDependencies": { "@types/bun": "catalog:", diff --git a/docs/2026-05-15-flow-client-design.md b/docs/2026-05-15-flow-client-design.md new file mode 100644 index 0000000..3e729f3 --- /dev/null +++ b/docs/2026-05-15-flow-client-design.md @@ -0,0 +1,399 @@ +# Flow Client Design - 2026-05-15 + +This note designs the next reusable client layer for Codex flows. It builds on +the current split between `@peezy.tech/flow-runtime`, +`@peezy.tech/flow-runtime/backend-client`, `codex-flow-runner`, and +`codex-flow-systemd-local`. + +## Intent + +Consumers such as `patch.moi` should be able to use Codex flows as a product +capability without caring whether execution is backendless in the current +workspace or delegated to a flow backend. + +The event-shaped contract remains the stable ABI. The client hides ceremony, +not semantics: + +- flow packages still receive `FlowEvent` through the existing runner context +- steps still emit `FLOW_RESULT` +- backend dispatch still uses `event.id` for idempotency +- app-owned domain completion stays outside generic flow clients and backends + +The product-facing experience can be direct: + +```bash +patch upstream release openai/codex rust-v1.2.3 +``` + +Patch can translate that into: + +```ts +await flows.dispatchEvent({ + id: "patch:upstream.release:openai/codex:rust-v1.2.3", + type: "upstream.release", + source: "patch", + receivedAt: new Date().toISOString(), + payload: { + repo: "openai/codex", + tag: "rust-v1.2.3", + }, +}); +``` + +## Existing Surfaces + +`@peezy.tech/flow-runtime` already provides the local building blocks: + +- `discoverFlows({ cwd, roots })` +- `matchingSteps(flows, event)` +- `runFlowStep({ flow, step, event, env, codeMode })` +- Bun and gated Code Mode runners +- JSON Schema trigger validation + +`codex-flow-runner` proves backendless execution works, but only as a CLI over +event JSON files. It does not expose a reusable client object, normalized +run/event views, local idempotency, or replay/list APIs. + +`codex-flow-systemd-local` provides durable backend behavior: + +- `POST /events` +- `GET /events` +- `GET /runs` +- `POST /events/:id/replay` +- process status in backend records +- semantic result status in `FLOW_RESULT` +- idempotency by `event.id` + +`@peezy.tech/flow-runtime/backend-client` normalizes HTTP backend responses and +already covers backend-native list/get/dispatch/replay/cancel operations. + +`patch.moi` currently builds `FlowEvent` objects from feed signals and POSTs +them to a configured backend URL. The same product should also support a local +CLI mode where flows run in the current repository without a daemon. + +## Proposed Packages + +Add two subpath exports to `@peezy.tech/flow-runtime`: + +- `@peezy.tech/flow-runtime/local-client` +- `@peezy.tech/flow-runtime/client` + +Keep the existing: + +- `@peezy.tech/flow-runtime/backend-client` + +`local-client` owns backendless execution. `backend-client` owns remote backend +HTTP access. `client` is a tiny factory and shared type surface over both. + +No new package named `sdk` should be introduced. + +## Client API + +The common client should expose flow-native operations: + +```ts +export type FlowClient = { + listRuns(options?: FlowListRunsOptions): Promise; + getRun(runId: string): Promise; + listEvents(options?: FlowListEventsOptions): Promise; + getEvent(eventId: string): Promise; + dispatchEvent(event: FlowEvent, options?: FlowDispatchOptions): Promise; + replayEvent(eventId: string, options?: FlowReplayOptions): Promise; + cancelRun(runId: string): Promise; +}; +``` + +The view models should be neutral aliases or successors of the current backend +views: + +- `FlowRunView` +- `FlowEventView` +- `FlowDispatchResult` +- `FlowRunList` +- `FlowEventList` + +They should preserve the fields proven by `backend-client`: + +- process status +- semantic `FLOW_RESULT` status +- `effectiveStatus` +- `needsAttention` +- attempts +- output +- latest output +- result payload +- raw backend/local record + +The term `backend` can remain as a field value, but common client names should +not force local execution to pretend it is a backend. + +## Factory + +The factory should be discriminated by mode: + +```ts +const flows = createFlowClient({ + mode: "local", + cwd: process.cwd(), +}); +``` + +```ts +const flows = createFlowClient({ + mode: "http", + baseUrl: process.env.PATCH_FLOW_BACKEND_URL, + hmacSecret: process.env.PATCH_FLOW_DISPATCH_SECRET, +}); +``` + +`mode: "http"` should wrap `createFlowBackendHttpClient()`. + +`mode: "local"` should wrap `createLocalFlowClient()`. + +## Local Client + +Local mode should execute matching flow steps directly in the selected +workspace: + +```ts +export type LocalFlowClientOptions = { + cwd: string; + roots?: string[]; + env?: Record; + state?: false | "memory" | { + kind: "file"; + dataDir?: string; + }; + codex?: LocalFlowCodexOptions; +}; +``` + +Discovery follows the runtime default unless roots are provided: + +- `.codex/flows/*` +- `flows/*` + +The first implementation should support `state: "memory"` by default. That is +enough for direct CLI runs and tests. A file-backed state mode can be added next +under `.codex/flow-runs` or `.codex/flow-client` without introducing a daemon. + +Local dispatch behavior: + +- normalize the incoming `FlowEvent` +- discover flows +- match trigger type and schema +- create one run view per matching step +- execute steps using `runFlowStep` +- return a normalized dispatch result +- mark semantic `blocked` and `needs_intervention` as attention states + +For local mode, `dispatchEvent` should run synchronously by default. A future +`wait: false` local option can queue to file state, but it should not fake async +behavior before there is a worker loop. + +## Idempotency And Replay + +The client must not silently generate random ids for normal dispatch. A product +can hide event construction, but it must still provide a deterministic event id +or idempotency key when duplicate suppression matters. + +Recommended product helper pattern: + +```ts +const event = patchUpstreamReleaseEvent({ + repo: "openai/codex", + tag: "rust-v1.2.3", +}); +await flows.dispatchEvent(event); +``` + +HTTP mode inherits backend idempotency and replay semantics: + +- duplicate `event.id` dispatch returns the backend's duplicate/idempotent + response +- `replayEvent(eventId)` creates a new backend attempt + +Local memory mode can dedupe duplicate event ids only for the lifetime of the +client process. File-backed local state should make idempotency durable and make +`replayEvent(eventId)` create a new local attempt. + +If local state is disabled, `listEvents`, `getEvent`, `listRuns`, `getRun`, and +`replayEvent` should fail with a clear unsupported-state error rather than +returning misleading empty data. + +## Code Mode Configuration + +Flow orchestration location and Codex execution location are separate axes. + +Local flow execution can still use a local or remote Codex executor for +`runner = "code-mode"` steps. + +Initial local Code Mode support should preserve current behavior: + +```ts +codex: { + mode: "stdio", + command: process.env.CODEX_APP_SERVER_CODEX_COMMAND, + codexHome: process.env.CODEX_HOME, + stream: true, +} +``` + +The design should leave room for: + +```ts +codex: { + mode: "remote", + url: "https://codex-worker.internal", + headers: { authorization: "Bearer ..." }, +} +``` + +Remote Code Mode should not be added until `runCodeModeStep` has a real remote +app-server transport. Do not simulate remote Codex by shelling out through +unrelated HTTP APIs. + +Code Mode remains gated by `CODEX_FLOWS_MODE=code-mode` or +`CODEX_FLOWS_ENABLE_CODE_MODE=1`. + +## Workspace Configuration + +The existing `.codex/workspace.toml` is already used for Discord gateway +surfaces. Flow client configuration can use the same file, but should stay in a +separate table: + +```toml +[flows] +mode = "local" +roots = ["flows"] +state = "memory" + +[flows.codex] +mode = "stdio" +stream = true +``` + +Patch can resolve configuration in this order: + +1. CLI flags +2. environment variables +3. `.codex/workspace.toml` +4. defaults + +The Discord-specific `[[discord.gateway.surfaces]]` table must remain unrelated +to flow execution. + +## Patch Integration + +Patch should be able to use one abstraction in both roles: + +- CLI utility run from a fork checkout +- feed watching service that dispatches upstream activity + +Suggested Patch adapter: + +```ts +const flows = createFlowClientFromPatchConfig({ + cwd, + env: process.env, +}); +``` + +Resolution: + +- if a backend URL is configured, use `mode: "http"` +- otherwise use `mode: "local"` in the current workspace + +The existing Patch server can replace its hand-rolled fetch/HMAC dispatch logic +with the HTTP client mode. The future Patch CLI can use local mode by default. + +Patch-specific commands should create domain events but not bypass the generic +flow ABI: + +```bash +patch upstream release openai/codex rust-v1.2.3 +patch flow fire upstream.release --repo openai/codex --tag rust-v1.2.3 +patch flow runs +``` + +## Boundaries + +This client is not an app-server SDK. It must not expose convenience wrappers +for app-server thread methods such as `setGoal`, `readThread`, `startTurn`, or +`renameThread`. + +The client owns generic flow concerns: + +- event dispatch +- flow discovery +- local step execution +- backend dispatch +- run/event normalization +- replay/cancel when supported by the selected mode + +It does not own: + +- Patch fork policy +- organization release rules +- pet-game asset registration +- payment state +- minting +- Discord gateway write tools +- arbitrary app-server command wrappers + +Domain completion remains app-owned. A flow result can say work completed or +needs intervention; the consuming app decides what that means in its product. + +## Implementation Plan + +1. Extract neutral view model names from `backend-client` without breaking the + existing backend-client export. +2. Add `local-client` with in-memory state and synchronous dispatch. +3. Add `client` factory that returns local or HTTP clients. +4. Refactor `codex-flow-runner` to use the local client internally. +5. Add optional file-backed local state for durable idempotency, list/get, and + replay. +6. Update Patch to consume the common client instead of hand-rolled HTTP + dispatch. +7. Add `.codex/workspace.toml` flow configuration support only after the direct + constructor API is stable. + +## Test Plan + +Local client tests: + +- discovers `.codex/flows/*` before `flows/*` +- dispatches one event to all matching steps +- returns normalized run/event/result views +- preserves `FLOW_RESULT.status` as semantic result status +- marks `blocked` and `needs_intervention` as `needsAttention` +- forwards Code Mode configuration to `runFlowStep` +- rejects replay/list/get when durable or memory state does not contain the + requested event/run +- dedupes duplicate event ids in memory mode + +HTTP factory tests: + +- delegates to `FlowBackendHttpClient` +- preserves HMAC/header/auth construction +- returns the same normalized payload shape as direct backend-client use + +CLI and Patch-oriented tests: + +- `codex-flow-runner fire` continues to produce the existing payload shape +- Patch can select local mode when no backend URL is configured +- Patch can select HTTP mode when a backend URL is configured +- Patch product helpers create deterministic event ids for fork-maintenance + commands + +## Open Decisions + +The first implementation can proceed with conservative defaults, but these +should be decided before exposing a stable CLI promise: + +- whether local mode should default to in-memory state or file state +- exact default local state directory if file state is enabled +- whether local dispatch should ever support `wait: false` +- final remote Code Mode transport shape +- whether `cancelRun` in local synchronous mode should be unsupported or tied to + a future worker process diff --git a/docs/flows.md b/docs/flows.md index 953ce6f..ea8ca2d 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -206,6 +206,34 @@ completion logic stays outside generic backends. For example, a pet-game worker may upload generated assets, update payment state, and mint before completing a generic flow run. +## Flow Client + +Reusable product code can use `@peezy.tech/flow-runtime/client` when it should +dispatch, inspect, or replay generic flow events without hard-coding whether +execution is local or delegated to an HTTP backend. + +```ts +import { createFlowClient } from "@peezy.tech/flow-runtime/client"; + +const flows = createFlowClient({ + mode: "local", + cwd: process.cwd(), +}); +``` + +`mode: "local"` wraps `@peezy.tech/flow-runtime/local-client` and runs matching +steps synchronously in the selected workspace with in-memory run/event state by +default. Set `state: { kind: "file" }` to persist local state under +`.codex/flow-client`. `mode: "http"` wraps the existing backend HTTP client and +inherits the backend's durable idempotency, replay, and cancellation semantics. + +Both modes preserve the same `FlowEvent` and `FLOW_RESULT` contracts. The client +hides dispatch ceremony, not event semantics: products still provide stable +event ids when duplicate suppression matters, and product-specific completion +logic stays outside the generic client. See +`docs/2026-05-15-flow-client-design.md` for the design rationale, Patch +integration path, local state model, and remaining open decisions. + ## Convex Backend Direction Convex should be a durable orchestration backend, not the place where long diff --git a/package.json b/package.json index 8ad36ee..eebc28f 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "replay:thread": "bun scripts/run-code-mode-in-new-thread.ts", "start": "bun run --filter web preview", "start:discord:debug:commentary": "bun run --filter codex-discord-bridge start:debug:commentary", - "test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter @peezy.tech/flow-backend-convex test && bun run --filter @peezy.tech/codex-opencode-go-router test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test", + "test": "bun run --filter @peezy.tech/codex-flows test && bun run --filter @peezy.tech/flow-runtime test && bun run --filter @peezy.tech/flow-backend-convex test && bun run --filter @peezy.tech/codex-opencode-go-router test && bun run --filter codex-flow-systemd-local test && bun run --filter codex-flow-runner test && bun run --filter codex-app-cli test && bun run --filter codex-discord-bridge test", "release:check": "bun run --filter @peezy.tech/codex-flows release:check && bun run --filter @peezy.tech/flow-runtime release:check && bun run --filter @peezy.tech/flow-backend-convex release:check" } } diff --git a/packages/codex-client/package.json b/packages/codex-client/package.json index 5efa822..5e979fd 100644 --- a/packages/codex-client/package.json +++ b/packages/codex-client/package.json @@ -1,6 +1,6 @@ { "name": "@peezy.tech/codex-flows", - "version": "0.3.0", + "version": "0.3.1", "description": "Codex app-server JSON-RPC client, flow helpers, and generated protocol types.", "type": "module", "license": "Apache-2.0", diff --git a/packages/flow-backend-convex/package.json b/packages/flow-backend-convex/package.json index 38a031f..841fd62 100644 --- a/packages/flow-backend-convex/package.json +++ b/packages/flow-backend-convex/package.json @@ -1,6 +1,6 @@ { "name": "@peezy.tech/flow-backend-convex", - "version": "0.3.0", + "version": "0.3.1", "description": "Reusable Convex component for durable codex-flow event, run, lease, and result state.", "type": "module", "license": "Apache-2.0", @@ -52,7 +52,7 @@ "test": "bun test test/*.test.ts" }, "dependencies": { - "@peezy.tech/flow-runtime": "^0.3.0", + "@peezy.tech/flow-runtime": "^0.3.1", "convex": "^1.38.0" }, "devDependencies": { diff --git a/packages/flow-runtime/README.md b/packages/flow-runtime/README.md index 3551191..f5e60e8 100644 --- a/packages/flow-runtime/README.md +++ b/packages/flow-runtime/README.md @@ -10,6 +10,46 @@ Code Mode runners. import { discoverFlows, matchingSteps, runFlowStep } from "@peezy.tech/flow-runtime"; ``` +## Flow Client + +`@peezy.tech/flow-runtime/client` exposes a small flow-native client factory for +product code that should not care whether flows run locally or through an HTTP +backend: + +```ts +import { createFlowClient } from "@peezy.tech/flow-runtime/client"; + +const flows = createFlowClient({ + mode: "local", + cwd: process.cwd(), +}); + +await flows.dispatchEvent({ + id: "patch:upstream.release:openai/codex:rust-v1.2.3", + type: "upstream.release", + source: "patch", + receivedAt: new Date().toISOString(), + payload: { repo: "openai/codex", tag: "rust-v1.2.3" }, +}); +``` + +Use `mode: "http"` to wrap the existing backend HTTP client: + +```ts +const flows = createFlowClient({ + mode: "http", + baseUrl: "http://127.0.0.1:7345", + hmacSecret: process.env.PATCH_FLOW_DISPATCH_SECRET, +}); +``` + +`@peezy.tech/flow-runtime/local-client` runs matching steps synchronously in the +selected workspace and keeps in-memory run/event state by default. Set +`state: { kind: "file" }` to persist local run/event state under +`.codex/flow-client`. It preserves the generic `FlowEvent` and `FLOW_RESULT` +contracts; callers still provide deterministic event ids when idempotency +matters. + ## Backend Client `@peezy.tech/flow-runtime/backend-client` exposes backend-native inspection and diff --git a/packages/flow-runtime/package.json b/packages/flow-runtime/package.json index 89dee83..0ad9705 100644 --- a/packages/flow-runtime/package.json +++ b/packages/flow-runtime/package.json @@ -1,6 +1,6 @@ { "name": "@peezy.tech/flow-runtime", - "version": "0.3.0", + "version": "0.3.1", "description": "Generic flow package loader and runner primitives.", "type": "module", "license": "Apache-2.0", @@ -32,6 +32,14 @@ "./backend-client": { "types": "./dist/backend-client.d.ts", "import": "./dist/backend-client.js" + }, + "./local-client": { + "types": "./dist/local-client.d.ts", + "import": "./dist/local-client.js" + }, + "./client": { + "types": "./dist/client.d.ts", + "import": "./dist/client.js" } }, "scripts": { @@ -44,7 +52,7 @@ "test": "bun test test/*.test.ts" }, "dependencies": { - "@peezy.tech/codex-flows": "^0.3.0" + "@peezy.tech/codex-flows": "^0.3.1" }, "devDependencies": { "@types/bun": "catalog:", diff --git a/packages/flow-runtime/src/backend-client.ts b/packages/flow-runtime/src/backend-client.ts index c650e1a..59e4ff3 100644 --- a/packages/flow-runtime/src/backend-client.ts +++ b/packages/flow-runtime/src/backend-client.ts @@ -1,36 +1,30 @@ import { createHmac } from "node:crypto"; +import type { + FlowAttemptView, + FlowCancelResult, + FlowClient, + FlowDispatchOptions, + FlowDispatchResult, + FlowEffectiveStatus, + FlowEventList, + FlowEventView, + FlowListEventsOptions, + FlowListRunsOptions, + FlowOutputView, + FlowProcessStatus, + FlowReplayOptions, + FlowReplayResult, + FlowRunList, + FlowRunView, +} from "./client-types.ts"; import type { FlowEvent, FlowResultStatus } from "./types.ts"; -export type FlowBackendProcessStatus = - | "queued" - | "running" - | "completed" - | "failed" - | "canceled" - | string; - -export type FlowBackendEffectiveStatus = - | FlowBackendProcessStatus - | FlowResultStatus; - -export type FlowBackendListRunsOptions = { - eventId?: string; - status?: string; - limit?: number; -}; - -export type FlowBackendListEventsOptions = { - type?: string; - limit?: number; -}; - -export type FlowBackendDispatchOptions = { - wait?: boolean; -}; - -export type FlowBackendReplayOptions = { - wait?: boolean; -}; +export type FlowBackendProcessStatus = FlowProcessStatus; +export type FlowBackendEffectiveStatus = FlowEffectiveStatus; +export type FlowBackendListRunsOptions = FlowListRunsOptions; +export type FlowBackendListEventsOptions = FlowListEventsOptions; +export type FlowBackendDispatchOptions = FlowDispatchOptions; +export type FlowBackendReplayOptions = FlowReplayOptions; export type FlowBackendHttpHeaders = | Headers @@ -42,102 +36,16 @@ export type FlowBackendFetch = ( init?: RequestInit, ) => Promise; -export type FlowBackendOutputView = { - kind: string; - text: string; - createdAt?: string; - raw: unknown; -}; - -export type FlowBackendAttemptView = { - id: string; - status?: string; - attemptNumber?: number; - workerId?: string; - leaseExpiresAt?: number; - startedAt?: string; - completedAt?: string; - error?: string; - raw: unknown; -}; - -export type FlowBackendRunView = { - id: string; - eventId?: string; - flowName?: string; - flowVersion?: number; - stepName?: string; - runner?: string; - backend?: string; - processStatus?: FlowBackendProcessStatus; - resultStatus?: FlowResultStatus; - status: FlowBackendEffectiveStatus; - effectiveStatus: FlowBackendEffectiveStatus; - needsAttention: boolean; - attemptCount: number; - attempts: FlowBackendAttemptView[]; - output: FlowBackendOutputView[]; - latestOutput?: FlowBackendOutputView; - resultPayload?: unknown; - error?: string; - createdAt?: string; - startedAt?: string; - completedAt?: string; - updatedAt?: string; - raw: unknown; -}; - -export type FlowBackendEventView = { - id: string; - type?: string; - source?: string; - occurredAt?: string; - receivedAt?: string; - payload?: unknown; - runIds: string[]; - runs: FlowBackendRunView[]; - createdAt?: string; - raw: unknown; -}; - -export type FlowBackendRunList = { - runs: FlowBackendRunView[]; - eventId?: string; - raw: unknown; -}; - -export type FlowBackendEventList = { - events: FlowBackendEventView[]; - raw: unknown; -}; - -export type FlowBackendDispatchResult = { - status?: string; - eventId?: string; - runIds: string[]; - matched?: number; - idempotent?: boolean; - event?: FlowBackendEventView; - runs: FlowBackendRunView[]; - raw: unknown; -}; - -export type FlowBackendReplayResult = FlowBackendDispatchResult; - -export type FlowBackendCancelResult = { - run: FlowBackendRunView; - raw: unknown; -}; - -export type FlowBackendClient = { - listRuns(options?: FlowBackendListRunsOptions): Promise; - getRun(runId: string): Promise; - listEvents(options?: FlowBackendListEventsOptions): Promise; - getEvent(eventId: string): Promise; - dispatchEvent(event: FlowEvent, options?: FlowBackendDispatchOptions): Promise; - replayEvent(eventId: string, options?: FlowBackendReplayOptions): Promise; - cancelRun(runId: string): Promise; -}; +export type FlowBackendOutputView = FlowOutputView; +export type FlowBackendAttemptView = FlowAttemptView; +export type FlowBackendRunView = FlowRunView; +export type FlowBackendEventView = FlowEventView; +export type FlowBackendRunList = FlowRunList; +export type FlowBackendEventList = FlowEventList; +export type FlowBackendDispatchResult = FlowDispatchResult; +export type FlowBackendReplayResult = FlowReplayResult; +export type FlowBackendCancelResult = FlowCancelResult; +export type FlowBackendClient = FlowClient; export type FlowBackendHttpClientOptions = { baseUrl: string; diff --git a/packages/flow-runtime/src/client-types.ts b/packages/flow-runtime/src/client-types.ts new file mode 100644 index 0000000..af83481 --- /dev/null +++ b/packages/flow-runtime/src/client-types.ts @@ -0,0 +1,127 @@ +import type { FlowEvent, FlowResultStatus } from "./types.ts"; + +export type FlowProcessStatus = + | "queued" + | "running" + | "completed" + | "failed" + | "canceled" + | string; + +export type FlowEffectiveStatus = FlowProcessStatus | FlowResultStatus; + +export type FlowListRunsOptions = { + eventId?: string; + status?: string; + limit?: number; +}; + +export type FlowListEventsOptions = { + type?: string; + limit?: number; +}; + +export type FlowDispatchOptions = { + wait?: boolean; +}; + +export type FlowReplayOptions = { + wait?: boolean; +}; + +export type FlowOutputView = { + kind: string; + text: string; + createdAt?: string; + raw: unknown; +}; + +export type FlowAttemptView = { + id: string; + status?: string; + attemptNumber?: number; + workerId?: string; + leaseExpiresAt?: number; + startedAt?: string; + completedAt?: string; + error?: string; + raw: unknown; +}; + +export type FlowRunView = { + id: string; + eventId?: string; + flowName?: string; + flowVersion?: number; + stepName?: string; + runner?: string; + backend?: string; + processStatus?: FlowProcessStatus; + resultStatus?: FlowResultStatus; + status: FlowEffectiveStatus; + effectiveStatus: FlowEffectiveStatus; + needsAttention: boolean; + attemptCount: number; + attempts: FlowAttemptView[]; + output: FlowOutputView[]; + latestOutput?: FlowOutputView; + resultPayload?: unknown; + error?: string; + createdAt?: string; + startedAt?: string; + completedAt?: string; + updatedAt?: string; + raw: unknown; +}; + +export type FlowEventView = { + id: string; + type?: string; + source?: string; + occurredAt?: string; + receivedAt?: string; + payload?: unknown; + runIds: string[]; + runs: FlowRunView[]; + createdAt?: string; + raw: unknown; +}; + +export type FlowRunList = { + runs: FlowRunView[]; + eventId?: string; + raw: unknown; +}; + +export type FlowEventList = { + events: FlowEventView[]; + raw: unknown; +}; + +export type FlowDispatchResult = { + status?: string; + eventId?: string; + runIds: string[]; + matched?: number; + idempotent?: boolean; + event?: FlowEventView; + runs: FlowRunView[]; + raw: unknown; +}; + +export type FlowReplayResult = FlowDispatchResult; + +export type FlowCancelResult = { + run: FlowRunView; + raw: unknown; +}; + +export type FlowClient = { + listRuns(options?: FlowListRunsOptions): Promise; + getRun(runId: string): Promise; + listEvents(options?: FlowListEventsOptions): Promise; + getEvent(eventId: string): Promise; + dispatchEvent(event: FlowEvent, options?: FlowDispatchOptions): Promise; + replayEvent(eventId: string, options?: FlowReplayOptions): Promise; + cancelRun(runId: string): Promise; +}; diff --git a/packages/flow-runtime/src/client.ts b/packages/flow-runtime/src/client.ts new file mode 100644 index 0000000..566f1ff --- /dev/null +++ b/packages/flow-runtime/src/client.ts @@ -0,0 +1,40 @@ +import { + createFlowBackendHttpClient, + type FlowBackendHttpClientOptions, +} from "./backend-client.ts"; +import { + createLocalFlowClient, + type LocalFlowClientOptions, +} from "./local-client.ts"; +import type { FlowClient } from "./client-types.ts"; +export type { + FlowAttemptView, + FlowCancelResult, + FlowClient, + FlowDispatchOptions, + FlowDispatchResult, + FlowEffectiveStatus, + FlowEventList, + FlowEventView, + FlowListEventsOptions, + FlowListRunsOptions, + FlowOutputView, + FlowProcessStatus, + FlowReplayOptions, + FlowReplayResult, + FlowRunList, + FlowRunView, +} from "./client-types.ts"; + +export type FlowClientOptions = + | ({ mode: "local" } & LocalFlowClientOptions) + | ({ mode: "http" } & FlowBackendHttpClientOptions); + +export function createFlowClient(options: FlowClientOptions): FlowClient { + if (options.mode === "local") { + const { mode: _mode, ...localOptions } = options; + return createLocalFlowClient(localOptions); + } + const { mode: _mode, ...httpOptions } = options; + return createFlowBackendHttpClient(httpOptions); +} diff --git a/packages/flow-runtime/src/index.ts b/packages/flow-runtime/src/index.ts index 6aa4271..9e87872 100644 --- a/packages/flow-runtime/src/index.ts +++ b/packages/flow-runtime/src/index.ts @@ -5,6 +5,33 @@ export { runBunStep } from "./runners/bun.ts"; export { runCodeModeStep } from "./runners/code-mode.ts"; export { readJsonSchema, validateJsonSchema } from "./schema.ts"; export { matchingSteps, stepMatchesEvent } from "./triggers.ts"; +export { createFlowClient } from "./client.ts"; +export { createLocalFlowClient, LocalFlowClient } from "./local-client.ts"; +export type { + FlowAttemptView, + FlowCancelResult, + FlowClient, + FlowDispatchOptions, + FlowDispatchResult, + FlowEffectiveStatus, + FlowEventList, + FlowEventView, + FlowListEventsOptions, + FlowListRunsOptions, + FlowOutputView, + FlowProcessStatus, + FlowReplayOptions, + FlowReplayResult, + FlowRunList, + FlowRunView, +} from "./client-types.ts"; +export type { + FlowClientOptions, +} from "./client.ts"; +export type { + LocalFlowClientOptions, + LocalFlowCodexOptions, +} from "./local-client.ts"; export type { FlowEvent, FlowManifest, diff --git a/packages/flow-runtime/src/local-client.ts b/packages/flow-runtime/src/local-client.ts new file mode 100644 index 0000000..a13ee11 --- /dev/null +++ b/packages/flow-runtime/src/local-client.ts @@ -0,0 +1,649 @@ +import { createHash } from "node:crypto"; +import { mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import path from "node:path"; +import { discoverFlows } from "./manifest.ts"; +import { runFlowStep } from "./run.ts"; +import { matchingSteps } from "./triggers.ts"; +import type { + FlowAttemptView, + FlowCancelResult, + FlowClient, + FlowDispatchOptions, + FlowDispatchResult, + FlowEffectiveStatus, + FlowEventList, + FlowEventView, + FlowListEventsOptions, + FlowListRunsOptions, + FlowOutputView, + FlowProcessStatus, + FlowReplayOptions, + FlowReplayResult, + FlowRunList, + FlowRunView, +} from "./client-types.ts"; +import type { + FlowEvent, + FlowResult, + FlowResultStatus, + FlowStep, + LoadedFlow, +} from "./types.ts"; + +export type LocalFlowCodexOptions = { + mode?: "stdio"; + command?: string; + codexHome?: string; + stream?: boolean; +}; + +export type LocalFlowClientOptions = { + cwd: string; + roots?: string[]; + env?: Record; + state?: false | "memory" | { + kind: "file"; + dataDir?: string; + }; + codex?: LocalFlowCodexOptions; +}; + +type StoredEvent = { + event: FlowEvent; + createdAt: string; + runIds: string[]; +}; + +type LocalFlowStateSnapshot = { + events: StoredEvent[]; + runs: FlowRunView[]; +}; + +const resultStatuses = new Set([ + "skipped", + "completed", + "changed", + "needs_intervention", + "blocked", + "failed", +]); + +const attentionStatuses = new Set(["blocked", "needs_intervention"]); + +export class LocalFlowClientUnsupportedStateError extends Error { + constructor(operation: string) { + super(`Local flow client ${operation} requires local state`); + this.name = "LocalFlowClientUnsupportedStateError"; + } +} + +export class LocalFlowClient implements FlowClient { + #cwd: string; + #roots: string[] | undefined; + #env: Record; + #codex: LocalFlowCodexOptions | undefined; + #state: LocalFlowMemoryState | undefined; + + constructor(options: LocalFlowClientOptions) { + this.#cwd = path.resolve(options.cwd); + this.#roots = options.roots?.map((root) => + path.isAbsolute(root) ? root : path.resolve(this.#cwd, root), + ); + this.#env = options.env ?? process.env; + this.#codex = options.codex; + this.#state = localState(options.state, this.#cwd); + } + + async listRuns(options: FlowListRunsOptions = {}): Promise { + return this.#requireState("listRuns").listRuns(options); + } + + async getRun(runId: string): Promise { + return this.#requireState("getRun").getRun(runId); + } + + async listEvents(options: FlowListEventsOptions = {}): Promise { + return this.#requireState("listEvents").listEvents(options); + } + + async getEvent(eventId: string): Promise { + return this.#requireState("getEvent").getEvent(eventId); + } + + async dispatchEvent( + input: FlowEvent, + options: FlowDispatchOptions = {}, + ): Promise { + ensureSynchronousLocalDispatch(options); + const event = normalizeFlowEvent(input); + const duplicate = this.#state?.duplicateDispatch(event.id); + if (duplicate) { + return duplicate; + } + + const createdAt = new Date().toISOString(); + const flows = await discoverFlows({ + cwd: this.#cwd, + ...(this.#roots ? { roots: this.#roots } : {}), + }); + const matches = await matchingSteps(flows, event); + const runs: FlowRunView[] = []; + for (const match of matches) { + const run = await this.#executeMatch({ + event, + flow: match.flow, + step: match.step, + }); + runs.push(run); + } + + const result = dispatchResult({ + status: "accepted", + event, + runs, + matched: matches.length, + raw: { + status: "accepted", + eventId: event.id, + runIds: runs.map((run) => run.id), + matched: matches.length, + }, + }); + this.#state?.recordDispatch(event, createdAt, runs); + return result; + } + + async replayEvent( + eventId: string, + options: FlowReplayOptions = {}, + ): Promise { + ensureSynchronousLocalDispatch(options); + const state = this.#requireState("replayEvent"); + const event = state.rawEvent(eventId); + const replayNonce = `${Date.now()}:${Math.random()}`; + const flows = await discoverFlows({ + cwd: this.#cwd, + ...(this.#roots ? { roots: this.#roots } : {}), + }); + const matches = await matchingSteps(flows, event); + const runs: FlowRunView[] = []; + for (const match of matches) { + const run = await this.#executeMatch({ + event, + flow: match.flow, + step: match.step, + replayNonce, + }); + runs.push(run); + } + state.recordReplay(event.id, runs); + return dispatchResult({ + status: "accepted", + event, + runs, + matched: matches.length, + raw: { + status: "accepted", + eventId: event.id, + runIds: runs.map((run) => run.id), + matched: matches.length, + replay: true, + }, + }); + } + + async cancelRun(_runId: string): Promise { + throw new LocalFlowClientUnsupportedStateError("cancelRun"); + } + + async #executeMatch(options: { + event: FlowEvent; + flow: LoadedFlow; + step: FlowStep; + replayNonce?: string; + }): Promise { + const runId = localRunId( + options.event.id, + options.flow.manifest.name, + options.step.name, + options.replayNonce, + ); + const startedAt = new Date().toISOString(); + try { + const result = await runFlowStep({ + flow: options.flow, + step: options.step, + event: options.event, + env: this.#env, + codeMode: { + codexCommand: this.#codex?.command ?? this.#env.CODEX_APP_SERVER_CODEX_COMMAND, + codexHome: this.#codex?.codexHome ?? this.#env.CODEX_HOME, + stream: this.#codex?.stream ?? true, + }, + }); + const completedAt = new Date().toISOString(); + return localRunView({ + runId, + event: options.event, + flow: options.flow, + step: options.step, + processStatus: "completed", + result, + startedAt, + completedAt, + }); + } catch (error) { + const completedAt = new Date().toISOString(); + return localRunView({ + runId, + event: options.event, + flow: options.flow, + step: options.step, + processStatus: "failed", + error: error instanceof Error ? error.message : String(error), + startedAt, + completedAt, + }); + } + } + + #requireState(operation: string): LocalFlowMemoryState { + if (!this.#state) { + throw new LocalFlowClientUnsupportedStateError(operation); + } + return this.#state; + } +} + +export function createLocalFlowClient(options: LocalFlowClientOptions): LocalFlowClient { + return new LocalFlowClient(options); +} + +class LocalFlowMemoryState { + #events = new Map(); + #runs = new Map(); + + constructor(snapshot?: LocalFlowStateSnapshot) { + for (const event of snapshot?.events ?? []) { + this.#events.set(event.event.id, event); + } + for (const run of snapshot?.runs ?? []) { + this.#runs.set(run.id, run); + } + } + + duplicateDispatch(eventId: string): FlowDispatchResult | undefined { + const stored = this.#events.get(eventId); + if (!stored) { + return undefined; + } + const runs = stored.runIds.map((runId) => this.#runs.get(runId)).filter(isDefined); + return { + status: "duplicate", + eventId, + runIds: stored.runIds, + matched: 0, + idempotent: true, + event: this.eventView(eventId), + runs, + raw: { + status: "duplicate", + eventId, + runIds: stored.runIds, + matched: 0, + idempotent: true, + }, + }; + } + + recordDispatch(event: FlowEvent, createdAt: string, runs: FlowRunView[]): void { + for (const run of runs) { + this.#runs.set(run.id, run); + } + this.#events.set(event.id, { + event, + createdAt, + runIds: runs.map((run) => run.id), + }); + } + + recordReplay(eventId: string, runs: FlowRunView[]): void { + const stored = this.#events.get(eventId); + if (!stored) { + throw new Error(`Unknown event: ${eventId}`); + } + for (const run of runs) { + this.#runs.set(run.id, run); + stored.runIds.push(run.id); + } + } + + listRuns(options: FlowListRunsOptions): FlowRunList { + let runs = Array.from(this.#runs.values()); + if (options.eventId) { + runs = runs.filter((run) => run.eventId === options.eventId); + } + if (options.status) { + runs = runs.filter((run) => + run.processStatus === options.status || run.effectiveStatus === options.status, + ); + } + runs = runs.slice(-clampLimit(options.limit)).reverse(); + return { + runs, + ...(options.eventId ? { eventId: options.eventId } : {}), + raw: { runs }, + }; + } + + getRun(runId: string): FlowRunView { + const run = this.#runs.get(runId); + if (!run) { + throw new Error(`Unknown run: ${runId}`); + } + return run; + } + + listEvents(options: FlowListEventsOptions): FlowEventList { + let events = Array.from(this.#events.values()); + if (options.type) { + events = events.filter((event) => event.event.type === options.type); + } + events = events.slice(-clampLimit(options.limit)).reverse(); + const views = events.map((event) => this.eventView(event.event.id)); + return { + events: views, + raw: { events: views }, + }; + } + + getEvent(eventId: string): FlowEventView { + return this.eventView(eventId); + } + + rawEvent(eventId: string): FlowEvent { + const stored = this.#events.get(eventId); + if (!stored) { + throw new Error(`Unknown event: ${eventId}`); + } + return stored.event; + } + + eventView(eventId: string): FlowEventView { + const stored = this.#events.get(eventId); + if (!stored) { + throw new Error(`Unknown event: ${eventId}`); + } + const runs = stored.runIds.map((runId) => this.#runs.get(runId)).filter(isDefined); + return { + id: stored.event.id, + type: stored.event.type, + ...(stored.event.source ? { source: stored.event.source } : {}), + ...(stored.event.occurredAt ? { occurredAt: stored.event.occurredAt } : {}), + receivedAt: stored.event.receivedAt, + payload: stored.event.payload, + runIds: stored.runIds, + runs, + createdAt: stored.createdAt, + raw: { + kind: "local-event", + event: stored.event, + createdAt: stored.createdAt, + runIds: stored.runIds, + }, + }; + } + + snapshot(): LocalFlowStateSnapshot { + return { + events: Array.from(this.#events.values()), + runs: Array.from(this.#runs.values()), + }; + } +} + +class LocalFlowFileState extends LocalFlowMemoryState { + #statePath: string; + + constructor(dataDir: string) { + const statePath = path.join(dataDir, "state.json"); + super(readStateSnapshot(statePath)); + this.#statePath = statePath; + mkdirSync(path.dirname(this.#statePath), { recursive: true }); + } + + override recordDispatch(event: FlowEvent, createdAt: string, runs: FlowRunView[]): void { + super.recordDispatch(event, createdAt, runs); + this.#save(); + } + + override recordReplay(eventId: string, runs: FlowRunView[]): void { + super.recordReplay(eventId, runs); + this.#save(); + } + + #save(): void { + writeFileSync(this.#statePath, JSON.stringify(this.snapshot(), null, 2)); + } +} + +function dispatchResult(options: { + status: string; + event: FlowEvent; + runs: FlowRunView[]; + matched: number; + raw: unknown; +}): FlowDispatchResult { + const runIds = options.runs.map((run) => run.id); + return { + status: options.status, + eventId: options.event.id, + runIds, + matched: options.matched, + event: { + id: options.event.id, + type: options.event.type, + ...(options.event.source ? { source: options.event.source } : {}), + ...(options.event.occurredAt ? { occurredAt: options.event.occurredAt } : {}), + receivedAt: options.event.receivedAt, + payload: options.event.payload, + runIds, + runs: options.runs, + raw: { kind: "local-event", event: options.event, runIds }, + }, + runs: options.runs, + raw: options.raw, + }; +} + +function localRunView(options: { + runId: string; + event: FlowEvent; + flow: LoadedFlow; + step: FlowStep; + processStatus: FlowProcessStatus; + result?: FlowResult; + error?: string; + startedAt: string; + completedAt: string; +}): FlowRunView { + const resultStatus = resultStatusFrom(options.result); + const effectiveStatus: FlowEffectiveStatus = + resultStatus ?? options.processStatus; + const attempt = localAttemptView({ + runId: options.runId, + status: options.processStatus, + startedAt: options.startedAt, + completedAt: options.completedAt, + error: options.error, + }); + const output: FlowOutputView[] = []; + return { + id: options.runId, + eventId: options.event.id, + flowName: options.flow.manifest.name, + flowVersion: options.flow.manifest.version, + stepName: options.step.name, + runner: options.step.runner, + backend: "local", + processStatus: options.processStatus, + ...(resultStatus ? { resultStatus } : {}), + status: effectiveStatus, + effectiveStatus, + needsAttention: attentionStatuses.has(effectiveStatus), + attemptCount: 1, + attempts: [attempt], + output, + ...(options.result ? { resultPayload: options.result } : {}), + ...(options.error ? { error: options.error } : {}), + createdAt: options.startedAt, + startedAt: options.startedAt, + completedAt: options.completedAt, + updatedAt: options.completedAt, + raw: { + kind: "local-run", + event: options.event, + flowRoot: options.flow.root, + flowName: options.flow.manifest.name, + stepName: options.step.name, + result: options.result, + error: options.error, + }, + }; +} + +function localAttemptView(options: { + runId: string; + status: FlowProcessStatus; + startedAt: string; + completedAt: string; + error?: string; +}): FlowAttemptView { + return { + id: `${options.runId}:attempt:1`, + status: options.status, + attemptNumber: 1, + startedAt: options.startedAt, + completedAt: options.completedAt, + ...(options.error ? { error: options.error } : {}), + raw: { + kind: "local-attempt", + runId: options.runId, + }, + }; +} + +function normalizeFlowEvent(value: unknown): FlowEvent { + const record = isRecord(value) ? value : {}; + if (typeof record.id !== "string" || typeof record.type !== "string") { + throw new Error("FlowEvent requires string id and type"); + } + return { + ...record, + id: record.id, + type: record.type, + receivedAt: typeof record.receivedAt === "string" && record.receivedAt + ? record.receivedAt + : new Date().toISOString(), + payload: "payload" in record ? record.payload : {}, + } as FlowEvent; +} + +function localState( + state: LocalFlowClientOptions["state"], + cwd: string, +): LocalFlowMemoryState | undefined { + if (state === false) { + return undefined; + } + if (typeof state === "object") { + return new LocalFlowFileState(state.dataDir ?? path.join(cwd, ".codex", "flow-client")); + } + return new LocalFlowMemoryState(); +} + +function readStateSnapshot(statePath: string): LocalFlowStateSnapshot | undefined { + try { + const parsed = JSON.parse(readFileSync(statePath, "utf8")) as unknown; + if (!isRecord(parsed) || !Array.isArray(parsed.events) || !Array.isArray(parsed.runs)) { + return undefined; + } + return { + events: parsed.events.filter(isStoredEvent), + runs: parsed.runs.filter(isFlowRunView), + }; + } catch (error) { + if (isErrno(error, "ENOENT")) { + return undefined; + } + throw error; + } +} + +function resultStatusFrom(result: FlowResult | undefined): FlowResultStatus | undefined { + return result && resultStatuses.has(result.status) ? result.status : undefined; +} + +function localRunId( + eventId: string, + flowName: string, + stepName: string, + replayNonce?: string, +): string { + const hash = createHash("sha256") + .update(`${eventId}\0${flowName}\0${stepName}${replayNonce ? `\0${replayNonce}` : ""}`) + .digest("hex") + .slice(0, 12); + return replayNonce ? `run_${hash}_replay` : `run_${hash}`; +} + +function ensureSynchronousLocalDispatch( + options: FlowDispatchOptions | FlowReplayOptions, +): void { + if (options.wait === false) { + throw new Error("Local flow dispatch does not support wait: false without a worker loop"); + } +} + +function clampLimit(value: number | undefined): number { + if (!value || !Number.isFinite(value)) { + return 50; + } + return Math.max(1, Math.min(500, Math.trunc(value))); +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function isStoredEvent(value: unknown): value is StoredEvent { + if (!isRecord(value) || !isRecord(value.event)) { + return false; + } + return typeof value.event.id === "string" && + typeof value.event.type === "string" && + typeof value.event.receivedAt === "string" && + Array.isArray(value.runIds) && + value.runIds.every((runId) => typeof runId === "string") && + typeof value.createdAt === "string"; +} + +function isFlowRunView(value: unknown): value is FlowRunView { + return isRecord(value) && + typeof value.id === "string" && + typeof value.status === "string" && + typeof value.effectiveStatus === "string" && + typeof value.needsAttention === "boolean" && + typeof value.attemptCount === "number" && + Array.isArray(value.attempts) && + Array.isArray(value.output) && + "raw" in value; +} + +function isErrno(error: unknown, code: string): boolean { + return isRecord(error) && error.code === code; +} + +function isDefined(value: T | undefined): value is T { + return value !== undefined; +} diff --git a/packages/flow-runtime/test/client.test.ts b/packages/flow-runtime/test/client.test.ts new file mode 100644 index 0000000..8fdea9f --- /dev/null +++ b/packages/flow-runtime/test/client.test.ts @@ -0,0 +1,48 @@ +import { expect, test } from "bun:test"; +import { createFlowClient } from "../src/client.ts"; + +test("client factory creates an HTTP backend client with auth handling", async () => { + const requests: Request[] = []; + const client = createFlowClient({ + mode: "http", + baseUrl: "https://flow.example", + hmacSecret: "secret", + fetch: async (request, init) => { + const normalized = request instanceof Request + ? request + : new Request(String(request), init); + requests.push(normalized); + return Response.json({ + status: "accepted", + eventId: "event-1", + runIds: ["run-1"], + matched: 1, + }); + }, + }); + + const result = await client.dispatchEvent({ + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: {}, + }); + + expect(result).toMatchObject({ + status: "accepted", + eventId: "event-1", + runIds: ["run-1"], + matched: 1, + }); + expect(requests[0]?.headers.get("x-flow-signature-256")).toStartWith("sha256="); +}); + +test("client factory creates a local client", async () => { + const client = createFlowClient({ + mode: "local", + cwd: process.cwd(), + state: false, + }); + + await expect(client.listEvents()).rejects.toThrow("requires local state"); +}); diff --git a/packages/flow-runtime/test/local-client.test.ts b/packages/flow-runtime/test/local-client.test.ts new file mode 100644 index 0000000..961ee9e --- /dev/null +++ b/packages/flow-runtime/test/local-client.test.ts @@ -0,0 +1,270 @@ +import { expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { createLocalFlowClient } from "../src/local-client.ts"; +import type { FlowEvent } from "../src/index.ts"; + +test("local client dispatches matching steps and returns normalized views", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")); + try { + await writeFlow(directory, "flows/demo", "source"); + await writeFlow(directory, ".codex/flows/demo", "installed"); + const client = createLocalFlowClient({ cwd: directory, env: {} }); + const event: FlowEvent = { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada" }, + }; + + const result = await client.dispatchEvent(event); + + expect(result).toMatchObject({ + status: "accepted", + eventId: "event-1", + matched: 1, + runs: [ + { + eventId: "event-1", + flowName: "demo", + stepName: "hello", + backend: "local", + processStatus: "completed", + resultStatus: "completed", + effectiveStatus: "completed", + needsAttention: false, + resultPayload: { + status: "completed", + message: "installed Ada", + }, + }, + ], + }); + const run = result.runs[0]; + if (!run) { + throw new Error("expected one local run"); + } + expect(result.runIds).toEqual([run.id]); + + const eventView = await client.getEvent("event-1"); + expect(eventView).toMatchObject({ + id: "event-1", + type: "demo.event", + runIds: result.runIds, + runs: [{ id: result.runIds[0] }], + }); + + const runs = await client.listRuns({ eventId: "event-1" }); + expect(runs.runs.map((run) => run.id)).toEqual(result.runIds); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("local memory state dedupes normal dispatch and replays new attempts", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")); + try { + await writeFlow(directory, "flows/demo", "demo"); + const client = createLocalFlowClient({ cwd: directory, env: {} }); + const event: FlowEvent = { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada" }, + }; + + const first = await client.dispatchEvent(event); + const duplicate = await client.dispatchEvent(event); + expect(duplicate).toMatchObject({ + status: "duplicate", + eventId: "event-1", + matched: 0, + idempotent: true, + runIds: first.runIds, + }); + + const replay = await client.replayEvent("event-1"); + expect(replay.status).toBe("accepted"); + expect(replay.runIds).toHaveLength(1); + expect(replay.runIds[0]).not.toBe(first.runIds[0]); + expect(replay.runIds[0]).toEndWith("_replay"); + + const eventView = await client.getEvent("event-1"); + expect(eventView.runIds).toEqual([...first.runIds, ...replay.runIds]); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("local file state persists events and runs across client instances", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")); + try { + await writeFlow(directory, "flows/demo", "demo"); + const dataDir = path.join(directory, ".codex", "flow-client"); + const event: FlowEvent = { + id: "event-1", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada" }, + }; + + const firstClient = createLocalFlowClient({ + cwd: directory, + env: {}, + state: { kind: "file", dataDir }, + }); + const first = await firstClient.dispatchEvent(event); + + const secondClient = createLocalFlowClient({ + cwd: directory, + env: {}, + state: { kind: "file", dataDir }, + }); + const eventView = await secondClient.getEvent("event-1"); + expect(eventView.runIds).toEqual(first.runIds); + + const duplicate = await secondClient.dispatchEvent(event); + expect(duplicate).toMatchObject({ + status: "duplicate", + idempotent: true, + runIds: first.runIds, + }); + + const replay = await secondClient.replayEvent("event-1"); + expect(replay.runIds[0]).toEndWith("_replay"); + + const thirdClient = createLocalFlowClient({ + cwd: directory, + env: {}, + state: { kind: "file", dataDir }, + }); + expect((await thirdClient.getEvent("event-1")).runIds).toEqual([ + ...first.runIds, + ...replay.runIds, + ]); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("local client marks semantic attention statuses from FLOW_RESULT", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")); + try { + await writeFlow(directory, "flows/demo", "demo"); + const client = createLocalFlowClient({ cwd: directory, env: {} }); + + const result = await client.dispatchEvent({ + id: "event-blocked", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada", status: "blocked" }, + }); + + expect(result.runs[0]).toMatchObject({ + processStatus: "completed", + resultStatus: "blocked", + effectiveStatus: "blocked", + needsAttention: true, + }); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("local client keeps Code Mode flow steps gated", async () => { + const directory = await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")); + try { + await writeFlow(directory, "flows/demo", "demo", "code-mode"); + const client = createLocalFlowClient({ cwd: directory, env: {} }); + + const result = await client.dispatchEvent({ + id: "event-code-mode", + type: "demo.event", + receivedAt: "2026-05-15T00:00:00.000Z", + payload: { name: "Ada" }, + }); + + expect(result.runs[0]).toMatchObject({ + processStatus: "failed", + effectiveStatus: "failed", + needsAttention: false, + }); + expect(result.runs[0]?.error).toContain("requires CODEX_FLOWS_ENABLE_CODE_MODE=1"); + } finally { + await rm(directory, { recursive: true, force: true }); + } +}); + +test("local client reports unsupported operations when state is disabled", async () => { + const client = createLocalFlowClient({ + cwd: await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")), + state: false, + }); + + await expect(client.listEvents()).rejects.toThrow("requires local state"); + await expect(client.getRun("missing")).rejects.toThrow("requires local state"); + await expect(client.replayEvent("missing")).rejects.toThrow("requires local state"); +}); + +test("local memory state rejects unknown events and runs", async () => { + const client = createLocalFlowClient({ + cwd: await mkdtemp(path.join(os.tmpdir(), "flow-local-client-")), + }); + + await expect(client.getEvent("missing")).rejects.toThrow("Unknown event"); + await expect(client.getRun("missing")).rejects.toThrow("Unknown run"); + await expect(client.replayEvent("missing")).rejects.toThrow("Unknown event"); +}); + +async function writeFlow( + root: string, + relative: string, + label: string, + runner: "bun" | "code-mode" = "bun", +): Promise { + const flowRoot = path.join(root, relative); + await mkdir(path.join(flowRoot, "exec"), { recursive: true }); + await mkdir(path.join(flowRoot, "schemas"), { recursive: true }); + await Bun.write( + path.join(flowRoot, "flow.toml"), + [ + 'name = "demo"', + "version = 1", + `description = ${JSON.stringify(label)}`, + "", + "[[steps]]", + 'name = "hello"', + `runner = "${runner}"`, + 'script = "exec/hello.ts"', + "timeout_ms = 30000", + "", + "[steps.trigger]", + 'type = "demo.event"', + 'schema = "schemas/demo-event.schema.json"', + "", + ].join("\n"), + ); + await Bun.write( + path.join(flowRoot, "schemas/demo-event.schema.json"), + JSON.stringify({ + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + status: { enum: ["completed", "changed", "blocked", "needs_intervention", "failed"] }, + }, + }), + ); + await Bun.write( + path.join(flowRoot, "exec/hello.ts"), + [ + "const context = JSON.parse(await Bun.stdin.text());", + "const payload = context.flow.event.payload;", + `const label = ${JSON.stringify(label)};`, + "const status = payload.status ?? 'completed';", + "console.log(`FLOW_RESULT ${JSON.stringify({ status, message: `${label} ${payload.name}` })}`);", + "", + ].join("\n"), + ); +}