diff options
-rw-r--r-- | src/eval.c | 26 | ||||
-rw-r--r-- | src/os/event_defs.h | 7 | ||||
-rw-r--r-- | src/os/job.c | 87 | ||||
-rw-r--r-- | src/os/job.h | 15 | ||||
-rw-r--r-- | src/os/job_defs.h | 15 |
5 files changed, 64 insertions, 86 deletions
diff --git a/src/eval.c b/src/eval.c index 4704af1a68..ba34d83e41 100644 --- a/src/eval.c +++ b/src/eval.c @@ -65,6 +65,8 @@ #include "os/os.h" #include "os/job.h" #include "os/shell.h" +#include "os/rstream.h" +#include "os/rstream_defs.h" #if defined(FEAT_FLOAT) # include <math.h> @@ -406,8 +408,7 @@ static dictitem_T vimvars_var; /* variable used for v: */ static void apply_job_autocmds(int id, void *data, - char *buffer, - uint32_t len, + RStream *target, bool from_stdout); static void prepare_vimvar(int idx, typval_T *save_tv); static void restore_vimvar(int idx, typval_T *save_tv); @@ -19798,18 +19799,29 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) static void apply_job_autocmds(int id, void *data, - char *buffer, - uint32_t len, + RStream *target, bool from_stdout) { list_T *list; - - // Call JobActivity autocommands + char *str; + listitem_T *str_slot = listitem_alloc(); + uint32_t read_count = rstream_available(target); + + // Prepare the list item containing the data read + str = xmalloc(read_count + 1); + rstream_read(target, str, read_count); + str[read_count] = NUL; + str_slot->li_tv.v_type = VAR_STRING; + str_slot->li_tv.v_lock = 0; + str_slot->li_tv.vval.v_string = (char_u *)str; + // Create the list which will be set to v:job_data list = list_alloc(); list_append_number(list, id); - list_append_string(list, (char_u *)buffer, len); + list_append(list, str_slot); list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3); + // Update v:job_data for the autocommands set_vim_var_list(VV_JOB_DATA, list); + // Call JobActivity autocommands apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL); } diff --git a/src/os/event_defs.h b/src/os/event_defs.h index a72462b18a..8e00324ba6 100644 --- a/src/os/event_defs.h +++ b/src/os/event_defs.h @@ -2,6 +2,7 @@ #define NEOVIM_OS_EVENT_DEFS_H #include "os/job_defs.h" +#include "os/rstream_defs.h" typedef enum { kEventSignal, @@ -12,7 +13,11 @@ typedef struct { EventType type; union { int signum; - Job *job; + struct { + Job *ptr; + RStream *target; + bool from_stdout; + } job; } data; } Event; diff --git a/src/os/job.c b/src/os/job.c index 0d62940013..5dd4f7abe1 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -5,6 +5,8 @@ #include "os/job.h" #include "os/job_defs.h" +#include "os/rstream.h" +#include "os/rstream_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -15,13 +17,6 @@ #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 1024 -/// Possible lock states of the job buffer -typedef enum { - kBufferLockNone = 0, ///< No data was read - kBufferLockStdout, ///< Data read from stdout - kBufferLockStderr ///< Data read from stderr -} BufferLock; - struct job { // Job id the index in the job table plus one. int id; @@ -31,14 +26,10 @@ struct job { bool stopped; // Data associated with the job void *data; - // Buffer for reading from stdout or stderr - char buffer[JOB_BUFFER_SIZE]; - // Size of the data from the last read - uint32_t length; - // Buffer lock state - BufferLock lock; // Callback for consuming data from the buffer job_read_cb read_cb; + // Readable streams + RStream *out, *err; // Structures for process spawning/management used by libuv uv_process_t proc; uv_process_options_t proc_opts; @@ -49,6 +40,7 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; +static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -56,8 +48,7 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf); -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); +// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); @@ -154,12 +145,10 @@ int job_start(char **argv, void *data, job_read_cb cb) job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); - job->proc_stdout.data = job; job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); - job->proc_stderr.data = job; job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; @@ -170,8 +159,12 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); - uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job); + rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); + rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); + rstream_start(job->out); + rstream_start(job->err); // Give the callback a reference to the job job->proc.data = job; // Save the job to the table @@ -217,19 +210,13 @@ bool job_write(int id, char *data, uint32_t len) void job_handle(Event event) { - Job *job = event.data.job; + Job *job = event.data.job.ptr; // Invoke the job callback job->read_cb(job->id, job->data, - job->buffer, - job->length, - job->lock == kBufferLockStdout); - - // restart reading - job->lock = kBufferLockNone; - uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); - uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); + event.data.job.target, + event.data.job.from_stdout); } static bool is_alive(Job *job) @@ -252,6 +239,8 @@ static void free_job(Job *job) uv_close((uv_handle_t *)&job->proc_stdin, NULL); uv_close((uv_handle_t *)&job->proc_stderr, NULL); uv_close((uv_handle_t *)&job->proc, NULL); + rstream_free(job->out); + rstream_free(job->err); free(job); } @@ -277,50 +266,22 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Puts the job into a 'reading state' which 'locks' the job buffer -/// until the data is consumed -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) -{ - Job *job = (Job *)handle->data; - - if (job->lock != kBufferLockNone) { - // Already reserved the buffer for reading from stdout or stderr. - buf->len = 0; - return; - } - - buf->base = job->buffer; - buf->len = JOB_BUFFER_SIZE; - // Avoid `alloc_cb`, `alloc_cb` sequences on windows and also mark which - // stream we are reading from - job->lock = - (handle == (uv_handle_t *)&job->proc_stdout) ? - kBufferLockStdout : - kBufferLockStderr; -} - /// Pushes a event object to the event queue, which will be handled later by /// `job_handle` -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) +static void read_cb(RStream *rstream, void *data, bool eof) { Event event; - Job *job = (Job *)stream->data; - // pause reading on both streams - uv_read_stop((uv_stream_t *)&job->proc_stdout); - uv_read_stop((uv_stream_t *)&job->proc_stderr); + Job *job = data; - if (cnt <= 0) { - if (cnt != UV_ENOBUFS) { - // Assume it's EOF and exit the job. Doesn't harm sending a SIGTERM - // at this point - uv_process_kill(&job->proc, SIGTERM); - } + if (eof) { + uv_process_kill(&job->proc, SIGTERM); return; } - job->length = cnt; event.type = kEventJobActivity; - event.data.job = job; + event.data.job.ptr = job; + event.data.job.target = rstream; + event.data.job.from_stdout = rstream == job->out; event_push(event); } diff --git a/src/os/job.h b/src/os/job.h index b0f9c99bb8..594a734bab 100644 --- a/src/os/job.h +++ b/src/os/job.h @@ -12,21 +12,6 @@ #include "os/event.h" -/// Function called when the job reads data -/// -/// @param id The job is -/// @param data Some data associated with the job by the caller -/// @param buffer Buffer containing the data read. It must be copied -/// immediately. -/// @param len Amount of bytes that must be read from `buffer` -/// @param from_stdout This is true if data was read from the job's stdout, -/// false if it came from stderr. -typedef void (*job_read_cb)(int id, - void *data, - char *buffer, - uint32_t len, - bool from_stdout); - /// Initializes job control resources void job_init(void); diff --git a/src/os/job_defs.h b/src/os/job_defs.h index 27a1133c2c..ada33d757e 100644 --- a/src/os/job_defs.h +++ b/src/os/job_defs.h @@ -1,6 +1,21 @@ #ifndef NEOVIM_OS_JOB_DEFS_H #define NEOVIM_OS_JOB_DEFS_H +#include "os/rstream_defs.h" + typedef struct job Job; +/// Function called when the job reads data +/// +/// @param id The job id +/// @param data Some data associated with the job by the caller +/// @param target The `RStream` instance containing data to be read +/// @param from_stdout This is true if data was read from the job's stdout, +/// false if it came from stderr. +typedef void (*job_read_cb)(int id, + void *data, + RStream *target, + bool from_stdout); + #endif // NEOVIM_OS_JOB_DEFS_H + |