aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/os/channel.c54
1 files changed, 22 insertions, 32 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 439487f30b..efe628098c 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -143,7 +143,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
Channel *channel = NULL;
if (id > 0) {
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -163,7 +163,7 @@ bool channel_send_call(uint64_t id,
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -178,6 +178,8 @@ bool channel_send_call(uint64_t id,
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(cstr_to_string(buf));
+ msgpack_rpc_free_object(arg);
+ return false;
}
uint64_t request_id = channel->next_request_id++;
@@ -235,7 +237,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -257,7 +259,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -286,7 +288,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
return;
}
@@ -316,7 +318,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
" a matching id for the current RPC call. Ensure the client "
" is properly synchronized",
channel->id);
- call_stack_unwind(channel, buf, 1);
+ call_set_error(channel, buf);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
@@ -369,7 +371,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed due to a failed write",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
}
return success;
@@ -450,6 +452,15 @@ static void close_channel(Channel *channel)
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
+
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@@ -460,14 +471,6 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
free(channel);
}
@@ -510,10 +513,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
static void call_stack_pop(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1);
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
- (void)kv_pop(channel->call_stack);
if (frame->errored) {
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
@@ -522,24 +523,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
}
}
-static void call_stack_unwind(Channel *channel, char *msg, int count)
+static void call_set_error(Channel *channel, char *msg)
{
- while (kv_size(channel->call_stack) && count--) {
+ for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
-}
-static void disable_channel(Channel *channel, char *msg)
-{
- if (kv_size(channel->call_stack)) {
- // Channel is currently in the middle of a call, remove all frames and mark
- // it as "dead"
- channel->enabled = false;
- call_stack_unwind(channel, msg, -1);
- } else {
- // Safe to close it now
- close_channel(channel);
- }
+ channel->enabled = false;
}