aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r--src/nvim/channel.c138
1 files changed, 73 insertions, 65 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 40af470bde..776e2bfa86 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -4,6 +4,7 @@
#include "nvim/api/ui.h"
#include "nvim/channel.h"
#include "nvim/eval.h"
+#include "nvim/eval/encode.h"
#include "nvim/event/socket.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
@@ -179,10 +180,12 @@ static Channel *channel_alloc(ChannelStreamType type)
}
/// Not implemented, only logging for now
-void channel_create_event(Channel *chan, char *ext_source)
+void channel_create_event(Channel *chan, const char *ext_source)
{
#if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
- char *stream_desc, *mode_desc, *source;
+ const char *stream_desc;
+ const char *mode_desc;
+ const char *source;
switch (chan->streamtype) {
case kChannelStreamProc:
@@ -222,8 +225,8 @@ void channel_create_event(Channel *chan, char *ext_source)
// external events should be included.
source = ext_source;
} else {
- eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
- source = (char *)IObuff;
+ eval_fmt_source_name_line((char *)IObuff, sizeof(IObuff));
+ source = (const char *)IObuff;
}
ILOG("new channel %" PRIu64 " (%s%s): %s", chan->id, stream_desc,
@@ -234,15 +237,16 @@ void channel_create_event(Channel *chan, char *ext_source)
#endif
}
-void channel_incref(Channel *channel)
+void channel_incref(Channel *chan)
{
- channel->refcount++;
+ chan->refcount++;
}
-void channel_decref(Channel *channel)
+void channel_decref(Channel *chan)
{
- if (!(--channel->refcount)) {
- multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel);
+ if (!(--chan->refcount)) {
+ // delay free, so that libuv is done with the handles
+ multiqueue_put(main_loop.events, free_channel_event, 1, chan);
}
}
@@ -264,18 +268,18 @@ void callback_reader_start(CallbackReader *reader)
static void free_channel_event(void **argv)
{
- Channel *channel = argv[0];
- if (channel->is_rpc) {
- rpc_free(channel);
+ Channel *chan = argv[0];
+ if (chan->is_rpc) {
+ rpc_free(chan);
}
- callback_reader_free(&channel->on_stdout);
- callback_reader_free(&channel->on_stderr);
- callback_free(&channel->on_exit);
+ callback_reader_free(&chan->on_stdout);
+ callback_reader_free(&chan->on_stderr);
+ callback_free(&chan->on_exit);
- pmap_del(uint64_t)(channels, channel->id);
- multiqueue_free(channel->events);
- xfree(channel);
+ pmap_del(uint64_t)(channels, chan->id);
+ multiqueue_free(chan->events);
+ xfree(chan);
}
static void channel_destroy_early(Channel *chan)
@@ -283,12 +287,15 @@ static void channel_destroy_early(Channel *chan)
if ((chan->id != --next_chan_id)) {
abort();
}
+ pmap_del(uint64_t)(channels, chan->id);
+ chan->id = 0;
if ((--chan->refcount != 0)) {
abort();
}
- free_channel_event((void **)&chan);
+ // uv will keep a reference to handles until next loop tick, so delay free
+ multiqueue_put(main_loop.events, free_channel_event, 1, chan);
}
@@ -391,17 +398,22 @@ uint64_t channel_connect(bool tcp, const char *address,
bool rpc, CallbackReader on_output,
int timeout, const char **error)
{
+ Channel *channel;
+
if (!tcp && rpc) {
char *path = fix_fname(address);
- if (server_owns_pipe_address(path)) {
- // avoid deadlock
- xfree(path);
- return channel_create_internal_rpc();
- }
+ bool loopback = server_owns_pipe_address(path);
xfree(path);
+ if (loopback) {
+ // Create a loopback channel. This avoids deadlock if nvim connects to
+ // its own named pipe.
+ channel = channel_alloc(kChannelStreamInternal);
+ rpc_start(channel);
+ goto end;
+ }
}
- Channel *channel = channel_alloc(kChannelStreamSocket);
+ channel = channel_alloc(kChannelStreamSocket);
if (!socket_connect(&main_loop, &channel->stream.socket,
tcp, address, timeout, error)) {
channel_destroy_early(channel);
@@ -421,7 +433,8 @@ uint64_t channel_connect(bool tcp, const char *address,
rstream_start(&channel->stream.socket, on_socket_output, channel);
}
- channel_create_event(channel, NULL);
+end:
+ channel_create_event(channel, address);
return channel->id;
}
@@ -440,15 +453,6 @@ void channel_from_connection(SocketWatcher *watcher)
channel_create_event(channel, watcher->addr);
}
-/// Creates a loopback channel. This is used to avoid deadlock
-/// when an instance connects to its own named pipe.
-static uint64_t channel_create_internal_rpc(void)
-{
- Channel *channel = channel_alloc(kChannelStreamInternal);
- rpc_start(channel);
- return channel->id;
-}
-
/// Creates an API channel from stdin/stdout. This is used when embedding
/// Neovim
uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
@@ -522,32 +526,21 @@ err:
return 0;
}
-/// NB: mutates buf in place!
-static list_T *buffer_to_tv_list(char *buf, size_t count)
+/// Convert binary byte array to a readfile()-style list
+///
+/// @param[in] buf Array to convert.
+/// @param[in] len Array length.
+///
+/// @return [allocated] Converted list.
+static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
+ FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
{
- list_T *ret = tv_list_alloc();
- char *ptr = buf;
- size_t remaining = count;
- size_t off = 0;
-
- while (off < remaining) {
- // append the line
- if (ptr[off] == NL) {
- tv_list_append_string(ret, ptr, (ssize_t)off);
- size_t skip = off + 1;
- ptr += skip;
- remaining -= skip;
- off = 0;
- continue;
- }
- if (ptr[off] == NUL) {
- // Translate NUL to NL
- ptr[off] = NL;
- }
- off++;
- }
- tv_list_append_string(ret, ptr, (ssize_t)off);
- return ret;
+ list_T *const l = tv_list_alloc(kListLenMayKnow);
+ // Empty buffer should be represented by [''], encode_list_write() thinks
+ // empty list is fine for the case.
+ tv_list_append_string(l, "", 0);
+ encode_list_write(l, buf, len);
+ return l;
}
// vimscript job callbacks must be executed on Nvim main loop
@@ -599,6 +592,7 @@ static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
}
+/// @param type must have static lifetime
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
size_t count, bool eof, CallbackReader *reader,
const char *type)
@@ -613,14 +607,20 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
(size_t)reader->buffer.ga_len, 0);
- ga_clear(&reader->buffer);
} else if (reader->self) {
- list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
- (size_t)reader->buffer.ga_len);
- tv_dict_add_list(reader->self, type, strlen(type), data);
+ if (tv_dict_find(reader->self, type, -1) == NULL) {
+ list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
+ tv_dict_add_list(reader->self, type, strlen(type), data);
+ } else {
+ // can't display error message now, defer it.
+ channel_incref(chan);
+ multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
+ }
} else {
abort();
}
+ ga_clear(&reader->buffer);
} else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
}
@@ -641,6 +641,14 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
}
}
+static void on_buffered_error(void **args)
+{
+ Channel *chan = (Channel *)args[0];
+ const char *stream = (const char *)args[1];
+ EMSG3(_(e_streamkey), stream, chan->id);
+ channel_decref(chan);
+}
+
static void channel_process_exit_cb(Process *proc, int status, void *data)
{
Channel *chan = data;
@@ -673,7 +681,7 @@ static void on_channel_event(void **args)
argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_list = ev->received;
- argv[1].vval.v_list->lv_refcount++;
+ tv_list_ref(argv[1].vval.v_list);
} else {
argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED;