diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:32:07 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:32:07 -0300 |
commit | aa9cb48bf08af14068178619414590254b263882 (patch) | |
tree | b555f3a48c08862c07ef7518a8ba6c8fa58c1aee | |
parent | 9d8d2b7fa83fd69d1d616728c505a41acf8fedbb (diff) | |
download | rneovim-aa9cb48bf08af14068178619414590254b263882.tar.gz rneovim-aa9cb48bf08af14068178619414590254b263882.tar.bz2 rneovim-aa9cb48bf08af14068178619414590254b263882.zip |
job: Replace by a better process abstraction layer
- New libuv/pty process abstraction with simplified API and no globals.
- Remove nvim/os/job*. Jobs are now a concept that apply only to programs
spawned by vimscript job* functions.
- Refactor shell.c/channel.c to use the new module, which brings a number of
advantages:
- Simplified API, less code
- No slots in the user job table are used
- Not possible to acidentally receive data from vimscript
- Implement job table in eval.c, which is now a hash table with unilimited job
slots and unique job ids.
-rw-r--r-- | src/nvim/eval.c | 239 | ||||
-rw-r--r-- | src/nvim/event/loop.c | 7 | ||||
-rw-r--r-- | src/nvim/event/loop.h | 4 | ||||
-rw-r--r-- | src/nvim/event/process.c | 325 | ||||
-rw-r--r-- | src/nvim/event/process.h | 56 | ||||
-rw-r--r-- | src/nvim/event/pty_process.c (renamed from src/nvim/os/pty_process.c) | 159 | ||||
-rw-r--r-- | src/nvim/event/pty_process.h | 30 | ||||
-rw-r--r-- | src/nvim/event/uv_process.c | 77 | ||||
-rw-r--r-- | src/nvim/event/uv_process.h | 25 | ||||
-rw-r--r-- | src/nvim/lib/klist.h | 2 | ||||
-rw-r--r-- | src/nvim/main.c | 5 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 76 | ||||
-rw-r--r-- | src/nvim/os/job.c | 444 | ||||
-rw-r--r-- | src/nvim/os/job.h | 20 | ||||
-rw-r--r-- | src/nvim/os/job_defs.h | 64 | ||||
-rw-r--r-- | src/nvim/os/job_private.h | 101 | ||||
-rw-r--r-- | src/nvim/os/pipe_process.c | 88 | ||||
-rw-r--r-- | src/nvim/os/pipe_process.h | 17 | ||||
-rw-r--r-- | src/nvim/os/pty_process.h | 17 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 50 | ||||
-rw-r--r-- | src/nvim/os_unix.c | 1 |
21 files changed, 791 insertions, 1016 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index ae6b99c336..b43ab238cd 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -55,6 +55,7 @@ #include "nvim/misc1.h" #include "nvim/misc2.h" #include "nvim/keymap.h" +#include "nvim/map.h" #include "nvim/file_search.h" #include "nvim/garray.h" #include "nvim/move.h" @@ -82,8 +83,10 @@ #include "nvim/version.h" #include "nvim/window.h" #include "nvim/os/os.h" -#include "nvim/os/job.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" #include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" #include "nvim/os/time.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" @@ -442,14 +445,19 @@ static dictitem_T vimvars_var; /* variable used for v: */ #define vimvarht vimvardict.dv_hashtab typedef struct { - Job *job; + union { + UvProcess uv; + PtyProcess pty; + } proc; + Stream in, out, err; Terminal *term; + bool stopped; bool exited; - bool stdin_closed; int refcount; ufunc_T *on_stdout, *on_stderr, *on_exit; dict_T *self; int *status_ptr; + uint64_t id; } TerminalJobData; @@ -462,7 +470,6 @@ typedef struct { valid character */ // Memory pool for reusing JobEvent structures typedef struct { - int job_id; TerminalJobData *data; ufunc_T *callback; const char *type; @@ -470,12 +477,15 @@ typedef struct { int status; } JobEvent; static int disable_job_defer = 0; +static uint64_t current_job_id = 1; +static PMap(uint64_t) *jobs = NULL; /* * Initialize the global and v: variables. */ void eval_init(void) { + jobs = pmap_new(uint64_t)(); int i; struct vimvar *p; @@ -10692,29 +10702,27 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv) return; } - Job *job = job_find(argvars[0].vval.v_number); - - if (!is_user_job(job)) { - // Invalid job id + TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } + Process *proc = (Process *)&data->proc; + if (argvars[1].v_type == VAR_STRING) { char *stream = (char *)argvars[1].vval.v_string; if (!strcmp(stream, "stdin")) { - job_close_in(job); - ((TerminalJobData *)job_data(job))->stdin_closed = true; + process_close_in(proc); } else if (!strcmp(stream, "stdout")) { - job_close_out(job); + process_close_out(proc); } else if (!strcmp(stream, "stderr")) { - job_close_err(job); + process_close_err(proc); } else { EMSG2(_("Invalid job stream \"%s\""), stream); } } else { - ((TerminalJobData *)job_data(job))->stdin_closed = true; - job_close_streams(job); + process_close_streams(proc); } } @@ -10735,15 +10743,13 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) return; } - Job *job = job_find(argvars[0].vval.v_number); - - if (!is_user_job(job)) { - // Invalid job id + TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } - if (((TerminalJobData *)job_data(job))->stdin_closed) { + if (((Process *)&data->proc)->in->closed) { EMSG(_("Can't send data to the job: stdin is closed")); return; } @@ -10757,7 +10763,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) } WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); - rettv->vval.v_number = job_write(job, buf); + rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); } // "jobresize(job, width, height)" function @@ -10777,19 +10783,20 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv) return; } - Job *job = job_find(argvars[0].vval.v_number); - if (!is_user_job(job)) { - // Probably an invalid job id + TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } - if (!job_resize(job, argvars[1].vval.v_number, argvars[2].vval.v_number)) { + if (data->proc.uv.process.type != kProcessTypePty) { EMSG(_(e_jobnotpty)); return; } + pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, + argvars[2].vval.v_number); rettv->vval.v_number = 1; } @@ -10878,37 +10885,33 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv) } } - JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, - job_opts); + bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0; + TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + job_opts, pty); + Process *proc = (Process *)&data->proc; - if (!job_opts) { - goto start; - } - - opts.pty = get_dict_number(job_opts, (uint8_t *)"pty"); - if (opts.pty) { + if (pty) { uint16_t width = get_dict_number(job_opts, (uint8_t *)"width"); if (width > 0) { - opts.width = width; + data->proc.pty.width = width; } uint16_t height = get_dict_number(job_opts, (uint8_t *)"height"); if (height > 0) { - opts.height = height; + data->proc.pty.height = height; } char *term = (char *)get_dict_string(job_opts, (uint8_t *)"TERM", true); if (term) { - opts.term_name = term; + data->proc.pty.term_name = term; } } -start: if (!on_stdout) { - opts.stdout_cb = NULL; + proc->out = NULL; } if (!on_stderr) { - opts.stderr_cb = NULL; + proc->err = NULL; } - common_job_start(opts, rettv); + common_job_start(data, rettv); } // "jobstop()" function @@ -10927,14 +10930,15 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv) return; } - Job *job = job_find(argvars[0].vval.v_number); - if (!is_user_job(job)) { + TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + if (!data || data->stopped) { EMSG(_(e_invjob)); return; } - job_stop(job); + process_stop((Process *)&data->proc); + data->stopped = true; rettv->vval.v_number = 1; } @@ -10971,13 +10975,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Job *job = NULL; + TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(job = job_find(arg->li_tv.vval.v_number)) - || !is_user_job(job)) { + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { list_append_number(rv, -3); } else { - TerminalJobData *data = job_data(job); // append the list item and set the status pointer so we'll collect the // status code when the job exits list_append_number(rv, -1); @@ -10993,18 +10995,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) } for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Job *job = NULL; + TerminalJobData *data = NULL; if (remaining == 0) { // timed out break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(job = job_find(arg->li_tv.vval.v_number)) - || !is_user_job(job)) { + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { continue; } - TerminalJobData *data = job_data(job); - int status = job_wait(job, remaining); + int status = process_wait((Process *)&data->proc, remaining); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11028,13 +11028,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) loop_poll_events(&loop, 0); for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Job *job = NULL; + TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(job = job_find(arg->li_tv.vval.v_number)) - || !is_user_job(job)) { + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { continue; } - TerminalJobData *data = job_data(job); // remove the status pointer because the list may be freed before the // job exits data->status_ptr = NULL; @@ -12951,7 +12949,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv) // The last item of argv must be NULL argv[i] = NULL; - uint64_t channel_id = channel_from_job(argv); + uint64_t channel_id = channel_from_process(argv); if (!channel_id) { EMSG(_(e_api_spawn_failed)); @@ -15225,19 +15223,15 @@ static void f_termopen(typval_T *argvars, typval_T *rettv) } } - JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, - job_opts); - opts.pty = true; - opts.width = curwin->w_width; - opts.height = curwin->w_height; - opts.term_name = xstrdup("xterm-256color"); - Job *job = common_job_start(opts, rettv); - if (!job) { - shell_free_argv(argv); + TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + job_opts, true); + data->proc.pty.width = curwin->w_width; + data->proc.pty.height = curwin->w_height; + data->proc.pty.term_name = xstrdup("xterm-256color"); + if (!common_job_start(data, rettv)) { return; } - TerminalJobData *data = opts.data; - TerminalOptions topts = TERMINAL_OPTIONS_INIT; + TerminalOptions topts; topts.data = data; topts.width = curwin->w_width; topts.height = curwin->w_height; @@ -15250,7 +15244,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv) && os_isdir(argvars[1].vval.v_string)) { cwd = (char *)argvars[1].vval.v_string; } - int pid = job_pid(job); + int pid = data->proc.pty.process.pid; // Get the desired name of the buffer. char *name = job_opts ? @@ -20222,21 +20216,27 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) return ret; } -static inline JobOptions common_job_options(char **argv, ufunc_T *on_stdout, - ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self) +static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, + ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self, bool pty) { TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); + data->stopped = false; data->on_stdout = on_stdout; data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = data; - opts.stdout_cb = on_job_stdout; - opts.stderr_cb = on_job_stderr; - opts.exit_cb = on_job_exit; - return opts; + if (pty) { + data->proc.pty = pty_process_init(data); + } else { + data->proc.uv = uv_process_init(data); + } + Process *proc = (Process *)&data->proc; + proc->argv = argv; + proc->in = &data->in; + proc->out = &data->out; + proc->err = &data->err; + proc->cb = on_process_exit; + return data; } /// Return true/false on success/failure. @@ -20262,24 +20262,28 @@ static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout, return false; } -static inline Job *common_job_start(JobOptions opts, typval_T *rettv) +static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) { - TerminalJobData *data = opts.data; data->refcount++; - Job *job = job_start(opts, &rettv->vval.v_number); + Process *proc = (Process *)&data->proc; + if (!process_spawn(&loop, proc)) { + EMSG(_(e_jobexe)); + return false; + } - if (rettv->vval.v_number <= 0) { - if (rettv->vval.v_number == 0) { - EMSG(_(e_jobtblfull)); - xfree(opts.term_name); - free_term_job_data(data); - } else { - EMSG(_(e_jobexe)); - } - return NULL; + data->id = current_job_id++; + wstream_init(proc->in, 0); + if (proc->out) { + rstream_init(proc->out, 0); + rstream_start(proc->out, on_job_stdout); + } + if (proc->err) { + rstream_init(proc->err, 0); + rstream_start(proc->err, on_job_stderr); } - data->job = job; - return job; + pmap_put(uint64_t)(jobs, data->id, data); + rettv->vval.v_number = data->id; + return true; } static inline void free_term_job_data(TerminalJobData *data) { @@ -20300,25 +20304,15 @@ static inline void free_term_job_data(TerminalJobData *data) { xfree(data); } -static inline bool is_user_job(Job *job) -{ - if (!job) { - return false; - } - - JobOptions *opts = job_opts(job); - return opts->exit_cb == on_job_exit; -} - // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(Job *job, ufunc_T *callback, - const char *type, char *data, size_t count, int status) +static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, + const char *type, char *buf, size_t count, int status) { JobEvent *event_data = xmalloc(sizeof(JobEvent)); event_data->received = NULL; - if (data) { + if (buf) { event_data->received = list_alloc(); - char *ptr = data; + char *ptr = buf; size_t remaining = count; size_t off = 0; @@ -20342,8 +20336,7 @@ static inline void push_job_event(Job *job, ufunc_T *callback, } else { event_data->status = status; } - event_data->job_id = job_id(job); - event_data->data = job_data(job); + event_data->data = data; event_data->callback = callback; event_data->type = type; loop_push_event(&loop, (Event) { @@ -20354,24 +20347,23 @@ static inline void push_job_event(Job *job, ufunc_T *callback, static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) { - TerminalJobData *data = job_data(job); + TerminalJobData *data = job; on_job_output(stream, job, buf, eof, data->on_stdout, "stdout"); } static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof) { - TerminalJobData *data = job_data(job); + TerminalJobData *data = job; on_job_output(stream, job, buf, eof, data->on_stderr, "stderr"); } -static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof, - ufunc_T *callback, const char *type) +static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, + bool eof, ufunc_T *callback, const char *type) { if (eof) { return; } - TerminalJobData *data = job_data(job); RBUFFER_UNTIL_EMPTY(buf, ptr, len) { // The order here matters, the terminal must receive the data first because // push_job_event will modify the read buffer(convert NULs into NLs) @@ -20380,17 +20372,16 @@ static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof, } if (callback) { - push_job_event(job, callback, type, ptr, len, 0); + push_job_event(data, callback, type, ptr, len, 0); } rbuffer_consumed(buf, len); } } -static void on_job_exit(Job *job, int status, void *d) +static void on_process_exit(Process *proc, int status, void *d) { TerminalJobData *data = d; - if (data->term && !data->exited) { data->exited = true; terminal_close(data->term, @@ -20401,19 +20392,20 @@ static void on_job_exit(Job *job, int status, void *d) *data->status_ptr = status; } - push_job_event(job, data->on_exit, "exit", NULL, 0, status); + push_job_event(data, data->on_exit, "exit", NULL, 0, status); } -static void term_write(char *buf, size_t size, void *data) +static void term_write(char *buf, size_t size, void *d) { - Job *job = ((TerminalJobData *)data)->job; + TerminalJobData *data = d; WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); - job_write(job, wbuf); + wstream_write(&data->in, wbuf); } -static void term_resize(uint16_t width, uint16_t height, void *data) +static void term_resize(uint16_t width, uint16_t height, void *d) { - job_resize(((TerminalJobData *)data)->job, width, height); + TerminalJobData *data = d; + pty_process_resize(&data->proc.pty, width, height); } static void term_close(void *d) @@ -20421,7 +20413,7 @@ static void term_close(void *d) TerminalJobData *data = d; if (!data->exited) { data->exited = true; - job_stop(data->job); + process_stop((Process *)&data->proc); } terminal_destroy(data->term); term_job_data_decref(d); @@ -20448,7 +20440,7 @@ static void on_job_event(Event event) if (argc > 0) { argv[0].v_type = VAR_NUMBER; argv[0].v_lock = 0; - argv[0].vval.v_number = ev->job_id; + argv[0].vval.v_number = ev->data->id; } if (argc > 1) { @@ -20479,6 +20471,7 @@ static void on_job_event(Event event) end: if (!ev->received) { // exit event, safe to free job data now + pmap_del(uint64_t)(jobs, ev->data->id); term_job_data_decref(ev->data); } xfree(ev); diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index c467ae8b96..d90565002e 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -3,6 +3,7 @@ #include <uv.h> #include "nvim/event/loop.h" +#include "nvim/event/process.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.c.generated.h" @@ -15,6 +16,10 @@ void loop_init(Loop *loop, void *data) loop->uv.data = loop; loop->deferred_events = kl_init(Event); loop->immediate_events = kl_init(Event); + loop->children = kl_init(WatcherPtr); + loop->children_stop_requests = 0; + uv_signal_init(&loop->uv, &loop->children_watcher); + uv_timer_init(&loop->uv, &loop->children_kill_timer); } void loop_poll_events(Loop *loop, int ms) @@ -113,6 +118,8 @@ void loop_stop(Loop *loop) void loop_close(Loop *loop) { + uv_close((uv_handle_t *)&loop->children_watcher, NULL); + uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); do { uv_run(&loop->uv, UV_RUN_DEFAULT); } while (uv_loop_close(&loop->uv)); diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index e5a890dddb..5eb4d32ca8 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -26,6 +26,10 @@ typedef struct loop { uv_loop_t uv; klist_t(Event) *deferred_events, *immediate_events; int deferred_events_allowed; + klist_t(WatcherPtr) *children; + uv_signal_t children_watcher; + uv_timer_t children_kill_timer; + size_t children_stop_requests; } Loop; // Poll for events until a condition or timeout diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c new file mode 100644 index 0000000000..2b1f1ae096 --- /dev/null +++ b/src/nvim/event/process.c @@ -0,0 +1,325 @@ +#include <assert.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/os/shell.h" +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" +#include "nvim/globals.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.c.generated.h" +#endif + +// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly +// exit before we send SIGNAL to it +#define TERM_TIMEOUT 1000000000 +#define KILL_TIMEOUT (TERM_TIMEOUT * 2) + +#define CLOSE_PROC_STREAM(proc, stream) \ + do { \ + if (proc->stream && !proc->stream->closed) { \ + stream_close(proc->stream, NULL); \ + } \ + } while (0) + + +bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +{ + proc->loop = loop; + if (proc->in) { + uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); + } + + if (proc->out) { + uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); + } + + if (proc->err) { + uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); + } + + bool success; + switch (proc->type) { + case kProcessTypeUv: + success = uv_process_spawn((UvProcess *)proc); + break; + case kProcessTypePty: + success = pty_process_spawn((PtyProcess *)proc); + break; + default: + abort(); + } + + if (!success) { + if (proc->in) { + uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); + } + if (proc->out) { + uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); + } + if (proc->err) { + uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); + } + process_close(proc); + shell_free_argv(proc->argv); + proc->status = -1; + return false; + } + + void *data = proc->data; + + if (proc->in) { + stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->internal_data = proc; + proc->in->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + if (proc->out) { + stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->internal_data = proc; + proc->out->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + if (proc->err) { + stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->internal_data = proc; + proc->err->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + proc->internal_exit_cb = on_process_exit; + proc->internal_close_cb = decref; + proc->refcount++; + kl_push(WatcherPtr, loop->children, proc); + return true; +} + +void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL +{ + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + uv_kill(proc->pid, SIGTERM); + proc->term_sent = true; + process_stop(proc); + } + + // Wait until all children exit + LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + pty_process_teardown(loop); +} + +// Wrappers around `stream_close` that protect against double-closing. +void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + process_close_in(proc); + process_close_out(proc); + process_close_err(proc); +} + +void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, in); +} + +void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, out); +} + +void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, err); +} + +/// Synchronously wait for a process to finish +/// +/// @param process The Process instance +/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for +/// waiting until the process quits. +/// @return returns the status code of the exited process. -1 if the process is +/// still running and the `timeout` has expired. Note that this is +/// indistinguishable from the process returning -1 by itself. Which +/// is possible on some OS. Returns -2 if an user has interruped the +/// wait. +int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +{ + // The default status is -1, which represents a timeout + int status = -1; + bool interrupted = false; + + // Increase refcount to stop the exit callback from being called(and possibly + // being freed) before we have a chance to get the status. + proc->refcount++; + LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + // Until... + got_int || // interrupted by the user + proc->refcount == 1); // job exited + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + interrupted = true; + got_int = false; + process_stop(proc); + if (ms == -1) { + // We can only return, if all streams/handles are closed and the job + + // exited. + LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + } else { + loop_poll_events(proc->loop, 0); + } + } + + if (proc->refcount == 1) { + // Job exited, collect status and manually invoke close_cb to free the job + // resources + status = interrupted ? -2 : proc->status; + decref(proc); + } else { + proc->refcount--; + } + + return status; +} + +/// Ask a process to terminate and eventually kill if it doesn't respond +void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + if (proc->stopped_time) { + return; + } + + proc->stopped_time = os_hrtime(); + switch (proc->type) { + case kProcessTypeUv: + // Close the process's stdin. If the process doesn't close its own + // stdout/stderr, they will be closed when it exits(possibly due to being + // terminated after a timeout) + process_close_in(proc); + break; + case kProcessTypePty: + // close all streams for pty processes to send SIGHUP to the process + process_close_streams(proc); + pty_process_close_master((PtyProcess *)proc); + break; + default: + abort(); + } + + Loop *loop = proc->loop; + if (!loop->children_stop_requests++) { + // When there's at least one stop request pending, start a timer that + // will periodically check if a signal should be send to a to the job + DLOG("Starting job kill timer"); + uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100); + } +} + +/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL +/// to those that didn't die from SIGTERM after a while(exit_timeout is 0). +static void children_kill_cb(uv_timer_t *handle) +{ + Loop *loop = handle->loop->data; + uint64_t now = os_hrtime(); + + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + if (!proc->stopped_time) { + continue; + } + uint64_t elapsed = now - proc->stopped_time; + + if (!proc->term_sent && elapsed >= TERM_TIMEOUT) { + ILOG("Sending SIGTERM to pid %d", proc->pid); + uv_kill(proc->pid, SIGTERM); + proc->term_sent = true; + } else if (elapsed >= KILL_TIMEOUT) { + ILOG("Sending SIGKILL to pid %d", proc->pid); + uv_kill(proc->pid, SIGKILL); + } + } +} + +static void decref(Process *proc) +{ + if (--proc->refcount != 0) { + return; + } + + Loop *loop = proc->loop; + kliter_t(WatcherPtr) **node = NULL; + kl_iter(WatcherPtr, loop->children, current) { + if ((*current)->data == proc) { + node = current; + break; + } + } + + assert(node); + kl_shift_at(WatcherPtr, loop->children, node); + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + +static void process_close(Process *proc) + FUNC_ATTR_NONNULL_ARG(1) +{ + assert(!proc->closed); + proc->closed = true; + switch (proc->type) { + case kProcessTypeUv: + uv_process_close((UvProcess *)proc); + break; + case kProcessTypePty: + pty_process_close((PtyProcess *)proc); + break; + default: + abort(); + } +} + +static void on_process_exit(Process *proc) +{ + if (exiting) { + on_process_exit_event((Event) {.data = proc}); + } else { + loop_push_event(proc->loop, + (Event) {.handler = on_process_exit_event, .data = proc}, false); + } + + Loop *loop = proc->loop; + if (loop->children_stop_requests && !--loop->children_stop_requests) { + // Stop the timer if no more stop requests are pending + DLOG("Stopping process kill timer"); + uv_timer_stop(&loop->children_kill_timer); + } +} + +static void on_process_exit_event(Event event) +{ + Process *proc = event.data; + process_close_streams(proc); + process_close(proc); +} + +static void on_process_stream_close(Stream *stream, void *data) +{ + Process *proc = data; + decref(proc); +} + diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h new file mode 100644 index 0000000000..5c84a7d1d0 --- /dev/null +++ b/src/nvim/event/process.h @@ -0,0 +1,56 @@ +#ifndef NVIM_EVENT_PROCESS_H +#define NVIM_EVENT_PROCESS_H + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +typedef enum { + kProcessTypeUv, + kProcessTypePty +} ProcessType; + +typedef struct process Process; +typedef void (*process_exit_cb)(Process *proc, int status, void *data); +typedef void (*internal_process_cb)(Process *proc); + +struct process { + ProcessType type; + Loop *loop; + void *data; + int pid, status, refcount; + // set to the hrtime of when process_stop was called for the process. + uint64_t stopped_time; + char **argv; + Stream *in, *out, *err; + process_exit_cb cb; + internal_process_cb internal_exit_cb, internal_close_cb; + bool closed, term_sent; +}; + +static inline Process process_init(ProcessType type, void *data) +{ + return (Process) { + .type = type, + .data = data, + .loop = NULL, + .pid = 0, + .status = 0, + .refcount = 0, + .stopped_time = 0, + .argv = NULL, + .in = NULL, + .out = NULL, + .err = NULL, + .cb = NULL, + .closed = false, + .term_sent = false, + .internal_close_cb = NULL, + .internal_exit_cb = NULL + }; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.h.generated.h" +#endif +#endif // NVIM_EVENT_PROCESS_H diff --git a/src/nvim/os/pty_process.c b/src/nvim/event/pty_process.c index a8fff2a61d..1e24d7c919 100644 --- a/src/nvim/os/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -20,58 +20,39 @@ #include <uv.h> -#include "nvim/func_attr.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pty_process.h" -#include "nvim/memory.h" -#include "nvim/vim.h" -#include "nvim/globals.h" +#include "nvim/lib/klist.h" + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/pty_process.h" +#include "nvim/log.h" #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pty_process.c.generated.h" +# include "event/pty_process.c.generated.h" #endif static const unsigned int KILL_RETRIES = 5; static const unsigned int KILL_TIMEOUT = 2; // seconds -bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL +bool pty_process_spawn(PtyProcess *ptyproc) + FUNC_ATTR_NONNULL_ALL { - PtyProcess *ptyproc = &job->process.pty; - ptyproc->tty_fd = -1; - - if (job->opts.writable) { - uv_pipe_init(&loop.uv, &ptyproc->proc_stdin, 0); - ptyproc->proc_stdin.data = NULL; - } - - if (job->opts.stdout_cb) { - uv_pipe_init(&loop.uv, &ptyproc->proc_stdout, 0); - ptyproc->proc_stdout.data = NULL; - } - - if (job->opts.stderr_cb) { - uv_pipe_init(&loop.uv, &ptyproc->proc_stderr, 0); - ptyproc->proc_stderr.data = NULL; - } - - job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin; - job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout; - job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr; - - int master; - ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0}; + Process *proc = (Process *)ptyproc; + uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); + ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; struct termios termios; init_termios(&termios); uv_disable_stdio_inheritance(); - + int master; int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); if (pid < 0) { + ELOG("forkpty failed: %s", strerror(errno)); return false; } else if (pid == 0) { - init_child(job); + init_child(ptyproc); abort(); } @@ -86,23 +67,18 @@ bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL goto error; } - if (job->opts.writable - && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdin)) { + if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) { goto error; } - - if (job->opts.stdout_cb - && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdout)) { + if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { goto error; } - - if (job->opts.stderr_cb - && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stderr)) { + if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) { goto error; } ptyproc->tty_fd = master; - job->pid = pid; + proc->pid = pid; return true; error: @@ -117,54 +93,44 @@ error: } if (child != pid) { kill(pid, SIGKILL); + waitpid(pid, NULL, 0); } return false; } -static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe) +void pty_process_resize(PtyProcess *ptyproc, uint16_t width, + uint16_t height) FUNC_ATTR_NONNULL_ALL { - int fd_dup = dup(fd); - if (fd_dup < 0) { - ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); - return false; - } - int uv_result = uv_pipe_open(pipe, fd_dup); - if (uv_result) { - ELOG("Failed to set pipe to descriptor %d: %s", - fd_dup, uv_strerror(uv_result)); - close(fd_dup); - return false; - } - return true; + ptyproc->winsize = (struct winsize){height, width, 0, 0}; + ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); } -void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL +void pty_process_close(PtyProcess *ptyproc) + FUNC_ATTR_NONNULL_ALL { - pty_process_close_master(job); - job_close_streams(job); - job_decref(job); + pty_process_close_master(ptyproc); + Process *proc = (Process *)ptyproc; + if (proc->internal_close_cb) { + proc->internal_close_cb(proc); + } } -void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL +void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { - PtyProcess *ptyproc = &job->process.pty; if (ptyproc->tty_fd >= 0) { close(ptyproc->tty_fd); ptyproc->tty_fd = -1; } } -void pty_process_resize(Job *job, uint16_t width, uint16_t height) - FUNC_ATTR_NONNULL_ALL +void pty_process_teardown(Loop *loop) { - PtyProcess *ptyproc = &job->process.pty; - ptyproc->winsize = (struct winsize){height, width, 0, 0}; - ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); + uv_signal_stop(&loop->children_watcher); } -static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL +static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { unsetenv("COLUMNS"); unsetenv("LINES"); @@ -179,8 +145,8 @@ static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL signal(SIGTERM, SIG_DFL); signal(SIGALRM, SIG_DFL); - setenv("TERM", job->opts.term_name ? job->opts.term_name : "ansi", 1); - execvp(job->opts.argv[0], job->opts.argv); + setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1); + execvp(ptyproc->process.argv[0], ptyproc->process.argv); fprintf(stderr, "execvp failed: %s\n", strerror(errno)); } @@ -239,3 +205,50 @@ static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL termios->c_cc[VMIN] = 1; termios->c_cc[VTIME] = 0; } + +static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe) + FUNC_ATTR_NONNULL_ALL +{ + int fd_dup = dup(fd); + if (fd_dup < 0) { + ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); + return false; + } + int uv_result = uv_pipe_open(pipe, fd_dup); + if (uv_result) { + ELOG("Failed to set pipe to descriptor %d: %s", + fd_dup, uv_strerror(uv_result)); + close(fd_dup); + return false; + } + return true; +} + +static void chld_handler(uv_signal_t *handle, int signum) +{ + int stat = 0; + int pid; + + do { + pid = waitpid(-1, &stat, WNOHANG); + } while (pid < 0 && errno == EINTR); + + if (pid <= 0) { + return; + } + + Loop *loop = handle->loop->data; + + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + if (proc->pid == pid) { + if (WIFEXITED(stat)) { + proc->status = WEXITSTATUS(stat); + } else if (WIFSIGNALED(stat)) { + proc->status = WTERMSIG(stat); + } + proc->internal_exit_cb(proc); + break; + } + } +} diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h new file mode 100644 index 0000000000..a12b5489c5 --- /dev/null +++ b/src/nvim/event/pty_process.h @@ -0,0 +1,30 @@ +#ifndef NVIM_EVENT_PTY_PROCESS_H +#define NVIM_EVENT_PTY_PROCESS_H + +#include <sys/ioctl.h> + +#include "nvim/event/process.h" + +typedef struct pty_process { + Process process; + char *term_name; + uint16_t width, height; + struct winsize winsize; + int tty_fd; +} PtyProcess; + +static inline PtyProcess pty_process_init(void *data) +{ + PtyProcess rv; + rv.process = process_init(kProcessTypePty, data); + rv.term_name = NULL; + rv.width = 80; + rv.height = 24; + rv.tty_fd = -1; + return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/pty_process.h.generated.h" +#endif +#endif // NVIM_EVENT_PTY_PROCESS_H diff --git a/src/nvim/event/uv_process.c b/src/nvim/event/uv_process.c new file mode 100644 index 0000000000..21c2fd1790 --- /dev/null +++ b/src/nvim/event/uv_process.c @@ -0,0 +1,77 @@ +#include <assert.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.c.generated.h" +#endif + +bool uv_process_spawn(UvProcess *uvproc) + FUNC_ATTR_NONNULL_ALL +{ + Process *proc = (Process *)uvproc; + uvproc->uvopts.file = proc->argv[0]; + uvproc->uvopts.args = proc->argv; + uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; + uvproc->uvopts.exit_cb = exit_cb; + uvproc->uvopts.cwd = NULL; + uvproc->uvopts.env = NULL; + uvproc->uvopts.stdio = uvproc->uvstdio; + uvproc->uvopts.stdio_count = 3; + uvproc->uvstdio[0].flags = UV_IGNORE; + uvproc->uvstdio[1].flags = UV_IGNORE; + uvproc->uvstdio[2].flags = UV_IGNORE; + uvproc->uv.data = proc; + + if (proc->in) { + uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe; + } + + if (proc->out) { + uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe; + } + + if (proc->err) { + uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe; + } + + int status; + if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) { + ELOG("uv_spawn failed: %s", uv_strerror(status)); + return false; + } + + proc->pid = uvproc->uv.pid; + return true; +} + +void uv_process_close(UvProcess *uvproc) + FUNC_ATTR_NONNULL_ARG(1) +{ + uv_close((uv_handle_t *)&uvproc->uv, close_cb); +} + +static void close_cb(uv_handle_t *handle) +{ + Process *proc = handle->data; + if (proc->internal_close_cb) { + proc->internal_close_cb(proc); + } +} + +static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) +{ + Process *proc = handle->data; + proc->status = (int)status; + proc->internal_exit_cb(proc); +} diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h new file mode 100644 index 0000000000..a17f1446b3 --- /dev/null +++ b/src/nvim/event/uv_process.h @@ -0,0 +1,25 @@ +#ifndef NVIM_EVENT_UV_PROCESS_H +#define NVIM_EVENT_UV_PROCESS_H + +#include <uv.h> + +#include "nvim/event/process.h" + +typedef struct uv_process { + Process process; + uv_process_t uv; + uv_process_options_t uvopts; + uv_stdio_container_t uvstdio[3]; +} UvProcess; + +static inline UvProcess uv_process_init(void *data) +{ + UvProcess rv; + rv.process = process_init(kProcessTypeUv, data); + return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.h.generated.h" +#endif +#endif // NVIM_EVENT_UV_PROCESS_H diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h index 10d6846133..1280a927e8 100644 --- a/src/nvim/lib/klist.h +++ b/src/nvim/lib/klist.h @@ -136,6 +136,6 @@ // `break` statement is executed before the next iteration. #define kl_iter(name, kl, p) kl_iter_at(name, kl, p, NULL) #define kl_iter_at(name, kl, p, h) \ - for (kl1_##name *p = h ? h : kl->head; p != kl->tail; p = p->next) + for (kl1_##name **p = h ? h : &kl->head; *p != kl->tail; p = &(*p)->next) #endif diff --git a/src/nvim/main.c b/src/nvim/main.c index ca55469730..e2ae63e134 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -63,7 +63,7 @@ #include "nvim/os/time.h" #include "nvim/event/loop.h" #include "nvim/os/signal.h" -#include "nvim/os/job.h" +#include "nvim/event/process.h" #include "nvim/msgpack_rpc/defs.h" #include "nvim/msgpack_rpc/helpers.h" #include "nvim/msgpack_rpc/server.h" @@ -149,7 +149,6 @@ void event_init(void) // `event_poll` // Signals signal_init(); - job_init(); // finish mspgack-rpc initialization channel_init(); server_init(); @@ -165,7 +164,7 @@ void event_teardown(void) loop_process_all_events(&loop); input_stop(); channel_teardown(); - job_teardown(); + process_teardown(&loop); server_teardown(); signal_teardown(); terminal_teardown(); diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 577965e5ba..861614f147 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -10,11 +10,10 @@ #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/remote_ui.h" #include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" #include "nvim/event/rstream.h" #include "nvim/event/wstream.h" #include "nvim/event/socket.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" #include "nvim/msgpack_rpc/helpers.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -35,7 +34,7 @@ typedef enum { kChannelTypeSocket, - kChannelTypeJob, + kChannelTypeProc, kChannelTypeStdio } ChannelType; @@ -54,9 +53,14 @@ typedef struct { ChannelType type; msgpack_unpacker *unpacker; union { - Job *job; Stream stream; struct { + UvProcess uvproc; + Stream in; + Stream out; + Stream err; + } process; + struct { Stream in; Stream out; } std; @@ -110,34 +114,35 @@ void channel_teardown(void) }); } -/// Creates an API channel by starting a job and connecting to its +/// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(kChannelTypeJob); - incref(channel); // job channels are only closed by the exit_cb - - int status; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = channel; - opts.stdout_cb = job_out; - opts.stderr_cb = job_err; - opts.exit_cb = job_exit; - channel->data.job = job_start(opts, &status); - - if (status <= 0) { - if (status == 0) { // Two decrefs needed if status == 0. - decref(channel); // Only one needed if status < 0, - } // because exit_cb will do the second one. +uint64_t channel_from_process(char **argv) +{ + Channel *channel = register_channel(kChannelTypeProc); + channel->data.process.uvproc = uv_process_init(channel); + Process *proc = &channel->data.process.uvproc.process; + proc->argv = argv; + proc->in = &channel->data.process.in; + proc->out = &channel->data.process.out; + proc->err = &channel->data.process.err; + proc->cb = process_exit; + if (!process_spawn(&loop, proc)) { + loop_poll_events(&loop, 0); decref(channel); return 0; } + incref(channel); // process channels are only closed by the exit_cb + wstream_init(proc->in, 0); + rstream_init(proc->out, 0); + rstream_start(proc->out, parse_msgpack); + rstream_init(proc->err, 0); + rstream_start(proc->err, forward_stderr); + return channel->id; } @@ -319,24 +324,17 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(stream, buf, job_data(job), eof); -} - -static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", - ((Channel *)job_data(data))->id, buf); + ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); } } -static void job_exit(Job *job, int status, void *data) +static void process_exit(Process *proc, int status, void *data) { decref(data); } @@ -511,8 +509,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer) case kChannelTypeSocket: success = wstream_write(&channel->data.stream, buffer); break; - case kChannelTypeJob: - success = job_write(channel->data.job, buffer); + case kChannelTypeProc: + success = wstream_write(&channel->data.process.in, buffer); break; case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); @@ -627,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/job and free the channel resources. +/// Close the channel streams/process and free the channel resources. static void close_channel(Channel *channel) { if (channel->closed) { @@ -640,9 +638,9 @@ static void close_channel(Channel *channel) case kChannelTypeSocket: stream_close(&channel->data.stream, close_cb); break; - case kChannelTypeJob: - if (channel->data.job) { - job_stop(channel->data.job); + case kChannelTypeProc: + if (!channel->data.process.uvproc.process.closed) { + process_stop(&channel->data.process.uvproc.process); } break; case kChannelTypeStdio: diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c deleted file mode 100644 index 71419cefca..0000000000 --- a/src/nvim/os/job.c +++ /dev/null @@ -1,444 +0,0 @@ -#include <stdint.h> -#include <stdbool.h> - -#include <uv.h> - -#include "nvim/event/loop.h" -#include "nvim/event/time.h" -#include "nvim/event/signal.h" -#include "nvim/event/rstream.h" -#include "nvim/event/wstream.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pty_process.h" -#include "nvim/os/time.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#ifdef HAVE_SYS_WAIT_H -# include <sys/wait.h> -#endif - -// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit -// before we send SIGNAL to it -#define TERM_TIMEOUT 1000000000 -#define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define JOB_BUFFER_SIZE 0xFFFF - -#define close_job_stream(job, stream) \ - do { \ - if (!job->stream.closed) { \ - stream_close(&job->stream, on_##stream_close); \ - } \ - } while (0) - -#define close_job_in(job) close_job_stream(job, in) -#define close_job_out(job) close_job_stream(job, out) -#define close_job_err(job) close_job_stream(job, err) - -Job *table[MAX_RUNNING_JOBS] = {NULL}; -size_t stop_requests = 0; -TimeWatcher job_stop_timer; -SignalWatcher schld; - -// Some helpers shared in this module - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/job.c.generated.h" -#endif -// Callbacks for libuv - -/// Initializes job control resources -void job_init(void) -{ - uv_disable_stdio_inheritance(); - time_watcher_init(&loop, &job_stop_timer, NULL); - signal_watcher_init(&loop, &schld, NULL); - signal_watcher_start(&schld, chld_handler, SIGCHLD); -} - -/// Releases job control resources and terminates running jobs -void job_teardown(void) -{ - // Stop all jobs - for (int i = 0; i < MAX_RUNNING_JOBS; i++) { - Job *job; - if ((job = table[i]) != NULL) { - uv_kill(job->pid, SIGTERM); - job->term_sent = true; - job_stop(job); - } - } - - // Wait until all jobs are closed - LOOP_POLL_EVENTS_UNTIL(&loop, -1, !stop_requests); - signal_watcher_stop(&schld); - signal_watcher_close(&schld, NULL); - // Close the timer - time_watcher_close(&job_stop_timer, NULL); -} - -/// Tries to start a new job. -/// -/// @param[out] status The job id if the job started successfully, 0 if the job -/// table is full, -1 if the program could not be executed. -/// @return The job pointer if the job started successfully, NULL otherwise -Job *job_start(JobOptions opts, int *status) -{ - int i; - Job *job; - - // Search for a free slot in the table - for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if (table[i] == NULL) { - break; - } - } - - if (i == MAX_RUNNING_JOBS) { - // No free slots - shell_free_argv(opts.argv); - *status = 0; - return NULL; - } - - job = xmalloc(sizeof(Job)); - // Initialize - job->id = i + 1; - *status = job->id; - job->status = -1; - job->refcount = 1; - job->stopped_time = 0; - job->term_sent = false; - job->opts = opts; - job->closed = false; - job->in.closed = true; - job->out.closed = true; - job->err.closed = true; - - // Spawn the job - if (!process_spawn(job)) { - if (job->opts.writable) { - uv_close((uv_handle_t *)job->proc_stdin, NULL); - } - if (job->opts.stdout_cb) { - uv_close((uv_handle_t *)job->proc_stdout, NULL); - } - if (job->opts.stderr_cb) { - uv_close((uv_handle_t *)job->proc_stderr, NULL); - } - process_close(job); - loop_poll_events(&loop, 0); - *status = -1; - return NULL; - } - - if (opts.writable) { - job->refcount++; - wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job); - } - - // Start the readable streams - if (opts.stdout_cb) { - job->refcount++; - rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job); - rstream_start(&job->out, read_cb); - } - - if (opts.stderr_cb) { - job->refcount++; - rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job); - rstream_start(&job->err, read_cb); - } - // Save the job to the table - table[i] = job; - - return job; -} - -/// Finds a job instance by id -/// -/// @param id The job id -/// @return the Job instance -Job *job_find(int id) -{ - Job *job; - - if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1]) - || job->stopped_time) { - return NULL; - } - - return job; -} - -/// Terminates a job. This is a non-blocking operation, but if the job exists -/// it's guaranteed to succeed(SIGKILL will eventually be sent) -/// -/// @param job The Job instance -void job_stop(Job *job) -{ - if (job->stopped_time) { - return; - } - - job->stopped_time = os_hrtime(); - if (job->opts.pty) { - // close all streams for pty jobs to send SIGHUP to the process - job_close_streams(job); - pty_process_close_master(job); - } else { - // Close the job's stdin. If the job doesn't close its own stdout/stderr, - // they will be closed when the job exits(possibly due to being terminated - // after a timeout) - job_close_in(job); - } - - if (!stop_requests++) { - // When there's at least one stop request pending, start a timer that - // will periodically check if a signal should be send to a to the job - DLOG("Starting job kill timer"); - time_watcher_start(&job_stop_timer, job_stop_timer_cb, 100, 100); - } -} - -/// job_wait - synchronously wait for a job to finish -/// -/// @param job The job instance -/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for -/// waiting until the job quits. -/// @return returns the status code of the exited job. -1 if the job is -/// still running and the `timeout` has expired. Note that this is -/// indistinguishable from the process returning -1 by itself. Which -/// is possible on some OS. Returns -2 if the job was interrupted. -int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL -{ - // The default status is -1, which represents a timeout - int status = -1; - bool interrupted = false; - - // Increase refcount to stop the job from being freed before we have a - // chance to get the status. - job->refcount++; - LOOP_POLL_EVENTS_UNTIL(&loop, ms, - // Until... - got_int || // interrupted by the user - job->refcount == 1); // job exited - - // we'll assume that a user frantically hitting interrupt doesn't like - // the current job. Signal that it has to be killed. - if (got_int) { - interrupted = true; - got_int = false; - job_stop(job); - if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - // exited. - LOOP_POLL_EVENTS_UNTIL(&loop, -1, job->refcount == 1); - } else { - loop_poll_events(&loop, 0); - } - } - - if (job->refcount == 1) { - // Job exited, collect status and manually invoke close_cb to free the job - // resources - status = interrupted ? -2 : job->status; - job_close_streams(job); - job_decref(job); - } else { - job->refcount--; - } - - return status; -} - -/// Close the pipe used to write to the job. -/// -/// This can be used for example to indicate to the job process that no more -/// input is coming, and that it should shut down cleanly. -/// -/// It has no effect when the input pipe doesn't exist or was already -/// closed. -/// -/// @param job The job instance -void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL -{ - close_job_in(job); -} - -// Close the job stdout stream. -void job_close_out(Job *job) FUNC_ATTR_NONNULL_ALL -{ - close_job_out(job); -} - -// Close the job stderr stream. -void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL -{ - close_job_out(job); -} - -/// All writes that complete after calling this function will be reported -/// to `cb`. -/// -/// Use this function to be notified about the status of an in-flight write. -/// -/// @see {wstream_set_write_cb} -/// -/// @param job The job instance -/// @param cb The function that will be called on write completion or -/// failure. It will be called with the job as the `data` argument. -void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL -{ - wstream_set_write_cb(&job->in, cb); -} - -/// Writes data to the job's stdin. This is a non-blocking operation, it -/// returns when the write request was sent. -/// -/// @param job The Job instance -/// @param buffer The buffer which contains the data to be written -/// @return true if the write request was successfully sent, false if writing -/// to the job stream failed (possibly because the OS buffer is full) -bool job_write(Job *job, WBuffer *buffer) -{ - return wstream_write(&job->in, buffer); -} - -/// Get the job id -/// -/// @param job A pointer to the job -/// @return The job id -int job_id(Job *job) -{ - return job->id; -} - -// Get the job pid -int job_pid(Job *job) -{ - return job->pid; -} - -/// Get data associated with a job -/// -/// @param job A pointer to the job -/// @return The job data -void *job_data(Job *job) -{ - return job->opts.data; -} - -/// Resize the window for a pty job -bool job_resize(Job *job, uint16_t width, uint16_t height) -{ - if (!job->opts.pty) { - return false; - } - pty_process_resize(job, width, height); - return true; -} - -void job_close_streams(Job *job) -{ - close_job_in(job); - close_job_out(job); - close_job_err(job); -} - -JobOptions *job_opts(Job *job) -{ - return &job->opts; -} - -/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those -/// that didn't die from SIGTERM after a while(exit_timeout is 0). -static void job_stop_timer_cb(TimeWatcher *watcher, void *data) -{ - Job *job; - uint64_t now = os_hrtime(); - - for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL || !job->stopped_time) { - continue; - } - - uint64_t elapsed = now - job->stopped_time; - - if (!job->term_sent && elapsed >= TERM_TIMEOUT) { - ILOG("Sending SIGTERM to job(id: %d)", job->id); - uv_kill(job->pid, SIGTERM); - job->term_sent = true; - } else if (elapsed >= KILL_TIMEOUT) { - ILOG("Sending SIGKILL to job(id: %d)", job->id); - uv_kill(job->pid, SIGKILL); - process_close(job); - } - } -} - -// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. -static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) -{ - Job *job = data; - - if (stream == &job->out) { - job->opts.stdout_cb(stream, buf, data, eof); - if (eof) { - close_job_out(job); - } - } else { - job->opts.stderr_cb(stream, buf, data, eof); - if (eof) { - close_job_err(job); - } - } -} - -static void on_stream_close(Stream *stream, void *data) -{ - job_decref(data); -} - -static void job_exited(Event event) -{ - Job *job = event.data; - process_close(job); -} - -static void chld_handler(SignalWatcher *watcher, int signum, void *data) -{ - int stat = 0; - int pid; - - do { - pid = waitpid(-1, &stat, WNOHANG); - } while (pid < 0 && errno == EINTR); - - if (pid <= 0) { - return; - } - - Job *job = NULL; - // find the job corresponding to the exited pid - for (int i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) != NULL && job->pid == pid) { - if (WIFEXITED(stat)) { - job->status = WEXITSTATUS(stat); - } else if (WIFSIGNALED(stat)) { - job->status = WTERMSIG(stat); - } - if (exiting) { - // don't enqueue more events when exiting - process_close(job); - } else { - loop_push_event(&loop, - (Event) {.handler = job_exited, .data = job}, false); - } - break; - } - } -} - diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h deleted file mode 100644 index 2f8bf79a31..0000000000 --- a/src/nvim/os/job.h +++ /dev/null @@ -1,20 +0,0 @@ -// Job is a short name we use to refer to child processes that run in parallel -// with the editor, probably executing long-running tasks and sending updates -// asynchronously. Communication happens through anonymous pipes connected to -// the job's std{in,out,err}. They are more like bash/zsh co-processes than the -// usual shell background job. The name 'Job' was chosen because it applies to -// the concept while being significantly shorter. -#ifndef NVIM_OS_JOB_H -#define NVIM_OS_JOB_H - -#include <stdint.h> -#include <stdbool.h> - -#include "nvim/os/job_defs.h" -#include "nvim/event/rstream.h" -#include "nvim/event/wstream.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/job.h.generated.h" -#endif -#endif // NVIM_OS_JOB_H diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h deleted file mode 100644 index ea7a326404..0000000000 --- a/src/nvim/os/job_defs.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef NVIM_OS_JOB_DEFS_H -#define NVIM_OS_JOB_DEFS_H - -#include <uv.h> - -#include "nvim/event/rstream.h" -#include "nvim/event/wstream.h" - -#define MAX_RUNNING_JOBS 100 -typedef struct job Job; - -/// Function called when the job reads data -/// -/// @param id The job id -/// @param data Some data associated with the job by the caller -typedef void (*job_exit_cb)(Job *job, int status, void *data); - -// Job startup options -// job_exit_cb Callback that will be invoked when the job exits -// maxmem Maximum amount of memory used by the job WStream -typedef struct { - // Argument vector for the process. The first item is the - // executable to run. - // [consumed] - char **argv; - // Caller data that will be associated with the job - void *data; - // If true the job stdin will be available for writing with job_write, - // otherwise it will be redirected to /dev/null - bool writable; - // Callback that will be invoked when data is available on stdout. If NULL - // stdout will be redirected to /dev/null. - stream_read_cb stdout_cb; - // Callback that will be invoked when data is available on stderr. If NULL - // stderr will be redirected to /dev/null. - stream_read_cb stderr_cb; - // Callback that will be invoked when the job has exited and will not send - // data - job_exit_cb exit_cb; - // Maximum memory used by the job's WStream - size_t maxmem; - // Connect the job to a pseudo terminal - bool pty; - // Initial window dimensions if the job is connected to a pseudo terminal - uint16_t width, height; - // Value for the $TERM environment variable. A default value of "ansi" is - // assumed if NULL - char *term_name; -} JobOptions; - -#define JOB_OPTIONS_INIT ((JobOptions) { \ - .argv = NULL, \ - .data = NULL, \ - .writable = true, \ - .stdout_cb = NULL, \ - .stderr_cb = NULL, \ - .exit_cb = NULL, \ - .maxmem = 0, \ - .pty = false, \ - .width = 80, \ - .height = 24, \ - .term_name = NULL \ - }) -#endif // NVIM_OS_JOB_DEFS_H diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h deleted file mode 100644 index 6bdb24e6cd..0000000000 --- a/src/nvim/os/job_private.h +++ /dev/null @@ -1,101 +0,0 @@ -#ifndef NVIM_OS_JOB_PRIVATE_H -#define NVIM_OS_JOB_PRIVATE_H - -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/event/time.h" -#include "nvim/event/rstream.h" -#include "nvim/event/wstream.h" -#include "nvim/os/pipe_process.h" -#include "nvim/os/pty_process.h" -#include "nvim/os/shell.h" -#include "nvim/log.h" -#include "nvim/memory.h" - -struct job { - // Job id the index in the job table plus one. - int id; - // Process id - int pid; - // Exit status code of the job process - int status; - // Number of references to the job. The job resources will only be freed by - // close_cb when this is 0 - int refcount; - // Time when job_stop was called for the job. - uint64_t stopped_time; - // If SIGTERM was already sent to the job(only send one before SIGKILL) - bool term_sent; - // stdio streams(std{in,out,err}) - Stream in, out, err; - // Libuv streams representing stdin/stdout/stderr - uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr; - // Extra data set by the process spawner - union { - UvProcess uv; - PtyProcess pty; - } process; - // If process_close has been called on this job - bool closed; - // Startup options - JobOptions opts; -}; - -extern Job *table[]; -extern size_t stop_requests; -extern TimeWatcher job_stop_timer; - -static inline bool process_spawn(Job *job) -{ - return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job); -} - -static inline void process_close(Job *job) -{ - if (job->closed) { - return; - } - job->closed = true; - if (job->opts.pty) { - pty_process_close(job); - } else { - pipe_process_close(job); - } -} - -static inline void job_exit_callback(Job *job) -{ - // Free the slot now, 'exit_cb' may want to start another job to replace - // this one - table[job->id - 1] = NULL; - - if (job->opts.exit_cb) { - // Invoke the exit callback - job->opts.exit_cb(job, job->status, job->opts.data); - } - - if (stop_requests && !--stop_requests) { - // Stop the timer if no more stop requests are pending - DLOG("Stopping job kill timer"); - time_watcher_stop(&job_stop_timer); - } -} - -static inline void job_decref(Job *job) -{ - if (--job->refcount == 0) { - // Invoke the exit_cb - job_exit_callback(job); - // Free all memory allocated for the job - shell_free_argv(job->opts.argv); - if (job->opts.pty) { - xfree(job->opts.term_name); - } - xfree(job); - } -} - - -#endif // NVIM_OS_JOB_PRIVATE_H diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c deleted file mode 100644 index 1160015c34..0000000000 --- a/src/nvim/os/pipe_process.c +++ /dev/null @@ -1,88 +0,0 @@ -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pipe_process.h" -#include "nvim/memory.h" -#include "nvim/vim.h" -#include "nvim/globals.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pipe_process.c.generated.h" -#endif - -bool pipe_process_spawn(Job *job) -{ - UvProcess *pipeproc = &job->process.uv; - pipeproc->proc_opts.file = job->opts.argv[0]; - pipeproc->proc_opts.args = job->opts.argv; - pipeproc->proc_opts.stdio = pipeproc->stdio; - pipeproc->proc_opts.stdio_count = 3; - pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; - pipeproc->proc_opts.exit_cb = exit_cb; - pipeproc->proc_opts.cwd = NULL; - pipeproc->proc_opts.env = NULL; - pipeproc->proc.data = NULL; - pipeproc->proc_stdin.data = NULL; - pipeproc->proc_stdout.data = NULL; - pipeproc->proc_stderr.data = NULL; - - // Initialize the job std{in,out,err} - pipeproc->stdio[0].flags = UV_IGNORE; - pipeproc->stdio[1].flags = UV_IGNORE; - pipeproc->stdio[2].flags = UV_IGNORE; - - pipeproc->proc.data = job; - - if (job->opts.writable) { - uv_pipe_init(&loop.uv, &pipeproc->proc_stdin, 0); - pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin; - } - - if (job->opts.stdout_cb) { - uv_pipe_init(&loop.uv, &pipeproc->proc_stdout, 0); - pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout; - } - - if (job->opts.stderr_cb) { - uv_pipe_init(&loop.uv, &pipeproc->proc_stderr, 0); - pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr; - } - - job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin; - job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout; - job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr; - - if (uv_spawn(&loop.uv, &pipeproc->proc, &pipeproc->proc_opts) != 0) { - return false; - } - - job->pid = pipeproc->proc.pid; - return true; -} - -void pipe_process_close(Job *job) -{ - uv_close((uv_handle_t *)&job->process.uv.proc, close_cb); -} - -static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) -{ - Job *job = proc->data; - job->status = (int)status; - pipe_process_close(job); -} - -static void close_cb(uv_handle_t *handle) -{ - Job *job = handle->data; - job_close_streams(job); - job_decref(job); -} diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h deleted file mode 100644 index 65e5cfa78f..0000000000 --- a/src/nvim/os/pipe_process.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef NVIM_OS_PIPE_PROCESS_H -#define NVIM_OS_PIPE_PROCESS_H - -#include <uv.h> - -typedef struct { - // Structures for process spawning/management used by libuv - uv_process_t proc; - uv_process_options_t proc_opts; - uv_stdio_container_t stdio[3]; - uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -} UvProcess; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pipe_process.h.generated.h" -#endif -#endif // NVIM_OS_PIPE_PROCESS_H diff --git a/src/nvim/os/pty_process.h b/src/nvim/os/pty_process.h deleted file mode 100644 index b5a2eba8b3..0000000000 --- a/src/nvim/os/pty_process.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef NVIM_OS_PTY_PROCESS_H -#define NVIM_OS_PTY_PROCESS_H - -#include <sys/ioctl.h> - -#include <uv.h> - -typedef struct { - struct winsize winsize; - uv_pipe_t proc_stdin, proc_stdout, proc_stderr; - int tty_fd; -} PtyProcess; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pty_process.h.generated.h" -#endif -#endif // NVIM_OS_PTY_PROCESS_H diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 04ac9f1c03..e0d67d4951 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -9,7 +9,7 @@ #include "nvim/lib/kvec.h" #include "nvim/log.h" #include "nvim/event/loop.h" -#include "nvim/os/job.h" +#include "nvim/event/uv_process.h" #include "nvim/event/rstream.h" #include "nvim/os/shell.h" #include "nvim/os/signal.h" @@ -204,17 +204,15 @@ static int do_os_system(char **argv, char prog[MAXPATHL]; xstrlcpy(prog, argv[0], MAXPATHL); - int status; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = &buf; - opts.writable = input != NULL; - opts.stdout_cb = data_cb; - opts.stderr_cb = data_cb; - opts.exit_cb = NULL; - Job *job = job_start(opts, &status); - - if (status <= 0) { + Stream in, out, err; + UvProcess uvproc = uv_process_init(&buf); + Process *proc = &uvproc.process; + proc->argv = argv; + proc->in = input != NULL ? &in : NULL; + proc->out = &out; + proc->err = &err; + if (!process_spawn(&loop, proc)) { + loop_poll_events(&loop, 0); // Failed, probably due to `sh` not being executable if (!silent) { MSG_PUTS(_("\nCannot execute ")); @@ -224,28 +222,32 @@ static int do_os_system(char **argv, return -1; } + if (input != NULL) { + wstream_init(proc->in, 0); + } + rstream_init(proc->out, 0); + rstream_start(proc->out, data_cb); + rstream_init(proc->err, 0); + rstream_start(proc->err, data_cb); + // write the input, if any if (input) { WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); - if (!job_write(job, input_buffer)) { - // couldn't write, stop the job and tell the user about it - job_stop(job); + if (!wstream_write(&in, input_buffer)) { + // couldn't write, stop the process and tell the user about it + process_stop(proc); return -1; } // close the input stream after everything is written - job_write_cb(job, shell_write_cb); - } else { - // close the input stream, let the process know that no more input is - // coming - job_close_in(job); + wstream_set_write_cb(&in, shell_write_cb); } // invoke busy_start here so event_poll_until wont change the busy state for // the UI ui_busy_start(); ui_flush(); - status = job_wait(job, -1); + int status = process_wait(proc, -1); ui_busy_stop(); // prepare the out parameters if requested @@ -285,8 +287,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired) static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) { - Job *job = data; - DynamicBuffer *dbuf = job_data(job); + DynamicBuffer *dbuf = data; size_t nread = buf->size; dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1); @@ -472,6 +473,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer, static void shell_write_cb(Stream *stream, void *data, int status) { - Job *job = data; - job_close_in(job); + stream_close(stream, NULL); } diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c index f568f5a7f1..122b3a171d 100644 --- a/src/nvim/os_unix.c +++ b/src/nvim/os_unix.c @@ -50,7 +50,6 @@ #include "nvim/os/input.h" #include "nvim/os/shell.h" #include "nvim/os/signal.h" -#include "nvim/os/job.h" #include "nvim/msgpack_rpc/helpers.h" #ifdef HAVE_STROPTS_H |