diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-17 11:59:50 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-18 16:11:59 -0300 |
commit | 9acb9607134a461fc342f29a098b83b1bad7134d (patch) | |
tree | 155980a8550ca2e4243e4612129a2cdd3fad499a /src/os/job.c | |
parent | 350144f5113e111fea0d5b33589d6d478280f298 (diff) | |
download | rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.gz rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.bz2 rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.zip |
Refactor job control to use RStream events
Instead of a single 'job read' callback, job control consumers need to provide
callbacks for "stdout read", "stderr read" and "exit". For vimscript, the
JobActivity autocommand is still used to handle every job event, for example:
```vim
:let srv1_id = jobstart('netcat-server-1', 'nc', ['-l', '9991'])
:let srv2_id = jobstart('netcat-server-2', 'nc', ['-l', '9991'])
function JobEvent()
" v:job_data[0] = the job id
" v:job_data[1] = the event type, one of "stdout", "stderr" or "exit"
" v:job_data[2] = data read from stdout or stderr
if v:job_data[1] == 'stdout'
let str = 'Message from job '.v:job_data[0].': '.v:job_data[2]
elseif v:job_data[1] == 'stderr'
let str = 'Error message from job '.v:job_data[0].': '.v:job_data[2]
else
" Exit
let str = 'Job '.v:job_data[0].' exited'
endif
call append(line('$'), str)
endfunction
au JobActivity netcat-server-* call JobEvent()
```
And to see messages from 'job 1', run in another terminal:
```sh
bash -c "while true; do echo 123; sleep 1; done" | nc localhost 9991
```
Diffstat (limited to 'src/os/job.c')
-rw-r--r-- | src/os/job.c | 103 |
1 files changed, 67 insertions, 36 deletions
diff --git a/src/os/job.c b/src/os/job.c index 5bd404e5be..f4cbbfb670 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -7,6 +7,8 @@ #include "os/job_defs.h" #include "os/rstream.h" #include "os/rstream_defs.h" +#include "os/event.h" +#include "os/event_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -22,12 +24,17 @@ struct job { int id; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; + // exit_cb may be called while there's still pending data from stdout/stderr. + // We use this reference count to ensure the JobExit event is only emitted + // when stdout/stderr are drained + int pending_refs; // If the job was already stopped bool stopped; // Data associated with the job void *data; - // Callback for consuming data from the buffer - job_read_cb read_cb; + // Callbacks + job_exit_cb exit_cb; + rstream_cb stdout_cb, stderr_cb; // Readable streams RStream *out, *err; // Structures for process spawning/management used by libuv @@ -40,7 +47,6 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; -static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -48,9 +54,10 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); +static void read_cb(RStream *rstream, void *data, bool eof); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); +static void emit_exit_event(Job *job); void job_init() { @@ -105,7 +112,11 @@ void job_teardown() } } -int job_start(char **argv, void *data, job_read_cb cb) +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb job_exit_cb) { int i; Job *job; @@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb) job = xmalloc(sizeof(Job)); // Initialize job->id = i + 1; + job->pending_refs = 3; job->data = data; - job->read_cb = cb; + job->stdout_cb = stdout_cb; + job->stderr_cb = stderr_cb; + job->exit_cb = job_exit_cb; job->stopped = false; job->exit_timeout = EXIT_TIMEOUT; job->proc_opts.file = argv[0]; @@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len) return true; } -void job_handle(Event event) +void job_exit_event(Event event) { - Job *job = event.data.job.ptr; + Job *job = event.data.job; + + // Invoke the exit callback + job->exit_cb(job, job->data); + + // Free the job resources + table[job->id - 1] = NULL; + shell_free_argv(job->proc_opts.args); + free_job(job); +} - // Invoke the job callback - job->read_cb(job->id, - job->data, - event.data.job.target, - event.data.job.from_stdout); +int job_id(Job *job) +{ + return job->id; +} + +void *job_data(Job *job) +{ + return job->data; } static bool is_alive(Job *job) @@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Pushes a event object to the event queue, which will be handled later by -/// `job_handle` +static void write_cb(uv_write_t *req, int status) +{ + free(req->data); + free(req); +} + +// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. static void read_cb(RStream *rstream, void *data, bool eof) { - Event event; Job *job = data; - if (eof) { - uv_process_kill(&job->proc, SIGTERM); - return; + if (rstream == job->out) { + job->stdout_cb(rstream, data, eof); + } else { + job->stderr_cb(rstream, data, eof); } - event.type = kEventJobActivity; - event.data.job.ptr = job; - event.data.job.target = rstream; - event.data.job.from_stdout = rstream == job->out; - event_push(event); -} - -static void write_cb(uv_write_t *req, int status) -{ - free(req->data); - free(req); + if (eof && --job->pending_refs == 0) { + emit_exit_event(job); + } } -/// Cleanup all the resources associated with the job +// Emits a JobExit event if both rstreams are closed static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { Job *job = proc->data; - table[job->id - 1] = NULL; - shell_free_argv(job->proc_opts.args); - free_job(job); + if (--job->pending_refs == 0) { + emit_exit_event(job); + } +} + +static void emit_exit_event(Job *job) +{ + Event event; + event.type = kEventJobExit; + event.data.job = job; + event_push(event); } |