diff options
Diffstat (limited to 'src/nvim/event')
-rw-r--r-- | src/nvim/event/defs.h | 39 | ||||
-rw-r--r-- | src/nvim/event/loop.c | 95 | ||||
-rw-r--r-- | src/nvim/event/loop.h | 39 | ||||
-rw-r--r-- | src/nvim/event/process.c | 92 | ||||
-rw-r--r-- | src/nvim/event/process.h | 6 | ||||
-rw-r--r-- | src/nvim/event/pty_process.c | 30 | ||||
-rw-r--r-- | src/nvim/event/pty_process.h | 4 | ||||
-rw-r--r-- | src/nvim/event/queue.c | 208 | ||||
-rw-r--r-- | src/nvim/event/queue.h | 19 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 23 | ||||
-rw-r--r-- | src/nvim/event/signal.c | 9 | ||||
-rw-r--r-- | src/nvim/event/signal.h | 1 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 12 | ||||
-rw-r--r-- | src/nvim/event/socket.h | 1 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 13 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 9 | ||||
-rw-r--r-- | src/nvim/event/time.c | 9 | ||||
-rw-r--r-- | src/nvim/event/time.h | 1 | ||||
-rw-r--r-- | src/nvim/event/uv_process.h | 4 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 2 |
20 files changed, 444 insertions, 172 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h new file mode 100644 index 0000000000..5126d52241 --- /dev/null +++ b/src/nvim/event/defs.h @@ -0,0 +1,39 @@ +#ifndef NVIM_EVENT_DEFS_H +#define NVIM_EVENT_DEFS_H + +#include <assert.h> +#include <stdarg.h> + +#define EVENT_HANDLER_MAX_ARGC 4 + +typedef void (*argv_callback)(void **argv); +typedef struct message { + int priority; + argv_callback handler; + void *argv[EVENT_HANDLER_MAX_ARGC]; +} Event; + +#define VA_EVENT_INIT(event, p, h, a) \ + do { \ + assert(a <= EVENT_HANDLER_MAX_ARGC); \ + (event)->priority = p; \ + (event)->handler = h; \ + if (a) { \ + va_list args; \ + va_start(args, a); \ + for (int i = 0; i < a; i++) { \ + (event)->argv[i] = va_arg(args, void *); \ + } \ + va_end(args); \ + } \ + } while (0) + +static inline Event event_create(int priority, argv_callback cb, int argc, ...) +{ + assert(argc <= EVENT_HANDLER_MAX_ARGC); + Event event; + VA_EVENT_INIT(&event, priority, cb, argc); + return event; +} + +#endif // NVIM_EVENT_DEFS_H diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index d90565002e..3d3288f858 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -1,3 +1,4 @@ +#include <stdarg.h> #include <stdint.h> #include <uv.h> @@ -9,17 +10,23 @@ # include "event/loop.c.generated.h" #endif +typedef struct idle_event { + uv_idle_t idle; + Event event; +} IdleEvent; + void loop_init(Loop *loop, void *data) { uv_loop_init(&loop->uv); loop->uv.data = loop; - loop->deferred_events = kl_init(Event); - loop->immediate_events = kl_init(Event); loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; + loop->events = queue_new_parent(loop_on_put, loop); + loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); + uv_timer_init(&loop->uv, &loop->poll_timer); } void loop_poll_events(Loop *loop, int ms) @@ -30,89 +37,36 @@ void loop_poll_events(Loop *loop, int ms) abort(); // Should not re-enter uv_run } - bool wait = true; - uv_timer_t timer; + uv_run_mode mode = UV_RUN_ONCE; if (ms > 0) { - uv_timer_init(&loop->uv, &timer); // Use a repeating timeout of ms milliseconds to make sure // we do not block indefinitely for I/O. - uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); + uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms); } else if (ms == 0) { // For ms == 0, we need to do a non-blocking event poll by // setting the run mode to UV_RUN_NOWAIT. - wait = false; + mode = UV_RUN_NOWAIT; } - if (wait) { - loop_run_once(loop); - } else { - loop_run_nowait(loop); - } + uv_run(&loop->uv, mode); if (ms > 0) { - // Ensure the timer handle is closed and run the event loop - // once more to let libuv perform it's cleanup - uv_timer_stop(&timer); - uv_close((uv_handle_t *)&timer, NULL); - loop_run_nowait(loop); + uv_timer_stop(&loop->poll_timer); } recursive--; // Can re-enter uv_run now - process_events_from(loop->immediate_events); -} - -bool loop_has_deferred_events(Loop *loop) -{ - return loop->deferred_events_allowed && !kl_empty(loop->deferred_events); + queue_process_events(loop->fast_events); } -void loop_enable_deferred_events(Loop *loop) -{ - ++loop->deferred_events_allowed; -} - -void loop_disable_deferred_events(Loop *loop) -{ - --loop->deferred_events_allowed; -} - -// Queue an event -void loop_push_event(Loop *loop, Event event, bool deferred) +void loop_on_put(Queue *queue, void *data) { + Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before // blocking for a poll. If this happens and the callback pushes a event to one // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. - loop_stop(loop); - kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, - event); -} - -void loop_process_event(Loop *loop) -{ - process_events_from(loop->deferred_events); -} - - -void loop_run(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_DEFAULT); -} - -void loop_run_once(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_ONCE); -} - -void loop_run_nowait(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_NOWAIT); -} - -void loop_stop(Loop *loop) -{ uv_stop(&loop->uv); } @@ -120,25 +74,12 @@ void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); + uv_close((uv_handle_t *)&loop->poll_timer, NULL); do { uv_run(&loop->uv, UV_RUN_DEFAULT); } while (uv_loop_close(&loop->uv)); } -void loop_process_all_events(Loop *loop) -{ - process_events_from(loop->immediate_events); - process_events_from(loop->deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ - while (!kl_empty(queue)) { - Event event = kl_shift(Event, queue); - event.handler(event); - } -} - static void timer_cb(uv_timer_t *handle) { } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 5eb4d32ca8..9212a45aa4 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,38 +7,39 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { - void *data; - event_handler handler; -}; +#include "nvim/event/queue.h" typedef void * WatcherPtr; #define _noop(x) KLIST_INIT(WatcherPtr, WatcherPtr, _noop) -KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; - klist_t(Event) *deferred_events, *immediate_events; - int deferred_events_allowed; + Queue *events, *fast_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; - uv_timer_t children_kill_timer; + uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; +#define CREATE_EVENT(queue, handler, argc, ...) \ + do { \ + if (queue) { \ + queue_put((queue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = {__VA_ARGS__}; \ + (handler)(argv); \ + } \ + } while (0) + // Poll for events until a condition or timeout -#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ do { \ int remaining = timeout; \ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ while (!(condition)) { \ - loop_poll_events(loop, remaining); \ + LOOP_PROCESS_EVENTS(loop, queue, remaining); \ if (remaining == 0) { \ break; \ } else if (remaining > 0) { \ @@ -52,6 +53,16 @@ typedef struct loop { } \ } while (0) +#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ + do { \ + if (queue && !queue_empty(queue)) { \ + queue_process_events(queue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ + } while (0) + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.h.generated.h" #endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 2b1f1ae096..81d4e690c3 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -22,7 +22,7 @@ #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ +#define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ stream_close(proc->stream, NULL); \ @@ -30,19 +30,18 @@ } while (0) -bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL { - proc->loop = loop; if (proc->in) { - uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); } if (proc->out) { - uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); } if (proc->err) { - uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); } bool success; @@ -77,6 +76,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->in) { stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -84,6 +84,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->out) { stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -91,6 +92,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->err) { stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -99,7 +101,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL proc->internal_exit_cb = on_process_exit; proc->internal_close_cb = decref; proc->refcount++; - kl_push(WatcherPtr, loop->children, proc); + kl_push(WatcherPtr, proc->loop->children, proc); return true; } @@ -113,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL } // Wait until all children exit - LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); pty_process_teardown(loop); } @@ -150,16 +152,24 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL /// indistinguishable from the process returning -1 by itself. Which /// is possible on some OS. Returns -2 if an user has interruped the /// wait. -int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) { // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; + if (!proc->refcount) { + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return proc->status; + } + + if (!events) { + events = proc->events; + } // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -171,12 +181,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL got_int = false; process_stop(proc); if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - + // We can only return if all streams/handles are closed and the job // exited. - LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, + proc->refcount == 1); } else { - loop_poll_events(proc->loop, 0); + LOOP_PROCESS_EVENTS(proc->loop, events, 0); } } @@ -185,6 +195,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); + if (events) { + // the decref call created an exit event, process it now + queue_process_events(events); + } } else { proc->refcount--; } @@ -250,6 +264,18 @@ static void children_kill_cb(uv_timer_t *handle) } } +static void process_close_event(void **argv) +{ + Process *proc = argv[0]; + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + static void decref(Process *proc) { if (--proc->refcount != 0) { @@ -264,16 +290,9 @@ static void decref(Process *proc) break; } } - assert(node); kl_shift_at(WatcherPtr, loop->children, node); - shell_free_argv(proc->argv); - if (proc->type == kProcessTypePty) { - xfree(((PtyProcess *)proc)->term_name); - } - if (proc->cb) { - proc->cb(proc, proc->status, proc->data); - } + CREATE_EVENT(proc->events, process_close_event, 1, proc); } static void process_close(Process *proc) @@ -293,28 +312,27 @@ static void process_close(Process *proc) } } -static void on_process_exit(Process *proc) +static void process_close_handles(void **argv) { - if (exiting) { - on_process_exit_event((Event) {.data = proc}); - } else { - loop_push_event(proc->loop, - (Event) {.handler = on_process_exit_event, .data = proc}, false); - } + Process *proc = argv[0]; + process_close_streams(proc); + process_close(proc); +} +static void on_process_exit(Process *proc) +{ Loop *loop = proc->loop; - if (loop->children_stop_requests && !--loop->children_stop_requests) { + if (proc->stopped_time && loop->children_stop_requests + && !--loop->children_stop_requests) { // Stop the timer if no more stop requests are pending DLOG("Stopping process kill timer"); uv_timer_stop(&loop->children_kill_timer); } -} - -static void on_process_exit_event(Event event) -{ - Process *proc = event.data; - process_close_streams(proc); - process_close(proc); + // Process handles are closed in the next event loop tick. This is done to + // give libuv more time to read data from the OS after the process exits(If + // process_close_streams is called with data still in the OS buffer, we lose + // it) + CREATE_EVENT(proc->events, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c84a7d1d0..45edc46b95 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,14 +26,16 @@ struct process { process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent; + Queue *events; }; -static inline Process process_init(ProcessType type, void *data) +static inline Process process_init(Loop *loop, ProcessType type, void *data) { return (Process) { .type = type, .data = data, - .loop = NULL, + .loop = loop, + .events = NULL, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c index 1e24d7c919..8eef72f12f 100644 --- a/src/nvim/event/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -33,17 +33,18 @@ # include "event/pty_process.c.generated.h" #endif -static const unsigned int KILL_RETRIES = 5; -static const unsigned int KILL_TIMEOUT = 2; // seconds - bool pty_process_spawn(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { + static struct termios termios; + if (!termios.c_cflag) { + init_termios(&termios); + } + Process *proc = (Process *)ptyproc; + assert(!proc->err); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; - struct termios termios; - init_termios(&termios); uv_disable_stdio_inheritance(); int master; int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); @@ -73,9 +74,6 @@ bool pty_process_spawn(PtyProcess *ptyproc) if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { goto error; } - if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) { - goto error; - } ptyproc->tty_fd = master; proc->pid = pid; @@ -83,19 +81,8 @@ bool pty_process_spawn(PtyProcess *ptyproc) error: close(master); - - // terminate spawned process - kill(pid, SIGTERM); - int status, child; - unsigned int try = 0; - while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) { - sleep(KILL_TIMEOUT); - } - if (child != pid) { - kill(pid, SIGKILL); - waitpid(pid, NULL, 0); - } - + kill(pid, SIGKILL); + waitpid(pid, NULL, 0); return false; } @@ -152,7 +139,6 @@ static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL { - memset(termios, 0, sizeof(struct termios)); // Taken from pangoterm termios->c_iflag = ICRNL|IXON; termios->c_oflag = OPOST|ONLCR; diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h index a12b5489c5..446d7fd3c8 100644 --- a/src/nvim/event/pty_process.h +++ b/src/nvim/event/pty_process.h @@ -13,10 +13,10 @@ typedef struct pty_process { int tty_fd; } PtyProcess; -static inline PtyProcess pty_process_init(void *data) +static inline PtyProcess pty_process_init(Loop *loop, void *data) { PtyProcess rv; - rv.process = process_init(kProcessTypePty, data); + rv.process = process_init(loop, kProcessTypePty, data); rv.term_name = NULL; rv.width = 80; rv.height = 24; diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c new file mode 100644 index 0000000000..19eca14144 --- /dev/null +++ b/src/nvim/event/queue.c @@ -0,0 +1,208 @@ +// Queue for selective async event processing. Instances of this queue support a +// parent/child relationship with the following properties: +// +// - pushing a node to a child queue will push a corresponding link node to the +// parent queue +// - removing a link node from a parent queue will remove the next node +// in the linked child queue +// - removing a node from a child queue will remove the corresponding link node +// in the parent queue +// +// These properties allow neovim to organize and process events from different +// sources with a certain degree of control. Here's how the queue is used: +// +// +----------------+ +// | Main loop | +// +----------------+ +// ^ +// | +// +----------------+ +// +-------------->| Event loop |<------------+ +// | +--+-------------+ | +// | ^ ^ | +// | | | | +// +-----------+ +-----------+ +---------+ +---------+ +// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | +// +-----------+ +-----------+ +---------+ +---------+ +// +// +// In the above diagram, the lower boxes represents event emitters, each with +// it's own private queue that have the event loop queue as the parent. +// +// When idle, the main loop spins the event loop which queues events from many +// sources(channels, jobs, user...). Each event emitter pushes events to its own +// private queue which is propagated to the event loop queue. When the main loop +// consumes an event, the corresponding event is removed from the emitter's +// queue. +// +// The main reason for this queue hierarchy is to allow focusing on a single +// event emitter while blocking the main loop. For example, if the `jobwait` +// vimscript function is called on job1, the main loop will temporarily stop +// polling the event loop queue and poll job1 queue instead. Same with channels, +// when calling `rpcrequest`, we want to temporarily stop processing events from +// other sources and focus on a specific channel. + +#include <assert.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stdint.h> + + +#include <uv.h> + +#include "nvim/event/queue.h" +#include "nvim/memory.h" +#include "nvim/os/time.h" + +typedef struct queue_item QueueItem; +struct queue_item { + union { + Queue *queue; + struct { + Event event; + QueueItem *parent; + } item; + } data; + bool link; // this is just a link to a node in a child queue + QUEUE node; +}; + +struct queue { + Queue *parent; + QUEUE headtail; + put_callback put_cb; + void *data; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.c.generated.h" +#endif + +static Event NILEVENT = {.handler = NULL, .argv = {NULL}}; + +Queue *queue_new_parent(put_callback put_cb, void *data) +{ + return queue_new(NULL, put_cb, data); +} + +Queue *queue_new_child(Queue *parent) + FUNC_ATTR_NONNULL_ALL +{ + assert(!parent->parent); + return queue_new(parent, NULL, NULL); +} + +static Queue *queue_new(Queue *parent, put_callback put_cb, void *data) +{ + Queue *rv = xmalloc(sizeof(Queue)); + QUEUE_INIT(&rv->headtail); + rv->parent = parent; + rv->put_cb = put_cb; + rv->data = data; + return rv; +} + +void queue_free(Queue *queue) +{ + assert(queue); + if (queue->parent) { + while (!QUEUE_EMPTY(&queue->headtail)) { + QUEUE *q = QUEUE_HEAD(&queue->headtail); + QueueItem *item = queue_node_data(q); + assert(!item->link); + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + QUEUE_REMOVE(q); + xfree(item); + } + } + + xfree(queue); +} + +Event queue_get(Queue *queue) +{ + return queue_empty(queue) ? NILEVENT : queue_remove(queue); +} + +void queue_put_event(Queue *queue, Event event) +{ + assert(queue); + assert(queue->parent); // don't push directly to the parent queue + queue_push(queue, event); + if (queue->parent->put_cb) { + queue->parent->put_cb(queue->parent, queue->parent->data); + } +} + +void queue_process_events(Queue *queue) +{ + assert(queue); + while (!queue_empty(queue)) { + Event event = queue_get(queue); + if (event.handler) { + event.handler(event.argv); + } + } +} + +bool queue_empty(Queue *queue) +{ + assert(queue); + return QUEUE_EMPTY(&queue->headtail); +} + +void queue_replace_parent(Queue *queue, Queue *new_parent) +{ + assert(queue_empty(queue)); + queue->parent = new_parent; +} + +static Event queue_remove(Queue *queue) +{ + assert(!queue_empty(queue)); + QUEUE *h = QUEUE_HEAD(&queue->headtail); + QUEUE_REMOVE(h); + QueueItem *item = queue_node_data(h); + Event rv; + + if (item->link) { + assert(!queue->parent); + // remove the next node in the linked queue + Queue *linked = item->data.queue; + assert(!queue_empty(linked)); + QueueItem *child = + queue_node_data(QUEUE_HEAD(&linked->headtail)); + QUEUE_REMOVE(&child->node); + rv = child->data.item.event; + xfree(child); + } else { + assert(queue->parent); + assert(!queue_empty(queue->parent)); + // remove the corresponding link node in the parent queue + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + rv = item->data.item.event; + } + + xfree(item); + return rv; +} + +static void queue_push(Queue *queue, Event event) +{ + QueueItem *item = xmalloc(sizeof(QueueItem)); + item->link = false; + item->data.item.event = event; + QUEUE_INSERT_TAIL(&queue->headtail, &item->node); + // push link node to the parent queue + item->data.item.parent = xmalloc(sizeof(QueueItem)); + item->data.item.parent->link = true; + item->data.item.parent->data.queue = queue; + QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node); +} + +static QueueItem *queue_node_data(QUEUE *q) +{ + return QUEUE_DATA(q, QueueItem, node); +} diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h new file mode 100644 index 0000000000..85fc59f8b2 --- /dev/null +++ b/src/nvim/event/queue.h @@ -0,0 +1,19 @@ +#ifndef NVIM_EVENT_QUEUE_H +#define NVIM_EVENT_QUEUE_H + +#include <uv.h> + +#include "nvim/event/defs.h" +#include "nvim/lib/queue.h" + +typedef struct queue Queue; +typedef void (*put_callback)(Queue *queue, void *data); + +#define queue_put(q, h, ...) \ + queue_put_event(q, event_create(1, h, __VA_ARGS__)); + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.h.generated.h" +#endif +#endif // NVIM_EVENT_QUEUE_H diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 9d2439ac2b..0a720bb852 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -49,6 +49,7 @@ void rstream_init(Stream *stream, size_t bufsize) /// /// @param stream The `Stream` instance void rstream_start(Stream *stream, stream_read_cb cb) + FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; if (stream->uvstream) { @@ -62,6 +63,7 @@ void rstream_start(Stream *stream, stream_read_cb cb) /// /// @param stream The `Stream` instance void rstream_stop(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_read_stop(stream->uvstream); @@ -112,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); } return; } @@ -122,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -156,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(&stream->uv.idle); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); return; } @@ -164,12 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(stream->buffer, nread); stream->fpos += nread; - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } -static void invoke_read_cb(Stream *stream, bool eof) +static void read_event(void **argv) { + Stream *stream = argv[0]; if (stream->read_cb) { - stream->read_cb(stream, stream->buffer, stream->data, eof); + size_t count = (uintptr_t)argv[1]; + bool eof = (uintptr_t)argv[2]; + stream->read_cb(stream, stream->buffer, count, stream->data, eof); } } + +static void invoke_read_cb(Stream *stream, size_t count, bool eof) +{ + CREATE_EVENT(stream->events, read_event, 3, stream, + (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); +} diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c index 63133b4f57..11ce15a882 100644 --- a/src/nvim/event/signal.c +++ b/src/nvim/event/signal.c @@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->cb = NULL; + watcher->events = loop->fast_events; } void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) @@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void signal_event(void **argv) +{ + SignalWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->uv.signum, watcher->data); +} + static void signal_watcher_cb(uv_signal_t *handle, int signum) { SignalWatcher *watcher = handle->data; - watcher->cb(watcher, signum, watcher->data); + CREATE_EVENT(watcher->events, signal_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index c269fa9d95..e32608acc0 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,6 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index bdc632abf0..347e464d25 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher, watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; + watcher->events = NULL; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) @@ -113,6 +114,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) } int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; @@ -142,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) uv_close((uv_handle_t *)watcher->stream, close_cb); } +static void connection_event(void **argv) +{ + SocketWatcher *watcher = argv[0]; + int status = (int)(uintptr_t)(argv[1]); + watcher->cb(watcher, status, watcher->data); +} + static void connection_cb(uv_stream_t *handle, int status) { SocketWatcher *watcher = handle->data; - watcher->cb(watcher, status, watcher->data); + CREATE_EVENT(watcher->events, connection_event, 2, watcher, + (void *)(uintptr_t)status); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index 17fd39f33b..ad59fdbe3a 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,6 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 959b532146..6caad6fdcc 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -32,6 +32,7 @@ int stream_set_blocking(int fd, bool blocking) void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, void *data) + FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -55,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, if (stream->uvstream) { stream->uvstream->data = stream; + loop = stream->uvstream->loop->data; } stream->data = data; @@ -69,16 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->internal_close_cb = NULL; stream->closed = false; stream->buffer = NULL; + stream->events = NULL; } void stream_close(Stream *stream, stream_close_cb on_stream_close) + FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); - - if (stream->buffer) { - rbuffer_free(stream->buffer); - } - stream->closed = true; stream->close_cb = on_stream_close; @@ -88,6 +87,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close) } void stream_close_handle(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_close((uv_handle_t *)stream->uvstream, close_cb); @@ -99,6 +99,9 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + if (stream->buffer) { + rbuffer_free(stream->buffer); + } if (stream->close_cb) { stream->close_cb(stream, stream->data); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 37410b2036..c6baac0db7 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -14,10 +14,14 @@ typedef struct stream Stream; /// /// @param stream The Stream instance /// @param rbuffer The associated RBuffer instance +/// @param count Number of bytes to read. This must be respected if keeping +/// the order of events is a requirement. This is because events +/// may be queued and only processed later when more data is copied +/// into to the buffer, so one read may starve another. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, - bool eof); +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -47,6 +51,7 @@ struct stream { size_t pending_reqs; void *data, *internal_data; bool closed; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index ce33cdfc10..7bf333bcea 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) uv_timer_init(&loop->uv, &watcher->uv); watcher->uv.data = watcher; watcher->data = data; + watcher->events = loop->fast_events; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void time_event(void **argv) +{ + TimeWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->data); +} + static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; - watcher->cb(watcher, watcher->data); + CREATE_EVENT(watcher->events, time_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index ee50e53d11..7882b2b627 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,6 +12,7 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h index a17f1446b3..5ee73044b5 100644 --- a/src/nvim/event/uv_process.h +++ b/src/nvim/event/uv_process.h @@ -12,10 +12,10 @@ typedef struct uv_process { uv_stdio_container_t uvstdio[3]; } UvProcess; -static inline UvProcess uv_process_init(void *data) +static inline UvProcess uv_process_init(Loop *loop, void *data) { UvProcess rv; - rv.process = process_init(kProcessTypeUv, data); + rv.process = process_init(loop, kProcessTypeUv, data); return rv; } diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 5fcb724fe3..8028e35e6b 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -118,6 +118,7 @@ WBuffer *wstream_new_buffer(char *data, size_t size, size_t refcount, wbuffer_data_finalizer cb) + FUNC_ATTR_NONNULL_ARG(1) { WBuffer *rv = xmalloc(sizeof(WBuffer)); rv->size = size; @@ -151,6 +152,7 @@ static void write_cb(uv_write_t *req, int status) } void wstream_release_wbuffer(WBuffer *buffer) + FUNC_ATTR_NONNULL_ALL { if (!--buffer->refcount) { if (buffer->cb) { |