Merge Code Mode flow support into main
All checks were successful
ci / check (push) Successful in 30s

This commit is contained in:
matamune 2026-05-13 03:17:06 +00:00
parent 0860aeb42b
commit 0d3c2e2ab6
Signed by: matamune
GPG key ID: 3BB8E7D3B968A324
24 changed files with 2630 additions and 18 deletions

View file

@ -0,0 +1,351 @@
const release = config.release;
const commands = [];
function q(value) {
return "'" + String(value).replaceAll("'", "'\\''") + "'";
}
function trim(value) {
return String(value || "").trim();
}
function truncate(value, max) {
const textValue = String(value || "");
if (textValue.length <= max) {
return textValue;
}
return textValue.slice(0, max) + "\n...[truncated " + String(textValue.length - max) + " chars]";
}
function outputOf(result) {
if (typeof result?.output === "string") {
return result.output;
}
return JSON.stringify(result ?? {});
}
function exitCodeOf(result) {
if (typeof result?.exit_code === "number") {
return result.exit_code;
}
if (typeof result?.exitCode === "number") {
return result.exitCode;
}
return null;
}
function ok(result) {
return result.exit_code === 0;
}
async function run(label, cmd, options = {}) {
const workdir = options.workdir || config.codexRepo;
text("\n### " + label + "\n$ " + cmd + "\n");
const raw = await tools.exec_command({
cmd,
workdir,
yield_time_ms: options.yield_time_ms || 1000,
max_output_tokens: options.max_output_tokens || 12000
});
const result = {
label,
cmd,
workdir,
exit_code: exitCodeOf(raw),
output: outputOf(raw)
};
commands.push({
...result,
output: truncate(result.output, 4000)
});
text("exit_code=" + String(result.exit_code) + "\n" + truncate(result.output, options.textLimit || 12000) + "\n");
return result;
}
function finish(status, message, extra = {}) {
const summary = {
status,
message,
releaseTag: release.tagName,
releaseUrl: release.url,
targetCommitish: release.targetCommitish,
...extra,
commands
};
text("\nCODEX_UPDATE_RESULT " + JSON.stringify(summary) + "\n");
exit();
}
async function collectRebaseContext(rebaseOutput, beforeSha) {
const status = await run("rebase conflict status", "git status --short --branch", { max_output_tokens: 12000 });
const unmerged = await run("unmerged files", "git diff --name-only --diff-filter=U", { max_output_tokens: 12000 });
const diffStat = await run("conflict diff stat", "git diff --cc --stat", { max_output_tokens: 12000 });
const conflictDiff = await run("conflict diff", "git diff --cc", { max_output_tokens: 30000, textLimit: 20000 });
const currentPatch = await run("current rebase patch", "git rebase --show-current-patch", { max_output_tokens: 20000, textLimit: 12000 });
return {
beforeSha,
rebaseOutput,
statusOutput: status.output,
unmergedFiles: unmerged.output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean),
diffStat: diffStat.output,
conflictDiff: truncate(conflictDiff.output, 20000),
currentPatch: truncate(currentPatch.output, 12000),
interventionPrompt: "Continue this same thread to resolve the paused rebase. Preserve the native Code Mode replay/app-server changes, do not abort or reset unless explicitly instructed, then run the configured verification commands."
};
}
text([
"Codex upstream update job",
"",
"Release: " + release.tagName + (release.url ? " (" + release.url + ")" : ""),
"Target branch: " + config.targetBranch,
"Codex repo: " + config.codexRepo,
"Codex Rust workspace: " + config.codexRustDir,
"Service repo: " + config.serviceRepo,
"Upstream remote: " + config.upstreamRemote + " -> " + config.upstreamRepoUrl,
"Cargo target dir: " + config.cargoTargetDir
].join("\n") + "\n");
const repoCheck = await run("verify codex repo", "git rev-parse --show-toplevel");
if (!ok(repoCheck)) {
finish("failed", "codex repo is not a git checkout", { repoCheck: repoCheck.output });
}
const rustWorkspaceCheck = await run(
"verify codex Rust workspace",
"test -f " + q(config.codexRustDir + "/Cargo.toml"),
{ max_output_tokens: 4000 }
);
if (!ok(rustWorkspaceCheck)) {
finish("failed", "codex Rust workspace was not found at the expected codex-rs path", {
codexRustDir: config.codexRustDir,
rustWorkspaceCheck: rustWorkspaceCheck.output
});
}
const existingRebase = await run(
"check existing rebase state",
"test -d \"$(git rev-parse --git-path rebase-merge)\" -o -d \"$(git rev-parse --git-path rebase-apply)\"",
{ max_output_tokens: 4000 }
);
if (existingRebase.exit_code === 0) {
const context = await collectRebaseContext("A rebase was already in progress before this job started.", undefined);
finish("blocked", "A rebase is already in progress in the codex checkout.", context);
}
await run("codex status before update", "git status --short --branch", { max_output_tokens: 12000 });
const branch = await run("current branch", "git rev-parse --abbrev-ref HEAD", { max_output_tokens: 4000 });
if (!ok(branch)) {
finish("failed", "could not read current branch", { branchOutput: branch.output });
}
if (trim(branch.output) !== config.targetBranch) {
const dirtyBeforeSwitch = await run("dirty check before branch switch", "git status --porcelain=v1", { max_output_tokens: 12000 });
if (trim(dirtyBeforeSwitch.output)) {
finish("blocked", "codex checkout has local changes before switching branches.", {
dirtyStatus: dirtyBeforeSwitch.output
});
}
const switched = await run("switch target branch", "git switch " + q(config.targetBranch), { max_output_tokens: 12000 });
if (!ok(switched)) {
finish("failed", "could not switch to target branch", { switchOutput: switched.output });
}
}
const dirty = await run("dirty check on target branch", "git status --porcelain=v1", { max_output_tokens: 12000 });
if (trim(dirty.output)) {
finish("blocked", "codex target branch has local changes. Resolve or stash them before updating.", {
dirtyStatus: dirty.output
});
}
const remote = await run(
"ensure upstream openai/codex remote",
"git remote get-url " + q(config.upstreamRemote) + " >/dev/null 2>&1 && git remote set-url " + q(config.upstreamRemote) + " " + q(config.upstreamRepoUrl) + " || git remote add " + q(config.upstreamRemote) + " " + q(config.upstreamRepoUrl),
{ max_output_tokens: 12000 }
);
if (!ok(remote)) {
finish("failed", "could not configure upstream remote", { remoteOutput: remote.output });
}
const fetch = await run("fetch upstream tags", "git fetch " + q(config.upstreamRemote) + " --tags --prune", { max_output_tokens: 20000 });
if (!ok(fetch)) {
finish("failed", "could not fetch upstream release tags", { fetchOutput: fetch.output });
}
const releaseCommit = await run(
"resolve release tag",
"git rev-parse --verify " + q("refs/tags/" + release.tagName + "^{commit}"),
{ max_output_tokens: 4000 }
);
if (!ok(releaseCommit)) {
finish("failed", "could not resolve upstream release tag after fetch", {
releaseTag: release.tagName,
resolveOutput: releaseCommit.output
});
}
const beforeHead = await run("codex head before rebase", "git rev-parse HEAD", { max_output_tokens: 4000 });
const rebase = await run("rebase target branch onto upstream release", "git rebase " + q(release.tagName), { max_output_tokens: 30000, textLimit: 20000 });
if (!ok(rebase)) {
const context = await collectRebaseContext(rebase.output, trim(beforeHead.output));
finish("conflict", "Rebase paused with conflicts.", context);
}
const afterHead = await run("codex head after rebase", "git rev-parse HEAD", { max_output_tokens: 4000 });
await run("codex status after rebase", "git status --short --branch", { max_output_tokens: 12000 });
const build = await run(
"build explicit fork binary",
"CARGO_TARGET_DIR=" + q(config.cargoTargetDir) + " cargo build -p codex-cli --bin codex",
{ workdir: config.codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(build)) {
finish("failed", "fork binary build failed", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
buildOutput: build.output
});
}
const version = await run("verify explicit fork binary", q(config.codexBinary) + " --version", { max_output_tokens: 4000 });
if (!ok(version)) {
finish("failed", "built fork binary did not run", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
versionOutput: version.output
});
}
const cargoCheck = await run(
"cargo check replay packages",
"CARGO_TARGET_DIR=" + q(config.cargoTargetDir) + " cargo check -p codex-app-server -p codex-core -p codex-app-server-protocol",
{ workdir: config.codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(cargoCheck)) {
finish("failed", "cargo check failed after rebase", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
cargoCheckOutput: cargoCheck.output
});
}
const protocolTest = await run(
"protocol code mode execute test",
"CARGO_TARGET_DIR=" + q(config.cargoTargetDir) + " cargo test -p codex-app-server-protocol thread_code_mode_execute -- --nocapture",
{ workdir: config.codexRustDir, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(protocolTest)) {
finish("failed", "protocol replay API test failed after rebase", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
protocolTestOutput: protocolTest.output
});
}
const fmt = await run("cargo fmt check", "cargo fmt --check", {
workdir: config.codexRustDir,
max_output_tokens: 20000
});
if (!ok(fmt)) {
finish("failed", "cargo fmt --check failed after rebase", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
fmtOutput: fmt.output
});
}
const codexDiffCheck = await run("codex diff whitespace check", "git diff --check", { max_output_tokens: 12000 });
if (!ok(codexDiffCheck)) {
finish("failed", "codex git diff --check failed after rebase", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
diffCheckOutput: codexDiffCheck.output
});
}
const generate = await run(
"regenerate codex-flows app-server TypeScript bindings",
q(config.codexBinary) + " app-server generate-ts --experimental --out " + q(config.generatedDir),
{ workdir: config.serviceRepo, max_output_tokens: 30000, textLimit: 20000 }
);
if (!ok(generate)) {
finish("failed", "failed to regenerate codex-flows TypeScript bindings from fork binary", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
generateOutput: generate.output
});
}
const generatedStatus = await run(
"generated TypeScript binding status",
"git status --short -- packages/codex-client/src/app-server/generated",
{ workdir: config.serviceRepo, max_output_tokens: 12000 }
);
const bunInstall = await run("refresh service dependencies", "bun install --frozen-lockfile", {
workdir: config.serviceRepo,
max_output_tokens: 20000
});
if (!ok(bunInstall)) {
finish("failed", "bun install --frozen-lockfile failed in codex-flows", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
bunInstallOutput: bunInstall.output
});
}
const serviceTypes = await run("service typecheck", "bun run check:types", {
workdir: config.serviceRepo,
max_output_tokens: 30000,
textLimit: 20000
});
if (!ok(serviceTypes)) {
finish("failed", "codex-flows typecheck failed after generated binding update", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
serviceTypesOutput: serviceTypes.output
});
}
const serviceTests = await run("service tests", "bun run test", {
workdir: config.serviceRepo,
max_output_tokens: 30000,
textLimit: 20000
});
if (!ok(serviceTests)) {
finish("failed", "codex-flows tests failed after generated binding update", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
serviceTestsOutput: serviceTests.output
});
}
const serviceDiffCheck = await run("service diff whitespace check", "git diff --check", {
workdir: config.serviceRepo,
max_output_tokens: 12000
});
if (!ok(serviceDiffCheck)) {
finish("failed", "codex-flows git diff --check failed", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
serviceDiffCheckOutput: serviceDiffCheck.output
});
}
const codexStatus = await run("final codex status", "git status --short --branch", { max_output_tokens: 12000 });
const serviceStatus = await run("final service status", "git status --short --branch", {
workdir: config.serviceRepo,
max_output_tokens: 12000
});
finish("completed", "Codex fork rebased onto upstream release and verified. Review diffs, push explicitly, and publish @peezy.tech/codex to npm when ready.", {
beforeSha: trim(beforeHead.output),
afterSha: trim(afterHead.output),
codexHead: trim(afterHead.output),
codexBinary: config.codexBinary,
codexVersion: trim(version.output),
generatedStatus: generatedStatus.output,
codexStatus: codexStatus.output,
serviceStatus: serviceStatus.output
});

