From 502af39f6261e1d1b71df7fa3169e943707ee7f2 Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Wed, 19 Jul 2017 02:24:12 +0200 Subject: log: channel registration --- src/nvim/msgpack_rpc/channel.c | 44 +++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 68ac35bc4e..d7748925aa 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -117,12 +117,11 @@ void channel_teardown(void) /// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is handled by the job infrastructure. /// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. +/// @return Channel id (> 0), on success. 0, on error. uint64_t channel_from_process(Process *proc, uint64_t id) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); + Channel *channel = register_channel(kChannelTypeProc, id, proc->events, + proc->argv); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; @@ -138,7 +137,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, NULL); socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; @@ -161,7 +160,7 @@ uint64_t channel_connect(bool tcp, const char *address, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, NULL); if (!socket_connect(&main_loop, &channel->data.stream, tcp, address, timeout, error)) { decref(channel); @@ -333,7 +332,7 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); + Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL); incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); @@ -346,7 +345,7 @@ void channel_from_stdio(void) /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); + Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL); incref(channel); // internal channel lives until process exit return channel->id; } @@ -746,7 +745,7 @@ static void close_cb(Stream *stream, void *data) } static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) + MultiQueue *events, char **argv) { Channel *rv = xmalloc(sizeof(Channel)); rv->events = events ? events : multiqueue_new_child(main_loop.events); @@ -761,6 +760,33 @@ static Channel *register_channel(ChannelType type, uint64_t id, kv_init(rv->call_stack); kv_init(rv->delayed_notifications); pmap_put(uint64_t)(channels, rv->id, rv); + +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + switch(type) { + case kChannelTypeSocket: + DLOG("register ch %" PRIu64 " type=socket", rv->id); + break; + case kChannelTypeProc: { + char buf[IOSIZE]; + buf[0] = '\0'; + char **p = argv; + while (p != NULL && *p != NULL) { + xstrlcat(buf, *p, sizeof(buf)); + xstrlcat(buf, " ", sizeof(buf)); + p++; + } + DLOG("register ch %" PRIu64 " type=proc argv=%s", rv->id, buf); + } + break; + case kChannelTypeStdio: + DLOG("register ch %" PRIu64 " type=stdio", rv->id); + break; + case kChannelTypeInternal: + DLOG("register ch %" PRIu64 " type=internal", rv->id); + break; + } +#endif + return rv; } -- cgit From bc6a3fe78445f41851f30c6f55e11b6b1ed5feaf Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Wed, 19 Jul 2017 10:27:34 +0200 Subject: log: caller provides the source details --- src/nvim/msgpack_rpc/channel.c | 57 +++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 34 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index d7748925aa..6fd1af1ba6 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -117,11 +117,15 @@ void channel_teardown(void) /// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is handled by the job infrastructure. /// +/// @param proc process object +/// @param id (optional) channel id +/// @param source description of source function, rplugin name, TCP addr, etc +/// /// @return Channel id (> 0), on success. 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +uint64_t channel_from_process(Process *proc, uint64_t id, char *source) { Channel *channel = register_channel(kChannelTypeProc, id, proc->events, - proc->argv); + source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; @@ -137,7 +141,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, + watcher->addr); socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; @@ -147,8 +152,9 @@ void channel_from_connection(SocketWatcher *watcher) rstream_start(&channel->data.stream, receive_msgpack, channel); } -uint64_t channel_connect(bool tcp, const char *address, - int timeout, const char **error) +/// @param source description of source function, rplugin name, TCP addr, etc +uint64_t channel_connect(bool tcp, const char *address, int timeout, + char *source, const char **error) { if (!tcp) { char *path = fix_fname(address); @@ -160,7 +166,7 @@ uint64_t channel_connect(bool tcp, const char *address, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source); if (!socket_connect(&main_loop, &channel->data.stream, tcp, address, timeout, error)) { decref(channel); @@ -328,8 +334,7 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim +/// Creates an API channel from stdin/stdout. Used to embed Nvim. void channel_from_stdio(void) { Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL); @@ -744,9 +749,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +/// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events, char **argv) + MultiQueue *events, char *source) { + // Jobs and channels share the same id namespace. + assert(id == 0 || !pmap_get(uint64_t)(channels, id)); Channel *rv = xmalloc(sizeof(Channel)); rv->events = events ? events : multiqueue_new_child(main_loop.events); rv->type = type; @@ -761,31 +769,12 @@ static Channel *register_channel(ChannelType type, uint64_t id, kv_init(rv->delayed_notifications); pmap_put(uint64_t)(channels, rv->id, rv); -#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL - switch(type) { - case kChannelTypeSocket: - DLOG("register ch %" PRIu64 " type=socket", rv->id); - break; - case kChannelTypeProc: { - char buf[IOSIZE]; - buf[0] = '\0'; - char **p = argv; - while (p != NULL && *p != NULL) { - xstrlcat(buf, *p, sizeof(buf)); - xstrlcat(buf, " ", sizeof(buf)); - p++; - } - DLOG("register ch %" PRIu64 " type=proc argv=%s", rv->id, buf); - } - break; - case kChannelTypeStdio: - DLOG("register ch %" PRIu64 " type=stdio", rv->id); - break; - case kChannelTypeInternal: - DLOG("register ch %" PRIu64 " type=internal", rv->id); - break; - } -#endif + ILOG("new channel %" PRIu64 " (%s): %s", rv->id, + (type == kChannelTypeProc ? "proc" + : (type == kChannelTypeSocket ? "socket" + : (type == kChannelTypeStdio ? "stdio" + : (type == kChannelTypeInternal ? "internal" : "?")))), + (source ? source : "?")); return rv; } -- cgit From e006b1d98d92aead6aff565ed5d8b034772aa16c Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Thu, 27 Jul 2017 00:30:57 +0200 Subject: log: some DEBUG-level stream logging --- src/nvim/msgpack_rpc/channel.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6fd1af1ba6..789cf1d4a8 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -133,6 +133,9 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) rstream_init(proc->out, 0); rstream_start(proc->out, receive_msgpack, channel); + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, + proc->out); + return channel->id; } @@ -150,6 +153,8 @@ void channel_from_connection(SocketWatcher *watcher) wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_start(&channel->data.stream, receive_msgpack, channel); + + DLOG("ch %" PRIu64 " in/out-stream=%p", &channel->data.stream); } /// @param source description of source function, rplugin name, TCP addr, etc @@ -344,6 +349,9 @@ void channel_from_stdio(void) rstream_start(&channel->data.std.in, receive_msgpack, channel); // write stream wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + + DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, + &channel->data.std.in, &channel->data.std.out); } /// Creates a loopback channel. This is used to avoid deadlock @@ -363,6 +371,7 @@ void channel_process_exit(uint64_t id, int status) decref(channel); } +// rstream.c:read_event() invokes this as stream->read_cb(). static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { @@ -379,7 +388,8 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -565,8 +575,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer) char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); call_set_error(channel, buf, ERROR_LOG_LEVEL); } -- cgit From af993da4351d579aa24de08d806c8c1b90813106 Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Thu, 27 Jul 2017 01:08:50 +0200 Subject: rpc: close channel if stream was closed f_jobstop()/f_rpcstop() .. process_stop() .. process_close_in(proc) closes the write-stream of a RPC channel. But there might be a pending RPC notification on the queue, which may get processed just before the channel is closed. To handle that case, check the Stream.closed in channel.c:receive_msgpack(). Before this change, the above scenario could trigger this assert(!stream->closed) in wstream_write(): 0x00007f96e1cd3428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54 0x00007f96e1cd502a in __GI_abort () at abort.c:89 0x00007f96e1ccbbd7 in __assert_fail_base (fmt=, assertion=assertion@entry=0x768f9b "!stream->closed", file=file@entry=0x768f70 "../src/nvim/event/wstream.c", line=line@entry=77, function=function@entry=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:92 0x00007f96e1ccbc82 in __GI___assert_fail (assertion=0x768f9b "!stream->closed", file=0x768f70 "../src/nvim/event/wstream.c", line=77, function=0x768fb0 <__PRETTY_FUNCTION__.13735> "wstream_write") at assert.c:101 0x00000000004d2c1f in wstream_write (stream=0x7f96e0a35078, buffer=0x7f96e09f9b40) at ../src/nvim/event/wstream.c:77 0x00000000005857b2 in channel_write (channel=0x7f96e0ae5800, buffer=0x7f96e09f9b40) at ../src/nvim/msgpack_rpc/channel.c:551 0x000000000058567d in on_request_event (argv=0x7ffed792efa0) at ../src/nvim/msgpack_rpc/channel.c:523 0x00000000005854c8 in handle_request (channel=0x7f96e0ae5800, request=0x7ffed792f1b8) at ../src/nvim/msgpack_rpc/channel.c:503 0x00000000005850cb in parse_msgpack (channel=0x7f96e0ae5800) at ../src/nvim/msgpack_rpc/channel.c:423 0x0000000000584f90 in receive_msgpack (stream=0x7f96e0a35218, rbuf=0x7f96e0d1d4c0, c=22, data=0x7f96e0ae5800, eof=false) at ../src/nvim/msgpack_rpc/channel.c:389 0x00000000004d0b20 in read_event (argv=0x7ffed792f4a8) at ../src/nvim/event/rstream.c:190 0x00000000004ce462 in multiqueue_process_events (this=0x7f96e18172d0) at ../src/nvim/event/multiqueue.c:150 0x000000000059b630 in nv_event (cap=0x7ffed792f620) at ../src/nvim/normal.c:7908 0x000000000058be69 in normal_execute (state=0x7ffed792f580, key=-25341) at ../src/nvim/normal.c:1137 0x0000000000652463 in state_enter (s=0x7ffed792f580) at ../src/nvim/state.c:61 0x000000000058a1fe in normal_enter (cmdwin=false, noexmode=false) at ../src/nvim/normal.c:467 0x00000000005500c2 in main (argc=2, argv=0x7ffed792f8d8) at ../src/nvim/main.c:554 Alternative approach suggested by bfredl is to use close_cb of the process. My unsuccessful attempt is below. (It seems close_cb is queued too late, which is the similar problem addressed by this commit): commit 75fc12c6ab15711bdb7b18c6d42ec9d157f5145e Author: Justin M. Keyes Date: Fri Aug 18 01:30:41 2017 +0200 rpc: use Stream's close_cb instead of explicit check in receive_msgpack() diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 8371d3cd482e..e52da23cdc40 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -416,6 +416,10 @@ static void on_process_exit(Process *proc) static void on_process_stream_close(Stream *stream, void *data) { Process *proc = data; + ILOG("on_process_stream_close"); + if (proc->stream_close_cb != NULL) { + proc->stream_close_cb(stream, proc->stream_close_data); + } decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ecd5..34a8d54f6f8c 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,11 @@ struct process { Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + + // Called when any of the process streams (in/out/err) closes. + stream_close_cb stream_close_cb; + void *stream_close_data; + bool closed, detach; MultiQueue *events; }; @@ -50,6 +55,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .closed = false, .internal_close_cb = NULL, .internal_exit_cb = NULL, + .stream_close_cb = NULL, + .stream_close_data = NULL, .detach = false }; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 7c865bfe1e8c..c8720d1e45d9 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -95,7 +95,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close_handle(Stream *stream) FUNC_ATTR_NONNULL_ALL { + ILOG("stream=%d", stream); + // LOG_CALLSTACK(); if (stream->uvstream) { + // problem: this schedules on the queue, but channel.c:receive_msgpack may + // be processed before close_cb is called by libuv. uv_close((uv_handle_t *)stream->uvstream, close_cb); } else { uv_close((uv_handle_t *)&stream->uv.idle, close_cb); @@ -105,6 +109,7 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + ILOG(">>>>>>>>>>>>>>>>>>>>>>> stream=%p stream->internal_close_cb=%p", stream, stream->internal_close_cb); if (stream->buffer) { rbuffer_free(stream->buffer); } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 782eabe04e4a..dc2b794e366a 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -128,6 +128,8 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) source); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; + channel->data.proc->stream_close_cb = close_cb2; + channel->data.proc->stream_close_data = channel; wstream_init(proc->in, 0); rstream_init(proc->out, 0); @@ -387,17 +389,6 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", channel->id, count, stream); @@ -571,23 +562,6 @@ static Stream *chan_wstream(Channel *chan) abort(); } -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -799,6 +773,12 @@ static void close_cb(Stream *stream, void *data) decref(data); } +static void close_cb2(Stream *stream, void *data) +{ + ILOG("close_cb2"); + close_channel(data); +} + /// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, MultiQueue *events, char *source) --- src/nvim/msgpack_rpc/channel.c | 61 +++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 10 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 789cf1d4a8..ed97f298d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -62,7 +62,7 @@ typedef struct { ChannelType type; msgpack_unpacker *unpacker; union { - Stream stream; + Stream stream; // bidirectional (socket) Process *proc; struct { Stream in; @@ -154,7 +154,8 @@ void channel_from_connection(SocketWatcher *watcher) rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_start(&channel->data.stream, receive_msgpack, channel); - DLOG("ch %" PRIu64 " in/out-stream=%p", &channel->data.stream); + DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, + &channel->data.stream); } /// @param source description of source function, rplugin name, TCP addr, etc @@ -383,7 +384,18 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, WARNING_LOG_LEVEL); + call_set_error(channel, buf, WARN_LOG_LEVEL); + goto end; + } + + if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) + || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { + char buf[256]; + snprintf(buf, sizeof(buf), + "ch %" PRIu64 ": stream closed unexpectedly. " + "closing channel", + channel->id); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } @@ -445,8 +457,8 @@ static void parse_msgpack(Channel *channel) // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); @@ -544,6 +556,39 @@ static void on_request_event(void **argv) api_clear_error(&error); } +/// Returns the Stream that a Channel writes to. +static Stream *chan_wstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->in; + case kChannelTypeStdio: + return &chan->data.std.out; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + +/// Returns the Stream that a Channel reads from. +static Stream *chan_rstream(Channel *chan) +{ + switch (chan->type) { + case kChannelTypeSocket: + return &chan->data.stream; + case kChannelTypeProc: + return chan->data.proc->out; + case kChannelTypeStdio: + return &chan->data.std.in; + case kChannelTypeInternal: + return NULL; + } + abort(); +} + + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success = false; @@ -555,13 +600,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer) switch (channel->type) { case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); + success = wstream_write(chan_wstream(channel), buffer); break; case kChannelTypeInternal: incref(channel); -- cgit From 0f442c328e33a0c742df9283c1944c077d202a77 Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Fri, 18 Aug 2017 14:13:28 +0200 Subject: channel.c:call_set_error(): fix memory leak --- src/nvim/msgpack_rpc/channel.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ed97f298d9..02f3854f47 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -868,6 +868,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel) ChannelCallFrame *frame = kv_A(channel->call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } -- cgit From cdd9e868efdad1f1eb9febfabb5f8671e75b95b9 Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Sat, 2 Sep 2017 19:51:49 +0200 Subject: doc: channel, eventloop --- src/nvim/msgpack_rpc/channel.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 02f3854f47..88232a55de 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -188,12 +188,11 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout, return channel->id; } -/// Sends event/arguments to channel +/// Publishes an event to a channel. /// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments +/// @param id Channel id. 0 means "broadcast to all subscribed channels" +/// @param name Event name (application-defined) +/// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. bool channel_send_event(uint64_t id, const char *name, Array args) { @@ -215,7 +214,6 @@ bool channel_send_event(uint64_t id, const char *name, Array args) send_event(channel, name, args); } } else { - // TODO(tarruda): Implement event broadcasting in vimscript broadcast_event(name, args); } -- cgit From 2a3bcd1ff883429a3dd17e7ae5ddc1396abbad17 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 29 Oct 2017 03:06:53 +0100 Subject: rpc: Don't delay notifications when request is pending (#6544) With the old behavior, if a GUI makes a blocking request that requires user interaction (like nvim_input()), it would not get any screen updates. The client, not nvim, should decide how to handle notifications during a pending request. If an rplugin wants to avoid async calls while a sync call is busy, it likely wants to avoid processing async calls while another async call also is handled as well. This may break the expectation of some existing rplugins. For compatibility, remote/define.vim reimplements the old behavior. Clients can opt-out by specifying `sync=urgent`. - Legacy hosts should be updated to use `sync=urgent`. They could add a flag indicating which async methods are always safe to call and which must wait until the main loop returns. - New hosts can expose the full asyncness, they don't need to offer both behaviors. ref #6532 ref #1398 d83868fe9071af1b4866594eac12f7aa0fa71b53 --- src/nvim/msgpack_rpc/channel.c | 36 ++---------------------------------- 1 file changed, 2 insertions(+), 34 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 88232a55de..5efdb9a194 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -56,7 +56,6 @@ typedef struct { typedef struct { uint64_t id; size_t refcount; - size_t pending_requests; PMap(cstr_t) *subscribed_events; bool closed; ChannelType type; @@ -71,7 +70,6 @@ typedef struct { } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; - kvec_t(WBuffer *) delayed_notifications; MultiQueue *events; } Channel; @@ -205,14 +203,7 @@ bool channel_send_event(uint64_t id, const char *name, Array args) } if (channel) { - if (channel->pending_requests) { - // Pending request, queue the notification for later sending. - const String method = cstr_as_string((char *)name); - WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); - kv_push(channel->delayed_notifications, buffer); - } else { - send_event(channel, name, args); - } + send_event(channel, name, args); } else { broadcast_event(name, args); } @@ -248,10 +239,8 @@ Object channel_send_call(uint64_t id, // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; kv_push(channel->call_stack, &frame); - channel->pending_requests++; LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); - channel->pending_requests--; if (frame.errored) { if (frame.result.type == kObjectTypeString) { @@ -276,10 +265,6 @@ Object channel_send_call(uint64_t id, api_free_object(frame.result); } - if (!channel->pending_requests) { - send_delayed_notifications(channel); - } - decref(channel); return frame.errored ? NIL : frame.result; @@ -704,11 +689,7 @@ static void broadcast_event(const char *name, Array args) for (size_t i = 0; i < kv_size(subscribed); i++) { Channel *channel = kv_A(subscribed, i); - if (channel->pending_requests) { - kv_push(channel->delayed_notifications, buffer); - } else { - channel_write(channel, buffer); - } + channel_write(channel, buffer); } end: @@ -786,7 +767,6 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); - kv_destroy(channel->delayed_notifications); if (channel->type != kChannelTypeProc) { multiqueue_free(channel->events); } @@ -811,11 +791,9 @@ static Channel *register_channel(ChannelType type, uint64_t id, rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->id = id > 0 ? id : next_chan_id++; - rv->pending_requests = 0; rv->subscribed_events = pmap_new(cstr_t)(); rv->next_request_id = 1; kv_init(rv->call_stack); - kv_init(rv->delayed_notifications); pmap_put(uint64_t)(channels, rv->id, rv); ILOG("new channel %" PRIu64 " (%s): %s", rv->id, @@ -912,16 +890,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void send_delayed_notifications(Channel* channel) -{ - for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { - WBuffer *buffer = kv_A(channel->delayed_notifications, i); - channel_write(channel, buffer); - } - - kv_size(channel->delayed_notifications) = 0; -} - static void incref(Channel *channel) { channel->refcount++; -- cgit From 3717e2157f2d45ce23dbe4ac03085fea2d956dc4 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 23 Jul 2017 18:04:14 +0200 Subject: Revert channel logging, rebased on new code below --- src/nvim/msgpack_rpc/channel.c | 43 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5efdb9a194..18f6334780 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -115,15 +115,12 @@ void channel_teardown(void) /// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is handled by the job infrastructure. /// -/// @param proc process object -/// @param id (optional) channel id -/// @param source description of source function, rplugin name, TCP addr, etc -/// -/// @return Channel id (> 0), on success. 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id, char *source) +/// @param argv The argument vector for the process. [consumed] +/// @return The channel id (> 0), on success. +/// 0, on error. +uint64_t channel_from_process(Process *proc, uint64_t id) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events, - source); + Channel *channel = register_channel(kChannelTypeProc, id, proc->events); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; @@ -142,8 +139,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, - watcher->addr); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; @@ -156,9 +152,8 @@ void channel_from_connection(SocketWatcher *watcher) &channel->data.stream); } -/// @param source description of source function, rplugin name, TCP addr, etc -uint64_t channel_connect(bool tcp, const char *address, int timeout, - char *source, const char **error) +uint64_t channel_connect(bool tcp, const char *address, + int timeout, const char **error) { if (!tcp) { char *path = fix_fname(address); @@ -170,7 +165,7 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); if (!socket_connect(&main_loop, &channel->data.stream, tcp, address, timeout, error)) { decref(channel); @@ -323,10 +318,11 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. Used to embed Nvim. +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); @@ -342,7 +338,7 @@ void channel_from_stdio(void) /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); incref(channel); // internal channel lives until process exit return channel->id; } @@ -778,12 +774,9 @@ static void close_cb(Stream *stream, void *data) decref(data); } -/// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events, char *source) + MultiQueue *events) { - // Jobs and channels share the same id namespace. - assert(id == 0 || !pmap_get(uint64_t)(channels, id)); Channel *rv = xmalloc(sizeof(Channel)); rv->events = events ? events : multiqueue_new_child(main_loop.events); rv->type = type; @@ -795,14 +788,6 @@ static Channel *register_channel(ChannelType type, uint64_t id, rv->next_request_id = 1; kv_init(rv->call_stack); pmap_put(uint64_t)(channels, rv->id, rv); - - ILOG("new channel %" PRIu64 " (%s): %s", rv->id, - (type == kChannelTypeProc ? "proc" - : (type == kChannelTypeSocket ? "socket" - : (type == kChannelTypeStdio ? "stdio" - : (type == kChannelTypeInternal ? "internal" : "?")))), - (source ? source : "?")); - return rv; } -- cgit From 5215e3205a07b85e4e4cf1f8a8ca6be2b9556459 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 27 Aug 2017 11:59:33 +0200 Subject: channels: refactor --- src/nvim/msgpack_rpc/channel.c | 388 ++++++++++++++--------------------------- 1 file changed, 128 insertions(+), 260 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 18f6334780..42b1f63830 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -11,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" @@ -40,47 +41,6 @@ #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio, - kChannelTypeInternal -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; // bidirectional (socket) - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { ch_before_blocking_events = multiqueue_new_child(main_loop.events); - channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, receive_msgpack, channel); + Stream *in = channel_instream(channel); + Stream *out = channel_outstream(channel); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, - proc->out); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - return channel->id; + wstream_init(in, 0); + rstream_init(out, CHANNEL_BUFFER_SIZE); + rstream_start(out, receive_msgpack, channel); } /// Creates an API channel from a tcp/pipe socket connection @@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); - - DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, - &channel->data.stream); + Channel *channel = channel_alloc(kChannelStreamSocket); + socket_watcher_accept(watcher, &channel->stream.socket); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); } +/// TODO: move to eval.c, also support bytes uint64_t channel_connect(bool tcp, const char *address, int timeout, const char **error) { @@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - if (!socket_connect(&main_loop, &channel->data.stream, + Channel *channel = channel_alloc(kChannelStreamSocket); + if (!socket_connect(&main_loop, &channel->stream.socket, tcp, address, timeout, error)) { - decref(channel); + channel_decref(channel); return 0; } - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); return channel->id; } +static Channel *find_rpc_channel(uint64_t id) +{ + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; +} + /// Publishes an event to a channel. /// /// @param id Channel id. 0 means "broadcast to all subscribed channels" /// @param name Event name (application-defined) /// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, const char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } @@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - const char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { @@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id, api_free_object(frame.result); } - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -310,7 +255,7 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { return false; } @@ -322,24 +267,22 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit + Channel *channel = channel_alloc(kChannelStreamStdio); + channel_incref(channel); // stdio channels are only closed on exit // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, receive_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); + wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, - &channel->data.std.in, &channel->data.std.out); + rpc_start(channel); } /// Creates a loopback channel. This is used to avoid deadlock /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); - incref(channel); // internal channel lives until process exit + Channel *channel = channel_alloc(kChannelStreamInternal); + channel_incref(channel); // internal channel lives until process exit + rpc_start(channel); return channel->id; } @@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); - channel->closed = true; - decref(channel); + // channel_decref(channel); remove?? + channel->rpc.closed = true; } // rstream.c:read_event() invokes this as stream->read_cb(). @@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { close_channel(channel); @@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); - DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); parse_msgpack(channel); end: - decref(channel); + channel_decref(channel); } static void parse_msgpack(Channel *channel) @@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel) msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel) if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request) evdata->handler = handler; evdata->args = args; evdata->request_id = request_id; - incref(channel); + channel_incref(channel); if (handler.async) { bool is_get_mode = handler.fn == handle_nvim_get_mode; @@ -530,66 +462,30 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); api_clear_error(&error); } -/// Returns the Stream that a Channel writes to. -static Stream *chan_wstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->in; - case kChannelTypeStdio: - return &chan->data.std.out; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success = false; + bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - case kChannelTypeProc: - case kChannelTypeStdio: - success = wstream_write(chan_wstream(channel), buffer); - break; - case kChannelTypeInternal: - incref(channel); - CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); - success = true; - break; + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; @@ -609,14 +505,14 @@ static void internal_read_event(void **argv) Channel *channel = argv[0]; WBuffer *buffer = argv[1]; - msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->unpacker), + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); parse_msgpack(channel); - decref(channel); + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args) Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -695,10 +592,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event) } /// Close the channel streams/process and free the channel resources. +/// TODO: move to channel.h static void close_channel(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); + switch (channel->streamtype) { + case kChannelStreamSocket: + stream_close(&channel->stream.socket, NULL, NULL); break; - case kChannelTypeProc: + case kChannelStreamProc: // Only close the rpc channel part, // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); + process_close_in(&channel->stream.proc); + process_close_out(&channel->stream.proc); break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); + case kChannelStreamStdio: + stream_close(&channel->stream.stdio.in, NULL, NULL); + stream_close(&channel->stream.stdio.out, NULL, NULL); multiqueue_put(main_loop.fast_events, exit_event, 1, channel); return; - case kChannelTypeInternal: + case kChannelStreamInternal: // nothing to free. break; } - decref(channel); + channel_decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); + channel_decref(argv[0]); if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static void close_cb(Stream *stream, void *data) { - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + channel_decref(data); } static bool is_rpc_response(msgpack_object *obj) @@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; api_free_object(frame->result); @@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " -- cgit From 1ebc96fe10fbdbec22caa26d5d52a9f095da9687 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Mon, 5 Jun 2017 08:29:10 +0200 Subject: channels: allow bytes sockets and stdio, and buffered bytes output --- src/nvim/msgpack_rpc/channel.c | 87 ++++-------------------------------------- 1 file changed, 8 insertions(+), 79 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 42b1f63830..56af4fa791 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -13,7 +13,6 @@ #include "nvim/api/ui.h" #include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -30,12 +29,9 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" -#include "nvim/path.h" #include "nvim/lib/kvec.h" #include "nvim/os/input.h" -#define CHANNEL_BUFFER_SIZE 0xffff - #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) @@ -66,57 +62,18 @@ void rpc_start(Channel *channel) rpc->next_request_id = 1; kv_init(rpc->call_stack); - Stream *in = channel_instream(channel); - Stream *out = channel_outstream(channel); - - DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - - wstream_init(in, 0); - rstream_init(out, CHANNEL_BUFFER_SIZE); - rstream_start(out, receive_msgpack, channel); -} - -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) -{ - Channel *channel = channel_alloc(kChannelStreamSocket); - socket_watcher_accept(watcher, &channel->stream.socket); - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); -} - -/// TODO: move to eval.c, also support bytes -uint64_t channel_connect(bool tcp, const char *address, - int timeout, const char **error) -{ - if (!tcp) { - char *path = fix_fname(address); - if (server_owns_pipe_address(path)) { - // avoid deadlock - xfree(path); - return channel_create_internal(); - } - xfree(path); - } + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - Channel *channel = channel_alloc(kChannelStreamSocket); - if (!socket_connect(&main_loop, &channel->stream.socket, - tcp, address, timeout, error)) { - channel_decref(channel); - return 0; + rstream_start(out, receive_msgpack, channel); } - - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); - return channel->id; } + static Channel *find_rpc_channel(uint64_t id) { Channel *chan = find_channel(id); @@ -263,29 +220,6 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = channel_alloc(kChannelStreamStdio); - channel_incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); - wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - - rpc_start(channel); -} - -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -uint64_t channel_create_internal(void) -{ - Channel *channel = channel_alloc(kChannelStreamInternal); - channel_incref(channel); // internal channel lives until process exit - rpc_start(channel); - return channel->id; -} - void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); @@ -663,11 +597,6 @@ void rpc_free(Channel *channel) kv_destroy(channel->rpc.call_stack); } -static void close_cb(Stream *stream, void *data) -{ - channel_decref(data); -} - static bool is_rpc_response(msgpack_object *obj) { return obj->type == MSGPACK_OBJECT_ARRAY -- cgit From 90e5cc5484ceeb410ae2a2706e09ed475cade4a5 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Thu, 8 Jun 2017 17:15:53 +0200 Subject: channels: generalize jobclose() --- src/nvim/msgpack_rpc/channel.c | 62 +++++++----------------------------------- 1 file changed, 10 insertions(+), 52 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 56af4fa791..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -54,6 +54,7 @@ void rpc_init(void) void rpc_start(Channel *channel) { + channel_incref(channel); channel->is_rpc = true; RpcState *rpc = &channel->rpc; rpc->closed = false; @@ -204,31 +205,6 @@ void rpc_unsubscribe(uint64_t id, char *event) unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = find_rpc_channel(id))) { - return false; - } - - close_channel(channel); - return true; -} - -void channel_process_exit(uint64_t id, int status) -{ - Channel *channel = pmap_get(uint64_t)(channels, id); - - // channel_decref(channel); remove?? - channel->rpc.closed = true; -} - -// rstream.c:read_event() invokes this as stream->read_cb(). static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { @@ -236,7 +212,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, channel_incref(channel); if (eof) { - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); @@ -540,43 +516,25 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -/// TODO: move to channel.h -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { if (channel->rpc.closed) { return; } channel->rpc.closed = true; + channel_decref(channel); - switch (channel->streamtype) { - case kChannelStreamSocket: - stream_close(&channel->stream.socket, NULL, NULL); - break; - case kChannelStreamProc: - // Only close the rpc channel part, - // there could be an error message on the stderr stream - process_close_in(&channel->stream.proc); - process_close_out(&channel->stream.proc); - break; - case kChannelStreamStdio: - stream_close(&channel->stream.stdio.in, NULL, NULL); - stream_close(&channel->stream.stdio.out, NULL, NULL); - multiqueue_put(main_loop.fast_events, exit_event, 1, channel); - return; - case kChannelStreamInternal: - // nothing to free. - break; + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - channel_decref(channel); } static void exit_event(void **argv) { - channel_decref(argv[0]); - if (!exiting) { mch_exit(0); } @@ -642,7 +600,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel) frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, -- cgit