aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c109
1 files changed, 88 insertions, 21 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index cd64e14976..68ac35bc4e 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -12,6 +12,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -28,6 +29,7 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
+#include "nvim/path.h"
#include "nvim/lib/kvec.h"
#include "nvim/os/input.h"
@@ -41,7 +43,8 @@
typedef enum {
kChannelTypeSocket,
kChannelTypeProc,
- kChannelTypeStdio
+ kChannelTypeStdio,
+ kChannelTypeInternal
} ChannelType;
typedef struct {
@@ -125,7 +128,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id)
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack, channel);
+ rstream_start(proc->out, receive_msgpack, channel);
return channel->id;
}
@@ -142,7 +145,36 @@ void channel_from_connection(SocketWatcher *watcher)
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack, channel);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+}
+
+uint64_t channel_connect(bool tcp, const char *address,
+ int timeout, const char **error)
+{
+ if (!tcp) {
+ char *path = fix_fname(address);
+ if (server_owns_pipe_address(path)) {
+ // avoid deadlock
+ xfree(path);
+ return channel_create_internal();
+ }
+ xfree(path);
+ }
+
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ if (!socket_connect(&main_loop, &channel->data.stream,
+ tcp, address, timeout, error)) {
+ decref(channel);
+ return 0;
+ }
+
+ incref(channel); // close channel only after the stream is closed
+ channel->data.stream.internal_close_cb = close_cb;
+ channel->data.stream.internal_data = channel;
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+ return channel->id;
}
/// Sends event/arguments to channel
@@ -305,11 +337,20 @@ void channel_from_stdio(void)
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.std.in, parse_msgpack, channel);
+ rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
+/// Creates a loopback channel. This is used to avoid deadlock
+/// when an instance connects to its own named pipe.
+uint64_t channel_create_internal(void)
+{
+ Channel *channel = register_channel(kChannelTypeInternal, 0, NULL);
+ incref(channel); // internal channel lives until process exit
+ return channel->id;
+}
+
void channel_process_exit(uint64_t id, int status)
{
Channel *channel = pmap_get(uint64_t)(channels, id);
@@ -318,8 +359,8 @@ void channel_process_exit(uint64_t id, int status)
decref(channel);
}
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
- bool eof)
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
+ void *data, bool eof)
{
Channel *channel = data;
incref(channel);
@@ -329,7 +370,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, WARNING_LOG_LEVEL);
goto end;
}
@@ -341,6 +382,14 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ parse_msgpack(channel);
+
+end:
+ decref(channel);
+}
+
+static void parse_msgpack(Channel *channel)
+{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return result;
@@ -360,11 +409,11 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
"ch %" PRIu64 " returned a response with an unknown request "
"id. Ensure the client is properly synchronized",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -388,11 +437,9 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- decref(channel);
}
+
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
@@ -412,7 +459,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
snprintf(buf, sizeof(buf),
"ch %" PRIu64 " sent an invalid message, closed.",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
api_clear_error(&error);
return;
@@ -485,7 +532,7 @@ static void on_request_event(void **argv)
static bool channel_write(Channel *channel, WBuffer *buffer)
{
- bool success;
+ bool success = false;
if (channel->closed) {
wstream_release_wbuffer(buffer);
@@ -502,8 +549,11 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
break;
- default:
- abort();
+ case kChannelTypeInternal:
+ incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
+ break;
}
if (!success) {
@@ -514,12 +564,28 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"Before returning from a RPC call, ch %" PRIu64 " was "
"closed due to a failed write",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
return success;
}
+static void internal_read_event(void **argv)
+{
+ Channel *channel = argv[0];
+ WBuffer *buffer = argv[1];
+
+ msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->unpacker),
+ buffer->data, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
+
+ parse_msgpack(channel);
+
+ decref(channel);
+ wstream_release_wbuffer(buffer);
+}
+
static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
@@ -636,8 +702,9 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.std.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
- default:
- abort();
+ case kChannelTypeInternal:
+ // nothing to free.
+ break;
}
decref(channel);
@@ -728,9 +795,9 @@ static void complete_call(msgpack_object *obj, Channel *channel)
}
}
-static void call_set_error(Channel *channel, char *msg)
+static void call_set_error(Channel *channel, char *msg, int loglevel)
{
- ELOG("RPC: %s", msg);
+ LOG(loglevel, "RPC: %s", msg);
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;