View file

@ -0,0 +1,634 @@
#!/usr/bin/env bun
import { readFile } from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { CodexAppServerClient } from "../packages/codex-client/src/index.ts";
type Args = {
candidate: string;
cwd?: string;
codexCommand?: string;
codexHome?: string;
cliPath: string;
timeoutMs: number;
ephemeral: boolean;
stream: boolean;
injectContext: boolean;
injectResult: boolean;
notes: string[];
threadName?: string | null;
mode: ReplayMode;
};
type ReplayMode = "native" | "shim";
type CandidateMetadata = Record<string, unknown>;
const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");
const defaultCliPath = path.join(repoRoot, "apps/cli/src/index.ts");
async function main() {
const args = await parseArgs(process.argv.slice(2));
const candidate = path.resolve(args.candidate);
const metadata = await readCandidateMetadata(candidate);
const cwd = path.resolve(args.cwd ?? metadataCwd(metadata) ?? process.cwd());
const source = await readFile(candidate, "utf8");
const cliPath = path.resolve(args.cliPath);
const threadName =
args.threadName === undefined ? defaultThreadName(candidate) : args.threadName;
const command =
args.mode === "shim"
? replayCommand({
candidate,
cliPath,
codexCommand: args.codexCommand,
cwd,
timeoutMs: args.timeoutMs,
})
: undefined;
const client = new CodexAppServerClient({
transportOptions: {
codexCommand: args.codexCommand,
args: appServerArgs(),
env: args.codexHome ? { CODEX_HOME: path.resolve(args.codexHome) } : undefined,
requestTimeoutMs: args.timeoutMs,
},
clientName: "code-mode-replay-thread",
clientTitle: "Code Mode Replay Thread",
clientVersion: "0.1.0",
});
const output: string[] = [];
let completedItem: unknown;
let commandExitCode: number | null = null;
let resolveTurnCompleted: (value: unknown) => void = () => undefined;
const turnCompleted = new Promise((resolve) => {
resolveTurnCompleted = resolve;
});
client.on("request", (message) => {
client.respondError(message.id, -32603, "replay script does not handle server requests");
});
client.on("notification", (message) => {
if (message.method === "item/commandExecution/outputDelta") {
const delta = stringField(message.params, "delta");
if (delta) {
output.push(delta);
if (args.stream) {
process.stdout.write(delta);
}
}
}
if (message.method === "item/agentMessage/delta") {
const delta = stringField(message.params, "delta");
if (delta) {
output.push(delta);
if (args.stream) {
process.stdout.write(delta);
}
}
}
if (message.method === "item/completed") {
const item = recordField(message.params, "item");
if (item && stringField(item, "type") === "commandExecution") {
completedItem = item;
commandExitCode = numberField(item, "exitCode") ?? numberField(item, "exit_code");
}
if (item && stringField(item, "type") === "agentMessage") {
completedItem = item;
}
}
if (message.method === "turn/completed") {
resolveTurnCompleted(message.params);
}
});
try {
await client.connect();
const started = await client.startThread({
cwd,
approvalPolicy: "never",
sandbox: "danger-full-access",
ephemeral: args.ephemeral,
experimentalRawEvents: false,
persistExtendedHistory: false,
});
const threadId = started.thread.id;
if (threadName) {
await client.request("thread/name/set", {
threadId,
name: threadName,
});
}
let injectedContext = false;
if (args.injectContext || args.notes.length > 0) {
await injectAssistantText(
client,
threadId,
replayContextText({
candidate,
codexHome: args.codexHome ? path.resolve(args.codexHome) : undefined,
cwd,
metadata,
mode: args.mode,
notes: args.notes,
source,
}),
);
injectedContext = true;
}
if (args.mode === "shim") {
await client.request("thread/shellCommand", {
threadId,
command,
});
} else {
await client.request("thread/codeMode/execute", {
threadId,
source,
});
}
const completed = await withTimeout(
turnCompleted,
args.timeoutMs,
"timed out waiting for Code Mode replay completion",
);
let replayOutput = output.join("");
if (args.mode === "native") {
const read = await client.request("thread/read", {
threadId,
includeTurns: true,
});
replayOutput = latestAgentMessageText(read) ?? replayOutput;
if (args.stream && replayOutput && output.length === 0) {
process.stdout.write(replayOutput.endsWith("\n") ? replayOutput : replayOutput + "\n");
}
}
let injectedResult = false;
if (args.injectResult) {
await injectAssistantText(
client,
threadId,
replayResultText({
candidate,
command,
commandExitCode,
cwd,
mode: args.mode,
output: replayOutput,
}),
);
injectedResult = true;
}
const result = {
threadId,
cwd,
candidate,
mode: args.mode,
command,
commandExitCode: args.mode === "shim" ? commandExitCode : null,
output: replayOutput,
injectedContext,
injectedResult,
threadName,
codexHome: args.codexHome ? path.resolve(args.codexHome) : undefined,
notes: args.notes,
completed,
completedItem,
};
process.stdout.write(JSON.stringify(result, null, 2) + "\n");
process.stdout.write("threadId=" + threadId + "\n");
process.exitCode = args.mode === "shim" ? commandExitCode ?? 0 : 0;
} finally {
client.close();
}
}
function replayCommand(options: {
candidate: string;
cliPath: string;
codexCommand?: string;
cwd: string;
timeoutMs: number;
}) {
const command = [
"bun",
shellQuote(options.cliPath),
"--url",
"stdio://",
"--timeout-ms",
String(options.timeoutMs),
"run-code-mode",
shellQuote(options.candidate),
"--cwd",
shellQuote(options.cwd),
];
if (options.codexCommand) {
command.splice(4, 0, "--codex-command", shellQuote(options.codexCommand));
}
return command.join(" ");
}
function appServerArgs() {
return [
"app-server",
"--listen",
"stdio://",
"--enable",
"apps",
"--enable",
"hooks",
"--enable",
"code_mode",
"--enable",
"code_mode_only",
];
}
function defaultThreadName(candidate: string) {
return "Code Mode replay: " + path.basename(candidate);
}
async function readCandidateMetadata(candidate: string): Promise<CandidateMetadata | undefined> {
const metadataPath = candidate.replace(/\.[^.]+$/, ".json");
try {
const parsed = JSON.parse(await readFile(metadataPath, "utf8")) as unknown;
return isRecord(parsed) ? parsed : undefined;
} catch {
return undefined;
}
}
function metadataCwd(metadata: CandidateMetadata | undefined) {
const cwd = metadata?.cwd;
return typeof cwd === "string" && cwd ? cwd : undefined;
}
function latestAgentMessageText(value: unknown) {
const thread = recordField(value, "thread");
const turns = Array.isArray(thread?.turns) ? thread.turns : [];
for (const turn of turns.slice().reverse()) {
const turnRecord = isRecord(turn) ? turn : undefined;
const items = Array.isArray(turnRecord?.items) ? turnRecord.items : [];
for (const item of items.slice().reverse()) {
if (!isRecord(item) || stringField(item, "type") !== "agentMessage") {
continue;
}
const text = stringField(item, "text");
if (text !== undefined) {
return text;
}
}
}
return undefined;
}
async function injectAssistantText(
client: CodexAppServerClient,
threadId: string,
text: string,
) {
await client.request("thread/inject_items", {
threadId,
items: [
{
type: "message",
role: "assistant",
content: [
{
type: "output_text",
text,
},
],
},
],
});
}
function replayContextText(options: {
candidate: string;
codexHome?: string;
cwd: string;
metadata: CandidateMetadata | undefined;
mode: ReplayMode;
notes: string[];
source: string;
}) {
const parts = [
"Code Mode replay context",
"",
"Candidate: " + options.candidate,
"Working directory: " + options.cwd,
"Replay mode: " + options.mode,
];
if (options.codexHome) {
parts.push("Codex home: " + options.codexHome);
}
if (options.notes.length > 0) {
parts.push("", "Notes:");
for (const note of options.notes) {
parts.push("- " + note);
}
}
parts.push(
"",
"Candidate metadata:",
options.metadata ? formatJson(options.metadata) : "unavailable",
"",
"Saved Code Mode script:",
truncateText(options.source, 50_000),
);
return parts.join("\n");
}
function replayResultText(options: {
candidate: string;
command: string | undefined;
commandExitCode: number | null;
cwd: string;
mode: ReplayMode;
output: string;
}) {
const lines = [
"Code Mode replay result",
"",
"Candidate: " + options.candidate,
"Working directory: " + options.cwd,
"Replay mode: " + options.mode,
];
if (options.command !== undefined) {
lines.push(
"Command exit code: " + String(options.commandExitCode),
"",
"Thread shell command:",
options.command,
);
}
lines.push(
"",
"Replay output:",
truncateText(options.output, 50_000),
);
return lines.join("\n");
}
function truncateText(value: string, limit: number) {
if (value.length <= limit) {
return value;
}
return (
value.slice(0, limit) +
"\n...[truncated " +
String(value.length - limit) +
" chars]"
);
}
function formatJson(value: unknown) {
return JSON.stringify(value, null, 2);
}
async function parseArgs(argv: string[]): Promise<Args> {
let candidate: string | undefined;
let cwd: string | undefined;
let codexCommand = process.env.CODEX_APP_SERVER_CODEX_COMMAND;
let codexHome: string | undefined;
let cliPath = defaultCliPath;
let timeoutMs = 180_000;
let ephemeral = false;
let stream = true;
let injectContext = true;
let injectResult = true;
const notes: string[] = [];
let threadName: string | null | undefined;
let mode: ReplayMode = "native";
for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
if (!arg) {
continue;
}
if (arg === "-h" || arg === "--help") {
printHelp();
process.exit(0);
}
if (arg === "--cwd") {
cwd = requiredValue(argv, ++index, "--cwd");
continue;
}
if (arg.startsWith("--cwd=")) {
cwd = arg.slice("--cwd=".length);
continue;
}
if (arg === "--codex-command") {
codexCommand = requiredValue(argv, ++index, "--codex-command");
continue;
}
if (arg.startsWith("--codex-command=")) {
codexCommand = arg.slice("--codex-command=".length);
continue;
}
if (arg === "--codex-home") {
codexHome = requiredValue(argv, ++index, "--codex-home");
continue;
}
if (arg.startsWith("--codex-home=")) {
codexHome = arg.slice("--codex-home=".length);
continue;
}
if (arg === "--native") {
mode = "native";
continue;
}
if (arg === "--shim") {
mode = "shim";
continue;
}
if (arg === "--cli") {
cliPath = requiredValue(argv, ++index, "--cli");
continue;
}
if (arg.startsWith("--cli=")) {
cliPath = arg.slice("--cli=".length);
continue;
}
if (arg === "--timeout-ms") {
timeoutMs = parseTimeout(requiredValue(argv, ++index, "--timeout-ms"));
continue;
}
if (arg.startsWith("--timeout-ms=")) {
timeoutMs = parseTimeout(arg.slice("--timeout-ms=".length));
continue;
}
if (arg === "--ephemeral") {
ephemeral = true;
continue;
}
if (arg === "--no-stream") {
stream = false;
continue;
}
if (arg === "--no-inject-context") {
injectContext = false;
continue;
}
if (arg === "--no-inject-result") {
injectResult = false;
continue;
}
if (arg === "--name") {
threadName = requiredValue(argv, ++index, "--name");
continue;
}
if (arg.startsWith("--name=")) {
threadName = arg.slice("--name=".length);
continue;
}
if (arg === "--no-name") {
threadName = null;
continue;
}
if (arg === "--note") {
notes.push(requiredValue(argv, ++index, "--note"));
continue;
}
if (arg.startsWith("--note=")) {
notes.push(arg.slice("--note=".length));
continue;
}
if (arg.startsWith("-")) {
throw new Error("unknown option: " + arg);
}
if (candidate) {
throw new Error("unexpected positional argument: " + arg);
}
candidate = arg;
}
if (!candidate) {
printHelp();
throw new Error("candidate file is required");
}
return {
candidate,
cwd,
codexCommand,
codexHome,
cliPath,
timeoutMs,
ephemeral,
stream,
injectContext,
injectResult,
notes,
threadName,
mode,
};
}
function requiredValue(argv: string[], index: number, flag: string) {
const value = argv[index];
if (!value) {
throw new Error(flag + " requires a value");
}
return value;
}
function parseTimeout(value: string) {
const parsed = Number(value);
if (!Number.isFinite(parsed) || parsed <= 0) {
throw new Error("invalid --timeout-ms value: " + value);
}
return parsed;
}
function shellQuote(value: string) {
return "'" + value.replaceAll("'", "'\\''") + "'";
}
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string) {
let timer: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error(message)), timeoutMs);
}),
]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function recordField(value: unknown, field: string) {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return undefined;
}
const record = value as Record<string, unknown>;
const nested = record[field];
return typeof nested === "object" && nested !== null && !Array.isArray(nested)
? (nested as Record<string, unknown>)
: undefined;
}
function stringField(value: unknown, field: string) {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return undefined;
}
const fieldValue = (value as Record<string, unknown>)[field];
return typeof fieldValue === "string" ? fieldValue : undefined;
}
function numberField(value: unknown, field: string) {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return undefined;
}
const fieldValue = (value as Record<string, unknown>)[field];
return typeof fieldValue === "number" ? fieldValue : undefined;
}
function printHelp() {
process.stdout.write(
[
"Run a saved Code Mode candidate in a new Codex thread without starting a model turn.",
"",
"Usage:",
" bun scripts/run-code-mode-in-new-thread.ts <candidate.mjs> [options]",
"",
"Options:",
" --cwd <dir> Thread cwd. Defaults to candidate sidecar cwd, then process cwd.",
" --codex-command <path> Codex binary for both app-server and replay.",
" Defaults to CODEX_APP_SERVER_CODEX_COMMAND.",
" With CODEX_FLOWS_MODE=code-mode, falls back to",
" bunx @peezy.tech/codex.",
" --codex-home <dir> CODEX_HOME for the spawned app-server, useful for prepared MCP config.",
" --native Use native thread/codeMode/execute replay. This is the default.",
" --shim Use the older TypeScript shell-command shim fallback.",
" --cli <path> codex-app CLI path. Defaults to " + defaultCliPath,
" --timeout-ms <ms> Timeout for app-server requests and completion wait.",
" --ephemeral Create an ephemeral thread.",
" --no-stream Do not stream command output while waiting.",
" --note <text> Add a note to the injected replay context. Repeatable.",
" --name <text> Set the thread title. Defaults to the candidate filename.",
" --no-name Leave the thread title unset.",
" --no-inject-context Skip injecting candidate metadata/source before execution.",
" --no-inject-result Skip injecting the replay summary after execution.",
" -h, --help Show this help.",
"",
].join("\n"),
);
}
main().catch((error) => {
console.error(error instanceof Error ? error.stack ?? error.message : String(error));
process.exit(1);
});

