diff options
Diffstat (limited to 'src/os/job.c')
-rw-r--r-- | src/os/job.c | 103 |
1 files changed, 67 insertions, 36 deletions
diff --git a/src/os/job.c b/src/os/job.c index 5bd404e5be..f4cbbfb670 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -7,6 +7,8 @@ #include "os/job_defs.h" #include "os/rstream.h" #include "os/rstream_defs.h" +#include "os/event.h" +#include "os/event_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -22,12 +24,17 @@ struct job { int id; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; + // exit_cb may be called while there's still pending data from stdout/stderr. + // We use this reference count to ensure the JobExit event is only emitted + // when stdout/stderr are drained + int pending_refs; // If the job was already stopped bool stopped; // Data associated with the job void *data; - // Callback for consuming data from the buffer - job_read_cb read_cb; + // Callbacks + job_exit_cb exit_cb; + rstream_cb stdout_cb, stderr_cb; // Readable streams RStream *out, *err; // Structures for process spawning/management used by libuv @@ -40,7 +47,6 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; -static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -48,9 +54,10 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); +static void read_cb(RStream *rstream, void *data, bool eof); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); +static void emit_exit_event(Job *job); void job_init() { @@ -105,7 +112,11 @@ void job_teardown() } } -int job_start(char **argv, void *data, job_read_cb cb) +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb job_exit_cb) { int i; Job *job; @@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb) job = xmalloc(sizeof(Job)); // Initialize job->id = i + 1; + job->pending_refs = 3; job->data = data; - job->read_cb = cb; + job->stdout_cb = stdout_cb; + job->stderr_cb = stderr_cb; + job->exit_cb = job_exit_cb; job->stopped = false; job->exit_timeout = EXIT_TIMEOUT; job->proc_opts.file = argv[0]; @@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len) return true; } -void job_handle(Event event) +void job_exit_event(Event event) { - Job *job = event.data.job.ptr; + Job *job = event.data.job; + + // Invoke the exit callback + job->exit_cb(job, job->data); + + // Free the job resources + table[job->id - 1] = NULL; + shell_free_argv(job->proc_opts.args); + free_job(job); +} - // Invoke the job callback - job->read_cb(job->id, - job->data, - event.data.job.target, - event.data.job.from_stdout); +int job_id(Job *job) +{ + return job->id; +} + +void *job_data(Job *job) +{ + return job->data; } static bool is_alive(Job *job) @@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Pushes a event object to the event queue, which will be handled later by -/// `job_handle` +static void write_cb(uv_write_t *req, int status) +{ + free(req->data); + free(req); +} + +// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. static void read_cb(RStream *rstream, void *data, bool eof) { - Event event; Job *job = data; - if (eof) { - uv_process_kill(&job->proc, SIGTERM); - return; + if (rstream == job->out) { + job->stdout_cb(rstream, data, eof); + } else { + job->stderr_cb(rstream, data, eof); } - event.type = kEventJobActivity; - event.data.job.ptr = job; - event.data.job.target = rstream; - event.data.job.from_stdout = rstream == job->out; - event_push(event); -} - -static void write_cb(uv_write_t *req, int status) -{ - free(req->data); - free(req); + if (eof && --job->pending_refs == 0) { + emit_exit_event(job); + } } -/// Cleanup all the resources associated with the job +// Emits a JobExit event if both rstreams are closed static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { Job *job = proc->data; - table[job->id - 1] = NULL; - shell_free_argv(job->proc_opts.args); - free_job(job); + if (--job->pending_refs == 0) { + emit_exit_event(job); + } +} + +static void emit_exit_event(Job *job) +{ + Event event; + event.type = kEventJobExit; + event.data.job = job; + event_push(event); } |