diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/eval.c | 118 | ||||
-rw-r--r-- | src/nvim/event/process.c | 12 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 17 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 4 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 9 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 3 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 17 | ||||
-rw-r--r-- | src/nvim/globals.h | 3 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 92 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 1 | ||||
-rw-r--r-- | src/nvim/os/input.c | 6 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 8 | ||||
-rw-r--r-- | src/nvim/tui/input.c | 12 |
13 files changed, 158 insertions, 144 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index d936c9572a..dce24230b0 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -408,6 +408,7 @@ typedef struct { Terminal *term; bool stopped; bool exited; + bool rpc; int refcount; ufunc_T *on_stdout, *on_stderr, *on_exit; dict_T *self; @@ -448,8 +449,7 @@ typedef struct { #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -static uint64_t current_job_id = 1; -static PMap(uint64_t) *jobs = NULL; +static PMap(uint64_t) *jobs = NULL; static uint64_t last_timer_id = 0; static PMap(uint64_t) *timers = NULL; @@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv) if (argvars[1].v_type == VAR_STRING) { char *stream = (char *)argvars[1].vval.v_string; if (!strcmp(stream, "stdin")) { - process_close_in(proc); + if (data->rpc) { + EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); + } else { + process_close_in(proc); + } } else if (!strcmp(stream, "stdout")) { - process_close_out(proc); + if (data->rpc) { + EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); + } else { + process_close_out(proc); + } } else if (!strcmp(stream, "stderr")) { process_close_err(proc); + } else if (!strcmp(stream, "rpc")) { + if (data->rpc) { + channel_close(data->id); + } else { + EMSG(_("Invalid job stream: Not an rpc job")); + } } else { EMSG2(_("Invalid job stream \"%s\""), stream); } } else { - process_close_streams(proc); + if (data->rpc) { + channel_close(data->id); + process_close_err(proc); + } else { + process_close_streams(proc); + } } } @@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) return; } + if (data->rpc) { + EMSG(_("Can't send raw data to rpc channel")); + return; + } + ssize_t input_len; char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false); if (!input) { @@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv) return; } + dict_T *job_opts = NULL; + bool detach = false, rpc = false, pty = false; ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL; char *cwd = NULL; if (argvars[1].v_type == VAR_DICT) { job_opts = argvars[1].vval.v_dict; + detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0; + rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0; + pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0; + if (pty && rpc) { + EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set"); + shell_free_argv(argv); + return; + } + char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false); if (new_cwd && strlen(new_cwd) > 0) { cwd = new_cwd; @@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv) } } - bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0; - bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0; TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - job_opts, pty, detach, cwd); + job_opts, pty, rpc, detach, cwd); Process *proc = (Process *)&data->proc; if (pty) { @@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv) } } - if (!on_stdout) { + if (!rpc && !on_stdout) { proc->out = NULL; } if (!on_stderr) { @@ -14105,7 +14138,7 @@ end: api_free_object(result); } -// "rpcstart()" function +// "rpcstart()" function (DEPRECATED) static void f_rpcstart(typval_T *argvars, typval_T *rettv) { rettv->v_type = VAR_NUMBER; @@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv) // The last item of argv must be NULL argv[i] = NULL; - uint64_t channel_id = channel_from_process(argv); - if (!channel_id) { - EMSG(_(e_api_spawn_failed)); - } - - rettv->vval.v_number = (varnumber_T)channel_id; + TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL, + NULL, false, true, false, NULL); + common_job_start(data, rettv); } // "rpcstop()" function static void f_rpcstop(typval_T *argvars, typval_T *rettv) { - rettv->v_type = VAR_NUMBER; - rettv->vval.v_number = 0; - - if (check_restricted() || check_secure()) { - return; - } - if (argvars[0].v_type != VAR_NUMBER) { // Wrong argument types EMSG(_(e_invarg)); return; } - rettv->vval.v_number = channel_close(argvars[0].vval.v_number); + // if called with a job, stop it, else closes the channel + if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { + f_jobstop(argvars, rettv); + } else { + rettv->vval.v_number = channel_close(argvars[0].vval.v_number); + } } /* @@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv) } TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - job_opts, true, false, cwd); + job_opts, true, false, false, cwd); data->proc.pty.width = curwin->w_width; data->proc.pty.height = curwin->w_height; data->proc.pty.term_name = xstrdup("xterm-256color"); @@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_exit, dict_T *self, bool pty, + bool rpc, bool detach, char *cwd) { @@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv, data->on_exit = on_exit; data->self = self; data->events = queue_new_child(main_loop.events); + data->rpc = rpc; if (pty) { data->proc.pty = pty_process_init(&main_loop, data); } else { @@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv, return data; } -/// Return true/false on success/failure. +/// common code for getting job callbacks for jobstart, termopen and rpcstart +/// +/// @return true/false on success/failure. static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout, ufunc_T **on_stderr, ufunc_T **on_exit) { @@ -22174,15 +22206,22 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) } xfree(cmd); - data->id = current_job_id++; - wstream_init(proc->in, 0); - if (proc->out) { - rstream_init(proc->out, 0); - rstream_start(proc->out, on_job_stdout); + data->id = next_chan_id++; + + if (data->rpc) { + // the rpc channel takes over the in and out streams + channel_from_process(proc, data->id); + } else { + wstream_init(proc->in, 0); + if (proc->out) { + rstream_init(proc->out, 0); + rstream_start(proc->out, on_job_stdout, data); + } } + if (proc->err) { rstream_init(proc->err, 0); - rstream_start(proc->err, on_job_stderr); + rstream_start(proc->err, on_job_stderr, data); } pmap_put(uint64_t)(jobs, data->id, data); rettv->vval.v_number = data->id; @@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d) snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); terminal_close(data->term, msg); } + if (data->rpc) { + channel_process_exit(data->id, status); + } if (data->status_ptr) { *data->status_ptr = status; } process_job_event(data, data->on_exit, "exit", NULL, 0, status); + + pmap_del(uint64_t)(jobs, data->id); + term_job_data_decref(data); } static void term_write(char *buf, size_t size, void *d) @@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data) static void on_job_event(JobEvent *ev) { if (!ev->callback) { - goto end; + return; } typval_T argv[3]; @@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev) call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum, curwin->w_cursor.lnum, ev->data->self); clear_tv(&rettv); - -end: - if (!ev->received) { - // exit event, safe to free job data now - pmap_del(uint64_t)(jobs, ev->data->id); - term_job_data_decref(ev->data); - } } static TerminalJobData *find_job(uint64_t id) diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 317e40e43a..f507e3d71d 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -25,7 +25,7 @@ #define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ - stream_close(proc->stream, NULL); \ + stream_close(proc->stream, NULL, NULL); \ } \ } while (0) @@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL return false; } - void *data = proc->data; - if (proc->in) { - stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe); proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; @@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->out) { - stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe); proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; @@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->err) { - stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe); proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; @@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb) { // Stream callback could miss EOF handling if a child keeps the stream // open. - stream->read_cb(stream, stream->buffer, 0, stream->data, true); + stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); } break; } diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index a520143064..5126dfd84e 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -17,21 +17,19 @@ # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, - void *data) +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); rstream_init(stream, bufsize); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, - void *data) +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); rstream_init(stream, bufsize); } @@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize) /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb) +void rstream_start(Stream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; + stream->cb_data = data; if (stream->uvstream) { uv_read_start(stream->uvstream, alloc_cb, read_cb); } else { @@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) { Stream *stream = data; assert(stream->read_cb); - rstream_start(stream, stream->read_cb); + rstream_start(stream, stream->read_cb, stream->cb_data); } // Callbacks used by libuv @@ -179,7 +178,7 @@ static void read_event(void **argv) if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; - stream->read_cb(stream, stream->buffer, count, stream->data, eof); + stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); } stream->pending_reqs--; if (stream->closed && !stream->pending_reqs) { diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index cdaf40849b..8f9327f3d4 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; @@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) return result; } - stream_init(NULL, stream, -1, client, data); + stream_init(NULL, stream, -1, client); return 0; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 33404158cf..26083c20f4 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking) return retval; } -void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, - void *data) +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->uvstream->data = stream; } - stream->data = data; stream->internal_data = NULL; stream->fpos = 0; stream->curmem = 0; @@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close) +void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); stream->closed = true; stream->close_cb = on_stream_close; + stream->close_cb_data = data; if (!stream->pending_reqs) { stream_close_handle(stream); @@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle) rbuffer_free(stream->buffer); } if (stream->close_cb) { - stream->close_cb(stream, stream->data); + stream->close_cb(stream, stream->close_cb_data); } if (stream->internal_close_cb) { stream->internal_close_cb(stream, stream->internal_data); diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index ad4e24775b..a176fac1c0 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -44,13 +44,14 @@ struct stream { uv_file fd; stream_read_cb read_cb; stream_write_cb write_cb; + void *cb_data; stream_close_cb close_cb, internal_close_cb; + void *close_cb_data, *internal_data; size_t fpos; size_t curmem; size_t maxmem; size_t pending_reqs; size_t num_bytes; - void *data, *internal_data; bool closed; Queue *events; }; diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 8028e35e6b..fc7aad8eb9 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -22,19 +22,17 @@ typedef struct { # include "event/wstream.c.generated.h" #endif -void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, - void *data) +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); wstream_init(stream, maxmem); } -void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, - void *data) +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); wstream_init(stream, maxmem); } @@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem) /// /// @param stream The `Stream` instance /// @param cb The callback -void wstream_set_write_cb(Stream *stream, stream_write_cb cb) - FUNC_ATTR_NONNULL_ALL +void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data) + FUNC_ATTR_NONNULL_ARG(1, 2) { stream->write_cb = cb; + stream->cb_data = data; } /// Queues data for writing to the backing file descriptor of a `Stream` @@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status) wstream_release_wbuffer(data->buffer); if (data->stream->write_cb) { - data->stream->write_cb(data->stream, data->stream->data, status); + data->stream->write_cb(data->stream, data->stream->cb_data, status); } data->stream->pending_reqs--; diff --git a/src/nvim/globals.h b/src/nvim/globals.h index 950ceb4c74..4c014010c2 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -1244,6 +1244,9 @@ EXTERN char *ignoredp; // If a msgpack-rpc channel should be started over stdin/stdout EXTERN bool embedded_mode INIT(= false); +/// next free id for a job or rpc channel +EXTERN uint64_t next_chan_id INIT(= 1); + /// Used to track the status of external functions. /// Currently only used for iconv(). typedef enum { diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5b249ee1c7..8b5f212d66 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -19,6 +19,7 @@ #include "nvim/main.h" #include "nvim/ascii.h" #include "nvim/memory.h" +#include "nvim/eval.h" #include "nvim/os_unix.h" #include "nvim/message.h" #include "nvim/map.h" @@ -55,12 +56,7 @@ typedef struct { msgpack_unpacker *unpacker; union { Stream stream; - struct { - LibuvProcess uvproc; - Stream in; - Stream out; - Stream err; - } process; + Process *proc; struct { Stream in; Stream out; @@ -79,7 +75,6 @@ typedef struct { uint64_t request_id; } RequestEvent; -static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -112,33 +107,20 @@ void channel_teardown(void) } /// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. +/// stdin/stdout. stderr is handled by the job infrastructure. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_process(char **argv) -{ - Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = libuv_process_init(&main_loop, channel); - Process *proc = &channel->data.process.uvproc.process; - proc->argv = argv; - proc->in = &channel->data.process.in; - proc->out = &channel->data.process.out; - proc->err = &channel->data.process.err; - proc->cb = process_exit; - if (!process_spawn(proc)) { - loop_poll_events(&main_loop, 0); - decref(channel); - return 0; - } - +uint64_t channel_from_process(Process *proc, uint64_t id) +{ + Channel *channel = register_channel(kChannelTypeProc, id, proc->events); incref(channel); // process channels are only closed by the exit_cb + channel->data.proc = proc; + wstream_init(proc->in, 0); rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack); - rstream_init(proc->err, 0); - rstream_start(proc->err, forward_stderr); + rstream_start(proc->out, parse_msgpack, channel); return channel->id; } @@ -148,14 +130,14 @@ uint64_t channel_from_process(char **argv) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket); - socket_watcher_accept(watcher, &channel->data.stream, channel); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); + socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; channel->data.stream.internal_data = channel; wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack); + rstream_start(&channel->data.stream, parse_msgpack, channel); } /// Sends event/arguments to channel @@ -314,30 +296,21 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio); + Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); incref(channel); // stdio channels are only closed on exit // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, - channel); - rstream_start(&channel->data.std.in, parse_msgpack); + rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.std.in, parse_msgpack, channel); // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL); + wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, - void *data, bool eof) +void channel_process_exit(uint64_t id, int status) { - while (rbuffer_size(rbuf)) { - char buf[256]; - size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); - } -} + Channel *channel = pmap_get(uint64_t)(channels, id); -static void process_exit(Process *proc, int status, void *data) -{ - decref(data); + channel->closed = true; + decref(channel); } static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, @@ -512,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) success = wstream_write(&channel->data.stream, buffer); break; case kChannelTypeProc: - success = wstream_write(&channel->data.process.in, buffer); + success = wstream_write(channel->data.proc->in, buffer); break; case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); @@ -637,16 +610,17 @@ static void close_channel(Channel *channel) switch (channel->type) { case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL); + stream_close(&channel->data.stream, NULL, NULL); break; case kChannelTypeProc: - if (!channel->data.process.uvproc.process.closed) { - process_stop(&channel->data.process.uvproc.process); - } + // Only close the rpc channel part, + // there could be an error message on the stderr stream + process_close_in(channel->data.proc); + process_close_out(channel->data.proc); break; case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL); - stream_close(&channel->data.std.out, NULL); + stream_close(&channel->data.std.in, NULL, NULL); + stream_close(&channel->data.std.out, NULL, NULL); queue_put(main_loop.fast_events, exit_event, 1, channel); return; default: @@ -680,7 +654,9 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); - queue_free(channel->events); + if (channel->type != kChannelTypeProc) { + queue_free(channel->events); + } xfree(channel); } @@ -689,15 +665,15 @@ static void close_cb(Stream *stream, void *data) decref(data); } -static Channel *register_channel(ChannelType type) +static Channel *register_channel(ChannelType type, uint64_t id, Queue *events) { Channel *rv = xmalloc(sizeof(Channel)); - rv->events = queue_new_child(main_loop.events); + rv->events = events ? events : queue_new_child(main_loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; + rv->id = id > 0 ? id : next_chan_id++; rv->pending_requests = 0; rv->subscribed_events = pmap_new(cstr_t)(); rv->next_request_id = 1; diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index 104547a7b8..0d92976d02 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -6,6 +6,7 @@ #include "nvim/api/private/defs.h" #include "nvim/event/socket.h" +#include "nvim/event/process.h" #include "nvim/vim.h" #define METHOD_MAXLEN 512 diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a4e01b18cd..c0c73364c0 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -60,8 +60,8 @@ void input_start(int fd) } global_fd = fd; - rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL); - rstream_start(&read_stream, read_cb); + rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE); + rstream_start(&read_stream, read_cb, NULL); } void input_stop(void) @@ -71,7 +71,7 @@ void input_stop(void) } rstream_stop(&read_stream); - stream_close(&read_stream, NULL); + stream_close(&read_stream, NULL, NULL); } static void cursorhold_event(void **argv) diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 64c673930a..ba52b9f661 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -236,10 +236,10 @@ static int do_os_system(char **argv, } proc->out->events = NULL; rstream_init(proc->out, 0); - rstream_start(proc->out, data_cb); + rstream_start(proc->out, data_cb, &buf); proc->err->events = NULL; rstream_init(proc->err, 0); - rstream_start(proc->err, data_cb); + rstream_start(proc->err, data_cb, &buf); // write the input, if any if (input) { @@ -251,7 +251,7 @@ static int do_os_system(char **argv, return -1; } // close the input stream after everything is written - wstream_set_write_cb(&in, shell_write_cb); + wstream_set_write_cb(&in, shell_write_cb, NULL); } // invoke busy_start here so event_poll_until wont change the busy state for @@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer, static void shell_write_cb(Stream *stream, void *data, int status) { - stream_close(stream, NULL); + stream_close(stream, NULL, NULL); } diff --git a/src/nvim/tui/input.c b/src/nvim/tui/input.c index be256f3ebc..3ef4d34c9a 100644 --- a/src/nvim/tui/input.c +++ b/src/nvim/tui/input.c @@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop) int curflags = termkey_get_canonflags(input->tk); termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS); // setup input handle - rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input); + rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff); // initialize a timer handle for handling ESC with libtermkey time_watcher_init(loop, &input->timer_handle, input); } @@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input) uv_mutex_destroy(&input->key_buffer_mutex); uv_cond_destroy(&input->key_buffer_cond); time_watcher_close(&input->timer_handle, NULL); - stream_close(&input->read_stream, NULL); + stream_close(&input->read_stream, NULL, NULL); termkey_destroy(input->tk); } void term_input_start(TermInput *input) { - rstream_start(&input->read_stream, read_cb); + rstream_start(&input->read_stream, read_cb, input); } void term_input_stop(TermInput *input) @@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, // // ls *.md | xargs nvim input->in_fd = 2; - stream_close(&input->read_stream, NULL); + stream_close(&input->read_stream, NULL, NULL); queue_put(input->loop->fast_events, restart_reading, 1, input); } else { loop_schedule(&main_loop, event_create(1, input_done_event, 0)); @@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, static void restart_reading(void **argv) { TermInput *input = argv[0]; - rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input); - rstream_start(&input->read_stream, read_cb); + rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff); + rstream_start(&input->read_stream, read_cb, input); } |