aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r--src/nvim/os/job.c95
1 files changed, 35 insertions, 60 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 7e90994fb3..71419cefca 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -6,15 +6,12 @@
#include "nvim/event/loop.h"
#include "nvim/event/time.h"
#include "nvim/event/signal.h"
-#include "nvim/os/uv_helpers.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/job_private.h"
#include "nvim/os/pty_process.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
#include "nvim/os/time.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@@ -29,20 +26,16 @@
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define JOB_BUFFER_SIZE 0xFFFF
-#define close_job_stream(job, stream, type) \
+#define close_job_stream(job, stream) \
do { \
- if (job->stream) { \
- type##stream_free(job->stream); \
- job->stream = NULL; \
- if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \
- uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \
- } \
+ if (!job->stream.closed) { \
+ stream_close(&job->stream, on_##stream_close); \
} \
} while (0)
-#define close_job_in(job) close_job_stream(job, in, w)
-#define close_job_out(job) close_job_stream(job, out, r)
-#define close_job_err(job) close_job_stream(job, err, r)
+#define close_job_in(job) close_job_stream(job, in)
+#define close_job_out(job) close_job_stream(job, out)
+#define close_job_err(job) close_job_stream(job, err)
Job *table[MAX_RUNNING_JOBS] = {NULL};
size_t stop_requests = 0;
@@ -118,63 +111,45 @@ Job *job_start(JobOptions opts, int *status)
job->refcount = 1;
job->stopped_time = 0;
job->term_sent = false;
- job->in = NULL;
- job->out = NULL;
- job->err = NULL;
job->opts = opts;
job->closed = false;
-
- process_init(job);
-
- if (opts.writable) {
- handle_set_job((uv_handle_t *)job->proc_stdin, job);
- job->refcount++;
- }
-
- if (opts.stdout_cb) {
- handle_set_job((uv_handle_t *)job->proc_stdout, job);
- job->refcount++;
- }
-
- if (opts.stderr_cb) {
- handle_set_job((uv_handle_t *)job->proc_stderr, job);
- job->refcount++;
- }
+ job->in.closed = true;
+ job->out.closed = true;
+ job->err.closed = true;
// Spawn the job
if (!process_spawn(job)) {
- if (opts.writable) {
- uv_close((uv_handle_t *)job->proc_stdin, close_cb);
+ if (job->opts.writable) {
+ uv_close((uv_handle_t *)job->proc_stdin, NULL);
}
- if (opts.stdout_cb) {
- uv_close((uv_handle_t *)job->proc_stdout, close_cb);
+ if (job->opts.stdout_cb) {
+ uv_close((uv_handle_t *)job->proc_stdout, NULL);
}
- if (opts.stderr_cb) {
- uv_close((uv_handle_t *)job->proc_stderr, close_cb);
+ if (job->opts.stderr_cb) {
+ uv_close((uv_handle_t *)job->proc_stderr, NULL);
}
process_close(job);
loop_poll_events(&loop, 0);
- // Manually invoke the close_cb to free the job resources
*status = -1;
return NULL;
}
if (opts.writable) {
- job->in = wstream_new(opts.maxmem);
- wstream_set_stream(job->in, job->proc_stdin);
+ job->refcount++;
+ wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job);
}
// Start the readable streams
if (opts.stdout_cb) {
- job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- rstream_set_stream(job->out, job->proc_stdout);
- rstream_start(job->out);
+ job->refcount++;
+ rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job);
+ rstream_start(&job->out, read_cb);
}
if (opts.stderr_cb) {
- job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- rstream_set_stream(job->err, job->proc_stderr);
- rstream_start(job->err);
+ job->refcount++;
+ rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job);
+ rstream_start(&job->err, read_cb);
}
// Save the job to the table
table[i] = job;
@@ -217,7 +192,7 @@ void job_stop(Job *job)
// Close the job's stdin. If the job doesn't close its own stdout/stderr,
// they will be closed when the job exits(possibly due to being terminated
// after a timeout)
- close_job_in(job);
+ job_close_in(job);
}
if (!stop_requests++) {
@@ -315,9 +290,9 @@ void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL
/// @param job The job instance
/// @param cb The function that will be called on write completion or
/// failure. It will be called with the job as the `data` argument.
-void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL
+void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL
{
- wstream_set_write_cb(job->in, cb, job);
+ wstream_set_write_cb(&job->in, cb);
}
/// Writes data to the job's stdin. This is a non-blocking operation, it
@@ -329,7 +304,7 @@ void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL
/// to the job stream failed (possibly because the OS buffer is full)
bool job_write(Job *job, WBuffer *buffer)
{
- return wstream_write(job->in, buffer);
+ return wstream_write(&job->in, buffer);
}
/// Get the job id
@@ -405,26 +380,26 @@ static void job_stop_timer_cb(TimeWatcher *watcher, void *data)
}
// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
-static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
+static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
- if (rstream == job->out) {
- job->opts.stdout_cb(rstream, buf, data, eof);
+ if (stream == &job->out) {
+ job->opts.stdout_cb(stream, buf, data, eof);
if (eof) {
close_job_out(job);
}
} else {
- job->opts.stderr_cb(rstream, buf, data, eof);
+ job->opts.stderr_cb(stream, buf, data, eof);
if (eof) {
close_job_err(job);
}
}
}
-static void close_cb(uv_handle_t *handle)
+static void on_stream_close(Stream *stream, void *data)
{
- job_decref(handle_get_job(handle));
+ job_decref(data);
}
static void job_exited(Event event)