View file

@ -0,0 +1,749 @@
#!/usr/bin/env bun
import { $ } from "bun";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { CodexAppServerClient } from "../packages/codex-client/src/index.ts";
type Args = {
cargoTargetDir: string;
codexCommand?: string;
codexHome?: string;
codexRepo: string;
ephemeral: boolean;
force: boolean;
handledNpmPackage?: string;
npmRegistry: string;
releaseTag?: string;
serviceRepo: string;
stream: boolean;
targetBranch: string;
threadName?: string;
timeoutMs: number;
upstreamRemote: string;
upstreamRepo: string;
};
type ReleaseInfo = {
tagName: string;
name?: string;
publishedAt?: string;
url?: string;
body?: string;
targetCommitish?: string;
};
type HandledNpmRelease = {
packageName: string;
registry: string;
version: string;
};
type CodeModeUpdateResult = {
status: "blocked" | "completed" | "conflict" | "failed";
message?: string;
releaseTag?: string;
releaseUrl?: string;
beforeSha?: string;
afterSha?: string;
codexHead?: string;
commands?: unknown[];
};
const serviceRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");
const workspaceRoot = path.resolve(serviceRoot, "..");
const defaultCodexRepo = path.join(workspaceRoot, "codex");
const defaultCargoTargetDir = "/tmp/codex-fork-workspace-target";
const defaultHandledNpmPackage =
process.env.CODEX_UPDATE_HANDLED_NPM_PACKAGE ?? "@peezy.tech/codex";
const defaultNpmRegistry =
process.env.CODEX_UPDATE_NPM_REGISTRY ?? "https://registry.npmjs.org/";
async function main() {
const args = parseArgs(Bun.argv.slice(2));
const release = await latestRelease(args);
const handledRelease = await latestHandledNpmRelease(args, release);
if (!args.force && handledRelease) {
writeJson({
status: "skipped",
message: `Release ${release.tagName} is already covered by ${handledRelease.packageName}@${handledRelease.version}.`,
release,
handledRelease,
});
return;
}
await ensureCodexCommandExists(args.codexCommand);
const result = await runUpdateThread(args, release, handledRelease);
writeJson(result);
process.stdout.write(`threadId=${result.threadId}\n`);
if (result.updateResult?.status === "completed") {
return;
}
if (result.updateResult?.status === "conflict") {
process.exitCode = 2;
return;
}
process.exitCode = 1;
}
async function latestRelease(args: Args): Promise<ReleaseInfo> {
const fields = "tagName,name,publishedAt,url,body,targetCommitish";
try {
const release = args.releaseTag
? await $`gh release view ${args.releaseTag} --repo ${args.upstreamRepo} --json ${fields}`.json()
: await $`gh release view --repo ${args.upstreamRepo} --json ${fields}`.json();
return requireReleaseInfo(release);
} catch (error) {
throw new Error(`Failed to read ${args.upstreamRepo} release: ${errorMessage(error)}`);
}
}
async function latestHandledNpmRelease(
args: Args,
release: ReleaseInfo,
): Promise<HandledNpmRelease | undefined> {
if (!args.handledNpmPackage) {
return undefined;
}
const version = releaseVersion(release.tagName);
if (!version) {
throw new Error(`Could not normalize release tag to an npm version: ${release.tagName}`);
}
const spec = `${args.handledNpmPackage}@${version}`;
const result = await $`npm view ${spec} version --registry ${args.npmRegistry} --json`.nothrow().quiet();
if (result.exitCode !== 0) {
return undefined;
}
try {
const parsed = JSON.parse(result.stdout.toString()) as unknown;
if (parsed !== version) {
throw new Error(`expected ${version}, got ${String(parsed)}`);
}
return {
packageName: args.handledNpmPackage,
registry: args.npmRegistry,
version,
};
} catch (error) {
throw new Error(
`Failed to parse npm package version for ${spec}: ${errorMessage(error)}`,
);
}
}
async function runUpdateThread(
args: Args,
release: ReleaseInfo,
handledRelease: HandledNpmRelease | undefined,
) {
const threadName =
args.threadName ??
`Codex upstream update: ${release.tagName} -> ${args.targetBranch}`;
const source = await updateCodeModeSource(args, release);
const client = new CodexAppServerClient({
transportOptions: {
codexCommand: args.codexCommand,
args: appServerArgs(),
cwd: args.codexRepo,
env: args.codexHome
? { CODEX_HOME: path.resolve(args.codexHome) }
: undefined,
requestTimeoutMs: args.timeoutMs,
},
clientName: "codex-update-thread",
clientTitle: "Codex Update Thread",
clientVersion: "0.1.0",
});
const output: string[] = [];
let threadId = "";
let completedItem: unknown;
let resolveTurnCompleted: (value: unknown) => void = () => undefined;
const turnCompleted = new Promise((resolve) => {
resolveTurnCompleted = resolve;
});
client.on("request", (message) => {
client.respondError(
message.id,
-32603,
"codex update launcher does not handle server requests",
);
});
client.on("notification", (message) => {
if (message.method === "item/commandExecution/outputDelta") {
const delta = stringField(message.params, "delta");
if (delta) {
output.push(delta);
if (args.stream) {
process.stdout.write(delta);
}
}
}
if (message.method === "item/agentMessage/delta") {
const delta = stringField(message.params, "delta");
if (delta) {
output.push(delta);
if (args.stream) {
process.stdout.write(delta);
}
}
}
if (message.method === "item/completed") {
completedItem = recordField(message.params, "item") ?? completedItem;
}
if (
message.method === "turn/completed" &&
(!threadId || stringField(message.params, "threadId") === threadId)
) {
resolveTurnCompleted(message.params);
}
});
try {
await client.connect();
const started = await client.startThread({
cwd: args.codexRepo,
approvalPolicy: "never",
sandbox: "danger-full-access",
ephemeral: args.ephemeral,
experimentalRawEvents: false,
persistExtendedHistory: true,
});
threadId = started.thread.id;
await client.request("thread/name/set", {
threadId,
name: threadName,
});
await injectAssistantText(
client,
threadId,
updateContextText(args, release, handledRelease, source),
);
await client.request("thread/codeMode/execute", {
threadId,
source,
});
const completed = await withTimeout(
turnCompleted,
args.timeoutMs,
"timed out waiting for Codex update Code Mode completion",
);
const read = await client.request("thread/read", {
threadId,
includeTurns: true,
});
const agentText = allAgentMessageText(read).join("\n");
const replayOutput = agentText || output.join("");
const updateResult = parseUpdateResult(replayOutput);
return {
status: updateResult?.status ?? "unknown",
threadId,
threadName,
release,
handledRelease,
codexRepo: args.codexRepo,
serviceRepo: args.serviceRepo,
updateResult,
output: replayOutput,
completed,
completedItem,
};
} finally {
client.close();
}
}
async function updateCodeModeSource(args: Args, release: ReleaseInfo): Promise<string> {
const config = {
cargoTargetDir: args.cargoTargetDir,
codexBinary: path.join(args.cargoTargetDir, "debug", "codex"),
codexRepo: args.codexRepo,
codexRustDir: path.join(args.codexRepo, "codex-rs"),
generatedDir: path.join(
args.serviceRepo,
"packages",
"codex-client",
"src",
"app-server",
"generated",
),
release,
serviceRepo: args.serviceRepo,
targetBranch: args.targetBranch,
upstreamRemote: args.upstreamRemote,
upstreamRepoUrl: `https://github.com/${args.upstreamRepo}.git`,
};
const configSource = `const config = ${JSON.stringify(config, null, 2)};\n`;
const bodySource = await Bun.file(
path.join(serviceRoot, "scripts", "codex-release-update.code-mode.js"),
).text();
return configSource + bodySource;
}
function updateContextText(
args: Args,
release: ReleaseInfo,
handledRelease: HandledNpmRelease | undefined,
source: string,
) {
return [
"Codex upstream update job context",
"",
"Purpose: update the local codex fork branch from the latest openai/codex GitHub release through native Code Mode.",
"",
"Release:",
formatJson(release),
"",
"Paths:",
"- codex repo: " + args.codexRepo,
"- codex-flows repo: " + args.serviceRepo,
"- cargo target dir: " + args.cargoTargetDir,
"- app-server command for this thread: " +
(args.codexCommand ?? "bunx @peezy.tech/codex"),
args.handledNpmPackage
? "- handled npm package: " + args.handledNpmPackage
: "- handled npm package: disabled",
"- npm registry: " + args.npmRegistry,
"",
"Policy:",
"- Do not run a global Codex install.",
"- Rebase " + args.targetBranch + " onto the upstream release tag from " + args.upstreamRepo + ".",
"- If rebase conflicts occur, preserve the paused rebase state and continue this same thread for intervention.",
"- Treat the published npm package version as the durable handled-release marker; do not write local hidden version state.",
"",
"Handled npm package version:",
handledRelease ? formatJson(handledRelease) : "unavailable or disabled",
"",
"Generated Code Mode source:",
truncateText(source, 50_000),
].join("\n");
}
function appServerArgs() {
return [
"app-server",
"--listen",
"stdio://",
"--enable",
"apps",
"--enable",
"hooks",
"--enable",
"code_mode",
"--enable",
"code_mode_only",
];
}
async function injectAssistantText(
client: CodexAppServerClient,
threadId: string,
text: string,
) {
await client.request("thread/inject_items", {
threadId,
items: [
{
type: "message",
role: "assistant",
content: [
{
type: "output_text",
text,
},
],
},
],
});
}
function parseUpdateResult(output: string): CodeModeUpdateResult | undefined {
for (const line of output.split(/\r?\n/).reverse()) {
const prefix = "CODEX_UPDATE_RESULT ";
const index = line.indexOf(prefix);
if (index === -1) {
continue;
}
const text = line.slice(index + prefix.length).trim();
try {
const parsed = JSON.parse(text) as unknown;
if (
isRecord(parsed) &&
(parsed.status === "completed" ||
parsed.status === "conflict" ||
parsed.status === "blocked" ||
parsed.status === "failed")
) {
return parsed as CodeModeUpdateResult;
}
} catch {
return undefined;
}
}
return undefined;
}
async function ensureCodexCommandExists(codexCommand: string | undefined) {
if (!codexCommand) {
return;
}
if (!path.isAbsolute(codexCommand)) {
throw new Error(
`Codex command must be an explicit local fork binary path, not a PATH lookup: ${codexCommand}`,
);
}
const file = Bun.file(codexCommand);
if (!(await file.exists())) {
throw new Error(
`Codex command does not exist: ${codexCommand}. Build the fork binary first or pass --codex-command.`,
);
}
}
function allAgentMessageText(value: unknown) {
const thread = recordField(value, "thread");
const turns = Array.isArray(thread?.turns) ? thread.turns : [];
const texts: string[] = [];
for (const turn of turns) {
const turnRecord = isRecord(turn) ? turn : undefined;
const items = Array.isArray(turnRecord?.items) ? turnRecord.items : [];
for (const item of items) {
if (!isRecord(item) || stringField(item, "type") !== "agentMessage") {
continue;
}
const text = stringField(item, "text");
if (text !== undefined) {
texts.push(text);
}
}
}
return texts;
}
function requireReleaseInfo(value: unknown): ReleaseInfo {
if (!isRecord(value) || typeof value.tagName !== "string" || !value.tagName.trim()) {
throw new Error("GitHub release response did not include tagName");
}
return {
tagName: value.tagName,
...(typeof value.name === "string" ? { name: value.name } : {}),
...(typeof value.publishedAt === "string"
? { publishedAt: value.publishedAt }
: {}),
...(typeof value.url === "string" ? { url: value.url } : {}),
...(typeof value.body === "string" ? { body: value.body } : {}),
...(typeof value.targetCommitish === "string"
? { targetCommitish: value.targetCommitish }
: {}),
};
}
function releaseVersion(tagName: string) {
return tagName.match(/\d+\.\d+\.\d+(?:[-+][0-9A-Za-z.-]+)?/)?.[0];
}
function parseArgs(argv: string[]): Args {
let cargoTargetDir = process.env.CARGO_TARGET_DIR ?? defaultCargoTargetDir;
let codexCommand = process.env.CODEX_APP_SERVER_CODEX_COMMAND;
let codexHome: string | undefined;
let codexRepo = defaultCodexRepo;
let ephemeral = false;
let force = false;
let handledNpmPackage: string | undefined = defaultHandledNpmPackage;
let npmRegistry = defaultNpmRegistry;
let releaseTag: string | undefined;
let serviceRepo = serviceRoot;
let stream = true;
let targetBranch = "code-mode-exec-hooks";
let threadName: string | undefined;
let timeoutMs = 1_800_000;
let upstreamRemote = "upstream";
let upstreamRepo = "openai/codex";
for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
if (!arg) {
continue;
}
if (arg === "-h" || arg === "--help") {
printHelp();
process.exit(0);
}
if (arg === "--cargo-target-dir") {
cargoTargetDir = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--cargo-target-dir=")) {
cargoTargetDir = arg.slice("--cargo-target-dir=".length);
continue;
}
if (arg === "--codex-command") {
codexCommand = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--codex-command=")) {
codexCommand = arg.slice("--codex-command=".length);
continue;
}
if (arg === "--codex-home") {
codexHome = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--codex-home=")) {
codexHome = arg.slice("--codex-home=".length);
continue;
}
if (arg === "--codex-repo") {
codexRepo = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--codex-repo=")) {
codexRepo = arg.slice("--codex-repo=".length);
continue;
}
if (arg === "--ephemeral") {
ephemeral = true;
continue;
}
if (arg === "--force") {
force = true;
continue;
}
if (arg === "--handled-npm-package") {
handledNpmPackage = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--handled-npm-package=")) {
handledNpmPackage = arg.slice("--handled-npm-package=".length);
continue;
}
if (arg === "--no-handled-npm-check" || arg === "--no-handled-release-check") {
handledNpmPackage = undefined;
continue;
}
if (arg === "--npm-registry") {
npmRegistry = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--npm-registry=")) {
npmRegistry = arg.slice("--npm-registry=".length);
continue;
}
if (arg === "--release-tag") {
releaseTag = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--release-tag=")) {
releaseTag = arg.slice("--release-tag=".length);
continue;
}
if (arg === "--service-repo") {
serviceRepo = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--service-repo=")) {
serviceRepo = arg.slice("--service-repo=".length);
continue;
}
if (arg === "--no-stream") {
stream = false;
continue;
}
if (arg === "--target-branch") {
targetBranch = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--target-branch=")) {
targetBranch = arg.slice("--target-branch=".length);
continue;
}
if (arg === "--name") {
threadName = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--name=")) {
threadName = arg.slice("--name=".length);
continue;
}
if (arg === "--timeout-ms") {
timeoutMs = parsePositiveInteger(requiredValue(argv, ++index, arg), arg);
continue;
}
if (arg.startsWith("--timeout-ms=")) {
timeoutMs = parsePositiveInteger(arg.slice("--timeout-ms=".length), "--timeout-ms");
continue;
}
if (arg === "--upstream-remote") {
upstreamRemote = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--upstream-remote=")) {
upstreamRemote = arg.slice("--upstream-remote=".length);
continue;
}
if (arg === "--upstream-repo") {
upstreamRepo = requiredValue(argv, ++index, arg);
continue;
}
if (arg.startsWith("--upstream-repo=")) {
upstreamRepo = arg.slice("--upstream-repo=".length);
continue;
}
throw new Error("unknown argument: " + arg);
}
cargoTargetDir = path.resolve(cargoTargetDir);
codexCommand = codexCommand ? resolveCommand(codexCommand) : undefined;
codexRepo = path.resolve(codexRepo);
serviceRepo = path.resolve(serviceRepo);
return {
cargoTargetDir,
codexCommand,
codexHome,
codexRepo,
ephemeral,
force,
handledNpmPackage,
npmRegistry,
releaseTag,
serviceRepo,
stream,
targetBranch,
threadName,
timeoutMs,
upstreamRemote,
upstreamRepo,
};
}
function requiredValue(argv: string[], index: number, flag: string) {
const value = argv[index];
if (!value) {
throw new Error(flag + " requires a value");
}
return value;
}
function parsePositiveInteger(value: string, flag: string) {
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new Error(`invalid ${flag} value: ${value}`);
}
return parsed;
}
function resolveCommand(command: string) {
if (path.isAbsolute(command) || command.includes("/") || command.includes("\\")) {
return path.resolve(command);
}
return command;
}
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string) {
let timer: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error(message)), timeoutMs);
}),
]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
function writeJson(value: unknown) {
process.stdout.write(`${JSON.stringify(value, null, 2)}\n`);
}
function formatJson(value: unknown) {
return JSON.stringify(value, null, 2);
}
function truncateText(value: string, limit: number) {
if (value.length <= limit) {
return value;
}
return `${value.slice(0, limit)}\n...[truncated ${value.length - limit} chars]`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function recordField(value: unknown, field: string) {
if (!isRecord(value)) {
return undefined;
}
const nested = value[field];
return isRecord(nested) ? nested : undefined;
}
function stringField(value: unknown, field: string) {
if (!isRecord(value)) {
return undefined;
}
const fieldValue = value[field];
return typeof fieldValue === "string" ? fieldValue : undefined;
}
function errorMessage(error: unknown) {
return error instanceof Error ? error.message : String(error);
}
function printHelp() {
process.stdout.write(
[
"Run the openai/codex release update flow inside a native Code Mode thread.",
"",
"Usage:",
" bun scripts/run-codex-release-update-thread.ts [options]",
"",
"Options:",
" --release-tag <tag> Use a specific openai/codex release tag instead of latest.",
" --force Run even when the handled npm package version exists.",
" --handled-npm-package <pkg> Durable handled npm package. Defaults to " +
defaultHandledNpmPackage,
" --npm-registry <url> npm registry URL. Defaults to " +
defaultNpmRegistry,
" --no-handled-npm-check Do not compare against a handled npm package.",
" --no-handled-release-check Alias for --no-handled-npm-check.",
" --codex-repo <dir> Local codex fork checkout. Defaults to " + defaultCodexRepo,
" --service-repo <dir> codex-flows checkout. Defaults to " + serviceRoot,
" --target-branch <branch> Fork branch to rebase. Defaults to code-mode-exec-hooks.",
" --upstream-repo <owner/repo> GitHub release source. Defaults to openai/codex.",
" --upstream-remote <name> Local remote name for upstream. Defaults to upstream.",
" --cargo-target-dir <dir> Cargo target dir. Defaults to " + defaultCargoTargetDir,
" --codex-command <path> Explicit fork Codex binary used to start app-server.",
" Defaults to CODEX_APP_SERVER_CODEX_COMMAND.",
" With CODEX_FLOWS_MODE=code-mode, falls back to",
" bunx @peezy.tech/codex.",
" --codex-home <dir> CODEX_HOME for the spawned app-server.",
" --timeout-ms <ms> App-server request and flow timeout. Defaults to 1800000.",
" --name <text> Thread name.",
" --ephemeral Create an ephemeral thread.",
" --no-stream Do not stream Code Mode output.",
" -h, --help Show this help.",
"",
"Exit codes:",
" 0 completed or skipped",
" 1 failed or blocked",
" 2 rebase conflict, with rebase state intentionally left paused",
"",
].join("\n"),
);
}
await main().catch((error) => {
process.stderr.write(`${errorMessage(error)}\n`);
process.exitCode = 1;
});