diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-17 11:59:50 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-18 16:11:59 -0300 |
commit | 9acb9607134a461fc342f29a098b83b1bad7134d (patch) | |
tree | 155980a8550ca2e4243e4612129a2cdd3fad499a | |
parent | 350144f5113e111fea0d5b33589d6d478280f298 (diff) | |
download | rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.gz rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.bz2 rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.zip |
Refactor job control to use RStream events
Instead of a single 'job read' callback, job control consumers need to provide
callbacks for "stdout read", "stderr read" and "exit". For vimscript, the
JobActivity autocommand is still used to handle every job event, for example:
```vim
:let srv1_id = jobstart('netcat-server-1', 'nc', ['-l', '9991'])
:let srv2_id = jobstart('netcat-server-2', 'nc', ['-l', '9991'])
function JobEvent()
" v:job_data[0] = the job id
" v:job_data[1] = the event type, one of "stdout", "stderr" or "exit"
" v:job_data[2] = data read from stdout or stderr
if v:job_data[1] == 'stdout'
let str = 'Message from job '.v:job_data[0].': '.v:job_data[2]
elseif v:job_data[1] == 'stderr'
let str = 'Error message from job '.v:job_data[0].': '.v:job_data[2]
else
" Exit
let str = 'Job '.v:job_data[0].' exited'
endif
call append(line('$'), str)
endfunction
au JobActivity netcat-server-* call JobEvent()
```
And to see messages from 'job 1', run in another terminal:
```sh
bash -c "while true; do echo 123; sleep 1; done" | nc localhost 9991
```
-rw-r--r-- | src/eval.c | 74 | ||||
-rw-r--r-- | src/os/event.c | 6 | ||||
-rw-r--r-- | src/os/event_defs.h | 10 | ||||
-rw-r--r-- | src/os/job.c | 103 | ||||
-rw-r--r-- | src/os/job.h | 31 | ||||
-rw-r--r-- | src/os/job_defs.h | 8 |
6 files changed, 150 insertions, 82 deletions
diff --git a/src/eval.c b/src/eval.c index 18d7597a24..8cd9f30f0a 100644 --- a/src/eval.c +++ b/src/eval.c @@ -405,10 +405,11 @@ static struct vimvar { static dictitem_T vimvars_var; /* variable used for v: */ #define vimvarht vimvardict.dv_hashtab -static void apply_job_autocmds(int id, - void *data, - RStream *target, - bool from_stdout); +static void on_job_stdout(RStream *rstream, void *data, bool eof); +static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void on_job_exit(Job *job, void *data); +static void on_job_data(RStream *rstream, void *data, bool eof, char *type); +static void apply_job_autocmds(Job *job, char *name, char *type, char *str); static void prepare_vimvar(int idx, typval_T *save_tv); static void restore_vimvar(int idx, typval_T *save_tv); static int ex_let_vars(char_u *arg, typval_T *tv, int copy, @@ -11073,7 +11074,9 @@ static void f_job_start(typval_T *argvars, typval_T *rettv) rettv->vval.v_number = job_start(argv, xstrdup((char *)argvars[0].vval.v_string), - apply_job_autocmds); + on_job_stdout, + on_job_stderr, + on_job_exit); if (rettv->vval.v_number <= 0) { if (rettv->vval.v_number == 0) { @@ -19796,31 +19799,56 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) return ret; } -static void apply_job_autocmds(int id, - void *data, - RStream *target, - bool from_stdout) +static void on_job_stdout(RStream *rstream, void *data, bool eof) { - list_T *list; - char *str; - listitem_T *str_slot = listitem_alloc(); - uint32_t read_count = rstream_available(target); + if (!eof) { + on_job_data(rstream, data, eof, "stdout"); + } +} + +static void on_job_stderr(RStream *rstream, void *data, bool eof) +{ + if (!eof) { + on_job_data(rstream, data, eof, "stderr"); + } +} + +static void on_job_exit(Job *job, void *data) +{ + apply_job_autocmds(job, data, "exit", NULL); +} - // Prepare the list item containing the data read - str = xmalloc(read_count + 1); - rstream_read(target, str, read_count); +static void on_job_data(RStream *rstream, void *data, bool eof, char *type) +{ + Job *job = data; + uint32_t read_count = rstream_available(rstream); + char *str = xmalloc(read_count + 1); + + rstream_read(rstream, str, read_count); str[read_count] = NUL; - str_slot->li_tv.v_type = VAR_STRING; - str_slot->li_tv.v_lock = 0; - str_slot->li_tv.vval.v_string = (char_u *)str; + apply_job_autocmds(job, job_data(job), type, str); +} + +static void apply_job_autocmds(Job *job, char *name, char *type, char *str) +{ + list_T *list; + // Create the list which will be set to v:job_data list = list_alloc(); - list_append_number(list, id); - list_append(list, str_slot); - list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3); + list_append_number(list, job_id(job)); + list_append_string(list, (uint8_t *)type, -1); + + if (str) { + listitem_T *str_slot = listitem_alloc(); + str_slot->li_tv.v_type = VAR_STRING; + str_slot->li_tv.v_lock = 0; + str_slot->li_tv.vval.v_string = (uint8_t *)str; + list_append(list, str_slot); + } + // Update v:job_data for the autocommands set_vim_var_list(VV_JOB_DATA, list); // Call JobActivity autocommands - apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL); + apply_autocmds(EVENT_JOBACTIVITY, (uint8_t *)name, NULL, TRUE, NULL); } diff --git a/src/os/event.c b/src/os/event.c index 4c5be1e16b..1a35cb3f12 100644 --- a/src/os/event.c +++ b/src/os/event.c @@ -110,12 +110,12 @@ void event_process() case kEventSignal: signal_handle(event); break; - case kEventJobActivity: - job_handle(event); - break; case kEventRStreamData: rstream_read_event(event); break; + case kEventJobExit: + job_exit_event(event); + break; default: abort(); } diff --git a/src/os/event_defs.h b/src/os/event_defs.h index 5925a31718..428ae50f3e 100644 --- a/src/os/event_defs.h +++ b/src/os/event_defs.h @@ -6,8 +6,8 @@ typedef enum { kEventSignal, - kEventJobActivity, - kEventRStreamData + kEventRStreamData, + kEventJobExit } EventType; typedef struct { @@ -18,11 +18,7 @@ typedef struct { RStream *ptr; bool eof; } rstream; - struct { - Job *ptr; - RStream *target; - bool from_stdout; - } job; + Job *job; } data; } Event; diff --git a/src/os/job.c b/src/os/job.c index 5bd404e5be..f4cbbfb670 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -7,6 +7,8 @@ #include "os/job_defs.h" #include "os/rstream.h" #include "os/rstream_defs.h" +#include "os/event.h" +#include "os/event_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -22,12 +24,17 @@ struct job { int id; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; + // exit_cb may be called while there's still pending data from stdout/stderr. + // We use this reference count to ensure the JobExit event is only emitted + // when stdout/stderr are drained + int pending_refs; // If the job was already stopped bool stopped; // Data associated with the job void *data; - // Callback for consuming data from the buffer - job_read_cb read_cb; + // Callbacks + job_exit_cb exit_cb; + rstream_cb stdout_cb, stderr_cb; // Readable streams RStream *out, *err; // Structures for process spawning/management used by libuv @@ -40,7 +47,6 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; -static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -48,9 +54,10 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); +static void read_cb(RStream *rstream, void *data, bool eof); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); +static void emit_exit_event(Job *job); void job_init() { @@ -105,7 +112,11 @@ void job_teardown() } } -int job_start(char **argv, void *data, job_read_cb cb) +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb job_exit_cb) { int i; Job *job; @@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb) job = xmalloc(sizeof(Job)); // Initialize job->id = i + 1; + job->pending_refs = 3; job->data = data; - job->read_cb = cb; + job->stdout_cb = stdout_cb; + job->stderr_cb = stderr_cb; + job->exit_cb = job_exit_cb; job->stopped = false; job->exit_timeout = EXIT_TIMEOUT; job->proc_opts.file = argv[0]; @@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len) return true; } -void job_handle(Event event) +void job_exit_event(Event event) { - Job *job = event.data.job.ptr; + Job *job = event.data.job; + + // Invoke the exit callback + job->exit_cb(job, job->data); + + // Free the job resources + table[job->id - 1] = NULL; + shell_free_argv(job->proc_opts.args); + free_job(job); +} - // Invoke the job callback - job->read_cb(job->id, - job->data, - event.data.job.target, - event.data.job.from_stdout); +int job_id(Job *job) +{ + return job->id; +} + +void *job_data(Job *job) +{ + return job->data; } static bool is_alive(Job *job) @@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Pushes a event object to the event queue, which will be handled later by -/// `job_handle` +static void write_cb(uv_write_t *req, int status) +{ + free(req->data); + free(req); +} + +// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. static void read_cb(RStream *rstream, void *data, bool eof) { - Event event; Job *job = data; - if (eof) { - uv_process_kill(&job->proc, SIGTERM); - return; + if (rstream == job->out) { + job->stdout_cb(rstream, data, eof); + } else { + job->stderr_cb(rstream, data, eof); } - event.type = kEventJobActivity; - event.data.job.ptr = job; - event.data.job.target = rstream; - event.data.job.from_stdout = rstream == job->out; - event_push(event); -} - -static void write_cb(uv_write_t *req, int status) -{ - free(req->data); - free(req); + if (eof && --job->pending_refs == 0) { + emit_exit_event(job); + } } -/// Cleanup all the resources associated with the job +// Emits a JobExit event if both rstreams are closed static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { Job *job = proc->data; - table[job->id - 1] = NULL; - shell_free_argv(job->proc_opts.args); - free_job(job); + if (--job->pending_refs == 0) { + emit_exit_event(job); + } +} + +static void emit_exit_event(Job *job) +{ + Event event; + event.type = kEventJobExit; + event.data.job = job; + event_push(event); } diff --git a/src/os/job.h b/src/os/job.h index c0f6734467..37e34700dc 100644 --- a/src/os/job.h +++ b/src/os/job.h @@ -11,7 +11,7 @@ #include <stdbool.h> #include "os/event_defs.h" -#include "os/event.h" +#include "os/rstream_defs.h" /// Initializes job control resources void job_init(void); @@ -24,12 +24,19 @@ void job_teardown(void); /// @param argv Argument vector for the process. The first item is the /// executable to run. /// @param data Caller data that will be associated with the job -/// @param cb Callback that will be invoked everytime data is available in -/// the job's stdout/stderr +/// @param stdout_cb Callback that will be invoked when data is available +/// on stdout +/// @param stderr_cb Callback that will be invoked when data is available +/// on stderr +/// @param exit_cb Callback that will be invoked when the job exits /// @return The job id if the job started successfully. If the the first item / /// of `argv`(the program) could not be executed, -1 will be returned. // 0 will be returned if the job table is full. -int job_start(char **argv, void *data, job_read_cb cb); +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb exit_cb); /// Terminates a job. This is a non-blocking operation, but if the job exists /// it's guaranteed to succeed(SIGKILL will eventually be sent) @@ -49,10 +56,22 @@ bool job_stop(int id); /// id is invalid(probably because it has already stopped) bool job_write(int id, char *data, uint32_t len); -/// Runs the read callback associated with the job/event +/// Runs the read callback associated with the job exit event /// /// @param event Object containing data necessary to invoke the callback -void job_handle(Event event); +void job_exit_event(Event event); + +/// Get the job id +/// +/// @param job A pointer to the job +/// @return The job id +int job_id(Job *job); + +/// Get data associated with a job +/// +/// @param job A pointer to the job +/// @return The job data +void *job_data(Job *job); #endif // NEOVIM_OS_JOB_H diff --git a/src/os/job_defs.h b/src/os/job_defs.h index ada33d757e..e98b639154 100644 --- a/src/os/job_defs.h +++ b/src/os/job_defs.h @@ -9,13 +9,7 @@ typedef struct job Job; /// /// @param id The job id /// @param data Some data associated with the job by the caller -/// @param target The `RStream` instance containing data to be read -/// @param from_stdout This is true if data was read from the job's stdout, -/// false if it came from stderr. -typedef void (*job_read_cb)(int id, - void *data, - RStream *target, - bool from_stdout); +typedef void (*job_exit_cb)(Job *job, void *data); #endif // NEOVIM_OS_JOB_DEFS_H |