diff options
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 109 |
1 files changed, 53 insertions, 56 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index dcd7e41737..83e7900a54 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,14 +31,14 @@ typedef struct { uint64_t request_id; - bool errored; + bool returned, errored; Object result; } ChannelCallFrame; typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, enabled; + bool is_job, closed; msgpack_unpacker *unpacker; union { Job *job; @@ -50,7 +50,6 @@ typedef struct { } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; } Channel; static uint64_t next_id = 1; @@ -103,12 +102,12 @@ uint64_t channel_from_job(char **argv) channel, job_out, job_err, - NULL, + job_exit, 0, &status); if (status <= 0) { - close_channel(channel); + free_channel(channel); return 0; } @@ -141,7 +140,7 @@ bool channel_exists(uint64_t id) { Channel *channel; return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; + && !channel->closed; } /// Sends event/arguments to channel @@ -156,7 +155,7 @@ bool channel_send_event(uint64_t id, char *name, Array args) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_free_array(args); return false; } @@ -182,7 +181,7 @@ Object channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); api_free_array(args); return NIL; @@ -208,16 +207,14 @@ Object channel_send_call(uint64_t id, EventSource sources[] = {channel_source, NULL}; // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; + ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); do { event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return + } while (!frame.returned); + + (void)kv_pop(channel->call_stack); if (frame.errored) { api_set_error(err, Exception, "%s", frame.result.data.string.data); @@ -235,7 +232,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -257,7 +254,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -272,12 +269,11 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { return false; } - channel_kill(channel); - channel->enabled = false; + close_channel(channel); return true; } @@ -319,19 +315,16 @@ static void job_err(RStream *rstream, void *data, bool eof) } } +static void job_exit(Job *job, void *data) +{ + free_channel((Channel *)data); +} + static void parse_msgpack(RStream *rstream, void *data, bool eof) { Channel *channel = data; - channel->rpc_call_level++; if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); goto end; } @@ -354,7 +347,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) MSGPACK_UNPACK_SUCCESS) { if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); + complete_call(&unpacked.data, channel); } else { char buf[256]; snprintf(buf, @@ -397,10 +390,11 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); + if (eof && !channel->is_job && !kv_size(channel->call_stack)) { + // The free_channel call is deferred for jobs because it's possible that + // job_stderr will called after this. For non-job channels, this is the + // last callback so it must be freed now. + free_channel(channel); } } @@ -500,26 +494,11 @@ static void unsubscribe(Channel *channel, char *event) free(event_string); } +/// Close the channel streams/job. The channel resources will be freed by +/// free_channel later. static void close_channel(Channel *channel) { - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ + channel->closed = true; if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -536,6 +515,22 @@ static void channel_kill(Channel *channel) } } +static void free_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + free(channel); +} + static void close_cb(uv_handle_t *handle) { free(handle->data); @@ -545,8 +540,7 @@ static void close_cb(uv_handle_t *handle) static Channel *register_channel(void) { Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; + rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); @@ -573,9 +567,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) kv_size(channel->call_stack) - 1)->request_id; } -static void call_stack_pop(msgpack_object *obj, Channel *channel) +static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; if (frame->errored) { @@ -588,10 +584,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg) { for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, i); + frame->returned = true; frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } - channel->enabled = false; + close_channel(channel); } |