aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:32:07 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:32:07 -0300
commitaa9cb48bf08af14068178619414590254b263882 (patch)
treeb555f3a48c08862c07ef7518a8ba6c8fa58c1aee
parent9d8d2b7fa83fd69d1d616728c505a41acf8fedbb (diff)
downloadrneovim-aa9cb48bf08af14068178619414590254b263882.tar.gz
rneovim-aa9cb48bf08af14068178619414590254b263882.tar.bz2
rneovim-aa9cb48bf08af14068178619414590254b263882.zip
job: Replace by a better process abstraction layer
- New libuv/pty process abstraction with simplified API and no globals. - Remove nvim/os/job*. Jobs are now a concept that apply only to programs spawned by vimscript job* functions. - Refactor shell.c/channel.c to use the new module, which brings a number of advantages: - Simplified API, less code - No slots in the user job table are used - Not possible to acidentally receive data from vimscript - Implement job table in eval.c, which is now a hash table with unilimited job slots and unique job ids.
-rw-r--r--src/nvim/eval.c239
-rw-r--r--src/nvim/event/loop.c7
-rw-r--r--src/nvim/event/loop.h4
-rw-r--r--src/nvim/event/process.c325
-rw-r--r--src/nvim/event/process.h56
-rw-r--r--src/nvim/event/pty_process.c (renamed from src/nvim/os/pty_process.c)159
-rw-r--r--src/nvim/event/pty_process.h30
-rw-r--r--src/nvim/event/uv_process.c77
-rw-r--r--src/nvim/event/uv_process.h25
-rw-r--r--src/nvim/lib/klist.h2
-rw-r--r--src/nvim/main.c5
-rw-r--r--src/nvim/msgpack_rpc/channel.c76
-rw-r--r--src/nvim/os/job.c444
-rw-r--r--src/nvim/os/job.h20
-rw-r--r--src/nvim/os/job_defs.h64
-rw-r--r--src/nvim/os/job_private.h101
-rw-r--r--src/nvim/os/pipe_process.c88
-rw-r--r--src/nvim/os/pipe_process.h17
-rw-r--r--src/nvim/os/pty_process.h17
-rw-r--r--src/nvim/os/shell.c50
-rw-r--r--src/nvim/os_unix.c1
21 files changed, 791 insertions, 1016 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index ae6b99c336..b43ab238cd 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -55,6 +55,7 @@
#include "nvim/misc1.h"
#include "nvim/misc2.h"
#include "nvim/keymap.h"
+#include "nvim/map.h"
#include "nvim/file_search.h"
#include "nvim/garray.h"
#include "nvim/move.h"
@@ -82,8 +83,10 @@
#include "nvim/version.h"
#include "nvim/window.h"
#include "nvim/os/os.h"
-#include "nvim/os/job.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/pty_process.h"
#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
#include "nvim/os/time.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
@@ -442,14 +445,19 @@ static dictitem_T vimvars_var; /* variable used for v: */
#define vimvarht vimvardict.dv_hashtab
typedef struct {
- Job *job;
+ union {
+ UvProcess uv;
+ PtyProcess pty;
+ } proc;
+ Stream in, out, err;
Terminal *term;
+ bool stopped;
bool exited;
- bool stdin_closed;
int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self;
int *status_ptr;
+ uint64_t id;
} TerminalJobData;
@@ -462,7 +470,6 @@ typedef struct {
valid character */
// Memory pool for reusing JobEvent structures
typedef struct {
- int job_id;
TerminalJobData *data;
ufunc_T *callback;
const char *type;
@@ -470,12 +477,15 @@ typedef struct {
int status;
} JobEvent;
static int disable_job_defer = 0;
+static uint64_t current_job_id = 1;
+static PMap(uint64_t) *jobs = NULL;
/*
* Initialize the global and v: variables.
*/
void eval_init(void)
{
+ jobs = pmap_new(uint64_t)();
int i;
struct vimvar *p;
@@ -10692,29 +10702,27 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
return;
}
- Job *job = job_find(argvars[0].vval.v_number);
-
- if (!is_user_job(job)) {
- // Invalid job id
+ TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ if (!data) {
EMSG(_(e_invjob));
return;
}
+ Process *proc = (Process *)&data->proc;
+
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
- job_close_in(job);
- ((TerminalJobData *)job_data(job))->stdin_closed = true;
+ process_close_in(proc);
} else if (!strcmp(stream, "stdout")) {
- job_close_out(job);
+ process_close_out(proc);
} else if (!strcmp(stream, "stderr")) {
- job_close_err(job);
+ process_close_err(proc);
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
- ((TerminalJobData *)job_data(job))->stdin_closed = true;
- job_close_streams(job);
+ process_close_streams(proc);
}
}
@@ -10735,15 +10743,13 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
- Job *job = job_find(argvars[0].vval.v_number);
-
- if (!is_user_job(job)) {
- // Invalid job id
+ TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ if (!data) {
EMSG(_(e_invjob));
return;
}
- if (((TerminalJobData *)job_data(job))->stdin_closed) {
+ if (((Process *)&data->proc)->in->closed) {
EMSG(_("Can't send data to the job: stdin is closed"));
return;
}
@@ -10757,7 +10763,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
}
WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree);
- rettv->vval.v_number = job_write(job, buf);
+ rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf);
}
// "jobresize(job, width, height)" function
@@ -10777,19 +10783,20 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)
return;
}
- Job *job = job_find(argvars[0].vval.v_number);
- if (!is_user_job(job)) {
- // Probably an invalid job id
+ TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ if (!data) {
EMSG(_(e_invjob));
return;
}
- if (!job_resize(job, argvars[1].vval.v_number, argvars[2].vval.v_number)) {
+ if (data->proc.uv.process.type != kProcessTypePty) {
EMSG(_(e_jobnotpty));
return;
}
+ pty_process_resize(&data->proc.pty, argvars[1].vval.v_number,
+ argvars[2].vval.v_number);
rettv->vval.v_number = 1;
}
@@ -10878,37 +10885,33 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
- JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit,
- job_opts);
+ bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
+ TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
+ job_opts, pty);
+ Process *proc = (Process *)&data->proc;
- if (!job_opts) {
- goto start;
- }
-
- opts.pty = get_dict_number(job_opts, (uint8_t *)"pty");
- if (opts.pty) {
+ if (pty) {
uint16_t width = get_dict_number(job_opts, (uint8_t *)"width");
if (width > 0) {
- opts.width = width;
+ data->proc.pty.width = width;
}
uint16_t height = get_dict_number(job_opts, (uint8_t *)"height");
if (height > 0) {
- opts.height = height;
+ data->proc.pty.height = height;
}
char *term = (char *)get_dict_string(job_opts, (uint8_t *)"TERM", true);
if (term) {
- opts.term_name = term;
+ data->proc.pty.term_name = term;
}
}
-start:
if (!on_stdout) {
- opts.stdout_cb = NULL;
+ proc->out = NULL;
}
if (!on_stderr) {
- opts.stderr_cb = NULL;
+ proc->err = NULL;
}
- common_job_start(opts, rettv);
+ common_job_start(data, rettv);
}
// "jobstop()" function
@@ -10927,14 +10930,15 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)
return;
}
- Job *job = job_find(argvars[0].vval.v_number);
- if (!is_user_job(job)) {
+ TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ if (!data || data->stopped) {
EMSG(_(e_invjob));
return;
}
- job_stop(job);
+ process_stop((Process *)&data->proc);
+ data->stopped = true;
rettv->vval.v_number = 1;
}
@@ -10971,13 +10975,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- Job *job = NULL;
+ TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
- || !(job = job_find(arg->li_tv.vval.v_number))
- || !is_user_job(job)) {
+ || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
list_append_number(rv, -3);
} else {
- TerminalJobData *data = job_data(job);
// append the list item and set the status pointer so we'll collect the
// status code when the job exits
list_append_number(rv, -1);
@@ -10993,18 +10995,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
}
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- Job *job = NULL;
+ TerminalJobData *data = NULL;
if (remaining == 0) {
// timed out
break;
}
if (arg->li_tv.v_type != VAR_NUMBER
- || !(job = job_find(arg->li_tv.vval.v_number))
- || !is_user_job(job)) {
+ || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
continue;
}
- TerminalJobData *data = job_data(job);
- int status = job_wait(job, remaining);
+ int status = process_wait((Process *)&data->proc, remaining);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
if (status == -2) {
@@ -11028,13 +11028,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
loop_poll_events(&loop, 0);
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- Job *job = NULL;
+ TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
- || !(job = job_find(arg->li_tv.vval.v_number))
- || !is_user_job(job)) {
+ || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
continue;
}
- TerminalJobData *data = job_data(job);
// remove the status pointer because the list may be freed before the
// job exits
data->status_ptr = NULL;
@@ -12951,7 +12949,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
- uint64_t channel_id = channel_from_job(argv);
+ uint64_t channel_id = channel_from_process(argv);
if (!channel_id) {
EMSG(_(e_api_spawn_failed));
@@ -15225,19 +15223,15 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
}
}
- JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit,
- job_opts);
- opts.pty = true;
- opts.width = curwin->w_width;
- opts.height = curwin->w_height;
- opts.term_name = xstrdup("xterm-256color");
- Job *job = common_job_start(opts, rettv);
- if (!job) {
- shell_free_argv(argv);
+ TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
+ job_opts, true);
+ data->proc.pty.width = curwin->w_width;
+ data->proc.pty.height = curwin->w_height;
+ data->proc.pty.term_name = xstrdup("xterm-256color");
+ if (!common_job_start(data, rettv)) {
return;
}
- TerminalJobData *data = opts.data;
- TerminalOptions topts = TERMINAL_OPTIONS_INIT;
+ TerminalOptions topts;
topts.data = data;
topts.width = curwin->w_width;
topts.height = curwin->w_height;
@@ -15250,7 +15244,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
&& os_isdir(argvars[1].vval.v_string)) {
cwd = (char *)argvars[1].vval.v_string;
}
- int pid = job_pid(job);
+ int pid = data->proc.pty.process.pid;
// Get the desired name of the buffer.
char *name = job_opts ?
@@ -20222,21 +20216,27 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
return ret;
}
-static inline JobOptions common_job_options(char **argv, ufunc_T *on_stdout,
- ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self)
+static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout,
+ ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self, bool pty)
{
TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData));
+ data->stopped = false;
data->on_stdout = on_stdout;
data->on_stderr = on_stderr;
data->on_exit = on_exit;
data->self = self;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = data;
- opts.stdout_cb = on_job_stdout;
- opts.stderr_cb = on_job_stderr;
- opts.exit_cb = on_job_exit;
- return opts;
+ if (pty) {
+ data->proc.pty = pty_process_init(data);
+ } else {
+ data->proc.uv = uv_process_init(data);
+ }
+ Process *proc = (Process *)&data->proc;
+ proc->argv = argv;
+ proc->in = &data->in;
+ proc->out = &data->out;
+ proc->err = &data->err;
+ proc->cb = on_process_exit;
+ return data;
}
/// Return true/false on success/failure.
@@ -20262,24 +20262,28 @@ static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
return false;
}
-static inline Job *common_job_start(JobOptions opts, typval_T *rettv)
+static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
{
- TerminalJobData *data = opts.data;
data->refcount++;
- Job *job = job_start(opts, &rettv->vval.v_number);
+ Process *proc = (Process *)&data->proc;
+ if (!process_spawn(&loop, proc)) {
+ EMSG(_(e_jobexe));
+ return false;
+ }
- if (rettv->vval.v_number <= 0) {
- if (rettv->vval.v_number == 0) {
- EMSG(_(e_jobtblfull));
- xfree(opts.term_name);
- free_term_job_data(data);
- } else {
- EMSG(_(e_jobexe));
- }
- return NULL;
+ data->id = current_job_id++;
+ wstream_init(proc->in, 0);
+ if (proc->out) {
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, on_job_stdout);
+ }
+ if (proc->err) {
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, on_job_stderr);
}
- data->job = job;
- return job;
+ pmap_put(uint64_t)(jobs, data->id, data);
+ rettv->vval.v_number = data->id;
+ return true;
}
static inline void free_term_job_data(TerminalJobData *data) {
@@ -20300,25 +20304,15 @@ static inline void free_term_job_data(TerminalJobData *data) {
xfree(data);
}
-static inline bool is_user_job(Job *job)
-{
- if (!job) {
- return false;
- }
-
- JobOptions *opts = job_opts(job);
- return opts->exit_cb == on_job_exit;
-}
-
// vimscript job callbacks must be executed on Nvim main loop
-static inline void push_job_event(Job *job, ufunc_T *callback,
- const char *type, char *data, size_t count, int status)
+static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
+ const char *type, char *buf, size_t count, int status)
{
JobEvent *event_data = xmalloc(sizeof(JobEvent));
event_data->received = NULL;
- if (data) {
+ if (buf) {
event_data->received = list_alloc();
- char *ptr = data;
+ char *ptr = buf;
size_t remaining = count;
size_t off = 0;
@@ -20342,8 +20336,7 @@ static inline void push_job_event(Job *job, ufunc_T *callback,
} else {
event_data->status = status;
}
- event_data->job_id = job_id(job);
- event_data->data = job_data(job);
+ event_data->data = data;
event_data->callback = callback;
event_data->type = type;
loop_push_event(&loop, (Event) {
@@ -20354,24 +20347,23 @@ static inline void push_job_event(Job *job, ufunc_T *callback,
static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)
{
- TerminalJobData *data = job_data(job);
+ TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");
}
static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)
{
- TerminalJobData *data = job_data(job);
+ TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");
}
-static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof,
- ufunc_T *callback, const char *type)
+static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
+ bool eof, ufunc_T *callback, const char *type)
{
if (eof) {
return;
}
- TerminalJobData *data = job_data(job);
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
// The order here matters, the terminal must receive the data first because
// push_job_event will modify the read buffer(convert NULs into NLs)
@@ -20380,17 +20372,16 @@ static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof,
}
if (callback) {
- push_job_event(job, callback, type, ptr, len, 0);
+ push_job_event(data, callback, type, ptr, len, 0);
}
rbuffer_consumed(buf, len);
}
}
-static void on_job_exit(Job *job, int status, void *d)
+static void on_process_exit(Process *proc, int status, void *d)
{
TerminalJobData *data = d;
-
if (data->term && !data->exited) {
data->exited = true;
terminal_close(data->term,
@@ -20401,19 +20392,20 @@ static void on_job_exit(Job *job, int status, void *d)
*data->status_ptr = status;
}
- push_job_event(job, data->on_exit, "exit", NULL, 0, status);
+ push_job_event(data, data->on_exit, "exit", NULL, 0, status);
}
-static void term_write(char *buf, size_t size, void *data)
+static void term_write(char *buf, size_t size, void *d)
{
- Job *job = ((TerminalJobData *)data)->job;
+ TerminalJobData *data = d;
WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
- job_write(job, wbuf);
+ wstream_write(&data->in, wbuf);
}
-static void term_resize(uint16_t width, uint16_t height, void *data)
+static void term_resize(uint16_t width, uint16_t height, void *d)
{
- job_resize(((TerminalJobData *)data)->job, width, height);
+ TerminalJobData *data = d;
+ pty_process_resize(&data->proc.pty, width, height);
}
static void term_close(void *d)
@@ -20421,7 +20413,7 @@ static void term_close(void *d)
TerminalJobData *data = d;
if (!data->exited) {
data->exited = true;
- job_stop(data->job);
+ process_stop((Process *)&data->proc);
}
terminal_destroy(data->term);
term_job_data_decref(d);
@@ -20448,7 +20440,7 @@ static void on_job_event(Event event)
if (argc > 0) {
argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = 0;
- argv[0].vval.v_number = ev->job_id;
+ argv[0].vval.v_number = ev->data->id;
}
if (argc > 1) {
@@ -20479,6 +20471,7 @@ static void on_job_event(Event event)
end:
if (!ev->received) {
// exit event, safe to free job data now
+ pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
xfree(ev);
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index c467ae8b96..d90565002e 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -3,6 +3,7 @@
#include <uv.h>
#include "nvim/event/loop.h"
+#include "nvim/event/process.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/loop.c.generated.h"
@@ -15,6 +16,10 @@ void loop_init(Loop *loop, void *data)
loop->uv.data = loop;
loop->deferred_events = kl_init(Event);
loop->immediate_events = kl_init(Event);
+ loop->children = kl_init(WatcherPtr);
+ loop->children_stop_requests = 0;
+ uv_signal_init(&loop->uv, &loop->children_watcher);
+ uv_timer_init(&loop->uv, &loop->children_kill_timer);
}
void loop_poll_events(Loop *loop, int ms)
@@ -113,6 +118,8 @@ void loop_stop(Loop *loop)
void loop_close(Loop *loop)
{
+ uv_close((uv_handle_t *)&loop->children_watcher, NULL);
+ uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
do {
uv_run(&loop->uv, UV_RUN_DEFAULT);
} while (uv_loop_close(&loop->uv));
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index e5a890dddb..5eb4d32ca8 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -26,6 +26,10 @@ typedef struct loop {
uv_loop_t uv;
klist_t(Event) *deferred_events, *immediate_events;
int deferred_events_allowed;
+ klist_t(WatcherPtr) *children;
+ uv_signal_t children_watcher;
+ uv_timer_t children_kill_timer;
+ size_t children_stop_requests;
} Loop;
// Poll for events until a condition or timeout
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
new file mode 100644
index 0000000000..2b1f1ae096
--- /dev/null
+++ b/src/nvim/event/process.c
@@ -0,0 +1,325 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/os/shell.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/pty_process.h"
+#include "nvim/globals.h"
+#include "nvim/log.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/process.c.generated.h"
+#endif
+
+// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly
+// exit before we send SIGNAL to it
+#define TERM_TIMEOUT 1000000000
+#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
+
+#define CLOSE_PROC_STREAM(proc, stream) \
+ do { \
+ if (proc->stream && !proc->stream->closed) { \
+ stream_close(proc->stream, NULL); \
+ } \
+ } while (0)
+
+
+bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ proc->loop = loop;
+ if (proc->in) {
+ uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0);
+ }
+
+ if (proc->out) {
+ uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0);
+ }
+
+ if (proc->err) {
+ uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0);
+ }
+
+ bool success;
+ switch (proc->type) {
+ case kProcessTypeUv:
+ success = uv_process_spawn((UvProcess *)proc);
+ break;
+ case kProcessTypePty:
+ success = pty_process_spawn((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+
+ if (!success) {
+ if (proc->in) {
+ uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
+ }
+ if (proc->out) {
+ uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
+ }
+ if (proc->err) {
+ uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
+ }
+ process_close(proc);
+ shell_free_argv(proc->argv);
+ proc->status = -1;
+ return false;
+ }
+
+ void *data = proc->data;
+
+ if (proc->in) {
+ stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ proc->in->internal_data = proc;
+ proc->in->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ if (proc->out) {
+ stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ proc->out->internal_data = proc;
+ proc->out->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ if (proc->err) {
+ stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ proc->err->internal_data = proc;
+ proc->err->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ proc->internal_exit_cb = on_process_exit;
+ proc->internal_close_cb = decref;
+ proc->refcount++;
+ kl_push(WatcherPtr, loop->children, proc);
+ return true;
+}
+
+void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
+{
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ uv_kill(proc->pid, SIGTERM);
+ proc->term_sent = true;
+ process_stop(proc);
+ }
+
+ // Wait until all children exit
+ LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children));
+ pty_process_teardown(loop);
+}
+
+// Wrappers around `stream_close` that protect against double-closing.
+void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ process_close_in(proc);
+ process_close_out(proc);
+ process_close_err(proc);
+}
+
+void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, in);
+}
+
+void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, out);
+}
+
+void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, err);
+}
+
+/// Synchronously wait for a process to finish
+///
+/// @param process The Process instance
+/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
+/// waiting until the process quits.
+/// @return returns the status code of the exited process. -1 if the process is
+/// still running and the `timeout` has expired. Note that this is
+/// indistinguishable from the process returning -1 by itself. Which
+/// is possible on some OS. Returns -2 if an user has interruped the
+/// wait.
+int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
+{
+ // The default status is -1, which represents a timeout
+ int status = -1;
+ bool interrupted = false;
+
+ // Increase refcount to stop the exit callback from being called(and possibly
+ // being freed) before we have a chance to get the status.
+ proc->refcount++;
+ LOOP_POLL_EVENTS_UNTIL(proc->loop, ms,
+ // Until...
+ got_int || // interrupted by the user
+ proc->refcount == 1); // job exited
+
+ // we'll assume that a user frantically hitting interrupt doesn't like
+ // the current job. Signal that it has to be killed.
+ if (got_int) {
+ interrupted = true;
+ got_int = false;
+ process_stop(proc);
+ if (ms == -1) {
+ // We can only return, if all streams/handles are closed and the job
+
+ // exited.
+ LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1);
+ } else {
+ loop_poll_events(proc->loop, 0);
+ }
+ }
+
+ if (proc->refcount == 1) {
+ // Job exited, collect status and manually invoke close_cb to free the job
+ // resources
+ status = interrupted ? -2 : proc->status;
+ decref(proc);
+ } else {
+ proc->refcount--;
+ }
+
+ return status;
+}
+
+/// Ask a process to terminate and eventually kill if it doesn't respond
+void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ if (proc->stopped_time) {
+ return;
+ }
+
+ proc->stopped_time = os_hrtime();
+ switch (proc->type) {
+ case kProcessTypeUv:
+ // Close the process's stdin. If the process doesn't close its own
+ // stdout/stderr, they will be closed when it exits(possibly due to being
+ // terminated after a timeout)
+ process_close_in(proc);
+ break;
+ case kProcessTypePty:
+ // close all streams for pty processes to send SIGHUP to the process
+ process_close_streams(proc);
+ pty_process_close_master((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+
+ Loop *loop = proc->loop;
+ if (!loop->children_stop_requests++) {
+ // When there's at least one stop request pending, start a timer that
+ // will periodically check if a signal should be send to a to the job
+ DLOG("Starting job kill timer");
+ uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100);
+ }
+}
+
+/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL
+/// to those that didn't die from SIGTERM after a while(exit_timeout is 0).
+static void children_kill_cb(uv_timer_t *handle)
+{
+ Loop *loop = handle->loop->data;
+ uint64_t now = os_hrtime();
+
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ if (!proc->stopped_time) {
+ continue;
+ }
+ uint64_t elapsed = now - proc->stopped_time;
+
+ if (!proc->term_sent && elapsed >= TERM_TIMEOUT) {
+ ILOG("Sending SIGTERM to pid %d", proc->pid);
+ uv_kill(proc->pid, SIGTERM);
+ proc->term_sent = true;
+ } else if (elapsed >= KILL_TIMEOUT) {
+ ILOG("Sending SIGKILL to pid %d", proc->pid);
+ uv_kill(proc->pid, SIGKILL);
+ }
+ }
+}
+
+static void decref(Process *proc)
+{
+ if (--proc->refcount != 0) {
+ return;
+ }
+
+ Loop *loop = proc->loop;
+ kliter_t(WatcherPtr) **node = NULL;
+ kl_iter(WatcherPtr, loop->children, current) {
+ if ((*current)->data == proc) {
+ node = current;
+ break;
+ }
+ }
+
+ assert(node);
+ kl_shift_at(WatcherPtr, loop->children, node);
+ shell_free_argv(proc->argv);
+ if (proc->type == kProcessTypePty) {
+ xfree(((PtyProcess *)proc)->term_name);
+ }
+ if (proc->cb) {
+ proc->cb(proc, proc->status, proc->data);
+ }
+}
+
+static void process_close(Process *proc)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ assert(!proc->closed);
+ proc->closed = true;
+ switch (proc->type) {
+ case kProcessTypeUv:
+ uv_process_close((UvProcess *)proc);
+ break;
+ case kProcessTypePty:
+ pty_process_close((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+}
+
+static void on_process_exit(Process *proc)
+{
+ if (exiting) {
+ on_process_exit_event((Event) {.data = proc});
+ } else {
+ loop_push_event(proc->loop,
+ (Event) {.handler = on_process_exit_event, .data = proc}, false);
+ }
+
+ Loop *loop = proc->loop;
+ if (loop->children_stop_requests && !--loop->children_stop_requests) {
+ // Stop the timer if no more stop requests are pending
+ DLOG("Stopping process kill timer");
+ uv_timer_stop(&loop->children_kill_timer);
+ }
+}
+
+static void on_process_exit_event(Event event)
+{
+ Process *proc = event.data;
+ process_close_streams(proc);
+ process_close(proc);
+}
+
+static void on_process_stream_close(Stream *stream, void *data)
+{
+ Process *proc = data;
+ decref(proc);
+}
+
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
new file mode 100644
index 0000000000..5c84a7d1d0
--- /dev/null
+++ b/src/nvim/event/process.h
@@ -0,0 +1,56 @@
+#ifndef NVIM_EVENT_PROCESS_H
+#define NVIM_EVENT_PROCESS_H
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+
+typedef enum {
+ kProcessTypeUv,
+ kProcessTypePty
+} ProcessType;
+
+typedef struct process Process;
+typedef void (*process_exit_cb)(Process *proc, int status, void *data);
+typedef void (*internal_process_cb)(Process *proc);
+
+struct process {
+ ProcessType type;
+ Loop *loop;
+ void *data;
+ int pid, status, refcount;
+ // set to the hrtime of when process_stop was called for the process.
+ uint64_t stopped_time;
+ char **argv;
+ Stream *in, *out, *err;
+ process_exit_cb cb;
+ internal_process_cb internal_exit_cb, internal_close_cb;
+ bool closed, term_sent;
+};
+
+static inline Process process_init(ProcessType type, void *data)
+{
+ return (Process) {
+ .type = type,
+ .data = data,
+ .loop = NULL,
+ .pid = 0,
+ .status = 0,
+ .refcount = 0,
+ .stopped_time = 0,
+ .argv = NULL,
+ .in = NULL,
+ .out = NULL,
+ .err = NULL,
+ .cb = NULL,
+ .closed = false,
+ .term_sent = false,
+ .internal_close_cb = NULL,
+ .internal_exit_cb = NULL
+ };
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_PROCESS_H
diff --git a/src/nvim/os/pty_process.c b/src/nvim/event/pty_process.c
index a8fff2a61d..1e24d7c919 100644
--- a/src/nvim/os/pty_process.c
+++ b/src/nvim/event/pty_process.c
@@ -20,58 +20,39 @@
#include <uv.h>
-#include "nvim/func_attr.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/memory.h"
-#include "nvim/vim.h"
-#include "nvim/globals.h"
+#include "nvim/lib/klist.h"
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/pty_process.h"
+#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pty_process.c.generated.h"
+# include "event/pty_process.c.generated.h"
#endif
static const unsigned int KILL_RETRIES = 5;
static const unsigned int KILL_TIMEOUT = 2; // seconds
-bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL
+bool pty_process_spawn(PtyProcess *ptyproc)
+ FUNC_ATTR_NONNULL_ALL
{
- PtyProcess *ptyproc = &job->process.pty;
- ptyproc->tty_fd = -1;
-
- if (job->opts.writable) {
- uv_pipe_init(&loop.uv, &ptyproc->proc_stdin, 0);
- ptyproc->proc_stdin.data = NULL;
- }
-
- if (job->opts.stdout_cb) {
- uv_pipe_init(&loop.uv, &ptyproc->proc_stdout, 0);
- ptyproc->proc_stdout.data = NULL;
- }
-
- if (job->opts.stderr_cb) {
- uv_pipe_init(&loop.uv, &ptyproc->proc_stderr, 0);
- ptyproc->proc_stderr.data = NULL;
- }
-
- job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin;
- job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout;
- job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr;
-
- int master;
- ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0};
+ Process *proc = (Process *)ptyproc;
+ uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
+ ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
struct termios termios;
init_termios(&termios);
uv_disable_stdio_inheritance();
-
+ int master;
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
if (pid < 0) {
+ ELOG("forkpty failed: %s", strerror(errno));
return false;
} else if (pid == 0) {
- init_child(job);
+ init_child(ptyproc);
abort();
}
@@ -86,23 +67,18 @@ bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL
goto error;
}
- if (job->opts.writable
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdin)) {
+ if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) {
goto error;
}
-
- if (job->opts.stdout_cb
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdout)) {
+ if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
goto error;
}
-
- if (job->opts.stderr_cb
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stderr)) {
+ if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {
goto error;
}
ptyproc->tty_fd = master;
- job->pid = pid;
+ proc->pid = pid;
return true;
error:
@@ -117,54 +93,44 @@ error:
}
if (child != pid) {
kill(pid, SIGKILL);
+ waitpid(pid, NULL, 0);
}
return false;
}
-static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe)
+void pty_process_resize(PtyProcess *ptyproc, uint16_t width,
+ uint16_t height)
FUNC_ATTR_NONNULL_ALL
{
- int fd_dup = dup(fd);
- if (fd_dup < 0) {
- ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
- return false;
- }
- int uv_result = uv_pipe_open(pipe, fd_dup);
- if (uv_result) {
- ELOG("Failed to set pipe to descriptor %d: %s",
- fd_dup, uv_strerror(uv_result));
- close(fd_dup);
- return false;
- }
- return true;
+ ptyproc->winsize = (struct winsize){height, width, 0, 0};
+ ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
}
-void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL
+void pty_process_close(PtyProcess *ptyproc)
+ FUNC_ATTR_NONNULL_ALL
{
- pty_process_close_master(job);
- job_close_streams(job);
- job_decref(job);
+ pty_process_close_master(ptyproc);
+ Process *proc = (Process *)ptyproc;
+ if (proc->internal_close_cb) {
+ proc->internal_close_cb(proc);
+ }
}
-void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL
+void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
{
- PtyProcess *ptyproc = &job->process.pty;
if (ptyproc->tty_fd >= 0) {
close(ptyproc->tty_fd);
ptyproc->tty_fd = -1;
}
}
-void pty_process_resize(Job *job, uint16_t width, uint16_t height)
- FUNC_ATTR_NONNULL_ALL
+void pty_process_teardown(Loop *loop)
{
- PtyProcess *ptyproc = &job->process.pty;
- ptyproc->winsize = (struct winsize){height, width, 0, 0};
- ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
+ uv_signal_stop(&loop->children_watcher);
}
-static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL
+static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
{
unsetenv("COLUMNS");
unsetenv("LINES");
@@ -179,8 +145,8 @@ static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL
signal(SIGTERM, SIG_DFL);
signal(SIGALRM, SIG_DFL);
- setenv("TERM", job->opts.term_name ? job->opts.term_name : "ansi", 1);
- execvp(job->opts.argv[0], job->opts.argv);
+ setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1);
+ execvp(ptyproc->process.argv[0], ptyproc->process.argv);
fprintf(stderr, "execvp failed: %s\n", strerror(errno));
}
@@ -239,3 +205,50 @@ static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
termios->c_cc[VMIN] = 1;
termios->c_cc[VTIME] = 0;
}
+
+static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe)
+ FUNC_ATTR_NONNULL_ALL
+{
+ int fd_dup = dup(fd);
+ if (fd_dup < 0) {
+ ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
+ return false;
+ }
+ int uv_result = uv_pipe_open(pipe, fd_dup);
+ if (uv_result) {
+ ELOG("Failed to set pipe to descriptor %d: %s",
+ fd_dup, uv_strerror(uv_result));
+ close(fd_dup);
+ return false;
+ }
+ return true;
+}
+
+static void chld_handler(uv_signal_t *handle, int signum)
+{
+ int stat = 0;
+ int pid;
+
+ do {
+ pid = waitpid(-1, &stat, WNOHANG);
+ } while (pid < 0 && errno == EINTR);
+
+ if (pid <= 0) {
+ return;
+ }
+
+ Loop *loop = handle->loop->data;
+
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ if (proc->pid == pid) {
+ if (WIFEXITED(stat)) {
+ proc->status = WEXITSTATUS(stat);
+ } else if (WIFSIGNALED(stat)) {
+ proc->status = WTERMSIG(stat);
+ }
+ proc->internal_exit_cb(proc);
+ break;
+ }
+ }
+}
diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h
new file mode 100644
index 0000000000..a12b5489c5
--- /dev/null
+++ b/src/nvim/event/pty_process.h
@@ -0,0 +1,30 @@
+#ifndef NVIM_EVENT_PTY_PROCESS_H
+#define NVIM_EVENT_PTY_PROCESS_H
+
+#include <sys/ioctl.h>
+
+#include "nvim/event/process.h"
+
+typedef struct pty_process {
+ Process process;
+ char *term_name;
+ uint16_t width, height;
+ struct winsize winsize;
+ int tty_fd;
+} PtyProcess;
+
+static inline PtyProcess pty_process_init(void *data)
+{
+ PtyProcess rv;
+ rv.process = process_init(kProcessTypePty, data);
+ rv.term_name = NULL;
+ rv.width = 80;
+ rv.height = 24;
+ rv.tty_fd = -1;
+ return rv;
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/pty_process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_PTY_PROCESS_H
diff --git a/src/nvim/event/uv_process.c b/src/nvim/event/uv_process.c
new file mode 100644
index 0000000000..21c2fd1790
--- /dev/null
+++ b/src/nvim/event/uv_process.c
@@ -0,0 +1,77 @@
+#include <assert.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/log.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/uv_process.c.generated.h"
+#endif
+
+bool uv_process_spawn(UvProcess *uvproc)
+ FUNC_ATTR_NONNULL_ALL
+{
+ Process *proc = (Process *)uvproc;
+ uvproc->uvopts.file = proc->argv[0];
+ uvproc->uvopts.args = proc->argv;
+ uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
+ uvproc->uvopts.exit_cb = exit_cb;
+ uvproc->uvopts.cwd = NULL;
+ uvproc->uvopts.env = NULL;
+ uvproc->uvopts.stdio = uvproc->uvstdio;
+ uvproc->uvopts.stdio_count = 3;
+ uvproc->uvstdio[0].flags = UV_IGNORE;
+ uvproc->uvstdio[1].flags = UV_IGNORE;
+ uvproc->uvstdio[2].flags = UV_IGNORE;
+ uvproc->uv.data = proc;
+
+ if (proc->in) {
+ uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
+ uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe;
+ }
+
+ if (proc->out) {
+ uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe;
+ }
+
+ if (proc->err) {
+ uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe;
+ }
+
+ int status;
+ if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) {
+ ELOG("uv_spawn failed: %s", uv_strerror(status));
+ return false;
+ }
+
+ proc->pid = uvproc->uv.pid;
+ return true;
+}
+
+void uv_process_close(UvProcess *uvproc)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ uv_close((uv_handle_t *)&uvproc->uv, close_cb);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Process *proc = handle->data;
+ if (proc->internal_close_cb) {
+ proc->internal_close_cb(proc);
+ }
+}
+
+static void exit_cb(uv_process_t *handle, int64_t status, int term_signal)
+{
+ Process *proc = handle->data;
+ proc->status = (int)status;
+ proc->internal_exit_cb(proc);
+}
diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h
new file mode 100644
index 0000000000..a17f1446b3
--- /dev/null
+++ b/src/nvim/event/uv_process.h
@@ -0,0 +1,25 @@
+#ifndef NVIM_EVENT_UV_PROCESS_H
+#define NVIM_EVENT_UV_PROCESS_H
+
+#include <uv.h>
+
+#include "nvim/event/process.h"
+
+typedef struct uv_process {
+ Process process;
+ uv_process_t uv;
+ uv_process_options_t uvopts;
+ uv_stdio_container_t uvstdio[3];
+} UvProcess;
+
+static inline UvProcess uv_process_init(void *data)
+{
+ UvProcess rv;
+ rv.process = process_init(kProcessTypeUv, data);
+ return rv;
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/uv_process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_UV_PROCESS_H
diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h
index 10d6846133..1280a927e8 100644
--- a/src/nvim/lib/klist.h
+++ b/src/nvim/lib/klist.h
@@ -136,6 +136,6 @@
// `break` statement is executed before the next iteration.
#define kl_iter(name, kl, p) kl_iter_at(name, kl, p, NULL)
#define kl_iter_at(name, kl, p, h) \
- for (kl1_##name *p = h ? h : kl->head; p != kl->tail; p = p->next)
+ for (kl1_##name **p = h ? h : &kl->head; *p != kl->tail; p = &(*p)->next)
#endif
diff --git a/src/nvim/main.c b/src/nvim/main.c
index ca55469730..e2ae63e134 100644
--- a/src/nvim/main.c
+++ b/src/nvim/main.c
@@ -63,7 +63,7 @@
#include "nvim/os/time.h"
#include "nvim/event/loop.h"
#include "nvim/os/signal.h"
-#include "nvim/os/job.h"
+#include "nvim/event/process.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/msgpack_rpc/server.h"
@@ -149,7 +149,6 @@ void event_init(void)
// `event_poll`
// Signals
signal_init();
- job_init();
// finish mspgack-rpc initialization
channel_init();
server_init();
@@ -165,7 +164,7 @@ void event_teardown(void)
loop_process_all_events(&loop);
input_stop();
channel_teardown();
- job_teardown();
+ process_teardown(&loop);
server_teardown();
signal_teardown();
terminal_teardown();
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 577965e5ba..861614f147 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -10,11 +10,10 @@
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
+#include "nvim/event/uv_process.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/socket.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@@ -35,7 +34,7 @@
typedef enum {
kChannelTypeSocket,
- kChannelTypeJob,
+ kChannelTypeProc,
kChannelTypeStdio
} ChannelType;
@@ -54,9 +53,14 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
- Job *job;
Stream stream;
struct {
+ UvProcess uvproc;
+ Stream in;
+ Stream out;
+ Stream err;
+ } process;
+ struct {
Stream in;
Stream out;
} std;
@@ -110,34 +114,35 @@ void channel_teardown(void)
});
}
-/// Creates an API channel by starting a job and connecting to its
+/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_job(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeJob);
- incref(channel); // job channels are only closed by the exit_cb
-
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = channel;
- opts.stdout_cb = job_out;
- opts.stderr_cb = job_err;
- opts.exit_cb = job_exit;
- channel->data.job = job_start(opts, &status);
-
- if (status <= 0) {
- if (status == 0) { // Two decrefs needed if status == 0.
- decref(channel); // Only one needed if status < 0,
- } // because exit_cb will do the second one.
+uint64_t channel_from_process(char **argv)
+{
+ Channel *channel = register_channel(kChannelTypeProc);
+ channel->data.process.uvproc = uv_process_init(channel);
+ Process *proc = &channel->data.process.uvproc.process;
+ proc->argv = argv;
+ proc->in = &channel->data.process.in;
+ proc->out = &channel->data.process.out;
+ proc->err = &channel->data.process.err;
+ proc->cb = process_exit;
+ if (!process_spawn(&loop, proc)) {
+ loop_poll_events(&loop, 0);
decref(channel);
return 0;
}
+ incref(channel); // process channels are only closed by the exit_cb
+ wstream_init(proc->in, 0);
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, parse_msgpack);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, forward_stderr);
+
return channel->id;
}
@@ -319,24 +324,17 @@ static void channel_from_stdio(void)
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)
-{
- Job *job = data;
- parse_msgpack(stream, buf, job_data(job), eof);
-}
-
-static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)
+static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s",
- ((Channel *)job_data(data))->id, buf);
+ ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
-static void job_exit(Job *job, int status, void *data)
+static void process_exit(Process *proc, int status, void *data)
{
decref(data);
}
@@ -511,8 +509,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
case kChannelTypeSocket:
success = wstream_write(&channel->data.stream, buffer);
break;
- case kChannelTypeJob:
- success = job_write(channel->data.job, buffer);
+ case kChannelTypeProc:
+ success = wstream_write(&channel->data.process.in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -627,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/job and free the channel resources.
+/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
{
if (channel->closed) {
@@ -640,9 +638,9 @@ static void close_channel(Channel *channel)
case kChannelTypeSocket:
stream_close(&channel->data.stream, close_cb);
break;
- case kChannelTypeJob:
- if (channel->data.job) {
- job_stop(channel->data.job);
+ case kChannelTypeProc:
+ if (!channel->data.process.uvproc.process.closed) {
+ process_stop(&channel->data.process.uvproc.process);
}
break;
case kChannelTypeStdio:
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
deleted file mode 100644
index 71419cefca..0000000000
--- a/src/nvim/os/job.c
+++ /dev/null
@@ -1,444 +0,0 @@
-#include <stdint.h>
-#include <stdbool.h>
-
-#include <uv.h>
-
-#include "nvim/event/loop.h"
-#include "nvim/event/time.h"
-#include "nvim/event/signal.h"
-#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/os/time.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-
-#ifdef HAVE_SYS_WAIT_H
-# include <sys/wait.h>
-#endif
-
-// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit
-// before we send SIGNAL to it
-#define TERM_TIMEOUT 1000000000
-#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-#define JOB_BUFFER_SIZE 0xFFFF
-
-#define close_job_stream(job, stream) \
- do { \
- if (!job->stream.closed) { \
- stream_close(&job->stream, on_##stream_close); \
- } \
- } while (0)
-
-#define close_job_in(job) close_job_stream(job, in)
-#define close_job_out(job) close_job_stream(job, out)
-#define close_job_err(job) close_job_stream(job, err)
-
-Job *table[MAX_RUNNING_JOBS] = {NULL};
-size_t stop_requests = 0;
-TimeWatcher job_stop_timer;
-SignalWatcher schld;
-
-// Some helpers shared in this module
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/job.c.generated.h"
-#endif
-// Callbacks for libuv
-
-/// Initializes job control resources
-void job_init(void)
-{
- uv_disable_stdio_inheritance();
- time_watcher_init(&loop, &job_stop_timer, NULL);
- signal_watcher_init(&loop, &schld, NULL);
- signal_watcher_start(&schld, chld_handler, SIGCHLD);
-}
-
-/// Releases job control resources and terminates running jobs
-void job_teardown(void)
-{
- // Stop all jobs
- for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
- Job *job;
- if ((job = table[i]) != NULL) {
- uv_kill(job->pid, SIGTERM);
- job->term_sent = true;
- job_stop(job);
- }
- }
-
- // Wait until all jobs are closed
- LOOP_POLL_EVENTS_UNTIL(&loop, -1, !stop_requests);
- signal_watcher_stop(&schld);
- signal_watcher_close(&schld, NULL);
- // Close the timer
- time_watcher_close(&job_stop_timer, NULL);
-}
-
-/// Tries to start a new job.
-///
-/// @param[out] status The job id if the job started successfully, 0 if the job
-/// table is full, -1 if the program could not be executed.
-/// @return The job pointer if the job started successfully, NULL otherwise
-Job *job_start(JobOptions opts, int *status)
-{
- int i;
- Job *job;
-
- // Search for a free slot in the table
- for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if (table[i] == NULL) {
- break;
- }
- }
-
- if (i == MAX_RUNNING_JOBS) {
- // No free slots
- shell_free_argv(opts.argv);
- *status = 0;
- return NULL;
- }
-
- job = xmalloc(sizeof(Job));
- // Initialize
- job->id = i + 1;
- *status = job->id;
- job->status = -1;
- job->refcount = 1;
- job->stopped_time = 0;
- job->term_sent = false;
- job->opts = opts;
- job->closed = false;
- job->in.closed = true;
- job->out.closed = true;
- job->err.closed = true;
-
- // Spawn the job
- if (!process_spawn(job)) {
- if (job->opts.writable) {
- uv_close((uv_handle_t *)job->proc_stdin, NULL);
- }
- if (job->opts.stdout_cb) {
- uv_close((uv_handle_t *)job->proc_stdout, NULL);
- }
- if (job->opts.stderr_cb) {
- uv_close((uv_handle_t *)job->proc_stderr, NULL);
- }
- process_close(job);
- loop_poll_events(&loop, 0);
- *status = -1;
- return NULL;
- }
-
- if (opts.writable) {
- job->refcount++;
- wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job);
- }
-
- // Start the readable streams
- if (opts.stdout_cb) {
- job->refcount++;
- rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job);
- rstream_start(&job->out, read_cb);
- }
-
- if (opts.stderr_cb) {
- job->refcount++;
- rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job);
- rstream_start(&job->err, read_cb);
- }
- // Save the job to the table
- table[i] = job;
-
- return job;
-}
-
-/// Finds a job instance by id
-///
-/// @param id The job id
-/// @return the Job instance
-Job *job_find(int id)
-{
- Job *job;
-
- if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
- || job->stopped_time) {
- return NULL;
- }
-
- return job;
-}
-
-/// Terminates a job. This is a non-blocking operation, but if the job exists
-/// it's guaranteed to succeed(SIGKILL will eventually be sent)
-///
-/// @param job The Job instance
-void job_stop(Job *job)
-{
- if (job->stopped_time) {
- return;
- }
-
- job->stopped_time = os_hrtime();
- if (job->opts.pty) {
- // close all streams for pty jobs to send SIGHUP to the process
- job_close_streams(job);
- pty_process_close_master(job);
- } else {
- // Close the job's stdin. If the job doesn't close its own stdout/stderr,
- // they will be closed when the job exits(possibly due to being terminated
- // after a timeout)
- job_close_in(job);
- }
-
- if (!stop_requests++) {
- // When there's at least one stop request pending, start a timer that
- // will periodically check if a signal should be send to a to the job
- DLOG("Starting job kill timer");
- time_watcher_start(&job_stop_timer, job_stop_timer_cb, 100, 100);
- }
-}
-
-/// job_wait - synchronously wait for a job to finish
-///
-/// @param job The job instance
-/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
-/// waiting until the job quits.
-/// @return returns the status code of the exited job. -1 if the job is
-/// still running and the `timeout` has expired. Note that this is
-/// indistinguishable from the process returning -1 by itself. Which
-/// is possible on some OS. Returns -2 if the job was interrupted.
-int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
-{
- // The default status is -1, which represents a timeout
- int status = -1;
- bool interrupted = false;
-
- // Increase refcount to stop the job from being freed before we have a
- // chance to get the status.
- job->refcount++;
- LOOP_POLL_EVENTS_UNTIL(&loop, ms,
- // Until...
- got_int || // interrupted by the user
- job->refcount == 1); // job exited
-
- // we'll assume that a user frantically hitting interrupt doesn't like
- // the current job. Signal that it has to be killed.
- if (got_int) {
- interrupted = true;
- got_int = false;
- job_stop(job);
- if (ms == -1) {
- // We can only return, if all streams/handles are closed and the job
- // exited.
- LOOP_POLL_EVENTS_UNTIL(&loop, -1, job->refcount == 1);
- } else {
- loop_poll_events(&loop, 0);
- }
- }
-
- if (job->refcount == 1) {
- // Job exited, collect status and manually invoke close_cb to free the job
- // resources
- status = interrupted ? -2 : job->status;
- job_close_streams(job);
- job_decref(job);
- } else {
- job->refcount--;
- }
-
- return status;
-}
-
-/// Close the pipe used to write to the job.
-///
-/// This can be used for example to indicate to the job process that no more
-/// input is coming, and that it should shut down cleanly.
-///
-/// It has no effect when the input pipe doesn't exist or was already
-/// closed.
-///
-/// @param job The job instance
-void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_in(job);
-}
-
-// Close the job stdout stream.
-void job_close_out(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_out(job);
-}
-
-// Close the job stderr stream.
-void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_out(job);
-}
-
-/// All writes that complete after calling this function will be reported
-/// to `cb`.
-///
-/// Use this function to be notified about the status of an in-flight write.
-///
-/// @see {wstream_set_write_cb}
-///
-/// @param job The job instance
-/// @param cb The function that will be called on write completion or
-/// failure. It will be called with the job as the `data` argument.
-void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL
-{
- wstream_set_write_cb(&job->in, cb);
-}
-
-/// Writes data to the job's stdin. This is a non-blocking operation, it
-/// returns when the write request was sent.
-///
-/// @param job The Job instance
-/// @param buffer The buffer which contains the data to be written
-/// @return true if the write request was successfully sent, false if writing
-/// to the job stream failed (possibly because the OS buffer is full)
-bool job_write(Job *job, WBuffer *buffer)
-{
- return wstream_write(&job->in, buffer);
-}
-
-/// Get the job id
-///
-/// @param job A pointer to the job
-/// @return The job id
-int job_id(Job *job)
-{
- return job->id;
-}
-
-// Get the job pid
-int job_pid(Job *job)
-{
- return job->pid;
-}
-
-/// Get data associated with a job
-///
-/// @param job A pointer to the job
-/// @return The job data
-void *job_data(Job *job)
-{
- return job->opts.data;
-}
-
-/// Resize the window for a pty job
-bool job_resize(Job *job, uint16_t width, uint16_t height)
-{
- if (!job->opts.pty) {
- return false;
- }
- pty_process_resize(job, width, height);
- return true;
-}
-
-void job_close_streams(Job *job)
-{
- close_job_in(job);
- close_job_out(job);
- close_job_err(job);
-}
-
-JobOptions *job_opts(Job *job)
-{
- return &job->opts;
-}
-
-/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
-/// that didn't die from SIGTERM after a while(exit_timeout is 0).
-static void job_stop_timer_cb(TimeWatcher *watcher, void *data)
-{
- Job *job;
- uint64_t now = os_hrtime();
-
- for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL || !job->stopped_time) {
- continue;
- }
-
- uint64_t elapsed = now - job->stopped_time;
-
- if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
- ILOG("Sending SIGTERM to job(id: %d)", job->id);
- uv_kill(job->pid, SIGTERM);
- job->term_sent = true;
- } else if (elapsed >= KILL_TIMEOUT) {
- ILOG("Sending SIGKILL to job(id: %d)", job->id);
- uv_kill(job->pid, SIGKILL);
- process_close(job);
- }
- }
-}
-
-// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
-static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
-{
- Job *job = data;
-
- if (stream == &job->out) {
- job->opts.stdout_cb(stream, buf, data, eof);
- if (eof) {
- close_job_out(job);
- }
- } else {
- job->opts.stderr_cb(stream, buf, data, eof);
- if (eof) {
- close_job_err(job);
- }
- }
-}
-
-static void on_stream_close(Stream *stream, void *data)
-{
- job_decref(data);
-}
-
-static void job_exited(Event event)
-{
- Job *job = event.data;
- process_close(job);
-}
-
-static void chld_handler(SignalWatcher *watcher, int signum, void *data)
-{
- int stat = 0;
- int pid;
-
- do {
- pid = waitpid(-1, &stat, WNOHANG);
- } while (pid < 0 && errno == EINTR);
-
- if (pid <= 0) {
- return;
- }
-
- Job *job = NULL;
- // find the job corresponding to the exited pid
- for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) != NULL && job->pid == pid) {
- if (WIFEXITED(stat)) {
- job->status = WEXITSTATUS(stat);
- } else if (WIFSIGNALED(stat)) {
- job->status = WTERMSIG(stat);
- }
- if (exiting) {
- // don't enqueue more events when exiting
- process_close(job);
- } else {
- loop_push_event(&loop,
- (Event) {.handler = job_exited, .data = job}, false);
- }
- break;
- }
- }
-}
-
diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h
deleted file mode 100644
index 2f8bf79a31..0000000000
--- a/src/nvim/os/job.h
+++ /dev/null
@@ -1,20 +0,0 @@
-// Job is a short name we use to refer to child processes that run in parallel
-// with the editor, probably executing long-running tasks and sending updates
-// asynchronously. Communication happens through anonymous pipes connected to
-// the job's std{in,out,err}. They are more like bash/zsh co-processes than the
-// usual shell background job. The name 'Job' was chosen because it applies to
-// the concept while being significantly shorter.
-#ifndef NVIM_OS_JOB_H
-#define NVIM_OS_JOB_H
-
-#include <stdint.h>
-#include <stdbool.h>
-
-#include "nvim/os/job_defs.h"
-#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/job.h.generated.h"
-#endif
-#endif // NVIM_OS_JOB_H
diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h
deleted file mode 100644
index ea7a326404..0000000000
--- a/src/nvim/os/job_defs.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef NVIM_OS_JOB_DEFS_H
-#define NVIM_OS_JOB_DEFS_H
-
-#include <uv.h>
-
-#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
-
-#define MAX_RUNNING_JOBS 100
-typedef struct job Job;
-
-/// Function called when the job reads data
-///
-/// @param id The job id
-/// @param data Some data associated with the job by the caller
-typedef void (*job_exit_cb)(Job *job, int status, void *data);
-
-// Job startup options
-// job_exit_cb Callback that will be invoked when the job exits
-// maxmem Maximum amount of memory used by the job WStream
-typedef struct {
- // Argument vector for the process. The first item is the
- // executable to run.
- // [consumed]
- char **argv;
- // Caller data that will be associated with the job
- void *data;
- // If true the job stdin will be available for writing with job_write,
- // otherwise it will be redirected to /dev/null
- bool writable;
- // Callback that will be invoked when data is available on stdout. If NULL
- // stdout will be redirected to /dev/null.
- stream_read_cb stdout_cb;
- // Callback that will be invoked when data is available on stderr. If NULL
- // stderr will be redirected to /dev/null.
- stream_read_cb stderr_cb;
- // Callback that will be invoked when the job has exited and will not send
- // data
- job_exit_cb exit_cb;
- // Maximum memory used by the job's WStream
- size_t maxmem;
- // Connect the job to a pseudo terminal
- bool pty;
- // Initial window dimensions if the job is connected to a pseudo terminal
- uint16_t width, height;
- // Value for the $TERM environment variable. A default value of "ansi" is
- // assumed if NULL
- char *term_name;
-} JobOptions;
-
-#define JOB_OPTIONS_INIT ((JobOptions) { \
- .argv = NULL, \
- .data = NULL, \
- .writable = true, \
- .stdout_cb = NULL, \
- .stderr_cb = NULL, \
- .exit_cb = NULL, \
- .maxmem = 0, \
- .pty = false, \
- .width = 80, \
- .height = 24, \
- .term_name = NULL \
- })
-#endif // NVIM_OS_JOB_DEFS_H
diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h
deleted file mode 100644
index 6bdb24e6cd..0000000000
--- a/src/nvim/os/job_private.h
+++ /dev/null
@@ -1,101 +0,0 @@
-#ifndef NVIM_OS_JOB_PRIVATE_H
-#define NVIM_OS_JOB_PRIVATE_H
-
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/event/time.h"
-#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
-#include "nvim/os/pipe_process.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/os/shell.h"
-#include "nvim/log.h"
-#include "nvim/memory.h"
-
-struct job {
- // Job id the index in the job table plus one.
- int id;
- // Process id
- int pid;
- // Exit status code of the job process
- int status;
- // Number of references to the job. The job resources will only be freed by
- // close_cb when this is 0
- int refcount;
- // Time when job_stop was called for the job.
- uint64_t stopped_time;
- // If SIGTERM was already sent to the job(only send one before SIGKILL)
- bool term_sent;
- // stdio streams(std{in,out,err})
- Stream in, out, err;
- // Libuv streams representing stdin/stdout/stderr
- uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr;
- // Extra data set by the process spawner
- union {
- UvProcess uv;
- PtyProcess pty;
- } process;
- // If process_close has been called on this job
- bool closed;
- // Startup options
- JobOptions opts;
-};
-
-extern Job *table[];
-extern size_t stop_requests;
-extern TimeWatcher job_stop_timer;
-
-static inline bool process_spawn(Job *job)
-{
- return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job);
-}
-
-static inline void process_close(Job *job)
-{
- if (job->closed) {
- return;
- }
- job->closed = true;
- if (job->opts.pty) {
- pty_process_close(job);
- } else {
- pipe_process_close(job);
- }
-}
-
-static inline void job_exit_callback(Job *job)
-{
- // Free the slot now, 'exit_cb' may want to start another job to replace
- // this one
- table[job->id - 1] = NULL;
-
- if (job->opts.exit_cb) {
- // Invoke the exit callback
- job->opts.exit_cb(job, job->status, job->opts.data);
- }
-
- if (stop_requests && !--stop_requests) {
- // Stop the timer if no more stop requests are pending
- DLOG("Stopping job kill timer");
- time_watcher_stop(&job_stop_timer);
- }
-}
-
-static inline void job_decref(Job *job)
-{
- if (--job->refcount == 0) {
- // Invoke the exit_cb
- job_exit_callback(job);
- // Free all memory allocated for the job
- shell_free_argv(job->opts.argv);
- if (job->opts.pty) {
- xfree(job->opts.term_name);
- }
- xfree(job);
- }
-}
-
-
-#endif // NVIM_OS_JOB_PRIVATE_H
diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c
deleted file mode 100644
index 1160015c34..0000000000
--- a/src/nvim/os/pipe_process.c
+++ /dev/null
@@ -1,88 +0,0 @@
-#include <stdbool.h>
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pipe_process.h"
-#include "nvim/memory.h"
-#include "nvim/vim.h"
-#include "nvim/globals.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pipe_process.c.generated.h"
-#endif
-
-bool pipe_process_spawn(Job *job)
-{
- UvProcess *pipeproc = &job->process.uv;
- pipeproc->proc_opts.file = job->opts.argv[0];
- pipeproc->proc_opts.args = job->opts.argv;
- pipeproc->proc_opts.stdio = pipeproc->stdio;
- pipeproc->proc_opts.stdio_count = 3;
- pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
- pipeproc->proc_opts.exit_cb = exit_cb;
- pipeproc->proc_opts.cwd = NULL;
- pipeproc->proc_opts.env = NULL;
- pipeproc->proc.data = NULL;
- pipeproc->proc_stdin.data = NULL;
- pipeproc->proc_stdout.data = NULL;
- pipeproc->proc_stderr.data = NULL;
-
- // Initialize the job std{in,out,err}
- pipeproc->stdio[0].flags = UV_IGNORE;
- pipeproc->stdio[1].flags = UV_IGNORE;
- pipeproc->stdio[2].flags = UV_IGNORE;
-
- pipeproc->proc.data = job;
-
- if (job->opts.writable) {
- uv_pipe_init(&loop.uv, &pipeproc->proc_stdin, 0);
- pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
- pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin;
- }
-
- if (job->opts.stdout_cb) {
- uv_pipe_init(&loop.uv, &pipeproc->proc_stdout, 0);
- pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout;
- }
-
- if (job->opts.stderr_cb) {
- uv_pipe_init(&loop.uv, &pipeproc->proc_stderr, 0);
- pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr;
- }
-
- job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin;
- job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout;
- job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr;
-
- if (uv_spawn(&loop.uv, &pipeproc->proc, &pipeproc->proc_opts) != 0) {
- return false;
- }
-
- job->pid = pipeproc->proc.pid;
- return true;
-}
-
-void pipe_process_close(Job *job)
-{
- uv_close((uv_handle_t *)&job->process.uv.proc, close_cb);
-}
-
-static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
-{
- Job *job = proc->data;
- job->status = (int)status;
- pipe_process_close(job);
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- Job *job = handle->data;
- job_close_streams(job);
- job_decref(job);
-}
diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h
deleted file mode 100644
index 65e5cfa78f..0000000000
--- a/src/nvim/os/pipe_process.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef NVIM_OS_PIPE_PROCESS_H
-#define NVIM_OS_PIPE_PROCESS_H
-
-#include <uv.h>
-
-typedef struct {
- // Structures for process spawning/management used by libuv
- uv_process_t proc;
- uv_process_options_t proc_opts;
- uv_stdio_container_t stdio[3];
- uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
-} UvProcess;
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pipe_process.h.generated.h"
-#endif
-#endif // NVIM_OS_PIPE_PROCESS_H
diff --git a/src/nvim/os/pty_process.h b/src/nvim/os/pty_process.h
deleted file mode 100644
index b5a2eba8b3..0000000000
--- a/src/nvim/os/pty_process.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef NVIM_OS_PTY_PROCESS_H
-#define NVIM_OS_PTY_PROCESS_H
-
-#include <sys/ioctl.h>
-
-#include <uv.h>
-
-typedef struct {
- struct winsize winsize;
- uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
- int tty_fd;
-} PtyProcess;
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pty_process.h.generated.h"
-#endif
-#endif // NVIM_OS_PTY_PROCESS_H
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 04ac9f1c03..e0d67d4951 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -9,7 +9,7 @@
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
#include "nvim/event/loop.h"
-#include "nvim/os/job.h"
+#include "nvim/event/uv_process.h"
#include "nvim/event/rstream.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
@@ -204,17 +204,15 @@ static int do_os_system(char **argv,
char prog[MAXPATHL];
xstrlcpy(prog, argv[0], MAXPATHL);
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = &buf;
- opts.writable = input != NULL;
- opts.stdout_cb = data_cb;
- opts.stderr_cb = data_cb;
- opts.exit_cb = NULL;
- Job *job = job_start(opts, &status);
-
- if (status <= 0) {
+ Stream in, out, err;
+ UvProcess uvproc = uv_process_init(&buf);
+ Process *proc = &uvproc.process;
+ proc->argv = argv;
+ proc->in = input != NULL ? &in : NULL;
+ proc->out = &out;
+ proc->err = &err;
+ if (!process_spawn(&loop, proc)) {
+ loop_poll_events(&loop, 0);
// Failed, probably due to `sh` not being executable
if (!silent) {
MSG_PUTS(_("\nCannot execute "));
@@ -224,28 +222,32 @@ static int do_os_system(char **argv,
return -1;
}
+ if (input != NULL) {
+ wstream_init(proc->in, 0);
+ }
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, data_cb);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, data_cb);
+
// write the input, if any
if (input) {
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
- if (!job_write(job, input_buffer)) {
- // couldn't write, stop the job and tell the user about it
- job_stop(job);
+ if (!wstream_write(&in, input_buffer)) {
+ // couldn't write, stop the process and tell the user about it
+ process_stop(proc);
return -1;
}
// close the input stream after everything is written
- job_write_cb(job, shell_write_cb);
- } else {
- // close the input stream, let the process know that no more input is
- // coming
- job_close_in(job);
+ wstream_set_write_cb(&in, shell_write_cb);
}
// invoke busy_start here so event_poll_until wont change the busy state for
// the UI
ui_busy_start();
ui_flush();
- status = job_wait(job, -1);
+ int status = process_wait(proc, -1);
ui_busy_stop();
// prepare the out parameters if requested
@@ -285,8 +287,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
- Job *job = data;
- DynamicBuffer *dbuf = job_data(job);
+ DynamicBuffer *dbuf = data;
size_t nread = buf->size;
dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1);
@@ -472,6 +473,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status)
{
- Job *job = data;
- job_close_in(job);
+ stream_close(stream, NULL);
}
diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c
index f568f5a7f1..122b3a171d 100644
--- a/src/nvim/os_unix.c
+++ b/src/nvim/os_unix.c
@@ -50,7 +50,6 @@
#include "nvim/os/input.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
-#include "nvim/os/job.h"
#include "nvim/msgpack_rpc/helpers.h"
#ifdef HAVE_STROPTS_H