diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-15 15:34:16 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-04-16 09:56:45 -0300 |
commit | 6e4e40a0f7f20614dff580a29f5e9b220b72f2f8 (patch) | |
tree | 8116940fa08f5d833e7ad7a8308de93b23209797 /src/os/job.c | |
parent | 001d05541b8f494b895553468be97ae509e425c6 (diff) | |
download | rneovim-6e4e40a0f7f20614dff580a29f5e9b220b72f2f8.tar.gz rneovim-6e4e40a0f7f20614dff580a29f5e9b220b72f2f8.tar.bz2 rneovim-6e4e40a0f7f20614dff580a29f5e9b220b72f2f8.zip |
Refactor job control module to use RStream class
Diffstat (limited to 'src/os/job.c')
-rw-r--r-- | src/os/job.c | 87 |
1 files changed, 24 insertions, 63 deletions
diff --git a/src/os/job.c b/src/os/job.c index 0d62940013..5dd4f7abe1 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -5,6 +5,8 @@ #include "os/job.h" #include "os/job_defs.h" +#include "os/rstream.h" +#include "os/rstream_defs.h" #include "os/time.h" #include "os/shell.h" #include "vim.h" @@ -15,13 +17,6 @@ #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 1024 -/// Possible lock states of the job buffer -typedef enum { - kBufferLockNone = 0, ///< No data was read - kBufferLockStdout, ///< Data read from stdout - kBufferLockStderr ///< Data read from stderr -} BufferLock; - struct job { // Job id the index in the job table plus one. int id; @@ -31,14 +26,10 @@ struct job { bool stopped; // Data associated with the job void *data; - // Buffer for reading from stdout or stderr - char buffer[JOB_BUFFER_SIZE]; - // Size of the data from the last read - uint32_t length; - // Buffer lock state - BufferLock lock; // Callback for consuming data from the buffer job_read_cb read_cb; + // Readable streams + RStream *out, *err; // Structures for process spawning/management used by libuv uv_process_t proc; uv_process_options_t proc_opts; @@ -49,6 +40,7 @@ struct job { static Job *table[MAX_RUNNING_JOBS] = {NULL}; static uv_prepare_t job_prepare; +static void read_cb(RStream *rstream, void *data, bool eof); // Some helpers shared in this module static bool is_alive(Job *job); static Job * find_job(int id); @@ -56,8 +48,7 @@ static void free_job(Job *job); // Callbacks for libuv static void job_prepare_cb(uv_prepare_t *handle); -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf); -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); +// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); static void write_cb(uv_write_t *req, int status); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); @@ -154,12 +145,10 @@ int job_start(char **argv, void *data, job_read_cb cb) job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); - job->proc_stdout.data = job; job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); - job->proc_stderr.data = job; job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; @@ -170,8 +159,12 @@ int job_start(char **argv, void *data, job_read_cb cb) } // Start the readable streams - uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); - uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job); + rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); + rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); + rstream_start(job->out); + rstream_start(job->err); // Give the callback a reference to the job job->proc.data = job; // Save the job to the table @@ -217,19 +210,13 @@ bool job_write(int id, char *data, uint32_t len) void job_handle(Event event) { - Job *job = event.data.job; + Job *job = event.data.job.ptr; // Invoke the job callback job->read_cb(job->id, job->data, - job->buffer, - job->length, - job->lock == kBufferLockStdout); - - // restart reading - job->lock = kBufferLockNone; - uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); - uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); + event.data.job.target, + event.data.job.from_stdout); } static bool is_alive(Job *job) @@ -252,6 +239,8 @@ static void free_job(Job *job) uv_close((uv_handle_t *)&job->proc_stdin, NULL); uv_close((uv_handle_t *)&job->proc_stderr, NULL); uv_close((uv_handle_t *)&job->proc, NULL); + rstream_free(job->out); + rstream_free(job->err); free(job); } @@ -277,50 +266,22 @@ static void job_prepare_cb(uv_prepare_t *handle) } } -/// Puts the job into a 'reading state' which 'locks' the job buffer -/// until the data is consumed -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) -{ - Job *job = (Job *)handle->data; - - if (job->lock != kBufferLockNone) { - // Already reserved the buffer for reading from stdout or stderr. - buf->len = 0; - return; - } - - buf->base = job->buffer; - buf->len = JOB_BUFFER_SIZE; - // Avoid `alloc_cb`, `alloc_cb` sequences on windows and also mark which - // stream we are reading from - job->lock = - (handle == (uv_handle_t *)&job->proc_stdout) ? - kBufferLockStdout : - kBufferLockStderr; -} - /// Pushes a event object to the event queue, which will be handled later by /// `job_handle` -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) +static void read_cb(RStream *rstream, void *data, bool eof) { Event event; - Job *job = (Job *)stream->data; - // pause reading on both streams - uv_read_stop((uv_stream_t *)&job->proc_stdout); - uv_read_stop((uv_stream_t *)&job->proc_stderr); + Job *job = data; - if (cnt <= 0) { - if (cnt != UV_ENOBUFS) { - // Assume it's EOF and exit the job. Doesn't harm sending a SIGTERM - // at this point - uv_process_kill(&job->proc, SIGTERM); - } + if (eof) { + uv_process_kill(&job->proc, SIGTERM); return; } - job->length = cnt; event.type = kEventJobActivity; - event.data.job = job; + event.data.job.ptr = job; + event.data.job.target = rstream; + event.data.job.from_stdout = rstream == job->out; event_push(event); } |