aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c28
1 files changed, 15 insertions, 13 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index ca08af1fe8..6674f3c4e4 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -68,6 +68,7 @@ typedef struct {
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
+ Queue *events;
} Channel;
typedef struct {
@@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
- LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);
+ LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -459,22 +460,22 @@ static void handle_request(Channel *channel, msgpack_object *request)
handler.async = true;
}
- bool async = kv_size(channel->call_stack) || handler.async;
RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
- loop_push_event(&loop, (Event) {
- .handler = on_request_event,
- .data = event_data
- }, !async);
+ if (handler.async) {
+ on_request_event((void **)&event_data);
+ } else {
+ queue_put(channel->events, on_request_event, 1, event_data);
+ }
}
-static void on_request_event(Event event)
+static void on_request_event(void **argv)
{
- RequestEvent *e = event.data;
+ RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
@@ -649,9 +650,8 @@ static void close_channel(Channel *channel)
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
- loop_push_event(&loop,
- (Event) { .handler = on_stdio_close, .data = channel }, false);
- break;
+ queue_put(loop.fast_events, exit_event, 1, channel);
+ return;
default:
abort();
}
@@ -659,9 +659,9 @@ static void close_channel(Channel *channel)
decref(channel);
}
-static void on_stdio_close(Event e)
+static void exit_event(void **argv)
{
- decref(e.data);
+ decref(argv[0]);
if (!exiting) {
mch_exit(0);
@@ -683,6 +683,7 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
+ queue_free(channel->events);
xfree(channel);
}
@@ -694,6 +695,7 @@ static void close_cb(Stream *stream, void *data)
static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->events = queue_new_child(loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;