diff options
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r-- | src/nvim/channel.c | 138 |
1 files changed, 73 insertions, 65 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 40af470bde..776e2bfa86 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -4,6 +4,7 @@ #include "nvim/api/ui.h" #include "nvim/channel.h" #include "nvim/eval.h" +#include "nvim/eval/encode.h" #include "nvim/event/socket.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" @@ -179,10 +180,12 @@ static Channel *channel_alloc(ChannelStreamType type) } /// Not implemented, only logging for now -void channel_create_event(Channel *chan, char *ext_source) +void channel_create_event(Channel *chan, const char *ext_source) { #if MIN_LOG_LEVEL <= INFO_LOG_LEVEL - char *stream_desc, *mode_desc, *source; + const char *stream_desc; + const char *mode_desc; + const char *source; switch (chan->streamtype) { case kChannelStreamProc: @@ -222,8 +225,8 @@ void channel_create_event(Channel *chan, char *ext_source) // external events should be included. source = ext_source; } else { - eval_format_source_name_line((char *)IObuff, sizeof(IObuff)); - source = (char *)IObuff; + eval_fmt_source_name_line((char *)IObuff, sizeof(IObuff)); + source = (const char *)IObuff; } ILOG("new channel %" PRIu64 " (%s%s): %s", chan->id, stream_desc, @@ -234,15 +237,16 @@ void channel_create_event(Channel *chan, char *ext_source) #endif } -void channel_incref(Channel *channel) +void channel_incref(Channel *chan) { - channel->refcount++; + chan->refcount++; } -void channel_decref(Channel *channel) +void channel_decref(Channel *chan) { - if (!(--channel->refcount)) { - multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel); + if (!(--chan->refcount)) { + // delay free, so that libuv is done with the handles + multiqueue_put(main_loop.events, free_channel_event, 1, chan); } } @@ -264,18 +268,18 @@ void callback_reader_start(CallbackReader *reader) static void free_channel_event(void **argv) { - Channel *channel = argv[0]; - if (channel->is_rpc) { - rpc_free(channel); + Channel *chan = argv[0]; + if (chan->is_rpc) { + rpc_free(chan); } - callback_reader_free(&channel->on_stdout); - callback_reader_free(&channel->on_stderr); - callback_free(&channel->on_exit); + callback_reader_free(&chan->on_stdout); + callback_reader_free(&chan->on_stderr); + callback_free(&chan->on_exit); - pmap_del(uint64_t)(channels, channel->id); - multiqueue_free(channel->events); - xfree(channel); + pmap_del(uint64_t)(channels, chan->id); + multiqueue_free(chan->events); + xfree(chan); } static void channel_destroy_early(Channel *chan) @@ -283,12 +287,15 @@ static void channel_destroy_early(Channel *chan) if ((chan->id != --next_chan_id)) { abort(); } + pmap_del(uint64_t)(channels, chan->id); + chan->id = 0; if ((--chan->refcount != 0)) { abort(); } - free_channel_event((void **)&chan); + // uv will keep a reference to handles until next loop tick, so delay free + multiqueue_put(main_loop.events, free_channel_event, 1, chan); } @@ -391,17 +398,22 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader on_output, int timeout, const char **error) { + Channel *channel; + if (!tcp && rpc) { char *path = fix_fname(address); - if (server_owns_pipe_address(path)) { - // avoid deadlock - xfree(path); - return channel_create_internal_rpc(); - } + bool loopback = server_owns_pipe_address(path); xfree(path); + if (loopback) { + // Create a loopback channel. This avoids deadlock if nvim connects to + // its own named pipe. + channel = channel_alloc(kChannelStreamInternal); + rpc_start(channel); + goto end; + } } - Channel *channel = channel_alloc(kChannelStreamSocket); + channel = channel_alloc(kChannelStreamSocket); if (!socket_connect(&main_loop, &channel->stream.socket, tcp, address, timeout, error)) { channel_destroy_early(channel); @@ -421,7 +433,8 @@ uint64_t channel_connect(bool tcp, const char *address, rstream_start(&channel->stream.socket, on_socket_output, channel); } - channel_create_event(channel, NULL); +end: + channel_create_event(channel, address); return channel->id; } @@ -440,15 +453,6 @@ void channel_from_connection(SocketWatcher *watcher) channel_create_event(channel, watcher->addr); } -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -static uint64_t channel_create_internal_rpc(void) -{ - Channel *channel = channel_alloc(kChannelStreamInternal); - rpc_start(channel); - return channel->id; -} - /// Creates an API channel from stdin/stdout. This is used when embedding /// Neovim uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, @@ -522,32 +526,21 @@ err: return 0; } -/// NB: mutates buf in place! -static list_T *buffer_to_tv_list(char *buf, size_t count) +/// Convert binary byte array to a readfile()-style list +/// +/// @param[in] buf Array to convert. +/// @param[in] len Array length. +/// +/// @return [allocated] Converted list. +static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) + FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE { - list_T *ret = tv_list_alloc(); - char *ptr = buf; - size_t remaining = count; - size_t off = 0; - - while (off < remaining) { - // append the line - if (ptr[off] == NL) { - tv_list_append_string(ret, ptr, (ssize_t)off); - size_t skip = off + 1; - ptr += skip; - remaining -= skip; - off = 0; - continue; - } - if (ptr[off] == NUL) { - // Translate NUL to NL - ptr[off] = NL; - } - off++; - } - tv_list_append_string(ret, ptr, (ssize_t)off); - return ret; + list_T *const l = tv_list_alloc(kListLenMayKnow); + // Empty buffer should be represented by [''], encode_list_write() thinks + // empty list is fine for the case. + tv_list_append_string(l, "", 0); + encode_list_write(l, buf, len); + return l; } // vimscript job callbacks must be executed on Nvim main loop @@ -599,6 +592,7 @@ static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count, on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin"); } +/// @param type must have static lifetime static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, size_t count, bool eof, CallbackReader *reader, const char *type) @@ -613,14 +607,20 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, if (reader->cb.type != kCallbackNone) { process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data, (size_t)reader->buffer.ga_len, 0); - ga_clear(&reader->buffer); } else if (reader->self) { - list_T *data = buffer_to_tv_list(reader->buffer.ga_data, - (size_t)reader->buffer.ga_len); - tv_dict_add_list(reader->self, type, strlen(type), data); + if (tv_dict_find(reader->self, type, -1) == NULL) { + list_T *data = buffer_to_tv_list(reader->buffer.ga_data, + (size_t)reader->buffer.ga_len); + tv_dict_add_list(reader->self, type, strlen(type), data); + } else { + // can't display error message now, defer it. + channel_incref(chan); + multiqueue_put(chan->events, on_buffered_error, 2, chan, type); + } } else { abort(); } + ga_clear(&reader->buffer); } else if (reader->cb.type != kCallbackNone) { process_channel_event(chan, &reader->cb, type, ptr, 0, 0); } @@ -641,6 +641,14 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, } } +static void on_buffered_error(void **args) +{ + Channel *chan = (Channel *)args[0]; + const char *stream = (const char *)args[1]; + EMSG3(_(e_streamkey), stream, chan->id); + channel_decref(chan); +} + static void channel_process_exit_cb(Process *proc, int status, void *data) { Channel *chan = data; @@ -673,7 +681,7 @@ static void on_channel_event(void **args) argv[1].v_type = VAR_LIST; argv[1].v_lock = VAR_UNLOCKED; argv[1].vval.v_list = ev->received; - argv[1].vval.v_list->lv_refcount++; + tv_list_ref(argv[1].vval.v_list); } else { argv[1].v_type = VAR_NUMBER; argv[1].v_lock = VAR_UNLOCKED; |