diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 109 |
1 files changed, 88 insertions, 21 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index cd64e14976..68ac35bc4e 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -12,6 +12,7 @@ #include "nvim/api/vim.h" #include "nvim/api/ui.h" #include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -28,6 +29,7 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" +#include "nvim/path.h" #include "nvim/lib/kvec.h" #include "nvim/os/input.h" @@ -41,7 +43,8 @@ typedef enum { kChannelTypeSocket, kChannelTypeProc, - kChannelTypeStdio + kChannelTypeStdio, + kChannelTypeInternal } ChannelType; typedef struct { @@ -125,7 +128,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id) wstream_init(proc->in, 0); rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack, channel); + rstream_start(proc->out, receive_msgpack, channel); return channel->id; } @@ -142,7 +145,36 @@ void channel_from_connection(SocketWatcher *watcher) channel->data.stream.internal_data = channel; wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack, channel); + rstream_start(&channel->data.stream, receive_msgpack, channel); +} + +uint64_t channel_connect(bool tcp, const char *address, + int timeout, const char **error) +{ + if (!tcp) { + char *path = fix_fname(address); + if (server_owns_pipe_address(path)) { + // avoid deadlock + xfree(path); + return channel_create_internal(); + } + xfree(path); + } + + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); + if (!socket_connect(&main_loop, &channel->data.stream, + tcp, address, timeout, error)) { + decref(channel); + return 0; + } + + incref(channel); // close channel only after the stream is closed + channel->data.stream.internal_close_cb = close_cb; + channel->data.stream.internal_data = channel; + wstream_init(&channel->data.stream, 0); + rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.stream, receive_msgpack, channel); + return channel->id; } /// Sends event/arguments to channel @@ -305,11 +337,20 @@ void channel_from_stdio(void) incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, parse_msgpack, channel); + rstream_start(&channel->data.std.in, receive_msgpack, channel); // write stream wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); } +/// Creates a loopback channel. This is used to avoid deadlock +/// when an instance connects to its own named pipe. +uint64_t channel_create_internal(void) +{ + Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); + incref(channel); // internal channel lives until process exit + return channel->id; +} + void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); @@ -318,8 +359,8 @@ void channel_process_exit(uint64_t id, int status) decref(channel); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, - bool eof) +static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, + void *data, bool eof) { Channel *channel = data; incref(channel); @@ -329,7 +370,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, WARNING_LOG_LEVEL); goto end; } @@ -341,6 +382,14 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); msgpack_unpacker_buffer_consumed(channel->unpacker, count); + parse_msgpack(channel); + +end: + decref(channel); +} + +static void parse_msgpack(Channel *channel) +{ msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); msgpack_unpack_return result; @@ -360,11 +409,11 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -388,11 +437,9 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - decref(channel); } + static void handle_request(Channel *channel, msgpack_object *request) FUNC_ATTR_NONNULL_ALL { @@ -412,7 +459,7 @@ static void handle_request(Channel *channel, msgpack_object *request) snprintf(buf, sizeof(buf), "ch %" PRIu64 " sent an invalid message, closed.", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } api_clear_error(&error); return; @@ -485,7 +532,7 @@ static void on_request_event(void **argv) static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success; + bool success = false; if (channel->closed) { wstream_release_wbuffer(buffer); @@ -502,8 +549,11 @@ static bool channel_write(Channel *channel, WBuffer *buffer) case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); break; - default: - abort(); + case kChannelTypeInternal: + incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + break; } if (!success) { @@ -514,12 +564,28 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "Before returning from a RPC call, ch %" PRIu64 " was " "closed due to a failed write", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } return success; } +static void internal_read_event(void **argv) +{ + Channel *channel = argv[0]; + WBuffer *buffer = argv[1]; + + msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->unpacker), + buffer->data, buffer->size); + msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + + parse_msgpack(channel); + + decref(channel); + wstream_release_wbuffer(buffer); +} + static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; @@ -636,8 +702,9 @@ static void close_channel(Channel *channel) stream_close(&channel->data.std.out, NULL, NULL); multiqueue_put(main_loop.fast_events, exit_event, 1, channel); return; - default: - abort(); + case kChannelTypeInternal: + // nothing to free. + break; } decref(channel); @@ -728,9 +795,9 @@ static void complete_call(msgpack_object *obj, Channel *channel) } } -static void call_set_error(Channel *channel, char *msg) +static void call_set_error(Channel *channel, char *msg, int loglevel) { - ELOG("RPC: %s", msg); + LOG(loglevel, "RPC: %s", msg); for (size_t i = 0; i < kv_size(channel->call_stack); i++) { ChannelCallFrame *frame = kv_A(channel->call_stack, i); frame->returned = true; |