aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c62
1 files changed, 10 insertions, 52 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 56af4fa791..32781cf4d9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -54,6 +54,7 @@ void rpc_init(void)
void rpc_start(Channel *channel)
{
+ channel_incref(channel);
channel->is_rpc = true;
RpcState *rpc = &channel->rpc;
rpc->closed = false;
@@ -204,31 +205,6 @@ void rpc_unsubscribe(uint64_t id, char *event)
unsubscribe(channel, event);
}
-/// Closes a channel
-///
-/// @param id The channel id
-/// @return true if successful, false otherwise
-bool channel_close(uint64_t id)
-{
- Channel *channel;
-
- if (!(channel = find_rpc_channel(id))) {
- return false;
- }
-
- close_channel(channel);
- return true;
-}
-
-void channel_process_exit(uint64_t id, int status)
-{
- Channel *channel = pmap_get(uint64_t)(channels, id);
-
- // channel_decref(channel); remove??
- channel->rpc.closed = true;
-}
-
-// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
@@ -236,7 +212,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
channel_incref(channel);
if (eof) {
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
@@ -540,43 +516,25 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/process and free the channel resources.
-/// TODO: move to channel.h
-static void close_channel(Channel *channel)
+
+/// Mark rpc state as closed, and release its reference to the channel.
+/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
+void rpc_close(Channel *channel)
{
if (channel->rpc.closed) {
return;
}
channel->rpc.closed = true;
+ channel_decref(channel);
- switch (channel->streamtype) {
- case kChannelStreamSocket:
- stream_close(&channel->stream.socket, NULL, NULL);
- break;
- case kChannelStreamProc:
- // Only close the rpc channel part,
- // there could be an error message on the stderr stream
- process_close_in(&channel->stream.proc);
- process_close_out(&channel->stream.proc);
- break;
- case kChannelStreamStdio:
- stream_close(&channel->stream.stdio.in, NULL, NULL);
- stream_close(&channel->stream.stdio.out, NULL, NULL);
- multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
- return;
- case kChannelStreamInternal:
- // nothing to free.
- break;
+ if (channel->streamtype == kChannelStreamStdio) {
+ multiqueue_put(main_loop.fast_events, exit_event, 0);
}
-
- channel_decref(channel);
}
static void exit_event(void **argv)
{
- channel_decref(argv[0]);
-
if (!exiting) {
mch_exit(0);
}
@@ -642,7 +600,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
frame->result = STRING_OBJ(cstr_to_string(msg));
}
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,