diff options
| author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-08-07 22:54:02 -0300 |
|---|---|---|
| committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-08-13 08:49:38 -0300 |
| commit | 502aee690c980fcb3cfcb3f211dcfad06103db46 (patch) | |
| tree | 803dbcccaa874b78cbdfeacc74b7cc891e09f89a /src/nvim/event | |
| parent | a6e0d35d2da3ee4270ddb712410ea0c8c55b0f0f (diff) | |
| download | rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.tar.gz rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.tar.bz2 rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.zip | |
event: Refactor async event processing
- Improve the implementation of deferred/immediate events.
- Use the new queue module to change how/when events are queued/processed by
giving a private queue to each emitter.
- Immediate events(which only exist to break uv_run recursion) are now
represented in the `loop->fast_events` queue.
- Events pushed to child queues are propagated to the event loop main queue and
processed as K_EVENT keys.
Diffstat (limited to 'src/nvim/event')
| -rw-r--r-- | src/nvim/event/loop.c | 39 | ||||
| -rw-r--r-- | src/nvim/event/loop.h | 36 | ||||
| -rw-r--r-- | src/nvim/event/process.c | 75 | ||||
| -rw-r--r-- | src/nvim/event/process.h | 2 | ||||
| -rw-r--r-- | src/nvim/event/rstream.c | 9 | ||||
| -rw-r--r-- | src/nvim/event/signal.c | 9 | ||||
| -rw-r--r-- | src/nvim/event/signal.h | 1 | ||||
| -rw-r--r-- | src/nvim/event/socket.c | 11 | ||||
| -rw-r--r-- | src/nvim/event/socket.h | 1 | ||||
| -rw-r--r-- | src/nvim/event/stream.c | 10 | ||||
| -rw-r--r-- | src/nvim/event/stream.h | 1 | ||||
| -rw-r--r-- | src/nvim/event/time.c | 9 | ||||
| -rw-r--r-- | src/nvim/event/time.h | 1 |
13 files changed, 125 insertions, 79 deletions
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 67572c4f30..1a50ec0d9a 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -1,3 +1,4 @@ +#include <stdarg.h> #include <stdint.h> #include <uv.h> @@ -9,15 +10,20 @@ # include "event/loop.c.generated.h" #endif +typedef struct idle_event { + uv_idle_t idle; + Event event; +} IdleEvent; + void loop_init(Loop *loop, void *data) { uv_loop_init(&loop->uv); loop->uv.data = loop; - loop->deferred_events = kl_init(Event); - loop->immediate_events = kl_init(Event); loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; + loop->events = queue_new_parent(on_put, loop); + loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); uv_timer_init(&loop->uv, &loop->poll_timer); @@ -50,29 +56,20 @@ void loop_poll_events(Loop *loop, int ms) } recursive--; // Can re-enter uv_run now - process_events_from(loop->immediate_events); + queue_process_events(loop->fast_events); } - -// Queue an event -void loop_push_event(Loop *loop, Event event, bool deferred) +static void on_put(Queue *queue, void *data) { + Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before // blocking for a poll. If this happens and the callback pushes a event to one // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. uv_stop(&loop->uv); - kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, - event); } -void loop_process_event(Loop *loop) -{ - process_events_from(loop->deferred_events); -} - - void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); @@ -83,20 +80,6 @@ void loop_close(Loop *loop) } while (uv_loop_close(&loop->uv)); } -void loop_process_all_events(Loop *loop) -{ - process_events_from(loop->immediate_events); - process_events_from(loop->deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ - while (!kl_empty(queue)) { - Event event = kl_shift(Event, queue); - event.handler(event); - } -} - static void timer_cb(uv_timer_t *handle) { } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 340e096fec..9212a45aa4 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,37 +7,39 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { - void *data; - event_handler handler; -}; +#include "nvim/event/queue.h" typedef void * WatcherPtr; #define _noop(x) KLIST_INIT(WatcherPtr, WatcherPtr, _noop) -KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; - klist_t(Event) *deferred_events, *immediate_events; + Queue *events, *fast_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; +#define CREATE_EVENT(queue, handler, argc, ...) \ + do { \ + if (queue) { \ + queue_put((queue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = {__VA_ARGS__}; \ + (handler)(argv); \ + } \ + } while (0) + // Poll for events until a condition or timeout -#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ do { \ int remaining = timeout; \ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ while (!(condition)) { \ - loop_poll_events(loop, remaining); \ + LOOP_PROCESS_EVENTS(loop, queue, remaining); \ if (remaining == 0) { \ break; \ } else if (remaining > 0) { \ @@ -51,6 +53,16 @@ typedef struct loop { } \ } while (0) +#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ + do { \ + if (queue && !queue_empty(queue)) { \ + queue_process_events(queue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ + } while (0) + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.h.generated.h" #endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index c7360b8614..54dbc11a03 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -22,7 +22,7 @@ #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ +#define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ stream_close(proc->stream, NULL); \ @@ -76,6 +76,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->in) { stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -83,6 +84,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->out) { stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -90,6 +92,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->err) { stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -112,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL } // Wait until all children exit - LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); pty_process_teardown(loop); } @@ -154,11 +157,15 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; + if (!proc->refcount) { + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return proc->status; + } // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -170,12 +177,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL got_int = false; process_stop(proc); if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - + // We can only return if all streams/handles are closed and the job // exited. - LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1, + proc->refcount == 1); } else { - loop_poll_events(proc->loop, 0); + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); } } @@ -184,6 +191,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); + if (proc->events) { + // the decref call created an exit event, process it now + queue_process_events(proc->events); + } } else { proc->refcount--; } @@ -249,6 +260,18 @@ static void children_kill_cb(uv_timer_t *handle) } } +static void process_close_event(void **argv) +{ + Process *proc = argv[0]; + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + static void decref(Process *proc) { if (--proc->refcount != 0) { @@ -263,16 +286,9 @@ static void decref(Process *proc) break; } } - assert(node); kl_shift_at(WatcherPtr, loop->children, node); - shell_free_argv(proc->argv); - if (proc->type == kProcessTypePty) { - xfree(((PtyProcess *)proc)->term_name); - } - if (proc->cb) { - proc->cb(proc, proc->status, proc->data); - } + CREATE_EVENT(proc->events, process_close_event, 1, proc); } static void process_close(Process *proc) @@ -292,28 +308,27 @@ static void process_close(Process *proc) } } -static void on_process_exit(Process *proc) +static void process_close_handles(void **argv) { - if (exiting) { - on_process_exit_event((Event) {.data = proc}); - } else { - loop_push_event(proc->loop, - (Event) {.handler = on_process_exit_event, .data = proc}, false); - } + Process *proc = argv[0]; + process_close_streams(proc); + process_close(proc); +} +static void on_process_exit(Process *proc) +{ Loop *loop = proc->loop; - if (loop->children_stop_requests && !--loop->children_stop_requests) { + if (proc->stopped_time && loop->children_stop_requests + && !--loop->children_stop_requests) { // Stop the timer if no more stop requests are pending DLOG("Stopping process kill timer"); uv_timer_stop(&loop->children_kill_timer); } -} - -static void on_process_exit_event(Event event) -{ - Process *proc = event.data; - process_close_streams(proc); - process_close(proc); + // Process handles are closed in the next event loop tick. This is done to + // give libuv more time to read data from the OS after the process exits(If + // process_close_streams is called with data still in the OS buffer, we lose + // it) + CREATE_EVENT(proc->events, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 7ef2b24b7f..45edc46b95 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,7 @@ struct process { process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent; + Queue *events; }; static inline Process process_init(Loop *loop, ProcessType type, void *data) @@ -34,6 +35,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .type = type, .data = data, .loop = loop, + .events = NULL, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 7283cca02b..94853f616a 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -169,9 +169,16 @@ static void fread_idle_cb(uv_idle_t *handle) invoke_read_cb(stream, false); } -static void invoke_read_cb(Stream *stream, bool eof) +static void read_event(void **argv) { + Stream *stream = argv[0]; if (stream->read_cb) { + bool eof = (uintptr_t)argv[1]; stream->read_cb(stream, stream->buffer, stream->data, eof); } } + +static void invoke_read_cb(Stream *stream, bool eof) +{ + CREATE_EVENT(stream->events, read_event, 2, stream, (void *)(uintptr_t)eof); +} diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c index 63133b4f57..11ce15a882 100644 --- a/src/nvim/event/signal.c +++ b/src/nvim/event/signal.c @@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->cb = NULL; + watcher->events = loop->fast_events; } void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) @@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void signal_event(void **argv) +{ + SignalWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->uv.signum, watcher->data); +} + static void signal_watcher_cb(uv_signal_t *handle, int signum) { SignalWatcher *watcher = handle->data; - watcher->cb(watcher, signum, watcher->data); + CREATE_EVENT(watcher->events, signal_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index c269fa9d95..e32608acc0 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,6 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 2a618d290d..347e464d25 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher, watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; + watcher->events = NULL; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) @@ -143,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) uv_close((uv_handle_t *)watcher->stream, close_cb); } +static void connection_event(void **argv) +{ + SocketWatcher *watcher = argv[0]; + int status = (int)(uintptr_t)(argv[1]); + watcher->cb(watcher, status, watcher->data); +} + static void connection_cb(uv_stream_t *handle, int status) { SocketWatcher *watcher = handle->data; - watcher->cb(watcher, status, watcher->data); + CREATE_EVENT(watcher->events, connection_event, 2, watcher, + (void *)(uintptr_t)status); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index 17fd39f33b..ad59fdbe3a 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,6 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 72dabc3ce7..6caad6fdcc 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -56,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, if (stream->uvstream) { stream->uvstream->data = stream; + loop = stream->uvstream->loop->data; } stream->data = data; @@ -70,17 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->internal_close_cb = NULL; stream->closed = false; stream->buffer = NULL; + stream->events = NULL; } void stream_close(Stream *stream, stream_close_cb on_stream_close) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); - - if (stream->buffer) { - rbuffer_free(stream->buffer); - } - stream->closed = true; stream->close_cb = on_stream_close; @@ -102,6 +99,9 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + if (stream->buffer) { + rbuffer_free(stream->buffer); + } if (stream->close_cb) { stream->close_cb(stream, stream->data); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 37410b2036..eaf4b010f5 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -47,6 +47,7 @@ struct stream { size_t pending_reqs; void *data, *internal_data; bool closed; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index ce33cdfc10..7bf333bcea 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) uv_timer_init(&loop->uv, &watcher->uv); watcher->uv.data = watcher; watcher->data = data; + watcher->events = loop->fast_events; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void time_event(void **argv) +{ + TimeWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->data); +} + static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; - watcher->cb(watcher, watcher->data); + CREATE_EVENT(watcher->events, time_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index ee50e53d11..7882b2b627 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,6 +12,7 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS |