aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-08-13 12:20:53 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-08-13 12:20:53 -0300
commita94a68145b3c607d1e58f708ded8fe625c9973d5 (patch)
tree2d69f4f3a06f0ac5a4e936ec40bde81a26cf2b53 /src/nvim/msgpack_rpc/channel.c
parent6bf322c6ff190b9f10c5286b3ae6fceedfbddb61 (diff)
parentf1de097dbb236ea400150f80b909407ca9af7441 (diff)
downloadrneovim-a94a68145b3c607d1e58f708ded8fe625c9973d5.tar.gz
rneovim-a94a68145b3c607d1e58f708ded8fe625c9973d5.tar.bz2
rneovim-a94a68145b3c607d1e58f708ded8fe625c9973d5.zip
Merge PR #3029 'Refactor event processing architecture'
Helped-by: oni-link <knil.ino@gmail.com> Reviewed-by: oni-link <knil.ino@gmail.com>
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c42
1 files changed, 23 insertions, 19 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index eee662dd9c..0e3b8200c9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -68,6 +68,7 @@ typedef struct {
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
+ Queue *events;
} Channel;
typedef struct {
@@ -123,14 +124,14 @@ void channel_teardown(void)
uint64_t channel_from_process(char **argv)
{
Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = uv_process_init(channel);
+ channel->data.process.uvproc = uv_process_init(&loop, channel);
Process *proc = &channel->data.process.uvproc.process;
proc->argv = argv;
proc->in = &channel->data.process.in;
proc->out = &channel->data.process.out;
proc->err = &channel->data.process.err;
proc->cb = process_exit;
- if (!process_spawn(&loop, proc)) {
+ if (!process_spawn(proc)) {
loop_poll_events(&loop, 0);
decref(channel);
return 0;
@@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
- LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);
+ LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -327,7 +328,8 @@ static void channel_from_stdio(void)
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
+static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
+ void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
@@ -342,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data)
decref(data);
}
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
+static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
+ bool eof)
{
Channel *channel = data;
incref(channel);
@@ -450,31 +453,31 @@ static void handle_request(Channel *channel, msgpack_object *request)
method->via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
- handler.defer = false;
+ handler.async = true;
}
Array args = ARRAY_DICT_INIT;
if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
handler.fn = msgpack_rpc_handle_invalid_arguments;
- handler.defer = false;
+ handler.async = true;
}
- bool defer = (!kv_size(channel->call_stack) && handler.defer);
RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
- loop_push_event(&loop, (Event) {
- .handler = on_request_event,
- .data = event_data
- }, defer);
+ if (handler.async) {
+ on_request_event((void **)&event_data);
+ } else {
+ queue_put(channel->events, on_request_event, 1, event_data);
+ }
}
-static void on_request_event(Event event)
+static void on_request_event(void **argv)
{
- RequestEvent *e = event.data;
+ RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
@@ -649,9 +652,8 @@ static void close_channel(Channel *channel)
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
- loop_push_event(&loop,
- (Event) { .handler = on_stdio_close, .data = channel }, false);
- break;
+ queue_put(loop.fast_events, exit_event, 1, channel);
+ return;
default:
abort();
}
@@ -659,9 +661,9 @@ static void close_channel(Channel *channel)
decref(channel);
}
-static void on_stdio_close(Event e)
+static void exit_event(void **argv)
{
- decref(e.data);
+ decref(argv[0]);
if (!exiting) {
mch_exit(0);
@@ -683,6 +685,7 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
+ queue_free(channel->events);
xfree(channel);
}
@@ -694,6 +697,7 @@ static void close_cb(Stream *stream, void *data)
static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->events = queue_new_child(loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;