feat: add patch moi maintenance cli
Some checks failed
check / check (push) Failing after 11s

This commit is contained in:
matamune 2026-05-16 20:12:50 +00:00
parent 39e843fb29
commit e7a85c5bcf
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
13 changed files with 1040 additions and 40 deletions

View file

@ -3,10 +3,14 @@
"version": "0.1.0",
"private": true,
"type": "module",
"bin": {
"patch.moi": "./src/cli.ts"
},
"scripts": {
"build": "tsc --noEmit",
"dev": "bun --watch src/server.ts",
"harness:flow": "bun src/run-harness-flow.ts",
"patch.moi": "bun src/cli.ts",
"start": "bun src/server.ts",
"test": "bun test test/*.test.ts",
"check:types": "tsc --noEmit",

636
apps/patch/src/cli.ts Normal file
View file

@ -0,0 +1,636 @@
#!/usr/bin/env bun
import { existsSync } from "node:fs";
import { readFile } from "node:fs/promises";
import path from "node:path";
import { discoverFlows, matchingSteps, type FlowEvent as RuntimeFlowEvent } from "@peezy.tech/flow-runtime";
import {
dispatchWorkspaceEventDetailed,
maintenanceAttemptForWorkspaceDispatch,
patchUpstreamReleaseEvent,
replayWorkspaceEventDetailed,
type WorkspaceDispatchConfig,
} from "./flow";
import { syncMaintenanceAttempt } from "./maintenance";
import { EventStore } from "./queue";
import type { FlowDispatchRecord, FlowEvent, MaintenanceAttemptRecord } from "./types";
import type { WorkspaceBackendFetch } from "./workspace-backend";
type CliOptions = {
cwd?: string;
env?: Record<string, string | undefined>;
stdout?: (text: string) => void;
stderr?: (text: string) => void;
fetchImpl?: WorkspaceBackendFetch;
};
type ParsedArgs = {
positionals: string[];
flags: Map<string, string[]>;
};
class UsageError extends Error {
readonly exitCode: number;
constructor(message: string, exitCode = 2) {
super(message);
this.exitCode = exitCode;
}
}
const usage = `patch.moi CLI
Usage:
patch.moi status [--data-dir DIR] [--json]
patch.moi events [--type TYPE] [--limit N] [--data-dir DIR] [--json]
patch.moi dispatches [--event-id ID] [--status STATUS] [--limit N] [--data-dir DIR] [--json]
patch.moi attempts [--event-id ID] [--status STATUS] [--limit N] [--data-dir DIR] [--json]
patch.moi run harness [--event FILE] [--workspace-root DIR] [--data-dir DIR] [--dry-run] [--json]
patch.moi run codex-release --tag TAG [--repo openai/codex] [--workspace-root DIR] [--data-dir DIR] [--dry-run] [--record-only] [--allow-local] [--json]
patch.moi run event --file FILE [--workspace-root DIR] [--data-dir DIR] [--dry-run] [--record-only] [--json]
patch.moi retry EVENT_ID [--workspace-root DIR] [--data-dir DIR] [--json]
patch.moi replay EVENT_ID [--workspace-root DIR] [--data-dir DIR] [--json]
patch.moi sync ATTEMPT_ID [--workspace-root DIR] [--data-dir DIR] [--json]
patch.moi setup codex [--repo DIR] [--upstream-url URL] [--target-branch BRANCH] [--apply] [--json]
`;
export async function runCli(args = Bun.argv.slice(2), options: CliOptions = {}): Promise<number> {
const parsed = parseArgs(args);
const env = options.env ?? process.env;
const cwd = options.cwd ?? process.cwd();
const out = options.stdout ?? ((text) => process.stdout.write(text));
const err = options.stderr ?? ((text) => process.stderr.write(text));
try {
if (parsed.positionals.length === 0 || flagBool(parsed, "help") || flagBool(parsed, "h")) {
out(usage);
return 0;
}
const command = parsed.positionals[0];
const context = cliContext(parsed, cwd, env, out, options.fetchImpl);
switch (command) {
case "status":
return await handleStatus(context);
case "events":
return await handleEvents(context);
case "dispatches":
return await handleDispatches(context);
case "attempts":
return await handleAttempts(context);
case "run":
return await handleRun(parsed.positionals.slice(1), context);
case "retry":
return await handleDispatchOperation("retry", parsed.positionals[1], context);
case "replay":
return await handleDispatchOperation("replay", parsed.positionals[1], context);
case "sync":
return await handleSync(parsed.positionals[1], context);
case "setup":
return await handleSetup(parsed.positionals.slice(1), context);
default:
throw new UsageError(`unknown command: ${command}`);
}
} catch (error) {
if (error instanceof UsageError) {
err(`error: ${error.message}\n`);
return error.exitCode;
}
err(`error: ${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}
}
type CliContext = {
parsed: ParsedArgs;
cwd: string;
env: Record<string, string | undefined>;
dataDir: string;
workspaceRoot: string;
store: EventStore;
json: boolean;
stdout: (text: string) => void;
fetchImpl?: WorkspaceBackendFetch;
};
function cliContext(
parsed: ParsedArgs,
cwd: string,
env: Record<string, string | undefined>,
stdout: (text: string) => void,
fetchImpl?: WorkspaceBackendFetch,
): CliContext {
const workspaceRoot = resolvePath(cwd, flagValue(parsed, "workspace-root") ?? findWorkspaceRoot(cwd));
const dataDir = resolvePath(workspaceRoot, flagValue(parsed, "data-dir") ?? env.DATA_DIR ?? "./data");
return {
parsed,
cwd,
env,
dataDir,
workspaceRoot,
store: new EventStore(dataDir),
json: flagBool(parsed, "json"),
stdout,
fetchImpl,
};
}
async function handleStatus(context: CliContext): Promise<number> {
const limit = numberFlag(context.parsed, "limit", 20);
const [events, dispatches, attempts] = await Promise.all([
context.store.listFlowEvents({ limit }),
context.store.listWorkspaceDispatches({ limit }),
context.store.listMaintenanceAttempts({ limit }),
]);
const payload = {
dataDir: context.dataDir,
latest: {
events,
dispatches,
attempts,
},
attemptStatusCounts: countBy(attempts, (attempt) => attempt.status),
dispatchStatusCounts: countBy(dispatches, (record) => record.status),
};
if (context.json) {
writeJson(context, payload);
return 0;
}
writeLine(context, `data dir: ${context.dataDir}`);
writeLine(context, `events shown: ${events.length}`);
writeLine(context, `dispatches shown: ${dispatches.length}`);
writeLine(context, `attempts shown: ${attempts.length}`);
writeLine(context, `attempt statuses: ${formatCounts(payload.attemptStatusCounts)}`);
const latestAttempt = attempts[0];
if (latestAttempt) {
writeLine(context, `latest attempt: ${latestAttempt.status} ${latestAttempt.id}`);
}
return 0;
}
async function handleEvents(context: CliContext): Promise<number> {
const events = await context.store.listFlowEvents({
type: flagValue(context.parsed, "type"),
limit: numberFlag(context.parsed, "limit", 50),
});
if (context.json) {
writeJson(context, { events });
} else {
for (const event of events) {
writeLine(context, `${event.receivedAt} ${event.type} ${event.id}`);
}
}
return 0;
}
async function handleDispatches(context: CliContext): Promise<number> {
const dispatches = await context.store.listWorkspaceDispatches({
eventId: flagValue(context.parsed, "event-id"),
status: dispatchStatus(flagValue(context.parsed, "status")),
limit: numberFlag(context.parsed, "limit", 50),
});
if (context.json) {
writeJson(context, { dispatches });
} else {
for (const record of dispatches) {
writeLine(context, `${record.createdAt} ${record.status} ${record.operation ?? "dispatch"} ${record.eventId}`);
}
}
return 0;
}
async function handleAttempts(context: CliContext): Promise<number> {
const attempts = await context.store.listMaintenanceAttempts({
eventId: flagValue(context.parsed, "event-id"),
status: maintenanceStatus(flagValue(context.parsed, "status")),
limit: numberFlag(context.parsed, "limit", 50),
});
if (context.json) {
writeJson(context, { attempts });
} else {
for (const attempt of attempts) {
writeLine(context, `${attempt.updatedAt} ${attempt.status} ${attempt.operation} ${attempt.id}`);
}
}
return 0;
}
async function handleRun(positionals: string[], context: CliContext): Promise<number> {
const target = positionals[0];
if (!target) {
throw new UsageError("run requires harness, codex-release, or event");
}
if (target === "harness") {
const eventFile = flagValue(context.parsed, "event") ??
path.join(context.workspaceRoot, "flows/patch-moi-harness/fixtures/upstream-release-v0.1.3.json");
const event = await readFlowEvent(eventFile, context.workspaceRoot);
return await runEvent(event, context);
}
if (target === "codex-release") {
const tag = flagValue(context.parsed, "tag");
if (!tag) {
throw new UsageError("run codex-release requires --tag");
}
const repo = flagValue(context.parsed, "repo") ?? "openai/codex";
const event = patchUpstreamReleaseEvent({ repo, tag });
if (!flagBool(context.parsed, "dry-run") && !flagBool(context.parsed, "record-only")) {
assertCodexDispatchAllowed(context);
}
return await runEvent(event, context);
}
if (target === "event") {
const eventFile = flagValue(context.parsed, "file");
if (!eventFile) {
throw new UsageError("run event requires --file");
}
return await runEvent(await readFlowEvent(eventFile, context.workspaceRoot), context);
}
throw new UsageError(`unknown run target: ${target}`);
}
async function runEvent(event: FlowEvent, context: CliContext): Promise<number> {
if (flagBool(context.parsed, "dry-run")) {
const matches = await matchingSteps(
await discoverFlows({ cwd: context.workspaceRoot }),
event as RuntimeFlowEvent<Record<string, unknown>>,
);
const payload = {
event,
matches: matches.map(({ flow, step }) => ({
flow: flow.manifest.name,
step: step.name,
runner: step.runner,
})),
};
if (context.json) {
writeJson(context, payload);
} else {
writeLine(context, `${event.id} matches ${payload.matches.length} step(s)`);
for (const match of payload.matches) {
writeLine(context, `- ${match.flow}/${match.step} (${match.runner})`);
}
}
return 0;
}
const recorded = await appendFlowEventIfMissing(context.store, event);
if (flagBool(context.parsed, "record-only")) {
writeOutput(context, { event, recorded });
return 0;
}
const { record, result } = await dispatchWorkspaceEventDetailed(event, {}, workspaceConfig(context));
await context.store.appendWorkspaceDispatch(record);
const attempt = maintenanceAttemptForWorkspaceDispatch(event, record, result?.runs);
await context.store.appendMaintenanceAttempt(attempt);
writeOutput(context, { event, recorded, record, attempt });
return record.status === "failed" ? 1 : 0;
}
async function handleDispatchOperation(
operation: "retry" | "replay",
eventId: string | undefined,
context: CliContext,
): Promise<number> {
if (!eventId) {
throw new UsageError(`${operation} requires EVENT_ID`);
}
const event = await context.store.getFlowEvent(eventId);
if (!event) {
throw new UsageError(`flow event not found: ${eventId}`, 1);
}
const outcome = operation === "retry"
? await dispatchWorkspaceEventDetailed(event, {}, workspaceConfig(context))
: await replayWorkspaceEventDetailed(event, {}, workspaceConfig(context));
await context.store.appendWorkspaceDispatch(outcome.record);
const attempt = maintenanceAttemptForWorkspaceDispatch(event, outcome.record, outcome.result?.runs);
await context.store.appendMaintenanceAttempt(attempt);
writeOutput(context, { event, record: outcome.record, attempt });
return outcome.record.status === "failed" ? 1 : 0;
}
async function handleSync(attemptId: string | undefined, context: CliContext): Promise<number> {
if (!attemptId) {
throw new UsageError("sync requires ATTEMPT_ID");
}
const attempt = await context.store.getMaintenanceAttempt(attemptId);
if (!attempt) {
throw new UsageError(`maintenance attempt not found: ${attemptId}`, 1);
}
const next = await syncMaintenanceAttempt(context.store, attempt, workspaceConfig(context));
writeOutput(context, { attempt: next });
return 0;
}
async function handleSetup(positionals: string[], context: CliContext): Promise<number> {
const target = positionals[0];
if (target !== "codex") {
throw new UsageError("setup requires codex");
}
const repoPath = resolvePath(context.workspaceRoot, flagValue(context.parsed, "repo") ?? context.env.PEEZY_CODEX_REPO ?? "../codex");
const upstreamRemote = flagValue(context.parsed, "upstream-remote") ?? "upstream";
const upstreamUrl = flagValue(context.parsed, "upstream-url") ?? "https://github.com/openai/codex.git";
const targetBranch = flagValue(context.parsed, "target-branch") ?? "code-mode-exec-hooks";
const apply = flagBool(context.parsed, "apply");
const repo = await inspectGitRepo(repoPath, {
upstreamRemote,
upstreamUrl,
targetBranch,
apply,
});
if (context.json) {
writeJson(context, repo);
return 0;
}
writeLine(context, `repo: ${repo.path}`);
writeLine(context, `branch: ${repo.branch ?? "unknown"}${repo.branchMatchesTarget ? "" : ` (expected ${targetBranch})`}`);
writeLine(context, `origin: ${repo.origin ?? "missing"}`);
writeLine(context, `${upstreamRemote}: ${repo.upstream ?? "missing"}`);
writeLine(context, `worktree: ${repo.clean ? "clean" : "dirty"}`);
if (repo.addedUpstream) {
writeLine(context, `added ${upstreamRemote}: ${upstreamUrl}`);
}
writeLine(context, `ready: ${repo.ready ? "yes" : "no"}`);
for (const issue of repo.issues) {
writeLine(context, `- ${issue}`);
}
return 0;
}
type CodexSetupReport = {
path: string;
branch?: string;
branchMatchesTarget: boolean;
origin?: string;
upstream?: string;
upstreamRemote: string;
upstreamUrl: string;
addedUpstream: boolean;
clean: boolean;
ready: boolean;
issues: string[];
};
async function inspectGitRepo(
repoPath: string,
options: {
upstreamRemote: string;
upstreamUrl: string;
targetBranch: string;
apply: boolean;
},
): Promise<CodexSetupReport> {
await git(repoPath, ["rev-parse", "--is-inside-work-tree"]);
const [branchResult, originResult, upstreamResult, statusResult] = await Promise.all([
git(repoPath, ["symbolic-ref", "--short", "HEAD"]),
git(repoPath, ["remote", "get-url", "origin"], { allowFailure: true }),
git(repoPath, ["remote", "get-url", options.upstreamRemote], { allowFailure: true }),
git(repoPath, ["status", "--porcelain=v1"]),
]);
let upstream = upstreamResult.code === 0 ? upstreamResult.stdout.trim() : undefined;
let addedUpstream = false;
if (!upstream && options.apply) {
await git(repoPath, ["remote", "add", options.upstreamRemote, options.upstreamUrl]);
upstream = options.upstreamUrl;
addedUpstream = true;
}
const branch = branchResult.stdout.trim();
const origin = originResult.code === 0 ? originResult.stdout.trim() : undefined;
const clean = statusResult.stdout.trim().length === 0;
const issues = [
...(origin ? [] : ["missing origin remote"]),
...(upstream ? [] : [`missing ${options.upstreamRemote} remote; rerun with --apply to add ${options.upstreamUrl}`]),
...(upstream && upstream !== options.upstreamUrl ? [`${options.upstreamRemote} points to ${upstream}, expected ${options.upstreamUrl}`] : []),
...(branch === options.targetBranch ? [] : [`current branch is ${branch}, expected ${options.targetBranch}`]),
...(clean ? [] : ["working tree has local changes or untracked files"]),
];
return {
path: repoPath,
branch,
branchMatchesTarget: branch === options.targetBranch,
origin,
upstream,
upstreamRemote: options.upstreamRemote,
upstreamUrl: options.upstreamUrl,
addedUpstream,
clean,
ready: issues.length === 0,
issues,
};
}
async function git(
cwd: string,
args: string[],
options: { allowFailure?: boolean } = {},
): Promise<{ code: number; stdout: string; stderr: string }> {
const proc = Bun.spawn({
cmd: ["git", ...args],
cwd,
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, code] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
proc.exited,
]);
if (code !== 0 && !options.allowFailure) {
throw new Error(`git ${args.join(" ")} failed in ${cwd}: ${stderr.trim() || stdout.trim() || `exit ${code}`}`);
}
return { code, stdout, stderr };
}
async function readFlowEvent(filePath: string, cwd: string): Promise<FlowEvent> {
const raw = JSON.parse(await readFile(resolvePath(cwd, filePath), "utf8")) as FlowEvent;
if (!raw || typeof raw !== "object" || typeof raw.id !== "string" || typeof raw.type !== "string") {
throw new UsageError(`invalid flow event file: ${filePath}`);
}
return raw;
}
async function appendFlowEventIfMissing(store: EventStore, event: FlowEvent): Promise<boolean> {
if (await store.getFlowEvent(event.id)) {
return false;
}
await store.appendFlowEvent(event);
return true;
}
function assertCodexDispatchAllowed(context: CliContext): void {
if (flagBool(context.parsed, "allow-local") || workspaceBackendConfigured(context.env)) {
return;
}
throw new UsageError(
"codex-release dispatch requires PATCH_WORKSPACE_BACKEND_URL or --allow-local; use --dry-run to verify matching without executing release work",
);
}
function workspaceBackendConfigured(env: Record<string, string | undefined>): boolean {
return Boolean(
env.PATCH_WORKSPACE_BACKEND_URL?.trim() ||
env.PATCH_FLOW_BACKEND_URL?.trim() ||
env.PATCH_FLOW_DISPATCH_URL?.trim(),
);
}
function workspaceConfig(context: CliContext): WorkspaceDispatchConfig {
return {
env: context.env,
cwd: context.workspaceRoot,
...(context.fetchImpl ? { fetchImpl: context.fetchImpl } : {}),
};
}
function writeOutput(context: CliContext, payload: {
event?: FlowEvent;
recorded?: boolean;
record?: FlowDispatchRecord;
attempt?: MaintenanceAttemptRecord;
}): void {
if (context.json) {
writeJson(context, payload);
return;
}
if (payload.event) {
writeLine(context, `event: ${payload.event.id}${payload.recorded === false ? " (already recorded)" : ""}`);
}
if (payload.record) {
writeLine(context, `dispatch: ${payload.record.status} ${payload.record.operation ?? "dispatch"} ${payload.record.transport ?? "unknown"}`);
if (payload.record.error) {
writeLine(context, `error: ${payload.record.error}`);
}
}
if (payload.attempt) {
writeLine(context, `attempt: ${payload.attempt.status} ${payload.attempt.id}`);
}
}
function writeJson(context: CliContext, value: unknown): void {
writeLine(context, JSON.stringify(value, null, 2));
}
function writeLine(context: CliContext, value: string): void {
context.stdout(`${value}\n`);
}
function parseArgs(args: string[]): ParsedArgs {
const positionals: string[] = [];
const flags = new Map<string, string[]>();
for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (arg === "--") {
positionals.push(...args.slice(index + 1));
break;
}
if (arg.startsWith("--")) {
const [rawName, inlineValue] = arg.slice(2).split(/=(.*)/s, 2);
if (!rawName) {
continue;
}
const values = flags.get(rawName) ?? [];
if (inlineValue !== undefined) {
values.push(inlineValue);
} else if (args[index + 1] && !args[index + 1].startsWith("-")) {
values.push(args[index + 1]);
index += 1;
} else {
values.push("true");
}
flags.set(rawName, values);
continue;
}
if (arg.startsWith("-") && arg.length > 1) {
flags.set(arg.slice(1), ["true"]);
continue;
}
positionals.push(arg);
}
return { positionals, flags };
}
function flagValue(parsed: ParsedArgs, name: string): string | undefined {
const values = parsed.flags.get(name);
const value = values?.at(-1);
return value === "true" ? undefined : value;
}
function flagBool(parsed: ParsedArgs, name: string): boolean {
const value = parsed.flags.get(name)?.at(-1);
return value === "true" || value === "1" || value === "yes";
}
function numberFlag(parsed: ParsedArgs, name: string, fallback: number): number {
const value = flagValue(parsed, name);
if (!value) {
return fallback;
}
const parsedValue = Number(value);
if (!Number.isFinite(parsedValue)) {
throw new UsageError(`--${name} must be a number`);
}
return parsedValue;
}
function resolvePath(cwd: string, value: string): string {
return path.isAbsolute(value) ? value : path.resolve(cwd, value);
}
function findWorkspaceRoot(cwd: string): string {
let current = cwd;
while (true) {
if (
existsSync(path.join(current, ".codex/workspace.toml")) ||
existsSync(path.join(current, "flows/patch-moi-harness/flow.toml"))
) {
return current;
}
const parent = path.dirname(current);
if (parent === current) {
return cwd;
}
current = parent;
}
}
function dispatchStatus(value: string | undefined): FlowDispatchRecord["status"] | undefined {
if (!value) return undefined;
if (value === "dispatched" || value === "failed" || value === "skipped") return value;
throw new UsageError("--status must be dispatched, failed, or skipped");
}
function maintenanceStatus(value: string | undefined): MaintenanceAttemptRecord["status"] | undefined {
if (!value) return undefined;
if (
value === "started" ||
value === "completed" ||
value === "changed" ||
value === "needs_intervention" ||
value === "blocked" ||
value === "failed" ||
value === "skipped"
) return value;
throw new UsageError("--status is not a valid maintenance attempt status");
}
function countBy<T>(items: T[], key: (item: T) => string): Record<string, number> {
const counts: Record<string, number> = {};
for (const item of items) {
const value = key(item);
counts[value] = (counts[value] ?? 0) + 1;
}
return counts;
}
function formatCounts(counts: Record<string, number>): string {
const entries = Object.entries(counts);
return entries.length > 0 ? entries.map(([key, value]) => `${key}=${value}`).join(" ") : "none";
}
if (import.meta.main) {
process.exit(await runCli());
}

View file

@ -0,0 +1,46 @@
import {
getWorkspaceRun,
listWorkspaceRuns,
maintenanceAttemptWithWorkspaceRuns,
type WorkspaceDispatchConfig,
} from "./flow";
import { EventStore } from "./queue";
import type { MaintenanceAttemptRecord } from "./types";
export async function syncMaintenanceAttempt(
store: EventStore,
attempt: MaintenanceAttemptRecord,
config: WorkspaceDispatchConfig = {},
): Promise<MaintenanceAttemptRecord> {
const runs = attempt.workspaceRunIds.length > 0
? await Promise.all(attempt.workspaceRunIds.map((runId) => getWorkspaceRun(runId, config)))
: (await listWorkspaceRuns(config, { eventId: attempt.eventId })).runs;
const next = maintenanceAttemptWithWorkspaceRuns(attempt, runs);
if (maintenanceAttemptChanged(attempt, next)) {
await store.appendMaintenanceAttempt(next);
}
return next;
}
export function maintenanceAttemptChanged(
before: MaintenanceAttemptRecord,
after: MaintenanceAttemptRecord,
): boolean {
return JSON.stringify({
status: before.status,
workspaceRunIds: before.workspaceRunIds,
workspaceRunStatuses: before.workspaceRunStatuses,
candidateRefs: before.candidateRefs,
message: before.message,
error: before.error,
completedAt: before.completedAt,
}) !== JSON.stringify({
status: after.status,
workspaceRunIds: after.workspaceRunIds,
workspaceRunStatuses: after.workspaceRunStatuses,
candidateRefs: after.candidateRefs,
message: after.message,
error: after.error,
completedAt: after.completedAt,
});
}

View file

@ -7,12 +7,11 @@ import {
listWorkspaceEvents,
listWorkspaceRuns,
maintenanceAttemptForWorkspaceDispatch,
maintenanceAttemptWithWorkspaceRuns,
replayWorkspaceEventDetailed,
} from "./flow";
import { jsonResponse, methodNotAllowed, textResponse } from "./http";
import { syncMaintenanceAttempt } from "./maintenance";
import { EventStore } from "./queue";
import type { MaintenanceAttemptRecord } from "./types";
export type ServerConfig = {
dataDir: string;
@ -147,43 +146,6 @@ async function handleMaintenanceAttempts(request: Request, config: ServerConfig,
return methodNotAllowed();
}
async function syncMaintenanceAttempt(
store: EventStore,
attempt: MaintenanceAttemptRecord,
): Promise<MaintenanceAttemptRecord> {
const runs = attempt.workspaceRunIds.length > 0
? await Promise.all(attempt.workspaceRunIds.map((runId) => getWorkspaceRun(runId, { env: process.env })))
: (await listWorkspaceRuns({ env: process.env }, { eventId: attempt.eventId })).runs;
const next = maintenanceAttemptWithWorkspaceRuns(attempt, runs);
if (maintenanceAttemptChanged(attempt, next)) {
await store.appendMaintenanceAttempt(next);
}
return next;
}
function maintenanceAttemptChanged(
before: MaintenanceAttemptRecord,
after: MaintenanceAttemptRecord,
): boolean {
return JSON.stringify({
status: before.status,
workspaceRunIds: before.workspaceRunIds,
workspaceRunStatuses: before.workspaceRunStatuses,
candidateRefs: before.candidateRefs,
message: before.message,
error: before.error,
completedAt: before.completedAt,
}) !== JSON.stringify({
status: after.status,
workspaceRunIds: after.workspaceRunIds,
workspaceRunStatuses: after.workspaceRunStatuses,
candidateRefs: after.candidateRefs,
message: after.message,
error: after.error,
completedAt: after.completedAt,
});
}
async function handleWorkspaceRuns(request: Request, config: ServerConfig): Promise<Response> {
const unauthorized = requireAdmin(request, config);
if (unauthorized) return unauthorized;

218
apps/patch/test/cli.test.ts Normal file
View file

@ -0,0 +1,218 @@
import { mkdir, mkdtemp } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { describe, expect, test } from "bun:test";
import { runCli } from "../src/cli";
import { EventStore } from "../src/queue";
const workspaceRoot = join(import.meta.dir, "../../..");
describe("patch.moi CLI", () => {
test("dispatches the harness event and records patch.moi state", async () => {
const dataDir = await mkdtemp(join(tmpdir(), "patch-cli-"));
const calls: Array<{ url: string; body: string }> = [];
const result = await invoke([
"run",
"harness",
"--workspace-root",
workspaceRoot,
"--data-dir",
dataDir,
"--json",
], {
env: {
PATCH_WORKSPACE_BACKEND_URL: "https://workspace.example",
},
fetchImpl: async (url, init) => {
calls.push({ url, body: String(init.body ?? "") });
const eventId = JSON.parse(String(init.body ?? "{}")).id;
return Response.json({
status: "accepted",
eventId,
runIds: ["run-harness"],
matched: 1,
}, { status: 202 });
},
});
expect(result.code).toBe(0);
expect(calls).toHaveLength(1);
expect(calls[0]?.url).toBe("https://workspace.example/events");
const payload = JSON.parse(result.stdout);
expect(payload).toMatchObject({
event: { id: "patch:harness:v0.1.3:upstream.release" },
recorded: true,
record: { status: "dispatched", runIds: ["run-harness"], matched: 1 },
attempt: {
status: "started",
eventId: "patch:harness:v0.1.3:upstream.release",
workspaceRunIds: ["run-harness"],
},
});
const store = new EventStore(dataDir);
expect(await store.getFlowEvent("patch:harness:v0.1.3:upstream.release")).toMatchObject({
type: "upstream.release",
});
expect(await store.listMaintenanceAttempts()).toMatchObject([
{ eventId: "patch:harness:v0.1.3:upstream.release", status: "started" },
]);
});
test("dry-runs Codex release matching and blocks accidental local execution", async () => {
const blocked = await invoke([
"run",
"codex-release",
"--tag",
"rust-v0.130.0",
"--workspace-root",
workspaceRoot,
]);
expect(blocked.code).toBe(2);
expect(blocked.stderr).toContain("requires PATCH_WORKSPACE_BACKEND_URL or --allow-local");
const dryRun = await invoke([
"run",
"codex-release",
"--tag",
"rust-v0.130.0",
"--workspace-root",
workspaceRoot,
"--dry-run",
"--json",
]);
expect(dryRun.code).toBe(0);
expect(JSON.parse(dryRun.stdout).matches).toEqual([
{ flow: "openai-codex-bindings", step: "regenerate-bindings", runner: "bun" },
{ flow: "peezy-codex-fork", step: "rebase-patch-stack", runner: "code-mode" },
]);
});
test("syncs a maintenance attempt from workspace run state", async () => {
const dataDir = await mkdtemp(join(tmpdir(), "patch-cli-"));
const store = new EventStore(dataDir);
await store.appendMaintenanceAttempt({
id: "attempt-1",
eventId: "event-1",
eventType: "upstream.release",
operation: "dispatch",
status: "started",
upstreamRepo: "openai/codex",
upstreamTag: "rust-v0.130.0",
workspaceRunIds: ["run-1"],
candidateRefs: [],
createdAt: "2026-05-16T00:00:00.000Z",
updatedAt: "2026-05-16T00:00:00.000Z",
});
const result = await invoke([
"sync",
"attempt-1",
"--data-dir",
dataDir,
"--json",
], {
env: {
PATCH_WORKSPACE_BACKEND_URL: "https://workspace.example",
},
fetchImpl: async () => Response.json({
run: {
id: "run-1",
eventId: "event-1",
status: "completed",
completedAt: "2026-05-16T00:01:00.000Z",
resultJson: JSON.stringify({
status: "changed",
message: "candidate branch ready",
artifacts: {
candidateRefs: [{
kind: "branch",
repo: "peezy-tech/codex",
ref: "refs/heads/candidate",
sha: "abc123",
pushed: false,
}],
},
}),
},
}),
});
expect(result.code).toBe(0);
expect(JSON.parse(result.stdout)).toMatchObject({
attempt: {
id: "attempt-1",
status: "changed",
message: "candidate branch ready",
candidateRefs: [{ ref: "refs/heads/candidate", sha: "abc123" }],
},
});
expect(await store.listMaintenanceAttempts({ status: "changed" })).toMatchObject([
{ id: "attempt-1", status: "changed" },
]);
});
test("sets up the Codex upstream remote when explicitly applied", async () => {
const repo = await mkdtemp(join(tmpdir(), "patch-cli-codex-"));
await mkdir(repo, { recursive: true });
await git(repo, ["init", "-b", "code-mode-exec-hooks"]);
await git(repo, ["remote", "add", "origin", "https://github.com/peezy-tech/codex"]);
const result = await invoke([
"setup",
"codex",
"--repo",
repo,
"--apply",
"--json",
]);
expect(result.code).toBe(0);
expect(JSON.parse(result.stdout)).toMatchObject({
path: repo,
branch: "code-mode-exec-hooks",
upstream: "https://github.com/openai/codex.git",
addedUpstream: true,
clean: true,
ready: true,
});
expect((await git(repo, ["remote", "get-url", "upstream"])).stdout.trim()).toBe("https://github.com/openai/codex.git");
});
});
async function invoke(
args: string[],
options: Parameters<typeof runCli>[1] = {},
): Promise<{ code: number; stdout: string; stderr: string }> {
let stdout = "";
let stderr = "";
const code = await runCli(args, {
cwd: workspaceRoot,
...options,
stdout: (text) => {
stdout += text;
},
stderr: (text) => {
stderr += text;
},
});
return { code, stdout, stderr };
}
async function git(cwd: string, args: string[]): Promise<{ stdout: string; stderr: string }> {
const proc = Bun.spawn({
cmd: ["git", ...args],
cwd,
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, code] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
proc.exited,
]);
if (code !== 0) {
throw new Error(`git ${args.join(" ")} failed: ${stderr || stdout}`);
}
return { stdout, stderr };
}

View file

@ -45,6 +45,7 @@ Root scripts delegate into the workspace:
```bash
bun run check
bun run test
bun run patch.moi -- status
bun run docs:dev
bun run workspace:doctor
```
@ -63,6 +64,14 @@ workspace backend URL is set. That makes local mode useful for testing a patch
application workspace before sending the same event to a configured workspace
backend or service runner.
The CLI uses the same dispatch and state path as the service. To record a local
harness maintenance attempt:
```bash
CODEX_FLOW_FETCH=0 CODEX_FLOW_PUSH=0 bun run patch.moi -- run harness
bun run patch.moi -- attempts
```
The repo-native workspace task runs the harness fixture through the same direct
flow command:

View file

@ -67,6 +67,13 @@ Run the harness directly:
CODEX_FLOW_FETCH=0 CODEX_FLOW_PUSH=0 bun run harness:flow
```
Run the same harness through the patch.moi CLI and record `DATA_DIR` state:
```bash
CODEX_FLOW_FETCH=0 CODEX_FLOW_PUSH=0 bun run patch.moi -- run harness
bun run patch.moi -- status
```
Run the same harness through repo-native workspace autonomy:
```bash
@ -93,6 +100,7 @@ DATA_DIR=./data FEED_SOURCES_PATH=./feed-sources.json bun run --filter @peezy.te
- First harness run: [Run the harness maintenance flow](tutorials/run-harness-maintenance-flow).
- Feed intake: [Watch an upstream release](tutorials/watch-upstream-release).
- CLI operations: [CLI](reference/cli).
- System model: [Architecture](concepts/architecture).
- Durable state: [JSONL state](reference/jsonl-state).
- Retry and replay: [Flow event retry and replay](reference/dispatch-and-replay-flow-events).

104
docs/pages/reference/cli.md Normal file
View file

@ -0,0 +1,104 @@
---
title: CLI
description: Local patch.moi commands for setup, maintenance runs, status, retry, replay, and sync.
---
# CLI
Run the CLI from the repository root:
```bash
bun run patch.moi -- status
```
By default, the CLI writes and reads patch.moi product state under `DATA_DIR`
relative to the workspace root. Use `--data-dir` to point at another JSONL
state directory and `--json` for machine-readable output.
## Setup
Inspect the neighboring Codex fork checkout:
```bash
bun run patch.moi -- setup codex --json
```
Add the canonical OpenAI upstream remote when it is missing:
```bash
bun run patch.moi -- setup codex --apply
```
The setup command reports the current branch, `origin`, `upstream`, worktree
cleanliness, and whether the checkout is ready for an automated maintenance
run. It does not clean local changes.
## Run Maintenance
Dispatch the harness release fixture through the patch.moi state path:
```bash
CODEX_FLOW_FETCH=0 CODEX_FLOW_PUSH=0 \
bun run patch.moi -- run harness
```
The command records the flow event, dispatch record, and maintenance attempt
under `DATA_DIR`. If `PATCH_WORKSPACE_BACKEND_URL` is unset, the dispatch uses
local flow execution from the workspace root. If it is set, the dispatch goes to
the configured workspace backend.
Verify Codex release flow matching without executing release work:
```bash
bun run patch.moi -- run codex-release --tag rust-v0.130.0 --dry-run
```
Dispatching the Codex release task requires a configured workspace backend by
default:
```bash
PATCH_WORKSPACE_BACKEND_URL=ws://127.0.0.1:3586 \
bun run patch.moi -- run codex-release --tag rust-v0.130.0
```
Use `--allow-local` only when you intentionally want the local Patch process to
execute matching Codex release flows. The Code Mode step still requires its own
`CODEX_FLOWS_ENABLE_CODE_MODE=1` gate.
## Status
Read patch.moi-owned state:
```bash
bun run patch.moi -- status
bun run patch.moi -- events --type upstream.release
bun run patch.moi -- dispatches --status failed
bun run patch.moi -- attempts --status needs_intervention
```
These commands inspect JSONL state. They do not reach into Git or a workspace
backend unless a command explicitly dispatches, replays, or syncs.
## Retry, Replay, And Sync
Retry dispatch transport failures:
```bash
bun run patch.moi -- retry '<event-id>'
```
Replay an accepted event to create another backend attempt:
```bash
bun run patch.moi -- replay '<event-id>'
```
Sync the latest workspace run outcome and candidate refs into the patch.moi
maintenance attempt:
```bash
bun run patch.moi -- sync '<attempt-id>'
```
Retry and replay append new dispatch and maintenance-attempt records. Sync
appends a newer record for the same attempt id when backend run state changes.

View file

@ -17,6 +17,8 @@ description: Runtime environment variables used by patch.moi.
| `PATCH_FLOW_BACKEND_URL` | unset | Legacy workspace flow HTTP surface URL fallback. |
| `PATCH_FLOW_DISPATCH_URL` | unset | Legacy or explicit flow dispatch URL fallback. |
| `PATCH_FLOW_DISPATCH_SECRET` | unset | Legacy HMAC secret fallback. |
| `PEEZY_CODEX_REPO` | `../codex` | Optional Codex checkout path for `patch.moi setup codex`. |
| `CODEX_FLOWS_ENABLE_CODE_MODE` | unset | Required by Code Mode flow steps before local Code Mode execution. |
| `CODEX_APP_SERVER_CODEX_COMMAND` | unset | Passed to local code-mode flow execution. |
| `CODEX_HOME` | unset | Passed to local code-mode flow execution. |

View file

@ -8,7 +8,8 @@ description: Workspace packages in the patch.moi monorepo.
## `@peezy.tech/patch`
The Bun service in `apps/patch`. It exports no public package API; its runtime
entry point is `src/server.ts`.
entry points are `src/server.ts` for the service and `src/cli.ts` for local
operator commands.
Responsibilities:
@ -17,6 +18,8 @@ Responsibilities:
- Store JSONL state.
- Submit generic `FlowEvent` triggers through the patch.moi workspace backend
adapter.
- Provide the `patch.moi` CLI for setup, local maintenance runs, status,
retry, replay, and sync.
- Serve admin inspection, retry, and replay endpoints.
The service package does not store patch contents. Maintained patch stacks live

View file

@ -59,6 +59,12 @@ selects both installed Codex release steps, and that the Code Mode step still
requires `CODEX_FLOWS_ENABLE_CODE_MODE=1`. Do not fabricate a full
`openai/codex` release lifecycle just to exercise the flow.
You can run the same safe match check through the CLI:
```bash
bun run patch.moi -- run codex-release --tag rust-v0.130.0 --dry-run
```
## 3. Point Patch at a workspace backend
```bash

View file

@ -27,6 +27,7 @@ export default {
{
group: "Reference",
pages: [
"reference/cli",
"reference/environment",
"reference/feed-sources",
"reference/http-api",

View file

@ -33,6 +33,7 @@
"docs:deploy": "bun run --filter @peezy.tech/patch-docs deploy",
"docs:dev": "bun run --filter @peezy.tech/patch-docs dev",
"harness:flow": "bun run --filter @peezy.tech/patch harness:flow",
"patch.moi": "bun run --filter @peezy.tech/patch patch.moi",
"start": "bun run --filter @peezy.tech/patch start",
"test": "bun run --filter @peezy.tech/patch test",
"workspace:doctor": "codex-flows workspace doctor --workspace-root .",