diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-08-07 22:54:02 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-08-13 08:49:38 -0300 |
commit | 502aee690c980fcb3cfcb3f211dcfad06103db46 (patch) | |
tree | 803dbcccaa874b78cbdfeacc74b7cc891e09f89a | |
parent | a6e0d35d2da3ee4270ddb712410ea0c8c55b0f0f (diff) | |
download | rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.tar.gz rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.tar.bz2 rneovim-502aee690c980fcb3cfcb3f211dcfad06103db46.zip |
event: Refactor async event processing
- Improve the implementation of deferred/immediate events.
- Use the new queue module to change how/when events are queued/processed by
giving a private queue to each emitter.
- Immediate events(which only exist to break uv_run recursion) are now
represented in the `loop->fast_events` queue.
- Events pushed to child queues are propagated to the event loop main queue and
processed as K_EVENT keys.
-rw-r--r-- | src/nvim/edit.c | 4 | ||||
-rw-r--r-- | src/nvim/eval.c | 78 | ||||
-rw-r--r-- | src/nvim/event/loop.c | 39 | ||||
-rw-r--r-- | src/nvim/event/loop.h | 36 | ||||
-rw-r--r-- | src/nvim/event/process.c | 75 | ||||
-rw-r--r-- | src/nvim/event/process.h | 2 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 9 | ||||
-rw-r--r-- | src/nvim/event/signal.c | 9 | ||||
-rw-r--r-- | src/nvim/event/signal.h | 1 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 11 | ||||
-rw-r--r-- | src/nvim/event/socket.h | 1 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 10 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 1 | ||||
-rw-r--r-- | src/nvim/event/time.c | 9 | ||||
-rw-r--r-- | src/nvim/event/time.h | 1 | ||||
-rw-r--r-- | src/nvim/ex_getln.c | 2 | ||||
-rw-r--r-- | src/nvim/globals.h | 8 | ||||
-rw-r--r-- | src/nvim/main.c | 4 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 28 | ||||
-rw-r--r-- | src/nvim/normal.c | 2 | ||||
-rw-r--r-- | src/nvim/os/input.c | 6 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 13 | ||||
-rw-r--r-- | src/nvim/os/signal.c | 11 | ||||
-rw-r--r-- | src/nvim/os/time.c | 2 | ||||
-rw-r--r-- | src/nvim/terminal.c | 25 | ||||
-rw-r--r-- | src/nvim/tui/term_input.inl | 9 | ||||
-rw-r--r-- | src/nvim/tui/tui.c | 15 | ||||
-rw-r--r-- | src/nvim/ui.c | 14 |
28 files changed, 215 insertions, 210 deletions
diff --git a/src/nvim/edit.c b/src/nvim/edit.c index 3982e32c2f..6bcf5e804a 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -254,7 +254,7 @@ edit ( ) { if (curbuf->terminal) { - terminal_enter(true); + terminal_enter(); return false; } @@ -609,7 +609,7 @@ edit ( if (c == K_EVENT) { c = lastc; - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/eval.c b/src/nvim/eval.c index e2f095dbad..d61cccd41c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -468,6 +468,7 @@ typedef struct { dict_T *self; int *status_ptr; uint64_t id; + Queue *events; } TerminalJobData; /// Structure representing current VimL to messagepack conversion state @@ -493,6 +494,13 @@ typedef struct { /// Stack used to convert VimL values to messagepack. typedef kvec_t(MPConvStackVal) MPConvStack; +typedef struct { + TerminalJobData *data; + ufunc_T *callback; + const char *type; + list_T *received; + int status; +} JobEvent; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "eval.c.generated.h" @@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack; #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -// Memory pool for reusing JobEvent structures -typedef struct { - TerminalJobData *data; - ufunc_T *callback; - const char *type; - list_T *received; - int status; -} JobEvent; -static int disable_job_defer = 0; static uint64_t current_job_id = 1; static PMap(uint64_t) *jobs = NULL; @@ -11038,15 +11037,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); - // disable breakchecks, which could result in job callbacks being executed - // at unexpected places - disable_breakcheck++; - // disable job event deferring so the callbacks are processed while waiting. - if (!disable_job_defer++) { - // process any pending job events in the deferred queue, but only do this if - // deferred is not disabled(at the top-level `jobwait()` call) - loop_process_event(&loop); - } // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. @@ -11113,8 +11103,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // job exits data->status_ptr = NULL; } - disable_job_defer--; - disable_breakcheck--; ui_busy_stop(); rv->lv_refcount++; @@ -21053,6 +21041,7 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; + data->events = queue_new_child(loop.events); if (pty) { data->proc.pty = pty_process_init(&loop, data); } else { @@ -21064,6 +21053,7 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, proc->out = &data->out; proc->err = &data->err; proc->cb = on_process_exit; + proc->events = data->events; return data; } @@ -21118,7 +21108,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) return true; } -static inline void free_term_job_data(TerminalJobData *data) { +static inline void free_term_job_data_event(void **argv) +{ + TerminalJobData *data = argv[0]; if (data->on_stdout) { user_func_unref(data->on_stdout); } @@ -21133,17 +21125,25 @@ static inline void free_term_job_data(TerminalJobData *data) { data->self->internal_refcount--; dict_unref(data->self); } + queue_free(data->events); xfree(data); } +static inline void free_term_job_data(TerminalJobData *data) +{ + // data->queue may still be used after this function returns(process_wait), so + // only free in the next event loop iteration + queue_put(loop.fast_events, free_term_job_data_event, 1, data); +} + // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +static inline void process_job_event(TerminalJobData *data, ufunc_T *callback, const char *type, char *buf, size_t count, int status) { - JobEvent *event_data = xmalloc(sizeof(JobEvent)); - event_data->received = NULL; + JobEvent event_data; + event_data.received = NULL; if (buf) { - event_data->received = list_alloc(); + event_data.received = list_alloc(); char *ptr = buf; size_t remaining = count; size_t off = 0; @@ -21151,7 +21151,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, while (off < remaining) { // append the line if (ptr[off] == NL) { - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); size_t skip = off + 1; ptr += skip; remaining -= skip; @@ -21164,17 +21164,14 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, } off++; } - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); } else { - event_data->status = status; + event_data.status = status; } - event_data->data = data; - event_data->callback = callback; - event_data->type = type; - loop_push_event(&loop, (Event) { - .handler = on_job_event, - .data = event_data - }, !disable_job_defer); + event_data.data = data; + event_data.callback = callback; + event_data.type = type; + on_job_event(&event_data); } static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) @@ -21198,13 +21195,13 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, RBUFFER_UNTIL_EMPTY(buf, ptr, len) { // The order here matters, the terminal must receive the data first because - // push_job_event will modify the read buffer(convert NULs into NLs) + // process_job_event will modify the read buffer(convert NULs into NLs) if (data->term) { terminal_receive(data->term, ptr, len); } if (callback) { - push_job_event(data, callback, type, ptr, len, 0); + process_job_event(data, callback, type, ptr, len, 0); } rbuffer_consumed(buf, len); @@ -21224,7 +21221,7 @@ static void on_process_exit(Process *proc, int status, void *d) *data->status_ptr = status; } - push_job_event(data, data->on_exit, "exit", NULL, 0, status); + process_job_event(data, data->on_exit, "exit", NULL, 0, status); } static void term_write(char *buf, size_t size, void *d) @@ -21258,10 +21255,8 @@ static void term_job_data_decref(TerminalJobData *data) } } -static void on_job_event(Event event) +static void on_job_event(JobEvent *ev) { - JobEvent *ev = event.data; - if (!ev->callback) { goto end; } @@ -21306,7 +21301,6 @@ end: pmap_del(uint64_t)(jobs, ev->data->id); term_job_data_decref(ev->data); } - xfree(ev); } static TerminalJobData *find_job(uint64_t id) diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 67572c4f30..1a50ec0d9a 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -1,3 +1,4 @@ +#include <stdarg.h> #include <stdint.h> #include <uv.h> @@ -9,15 +10,20 @@ # include "event/loop.c.generated.h" #endif +typedef struct idle_event { + uv_idle_t idle; + Event event; +} IdleEvent; + void loop_init(Loop *loop, void *data) { uv_loop_init(&loop->uv); loop->uv.data = loop; - loop->deferred_events = kl_init(Event); - loop->immediate_events = kl_init(Event); loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; + loop->events = queue_new_parent(on_put, loop); + loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); uv_timer_init(&loop->uv, &loop->poll_timer); @@ -50,29 +56,20 @@ void loop_poll_events(Loop *loop, int ms) } recursive--; // Can re-enter uv_run now - process_events_from(loop->immediate_events); + queue_process_events(loop->fast_events); } - -// Queue an event -void loop_push_event(Loop *loop, Event event, bool deferred) +static void on_put(Queue *queue, void *data) { + Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before // blocking for a poll. If this happens and the callback pushes a event to one // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. uv_stop(&loop->uv); - kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, - event); } -void loop_process_event(Loop *loop) -{ - process_events_from(loop->deferred_events); -} - - void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); @@ -83,20 +80,6 @@ void loop_close(Loop *loop) } while (uv_loop_close(&loop->uv)); } -void loop_process_all_events(Loop *loop) -{ - process_events_from(loop->immediate_events); - process_events_from(loop->deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ - while (!kl_empty(queue)) { - Event event = kl_shift(Event, queue); - event.handler(event); - } -} - static void timer_cb(uv_timer_t *handle) { } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 340e096fec..9212a45aa4 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,37 +7,39 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { - void *data; - event_handler handler; -}; +#include "nvim/event/queue.h" typedef void * WatcherPtr; #define _noop(x) KLIST_INIT(WatcherPtr, WatcherPtr, _noop) -KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; - klist_t(Event) *deferred_events, *immediate_events; + Queue *events, *fast_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; +#define CREATE_EVENT(queue, handler, argc, ...) \ + do { \ + if (queue) { \ + queue_put((queue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = {__VA_ARGS__}; \ + (handler)(argv); \ + } \ + } while (0) + // Poll for events until a condition or timeout -#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ do { \ int remaining = timeout; \ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ while (!(condition)) { \ - loop_poll_events(loop, remaining); \ + LOOP_PROCESS_EVENTS(loop, queue, remaining); \ if (remaining == 0) { \ break; \ } else if (remaining > 0) { \ @@ -51,6 +53,16 @@ typedef struct loop { } \ } while (0) +#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ + do { \ + if (queue && !queue_empty(queue)) { \ + queue_process_events(queue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ + } while (0) + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.h.generated.h" #endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index c7360b8614..54dbc11a03 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -22,7 +22,7 @@ #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ +#define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ stream_close(proc->stream, NULL); \ @@ -76,6 +76,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->in) { stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -83,6 +84,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->out) { stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -90,6 +92,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->err) { stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -112,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL } // Wait until all children exit - LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); pty_process_teardown(loop); } @@ -154,11 +157,15 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; + if (!proc->refcount) { + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return proc->status; + } // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -170,12 +177,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL got_int = false; process_stop(proc); if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - + // We can only return if all streams/handles are closed and the job // exited. - LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1, + proc->refcount == 1); } else { - loop_poll_events(proc->loop, 0); + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); } } @@ -184,6 +191,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); + if (proc->events) { + // the decref call created an exit event, process it now + queue_process_events(proc->events); + } } else { proc->refcount--; } @@ -249,6 +260,18 @@ static void children_kill_cb(uv_timer_t *handle) } } +static void process_close_event(void **argv) +{ + Process *proc = argv[0]; + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + static void decref(Process *proc) { if (--proc->refcount != 0) { @@ -263,16 +286,9 @@ static void decref(Process *proc) break; } } - assert(node); kl_shift_at(WatcherPtr, loop->children, node); - shell_free_argv(proc->argv); - if (proc->type == kProcessTypePty) { - xfree(((PtyProcess *)proc)->term_name); - } - if (proc->cb) { - proc->cb(proc, proc->status, proc->data); - } + CREATE_EVENT(proc->events, process_close_event, 1, proc); } static void process_close(Process *proc) @@ -292,28 +308,27 @@ static void process_close(Process *proc) } } -static void on_process_exit(Process *proc) +static void process_close_handles(void **argv) { - if (exiting) { - on_process_exit_event((Event) {.data = proc}); - } else { - loop_push_event(proc->loop, - (Event) {.handler = on_process_exit_event, .data = proc}, false); - } + Process *proc = argv[0]; + process_close_streams(proc); + process_close(proc); +} +static void on_process_exit(Process *proc) +{ Loop *loop = proc->loop; - if (loop->children_stop_requests && !--loop->children_stop_requests) { + if (proc->stopped_time && loop->children_stop_requests + && !--loop->children_stop_requests) { // Stop the timer if no more stop requests are pending DLOG("Stopping process kill timer"); uv_timer_stop(&loop->children_kill_timer); } -} - -static void on_process_exit_event(Event event) -{ - Process *proc = event.data; - process_close_streams(proc); - process_close(proc); + // Process handles are closed in the next event loop tick. This is done to + // give libuv more time to read data from the OS after the process exits(If + // process_close_streams is called with data still in the OS buffer, we lose + // it) + CREATE_EVENT(proc->events, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 7ef2b24b7f..45edc46b95 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,7 @@ struct process { process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent; + Queue *events; }; static inline Process process_init(Loop *loop, ProcessType type, void *data) @@ -34,6 +35,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .type = type, .data = data, .loop = loop, + .events = NULL, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 7283cca02b..94853f616a 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -169,9 +169,16 @@ static void fread_idle_cb(uv_idle_t *handle) invoke_read_cb(stream, false); } -static void invoke_read_cb(Stream *stream, bool eof) +static void read_event(void **argv) { + Stream *stream = argv[0]; if (stream->read_cb) { + bool eof = (uintptr_t)argv[1]; stream->read_cb(stream, stream->buffer, stream->data, eof); } } + +static void invoke_read_cb(Stream *stream, bool eof) +{ + CREATE_EVENT(stream->events, read_event, 2, stream, (void *)(uintptr_t)eof); +} diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c index 63133b4f57..11ce15a882 100644 --- a/src/nvim/event/signal.c +++ b/src/nvim/event/signal.c @@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->cb = NULL; + watcher->events = loop->fast_events; } void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) @@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void signal_event(void **argv) +{ + SignalWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->uv.signum, watcher->data); +} + static void signal_watcher_cb(uv_signal_t *handle, int signum) { SignalWatcher *watcher = handle->data; - watcher->cb(watcher, signum, watcher->data); + CREATE_EVENT(watcher->events, signal_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index c269fa9d95..e32608acc0 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,6 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 2a618d290d..347e464d25 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher, watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; + watcher->events = NULL; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) @@ -143,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) uv_close((uv_handle_t *)watcher->stream, close_cb); } +static void connection_event(void **argv) +{ + SocketWatcher *watcher = argv[0]; + int status = (int)(uintptr_t)(argv[1]); + watcher->cb(watcher, status, watcher->data); +} + static void connection_cb(uv_stream_t *handle, int status) { SocketWatcher *watcher = handle->data; - watcher->cb(watcher, status, watcher->data); + CREATE_EVENT(watcher->events, connection_event, 2, watcher, + (void *)(uintptr_t)status); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index 17fd39f33b..ad59fdbe3a 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,6 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 72dabc3ce7..6caad6fdcc 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -56,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, if (stream->uvstream) { stream->uvstream->data = stream; + loop = stream->uvstream->loop->data; } stream->data = data; @@ -70,17 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->internal_close_cb = NULL; stream->closed = false; stream->buffer = NULL; + stream->events = NULL; } void stream_close(Stream *stream, stream_close_cb on_stream_close) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); - - if (stream->buffer) { - rbuffer_free(stream->buffer); - } - stream->closed = true; stream->close_cb = on_stream_close; @@ -102,6 +99,9 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + if (stream->buffer) { + rbuffer_free(stream->buffer); + } if (stream->close_cb) { stream->close_cb(stream, stream->data); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 37410b2036..eaf4b010f5 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -47,6 +47,7 @@ struct stream { size_t pending_reqs; void *data, *internal_data; bool closed; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index ce33cdfc10..7bf333bcea 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) uv_timer_init(&loop->uv, &watcher->uv); watcher->uv.data = watcher; watcher->data = data; + watcher->events = loop->fast_events; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void time_event(void **argv) +{ + TimeWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->data); +} + static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; - watcher->cb(watcher, watcher->data); + CREATE_EVENT(watcher->events, time_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index ee50e53d11..7882b2b627 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,6 +12,7 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index f32aee809a..03116d454f 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -306,7 +306,7 @@ getcmdline ( input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/globals.h b/src/nvim/globals.h index e4dcad9afb..68cb923e42 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -898,14 +898,6 @@ EXTERN FILE *scriptout INIT(= NULL); /* stream to write script to */ /* volatile because it is used in signal handler catch_sigint(). */ EXTERN volatile int got_int INIT(= FALSE); /* set to TRUE when interrupt signal occurred */ -EXTERN int disable_breakcheck INIT(= 0); // > 0 if breakchecks should be - // ignored. FIXME(tarruda): Hacky - // way to run functions that would - // result in *_breakcheck calls - // while events that would normally - // be deferred are being processed - // immediately. Ref: - // neovim/neovim#2371 EXTERN int bangredo INIT(= FALSE); /* set to TRUE with ! command */ EXTERN int searchcmdlen; /* length of previous search cmd */ EXTERN int reg_do_extmatch INIT(= 0); /* Used when compiling regexp: diff --git a/src/nvim/main.c b/src/nvim/main.c index bfaeada6de..e11db16c61 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -157,11 +157,11 @@ void event_init(void) void event_teardown(void) { - if (!loop.deferred_events) { + if (!loop.events) { return; } - loop_process_all_events(&loop); + queue_process_events(loop.events); input_stop(); channel_teardown(); process_teardown(&loop); diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ca08af1fe8..6674f3c4e4 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -68,6 +68,7 @@ typedef struct { uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; kvec_t(WBuffer *) delayed_notifications; + Queue *events; } Channel; typedef struct { @@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -459,22 +460,22 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - bool async = kv_size(channel->call_stack) || handler.async; RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; event_data->request_id = request_id; incref(channel); - loop_push_event(&loop, (Event) { - .handler = on_request_event, - .data = event_data - }, !async); + if (handler.async) { + on_request_event((void **)&event_data); + } else { + queue_put(channel->events, on_request_event, 1, event_data); + } } -static void on_request_event(Event event) +static void on_request_event(void **argv) { - RequestEvent *e = event.data; + RequestEvent *e = argv[0]; Channel *channel = e->channel; MsgpackRpcRequestHandler handler = e->handler; Array args = e->args; @@ -649,9 +650,8 @@ static void close_channel(Channel *channel) case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.out, NULL); - loop_push_event(&loop, - (Event) { .handler = on_stdio_close, .data = channel }, false); - break; + queue_put(loop.fast_events, exit_event, 1, channel); + return; default: abort(); } @@ -659,9 +659,9 @@ static void close_channel(Channel *channel) decref(channel); } -static void on_stdio_close(Event e) +static void exit_event(void **argv) { - decref(e.data); + decref(argv[0]); if (!exiting) { mch_exit(0); @@ -683,6 +683,7 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); + queue_free(channel->events); xfree(channel); } @@ -694,6 +695,7 @@ static void close_cb(Stream *stream, void *data) static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->events = queue_new_child(loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; diff --git a/src/nvim/normal.c b/src/nvim/normal.c index dd9a4230b5..5b35af9209 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -493,7 +493,7 @@ normal_cmd ( input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); return; } diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index b0263f18d0..8bc713bcff 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -132,7 +132,7 @@ bool os_char_avail(void) // Check for CTRL-C typed by reading all available characters. void os_breakcheck(void) { - if (!disable_breakcheck && !got_int) { + if (!got_int) { loop_poll_events(&loop, 0); } } @@ -292,7 +292,7 @@ static bool input_poll(int ms) prof_inchar_enter(); } - LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, ms, input_ready() || input_eof); if (do_profiling == PROF_YES && ms) { prof_inchar_exit(); @@ -383,5 +383,5 @@ static void read_error_exit(void) static bool pending_events(void) { - return events_enabled && !kl_empty(loop.deferred_events); + return events_enabled && !queue_empty(loop.events); } diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 958b4483e8..8faa46dd63 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -207,6 +207,8 @@ static int do_os_system(char **argv, Stream in, out, err; UvProcess uvproc = uv_process_init(&loop, &buf); Process *proc = &uvproc.process; + Queue *events = queue_new_child(loop.events); + proc->events = events; proc->argv = argv; proc->in = input != NULL ? &in : NULL; proc->out = &out; @@ -219,14 +221,22 @@ static int do_os_system(char **argv, msg_outtrans((char_u *)prog); msg_putchar('\n'); } + queue_free(events); return -1; } + // We want to deal with stream events as fast a possible while queueing + // process events, so reset everything to NULL. It prevents closing the + // streams while there's still data in the OS buffer(due to the process + // exiting before all data is read). if (input != NULL) { + proc->in->events = NULL; wstream_init(proc->in, 0); } + proc->out->events = NULL; rstream_init(proc->out, 0); rstream_start(proc->out, data_cb); + proc->err->events = NULL; rstream_init(proc->err, 0); rstream_start(proc->err, data_cb); @@ -267,6 +277,9 @@ static int do_os_system(char **argv, } } + assert(queue_empty(events)); + queue_free(events); + return status; } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 6de3435c4c..7158721433 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -115,16 +115,6 @@ static void deadly_signal(int signum) static void on_signal(SignalWatcher *handle, int signum, void *data) { assert(signum >= 0); - loop_push_event(&loop, (Event) { - .handler = on_signal_event, - .data = (void *)(uintptr_t)signum - }, false); -} - -static void on_signal_event(Event event) -{ - int signum = (int)(uintptr_t)event.data; - switch (signum) { #ifdef SIGPWR case SIGPWR: @@ -148,4 +138,3 @@ static void on_signal_event(Event event) break; } } - diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index 6b5d4359db..ee17938afc 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput) if (milliseconds > INT_MAX) { milliseconds = INT_MAX; } - LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, (int)milliseconds, got_int); } else { os_microdelay(milliseconds * 1000); } diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index cf68143ac7..b9bc4c6d78 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -325,7 +325,7 @@ void terminal_resize(Terminal *term, uint16_t width, uint16_t height) invalidate_terminal(term, -1, -1); } -void terminal_enter(bool process_deferred) +void terminal_enter(void) { Terminal *term = curbuf->terminal; assert(term && "should only be called when curbuf has a terminal"); @@ -354,15 +354,9 @@ void terminal_enter(bool process_deferred) bool got_bs = false; // True if the last input was <C-\> while (term->buf == curbuf) { - if (process_deferred) { - input_enable_events(); - } - + input_enable_events(); c = safe_vgetc(); - - if (process_deferred) { - input_disable_events(); - } + input_disable_events(); switch (c) { case K_LEFTMOUSE: @@ -382,7 +376,7 @@ void terminal_enter(bool process_deferred) break; case K_EVENT: - loop_process_event(&loop); + queue_process_events(loop.events); break; case Ctrl_N: @@ -914,17 +908,10 @@ static void refresh_terminal(Terminal *term) // event. static void refresh_timer_cb(TimeWatcher *watcher, void *data) { - loop_push_event(&loop, (Event) {.handler = on_refresh}, false); - refresh_pending = false; -} - -// Refresh all invalidated terminals -static void on_refresh(Event event) -{ if (exiting) { // bad things can happen if we redraw when exiting, and there's no need to // update the buffer. - return; + goto end; } Terminal *term; void *stub; (void)(stub); @@ -936,6 +923,8 @@ static void on_refresh(Event event) pmap_clear(ptr_t)(invalidated_terminals); unblock_autocmds(); redraw(true); +end: + refresh_pending = false; } static void refresh_size(Terminal *term) diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index 0a84a3688b..1907d9895e 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -206,7 +206,7 @@ static bool handle_forced_escape(TermInput *input) return false; } -static void restart_reading(Event event); +static void restart_reading(void **argv); static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) { @@ -226,8 +226,7 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) // ls *.md | xargs nvim input->in_fd = 2; stream_close(&input->read_stream, NULL); - loop_push_event(&loop, - (Event) { .data = input, .handler = restart_reading }, false); + queue_put(loop.fast_events, restart_reading, 1, input); } else { input_done(); } @@ -272,9 +271,9 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) rbuffer_reset(input->read_stream.buffer); } -static void restart_reading(Event event) +static void restart_reading(void **argv) { - TermInput *input = event.data; + TermInput *input = argv[0]; rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input); rstream_start(&input->read_stream, read_cb); } diff --git a/src/nvim/tui/tui.c b/src/nvim/tui/tui.c index 28870b5abb..460fdb81a8 100644 --- a/src/nvim/tui/tui.c +++ b/src/nvim/tui/tui.c @@ -205,24 +205,13 @@ static void tui_stop(UI *ui) xfree(ui); } -static void try_resize(Event ev) +static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) { - UI *ui = ev.data; + UI *ui = data; update_size(ui); ui_refresh(); } -static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) -{ - got_winch = true; - // Queue the event because resizing can result in recursive event_poll calls - // FIXME(blueyed): TUI does not resize properly when not deferred. Why? #2322 - loop_push_event(&loop, (Event) { - .data = data, - .handler = try_resize - }, true); -} - static bool attrs_differ(HlAttrs a1, HlAttrs a2) { return a1.foreground != a2.foreground || a1.background != a2.background diff --git a/src/nvim/ui.c b/src/nvim/ui.c index 7e155f9b4f..ad875367c9 100644 --- a/src/nvim/ui.c +++ b/src/nvim/ui.c @@ -214,9 +214,9 @@ void ui_detach(UI *ui) shift_index++; } - ui_count--; - // schedule a refresh - loop_push_event(&loop, (Event) { .handler = refresh }, false); + if (--ui_count) { + ui_refresh(); + } } void ui_clear(void) @@ -485,11 +485,3 @@ static void ui_mode_change(void) UI_CALL(mode_change, mode); conceal_check_cursur_line(); } - -static void refresh(Event event) -{ - if (ui_count) { - ui_refresh(); - } -} - |