diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 861614f147..0e3b8200c9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -68,6 +68,7 @@ typedef struct { uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; kvec_t(WBuffer *) delayed_notifications; + Queue *events; } Channel; typedef struct { @@ -123,14 +124,14 @@ void channel_teardown(void) uint64_t channel_from_process(char **argv) { Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = uv_process_init(channel); + channel->data.process.uvproc = uv_process_init(&loop, channel); Process *proc = &channel->data.process.uvproc.process; proc->argv = argv; proc->in = &channel->data.process.in; proc->out = &channel->data.process.out; proc->err = &channel->data.process.err; proc->cb = process_exit; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); decref(channel); return 0; @@ -153,6 +154,9 @@ void channel_from_connection(SocketWatcher *watcher) { Channel *channel = register_channel(kChannelTypeSocket); socket_watcher_accept(watcher, &channel->data.stream, channel); + incref(channel); // close channel only after the stream is closed + channel->data.stream.internal_close_cb = close_cb; + channel->data.stream.internal_data = channel; wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_start(&channel->data.stream, parse_msgpack); @@ -221,7 +225,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -324,7 +328,8 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, + void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; @@ -339,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data) decref(data); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, + bool eof) { Channel *channel = data; incref(channel); @@ -447,31 +453,31 @@ static void handle_request(Channel *channel, msgpack_object *request) method->via.bin.size); } else { handler.fn = msgpack_rpc_handle_missing_method; - handler.defer = false; + handler.async = true; } Array args = ARRAY_DICT_INIT; if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { handler.fn = msgpack_rpc_handle_invalid_arguments; - handler.defer = false; + handler.async = true; } - bool defer = (!kv_size(channel->call_stack) && handler.defer); RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; event_data->request_id = request_id; incref(channel); - loop_push_event(&loop, (Event) { - .handler = on_request_event, - .data = event_data - }, defer); + if (handler.async) { + on_request_event((void **)&event_data); + } else { + queue_put(channel->events, on_request_event, 1, event_data); + } } -static void on_request_event(Event event) +static void on_request_event(void **argv) { - RequestEvent *e = event.data; + RequestEvent *e = argv[0]; Channel *channel = e->channel; MsgpackRpcRequestHandler handler = e->handler; Array args = e->args; @@ -636,7 +642,7 @@ static void close_channel(Channel *channel) switch (channel->type) { case kChannelTypeSocket: - stream_close(&channel->data.stream, close_cb); + stream_close(&channel->data.stream, NULL); break; case kChannelTypeProc: if (!channel->data.process.uvproc.process.closed) { @@ -646,9 +652,8 @@ static void close_channel(Channel *channel) case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.out, NULL); - loop_push_event(&loop, - (Event) { .handler = on_stdio_close, .data = channel }, false); - break; + queue_put(loop.fast_events, exit_event, 1, channel); + return; default: abort(); } @@ -656,9 +661,9 @@ static void close_channel(Channel *channel) decref(channel); } -static void on_stdio_close(Event e) +static void exit_event(void **argv) { - decref(e.data); + decref(argv[0]); if (!exiting) { mch_exit(0); @@ -680,17 +685,19 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); + queue_free(channel->events); xfree(channel); } static void close_cb(Stream *stream, void *data) { - xfree(data); + decref(data); } static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->events = queue_new_child(loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; |