Add deferred run collection cursors

This commit is contained in:
matamune 2026-05-30 05:18:09 +00:00
parent 020dd2be94
commit 3b095824b0
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
11 changed files with 253 additions and 12 deletions

View file

@ -6,10 +6,11 @@ description: Recommended order for turning deferred run intents into a complete
# Deferred Runs Roadmap
Deferred runs give codex-toys a durable way to say "do this later" without
leaving an ad hoc agent process running forever. The first slice is live in
`codex-toys@0.137.0`: workspaces can create one-shot future intents, run due
leaving an ad hoc agent process running forever. The first slices are live in
`codex-toys@0.138.0`: workspaces can create one-shot future intents, run due
intents locally or over SSH, inspect pending/completed/failed state, prune old
terminal history, and pull saved attempt output back through the toybox.
terminal history, pull one saved attempt output back through the toybox, and
collect unseen terminal results with a queue-local cursor.
This roadmap keeps the next work ordered around operator trust: first make
results easy to harvest, then make scheduled runners easy to install, then add
@ -44,20 +45,26 @@ interval scheduling.
## Recommended Order
1. Add deferred result collection.
1. Harden deferred result collection.
Today, an operator can inspect one completed result with:
An operator can inspect one completed result with:
```bash
codex-toys workspace deferred pull <intent-id> --json
codex-toys --ssh rammstein --cwd /repo workspace deferred pull <intent-id> --json
```
The next step is a `deferred collect` style workflow with a local cursor:
"pull completed results I have not seen yet." This matters for local
operator work because a remote runner can finish work while the local vault
or browser session is offline. Collection should be read-oriented and should
not re-run completed work.
`deferred collect` adds the batch form:
```bash
codex-toys workspace deferred collect --cursor operator --json
codex-toys --ssh rammstein --cwd /repo workspace deferred collect --cursor operator --json
```
The cursor lives with the queue being collected. Local collection advances a
local queue cursor; SSH collection advances a remote queue cursor. The next
hardening work is about operator presentation, not semantics: friendlier
summaries, dashboard use, and optional cursor naming conventions.
2. Add CI and scheduled-runner setup.
@ -69,6 +76,14 @@ interval scheduling.
This should land after result collection so CI-produced outputs have a clear
local harvest path.
Recommended next slice: add scheduled runner scaffolding for
`workspace tick --mode actions`. The scaffold should create or update a
Forgejo/GitHub workflow that runs on `workflow_dispatch` and a configurable
cron, installs codex-toys, runs `codex-toys actions prepare-auth`, runs
`codex-toys workspace tick --mode actions`, always runs
`codex-toys actions cleanup`, and commits only durable workspace state when
that state changed.
3. Add an explicit retention policy.
`workspace deferred prune --older-than-days <days> [--dry-run]` is available
@ -118,6 +133,7 @@ Pull a completed result:
```bash
codex-toys workspace deferred pull <intent-id> --json
codex-toys workspace deferred collect --cursor operator --json
```
Operate a remote workspace's local queue over SSH:
@ -126,6 +142,7 @@ Operate a remote workspace's local queue over SSH:
codex-toys --ssh rammstein --cwd /remote/workspace workspace deferred list --json
codex-toys --ssh rammstein --cwd /remote/workspace workspace deferred run-due
codex-toys --ssh rammstein --cwd /remote/workspace workspace deferred pull <intent-id> --json
codex-toys --ssh rammstein --cwd /remote/workspace workspace deferred collect --cursor operator --json
```
Configure a low-frequency local scheduled task in `.codex/workspace.toml`:

View file

@ -63,6 +63,7 @@ codex-toys workspace run morning-brief --mode actions
codex-toys workspace deferred create --params-json '{"runAt":"2026-01-01T14:00:00.000Z","target":{"kind":"turn","prompt":"Review the workspace."}}'
codex-toys workspace deferred list --json
codex-toys workspace deferred pull <intent-id> --json
codex-toys workspace deferred collect --cursor operator --json
codex-toys workspace deferred run-due
codex-toys workspace init actions --forgejo
CODEX_WORKSPACE_MODE=actions codex-toys workspace doctor
@ -78,7 +79,7 @@ evaluates reactive rules.
`run <task-id>` runs one configured task immediately.
`deferred create`, `list`, `read`, `cancel`, `run-due`, and `prune` manage
`deferred create`, `list`, `read`, `collect`, `cancel`, `run-due`, and `prune` manage
durable future run intents. A deferred target can wrap a direct Codex turn, a
named turn automation, or a configured workspace task. Pruning is explicit and
only removes terminal history older than the requested retention window.
@ -86,6 +87,12 @@ only removes terminal history older than the requested retention window.
saved attempt outputs in the JSON response, which lets SSH callers pull remote
deferred results back to the local operator surface.
`deferred collect` is the cursor-based harvest path for completed, failed, or
canceled runs. Reusing the same cursor returns only terminal results newer than
that cursor; using a new cursor replays the terminal queue from the beginning.
The cursor lives in the queue being collected, including remote queues accessed
with `--ssh`.
`init actions` scaffolds an Actions-ready workspace. The command can generate:
- `.codex/workspace.toml`

View file

@ -130,6 +130,7 @@ codex-toys workspace doctor
codex-toys workspace tick --mode local
codex-toys workspace deferred list --json
codex-toys workspace deferred pull <intent-id> --json
codex-toys workspace deferred collect --cursor operator --json
codex-toys workspace deferred prune --older-than-days 30 --dry-run
```

