diff --git a/codex-wrapper/main.go b/codex-wrapper/main.go index 0cd5d9b..2a92a5f 100644 --- a/codex-wrapper/main.go +++ b/codex-wrapper/main.go @@ -167,6 +167,7 @@ type TaskResult struct { Message string `json:"message"` SessionID string `json:"session_id"` Error string `json:"error"` + LogPath string `json:"log_path"` } func parseParallelConfig(data []byte) (*ParallelConfig, error) { @@ -336,6 +337,27 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { failed := make(map[string]TaskResult, totalTasks) resultsCh := make(chan TaskResult, totalTasks) + var startPrintMu sync.Mutex + bannerPrinted := false + + printTaskStart := func(taskID string) { + logger := activeLogger() + if logger == nil { + return + } + path := logger.Path() + if path == "" { + return + } + startPrintMu.Lock() + if !bannerPrinted { + fmt.Fprintln(os.Stderr, "=== Starting Parallel Execution ===") + bannerPrinted = true + } + fmt.Fprintf(os.Stderr, "Task %s: Log: %s\n", taskID, path) + startPrintMu.Unlock() + } + for _, layer := range layers { var wg sync.WaitGroup executed := 0 @@ -357,6 +379,7 @@ func executeConcurrent(layers [][]TaskSpec, timeout int) []TaskResult { resultsCh <- TaskResult{TaskID: ts.ID, ExitCode: 1, Error: fmt.Sprintf("panic: %v", r)} } }() + printTaskStart(ts.ID) resultsCh <- runCodexTaskFn(ts, timeout) }(task) } @@ -422,6 +445,9 @@ func generateFinalOutput(results []TaskResult) string { if res.SessionID != "" { sb.WriteString(fmt.Sprintf("Session: %s\n", res.SessionID)) } + if res.LogPath != "" { + sb.WriteString(fmt.Sprintf("Log: %s\n", res.LogPath)) + } if res.Message != "" { sb.WriteString(fmt.Sprintf("\n%s\n", res.Message)) } @@ -616,7 +642,20 @@ func run() (exitCode int) { fmt.Fprintf(os.Stderr, "[codex-wrapper]\n") fmt.Fprintf(os.Stderr, " Command: %s %s\n", codexCommand, strings.Join(codexArgs, " ")) fmt.Fprintf(os.Stderr, " PID: %d\n", os.Getpid()) - fmt.Fprintf(os.Stderr, " Log: %s\n", logger.Path()) + logPath := "" + if logger != nil { + logPath = logger.Path() + } + fmt.Fprintf(os.Stderr, " Log: %s\n", logPath) + printedLogPath := logPath + printLogPath := func(path string) { + if path == "" || path == printedLogPath { + return + } + fmt.Fprintf(os.Stderr, "[codex-wrapper]\n") + fmt.Fprintf(os.Stderr, " Log: %s\n", path) + printedLogPath = path + } if useStdin { var reasons []string @@ -663,6 +702,7 @@ func run() (exitCode int) { } result := runCodexTask(taskSpec, false, cfg.Timeout) + printLogPath(result.LogPath) if result.ExitCode != 0 { return result.ExitCode @@ -801,8 +841,8 @@ func runCodexProcess(parentCtx context.Context, codexArgs []string, taskText str return res.Message, res.SessionID, res.ExitCode } -func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) TaskResult { - result := TaskResult{TaskID: taskSpec.ID} +func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, customArgs []string, useCustomArgs bool, silent bool, timeoutSec int) (result TaskResult) { + result.TaskID = taskSpec.ID cfg := &Config{ Mode: taskSpec.Mode, @@ -837,6 +877,15 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo return fmt.Sprintf("[Task: %s] %s", taskSpec.ID, msg) } + captureLogPath := func() { + if result.LogPath != "" { + return + } + if logger := activeLogger(); logger != nil { + result.LogPath = logger.Path() + } + } + var logInfoFn func(string) var logWarnFn func(string) var logErrorFn func(string) @@ -877,6 +926,7 @@ func runCodexTaskWithContext(parentCtx context.Context, taskSpec TaskSpec, custo } } defer func() { + captureLogPath() if tempLogger != nil { closeLogger() } diff --git a/codex-wrapper/main_integration_test.go b/codex-wrapper/main_integration_test.go index c29cd1f..5cb4ff0 100644 --- a/codex-wrapper/main_integration_test.go +++ b/codex-wrapper/main_integration_test.go @@ -80,6 +80,8 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput { currentTask.Error = strings.TrimPrefix(line, "Error: ") } else if strings.HasPrefix(line, "Session:") { currentTask.SessionID = strings.TrimPrefix(line, "Session: ") + } else if strings.HasPrefix(line, "Log:") { + currentTask.LogPath = strings.TrimSpace(strings.TrimPrefix(line, "Log:")) } else if line != "" && !strings.HasPrefix(line, "===") && !strings.HasPrefix(line, "---") { if currentTask.Message != "" { currentTask.Message += "\n" @@ -96,6 +98,32 @@ func parseIntegrationOutput(t *testing.T, out string) integrationOutput { return payload } +func extractTaskBlock(t *testing.T, output, taskID string) string { + t.Helper() + header := fmt.Sprintf("--- Task: %s ---", taskID) + lines := strings.Split(output, "\n") + var block []string + collecting := false + for _, raw := range lines { + trimmed := strings.TrimSpace(raw) + if !collecting { + if trimmed == header { + collecting = true + block = append(block, trimmed) + } + continue + } + if strings.HasPrefix(trimmed, "--- Task: ") && trimmed != header { + break + } + block = append(block, trimmed) + } + if len(block) == 0 { + t.Fatalf("task block %s not found in output:\n%s", taskID, output) + } + return strings.Join(block, "\n") +} + func findResultByID(t *testing.T, payload integrationOutput, id string) TaskResult { t.Helper() for _, res := range payload.Results { @@ -256,6 +284,194 @@ b` } } +func TestRunParallelOutputsIncludeLogPaths(t *testing.T) { + defer resetTestHooks() + origRun := runCodexTaskFn + t.Cleanup(func() { + runCodexTaskFn = origRun + resetTestHooks() + }) + + tempDir := t.TempDir() + logPathFor := func(id string) string { + return filepath.Join(tempDir, fmt.Sprintf("%s.log", id)) + } + + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + res := TaskResult{ + TaskID: task.ID, + Message: fmt.Sprintf("result-%s", task.ID), + SessionID: fmt.Sprintf("session-%s", task.ID), + LogPath: logPathFor(task.ID), + } + if task.ID == "beta" { + res.ExitCode = 9 + res.Error = "boom" + } + return res + } + + input := `---TASK--- +id: alpha +---CONTENT--- +task-alpha +---TASK--- +id: beta +---CONTENT--- +task-beta` + stdinReader = bytes.NewReader([]byte(input)) + os.Args = []string{"codex-wrapper", "--parallel"} + + var exitCode int + output := captureStdout(t, func() { + exitCode = run() + }) + + if exitCode != 9 { + t.Fatalf("parallel run exit=%d, want 9", exitCode) + } + + payload := parseIntegrationOutput(t, output) + alpha := findResultByID(t, payload, "alpha") + beta := findResultByID(t, payload, "beta") + + if alpha.LogPath != logPathFor("alpha") { + t.Fatalf("alpha log path = %q, want %q", alpha.LogPath, logPathFor("alpha")) + } + if beta.LogPath != logPathFor("beta") { + t.Fatalf("beta log path = %q, want %q", beta.LogPath, logPathFor("beta")) + } + + for _, id := range []string{"alpha", "beta"} { + want := fmt.Sprintf("Log: %s", logPathFor(id)) + if !strings.Contains(output, want) { + t.Fatalf("parallel output missing %q for %s:\n%s", want, id, output) + } + } +} + +func TestRunParallelStartupLogsPrinted(t *testing.T) { + defer resetTestHooks() + + tempDir := setTempDirEnv(t, t.TempDir()) + input := `---TASK--- +id: a +---CONTENT--- +fail +---TASK--- +id: b +---CONTENT--- +ok-b +---TASK--- +id: c +dependencies: a +---CONTENT--- +should-skip +---TASK--- +id: d +---CONTENT--- +ok-d` + stdinReader = bytes.NewReader([]byte(input)) + os.Args = []string{"codex-wrapper", "--parallel"} + + expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid())) + + origRun := runCodexTaskFn + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + path := expectedLog + if logger := activeLogger(); logger != nil && logger.Path() != "" { + path = logger.Path() + } + if task.ID == "a" { + return TaskResult{TaskID: task.ID, ExitCode: 3, Error: "boom", LogPath: path} + } + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path} + } + t.Cleanup(func() { runCodexTaskFn = origRun }) + + var exitCode int + var stdoutOut string + stderrOut := captureStderr(t, func() { + stdoutOut = captureStdout(t, func() { + exitCode = run() + }) + }) + + if exitCode == 0 { + t.Fatalf("expected non-zero exit due to task failure, got %d", exitCode) + } + if stdoutOut == "" { + t.Fatalf("expected parallel summary on stdout") + } + + lines := strings.Split(strings.TrimSpace(stderrOut), "\n") + var bannerSeen bool + var taskLines []string + for _, raw := range lines { + line := strings.TrimSpace(raw) + if line == "" { + continue + } + if line == "=== Starting Parallel Execution ===" { + if bannerSeen { + t.Fatalf("banner printed multiple times:\n%s", stderrOut) + } + bannerSeen = true + continue + } + taskLines = append(taskLines, line) + } + + if !bannerSeen { + t.Fatalf("expected startup banner in stderr, got:\n%s", stderrOut) + } + + expectedLines := map[string]struct{}{ + fmt.Sprintf("Task a: Log: %s", expectedLog): {}, + fmt.Sprintf("Task b: Log: %s", expectedLog): {}, + fmt.Sprintf("Task d: Log: %s", expectedLog): {}, + } + + if len(taskLines) != len(expectedLines) { + t.Fatalf("startup log lines mismatch, got %d lines:\n%s", len(taskLines), stderrOut) + } + + for _, line := range taskLines { + if _, ok := expectedLines[line]; !ok { + t.Fatalf("unexpected startup line %q\nstderr:\n%s", line, stderrOut) + } + } +} + +func TestRunNonParallelOutputsIncludeLogPathsIntegration(t *testing.T) { + defer resetTestHooks() + + tempDir := setTempDirEnv(t, t.TempDir()) + os.Args = []string{"codex-wrapper", "integration-log-check"} + stdinReader = strings.NewReader("") + isTerminalFn = func() bool { return true } + codexCommand = "echo" + buildCodexArgsFn = func(cfg *Config, targetArg string) []string { + return []string{`{"type":"thread.started","thread_id":"integration-session"}` + "\n" + `{"type":"item.completed","item":{"type":"agent_message","text":"done"}}`} + } + + var exitCode int + stderr := captureStderr(t, func() { + _ = captureStdout(t, func() { + exitCode = run() + }) + }) + + if exitCode != 0 { + t.Fatalf("run() exit=%d, want 0", exitCode) + } + expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid())) + wantLine := fmt.Sprintf("Log: %s", expectedLog) + if !strings.Contains(stderr, wantLine) { + t.Fatalf("stderr missing %q, got: %q", wantLine, stderr) + } +} + func TestRunParallelPartialFailureBlocksDependents(t *testing.T) { defer resetTestHooks() origRun := runCodexTaskFn @@ -264,11 +480,17 @@ func TestRunParallelPartialFailureBlocksDependents(t *testing.T) { resetTestHooks() }) + tempDir := t.TempDir() + logPathFor := func(id string) string { + return filepath.Join(tempDir, fmt.Sprintf("%s.log", id)) + } + runCodexTaskFn = func(task TaskSpec, timeout int) TaskResult { + path := logPathFor(task.ID) if task.ID == "A" { - return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom"} + return TaskResult{TaskID: "A", ExitCode: 2, Error: "boom", LogPath: path} } - return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task} + return TaskResult{TaskID: task.ID, ExitCode: 0, Message: task.Task, LogPath: path} } input := `---TASK--- @@ -318,6 +540,26 @@ ok-e` if payload.Summary.Failed != 2 || payload.Summary.Total != 4 { t.Fatalf("unexpected summary after partial failure: %+v", payload.Summary) } + if resA.LogPath != logPathFor("A") { + t.Fatalf("task A log path = %q, want %q", resA.LogPath, logPathFor("A")) + } + if resB.LogPath != "" { + t.Fatalf("task B should not report a log path when skipped, got %q", resB.LogPath) + } + if resD.LogPath != logPathFor("D") || resE.LogPath != logPathFor("E") { + t.Fatalf("expected log paths for D/E, got D=%q E=%q", resD.LogPath, resE.LogPath) + } + for _, id := range []string{"A", "D", "E"} { + block := extractTaskBlock(t, output, id) + want := fmt.Sprintf("Log: %s", logPathFor(id)) + if !strings.Contains(block, want) { + t.Fatalf("task %s block missing %q:\n%s", id, want, block) + } + } + blockB := extractTaskBlock(t, output, "B") + if strings.Contains(blockB, "Log:") { + t.Fatalf("skipped task B should not emit a log line:\n%s", blockB) + } } func TestRunParallelTimeoutPropagation(t *testing.T) { diff --git a/codex-wrapper/main_test.go b/codex-wrapper/main_test.go index 53f819c..872e06a 100644 --- a/codex-wrapper/main_test.go +++ b/codex-wrapper/main_test.go @@ -618,7 +618,7 @@ func TestFakeCmdInfra(t *testing.T) { Data: `{"type":"item.completed","item":{"type":"agent_message","text":"fake-msg"}}` + "\n", }, }, - WaitDelay: 5 * time.Millisecond, + WaitDelay: 5 * time.Millisecond, }) newCommandRunner = func(ctx context.Context, name string, args ...string) commandRunner { @@ -1415,6 +1415,83 @@ func TestRunCodexTask_WithEcho(t *testing.T) { } } +func TestRunCodexTask_LogPathWithActiveLogger(t *testing.T) { + defer resetTestHooks() + + logger, err := NewLoggerWithSuffix("active-logpath") + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + setLogger(logger) + + codexCommand = "echo" + buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} } + + jsonOutput := `{"type":"thread.started","thread_id":"fake-thread"} +{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}` + + result := runCodexTask(TaskSpec{Task: jsonOutput}, false, 5) + if result.LogPath != logger.Path() { + t.Fatalf("LogPath = %q, want %q", result.LogPath, logger.Path()) + } + if result.ExitCode != 0 { + t.Fatalf("exit = %d, want 0 (%s)", result.ExitCode, result.Error) + } +} + +func TestRunCodexTask_LogPathWithTempLogger(t *testing.T) { + defer resetTestHooks() + + codexCommand = "echo" + buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{targetArg} } + + jsonOutput := `{"type":"thread.started","thread_id":"temp-thread"} +{"type":"item.completed","item":{"type":"agent_message","text":"temp"}}` + + result := runCodexTask(TaskSpec{Task: jsonOutput}, true, 5) + t.Cleanup(func() { + if result.LogPath != "" { + os.Remove(result.LogPath) + } + }) + if result.LogPath == "" { + t.Fatalf("LogPath should not be empty for temp logger") + } + if _, err := os.Stat(result.LogPath); err != nil { + t.Fatalf("log file %q should exist (err=%v)", result.LogPath, err) + } + if activeLogger() != nil { + t.Fatalf("active logger should be cleared after silent run") + } +} + +func TestRunCodexTask_LogPathOnStartError(t *testing.T) { + defer resetTestHooks() + + logger, err := NewLoggerWithSuffix("start-error") + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + setLogger(logger) + + tmpFile, err := os.CreateTemp("", "start-error") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + + codexCommand = tmpFile.Name() + buildCodexArgsFn = func(cfg *Config, targetArg string) []string { return []string{} } + + result := runCodexTask(TaskSpec{Task: "ignored"}, false, 5) + if result.ExitCode == 0 { + t.Fatalf("expected non-zero exit") + } + if result.LogPath != logger.Path() { + t.Fatalf("LogPath = %q, want %q", result.LogPath, logger.Path()) + } +} + func TestRunCodexTask_NoMessage(t *testing.T) { defer resetTestHooks() codexCommand = "echo" @@ -1568,6 +1645,34 @@ func TestRunGenerateFinalOutput(t *testing.T) { if !strings.Contains(out, "Task: a") || !strings.Contains(out, "Task: b") { t.Fatalf("task entries missing") } + if strings.Contains(out, "Log:") { + t.Fatalf("unexpected log line when LogPath empty, got %q", out) + } +} + +func TestRunGenerateFinalOutput_LogPath(t *testing.T) { + results := []TaskResult{ + { + TaskID: "a", + ExitCode: 0, + Message: "ok", + SessionID: "sid", + LogPath: "/tmp/log-a", + }, + { + TaskID: "b", + ExitCode: 7, + Error: "bad", + LogPath: "/tmp/log-b", + }, + } + out := generateFinalOutput(results) + if !strings.Contains(out, "Session: sid\nLog: /tmp/log-a") { + t.Fatalf("output missing log line after session: %q", out) + } + if !strings.Contains(out, "Log: /tmp/log-b") { + t.Fatalf("output missing log line for failed task: %q", out) + } } func TestRunTopologicalSort_LinearChain(t *testing.T) { @@ -2321,6 +2426,27 @@ func TestRunStartupCleanupNil(t *testing.T) { runStartupCleanup() } +func TestRunStartupCleanupErrorLogged(t *testing.T) { + defer resetTestHooks() + + logger, err := NewLoggerWithSuffix("startup-error") + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + setLogger(logger) + t.Cleanup(func() { + logger.Flush() + logger.Close() + os.Remove(logger.Path()) + }) + + cleanupLogsFn = func() (CleanupStats, error) { + return CleanupStats{}, errors.New("zapped") + } + + runStartupCleanup() +} + func TestRun_CleanupFailureDoesNotBlock(t *testing.T) { defer resetTestHooks() stdout := captureStdoutPipe() @@ -2440,6 +2566,17 @@ func TestRunLogWriter(t *testing.T) { os.Remove(logger.Path()) } +func TestNewLogWriterDefaultLimit(t *testing.T) { + lw := newLogWriter("TEST: ", 0) + if lw.maxLen != codexLogLineLimit { + t.Fatalf("newLogWriter maxLen = %d, want %d", lw.maxLen, codexLogLineLimit) + } + lw = newLogWriter("TEST: ", -5) + if lw.maxLen != codexLogLineLimit { + t.Fatalf("negative maxLen should default, got %d", lw.maxLen) + } +} + func TestRunDiscardInvalidJSON(t *testing.T) { reader := bufio.NewReader(strings.NewReader("bad line\n{\"type\":\"ok\"}\n")) next, err := discardInvalidJSON(nil, reader) @@ -2529,6 +2666,149 @@ func TestRunForwardSignals(t *testing.T) { } } +func TestRunNonParallelPrintsLogPath(t *testing.T) { + defer resetTestHooks() + + tempDir := t.TempDir() + t.Setenv("TMPDIR", tempDir) + + os.Args = []string{"codex-wrapper", "do-stuff"} + stdinReader = strings.NewReader("") + isTerminalFn = func() bool { return true } + codexCommand = "echo" + buildCodexArgsFn = func(cfg *Config, targetArg string) []string { + return []string{`{"type":"thread.started","thread_id":"cli-session"}` + "\n" + `{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}`} + } + + var exitCode int + stderr := captureStderr(t, func() { + _ = captureOutput(t, func() { + exitCode = run() + }) + }) + if exitCode != 0 { + t.Fatalf("run() exit = %d, want 0", exitCode) + } + expectedLog := filepath.Join(tempDir, fmt.Sprintf("codex-wrapper-%d.log", os.Getpid())) + wantLine := fmt.Sprintf("Log: %s", expectedLog) + if !strings.Contains(stderr, wantLine) { + t.Fatalf("stderr missing %q, got: %q", wantLine, stderr) + } +} + +func TestRealProcessNilSafety(t *testing.T) { + var proc *realProcess + if pid := proc.Pid(); pid != 0 { + t.Fatalf("Pid() = %d, want 0", pid) + } + if err := proc.Kill(); err != nil { + t.Fatalf("Kill() error = %v", err) + } + if err := proc.Signal(syscall.SIGTERM); err != nil { + t.Fatalf("Signal() error = %v", err) + } +} + +func TestRealProcessKill(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("sleep command not available on Windows") + } + + cmd := exec.Command("sleep", "5") + if err := cmd.Start(); err != nil { + t.Skipf("unable to start sleep command: %v", err) + } + waited := false + defer func() { + if waited { + return + } + if cmd.Process != nil { + _ = cmd.Process.Kill() + cmd.Wait() + } + }() + + proc := &realProcess{proc: cmd.Process} + if proc.Pid() == 0 { + t.Fatalf("Pid() returned 0 for active process") + } + if err := proc.Kill(); err != nil { + t.Fatalf("Kill() error = %v", err) + } + waitErr := cmd.Wait() + waited = true + if waitErr == nil { + t.Fatalf("Kill() should lead to non-nil wait error") + } +} + +func TestRealProcessSignal(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("sleep command not available on Windows") + } + + cmd := exec.Command("sleep", "5") + if err := cmd.Start(); err != nil { + t.Skipf("unable to start sleep command: %v", err) + } + waited := false + defer func() { + if waited { + return + } + if cmd.Process != nil { + _ = cmd.Process.Kill() + cmd.Wait() + } + }() + + proc := &realProcess{proc: cmd.Process} + if err := proc.Signal(syscall.SIGTERM); err != nil { + t.Fatalf("Signal() error = %v", err) + } + waitErr := cmd.Wait() + waited = true + if waitErr == nil { + t.Fatalf("Signal() should lead to non-nil wait error") + } +} + +func TestRealCmdProcess(t *testing.T) { + rc := &realCmd{} + if rc.Process() != nil { + t.Fatalf("Process() should return nil when realCmd has no command") + } + rc = &realCmd{cmd: &exec.Cmd{}} + if rc.Process() != nil { + t.Fatalf("Process() should return nil when exec.Cmd has no process") + } + + if runtime.GOOS == "windows" { + return + } + + cmd := exec.Command("sleep", "5") + if err := cmd.Start(); err != nil { + t.Skipf("unable to start sleep command: %v", err) + } + defer func() { + if cmd.Process != nil { + _ = cmd.Process.Kill() + cmd.Wait() + } + }() + + rc = &realCmd{cmd: cmd} + handle := rc.Process() + if handle == nil { + t.Fatalf("expected non-nil process handle") + } + if pid := handle.Pid(); pid == 0 { + t.Fatalf("process handle returned pid=0") + } +} + func TestRun_CLI_Success(t *testing.T) { defer resetTestHooks() os.Args = []string{"codex-wrapper", "do-things"}