diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/eval.c | 74 | ||||
-rw-r--r-- | src/os/event.c | 6 | ||||
-rw-r--r-- | src/os/event_defs.h | 10 | ||||
-rw-r--r-- | src/os/job.c | 103 | ||||
-rw-r--r-- | src/os/job.h | 31 | ||||
-rw-r--r-- | src/os/job_defs.h | 8 |
6 files changed, 150 insertions, 82 deletions
diff --git a/src/eval.c b/src/eval.c index 18d7597a24..8cd9f30f0a 100644 --- a/src/eval.c +++ b/src/eval.c @@ -405,10 +405,11 @@ static struct vimvar { static dictitem_T vimvars_var; /* variable used for v: */ #define vimvarht vimvardict.dv_hashtab -static void apply_job_autocmds(int id, - void *data, - RStream *target, - bool from_stdout); +static void on_job_stdout(RStream *rstream, void *data, bool eof); +static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void on_job_exit(Job *job, void *data); +static void on_job_data(RStream *rstream, void *data, bool eof, char *type); +static void apply_job_autocmds(Job *job, char *name, char *type, char *str); static void prepare_vimvar(int idx, typval_T *save_tv); static void restore_vimvar(int idx, typval_T *save_tv); static int ex_let_vars(char_u *arg, typval_T *tv, int copy, @@ -11073,7 +11074,9 @@ static void f_job_start(typval_T *argvars, typval_T *rettv) rettv->vval.v_number = job_start(argv, xstrdup((char *)argvars[0].vval.v_string), - apply_job_autocmds); + on_job_stdout, + on_job_stderr, + on_job_exit); if (rettv->vval.v_number <= 0) { if (rettv->vval.v_number == 0) { @@ -19796,31 +19799,56 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) return ret; } -static void apply_job_autocmds(int id, - void *data, - RStream *target, - bool from_stdout) +static void on_job_stdout(RStream *rstream, void *data, bool eof) { - list_T *list; - char *str; - listitem_T *str_slot = listitem_alloc(); - uint32_t read_count = rstream_available(target); + if (!eof) { + on_job_data(rstream, data, eof, "stdout"); + } +} + +static void on_job_stderr(RStream *rstream, void *data, bool eof) +{ + if (!eof) { + on_job_data(rstream, data, eof, "stderr"); + } +} + +static void on_job_exit(Job *job, void *data) +{ + apply_job_autocmds(job, data, "exit", NULL); +} - // Prepare the list item containing the data read - str = xmalloc(read_count + 1); - rstream_read(target, str, read_count); +static void on_job_data(RStream *rstream, void *data, bool eof, char *type) +{ + Job *job = data; + uint32_t read_count = rstream_available(rstream); + char *str = xmalloc(read_count + 1); + + rstream_read(rstream, str, read_count); str[read_count] = NUL; - str_slot->li_tv.v_type = VAR_STRING; - str_slot->li_tv.v_lock = 0; - str_slot->li_tv.vval.v_string = (char_u *)str; + apply_job_autocmds(job, job_data(job), type, str); +} + +static void apply_job_autocmds(Job *job, char *name, char *type, char *str) +{ + list_T *list; + // Create the list which will be set to v:job_data list = list_alloc(); - list_append_number(list, id); - list_append(list, str_slot); - list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3); + list_append_number(list, job_id(job)); + list_append_string(list, (uint8_t *)type, -1); + + if (str) { + listitem_T *str_slot = listitem_alloc(); + str_slot->li_tv.v_type = VAR_STRING; + str_slot->li_tv.v_lock = 0; + str_slot->li_tv.vval.v_string = (uint8_t *)str; + list_append(list, str_slot); + } + // Update v:job_data for the autocommands set_vim_var_list(VV_JOB_DATA, list); // Call JobActivity autocommands - apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL); + apply_autocmds(EVENT_JOBACTIVITY, (uint8_t *)name, NULL, TRUE, NULL); } diff --git a/src/os/event.c b/src/os/event.c index 4c5be1e16b..1a35cb3f12 100644 --- a/src/os/event.c +++ b/src/os/event.c @@ -110,12 +110,12 @@ void event_process() case kEventSignal: signal_handle(event); break; - case kEventJobActivity: - job_handle(event); - break; case kEventRStreamData: rstream_read_event(event); break; + case kEventJobExit: + job_exit_event(event); + break; default: abort(); } diff --git a/src/os/event_defs.h b/src/os/event_defs.h index 5925a31718..428ae50f3e 100644 --- a/src/os/event_defs.h +++ b/src/os/event_defs.h @@ -6,8 +6,8 @@ typedef enum { kEventSignal, - kEventJobActivity, - kEventRStreamData + kEventRStreamData, + kEventJobExit } EventType; typedef struct { @@ -18,11 +18,7 @@ typedef struct { RStream *ptr; bool eof; } rstream; - struct { - Job *ptr; - RStream *target; - bool from_stdout; - } job; + Job *job; } data; } Event; diff --git a/src/os/job.c b/src/os/job.c index 5bd404e5be..f4cbbfb670 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -7,6 +7,8 @@ #include "os/job_defs.h" #include "os/rstream.h" #include "os/rstream_defs.h" +#include "os/event.h" +#include "os/event_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -22,12 +24,17 @@ struct job { int id; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; + // exit_cb may be called while there's still pending data from stdout/stderr. + // We use this reference count to ensure the JobExit event is only emitted + // when stdout/stderr are drained + int pending_refs; // If the job was already stopped bool stopped; // Data associated with the job void *data; - // Callback for consuming data from the buffer - job_read_cb read_cb; + // Callbacks + job_exit_cb exit_cb; + rstream_cb stdout_cb, stderr_cb; // Readable streams RStream *out, *err; // Structures for process spawning/management used by libuv @@ -40,7 +47,6 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; -static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -48,9 +54,10 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); +static void read_cb(RStream *rstream, void *data, bool eof); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); +static void emit_exit_event(Job *job); void job_init() { @@ -105,7 +112,11 @@ void job_teardown() } } -int job_start(char **argv, void *data, job_read_cb cb) +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb job_exit_cb) { int i; Job *job; @@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb) job = xmalloc(sizeof(Job)); // Initialize job->id = i + 1; + job->pending_refs = 3; job->data = data; - job->read_cb = cb; + job->stdout_cb = stdout_cb; + job->stderr_cb = stderr_cb; + job->exit_cb = job_exit_cb; job->stopped = false; job->exit_timeout = EXIT_TIMEOUT; job->proc_opts.file = argv[0]; @@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len) return true; } -void job_handle(Event event) +void job_exit_event(Event event) { - Job *job = event.data.job.ptr; + Job *job = event.data.job; + + // Invoke the exit callback + job->exit_cb(job, job->data); + + // Free the job resources + table[job->id - 1] = NULL; + shell_free_argv(job->proc_opts.args); + free_job(job); +} - // Invoke the job callback - job->read_cb(job->id, - job->data, - event.data.job.target, - event.data.job.from_stdout); +int job_id(Job *job) +{ + return job->id; +} + +void *job_data(Job *job) +{ + return job->data; } static bool is_alive(Job *job) @@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Pushes a event object to the event queue, which will be handled later by -/// `job_handle` +static void write_cb(uv_write_t *req, int status) +{ + free(req->data); + free(req); +} + +// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. static void read_cb(RStream *rstream, void *data, bool eof) { - Event event; Job *job = data; - if (eof) { - uv_process_kill(&job->proc, SIGTERM); - return; + if (rstream == job->out) { + job->stdout_cb(rstream, data, eof); + } else { + job->stderr_cb(rstream, data, eof); } - event.type = kEventJobActivity; - event.data.job.ptr = job; - event.data.job.target = rstream; - event.data.job.from_stdout = rstream == job->out; - event_push(event); -} - -static void write_cb(uv_write_t *req, int status) -{ - free(req->data); - free(req); + if (eof && --job->pending_refs == 0) { + emit_exit_event(job); + } } -/// Cleanup all the resources associated with the job +// Emits a JobExit event if both rstreams are closed static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { Job *job = proc->data; - table[job->id - 1] = NULL; - shell_free_argv(job->proc_opts.args); - free_job(job); + if (--job->pending_refs == 0) { + emit_exit_event(job); + } +} + +static void emit_exit_event(Job *job) +{ + Event event; + event.type = kEventJobExit; + event.data.job = job; + event_push(event); } diff --git a/src/os/job.h b/src/os/job.h index c0f6734467..37e34700dc 100644 --- a/src/os/job.h +++ b/src/os/job.h @@ -11,7 +11,7 @@ #include <stdbool.h> #include "os/event_defs.h" -#include "os/event.h" +#include "os/rstream_defs.h" /// Initializes job control resources void job_init(void); @@ -24,12 +24,19 @@ void job_teardown(void); /// @param argv Argument vector for the process. The first item is the /// executable to run. /// @param data Caller data that will be associated with the job -/// @param cb Callback that will be invoked everytime data is available in -/// the job's stdout/stderr +/// @param stdout_cb Callback that will be invoked when data is available +/// on stdout +/// @param stderr_cb Callback that will be invoked when data is available +/// on stderr +/// @param exit_cb Callback that will be invoked when the job exits /// @return The job id if the job started successfully. If the the first item / /// of `argv`(the program) could not be executed, -1 will be returned. // 0 will be returned if the job table is full. -int job_start(char **argv, void *data, job_read_cb cb); +int job_start(char **argv, + void *data, + rstream_cb stdout_cb, + rstream_cb stderr_cb, + job_exit_cb exit_cb); /// Terminates a job. This is a non-blocking operation, but if the job exists /// it's guaranteed to succeed(SIGKILL will eventually be sent) @@ -49,10 +56,22 @@ bool job_stop(int id); /// id is invalid(probably because it has already stopped) bool job_write(int id, char *data, uint32_t len); -/// Runs the read callback associated with the job/event +/// Runs the read callback associated with the job exit event /// /// @param event Object containing data necessary to invoke the callback -void job_handle(Event event); +void job_exit_event(Event event); + +/// Get the job id +/// +/// @param job A pointer to the job +/// @return The job id +int job_id(Job *job); + +/// Get data associated with a job +/// +/// @param job A pointer to the job +/// @return The job data +void *job_data(Job *job); #endif // NEOVIM_OS_JOB_H diff --git a/src/os/job_defs.h b/src/os/job_defs.h index ada33d757e..e98b639154 100644 --- a/src/os/job_defs.h +++ b/src/os/job_defs.h @@ -9,13 +9,7 @@ typedef struct job Job; /// /// @param id The job id /// @param data Some data associated with the job by the caller -/// @param target The `RStream` instance containing data to be read -/// @param from_stdout This is true if data was read from the job's stdout, -/// false if it came from stderr. -typedef void (*job_read_cb)(int id, - void *data, - RStream *target, - bool from_stdout); +typedef void (*job_exit_cb)(Job *job, void *data); #endif // NEOVIM_OS_JOB_DEFS_H |