From e5dd461d41d055eb640f17397787108c37281f82 Mon Sep 17 00:00:00 2001 From: matamune Date: Sat, 16 May 2026 05:50:00 +0000 Subject: [PATCH] Record maintenance outcomes from workspace runs --- README.md | 6 +- apps/patch/src/feed.ts | 6 +- apps/patch/src/flow.ts | 294 ++++++++++++++---- apps/patch/src/queue.ts | 26 +- apps/patch/src/server.ts | 96 +++++- apps/patch/src/types.ts | 27 +- apps/patch/test/harness-flow.test.ts | 10 + apps/patch/test/server.test.ts | 97 ++++++ .../dispatch-and-replay-flow-events.md | 7 + docs/pages/reference/http-api.md | 11 +- docs/pages/reference/jsonl-state.md | 4 +- .../tutorials/run-harness-maintenance-flow.md | 11 +- .../pages/tutorials/watch-upstream-release.md | 3 +- flows/patch-moi-harness/README.md | 3 +- flows/patch-moi-harness/exec/rebase-fork.ts | 17 + flows/patch-moi-harness/flow.toml | 1 + harness/README.md | 5 +- 17 files changed, 538 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 825a431..17dd50a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ GET /flow-events/:id POST /flow-events/:id/retry POST /flow-events/:id/replay GET /maintenance-attempts +GET /maintenance-attempts/:id +POST /maintenance-attempts/:id/sync GET /workspace-dispatches GET /workspace-events GET /workspace-runs @@ -68,9 +70,9 @@ poll primes `DATA_DIR/feed-state.json`; later polls append upstream activity to work to `DATA_DIR/feed-jobs.jsonl`. Targets using `mode: "workspace_flow"` append generic flow events to `DATA_DIR/flow-events.jsonl`, send them to the configured workspace backend or local adapter, and record dispatch outcomes in -`DATA_DIR/workspace-dispatches.jsonl`. Each dispatch also creates a +`DATA_DIR/workspace-dispatches.jsonl`. Each dispatch also creates or updates a patch.moi-owned `DATA_DIR/maintenance-attempts.jsonl` entry that links the -upstream update to workspace run ids and future candidate refs. +upstream update to workspace run ids, final flow outcome, and candidate refs. ## Documentation diff --git a/apps/patch/src/feed.ts b/apps/patch/src/feed.ts index 81f5dc8..d0ff16b 100644 --- a/apps/patch/src/feed.ts +++ b/apps/patch/src/feed.ts @@ -208,7 +208,11 @@ export async function pollFeedSource(input: { await input.store.appendWorkspaceDispatch(workspaceDispatch.record); if (workspaceDispatch.event) { await input.store.appendMaintenanceAttempt( - maintenanceAttemptForWorkspaceDispatch(workspaceDispatch.event, workspaceDispatch.record), + maintenanceAttemptForWorkspaceDispatch( + workspaceDispatch.event, + workspaceDispatch.record, + workspaceDispatch.result?.runs, + ), ); } if (workspaceDispatch.record.status === "dispatched") { diff --git a/apps/patch/src/flow.ts b/apps/patch/src/flow.ts index da7510f..4a91897 100644 --- a/apps/patch/src/flow.ts +++ b/apps/patch/src/flow.ts @@ -1,9 +1,16 @@ import type { + FlowDispatchResult, + FlowReplayResult, + FlowRunView, +} from "@peezy.tech/flow-runtime/client"; +import type { + CandidateRefRecord, FeedWorkspaceFlowTarget, FeedSignal, FlowDispatchRecord, FlowEvent, MaintenanceAttemptRecord, + MaintenanceAttemptStatus, } from "./types"; import { createPatchWorkspaceBackend, @@ -16,6 +23,16 @@ const serviceSource = "patch"; export type FlowDispatchConfig = WorkspaceBackendConfig; export type WorkspaceDispatchConfig = WorkspaceBackendConfig; +export type WorkspaceDispatchOutcome = { + record: FlowDispatchRecord; + result?: FlowDispatchResult; +}; + +export type WorkspaceReplayOutcome = { + record: FlowDispatchRecord; + result?: FlowReplayResult; +}; + function isWorkspaceFlowTarget(value: unknown): value is FeedWorkspaceFlowTarget { return ( typeof value === "object" && @@ -107,39 +124,52 @@ export async function dispatchWorkspaceEvent( target: Partial = {}, config: WorkspaceDispatchConfig = {}, ): Promise { + return (await dispatchWorkspaceEventDetailed(event, target, config)).record; +} + +export async function dispatchWorkspaceEventDetailed( + event: FlowEvent, + target: Partial = {}, + config: WorkspaceDispatchConfig = {}, +): Promise { const workspaceTarget = { mode: "workspace_flow" as const, eventType: event.type, ...target }; const backend = createPatchWorkspaceBackend(workspaceTarget, config); try { const result = await backend.client.dispatchEvent(event); return { - eventId: event.id, - eventType: event.type, - operation: "dispatch", - target: backend.mode === "local" ? "local" : "workspace-backend", - transport: backend.mode, - workspaceBackendUrl: backend.url, - url: backend.eventsUrl, - status: "dispatched", - runIds: result.runIds, - matched: result.matched, - idempotent: result.idempotent, - createdAt: new Date().toISOString(), + result, + record: { + eventId: event.id, + eventType: event.type, + operation: "dispatch", + target: backend.mode === "local" ? "local" : "workspace-backend", + transport: backend.mode, + workspaceBackendUrl: backend.url, + url: backend.eventsUrl, + status: "dispatched", + runIds: result.runIds, + matched: result.matched, + idempotent: result.idempotent, + createdAt: new Date().toISOString(), + }, }; } catch (error) { const httpStatus = httpStatusFromError(error); return { - eventId: event.id, - eventType: event.type, - operation: "dispatch", - target: backend.mode === "local" ? "local" : "workspace-backend", - transport: backend.mode, - workspaceBackendUrl: backend.url, - url: backend.eventsUrl, - status: "failed", - ...(httpStatus ? { httpStatus } : {}), - error: error instanceof Error ? error.message : String(error), - createdAt: new Date().toISOString(), + record: { + eventId: event.id, + eventType: event.type, + operation: "dispatch", + target: backend.mode === "local" ? "local" : "workspace-backend", + transport: backend.mode, + workspaceBackendUrl: backend.url, + url: backend.eventsUrl, + status: "failed", + ...(httpStatus ? { httpStatus } : {}), + error: error instanceof Error ? error.message : String(error), + createdAt: new Date().toISOString(), + }, }; } } @@ -157,6 +187,14 @@ export async function replayWorkspaceEvent( target: Partial = {}, config: WorkspaceDispatchConfig = {}, ): Promise { + return (await replayWorkspaceEventDetailed(event, target, config)).record; +} + +export async function replayWorkspaceEventDetailed( + event: FlowEvent, + target: Partial = {}, + config: WorkspaceDispatchConfig = {}, +): Promise { const env = config.env ?? process.env; const workspaceTarget = { mode: "workspace_flow" as const, eventType: event.type, ...target }; const backend = createPatchWorkspaceBackend(workspaceTarget, config); @@ -167,33 +205,38 @@ export async function replayWorkspaceEvent( ? await backend.client.replayEvent(event.id, { wait: false }) : await backend.client.dispatchEvent(event); return { - eventId: event.id, - eventType: event.type, - operation: "replay", - target: backend.mode === "local" ? "local" : "workspace-backend", - transport: backend.mode, - workspaceBackendUrl: backend.url, - url: backend.eventsUrl ? `${backend.eventsUrl}/${encodeURIComponent(event.id)}/replay` : undefined, - status: "dispatched", - runIds: result.runIds, - matched: result.matched, - idempotent: result.idempotent, - createdAt: new Date().toISOString(), + result, + record: { + eventId: event.id, + eventType: event.type, + operation: "replay", + target: backend.mode === "local" ? "local" : "workspace-backend", + transport: backend.mode, + workspaceBackendUrl: backend.url, + url: backend.eventsUrl ? `${backend.eventsUrl}/${encodeURIComponent(event.id)}/replay` : undefined, + status: "dispatched", + runIds: result.runIds, + matched: result.matched, + idempotent: result.idempotent, + createdAt: new Date().toISOString(), + }, }; } catch (error) { const httpStatus = httpStatusFromError(error); return { - eventId: event.id, - eventType: event.type, - operation: "replay", - target: backend.mode === "local" ? "local" : "workspace-backend", - transport: backend.mode, - workspaceBackendUrl: backend.url, - url: backend.eventsUrl, - status: "failed", - ...(httpStatus ? { httpStatus } : {}), - error: error instanceof Error ? error.message : String(error), - createdAt: new Date().toISOString(), + record: { + eventId: event.id, + eventType: event.type, + operation: "replay", + target: backend.mode === "local" ? "local" : "workspace-backend", + transport: backend.mode, + workspaceBackendUrl: backend.url, + url: backend.eventsUrl, + status: "failed", + ...(httpStatus ? { httpStatus } : {}), + error: error instanceof Error ? error.message : String(error), + createdAt: new Date().toISOString(), + }, }; } } @@ -224,7 +267,7 @@ export async function listWorkspaceEvents(config: WorkspaceDispatchConfig = {}, export async function dispatchWorkspaceEventForFeedSignal( signal: FeedSignal, config: WorkspaceDispatchConfig = {}, -): Promise<{ event?: FlowEvent>; record?: FlowDispatchRecord }> { +): Promise<{ event?: FlowEvent>; record?: FlowDispatchRecord; result?: FlowDispatchResult }> { if (!isWorkspaceFlowTarget(signal.target)) { return {}; } @@ -234,10 +277,8 @@ export async function dispatchWorkspaceEventForFeedSignal( return {}; } - return { - event, - record: await dispatchWorkspaceEvent(event, signal.target, config), - }; + const outcome = await dispatchWorkspaceEventDetailed(event, signal.target, config); + return { event, ...outcome }; } export async function dispatchFlowEventForFeedSignal( @@ -250,13 +291,15 @@ export async function dispatchFlowEventForFeedSignal( export function maintenanceAttemptForWorkspaceDispatch( event: FlowEvent, record: FlowDispatchRecord, + runs: FlowRunView[] = [], ): MaintenanceAttemptRecord { const payload = typeof event.payload === "object" && event.payload !== null ? event.payload as Record : {}; const operation = record.operation ?? "dispatch"; + const createdAt = record.createdAt; - return { + return maintenanceAttemptWithWorkspaceRuns({ id: `${event.id}:${operation}:${record.createdAt}`, eventId: event.id, eventType: event.type, @@ -270,7 +313,56 @@ export function maintenanceAttemptForWorkspaceDispatch( workspaceRunIds: record.runIds ?? [], candidateRefs: [], error: record.error, - createdAt: record.createdAt, + createdAt, + updatedAt: createdAt, + }, runs, createdAt); +} + +export function maintenanceAttemptWithWorkspaceRuns( + attempt: MaintenanceAttemptRecord, + runs: FlowRunView[], + updatedAt = new Date().toISOString(), +): MaintenanceAttemptRecord { + if (runs.length === 0) { + return attempt; + } + + const statuses = Object.fromEntries( + runs.map((run) => [run.id, String(run.effectiveStatus ?? run.status ?? "unknown")]), + ); + const resultPayloads = runs + .map((run) => flowResultPayload(run.resultPayload)) + .filter((payload): payload is Record => payload !== undefined); + const status = statusFromRuns(runs); + const message = newestString(resultPayloads.map((payload) => payload.message)) ?? attempt.message; + const candidateRefs = uniqueCandidateRefs([ + ...attempt.candidateRefs, + ...resultPayloads.flatMap(candidateRefsFromFlowResult), + ]); + const error = newestString([ + ...runs.map((run) => run.error), + ...resultPayloads.map((payload) => payload.message).filter((_value, index) => { + const status = resultPayloads[index]?.status; + return status === "failed" || status === "blocked" || status === "needs_intervention"; + }), + ]) ?? attempt.error; + const completedAt = status === "started" + ? attempt.completedAt + : newestString(runs.map((run) => run.completedAt)) ?? updatedAt; + + return { + ...attempt, + status, + workspaceRunIds: uniqueStrings([ + ...attempt.workspaceRunIds, + ...runs.map((run) => run.id).filter(Boolean), + ]), + workspaceRunStatuses: statuses, + candidateRefs, + ...(message ? { message } : {}), + ...(error ? { error } : {}), + updatedAt, + ...(completedAt ? { completedAt } : {}), }; } @@ -283,3 +375,99 @@ function httpStatusFromError(error: unknown): number | undefined { function stringValue(value: unknown): string | undefined { return typeof value === "string" && value.trim() ? value : undefined; } + +function flowResultPayload(value: unknown): Record | undefined { + if (typeof value !== "object" || value === null || Array.isArray(value)) { + return undefined; + } + const payload = value as Record; + if (typeof payload.status === "string") { + return payload; + } + const nested = recordValue(payload.result); + return typeof nested.status === "string" ? nested : undefined; +} + +function statusFromRuns(runs: FlowRunView[]): MaintenanceAttemptStatus { + const statuses = runs.map((run) => resultStatus(run)); + if (statuses.some((status) => status === "needs_intervention")) return "needs_intervention"; + if (statuses.some((status) => status === "blocked")) return "blocked"; + if (statuses.some((status) => status === "failed")) return "failed"; + if (statuses.some((status) => status === "changed")) return "changed"; + if (statuses.length > 0 && statuses.every((status) => status === "skipped")) return "skipped"; + if (statuses.length > 0 && statuses.every((status) => status === "completed" || status === "skipped")) return "completed"; + return "started"; +} + +function resultStatus(run: FlowRunView): string { + const payload = flowResultPayload(run.resultPayload); + return stringValue(payload?.status) ?? String(run.effectiveStatus ?? run.status ?? "started"); +} + +function candidateRefsFromFlowResult(result: Record): CandidateRefRecord[] { + const artifacts = recordValue(result.artifacts); + const candidates = [ + ...arrayValue(artifacts.candidateRefs), + ...arrayValue(artifacts.candidates), + artifacts.candidateRef, + ]; + return candidates.flatMap(candidateRefValue); +} + +function candidateRefValue(value: unknown): CandidateRefRecord[] { + if (typeof value === "string" && value.trim()) { + return [{ kind: "ref", ref: value.trim() }]; + } + const record = recordValue(value); + const ref = stringValue(record.ref); + if (!ref) { + return []; + } + return [{ + kind: stringValue(record.kind) ?? "ref", + ref, + ...(stringValue(record.repo) ? { repo: stringValue(record.repo) } : {}), + ...(stringValue(record.remote) ? { remote: stringValue(record.remote) } : {}), + ...(stringValue(record.sha) ? { sha: stringValue(record.sha) } : {}), + ...(stringValue(record.url) ? { url: stringValue(record.url) } : {}), + ...(typeof record.pushed === "boolean" ? { pushed: record.pushed } : {}), + }]; +} + +function uniqueCandidateRefs(refs: CandidateRefRecord[]): CandidateRefRecord[] { + const seen = new Set(); + const result: CandidateRefRecord[] = []; + for (const ref of refs) { + const key = `${ref.kind}:${ref.repo ?? ""}:${ref.remote ?? ""}:${ref.ref}:${ref.sha ?? ""}`; + if (seen.has(key)) { + continue; + } + seen.add(key); + result.push(ref); + } + return result; +} + +function uniqueStrings(values: string[]): string[] { + return [...new Set(values)]; +} + +function newestString(values: unknown[]): string | undefined { + for (let index = values.length - 1; index >= 0; index -= 1) { + const value = stringValue(values[index]); + if (value) { + return value; + } + } + return undefined; +} + +function recordValue(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : {}; +} + +function arrayValue(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} diff --git a/apps/patch/src/queue.ts b/apps/patch/src/queue.ts index db01f14..78e8eb1 100644 --- a/apps/patch/src/queue.ts +++ b/apps/patch/src/queue.ts @@ -118,7 +118,7 @@ export class EventStore { eventId?: string; status?: MaintenanceAttemptRecord["status"]; } = {}): Promise { - const records = await readJsonLines(this.maintenanceAttemptsPath); + const records = latestRecordsById(await readJsonLines(this.maintenanceAttemptsPath)); return limitNewest( records.filter((record) => (!options.eventId || record.eventId === options.eventId) && @@ -127,6 +127,30 @@ export class EventStore { options.limit, ); } + + async getMaintenanceAttempt(id: string): Promise { + const records = await readJsonLines(this.maintenanceAttemptsPath); + for (let index = records.length - 1; index >= 0; index -= 1) { + if (records[index]?.id === id) { + return records[index]; + } + } + return undefined; + } +} + +function latestRecordsById(records: T[]): T[] { + const seen = new Set(); + const latest: T[] = []; + for (let index = records.length - 1; index >= 0; index -= 1) { + const record = records[index]; + if (!record || seen.has(record.id)) { + continue; + } + seen.add(record.id); + latest.push(record); + } + return latest.reverse(); } export function jobForFeedSignal(signal: FeedSignal): FeedJob | null { diff --git a/apps/patch/src/server.ts b/apps/patch/src/server.ts index ec8580e..66e21ee 100644 --- a/apps/patch/src/server.ts +++ b/apps/patch/src/server.ts @@ -1,16 +1,18 @@ import { parseDiscordConfig, type DiscordConfig } from "./discord"; import { startFeedPolling } from "./feed"; import { - dispatchWorkspaceEvent, + dispatchWorkspaceEventDetailed, getWorkspaceEvent, getWorkspaceRun, listWorkspaceEvents, listWorkspaceRuns, maintenanceAttemptForWorkspaceDispatch, - replayWorkspaceEvent, + maintenanceAttemptWithWorkspaceRuns, + replayWorkspaceEventDetailed, } from "./flow"; import { jsonResponse, methodNotAllowed, textResponse } from "./http"; import { EventStore } from "./queue"; +import type { MaintenanceAttemptRecord } from "./types"; export type ServerConfig = { dataDir: string; @@ -45,8 +47,16 @@ function dispatchStatus(value: string | null) { function maintenanceStatus(value: string | null) { if (!value) return undefined; - if (value === "started" || value === "failed" || value === "skipped") return value; - throw new Error("maintenance attempt status must be started, failed, or skipped"); + if ( + value === "started" || + value === "completed" || + value === "changed" || + value === "needs_intervention" || + value === "blocked" || + value === "failed" || + value === "skipped" + ) return value; + throw new Error("maintenance attempt status is invalid"); } async function handleFlowEvents(request: Request, config: ServerConfig, store: EventStore): Promise { @@ -78,15 +88,15 @@ async function handleFlowEvents(request: Request, config: ServerConfig, store: E }); } if (request.method === "POST" && eventMatch[2] === "retry") { - const record = await dispatchWorkspaceEvent(event, {}, { env: process.env }); + const { record, result } = await dispatchWorkspaceEventDetailed(event, {}, { env: process.env }); await store.appendWorkspaceDispatch(record); - await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record)); + await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record, result?.runs)); return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 }); } if (request.method === "POST" && eventMatch[2] === "replay") { - const record = await replayWorkspaceEvent(event, {}, { env: process.env }); + const { record, result } = await replayWorkspaceEventDetailed(event, {}, { env: process.env }); await store.appendWorkspaceDispatch(record); - await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record)); + await store.appendMaintenanceAttempt(maintenanceAttemptForWorkspaceDispatch(event, record, result?.runs)); return jsonResponse({ event, record }, { status: record.status === "failed" ? 502 : 202 }); } return methodNotAllowed(); @@ -109,14 +119,68 @@ async function handleFlowDispatches(request: Request, config: ServerConfig, stor async function handleMaintenanceAttempts(request: Request, config: ServerConfig, store: EventStore): Promise { const unauthorized = requireAdmin(request, config); if (unauthorized) return unauthorized; - if (request.method !== "GET") return methodNotAllowed(); const url = new URL(request.url); - return jsonResponse({ - attempts: await store.listMaintenanceAttempts({ - eventId: url.searchParams.get("eventId") ?? undefined, - status: maintenanceStatus(url.searchParams.get("status")), - limit: numberParam(url.searchParams.get("limit")), - }), + if (request.method === "GET" && url.pathname === "/maintenance-attempts") { + return jsonResponse({ + attempts: await store.listMaintenanceAttempts({ + eventId: url.searchParams.get("eventId") ?? undefined, + status: maintenanceStatus(url.searchParams.get("status")), + limit: numberParam(url.searchParams.get("limit")), + }), + }); + } + + const attemptMatch = url.pathname.match(/^\/maintenance-attempts\/([^/]+)(?:\/(sync))?$/); + if (!attemptMatch?.[1]) return jsonResponse({ error: "not_found" }, { status: 404 }); + const attemptId = decodeURIComponent(attemptMatch[1]); + const attempt = await store.getMaintenanceAttempt(attemptId); + if (!attempt) { + return jsonResponse({ error: "maintenance_attempt_not_found" }, { status: 404 }); + } + if (request.method === "GET" && !attemptMatch[2]) { + return jsonResponse({ attempt }); + } + if (request.method === "POST" && attemptMatch[2] === "sync") { + const next = await syncMaintenanceAttempt(store, attempt); + return jsonResponse({ attempt: next }, { status: 202 }); + } + return methodNotAllowed(); +} + +async function syncMaintenanceAttempt( + store: EventStore, + attempt: MaintenanceAttemptRecord, +): Promise { + const runs = attempt.workspaceRunIds.length > 0 + ? await Promise.all(attempt.workspaceRunIds.map((runId) => getWorkspaceRun(runId, { env: process.env }))) + : (await listWorkspaceRuns({ env: process.env }, { eventId: attempt.eventId })).runs; + const next = maintenanceAttemptWithWorkspaceRuns(attempt, runs); + if (maintenanceAttemptChanged(attempt, next)) { + await store.appendMaintenanceAttempt(next); + } + return next; +} + +function maintenanceAttemptChanged( + before: MaintenanceAttemptRecord, + after: MaintenanceAttemptRecord, +): boolean { + return JSON.stringify({ + status: before.status, + workspaceRunIds: before.workspaceRunIds, + workspaceRunStatuses: before.workspaceRunStatuses, + candidateRefs: before.candidateRefs, + message: before.message, + error: before.error, + completedAt: before.completedAt, + }) !== JSON.stringify({ + status: after.status, + workspaceRunIds: after.workspaceRunIds, + workspaceRunStatuses: after.workspaceRunStatuses, + candidateRefs: after.candidateRefs, + message: after.message, + error: after.error, + completedAt: after.completedAt, }); } @@ -175,7 +239,7 @@ export function createHandler(config: ServerConfig): (request: Request) => Promi if (url.pathname === "/workspace-dispatches" || url.pathname === "/flow-dispatches") { return handleFlowDispatches(request, config, store); } - if (url.pathname === "/maintenance-attempts") { + if (url.pathname === "/maintenance-attempts" || url.pathname.startsWith("/maintenance-attempts/")) { return handleMaintenanceAttempts(request, config, store); } if (url.pathname === "/workspace-runs" || url.pathname.startsWith("/workspace-runs/")) { diff --git a/apps/patch/src/types.ts b/apps/patch/src/types.ts index 967adb8..e859645 100644 --- a/apps/patch/src/types.ts +++ b/apps/patch/src/types.ts @@ -97,19 +97,42 @@ export type FlowDispatchRecord = { export type WorkspaceDispatchRecord = FlowDispatchRecord; +export type CandidateRefRecord = { + kind: string; + ref: string; + repo?: string; + remote?: string; + sha?: string; + url?: string; + pushed?: boolean; +}; + +export type MaintenanceAttemptStatus = + | "started" + | "completed" + | "changed" + | "needs_intervention" + | "blocked" + | "failed" + | "skipped"; + export type MaintenanceAttemptRecord = { id: string; eventId: string; eventType: string; operation: "dispatch" | "replay"; - status: "started" | "failed" | "skipped"; + status: MaintenanceAttemptStatus; upstreamRepo?: string; upstreamRef?: string; upstreamSha?: string; upstreamTag?: string; workspaceBackendUrl?: string; workspaceRunIds: string[]; - candidateRefs: string[]; + workspaceRunStatuses?: Record; + candidateRefs: CandidateRefRecord[]; + message?: string; error?: string; createdAt: string; + updatedAt: string; + completedAt?: string; }; diff --git a/apps/patch/test/harness-flow.test.ts b/apps/patch/test/harness-flow.test.ts index df76cb5..d44ef9e 100644 --- a/apps/patch/test/harness-flow.test.ts +++ b/apps/patch/test/harness-flow.test.ts @@ -40,6 +40,16 @@ describe("patch.moi harness flow", () => { expect(result.status).toBe("completed"); expect(result.message).toContain("package checks passed"); + expect(result.artifacts?.candidateRefs).toMatchObject([ + { + kind: "branch", + repo: "matamune-peezy/patch-moi-harness", + remote: "local", + ref: "refs/heads/main", + sha: afterHead, + pushed: false, + }, + ]); expect(afterHead).toBe(beforeHead); expect(await git(["status", "--porcelain=v1"])).toBe(""); }); diff --git a/apps/patch/test/server.test.ts b/apps/patch/test/server.test.ts index 1eec67a..2bbc182 100644 --- a/apps/patch/test/server.test.ts +++ b/apps/patch/test/server.test.ts @@ -203,4 +203,101 @@ describe("server", () => { } } }); + + test("syncs maintenance attempt outcomes from workspace run results", async () => { + const originalFetch = globalThis.fetch; + const originalWorkspaceUrl = process.env.PATCH_WORKSPACE_BACKEND_URL; + const dataDir = await mkdtemp(join(tmpdir(), "patch-")); + const store = new EventStore(dataDir); + const attempt = { + id: "patch:source:entry:upstream.release:dispatch:2026-05-13T00:00:01.000Z", + eventId: "patch:source:entry:upstream.release", + eventType: "upstream.release", + operation: "dispatch" as const, + status: "started" as const, + upstreamRepo: "openai/codex", + upstreamTag: "v1.2.3", + workspaceBackendUrl: "http://127.0.0.1:3586", + workspaceRunIds: ["run-1"], + candidateRefs: [], + createdAt: "2026-05-13T00:00:01.000Z", + updatedAt: "2026-05-13T00:00:01.000Z", + }; + await store.appendMaintenanceAttempt(attempt); + + process.env.PATCH_WORKSPACE_BACKEND_URL = "http://127.0.0.1:3586"; + globalThis.fetch = (async (url: string | URL | Request) => { + if (String(url).includes("/runs/run-1")) { + return Response.json({ + run: { + id: "run-1", + eventId: attempt.eventId, + status: "completed", + completedAt: "2026-05-13T00:00:05.000Z", + resultJson: JSON.stringify({ + status: "changed", + message: "candidate branch ready", + artifacts: { + candidateRefs: [{ + kind: "branch", + repo: "matamune-peezy/patch-moi-harness", + remote: "origin", + ref: "refs/heads/main", + sha: "abc123", + pushed: true, + }], + }, + }), + }, + }); + } + return Response.json({ error: "not found" }, { status: 404 }); + }) as unknown as typeof fetch; + + try { + const handler = createHandler({ + dataDir, + adminToken: "admin", + }); + const sync = await handler(new Request( + `http://localhost/maintenance-attempts/${encodeURIComponent(attempt.id)}/sync`, + { + method: "POST", + headers: { authorization: "Bearer admin" }, + }, + )); + expect(sync.status).toBe(202); + expect(await sync.json()).toMatchObject({ + attempt: { + id: attempt.id, + status: "changed", + message: "candidate branch ready", + workspaceRunStatuses: { "run-1": "changed" }, + candidateRefs: [{ + kind: "branch", + repo: "matamune-peezy/patch-moi-harness", + remote: "origin", + ref: "refs/heads/main", + sha: "abc123", + pushed: true, + }], + }, + }); + + const changed = await handler(new Request("http://localhost/maintenance-attempts?status=changed", { + headers: { authorization: "Bearer admin" }, + })); + expect(changed.status).toBe(200); + expect(await changed.json()).toMatchObject({ + attempts: [{ id: attempt.id, status: "changed" }], + }); + } finally { + globalThis.fetch = originalFetch; + if (originalWorkspaceUrl === undefined) { + delete process.env.PATCH_WORKSPACE_BACKEND_URL; + } else { + process.env.PATCH_WORKSPACE_BACKEND_URL = originalWorkspaceUrl; + } + } + }); }); diff --git a/docs/pages/reference/dispatch-and-replay-flow-events.md b/docs/pages/reference/dispatch-and-replay-flow-events.md index dce7984..eae289f 100644 --- a/docs/pages/reference/dispatch-and-replay-flow-events.md +++ b/docs/pages/reference/dispatch-and-replay-flow-events.md @@ -52,6 +52,13 @@ Each retry or replay writes a maintenance attempt record: curl http://127.0.0.1:3000/maintenance-attempts?eventId= ``` +After the workspace run finishes, sync the attempt to record the final +maintenance outcome and any candidate refs reported by the flow: + +```bash +curl -X POST http://127.0.0.1:3000/maintenance-attempts//sync +``` + Use workspace inspection endpoints to read backend-owned run state: ```bash diff --git a/docs/pages/reference/http-api.md b/docs/pages/reference/http-api.md index 17beb82..0a3c95f 100644 --- a/docs/pages/reference/http-api.md +++ b/docs/pages/reference/http-api.md @@ -31,12 +31,19 @@ inspect or modify Git patch stacks directly. ## Maintenance attempts ```text -GET /maintenance-attempts?eventId=&status=started|failed|skipped&limit= +GET /maintenance-attempts?eventId=&status=&limit= +GET /maintenance-attempts/:id +POST /maintenance-attempts/:id/sync ``` +`status` can be `started`, `completed`, `changed`, `needs_intervention`, +`blocked`, `failed`, or `skipped`. + Maintenance attempts are patch.moi-owned product records. They link an upstream update trigger to workspace run ids and candidate refs without copying -workspace backend run state into patch.moi. +workspace backend run state into patch.moi. `sync` reads the configured +workspace backend run results, extracts patch.moi outcome fields such as +candidate refs, and appends the latest attempt state. ## Dispatches diff --git a/docs/pages/reference/jsonl-state.md b/docs/pages/reference/jsonl-state.md index f72df6e..b250c9c 100644 --- a/docs/pages/reference/jsonl-state.md +++ b/docs/pages/reference/jsonl-state.md @@ -13,13 +13,15 @@ operational state. They are not the patch stack. | `feed-state.json` | Per-source last seen entry and last checked timestamp. | | `feed-events.jsonl` | Normalized `FeedSignal` records. | | `flow-events.jsonl` | Generic `FlowEvent` records emitted by flow targets. | -| `maintenance-attempts.jsonl` | patch.moi-owned attempt records linking update events to workspace run ids and candidate refs. | +| `maintenance-attempts.jsonl` | patch.moi-owned attempt records linking update events to workspace run ids, outcomes, and candidate refs. | | `workspace-dispatches.jsonl` | Workspace dispatch, retry, replay, and failure records. | | `flow-dispatches.jsonl` | Legacy dispatch record file read for compatibility. | Admin endpoints read `flow-events.jsonl`, `maintenance-attempts.jsonl`, `workspace-dispatches.jsonl`, and the legacy `flow-dispatches.jsonl` file. The feed poller appends to all relevant files as it accepts new feed entries. +Maintenance attempt sync also appends updated records; admin list endpoints show +the latest record for each attempt id. If a runner checkout is lost, patch.moi should be able to recreate the maintenance context from remote Git refs and forge records. JSONL state explains diff --git a/docs/pages/tutorials/run-harness-maintenance-flow.md b/docs/pages/tutorials/run-harness-maintenance-flow.md index d5bb69d..948c9d6 100644 --- a/docs/pages/tutorials/run-harness-maintenance-flow.md +++ b/docs/pages/tutorials/run-harness-maintenance-flow.md @@ -33,7 +33,8 @@ bun run harness:flow The fixture event is `v0.1.3`, which the current fork already contains. The flow should skip rebase work, run `npm test` and `npm run pack:dry-run` in the -fork, and leave the fork checkout unchanged. +fork, report `candidateRefs` for the maintained fork branch, and leave the fork +checkout unchanged. ## 3. Rehearse a real upstream release @@ -58,7 +59,8 @@ bun run harness:flow ``` Use an event file whose `payload.tag` is the new upstream tag. The flow rebases -`harness/fork` onto that tag, verifies the fork package, and keeps pushes off. +`harness/fork` onto that tag, verifies the fork package, reports the local +candidate branch, and keeps pushes off. ## 4. Push only after review @@ -69,5 +71,6 @@ CODEX_FLOW_PUSH=1 bun run harness:flow ``` That pushes the rebased fork branch to the configured `origin` and `jojo` -remotes with `--force-with-lease`. Public npm publishing remains a separate -trusted-publishing release path. +remotes with `--force-with-lease` and reports those pushed branch refs as +candidate refs. Public npm publishing remains a separate trusted-publishing +release path. diff --git a/docs/pages/tutorials/watch-upstream-release.md b/docs/pages/tutorials/watch-upstream-release.md index c8bcfec..5fee124 100644 --- a/docs/pages/tutorials/watch-upstream-release.md +++ b/docs/pages/tutorials/watch-upstream-release.md @@ -72,7 +72,8 @@ When the feed later contains an unseen release entry, Patch appends: - `data/feed-events.jsonl` for the normalized signal. - `data/flow-events.jsonl` for the generic flow event. -- `data/maintenance-attempts.jsonl` for the patch.moi maintenance attempt. +- `data/maintenance-attempts.jsonl` for the patch.moi maintenance attempt and + later candidate refs. - `data/workspace-dispatches.jsonl` for the workspace dispatch outcome. If no workspace backend URL is set, Patch uses local flow execution from the diff --git a/flows/patch-moi-harness/README.md b/flows/patch-moi-harness/README.md index 7255844..79a418f 100644 --- a/flows/patch-moi-harness/README.md +++ b/flows/patch-moi-harness/README.md @@ -11,6 +11,7 @@ The default behavior is local and reviewable: - switch to the maintained fork branch - rebase onto the release tag when the tag is not already an ancestor - run the configured package checks +- emit candidate branch refs in the `FLOW_RESULT` artifacts - leave pushes disabled unless `CODEX_FLOW_PUSH=1` is set Useful overrides: @@ -22,4 +23,4 @@ CODEX_FLOW_PUSH=1 bun run harness:flow The fixture event is `fixtures/upstream-release-v0.1.3.json`. It should be a no-op rebase against the current harness fork while still verifying the package -surface. +surface and reporting the local maintained branch as the candidate ref. diff --git a/flows/patch-moi-harness/exec/rebase-fork.ts b/flows/patch-moi-harness/exec/rebase-fork.ts index 27f2ff7..c28585f 100644 --- a/flows/patch-moi-harness/exec/rebase-fork.ts +++ b/flows/patch-moi-harness/exec/rebase-fork.ts @@ -32,6 +32,7 @@ const payload = context.flow.event.payload ?? {}; const releaseTag = tagFromPayload(payload); const repo = stringValue(payload.repo, "payload.repo"); const forkRepo = path.resolve(workspaceRoot, stringConfig("fork_repo", "harness/fork")); +const forkRepoFullName = stringConfig("fork_repo_full_name", "matamune-peezy/patch-moi-harness"); const targetBranch = stringConfig("target_branch", "main"); const upstreamRemote = stringConfig("upstream_remote", "upstream"); const upstreamRepoUrl = stringConfig("upstream_repo_url", "https://github.com/peezy-tech/patch-moi-harness.git"); @@ -114,12 +115,15 @@ try { eventId: context.flow.event.id, repo, forkRepo, + forkRepoFullName, targetBranch, releaseTag, releaseSha, beforeSha, afterSha, pushed: enabled("push", false), + checks: verifyCommands.map((command) => ({ name: command, status: "passed" })), + candidateRefs: candidateRefsFor(afterSha), }); } catch (error) { finish("failed", error instanceof Error ? error.message : String(error)); @@ -246,6 +250,19 @@ function harnessMessage(beforeSha: string, afterSha: string): string { return `Harness fork rebased onto ${releaseTag}; package checks passed.`; } +function candidateRefsFor(sha: string): Array> { + const pushed = enabled("push", false); + const remotes = pushed ? pushRemotes : ["local"]; + return remotes.map((remote) => ({ + kind: "branch", + repo: forkRepoFullName, + remote, + ref: `refs/heads/${targetBranch}`, + sha, + pushed, + })); +} + function enabled(name: string, fallback: boolean): boolean { const envValue = process.env[`CODEX_FLOW_${name.toUpperCase()}`]; if (envValue !== undefined) { diff --git a/flows/patch-moi-harness/flow.toml b/flows/patch-moi-harness/flow.toml index c8781e3..f6a04a4 100644 --- a/flows/patch-moi-harness/flow.toml +++ b/flows/patch-moi-harness/flow.toml @@ -4,6 +4,7 @@ description = "Rebase the maintained patch.moi harness fork onto an upstream har [config] fork_repo = "harness/fork" +fork_repo_full_name = "matamune-peezy/patch-moi-harness" target_branch = "main" upstream_remote = "upstream" upstream_repo_url = "https://github.com/peezy-tech/patch-moi-harness.git" diff --git a/harness/README.md b/harness/README.md index 11ca197..bb9631a 100644 --- a/harness/README.md +++ b/harness/README.md @@ -115,8 +115,9 @@ the upstream update event, creates a maintenance attempt record, and hands the same flow event to the configured workspace backend. The default fixture targets `v0.1.3`, which should verify the current fork -without changing it. For a new upstream tag, run the same command with an event -file whose `payload.tag` names that tag. +without changing it and report `candidateRefs` for the maintained fork branch. +For a new upstream tag, run the same command with an event file whose +`payload.tag` names that tag. ## Scenario: Fork Release