From ccdeb91e1206f38773664979bf03694213a2ba80 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 09:55:31 -0300 Subject: msgpack: Replace FUNC_ATTR_DEFERRED by FUNC_ATTR_ASYNC API functions exposed via msgpack-rpc now fall into two categories: - async functions, which are executed as soon as the request is parsed - sync functions, which are invoked in nvim main loop when processing the `K_EVENT special key Only a few functions which can be safely executed in any context are marked as async. --- src/nvim/msgpack_rpc/channel.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index eee662dd9c..ab81e3194c 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -450,16 +450,16 @@ static void handle_request(Channel *channel, msgpack_object *request) method->via.bin.size); } else { handler.fn = msgpack_rpc_handle_missing_method; - handler.defer = false; + handler.async = true; } Array args = ARRAY_DICT_INIT; if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { handler.fn = msgpack_rpc_handle_invalid_arguments; - handler.defer = false; + handler.async = true; } - bool defer = (!kv_size(channel->call_stack) && handler.defer); + bool async = kv_size(channel->call_stack) || handler.async; RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; @@ -469,7 +469,7 @@ static void handle_request(Channel *channel, msgpack_object *request) loop_push_event(&loop, (Event) { .handler = on_request_event, .data = event_data - }, defer); + }, !async); } static void on_request_event(Event event) -- cgit From 696f9c2759b078f749625d167f3424915586108d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 11:56:44 -0300 Subject: process: Pass loop reference during initialization Change the API so that it is passed to {uv,pty}_process_init instead of `process_spawn`. --- src/nvim/msgpack_rpc/channel.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ab81e3194c..ca08af1fe8 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -123,14 +123,14 @@ void channel_teardown(void) uint64_t channel_from_process(char **argv) { Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = uv_process_init(channel); + channel->data.process.uvproc = uv_process_init(&loop, channel); Process *proc = &channel->data.process.uvproc.process; proc->argv = argv; proc->in = &channel->data.process.in; proc->out = &channel->data.process.out; proc->err = &channel->data.process.err; proc->cb = process_exit; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); decref(channel); return 0; -- cgit From 502aee690c980fcb3cfcb3f211dcfad06103db46 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 7 Aug 2015 22:54:02 -0300 Subject: event: Refactor async event processing - Improve the implementation of deferred/immediate events. - Use the new queue module to change how/when events are queued/processed by giving a private queue to each emitter. - Immediate events(which only exist to break uv_run recursion) are now represented in the `loop->fast_events` queue. - Events pushed to child queues are propagated to the event loop main queue and processed as K_EVENT keys. --- src/nvim/msgpack_rpc/channel.c | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ca08af1fe8..6674f3c4e4 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -68,6 +68,7 @@ typedef struct { uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; kvec_t(WBuffer *) delayed_notifications; + Queue *events; } Channel; typedef struct { @@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -459,22 +460,22 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - bool async = kv_size(channel->call_stack) || handler.async; RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; event_data->request_id = request_id; incref(channel); - loop_push_event(&loop, (Event) { - .handler = on_request_event, - .data = event_data - }, !async); + if (handler.async) { + on_request_event((void **)&event_data); + } else { + queue_put(channel->events, on_request_event, 1, event_data); + } } -static void on_request_event(Event event) +static void on_request_event(void **argv) { - RequestEvent *e = event.data; + RequestEvent *e = argv[0]; Channel *channel = e->channel; MsgpackRpcRequestHandler handler = e->handler; Array args = e->args; @@ -649,9 +650,8 @@ static void close_channel(Channel *channel) case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.out, NULL); - loop_push_event(&loop, - (Event) { .handler = on_stdio_close, .data = channel }, false); - break; + queue_put(loop.fast_events, exit_event, 1, channel); + return; default: abort(); } @@ -659,9 +659,9 @@ static void close_channel(Channel *channel) decref(channel); } -static void on_stdio_close(Event e) +static void exit_event(void **argv) { - decref(e.data); + decref(argv[0]); if (!exiting) { mch_exit(0); @@ -683,6 +683,7 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); + queue_free(channel->events); xfree(channel); } @@ -694,6 +695,7 @@ static void close_cb(Stream *stream, void *data) static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->events = queue_new_child(loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; -- cgit From 6b3cd381dcd01268479dc56103498a029133644d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Wed, 12 Aug 2015 19:16:06 -0300 Subject: rstream: Pass read count to read events This is necessary to keep events in the same order received from the OS. --- src/nvim/msgpack_rpc/channel.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6674f3c4e4..0e3b8200c9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -328,7 +328,8 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, + void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; @@ -343,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data) decref(data); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, + bool eof) { Channel *channel = data; incref(channel); -- cgit