View file

@ -205,6 +205,7 @@ codex-toys workspace deferred create --params-json <json>
codex-toys workspace deferred list [--mode auto|local|actions] [--json]
codex-toys workspace deferred read <intent-id> [--include-output] [--json]
codex-toys workspace deferred pull <intent-id> [--json]
codex-toys workspace deferred collect [--cursor <name>] [--json]
codex-toys workspace deferred cancel <intent-id>
codex-toys workspace deferred run-due [--mode auto|local|actions]
codex-toys workspace deferred prune --older-than-days <days> [--dry-run]
@ -229,6 +230,11 @@ in the response. `workspace deferred pull` is a shorthand for that form, which
is useful when a local operator wants to inspect a remote deferred run result
without separately reading remote filesystem paths.
`workspace deferred collect` returns terminal deferred runs that have not been
seen by the named cursor yet, including saved attempt outputs. The cursor is
stored with the queue being collected; over SSH, that means the remote
workspace queue advances its own cursor.
`workspace deferred prune` removes only terminal deferred history (`completed`,
`failed`, or `canceled`) older than the requested retention window. Pending and
running intents are never pruned.

View file

@ -214,6 +214,16 @@ type ParsedCliBase =
json: boolean;
pretty: boolean;
}
| {
type: "workspace-deferred-collect";
cursor?: string;
mode?: WorkspaceModeInput;
workspaceRoot?: string;
url: string;
timeoutMs: number;
json: boolean;
pretty: boolean;
}
| {
type: "workspace-deferred-cancel";
intentId: string;
@ -389,6 +399,7 @@ export function parseArgs(
let dryRun = false;
let includeOutput = false;
let olderThanDays: number | undefined;
let cursor: string | undefined;
let model: string | undefined;
let paramsJson: string | undefined;
let paramsFile: string | undefined;
@ -665,6 +676,14 @@ export function parseArgs(
olderThanDays = positiveInteger(arg.slice("--older-than-days=".length), "--older-than-days");
continue;
}
if (arg === "--cursor") {
cursor = required(argv, ++index, arg);
continue;
}
if (arg.startsWith("--cursor=")) {
cursor = arg.slice("--cursor=".length);
continue;
}
if (arg === "--allow-absolute-cwd") {
allowAbsoluteCwd = true;
continue;
@ -1088,6 +1107,19 @@ export function parseArgs(
...remoteFields(),
};
}
if (action === "collect") {
return {
type: "workspace-deferred-collect",
cursor,
mode,
workspaceRoot,
url: workspaceUrl,
timeoutMs,
json,
pretty,
...remoteFields(),
};
}
if (action === "cancel") {
return {
type: "workspace-deferred-cancel",
@ -1127,7 +1159,7 @@ export function parseArgs(
...remoteFields(),
};
}
throw new Error("workspace deferred requires create, list, read, cancel, run-due, or prune");
throw new Error("workspace deferred requires create, list, read, collect, cancel, run-due, or prune");
}
if (subcommand === "run") {
return {

View file

@ -93,6 +93,7 @@ import {
transplantThreadRollout,
} from "../threads.ts";
import {
collectDeferredRuns,
collectWorkspaceDoctorInfo,
commitActionsWorkspaceState,
cancelDeferredRunIntent,
@ -468,6 +469,25 @@ async function main(): Promise<void> {
: `${JSON.stringify(result, null, 2)}\n`);
return;
}
if (parsed.type === "workspace-deferred-collect") {
const result = hasSshRemote(parsed)
? await callToybox("deferred.collect", compactUndefined({
cursor: parsed.cursor,
mode: parsed.mode,
workspaceRoot: parsed.workspaceRoot,
}), parsed)
: await collectDeferredRuns(
await createWorkspaceContext({
workspaceRoot: parsed.workspaceRoot,
mode: parsed.mode,
}),
{ cursor: parsed.cursor },
);
write(parsed.json
? `${JSON.stringify(result, null, parsed.pretty ? 2 : 0)}\n`
: `${JSON.stringify(result, null, 2)}\n`);
return;
}
if (parsed.type === "workspace-deferred-cancel") {
const result = hasSshRemote(parsed)
? await callToybox("deferred.cancel", compactUndefined({
@ -1307,6 +1327,7 @@ Usage:
codex-toys workspace deferred list [--mode auto|local|actions] [--json]
codex-toys workspace deferred read <intent-id> [--include-output] [--json]
codex-toys workspace deferred pull <intent-id> [--json]
codex-toys workspace deferred collect [--cursor <name>] [--json]
codex-toys workspace deferred cancel <intent-id>
codex-toys workspace deferred run-due [--mode auto|local|actions]
codex-toys workspace deferred prune --older-than-days <days> [--dry-run]
@ -1371,6 +1392,7 @@ Options:
remote workspace root.
--dry-run Preview supported write operations.
--older-than-days <days> Retention window for deferred prune.
--cursor <name> Deferred collect cursor name.
--via <workspace|app> Turn surface. Defaults to workspace.
--sandbox <mode> Turn sandbox: danger-full-access,
workspace-write, or read-only.

View file

@ -189,6 +189,22 @@ export type DeferredRunReadResult = {
outputs?: DeferredRunAttemptOutput[];
};
export type DeferredRunCollectCursor = {
cursor: string;
updatedAt: string;
lastUpdatedAt?: string;
lastIntentId?: string;
};
export type DeferredRunCollectResult = {
mode: WorkspaceMode;
cursor: string;
collectedAt: string;
previousCursor?: DeferredRunCollectCursor;
cursorState: DeferredRunCollectCursor;
intents: DeferredRunReadResult[];
};
export type DeferredRunExecution = {
intent: DeferredRunIntent;
attempt: DeferredRunAttempt;
@ -620,6 +636,46 @@ export async function readDeferredRun(
return compactUndefined({ intent, attempts, outputs });
}
export async function collectDeferredRuns(
context: WorkspaceContext,
options: {
cursor?: string;
now?: Date;
} = {},
): Promise<DeferredRunCollectResult> {
await ensureDeferredRunDirs(context);
const cursor = deferredCollectCursorName(options.cursor);
const previousCursor = await readDeferredRunCollectCursor(context, cursor);
const collectedAt = (options.now ?? new Date()).toISOString();
const terminalIntents = (await listDeferredRunIntents(context))
.filter((intent) => isTerminalDeferredRunStatus(intent.status))
.toSorted((left, right) =>
left.updatedAt.localeCompare(right.updatedAt) || left.id.localeCompare(right.id)
)
.filter((intent) => isAfterDeferredRunCollectCursor(intent, previousCursor));
const intents = await Promise.all(
terminalIntents.map(async (intent) =>
await readDeferredRun(context, intent.id, { includeOutput: true })
),
);
const last = terminalIntents.at(-1);
const cursorState: DeferredRunCollectCursor = compactUndefined({
cursor,
updatedAt: collectedAt,
lastUpdatedAt: last?.updatedAt ?? previousCursor?.lastUpdatedAt,
lastIntentId: last?.id ?? previousCursor?.lastIntentId,
});
await writeJsonFileAtomic(deferredCollectCursorPath(context, cursor), cursorState);
return compactUndefined({
mode: context.mode,
cursor,
collectedAt,
previousCursor,
cursorState,
intents,
});
}
export async function cancelDeferredRunIntent(
context: WorkspaceContext,
intentId: string,
@ -1249,6 +1305,21 @@ async function readDeferredRunAttemptOutputs(
return outputs;
}
async function readDeferredRunCollectCursor(
context: WorkspaceContext,
cursor: string,
): Promise<DeferredRunCollectCursor | undefined> {
const file = deferredCollectCursorPath(context, cursor);
try {
return normalizeDeferredRunCollectCursor(parseJsonText(await readFile(file, "utf8"), file), cursor);
} catch (error) {
if (isNotFoundError(error)) {
return undefined;
}
throw error;
}
}
async function claimDeferredRunIntent(
context: WorkspaceContext,
intent: DeferredRunIntent,
@ -1434,6 +1505,19 @@ function normalizeDeferredRunAttempt(value: unknown): DeferredRunAttempt {
};
}
function normalizeDeferredRunCollectCursor(
value: unknown,
fallbackCursor: string,
): DeferredRunCollectCursor {
const input = record(value);
return compactUndefined({
cursor: optionalString(input.cursor) ?? fallbackCursor,
updatedAt: requiredString(input.updatedAt, "deferred run collect cursor updatedAt"),
lastUpdatedAt: optionalString(input.lastUpdatedAt),
lastIntentId: optionalString(input.lastIntentId),
});
}
function dueTasks(
tasks: WorkspaceTask[],
runs: WorkspaceRunRecord[],
@ -1511,6 +1595,20 @@ function isTerminalDeferredRunStatus(
return status === "completed" || status === "failed" || status === "canceled";
}
function isAfterDeferredRunCollectCursor(
intent: DeferredRunIntent,
cursor: DeferredRunCollectCursor | undefined,
): boolean {
if (!cursor?.lastUpdatedAt) {
return true;
}
const updatedAtOrder = intent.updatedAt.localeCompare(cursor.lastUpdatedAt);
if (updatedAtOrder !== 0) {
return updatedAtOrder > 0;
}
return cursor.lastIntentId ? intent.id.localeCompare(cursor.lastIntentId) > 0 : true;
}
function isWorkspaceRunRecord(value: unknown): value is WorkspaceRunRecord {
const input = record(value);
return typeof input.id === "string" &&
@ -1580,6 +1678,7 @@ async function ensureDeferredRunDirs(context: WorkspaceContext): Promise<void> {
deferredAttemptDir(context),
deferredOutputDir(context),
deferredClaimDir(context),
deferredCollectCursorDir(context),
]) {
await mkdir(dir, { recursive: true });
}
@ -1605,6 +1704,10 @@ function deferredClaimDir(context: WorkspaceContext): string {
return path.join(deferredRoot(context), "claims");
}
function deferredCollectCursorDir(context: WorkspaceContext): string {
return path.join(deferredRoot(context), "collect-cursors");
}
function deferredIntentPath(context: WorkspaceContext, intentId: string): string {
return path.join(deferredIntentDir(context), `${safeFileSegment(intentId)}.json`);
}
@ -1617,6 +1720,18 @@ function deferredClaimPath(context: WorkspaceContext, intentId: string): string
return path.join(deferredClaimDir(context), `${safeFileSegment(intentId)}.json`);
}
function deferredCollectCursorPath(context: WorkspaceContext, cursor: string): string {
return path.join(deferredCollectCursorDir(context), `${safeFileSegment(cursor)}.json`);
}
function deferredCollectCursorName(value: string | undefined): string {
const cursor = value?.trim() || "default";
if (!/^[A-Za-z0-9][A-Za-z0-9_.-]*$/.test(cursor)) {
throw new Error(`Invalid deferred collect cursor: ${cursor}`);
}
return cursor;
}
function deferredRunId(createdAt: string): string {
return `deferred-${createdAt.replace(/[:.]/g, "-")}-${randomUUID().slice(0, 8)}`;
}

View file

@ -5,6 +5,7 @@ import {
import type { ToyboxMethodHandler } from "./server.ts";
import {
cancelDeferredRunIntent,
collectDeferredRuns,
createDeferredRunIntent,
createWorkspaceContext,
listDeferredRunIntents,
@ -18,6 +19,7 @@ import {
export const WORKSPACE_DEFERRED_CREATE_METHOD = "deferred.create";
export const WORKSPACE_DEFERRED_LIST_METHOD = "deferred.list";
export const WORKSPACE_DEFERRED_READ_METHOD = "deferred.read";
export const WORKSPACE_DEFERRED_COLLECT_METHOD = "deferred.collect";
export const WORKSPACE_DEFERRED_CANCEL_METHOD = "deferred.cancel";
export const WORKSPACE_DEFERRED_RUN_DUE_METHOD = "deferred.runDue";
export const WORKSPACE_DEFERRED_PRUNE_METHOD = "deferred.prune";
@ -41,6 +43,12 @@ export const workspaceDeferredRunMethodMetadata: ToyboxMethodMetadata[] = [
sideEffects: "read-only",
category: "deferred",
},
{
name: WORKSPACE_DEFERRED_COLLECT_METHOD,
description: "Collect terminal deferred run results after a named cursor.",
sideEffects: "writes-local",
category: "deferred",
},
{
name: WORKSPACE_DEFERRED_CANCEL_METHOD,
description: "Cancel a pending deferred run intent.",
@ -93,6 +101,13 @@ export function createWorkspaceDeferredRunMethods(
includeOutput: input.includeOutput === true,
});
},
[WORKSPACE_DEFERRED_COLLECT_METHOD]: async (params) => {
const context = await contextFromParams(params, options);
const input = record(params);
return await collectDeferredRuns(context, {
cursor: stringValue(input.cursor),
});
},
[WORKSPACE_DEFERRED_CANCEL_METHOD]: async (params) => {
const context = await contextFromParams(params, options);
return {

View file

@ -281,6 +281,12 @@ describe("Codex toybox protocol", () => {
},
jsonRpcRequest("read", "deferred.read"),
) as { intent: { id: string }; attempts: unknown[]; outputs: unknown[] };
const collected = await methods["deferred.collect"]!(
{
cursor: "operator",
},
jsonRpcRequest("collect", "deferred.collect"),
) as { cursor: string; intents: unknown[] };
const pruned = await methods["deferred.prune"]!(
{
olderThanDays: 1,
@ -299,6 +305,10 @@ describe("Codex toybox protocol", () => {
attempts: [],
outputs: [],
});
expect(collected).toMatchObject({
cursor: "operator",
intents: [],
});
expect(pruned.pruned).toBe(0);
});
});

View file

@ -5,6 +5,7 @@ import path from "node:path";
import { parseArgs } from "../src/cli/args.ts";
import {
cancelDeferredRunIntent,
collectDeferredRuns,
collectWorkspaceDoctorInfo,
createDeferredRunIntent,
createWorkspaceContext,
@ -80,6 +81,8 @@ describe("workspace autonomy", () => {
.toMatchObject({ type: "workspace-deferred-read", intentId: "later-1", includeOutput: true });
expect(parseArgs(["workspace", "deferred", "pull", "later-1"], {}))
.toMatchObject({ type: "workspace-deferred-read", intentId: "later-1", includeOutput: true });
expect(parseArgs(["workspace", "deferred", "collect", "--cursor", "operator", "--json"], {}))
.toMatchObject({ type: "workspace-deferred-collect", cursor: "operator", json: true });
expect(parseArgs(["workspace", "deferred", "run-due"], {}))
.toMatchObject({ type: "workspace-deferred-run-due" });
expect(parseArgs(["workspace", "deferred", "prune", "--older-than-days", "30", "--dry-run"], {}))
@ -246,6 +249,18 @@ command = ["node", "-e", "console.log('hello deferred')"]
});
expect((readWithOutput.outputs?.[0]?.output as { workspaceRun?: { taskId?: string } }).workspaceRun)
.toMatchObject({ taskId: "hello" });
const firstCollect = await collectDeferredRuns(context, { now: new Date("2026-01-01T00:00:03.000Z") });
const secondCollect = await collectDeferredRuns(context, { now: new Date("2026-01-01T00:00:04.000Z") });
const namedCollect = await collectDeferredRuns(context, {
cursor: "operator",
now: new Date("2026-01-01T00:00:05.000Z"),
});
expect(firstCollect.intents).toHaveLength(1);
expect(firstCollect.cursorState.lastIntentId).toBe(intent.id);
expect(firstCollect.intents[0]?.outputs).toHaveLength(1);
expect(secondCollect.intents).toHaveLength(0);
expect(secondCollect.previousCursor?.lastIntentId).toBe(intent.id);
expect(namedCollect.intents.map((item) => item.intent.id)).toEqual([intent.id]);
const workspaceRun = JSON.parse(await readFile(read.attempts[0]!.outputPath!, "utf8"))
.workspaceRun as { taskId: string; status: string };
expect(workspaceRun).toMatchObject({

View file

@ -69,6 +69,7 @@ const requiredCliLines = [
"codex-toys workspace deferred list [--mode auto|local|actions] [--json]",
"codex-toys workspace deferred read <intent-id> [--include-output] [--json]",
"codex-toys workspace deferred pull <intent-id> [--json]",
"codex-toys workspace deferred collect [--cursor <name>] [--json]",
"codex-toys workspace deferred run-due [--mode auto|local|actions]",
"codex-toys workspace deferred prune --older-than-days <days> [--dry-run]",
"codex-toys memories transplant global-to-workspace [--apply]",