diff options
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r-- | src/nvim/os/job.c | 95 |
1 files changed, 35 insertions, 60 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 7e90994fb3..71419cefca 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -6,15 +6,12 @@ #include "nvim/event/loop.h" #include "nvim/event/time.h" #include "nvim/event/signal.h" -#include "nvim/os/uv_helpers.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" #include "nvim/os/job.h" #include "nvim/os/job_defs.h" #include "nvim/os/job_private.h" #include "nvim/os/pty_process.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" #include "nvim/os/time.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -29,20 +26,16 @@ #define KILL_TIMEOUT (TERM_TIMEOUT * 2) #define JOB_BUFFER_SIZE 0xFFFF -#define close_job_stream(job, stream, type) \ +#define close_job_stream(job, stream) \ do { \ - if (job->stream) { \ - type##stream_free(job->stream); \ - job->stream = NULL; \ - if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \ - uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \ - } \ + if (!job->stream.closed) { \ + stream_close(&job->stream, on_##stream_close); \ } \ } while (0) -#define close_job_in(job) close_job_stream(job, in, w) -#define close_job_out(job) close_job_stream(job, out, r) -#define close_job_err(job) close_job_stream(job, err, r) +#define close_job_in(job) close_job_stream(job, in) +#define close_job_out(job) close_job_stream(job, out) +#define close_job_err(job) close_job_stream(job, err) Job *table[MAX_RUNNING_JOBS] = {NULL}; size_t stop_requests = 0; @@ -118,63 +111,45 @@ Job *job_start(JobOptions opts, int *status) job->refcount = 1; job->stopped_time = 0; job->term_sent = false; - job->in = NULL; - job->out = NULL; - job->err = NULL; job->opts = opts; job->closed = false; - - process_init(job); - - if (opts.writable) { - handle_set_job((uv_handle_t *)job->proc_stdin, job); - job->refcount++; - } - - if (opts.stdout_cb) { - handle_set_job((uv_handle_t *)job->proc_stdout, job); - job->refcount++; - } - - if (opts.stderr_cb) { - handle_set_job((uv_handle_t *)job->proc_stderr, job); - job->refcount++; - } + job->in.closed = true; + job->out.closed = true; + job->err.closed = true; // Spawn the job if (!process_spawn(job)) { - if (opts.writable) { - uv_close((uv_handle_t *)job->proc_stdin, close_cb); + if (job->opts.writable) { + uv_close((uv_handle_t *)job->proc_stdin, NULL); } - if (opts.stdout_cb) { - uv_close((uv_handle_t *)job->proc_stdout, close_cb); + if (job->opts.stdout_cb) { + uv_close((uv_handle_t *)job->proc_stdout, NULL); } - if (opts.stderr_cb) { - uv_close((uv_handle_t *)job->proc_stderr, close_cb); + if (job->opts.stderr_cb) { + uv_close((uv_handle_t *)job->proc_stderr, NULL); } process_close(job); loop_poll_events(&loop, 0); - // Manually invoke the close_cb to free the job resources *status = -1; return NULL; } if (opts.writable) { - job->in = wstream_new(opts.maxmem); - wstream_set_stream(job->in, job->proc_stdin); + job->refcount++; + wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job); } // Start the readable streams if (opts.stdout_cb) { - job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->out, job->proc_stdout); - rstream_start(job->out); + job->refcount++; + rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job); + rstream_start(&job->out, read_cb); } if (opts.stderr_cb) { - job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->err, job->proc_stderr); - rstream_start(job->err); + job->refcount++; + rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job); + rstream_start(&job->err, read_cb); } // Save the job to the table table[i] = job; @@ -217,7 +192,7 @@ void job_stop(Job *job) // Close the job's stdin. If the job doesn't close its own stdout/stderr, // they will be closed when the job exits(possibly due to being terminated // after a timeout) - close_job_in(job); + job_close_in(job); } if (!stop_requests++) { @@ -315,9 +290,9 @@ void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL /// @param job The job instance /// @param cb The function that will be called on write completion or /// failure. It will be called with the job as the `data` argument. -void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL +void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL { - wstream_set_write_cb(job->in, cb, job); + wstream_set_write_cb(&job->in, cb); } /// Writes data to the job's stdin. This is a non-blocking operation, it @@ -329,7 +304,7 @@ void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL /// to the job stream failed (possibly because the OS buffer is full) bool job_write(Job *job, WBuffer *buffer) { - return wstream_write(job->in, buffer); + return wstream_write(&job->in, buffer); } /// Get the job id @@ -405,26 +380,26 @@ static void job_stop_timer_cb(TimeWatcher *watcher, void *data) } // Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) { Job *job = data; - if (rstream == job->out) { - job->opts.stdout_cb(rstream, buf, data, eof); + if (stream == &job->out) { + job->opts.stdout_cb(stream, buf, data, eof); if (eof) { close_job_out(job); } } else { - job->opts.stderr_cb(rstream, buf, data, eof); + job->opts.stderr_cb(stream, buf, data, eof); if (eof) { close_job_err(job); } } } -static void close_cb(uv_handle_t *handle) +static void on_stream_close(Stream *stream, void *data) { - job_decref(handle_get_job(handle)); + job_decref(data); } static void job_exited(Event event) |