diff options
author | Björn Linse <bjorn.linse@gmail.com> | 2017-06-05 08:29:10 +0200 |
---|---|---|
committer | Björn Linse <bjorn.linse@gmail.com> | 2017-11-24 14:54:15 +0100 |
commit | 1ebc96fe10fbdbec22caa26d5d52a9f095da9687 (patch) | |
tree | f0a9170522cf2836cca741690cddf2cf8051ac4c | |
parent | 5215e3205a07b85e4e4cf1f8a8ca6be2b9556459 (diff) | |
download | rneovim-1ebc96fe10fbdbec22caa26d5d52a9f095da9687.tar.gz rneovim-1ebc96fe10fbdbec22caa26d5d52a9f095da9687.tar.bz2 rneovim-1ebc96fe10fbdbec22caa26d5d52a9f095da9687.zip |
channels: allow bytes sockets and stdio, and buffered bytes output
-rw-r--r-- | src/nvim/channel.c | 417 | ||||
-rw-r--r-- | src/nvim/channel.h | 36 | ||||
-rw-r--r-- | src/nvim/eval.c | 359 | ||||
-rw-r--r-- | src/nvim/eval.h | 3 | ||||
-rw-r--r-- | src/nvim/eval.lua | 1 | ||||
-rw-r--r-- | src/nvim/eval/typval.c | 11 | ||||
-rw-r--r-- | src/nvim/event/process.c | 11 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 28 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 6 | ||||
-rw-r--r-- | src/nvim/globals.h | 8 | ||||
-rw-r--r-- | src/nvim/main.c | 19 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 87 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 7 | ||||
-rw-r--r-- | src/nvim/rbuffer.c | 2 |
14 files changed, 574 insertions, 421 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index e61ec9c19b..5ee4e5be09 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -3,10 +3,25 @@ #include "nvim/api/ui.h" #include "nvim/channel.h" +#include "nvim/eval.h" +#include "nvim/event/socket.h" #include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/os/shell.h" +#include "nvim/path.h" +#include "nvim/ascii.h" +static bool did_stdio = false; PMap(uint64_t) *channels = NULL; +typedef struct { + Channel *data; + Callback *callback; + const char *type; + list_T *received; + int status; +} ChannelEvent; + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "channel.c.generated.h" #endif @@ -32,6 +47,21 @@ void channel_init(void) remote_ui_init(); } +/// Allocates a channel. +/// +/// Channel is allocated with refcount 1, which should be decreased +/// when the underlying stream closes. +static Channel *channel_alloc(ChannelStreamType type) +{ + Channel *chan = xcalloc(1, sizeof(*chan)); + chan->id = type == kChannelStreamStdio ? 1 : next_chan_id++; + chan->events = multiqueue_new_child(main_loop.events); + chan->refcount = 1; + chan->streamtype = type; + pmap_put(uint64_t)(channels, chan->id, chan); + return chan; +} + void channel_incref(Channel *channel) { channel->refcount++; @@ -44,6 +74,21 @@ void channel_decref(Channel *channel) } } +void callback_reader_free(CallbackReader *reader) +{ + callback_free(&reader->cb); + if (reader->buffered) { + ga_clear(&reader->buffer); + } +} + +void callback_reader_start(CallbackReader *reader) +{ + if (reader->buffered) { + ga_init(&reader->buffer, sizeof(char *), 1); + } +} + static void free_channel_event(void **argv) { Channel *channel = argv[0]; @@ -51,11 +96,379 @@ static void free_channel_event(void **argv) rpc_free(channel); } - callback_free(&channel->on_stdout); - callback_free(&channel->on_stderr); + callback_reader_free(&channel->on_stdout); + callback_reader_free(&channel->on_stderr); callback_free(&channel->on_exit); pmap_del(uint64_t)(channels, channel->id); multiqueue_free(channel->events); xfree(channel); } + +static void channel_destroy_early(Channel *chan) +{ + if ((chan->id != --next_chan_id)) { + abort(); + } + + if ((--chan->refcount != 0)) { + abort(); + } + + free_channel_event((void **)&chan); +} + + +static void close_cb(Stream *stream, void *data) +{ + channel_decref(data); +} + +Channel *channel_job_start(char **argv, CallbackReader on_stdout, + CallbackReader on_stderr, Callback on_exit, + bool pty, bool rpc, bool detach, const char *cwd, + uint16_t pty_width, uint16_t pty_height, + char *term_name, varnumber_T *status_out) +{ + Channel *chan = channel_alloc(kChannelStreamProc); + chan->on_stdout = on_stdout; + chan->on_stderr = on_stderr; + chan->on_exit = on_exit; + chan->is_rpc = rpc; + + if (pty) { + if (detach) { + EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); + shell_free_argv(argv); + xfree(term_name); + channel_destroy_early(chan); + *status_out = 0; + return NULL; + } + chan->stream.pty = pty_process_init(&main_loop, chan); + if (pty_width > 0) { + chan->stream.pty.width = pty_width; + } + if (pty_height > 0) { + chan->stream.pty.height = pty_height; + } + if (term_name) { + chan->stream.pty.term_name = term_name; + } + } else { + chan->stream.uv = libuv_process_init(&main_loop, chan); + } + + Process *proc = (Process *)&chan->stream.proc; + proc->argv = argv; + proc->cb = channel_process_exit_cb; + proc->events = chan->events; + proc->detach = detach; + proc->cwd = cwd; + + char *cmd = xstrdup(proc->argv[0]); + bool has_out, has_err; + if (proc->type == kProcessTypePty) { + has_out = true; + has_err = false; + } else { + has_out = chan->is_rpc || callback_reader_set(chan->on_stdout); + has_err = callback_reader_set(chan->on_stderr); + } + int status = process_spawn(proc, true, has_out, has_err); + if (has_err) { + proc->err.events = chan->events; + } + if (status) { + EMSG3(_(e_jobspawn), os_strerror(status), cmd); + xfree(cmd); + if (proc->type == kProcessTypePty) { + xfree(chan->stream.pty.term_name); + } + channel_destroy_early(chan); + *status_out = proc->status; + return NULL; + } + xfree(cmd); + + wstream_init(&proc->in, 0); + if (has_out) { + rstream_init(&proc->out, 0); + } + + if (chan->is_rpc) { + // the rpc takes over the in and out streams + rpc_start(chan); + } else { + proc->in.events = chan->events; + if (has_out) { + callback_reader_start(&chan->on_stdout); + proc->out.events = chan->events; + rstream_start(&proc->out, on_job_stdout, chan); + } + } + + if (has_err) { + callback_reader_start(&chan->on_stderr); + rstream_init(&proc->err, 0); + rstream_start(&proc->err, on_job_stderr, chan); + } + *status_out = (varnumber_T)chan->id; + return chan; +} + + +uint64_t channel_connect(bool tcp, const char *address, + bool rpc, CallbackReader on_output, + int timeout, const char **error) +{ + if (!tcp && rpc) { + char *path = fix_fname(address); + if (server_owns_pipe_address(path)) { + // avoid deadlock + xfree(path); + return channel_create_internal_rpc(); + } + xfree(path); + } + + Channel *channel = channel_alloc(kChannelStreamSocket); + if (!socket_connect(&main_loop, &channel->stream.socket, + tcp, address, timeout, error)) { + channel_destroy_early(channel); + return 0; + } + + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + wstream_init(&channel->stream.socket, 0); + rstream_init(&channel->stream.socket, 0); + + if (rpc) { + rpc_start(channel); + } else { + channel->on_stdout = on_output; + callback_reader_start(&channel->on_stdout); + channel->stream.socket.events = channel->events; + rstream_start(&channel->stream.socket, on_socket_output, channel); + } + + return channel->id; +} + +/// Creates an RPC channel from a tcp/pipe socket connection +/// +/// @param watcher The SocketWatcher ready to accept the connection +void channel_from_connection(SocketWatcher *watcher) +{ + Channel *channel = channel_alloc(kChannelStreamSocket); + socket_watcher_accept(watcher, &channel->stream.socket); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + wstream_init(&channel->stream.socket, 0); + rstream_init(&channel->stream.socket, 0); + rpc_start(channel); +} + +/// Creates a loopback channel. This is used to avoid deadlock +/// when an instance connects to its own named pipe. +static uint64_t channel_create_internal_rpc(void) +{ + Channel *channel = channel_alloc(kChannelStreamInternal); + channel_incref(channel); // internal channel lives until process exit + rpc_start(channel); + return channel->id; +} + +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim +uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, + const char **error) + FUNC_ATTR_NONNULL_ALL +{ + if (!headless_mode) { + *error = _("can only be opened in headless mode"); + return 0; + } + + if (did_stdio) { + *error = _("channel was already open"); + return 0; + } + did_stdio = true; + + Channel *channel = channel_alloc(kChannelStreamStdio); + + rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0); + wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); + + if (rpc) { + rpc_start(channel); + } else { + channel->on_stdout = on_output; + callback_reader_start(&channel->on_stdout); + channel->stream.stdio.in.events = channel->events; + channel->stream.stdio.out.events = channel->events; + rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); + } + + return channel->id; +} + +// vimscript job callbacks must be executed on Nvim main loop +static inline void process_channel_event(Channel *chan, Callback *callback, + const char *type, char *buf, + size_t count, int status) +{ + ChannelEvent event_data; + event_data.received = NULL; + if (buf) { + event_data.received = tv_list_alloc(); + char *ptr = buf; + size_t remaining = count; + size_t off = 0; + + while (off < remaining) { + // append the line + if (ptr[off] == NL) { + tv_list_append_string(event_data.received, ptr, (ssize_t)off); + size_t skip = off + 1; + ptr += skip; + remaining -= skip; + off = 0; + continue; + } + if (ptr[off] == NUL) { + // Translate NUL to NL + ptr[off] = NL; + } + off++; + } + tv_list_append_string(event_data.received, ptr, (ssize_t)off); + } else { + event_data.status = status; + } + event_data.data = chan; + event_data.callback = callback; + event_data.type = type; + on_channel_event(&event_data); +} + +void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) +{ + Channel *chan = data; + on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout"); +} + +void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) +{ + Channel *chan = data; + on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr"); +} + +static void on_socket_output(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) +{ + Channel *chan = data; + on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data"); +} + +static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) +{ + Channel *chan = data; + on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin"); +} + +static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, + size_t count, bool eof, CallbackReader *reader, + const char *type) +{ + // stub variable, to keep reading consistent with the order of events, only + // consider the count parameter. + size_t r; + char *ptr = rbuffer_read_ptr(buf, &r); + + if (eof) { + if (reader->buffered) { + process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data, + (size_t)reader->buffer.ga_len, 0); + ga_clear(&reader->buffer); + } else if (callback_reader_set(*reader)) { + process_channel_event(chan, &reader->cb, type, ptr, 0, 0); + } + return; + } + + // The order here matters, the terminal must receive the data first because + // process_channel_event will modify the read buffer(convert NULs into NLs) + if (chan->term) { + terminal_receive(chan->term, ptr, count); + } + + rbuffer_consumed(buf, count); + if (reader->buffered) { + ga_concat_len(&reader->buffer, ptr, count); + } else if (callback_reader_set(*reader)) { + process_channel_event(chan, &reader->cb, type, ptr, count, 0); + } +} + +static void channel_process_exit_cb(Process *proc, int status, void *data) +{ + Channel *chan = data; + if (chan->term && !chan->stream.proc.exited) { + chan->stream.proc.exited = true; + char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; + snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); + terminal_close(chan->term, msg); + } + if (chan->is_rpc) { + channel_process_exit(chan->id, status); + } + + if (chan->status_ptr) { + *chan->status_ptr = status; + } + + process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status); + + channel_decref(chan); +} + +static void on_channel_event(ChannelEvent *ev) +{ + if (!ev->callback) { + return; + } + + typval_T argv[4]; + + argv[0].v_type = VAR_NUMBER; + argv[0].v_lock = VAR_UNLOCKED; + argv[0].vval.v_number = (varnumber_T)ev->data->id; + + if (ev->received) { + argv[1].v_type = VAR_LIST; + argv[1].v_lock = VAR_UNLOCKED; + argv[1].vval.v_list = ev->received; + argv[1].vval.v_list->lv_refcount++; + } else { + argv[1].v_type = VAR_NUMBER; + argv[1].v_lock = VAR_UNLOCKED; + argv[1].vval.v_number = ev->status; + } + + argv[2].v_type = VAR_STRING; + argv[2].v_lock = VAR_UNLOCKED; + argv[2].vval.v_string = (uint8_t *)ev->type; + + typval_T rettv = TV_INITIAL_VALUE; + callback_call(ev->callback, 3, argv, &rettv); + tv_clear(&rettv); +} + diff --git a/src/nvim/channel.h b/src/nvim/channel.h index b48f508722..8ead6749a6 100644 --- a/src/nvim/channel.h +++ b/src/nvim/channel.h @@ -21,14 +21,19 @@ typedef struct { Stream out; } StdioPair; -// typedef struct { -// Callback on_out; -// Callback on_close; -// Garray buffer; -// bool buffering; -// } CallbackReader - -#define CallbackReader Callback +typedef struct { + Callback cb; + garray_T buffer; + bool buffered; +} CallbackReader; + +#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \ + .buffer = GA_EMPTY_INIT_VALUE, \ + .buffered = false }) +static inline bool callback_reader_set(CallbackReader reader) +{ + return reader.cb.type != kCallbackNone; +} struct Channel { uint64_t id; @@ -61,17 +66,6 @@ EXTERN PMap(uint64_t) *channels; # include "channel.h.generated.h" #endif -static inline Channel *channel_alloc(ChannelStreamType type) -{ - Channel *chan = xcalloc(1, sizeof(*chan)); - chan->id = next_chan_id++; - chan->events = multiqueue_new_child(main_loop.events); - chan->refcount = 1; - chan->streamtype = type; - pmap_put(uint64_t)(channels, chan->id, chan); - return chan; -} - /// @returns Channel with the id or NULL if not found static inline Channel *find_channel(uint64_t id) { @@ -89,7 +83,7 @@ static inline Stream *channel_instream(Channel *chan) return &chan->stream.socket; case kChannelStreamStdio: - return &chan->stream.stdio.in; + return &chan->stream.stdio.out; case kChannelStreamInternal: abort(); @@ -108,7 +102,7 @@ static inline Stream *channel_outstream(Channel *chan) return &chan->stream.socket; case kChannelStreamStdio: - return &chan->stream.stdio.out; + return &chan->stream.stdio.in; case kChannelStreamInternal: abort(); diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 013cfce78d..c2fd7ac19c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -438,14 +438,6 @@ static ScopeDictDictItem vimvars_var; #define vimvarht vimvardict.dv_hashtab typedef struct { - Channel *data; - Callback *callback; - const char *type; - list_T *received; - int status; -} ChannelEvent; - -typedef struct { TimeWatcher tw; int timer_id; int repeat_count; @@ -5121,12 +5113,12 @@ bool garbage_collect(bool testing) // named functions (matters for closures) ABORTING(set_ref_in_functions(copyID)); - // Jobs + // Channels { Channel *data; map_foreach_value(channels, data, { - set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); - set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); + set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); + set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); }) } @@ -11643,8 +11635,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) bool detach = false; bool rpc = false; bool pty = false; - Callback on_stdout = CALLBACK_NONE; - Callback on_stderr = CALLBACK_NONE; + CallbackReader on_stdout = CALLBACK_READER_INIT, + on_stderr = CALLBACK_READER_INIT; Callback on_exit = CALLBACK_NONE; char *cwd = NULL; if (argvars[1].v_type == VAR_DICT) { @@ -11676,26 +11668,17 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } - Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - pty, rpc, detach, cwd); + uint16_t width = 0, height = 0; + char *term_name = NULL; if (pty) { - PtyProcess *pty = &data->stream.pty; - uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); - if (width > 0) { - pty->width = width; - } - uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); - if (height > 0) { - pty->height = height; - } - char *term = tv_dict_get_string(job_opts, "TERM", true); - if (term) { - pty->term_name = term; - } + width = (uint16_t)tv_dict_get_number(job_opts, "width"); + height = (uint16_t)tv_dict_get_number(job_opts, "height"); + term_name = tv_dict_get_string(job_opts, "TERM", true); } - common_job_start(data, rettv); + channel_job_start(argv, on_stdout, on_stderr, on_exit, pty, rpc, detach, + cwd, width, height, term_name, &rettv->vval.v_number); } // "jobstop()" function @@ -11782,7 +11765,8 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } - int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); + int status = process_wait((Process *)&data->stream.proc, remaining, + waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -13922,10 +13906,9 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) // The last item of argv must be NULL argv[i] = NULL; - Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, - CALLBACK_NONE, false, true, false, - NULL); - common_job_start(data, rettv); + channel_job_start(argv, CALLBACK_READER_INIT, CALLBACK_READER_INIT, + CALLBACK_NONE, false, true, false, NULL, 0, 0, NULL, + &rettv->vval.v_number); } // "rpcstop()" function @@ -13946,7 +13929,7 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) // if called with a job, stop it, else closes the channel uint64_t id = argvars[0].vval.v_number; - if (find_job(id, false)) { // FIXME + if (find_job(id, false)) { f_jobstop(argvars, rettv, NULL); } else { rettv->vval.v_number = channel_close(id); @@ -15126,18 +15109,19 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr) } bool rpc = false; + CallbackReader on_data = CALLBACK_READER_INIT; if (argvars[2].v_type == VAR_DICT) { dict_T *opts = argvars[2].vval.v_dict; rpc = tv_dict_get_number(opts, "rpc") != 0; - } - if (!rpc) { - EMSG2(_(e_invarg2), "rpc option must be true"); - return; + if (!tv_dict_get_callback(opts, S_LEN("on_data"), &on_data.cb)) { + return; + } + on_data.buffered = tv_dict_get_number(opts, "data_buffered"); } const char *error = NULL; - uint64_t id = channel_connect(tcp, address, 50, &error); + uint64_t id = channel_connect(tcp, address, rpc, on_data, 50, &error); if (error) { EMSG2(_("connection failed: %s"), error); @@ -15517,6 +15501,35 @@ static void f_sort(typval_T *argvars, typval_T *rettv, FunPtr fptr) do_sort_uniq(argvars, rettv, true); } +/// "stdioopen()" function +static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ + if (argvars[0].v_type != VAR_DICT) { + EMSG(_(e_invarg)); + return; + } + + + bool rpc = false; + CallbackReader on_stdin = CALLBACK_READER_INIT; + dict_T *opts = argvars[0].vval.v_dict; + rpc = tv_dict_get_number(opts, "rpc") != 0; + + if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) { + return; + } + + const char *error; + uint64_t id = channel_from_stdio(rpc, on_stdin, &error); + if (!id) { + EMSG2(e_stdiochan2, error); + } + + + rettv->vval.v_number = (varnumber_T)id; + rettv->v_type = VAR_NUMBER; +} + /// "uniq({list})" function static void f_uniq(typval_T *argvars, typval_T *rettv, FunPtr fptr) { @@ -16633,8 +16646,9 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - Callback on_stdout = CALLBACK_NONE, on_stderr = CALLBACK_NONE, - on_exit = CALLBACK_NONE; + CallbackReader on_stdout = CALLBACK_READER_INIT, + on_stderr = CALLBACK_READER_INIT; + Callback on_exit = CALLBACK_NONE; dict_T *job_opts = NULL; const char *cwd = "."; if (argvars[1].v_type == VAR_DICT) { @@ -16658,23 +16672,23 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) } uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); - Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - true, false, false, cwd); - data->stream.pty.width = term_width; - data->stream.pty.height = curwin->w_height; - data->stream.pty.term_name = xstrdup("xterm-256color"); - if (!common_job_start(data, rettv)) { + Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, + true, false, false, cwd, + term_width, curwin->w_height, + xstrdup("xterm-256color"), + &rettv->vval.v_number); + if (rettv->vval.v_number <= 0) { return; } TerminalOptions topts; - topts.data = data; + topts.data = chan; topts.width = term_width; topts.height = curwin->w_height; topts.write_cb = term_write; topts.resize_cb = term_resize; topts.close_cb = term_close; - int pid = data->stream.pty.process.pid; + int pid = chan->stream.pty.process.pid; char buf[1024]; // format the title with the pid to conform with the term:// URI @@ -16685,16 +16699,19 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) (void)setfname(curbuf, (char_u *)buf, NULL, true); // Save the job id and pid in b:terminal_job_{id,pid} Error err = ERROR_INIT; + dict_set_var(curbuf->b_vars, cstr_as_string("terminal_channel_id"), + INTEGER_OBJ(chan->id), false, false, &err); + // deprecated name: dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_id"), - INTEGER_OBJ(rettv->vval.v_number), false, false, &err); + INTEGER_OBJ(chan->id), false, false, &err); api_clear_error(&err); dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_pid"), INTEGER_OBJ(pid), false, false, &err); api_clear_error(&err); Terminal *term = terminal_open(topts); - data->term = term; - channel_incref(data); + chan->term = term; + channel_incref(chan); return; } @@ -16783,6 +16800,13 @@ static bool set_ref_in_callback(Callback *callback, int copyID, return false; } +static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID, + ht_stack_T **ht_stack, + list_stack_T **list_stack) +{ + return set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack); +} + static void add_timer_info(typval_T *rettv, timer_T *timer) { list_T *list = rettv->vval.v_list; @@ -22347,206 +22371,29 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, return ret; } -static inline Channel *common_job_init(char **argv, - Callback on_stdout, - Callback on_stderr, - Callback on_exit, - bool pty, - bool rpc, - bool detach, - const char *cwd) -{ - Channel *data = channel_alloc(kChannelStreamProc); - data->on_stdout = on_stdout; - data->on_stderr = on_stderr; - data->on_exit = on_exit; - data->is_rpc = rpc; - if (pty) { - data->stream.pty = pty_process_init(&main_loop, data); - } else { - data->stream.uv = libuv_process_init(&main_loop, data); - } - Process *proc = (Process *)&data->stream.proc; - proc->argv = argv; - proc->cb = eval_job_process_exit_cb; - proc->events = data->events; - proc->detach = detach; - proc->cwd = cwd; - return data; -} - /// common code for getting job callbacks for jobstart, termopen and rpcstart /// /// @return true/false on success/failure. -static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, - Callback *on_stderr, Callback *on_exit) +static inline bool common_job_callbacks(dict_T *vopts, + CallbackReader *on_stdout, + CallbackReader *on_stderr, + Callback *on_exit) { - if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), on_stdout) - &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), on_stderr) + if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), &on_stdout->cb) + &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), &on_stderr->cb) && tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) { + on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered"); + on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered"); vopts->dv_refcount++; return true; } - callback_free(on_stdout); - callback_free(on_stderr); + callback_reader_free(on_stdout); + callback_reader_free(on_stderr); callback_free(on_exit); return false; } -static inline bool common_job_start(Channel *data, typval_T *rettv) -{ - Process *proc = (Process *)&data->stream.proc; - if (proc->type == kProcessTypePty && proc->detach) { - EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); - xfree(data->stream.pty.term_name); - shell_free_argv(proc->argv); - channel_decref(data); - return false; - } - - data->refcount++; - char *cmd = xstrdup(proc->argv[0]); - bool has_out, has_err; - if (proc->type == kProcessTypePty) { - has_out = true; - has_err = false; - } else { - has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; - has_err = data->on_stderr.type != kCallbackNone; - } - int status = process_spawn(proc, true, has_out, has_err); - if (status) { - EMSG3(_(e_jobspawn), os_strerror(status), cmd); - xfree(cmd); - if (proc->type == kProcessTypePty) { - xfree(data->stream.pty.term_name); - } - rettv->vval.v_number = proc->status; - channel_decref(data); - return false; - } - xfree(cmd); - - - if (data->is_rpc) { - // the rpc takes over the in and out streams - rpc_start(data); - } else { - wstream_init(&proc->in, 0); - if (has_out) { - rstream_init(&proc->out, 0); - rstream_start(&proc->out, on_job_stdout, data); - } - } - - if (has_err) { - rstream_init(&proc->err, 0); - rstream_start(&proc->err, on_job_stderr, data); - } - rettv->vval.v_number = data->id; - return true; -} - -// vimscript job callbacks must be executed on Nvim main loop -static inline void process_job_event(Channel *data, Callback *callback, - const char *type, char *buf, size_t count, - int status) -{ - ChannelEvent event_data; - event_data.received = NULL; - if (buf) { - event_data.received = tv_list_alloc(); - char *ptr = buf; - size_t remaining = count; - size_t off = 0; - - while (off < remaining) { - // append the line - if (ptr[off] == NL) { - tv_list_append_string(event_data.received, ptr, off); - size_t skip = off + 1; - ptr += skip; - remaining -= skip; - off = 0; - continue; - } - if (ptr[off] == NUL) { - // Translate NUL to NL - ptr[off] = NL; - } - off++; - } - tv_list_append_string(event_data.received, ptr, off); - } else { - event_data.status = status; - } - event_data.data = data; - event_data.callback = callback; - event_data.type = type; - on_job_event(&event_data); -} - -static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, - void *job, bool eof) -{ - Channel *data = job; - on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); -} - -static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, - void *job, bool eof) -{ - Channel *data = job; - on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); -} - -static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, - size_t count, bool eof, Callback *callback, - const char *type) -{ - if (eof) { - return; - } - - // stub variable, to keep reading consistent with the order of events, only - // consider the count parameter. - size_t r; - char *ptr = rbuffer_read_ptr(buf, &r); - - // The order here matters, the terminal must receive the data first because - // process_job_event will modify the read buffer(convert NULs into NLs) - if (data->term) { - terminal_receive(data->term, ptr, count); - } - - rbuffer_consumed(buf, count); - if (callback->type != kCallbackNone) { - process_job_event(data, callback, type, ptr, count, 0); - } -} - -static void eval_job_process_exit_cb(Process *proc, int status, void *d) -{ - Channel *data = d; - if (data->term && !data->stream.proc.exited) { - data->stream.proc.exited = true; - char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; - snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); - terminal_close(data->term, msg); - } - if (data->is_rpc) { - channel_process_exit(data->id, status); - } - - if (data->status_ptr) { - *data->status_ptr = status; - } - - process_job_event(data, &data->on_exit, "exit", NULL, 0, status); - - channel_decref(data); -} static void term_write(char *buf, size_t size, void *d) { @@ -22589,38 +22436,6 @@ static void term_close(void *d) multiqueue_put(data->events, term_delayed_free, 1, data); } -static void on_job_event(ChannelEvent *ev) -{ - if (!ev->callback) { - return; - } - - typval_T argv[4]; - - argv[0].v_type = VAR_NUMBER; - argv[0].v_lock = 0; - argv[0].vval.v_number = ev->data->id; - - if (ev->received) { - argv[1].v_type = VAR_LIST; - argv[1].v_lock = 0; - argv[1].vval.v_list = ev->received; - argv[1].vval.v_list->lv_refcount++; - } else { - argv[1].v_type = VAR_NUMBER; - argv[1].v_lock = 0; - argv[1].vval.v_number = ev->status; - } - - argv[2].v_type = VAR_STRING; - argv[2].v_lock = 0; - argv[2].vval.v_string = (uint8_t *)ev->type; - - typval_T rettv = TV_INITIAL_VALUE; - callback_call(ev->callback, 3, argv, &rettv); - tv_clear(&rettv); -} - static Channel *find_job(uint64_t id, bool show_error) { Channel *data = find_channel(id); diff --git a/src/nvim/eval.h b/src/nvim/eval.h index 070bc35bd5..7814a086fa 100644 --- a/src/nvim/eval.h +++ b/src/nvim/eval.h @@ -7,6 +7,9 @@ #include "nvim/eval/typval.h" #include "nvim/profile.h" #include "nvim/garray.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/channel.h" #define COPYID_INC 2 #define COPYID_MASK (~0x1) diff --git a/src/nvim/eval.lua b/src/nvim/eval.lua index 0e359fb61c..3150f26df6 100644 --- a/src/nvim/eval.lua +++ b/src/nvim/eval.lua @@ -273,6 +273,7 @@ return { sockconnect={args={2,3}}, sort={args={1, 3}}, soundfold={args=1}, + stdioopen={args=1}, spellbadword={args={0, 1}}, spellsuggest={args={1, 3}}, split={args={1, 3}}, diff --git a/src/nvim/eval/typval.c b/src/nvim/eval/typval.c index 99382d2a24..4bc3a85efb 100644 --- a/src/nvim/eval/typval.c +++ b/src/nvim/eval/typval.c @@ -374,7 +374,7 @@ void tv_list_append_dict(list_T *const list, dict_T *const dict) /// case string is considered to be usual zero-terminated /// string or NULL “empty” string. void tv_list_append_string(list_T *const l, const char *const str, - const ptrdiff_t len) + const ssize_t len) FUNC_ATTR_NONNULL_ARG(1) { if (str == NULL) { @@ -824,7 +824,7 @@ void tv_dict_watcher_add(dict_T *const dict, const char *const key_pattern, /// @param[in] cb2 Second callback to check. /// /// @return True if they are equal, false otherwise. -bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2) +bool tv_callback_equal(const Callback *cb1, const Callback *cb2) FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT { if (cb1->type != cb2->type) { @@ -843,12 +843,12 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2) return true; } } - assert(false); + abort(); return false; } /// Unref/free callback -void callback_free(Callback *const callback) +void callback_free(Callback *callback) FUNC_ATTR_NONNULL_ALL { switch (callback->type) { @@ -864,9 +864,6 @@ void callback_free(Callback *const callback) case kCallbackNone: { break; } - default: { - abort(); - } } callback->type = kCallbackNone; } diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 1bbe4d9cad..8946f049e2 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -92,7 +92,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (in) { stream_init(NULL, &proc->in, -1, STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe)); - proc->in.events = proc->events; proc->in.internal_data = proc; proc->in.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -101,7 +100,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (out) { stream_init(NULL, &proc->out, -1, STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe)); - proc->out.events = proc->events; proc->out.internal_data = proc; proc->out.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -110,7 +108,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (err) { stream_init(NULL, &proc->err, -1, STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe)); - proc->err.events = proc->events; proc->err.internal_data = proc; proc->err.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -382,15 +379,15 @@ static void flush_stream(Process *proc, Stream *stream) // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (proc->events) { - multiqueue_process_events(proc->events); + if (stream->events) { + multiqueue_process_events(stream->events); } // Stream can be closed if it is empty. if (num_bytes == stream->num_bytes) { - if (stream->read_cb) { + if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream - // open. + // open. But only send EOF if we haven't already. stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); } break; diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 2c4db08b30..e0500ba828 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -105,20 +105,20 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { Stream *stream = uvstream->data; - if (cnt > 0) { - stream->num_bytes += (size_t)cnt; - } - if (cnt <= 0) { - if (cnt != UV_ENOBUFS - // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: - // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. - // - // We don't need to do anything with the RBuffer because the next call - // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` - // won't be called) - && cnt != 0) { - DLOG("closing Stream: %p: %s (%s)", stream, + // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: + // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. + // + // We don't need to do anything with the RBuffer because the next call + // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` + // won't be called) + if (cnt == UV_ENOBUFS || cnt == 0) { + return; + } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { + // The TTY driver might signal TTY without closing the stream + invoke_read_cb(stream, 0, true); + } else { + DLOG("Closing Stream (%p): %s (%s)", stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true @@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // at this point we're sure that cnt is positive, no error occurred size_t nread = (size_t)cnt; + stream->num_bytes += nread; // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(stream->buffer, nread); @@ -187,6 +188,7 @@ static void read_event(void **argv) if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; + stream->did_eof = eof; stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); } stream->pending_reqs--; diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 70a708ff4d..e713323f5c 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -14,10 +14,7 @@ typedef struct stream Stream; /// /// @param stream The Stream instance /// @param rbuffer The associated RBuffer instance -/// @param count Number of bytes to read. This must be respected if keeping -/// the order of events is a requirement. This is because events -/// may be queued and only processed later when more data is copied -/// into to the buffer, so one read may starve another. +/// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, @@ -34,6 +31,7 @@ typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { bool closed; + bool did_eof; union { uv_pipe_t pipe; uv_tcp_t tcp; diff --git a/src/nvim/globals.h b/src/nvim/globals.h index c639dbcf83..d21cfe7ab6 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -1080,6 +1080,8 @@ EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full")); EXTERN char_u e_jobspawn[] INIT(= N_( "E903: Process failed to start: %s: \"%s\"")); EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty")); +EXTERN char_u e_stdiochan2[] INIT(= N_( + "E905: Couldn't open stdio channel: %s")); EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\"")); EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s")); EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number")); @@ -1190,9 +1192,13 @@ EXTERN char *ignoredp; // If a msgpack-rpc channel should be started over stdin/stdout EXTERN bool embedded_mode INIT(= false); +// Dont try to start an user interface +// or read/write to stdio (unless embedding) +EXTERN bool headless_mode INIT(= false); /// next free id for a job or rpc channel -EXTERN uint64_t next_chan_id INIT(= 1); +/// 1 is reserved for stdio channel +EXTERN uint64_t next_chan_id INIT(= 2); /// Used to track the status of external functions. /// Currently only used for iconv(). diff --git a/src/nvim/main.c b/src/nvim/main.c index 93afe11f3a..9059644fc0 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -103,7 +103,6 @@ typedef struct { bool input_isatty; // stdin is a terminal bool output_isatty; // stdout is a terminal bool err_isatty; // stderr is a terminal - bool headless; // Do not start the builtin UI. int no_swap_file; // "-n" argument used int use_debug_break_level; int window_count; /* number of windows to use */ @@ -299,7 +298,7 @@ int main(int argc, char **argv) cmdline_row = (int)(Rows - p_ch); msg_row = cmdline_row; screenalloc(false); /* allocate screen buffers */ - set_init_2(params.headless); + set_init_2(headless_mode); TIME_MSG("inits 2"); msg_scroll = TRUE; @@ -311,7 +310,7 @@ int main(int argc, char **argv) /* Set the break level after the terminal is initialized. */ debug_break_level = params.use_debug_break_level; - bool reading_input = !params.headless && (params.input_isatty + bool reading_input = !headless_mode && (params.input_isatty || params.output_isatty || params.err_isatty); if (reading_input) { @@ -448,7 +447,7 @@ int main(int argc, char **argv) wait_return(TRUE); } - if (!params.headless) { + if (!headless_mode) { // Stop reading from input stream, the UI layer will take over now. input_stop(); ui_builtin_start(); @@ -809,11 +808,14 @@ static void command_line_scan(mparm_T *parmp) } mch_exit(0); } else if (STRICMP(argv[0] + argv_idx, "headless") == 0) { - parmp->headless = true; + headless_mode = true; } else if (STRICMP(argv[0] + argv_idx, "embed") == 0) { embedded_mode = true; - parmp->headless = true; - channel_from_stdio(); + headless_mode = true; + const char *err; + if (!channel_from_stdio(true, CALLBACK_READER_INIT, &err)) { + abort(); + } } else if (STRNICMP(argv[0] + argv_idx, "literal", 7) == 0) { #if !defined(UNIX) parmp->literal = TRUE; @@ -1216,7 +1218,6 @@ static void init_params(mparm_T *paramp, int argc, char **argv) memset(paramp, 0, sizeof(*paramp)); paramp->argc = argc; paramp->argv = argv; - paramp->headless = false; paramp->want_full_screen = true; paramp->use_debug_break_level = -1; paramp->window_count = -1; @@ -1387,7 +1388,7 @@ static void handle_tag(char_u *tagname) // When starting in Ex mode and commands come from a file, set Silent mode. static void check_tty(mparm_T *parmp) { - if (parmp->headless) { + if (headless_mode) { return; } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 42b1f63830..56af4fa791 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -13,7 +13,6 @@ #include "nvim/api/ui.h" #include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -30,12 +29,9 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" -#include "nvim/path.h" #include "nvim/lib/kvec.h" #include "nvim/os/input.h" -#define CHANNEL_BUFFER_SIZE 0xffff - #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) @@ -66,57 +62,18 @@ void rpc_start(Channel *channel) rpc->next_request_id = 1; kv_init(rpc->call_stack); - Stream *in = channel_instream(channel); - Stream *out = channel_outstream(channel); - - DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - - wstream_init(in, 0); - rstream_init(out, CHANNEL_BUFFER_SIZE); - rstream_start(out, receive_msgpack, channel); -} - -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) -{ - Channel *channel = channel_alloc(kChannelStreamSocket); - socket_watcher_accept(watcher, &channel->stream.socket); - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); -} - -/// TODO: move to eval.c, also support bytes -uint64_t channel_connect(bool tcp, const char *address, - int timeout, const char **error) -{ - if (!tcp) { - char *path = fix_fname(address); - if (server_owns_pipe_address(path)) { - // avoid deadlock - xfree(path); - return channel_create_internal(); - } - xfree(path); - } + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - Channel *channel = channel_alloc(kChannelStreamSocket); - if (!socket_connect(&main_loop, &channel->stream.socket, - tcp, address, timeout, error)) { - channel_decref(channel); - return 0; + rstream_start(out, receive_msgpack, channel); } - - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); - return channel->id; } + static Channel *find_rpc_channel(uint64_t id) { Channel *chan = find_channel(id); @@ -263,29 +220,6 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = channel_alloc(kChannelStreamStdio); - channel_incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); - wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - - rpc_start(channel); -} - -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -uint64_t channel_create_internal(void) -{ - Channel *channel = channel_alloc(kChannelStreamInternal); - channel_incref(channel); // internal channel lives until process exit - rpc_start(channel); - return channel->id; -} - void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); @@ -663,11 +597,6 @@ void rpc_free(Channel *channel) kv_destroy(channel->rpc.call_stack); } -static void close_cb(Stream *stream, void *data) -{ - channel_decref(data); -} - static bool is_rpc_response(msgpack_object *obj) { return obj->type == MSGPACK_OBJECT_ARRAY diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index f54fe412ba..ec2ebb9d19 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -227,18 +227,15 @@ static int do_os_system(char **argv, return -1; } - // We want to deal with stream events as fast a possible while queueing - // process events, so reset everything to NULL. It prevents closing the + // Note: unlike process events, stream events are not queued, as we want to + // deal with stream events as fast a possible. It prevents closing the // streams while there's still data in the OS buffer (due to the process // exiting before all data is read). if (input != NULL) { - proc->in.events = NULL; wstream_init(&proc->in, 0); } - proc->out.events = NULL; rstream_init(&proc->out, 0); rstream_start(&proc->out, data_cb, &buf); - proc->err.events = NULL; rstream_init(&proc->err, 0); rstream_start(&proc->err, data_cb, &buf); diff --git a/src/nvim/rbuffer.c b/src/nvim/rbuffer.c index 18d453cbe9..df9394fbb2 100644 --- a/src/nvim/rbuffer.c +++ b/src/nvim/rbuffer.c @@ -121,7 +121,7 @@ char *rbuffer_read_ptr(RBuffer *buf, size_t *read_count) FUNC_ATTR_NONNULL_ALL { if (!buf->size) { *read_count = 0; - return NULL; + return buf->read_ptr; } if (buf->read_ptr < buf->write_ptr) { |