diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 76 |
1 files changed, 64 insertions, 12 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6fd1af1ba6..02f3854f47 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -62,7 +62,7 @@ typedef struct { ChannelType type; msgpack_unpacker *unpacker; union { - Stream stream; + Stream stream; // bidirectional (socket) Process *proc; struct { Stream in; @@ -133,6 +133,9 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) rstream_init(proc->out, 0); rstream_start(proc->out, receive_msgpack, channel); + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, + proc->out); + return channel->id; } @@ -150,6 +153,9 @@ void channel_from_connection(SocketWatcher *watcher) wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_start(&channel->data.stream, receive_msgpack, channel); + + DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, + &channel->data.stream); } /// @param source description of source function, rplugin name, TCP addr, etc @@ -344,6 +350,9 @@ void channel_from_stdio(void) rstream_start(&channel->data.std.in, receive_msgpack, channel); // write stream wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, + &channel->data.std.in, &channel->data.std.out); } /// Creates a loopback channel. This is used to avoid deadlock @@ -363,6 +372,7 @@ void channel_process_exit(uint64_t id, int status) decref(channel); } +// rstream.c:read_event() invokes this as stream->read_cb(). static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { @@ -374,12 +384,24 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, WARNING_LOG_LEVEL); + call_set_error(channel, buf, WARN_LOG_LEVEL); + goto end; + } + + if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) + || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { + char buf[256]; + snprintf(buf, sizeof(buf), + "ch %" PRIu64 ": stream closed unexpectedly. " + "closing channel", + channel->id); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -435,8 +457,8 @@ static void parse_msgpack(Channel *channel) // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); @@ -534,6 +556,39 @@ static void on_request_event(void **argv) api_clear_error(&error); } +/// Returns the Stream that a Channel writes to. +static Stream *chan_wstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->in; + case kChannelTypeStdio: + return &chan->data.std.out; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + +/// Returns the Stream that a Channel reads from. +static Stream *chan_rstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->out; + case kChannelTypeStdio: + return &chan->data.std.in; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -545,13 +600,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer) switch (channel->type) { case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); + success = wstream_write(chan_wstream(channel), buffer); break; case kChannelTypeInternal: incref(channel); @@ -565,8 +616,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer) char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); call_set_error(channel, buf, ERROR_LOG_LEVEL); } @@ -817,6 +868,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel) ChannelCallFrame *frame = kv_A(channel->call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } |