diff options
-rw-r--r-- | src/nvim/event/process.c | 3 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 2 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 2 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 1 | ||||
-rw-r--r-- | src/nvim/log.c | 2 | ||||
-rw-r--r-- | src/nvim/log.h | 8 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 76 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 9 | ||||
-rw-r--r-- | src/nvim/normal.c | 1 | ||||
-rw-r--r-- | test/functional/api/server_requests_spec.lua | 30 |
10 files changed, 105 insertions, 29 deletions
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index c936583841..8371d3cd48 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -233,8 +233,7 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL switch (proc->type) { case kProcessTypeUv: // Close the process's stdin. If the process doesn't close its own - // stdout/stderr, they will be closed when it exits(possibly due to being - // terminated after a timeout) + // stdout/stderr, they will be closed when it exits (voluntarily or not). process_close_in(proc); ILOG("Sending SIGTERM to pid %d", proc->pid); uv_kill(proc->pid, SIGTERM); diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 854af474b2..2c4db08b30 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -118,7 +118,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` // won't be called) && cnt != 0) { - DLOG("Closing Stream (%p): %s (%s)", stream, + DLOG("closing Stream: %p: %s (%s)", stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 60ceff9b24..7c865bfe1e 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -7,6 +7,7 @@ #include <uv.h> +#include "nvim/log.h" #include "nvim/rbuffer.h" #include "nvim/macros.h" #include "nvim/event/stream.h" @@ -81,6 +82,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); + DLOG("closing Stream: %p", stream); stream->closed = true; stream->close_cb = on_stream_close; stream->close_cb_data = data; diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index f453e5898d..320006890d 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -8,6 +8,7 @@ #include <uv.h> +#include "nvim/log.h" #include "nvim/event/loop.h" #include "nvim/event/wstream.h" #include "nvim/vim.h" diff --git a/src/nvim/log.c b/src/nvim/log.c index 3baf0b2ebd..436a8a4079 100644 --- a/src/nvim/log.c +++ b/src/nvim/log.c @@ -250,7 +250,7 @@ static bool v_do_log_to_file(FILE *log_file, int log_level, static const char *log_levels[] = { [DEBUG_LOG_LEVEL] = "DEBUG", [INFO_LOG_LEVEL] = "INFO ", - [WARNING_LOG_LEVEL] = "WARN ", + [WARN_LOG_LEVEL] = "WARN ", [ERROR_LOG_LEVEL] = "ERROR", }; assert(log_level >= DEBUG_LOG_LEVEL && log_level <= ERROR_LOG_LEVEL); diff --git a/src/nvim/log.h b/src/nvim/log.h index 5064d9333b..d63bcc366c 100644 --- a/src/nvim/log.h +++ b/src/nvim/log.h @@ -6,7 +6,7 @@ #define DEBUG_LOG_LEVEL 0 #define INFO_LOG_LEVEL 1 -#define WARNING_LOG_LEVEL 2 +#define WARN_LOG_LEVEL 2 #define ERROR_LOG_LEVEL 3 #define DLOG(...) @@ -43,12 +43,12 @@ __VA_ARGS__) #endif -#if MIN_LOG_LEVEL <= WARNING_LOG_LEVEL +#if MIN_LOG_LEVEL <= WARN_LOG_LEVEL # undef WLOG # undef WLOGN -# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, true, \ +# define WLOG(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, true, \ __VA_ARGS__) -# define WLOGN(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, false, \ +# define WLOGN(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, false, \ __VA_ARGS__) #endif diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6fd1af1ba6..02f3854f47 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -62,7 +62,7 @@ typedef struct { ChannelType type; msgpack_unpacker *unpacker; union { - Stream stream; + Stream stream; // bidirectional (socket) Process *proc; struct { Stream in; @@ -133,6 +133,9 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) rstream_init(proc->out, 0); rstream_start(proc->out, receive_msgpack, channel); + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, + proc->out); + return channel->id; } @@ -150,6 +153,9 @@ void channel_from_connection(SocketWatcher *watcher) wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_start(&channel->data.stream, receive_msgpack, channel); + + DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, + &channel->data.stream); } /// @param source description of source function, rplugin name, TCP addr, etc @@ -344,6 +350,9 @@ void channel_from_stdio(void) rstream_start(&channel->data.std.in, receive_msgpack, channel); // write stream wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, + &channel->data.std.in, &channel->data.std.out); } /// Creates a loopback channel. This is used to avoid deadlock @@ -363,6 +372,7 @@ void channel_process_exit(uint64_t id, int status) decref(channel); } +// rstream.c:read_event() invokes this as stream->read_cb(). static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { @@ -374,12 +384,24 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, WARNING_LOG_LEVEL); + call_set_error(channel, buf, WARN_LOG_LEVEL); + goto end; + } + + if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) + || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { + char buf[256]; + snprintf(buf, sizeof(buf), + "ch %" PRIu64 ": stream closed unexpectedly. " + "closing channel", + channel->id); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -435,8 +457,8 @@ static void parse_msgpack(Channel *channel) // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); @@ -534,6 +556,39 @@ static void on_request_event(void **argv) api_clear_error(&error); } +/// Returns the Stream that a Channel writes to. +static Stream *chan_wstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->in; + case kChannelTypeStdio: + return &chan->data.std.out; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + +/// Returns the Stream that a Channel reads from. +static Stream *chan_rstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->out; + case kChannelTypeStdio: + return &chan->data.std.in; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -545,13 +600,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer) switch (channel->type) { case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); + success = wstream_write(chan_wstream(channel), buffer); break; case kChannelTypeInternal: incref(channel); @@ -565,8 +616,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer) char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); call_set_error(channel, buf, ERROR_LOG_LEVEL); } @@ -817,6 +868,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel) ChannelCallFrame *frame = kv_A(channel->call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 444c6cc256..fecae11d45 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -88,7 +88,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) { bool ret = true; kvec_t(MPToAPIObjectStackItem) stack = KV_INITIAL_VALUE; - kv_push(stack, ((MPToAPIObjectStackItem) { obj, arg, false, 0 })); + kv_push(stack, ((MPToAPIObjectStackItem) { + .mobj = obj, + .aobj = arg, + .container = false, + .idx = 0, + })); while (ret && kv_size(stack)) { MPToAPIObjectStackItem cur = kv_last(stack); if (!cur.container) { @@ -361,7 +366,7 @@ typedef struct { size_t idx; } APIToMPObjectStackItem; -/// Convert type used by Neovim API to msgpack +/// Convert type used by Nvim API to msgpack type. /// /// @param[in] result Object to convert. /// @param[out] res Structure that defines where conversion results are saved. diff --git a/src/nvim/normal.c b/src/nvim/normal.c index c1676780d8..238378c474 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -13,6 +13,7 @@ #include <stdbool.h> #include <stdlib.h> +#include "nvim/log.h" #include "nvim/vim.h" #include "nvim/ascii.h" #include "nvim/normal.h" diff --git a/test/functional/api/server_requests_spec.lua b/test/functional/api/server_requests_spec.lua index 6a32f979ea..9f245d913b 100644 --- a/test/functional/api/server_requests_spec.lua +++ b/test/functional/api/server_requests_spec.lua @@ -20,6 +20,22 @@ describe('server -> client', function() cid = nvim('get_api_info')[1] end) + it('handles unexpected closed stream while preparing RPC response', function() + source([[ + let g:_nvim_args = [v:progpath, '--embed', '-n', '-u', 'NONE', '-i', 'NONE', ] + let ch1 = jobstart(g:_nvim_args, {'rpc': v:true}) + let child1_ch = rpcrequest(ch1, "nvim_get_api_info")[0] + call rpcnotify(ch1, 'nvim_eval', 'rpcrequest('.child1_ch.', "nvim_get_api_info")') + + let ch2 = jobstart(g:_nvim_args, {'rpc': v:true}) + let child2_ch = rpcrequest(ch2, "nvim_get_api_info")[0] + call rpcnotify(ch2, 'nvim_eval', 'rpcrequest('.child2_ch.', "nvim_get_api_info")') + + call jobstop(ch1) + ]]) + eq(2, eval("1+1")) -- Still alive? + end) + describe('simple call', function() it('works', function() local function on_setup() @@ -141,7 +157,7 @@ describe('server -> client', function() end) end) - describe('when the client is a recursive vim instance', function() + describe('recursive (child) nvim client', function() if os.getenv("TRAVIS") and helpers.os_name() == "osx" then -- XXX: Hangs Travis macOS since e9061117a5b8f195c3f26a5cb94e18ddd7752d86. pending("[Hangs on Travis macOS. #5002]", function() end) @@ -155,7 +171,7 @@ describe('server -> client', function() after_each(function() command('call rpcstop(vim)') end) - it('can send/recieve notifications and make requests', function() + it('can send/receive notifications and make requests', function() nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')") -- Wait for the notification to complete. @@ -188,7 +204,7 @@ describe('server -> client', function() end) end) - describe('when using jobstart', function() + describe('jobstart()', function() local jobid before_each(function() local channel = nvim('get_api_info')[1] @@ -227,7 +243,7 @@ describe('server -> client', function() end) end) - describe('when connecting to another nvim instance', function() + describe('connecting to another (peer) nvim', function() local function connect_test(server, mode, address) local serverpid = funcs.getpid() local client = spawn(nvim_argv) @@ -256,7 +272,7 @@ describe('server -> client', function() client:close() end - it('over a named pipe', function() + it('via named pipe', function() local server = spawn(nvim_argv) set_session(server) local address = funcs.serverlist()[1] @@ -265,7 +281,7 @@ describe('server -> client', function() connect_test(server, 'pipe', address) end) - it('to an ip adress', function() + it('via ip address', function() local server = spawn(nvim_argv) set_session(server) local address = funcs.serverstart("127.0.0.1:") @@ -273,7 +289,7 @@ describe('server -> client', function() connect_test(server, 'tcp', address) end) - it('to a hostname', function() + it('via hostname', function() local server = spawn(nvim_argv) set_session(server) local address = funcs.serverstart("localhost:") |