diff options
Diffstat (limited to 'src/nvim/event')
-rw-r--r-- | src/nvim/event/defs.h | 26 | ||||
-rw-r--r-- | src/nvim/event/libuv_process.c | 5 | ||||
-rw-r--r-- | src/nvim/event/loop.c | 46 | ||||
-rw-r--r-- | src/nvim/event/loop.h | 68 | ||||
-rw-r--r-- | src/nvim/event/multiqueue.c | 230 | ||||
-rw-r--r-- | src/nvim/event/multiqueue.h | 19 | ||||
-rw-r--r-- | src/nvim/event/process.c | 119 | ||||
-rw-r--r-- | src/nvim/event/process.h | 4 | ||||
-rw-r--r-- | src/nvim/event/pty_process.c | 240 | ||||
-rw-r--r-- | src/nvim/event/pty_process.h | 30 | ||||
-rw-r--r-- | src/nvim/event/queue.c | 208 | ||||
-rw-r--r-- | src/nvim/event/queue.h | 19 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 29 | ||||
-rw-r--r-- | src/nvim/event/signal.h | 2 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 6 | ||||
-rw-r--r-- | src/nvim/event/socket.h | 2 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 10 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 6 | ||||
-rw-r--r-- | src/nvim/event/time.c | 5 | ||||
-rw-r--r-- | src/nvim/event/time.h | 3 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 17 |
21 files changed, 460 insertions, 634 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index b802866a3d..e5335d9f25 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -14,19 +14,19 @@ typedef struct message { } Event; typedef void(*event_scheduler)(Event event, void *data); -#define VA_EVENT_INIT(event, p, h, a) \ - do { \ - assert(a <= EVENT_HANDLER_MAX_ARGC); \ - (event)->priority = p; \ - (event)->handler = h; \ - if (a) { \ - va_list args; \ - va_start(args, a); \ - for (int i = 0; i < a; i++) { \ - (event)->argv[i] = va_arg(args, void *); \ - } \ - va_end(args); \ - } \ +#define VA_EVENT_INIT(event, p, h, a) \ + do { \ + assert(a <= EVENT_HANDLER_MAX_ARGC); \ + (event)->priority = p; \ + (event)->handler = h; \ + if (a) { \ + va_list args; \ + va_start(args, a); \ + for (int i = 0; i < a; i++) { \ + (event)->argv[i] = va_arg(args, void *); \ + } \ + va_end(args); \ + } \ } while (0) static inline Event event_create(int priority, argv_callback cb, int argc, ...) diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c index 9ef3468284..907187aa17 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_process.c @@ -19,13 +19,12 @@ bool libuv_process_spawn(LibuvProcess *uvproc) Process *proc = (Process *)uvproc; uvproc->uvopts.file = proc->argv[0]; uvproc->uvopts.args = proc->argv; - uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE - | UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS; + uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; if (proc->detach) { uvproc->uvopts.flags |= UV_PROCESS_DETACHED; } uvproc->uvopts.exit_cb = exit_cb; - uvproc->uvopts.cwd = NULL; + uvproc->uvopts.cwd = proc->cwd; uvproc->uvopts.env = NULL; uvproc->uvopts.stdio = uvproc->uvstdio; uvproc->uvopts.stdio_count = 3; diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 6f3e6b9253..0e1775d01b 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -18,9 +18,9 @@ void loop_init(Loop *loop, void *data) loop->uv.data = loop; loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; - loop->events = queue_new_parent(loop_on_put, loop); - loop->fast_events = queue_new_child(loop->events); - loop->thread_events = queue_new_parent(NULL, NULL); + loop->events = multiqueue_new_parent(loop_on_put, loop); + loop->fast_events = multiqueue_new_child(loop->events); + loop->thread_events = multiqueue_new_parent(NULL, NULL); uv_mutex_init(&loop->mutex); uv_async_init(&loop->uv, &loop->async, async_cb); uv_signal_init(&loop->uv, &loop->children_watcher); @@ -53,19 +53,19 @@ void loop_poll_events(Loop *loop, int ms) } loop->recursive--; // Can re-enter uv_run now - queue_process_events(loop->fast_events); + multiqueue_process_events(loop->fast_events); } // Schedule an event from another thread void loop_schedule(Loop *loop, Event event) { uv_mutex_lock(&loop->mutex); - queue_put_event(loop->thread_events, event); + multiqueue_put_event(loop->thread_events, event); uv_async_send(&loop->async); uv_mutex_unlock(&loop->mutex); } -void loop_on_put(Queue *queue, void *data) +void loop_on_put(MultiQueue *queue, void *data) { Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before @@ -76,7 +76,7 @@ void loop_on_put(Queue *queue, void *data) uv_stop(&loop->uv); } -void loop_close(Loop *loop) +void loop_close(Loop *loop, bool wait) { uv_mutex_destroy(&loop->mutex); uv_close((uv_handle_t *)&loop->children_watcher, NULL); @@ -84,21 +84,37 @@ void loop_close(Loop *loop) uv_close((uv_handle_t *)&loop->poll_timer, NULL); uv_close((uv_handle_t *)&loop->async, NULL); do { - uv_run(&loop->uv, UV_RUN_DEFAULT); - } while (uv_loop_close(&loop->uv)); - queue_free(loop->fast_events); - queue_free(loop->thread_events); - queue_free(loop->events); + uv_run(&loop->uv, wait ? UV_RUN_DEFAULT : UV_RUN_NOWAIT); + } while (uv_loop_close(&loop->uv) && wait); + multiqueue_free(loop->fast_events); + multiqueue_free(loop->thread_events); + multiqueue_free(loop->events); kl_destroy(WatcherPtr, loop->children); } +void loop_purge(Loop *loop) +{ + uv_mutex_lock(&loop->mutex); + multiqueue_purge_events(loop->thread_events); + multiqueue_purge_events(loop->fast_events); + uv_mutex_unlock(&loop->mutex); +} + +size_t loop_size(Loop *loop) +{ + uv_mutex_lock(&loop->mutex); + size_t rv = multiqueue_size(loop->thread_events); + uv_mutex_unlock(&loop->mutex); + return rv; +} + static void async_cb(uv_async_t *handle) { Loop *l = handle->loop->data; uv_mutex_lock(&l->mutex); - while (!queue_empty(l->thread_events)) { - Event ev = queue_get(l->thread_events); - queue_put_event(l->fast_events, ev); + while (!multiqueue_empty(l->thread_events)) { + Event ev = multiqueue_get(l->thread_events); + multiqueue_put_event(l->fast_events, ev); } uv_mutex_unlock(&l->mutex); } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 0c1fcb5ed9..e7d7bdd483 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,7 +7,7 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" -#include "nvim/event/queue.h" +#include "nvim/event/multiqueue.h" typedef void * WatcherPtr; @@ -16,7 +16,7 @@ KLIST_INIT(WatcherPtr, WatcherPtr, _noop) typedef struct loop { uv_loop_t uv; - Queue *events, *fast_events, *thread_events; + MultiQueue *events, *fast_events, *thread_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; uv_timer_t children_kill_timer, poll_timer; @@ -26,43 +26,43 @@ typedef struct loop { int recursive; } Loop; -#define CREATE_EVENT(queue, handler, argc, ...) \ - do { \ - if (queue) { \ - queue_put((queue), (handler), argc, __VA_ARGS__); \ - } else { \ - void *argv[argc] = {__VA_ARGS__}; \ - (handler)(argv); \ - } \ +#define CREATE_EVENT(multiqueue, handler, argc, ...) \ + do { \ + if (multiqueue) { \ + multiqueue_put((multiqueue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = { __VA_ARGS__ }; \ + (handler)(argv); \ + } \ } while (0) // Poll for events until a condition or timeout -#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ - do { \ - int remaining = timeout; \ - uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ - while (!(condition)) { \ - LOOP_PROCESS_EVENTS(loop, queue, remaining); \ - if (remaining == 0) { \ - break; \ - } else if (remaining > 0) { \ - uint64_t now = os_hrtime(); \ - remaining -= (int) ((now - before) / 1000000); \ - before = now; \ - if (remaining <= 0) { \ - break; \ - } \ - } \ - } \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, multiqueue, timeout, condition) \ + do { \ + int remaining = timeout; \ + uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ + while (!(condition)) { \ + LOOP_PROCESS_EVENTS(loop, multiqueue, remaining); \ + if (remaining == 0) { \ + break; \ + } else if (remaining > 0) { \ + uint64_t now = os_hrtime(); \ + remaining -= (int) ((now - before) / 1000000); \ + before = now; \ + if (remaining <= 0) { \ + break; \ + } \ + } \ + } \ } while (0) -#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ - do { \ - if (queue && !queue_empty(queue)) { \ - queue_process_events(queue); \ - } else { \ - loop_poll_events(loop, timeout); \ - } \ +#define LOOP_PROCESS_EVENTS(loop, multiqueue, timeout) \ + do { \ + if (multiqueue && !multiqueue_empty(multiqueue)) { \ + multiqueue_process_events(multiqueue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ } while (0) diff --git a/src/nvim/event/multiqueue.c b/src/nvim/event/multiqueue.c new file mode 100644 index 0000000000..79b4dd9458 --- /dev/null +++ b/src/nvim/event/multiqueue.c @@ -0,0 +1,230 @@ +// Multi-level queue for selective async event processing. +// Not threadsafe; access must be synchronized externally. +// +// Multiqueue supports a parent-child relationship with these properties: +// - pushing a node to a child queue will push a corresponding link node to the +// parent queue +// - removing a link node from a parent queue will remove the next node +// in the linked child queue +// - removing a node from a child queue will remove the corresponding link node +// in the parent queue +// +// These properties allow Nvim to organize and process events from different +// sources with a certain degree of control. How the multiqueue is used: +// +// +----------------+ +// | Main loop | +// +----------------+ +// +// +----------------+ +// +-------------->| Event loop |<------------+ +// | +--+-------------+ | +// | ^ ^ | +// | | | | +// +-----------+ +-----------+ +---------+ +---------+ +// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | +// +-----------+ +-----------+ +---------+ +---------+ +// +// +// The lower boxes represent event emitters, each with its own private queue +// having the event loop queue as the parent. +// +// When idle, the main loop spins the event loop which queues events from many +// sources (channels, jobs, user...). Each event emitter pushes events to its +// private queue which is propagated to the event loop queue. When the main loop +// consumes an event, the corresponding event is removed from the emitter's +// queue. +// +// The main reason for this queue hierarchy is to allow focusing on a single +// event emitter while blocking the main loop. For example, if the `jobwait` +// VimL function is called on job1, the main loop will temporarily stop polling +// the event loop queue and poll job1 queue instead. Same with channels, when +// calling `rpcrequest` we want to temporarily stop processing events from +// other sources and focus on a specific channel. + +#include <assert.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stdint.h> + + +#include <uv.h> + +#include "nvim/event/multiqueue.h" +#include "nvim/memory.h" +#include "nvim/os/time.h" + +typedef struct multiqueue_item MultiQueueItem; +struct multiqueue_item { + union { + MultiQueue *queue; + struct { + Event event; + MultiQueueItem *parent_item; + } item; + } data; + bool link; // true: current item is just a link to a node in a child queue + QUEUE node; +}; + +struct multiqueue { + MultiQueue *parent; + QUEUE headtail; // circularly-linked + put_callback put_cb; + void *data; + size_t size; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/multiqueue.c.generated.h" +#endif + +static Event NILEVENT = { .handler = NULL, .argv = {NULL} }; + +MultiQueue *multiqueue_new_parent(put_callback put_cb, void *data) +{ + return multiqueue_new(NULL, put_cb, data); +} + +MultiQueue *multiqueue_new_child(MultiQueue *parent) + FUNC_ATTR_NONNULL_ALL +{ + assert(!parent->parent); // parent cannot have a parent, more like a "root" + parent->size++; + return multiqueue_new(parent, NULL, NULL); +} + +static MultiQueue *multiqueue_new(MultiQueue *parent, put_callback put_cb, + void *data) +{ + MultiQueue *rv = xmalloc(sizeof(MultiQueue)); + QUEUE_INIT(&rv->headtail); + rv->size = 0; + rv->parent = parent; + rv->put_cb = put_cb; + rv->data = data; + return rv; +} + +void multiqueue_free(MultiQueue *this) +{ + assert(this); + while (!QUEUE_EMPTY(&this->headtail)) { + QUEUE *q = QUEUE_HEAD(&this->headtail); + MultiQueueItem *item = multiqueue_node_data(q); + if (this->parent) { + QUEUE_REMOVE(&item->data.item.parent_item->node); + xfree(item->data.item.parent_item); + } + QUEUE_REMOVE(q); + xfree(item); + } + + xfree(this); +} + +Event multiqueue_get(MultiQueue *this) +{ + return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this); +} + +void multiqueue_put_event(MultiQueue *this, Event event) +{ + assert(this); + multiqueue_push(this, event); + if (this->parent && this->parent->put_cb) { + this->parent->put_cb(this->parent, this->parent->data); + } +} + +void multiqueue_process_events(MultiQueue *this) +{ + assert(this); + while (!multiqueue_empty(this)) { + Event event = multiqueue_get(this); + if (event.handler) { + event.handler(event.argv); + } + } +} + +/// Removes all events without processing them. +void multiqueue_purge_events(MultiQueue *this) +{ + assert(this); + while (!multiqueue_empty(this)) { + (void)multiqueue_remove(this); + } +} + +bool multiqueue_empty(MultiQueue *this) +{ + assert(this); + return QUEUE_EMPTY(&this->headtail); +} + +void multiqueue_replace_parent(MultiQueue *this, MultiQueue *new_parent) +{ + assert(multiqueue_empty(this)); + this->parent = new_parent; +} + +/// Gets the count of all events currently in the queue. +size_t multiqueue_size(MultiQueue *this) +{ + return this->size; +} + +static Event multiqueue_remove(MultiQueue *this) +{ + assert(!multiqueue_empty(this)); + QUEUE *h = QUEUE_HEAD(&this->headtail); + QUEUE_REMOVE(h); + MultiQueueItem *item = multiqueue_node_data(h); + Event rv; + + if (item->link) { + assert(!this->parent); + // remove the next node in the linked queue + MultiQueue *linked = item->data.queue; + assert(!multiqueue_empty(linked)); + MultiQueueItem *child = + multiqueue_node_data(QUEUE_HEAD(&linked->headtail)); + QUEUE_REMOVE(&child->node); + rv = child->data.item.event; + xfree(child); + } else { + if (this->parent) { + // remove the corresponding link node in the parent queue + QUEUE_REMOVE(&item->data.item.parent_item->node); + xfree(item->data.item.parent_item); + } + rv = item->data.item.event; + } + + this->size--; + xfree(item); + return rv; +} + +static void multiqueue_push(MultiQueue *this, Event event) +{ + MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem)); + item->link = false; + item->data.item.event = event; + QUEUE_INSERT_TAIL(&this->headtail, &item->node); + if (this->parent) { + // push link node to the parent queue + item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem)); + item->data.item.parent_item->link = true; + item->data.item.parent_item->data.queue = this; + QUEUE_INSERT_TAIL(&this->parent->headtail, + &item->data.item.parent_item->node); + } + this->size++; +} + +static MultiQueueItem *multiqueue_node_data(QUEUE *q) +{ + return QUEUE_DATA(q, MultiQueueItem, node); +} diff --git a/src/nvim/event/multiqueue.h b/src/nvim/event/multiqueue.h new file mode 100644 index 0000000000..def6b95a10 --- /dev/null +++ b/src/nvim/event/multiqueue.h @@ -0,0 +1,19 @@ +#ifndef NVIM_EVENT_MULTIQUEUE_H +#define NVIM_EVENT_MULTIQUEUE_H + +#include <uv.h> + +#include "nvim/event/defs.h" +#include "nvim/lib/queue.h" + +typedef struct multiqueue MultiQueue; +typedef void (*put_callback)(MultiQueue *multiq, void *data); + +#define multiqueue_put(q, h, ...) \ + multiqueue_put_event(q, event_create(1, h, __VA_ARGS__)); + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/multiqueue.h.generated.h" +#endif +#endif // NVIM_EVENT_MULTIQUEUE_H diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 9bb62891c7..dc7886469b 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -9,7 +9,7 @@ #include "nvim/event/wstream.h" #include "nvim/event/process.h" #include "nvim/event/libuv_process.h" -#include "nvim/event/pty_process.h" +#include "nvim/os/pty_process.h" #include "nvim/globals.h" #include "nvim/log.h" @@ -17,16 +17,15 @@ # include "event/process.c.generated.h" #endif -// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly -// exit before we send SIGNAL to it +// Time (ns) for a process to exit cleanly before we send TERM/KILL. #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ - do { \ - if (proc->stream && !proc->stream->closed) { \ - stream_close(proc->stream, NULL); \ - } \ +#define CLOSE_PROC_STREAM(proc, stream) \ + do { \ + if (proc->stream && !proc->stream->closed) { \ + stream_close(proc->stream, NULL, NULL); \ + } \ } while (0) static bool process_is_tearing_down = false; @@ -78,10 +77,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL return false; } - void *data = proc->data; - if (proc->in) { - stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe); proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; @@ -89,7 +86,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->out) { - stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe); proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; @@ -97,7 +94,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->err) { - stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe); proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; @@ -116,23 +113,20 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL process_is_tearing_down = true; kl_iter(WatcherPtr, loop->children, current) { Process *proc = (*current)->data; - if (proc->detach) { + if (proc->detach || proc->type == kProcessTypePty) { // Close handles to process without killing it. CREATE_EVENT(loop->events, process_close_handles, 1, proc); } else { - if (proc->type == kProcessTypeUv) { - uv_kill(proc->pid, SIGTERM); - proc->term_sent = true; - process_stop(proc); - } else { // kProcessTypePty - process_close_streams(proc); - pty_process_close_master((PtyProcess *)proc); - } + uv_kill(proc->pid, SIGTERM); + proc->term_sent = true; + process_stop(proc); } } - // Wait until all children exit - LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); + // Wait until all children exit and all close events are processed. + LOOP_PROCESS_EVENTS_UNTIL( + loop, loop->events, -1, + kl_empty(loop->children) && multiqueue_empty(loop->events)); pty_process_teardown(loop); } @@ -169,14 +163,16 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL /// indistinguishable from the process returning -1 by itself. Which /// is possible on some OS. Returns -2 if an user has interruped the /// wait. -int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) +int process_wait(Process *proc, int ms, MultiQueue *events) + FUNC_ATTR_NONNULL_ARG(1) { // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; if (!proc->refcount) { + status = proc->status; LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); - return proc->status; + return status; } if (!events) { @@ -214,7 +210,7 @@ int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) decref(proc); if (events) { // the decref call created an exit event, process it now - queue_process_events(events); + multiqueue_process_events(events); } } else { proc->refcount--; @@ -315,8 +311,10 @@ static void decref(Process *proc) static void process_close(Process *proc) FUNC_ATTR_NONNULL_ARG(1) { - if (process_is_tearing_down && proc->detach && proc->closed) { - // If a detached process dies while tearing down it might get closed twice. + if (process_is_tearing_down && (proc->detach || proc->type == kProcessTypePty) + && proc->closed) { + // If a detached/pty process dies while tearing down it might get closed + // twice. return; } assert(!proc->closed); @@ -333,9 +331,61 @@ static void process_close(Process *proc) } } +/// Flush output stream. +/// +/// @param proc Process, for which an output stream should be flushed. +/// @param stream Stream to flush. +static void flush_stream(Process *proc, Stream *stream) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (!stream || stream->closed) { + return; + } + + // Maximal remaining data size of terminated process is system + // buffer size. + // Also helps with a child process that keeps the output streams open. If it + // keeps sending data, we only accept as much data as the system buffer size. + // Otherwise this would block cleanup/teardown. + int system_buffer_size = 0; + int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, + &system_buffer_size); + if (err) { + system_buffer_size = (int)rbuffer_capacity(stream->buffer); + } + + size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; + + // Read remaining data. + while (!stream->closed && stream->num_bytes < max_bytes) { + // Remember number of bytes before polling + size_t num_bytes = stream->num_bytes; + + // Poll for data and process the generated events. + loop_poll_events(proc->loop, 0); + if (proc->events) { + multiqueue_process_events(proc->events); + } + + // Stream can be closed if it is empty. + if (num_bytes == stream->num_bytes) { + if (stream->read_cb) { + // Stream callback could miss EOF handling if a child keeps the stream + // open. + stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); + } + break; + } + } +} + static void process_close_handles(void **argv) { Process *proc = argv[0]; + + flush_stream(proc, proc->out); + flush_stream(proc, proc->err); + process_close_streams(proc); process_close(proc); } @@ -350,11 +400,12 @@ static void on_process_exit(Process *proc) uv_timer_stop(&loop->children_kill_timer); } - // Process handles are closed in the next event loop tick. This is done to - // give libuv more time to read data from the OS after the process exits(If - // process_close_streams is called with data still in the OS buffer, we lose - // it) - CREATE_EVENT(proc->events, process_close_handles, 1, proc); + // Process has terminated, but there could still be data to be read from the + // OS. We are still in the libuv loop, so we cannot call code that polls for + // more data directly. Instead delay the reading after the libuv loop by + // queueing process_close_handles() as an event. + MultiQueue *queue = proc->events ? proc->events : loop->events; + CREATE_EVENT(queue, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index e23c8ea60f..5cbf7f9ce7 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -21,12 +21,13 @@ struct process { int pid, status, refcount; // set to the hrtime of when process_stop was called for the process. uint64_t stopped_time; + char *cwd; char **argv; Stream *in, *out, *err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent, detach; - Queue *events; + MultiQueue *events; }; static inline Process process_init(Loop *loop, ProcessType type, void *data) @@ -40,6 +41,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .status = 0, .refcount = 0, .stopped_time = 0, + .cwd = NULL, .argv = NULL, .in = NULL, .out = NULL, diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c deleted file mode 100644 index 8eef72f12f..0000000000 --- a/src/nvim/event/pty_process.c +++ /dev/null @@ -1,240 +0,0 @@ -// Some of the code came from pangoterm and libuv -#include <stdbool.h> -#include <stdlib.h> -#include <string.h> - -#include <unistd.h> -#include <termios.h> -#include <sys/types.h> -#include <sys/wait.h> -#include <sys/ioctl.h> - -// forkpty is not in POSIX, so headers are platform-specific -#if defined(__FreeBSD__) -# include <libutil.h> -#elif defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__) -# include <util.h> -#else -# include <pty.h> -#endif - -#include <uv.h> - -#include "nvim/lib/klist.h" - -#include "nvim/event/loop.h" -#include "nvim/event/rstream.h" -#include "nvim/event/wstream.h" -#include "nvim/event/process.h" -#include "nvim/event/pty_process.h" -#include "nvim/log.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/pty_process.c.generated.h" -#endif - -bool pty_process_spawn(PtyProcess *ptyproc) - FUNC_ATTR_NONNULL_ALL -{ - static struct termios termios; - if (!termios.c_cflag) { - init_termios(&termios); - } - - Process *proc = (Process *)ptyproc; - assert(!proc->err); - uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); - ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; - uv_disable_stdio_inheritance(); - int master; - int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); - - if (pid < 0) { - ELOG("forkpty failed: %s", strerror(errno)); - return false; - } else if (pid == 0) { - init_child(ptyproc); - abort(); - } - - // make sure the master file descriptor is non blocking - int master_status_flags = fcntl(master, F_GETFL); - if (master_status_flags == -1) { - ELOG("Failed to get master descriptor status flags: %s", strerror(errno)); - goto error; - } - if (fcntl(master, F_SETFL, master_status_flags | O_NONBLOCK) == -1) { - ELOG("Failed to make master descriptor non-blocking: %s", strerror(errno)); - goto error; - } - - if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) { - goto error; - } - if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { - goto error; - } - - ptyproc->tty_fd = master; - proc->pid = pid; - return true; - -error: - close(master); - kill(pid, SIGKILL); - waitpid(pid, NULL, 0); - return false; -} - -void pty_process_resize(PtyProcess *ptyproc, uint16_t width, - uint16_t height) - FUNC_ATTR_NONNULL_ALL -{ - ptyproc->winsize = (struct winsize){height, width, 0, 0}; - ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); -} - -void pty_process_close(PtyProcess *ptyproc) - FUNC_ATTR_NONNULL_ALL -{ - pty_process_close_master(ptyproc); - Process *proc = (Process *)ptyproc; - if (proc->internal_close_cb) { - proc->internal_close_cb(proc); - } -} - -void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL -{ - if (ptyproc->tty_fd >= 0) { - close(ptyproc->tty_fd); - ptyproc->tty_fd = -1; - } -} - -void pty_process_teardown(Loop *loop) -{ - uv_signal_stop(&loop->children_watcher); -} - -static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL -{ - unsetenv("COLUMNS"); - unsetenv("LINES"); - unsetenv("TERMCAP"); - unsetenv("COLORTERM"); - unsetenv("COLORFGBG"); - - signal(SIGCHLD, SIG_DFL); - signal(SIGHUP, SIG_DFL); - signal(SIGINT, SIG_DFL); - signal(SIGQUIT, SIG_DFL); - signal(SIGTERM, SIG_DFL); - signal(SIGALRM, SIG_DFL); - - setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1); - execvp(ptyproc->process.argv[0], ptyproc->process.argv); - fprintf(stderr, "execvp failed: %s\n", strerror(errno)); -} - -static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL -{ - // Taken from pangoterm - termios->c_iflag = ICRNL|IXON; - termios->c_oflag = OPOST|ONLCR; -#ifdef TAB0 - termios->c_oflag |= TAB0; -#endif - termios->c_cflag = CS8|CREAD; - termios->c_lflag = ISIG|ICANON|IEXTEN|ECHO|ECHOE|ECHOK; - - cfsetspeed(termios, 38400); - -#ifdef IUTF8 - termios->c_iflag |= IUTF8; -#endif -#ifdef NL0 - termios->c_oflag |= NL0; -#endif -#ifdef CR0 - termios->c_oflag |= CR0; -#endif -#ifdef BS0 - termios->c_oflag |= BS0; -#endif -#ifdef VT0 - termios->c_oflag |= VT0; -#endif -#ifdef FF0 - termios->c_oflag |= FF0; -#endif -#ifdef ECHOCTL - termios->c_lflag |= ECHOCTL; -#endif -#ifdef ECHOKE - termios->c_lflag |= ECHOKE; -#endif - - termios->c_cc[VINTR] = 0x1f & 'C'; - termios->c_cc[VQUIT] = 0x1f & '\\'; - termios->c_cc[VERASE] = 0x7f; - termios->c_cc[VKILL] = 0x1f & 'U'; - termios->c_cc[VEOF] = 0x1f & 'D'; - termios->c_cc[VEOL] = _POSIX_VDISABLE; - termios->c_cc[VEOL2] = _POSIX_VDISABLE; - termios->c_cc[VSTART] = 0x1f & 'Q'; - termios->c_cc[VSTOP] = 0x1f & 'S'; - termios->c_cc[VSUSP] = 0x1f & 'Z'; - termios->c_cc[VREPRINT] = 0x1f & 'R'; - termios->c_cc[VWERASE] = 0x1f & 'W'; - termios->c_cc[VLNEXT] = 0x1f & 'V'; - termios->c_cc[VMIN] = 1; - termios->c_cc[VTIME] = 0; -} - -static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe) - FUNC_ATTR_NONNULL_ALL -{ - int fd_dup = dup(fd); - if (fd_dup < 0) { - ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); - return false; - } - int uv_result = uv_pipe_open(pipe, fd_dup); - if (uv_result) { - ELOG("Failed to set pipe to descriptor %d: %s", - fd_dup, uv_strerror(uv_result)); - close(fd_dup); - return false; - } - return true; -} - -static void chld_handler(uv_signal_t *handle, int signum) -{ - int stat = 0; - int pid; - - do { - pid = waitpid(-1, &stat, WNOHANG); - } while (pid < 0 && errno == EINTR); - - if (pid <= 0) { - return; - } - - Loop *loop = handle->loop->data; - - kl_iter(WatcherPtr, loop->children, current) { - Process *proc = (*current)->data; - if (proc->pid == pid) { - if (WIFEXITED(stat)) { - proc->status = WEXITSTATUS(stat); - } else if (WIFSIGNALED(stat)) { - proc->status = WTERMSIG(stat); - } - proc->internal_exit_cb(proc); - break; - } - } -} diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h deleted file mode 100644 index 446d7fd3c8..0000000000 --- a/src/nvim/event/pty_process.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef NVIM_EVENT_PTY_PROCESS_H -#define NVIM_EVENT_PTY_PROCESS_H - -#include <sys/ioctl.h> - -#include "nvim/event/process.h" - -typedef struct pty_process { - Process process; - char *term_name; - uint16_t width, height; - struct winsize winsize; - int tty_fd; -} PtyProcess; - -static inline PtyProcess pty_process_init(Loop *loop, void *data) -{ - PtyProcess rv; - rv.process = process_init(loop, kProcessTypePty, data); - rv.term_name = NULL; - rv.width = 80; - rv.height = 24; - rv.tty_fd = -1; - return rv; -} - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/pty_process.h.generated.h" -#endif -#endif // NVIM_EVENT_PTY_PROCESS_H diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c deleted file mode 100644 index c5ef22d426..0000000000 --- a/src/nvim/event/queue.c +++ /dev/null @@ -1,208 +0,0 @@ -// Queue for selective async event processing. Instances of this queue support a -// parent/child relationship with the following properties: -// -// - pushing a node to a child queue will push a corresponding link node to the -// parent queue -// - removing a link node from a parent queue will remove the next node -// in the linked child queue -// - removing a node from a child queue will remove the corresponding link node -// in the parent queue -// -// These properties allow neovim to organize and process events from different -// sources with a certain degree of control. Here's how the queue is used: -// -// +----------------+ -// | Main loop | -// +----------------+ -// ^ -// | -// +----------------+ -// +-------------->| Event loop |<------------+ -// | +--+-------------+ | -// | ^ ^ | -// | | | | -// +-----------+ +-----------+ +---------+ +---------+ -// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | -// +-----------+ +-----------+ +---------+ +---------+ -// -// -// In the above diagram, the lower boxes represents event emitters, each with -// it's own private queue that have the event loop queue as the parent. -// -// When idle, the main loop spins the event loop which queues events from many -// sources(channels, jobs, user...). Each event emitter pushes events to its own -// private queue which is propagated to the event loop queue. When the main loop -// consumes an event, the corresponding event is removed from the emitter's -// queue. -// -// The main reason for this queue hierarchy is to allow focusing on a single -// event emitter while blocking the main loop. For example, if the `jobwait` -// vimscript function is called on job1, the main loop will temporarily stop -// polling the event loop queue and poll job1 queue instead. Same with channels, -// when calling `rpcrequest`, we want to temporarily stop processing events from -// other sources and focus on a specific channel. - -#include <assert.h> -#include <stdarg.h> -#include <stdbool.h> -#include <stdint.h> - - -#include <uv.h> - -#include "nvim/event/queue.h" -#include "nvim/memory.h" -#include "nvim/os/time.h" - -typedef struct queue_item QueueItem; -struct queue_item { - union { - Queue *queue; - struct { - Event event; - QueueItem *parent; - } item; - } data; - bool link; // this is just a link to a node in a child queue - QUEUE node; -}; - -struct queue { - Queue *parent; - QUEUE headtail; - put_callback put_cb; - void *data; -}; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/queue.c.generated.h" -#endif - -static Event NILEVENT = {.handler = NULL, .argv = {NULL}}; - -Queue *queue_new_parent(put_callback put_cb, void *data) -{ - return queue_new(NULL, put_cb, data); -} - -Queue *queue_new_child(Queue *parent) - FUNC_ATTR_NONNULL_ALL -{ - assert(!parent->parent); - return queue_new(parent, NULL, NULL); -} - -static Queue *queue_new(Queue *parent, put_callback put_cb, void *data) -{ - Queue *rv = xmalloc(sizeof(Queue)); - QUEUE_INIT(&rv->headtail); - rv->parent = parent; - rv->put_cb = put_cb; - rv->data = data; - return rv; -} - -void queue_free(Queue *queue) -{ - assert(queue); - while (!QUEUE_EMPTY(&queue->headtail)) { - QUEUE *q = QUEUE_HEAD(&queue->headtail); - QueueItem *item = queue_node_data(q); - if (queue->parent) { - QUEUE_REMOVE(&item->data.item.parent->node); - xfree(item->data.item.parent); - } - QUEUE_REMOVE(q); - xfree(item); - } - - xfree(queue); -} - -Event queue_get(Queue *queue) -{ - return queue_empty(queue) ? NILEVENT : queue_remove(queue); -} - -void queue_put_event(Queue *queue, Event event) -{ - assert(queue); - queue_push(queue, event); - if (queue->parent && queue->parent->put_cb) { - queue->parent->put_cb(queue->parent, queue->parent->data); - } -} - -void queue_process_events(Queue *queue) -{ - assert(queue); - while (!queue_empty(queue)) { - Event event = queue_get(queue); - if (event.handler) { - event.handler(event.argv); - } - } -} - -bool queue_empty(Queue *queue) -{ - assert(queue); - return QUEUE_EMPTY(&queue->headtail); -} - -void queue_replace_parent(Queue *queue, Queue *new_parent) -{ - assert(queue_empty(queue)); - queue->parent = new_parent; -} - -static Event queue_remove(Queue *queue) -{ - assert(!queue_empty(queue)); - QUEUE *h = QUEUE_HEAD(&queue->headtail); - QUEUE_REMOVE(h); - QueueItem *item = queue_node_data(h); - Event rv; - - if (item->link) { - assert(!queue->parent); - // remove the next node in the linked queue - Queue *linked = item->data.queue; - assert(!queue_empty(linked)); - QueueItem *child = - queue_node_data(QUEUE_HEAD(&linked->headtail)); - QUEUE_REMOVE(&child->node); - rv = child->data.item.event; - xfree(child); - } else { - if (queue->parent) { - // remove the corresponding link node in the parent queue - QUEUE_REMOVE(&item->data.item.parent->node); - xfree(item->data.item.parent); - } - rv = item->data.item.event; - } - - xfree(item); - return rv; -} - -static void queue_push(Queue *queue, Event event) -{ - QueueItem *item = xmalloc(sizeof(QueueItem)); - item->link = false; - item->data.item.event = event; - QUEUE_INSERT_TAIL(&queue->headtail, &item->node); - if (queue->parent) { - // push link node to the parent queue - item->data.item.parent = xmalloc(sizeof(QueueItem)); - item->data.item.parent->link = true; - item->data.item.parent->data.queue = queue; - QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node); - } -} - -static QueueItem *queue_node_data(QUEUE *q) -{ - return QUEUE_DATA(q, QueueItem, node); -} diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h deleted file mode 100644 index 85fc59f8b2..0000000000 --- a/src/nvim/event/queue.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef NVIM_EVENT_QUEUE_H -#define NVIM_EVENT_QUEUE_H - -#include <uv.h> - -#include "nvim/event/defs.h" -#include "nvim/lib/queue.h" - -typedef struct queue Queue; -typedef void (*put_callback)(Queue *queue, void *data); - -#define queue_put(q, h, ...) \ - queue_put_event(q, event_create(1, h, __VA_ARGS__)); - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/queue.h.generated.h" -#endif -#endif // NVIM_EVENT_QUEUE_H diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 9f3fbc25ff..92efc9fa2e 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -17,21 +17,19 @@ # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, - void *data) +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); rstream_init(stream, bufsize); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, - void *data) +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); rstream_init(stream, bufsize); } @@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize) /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb) +void rstream_start(Stream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; + stream->cb_data = data; if (stream->uvstream) { uv_read_start(stream->uvstream, alloc_cb, read_cb); } else { @@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) { Stream *stream = data; assert(stream->read_cb); - rstream_start(stream, stream->read_cb); + rstream_start(stream, stream->read_cb, stream->cb_data); } // Callbacks used by libuv @@ -100,6 +99,10 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { Stream *stream = uvstream->data; + if (cnt > 0) { + stream->num_bytes += (size_t)cnt; + } + if (cnt <= 0) { if (cnt != UV_ENOBUFS // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: @@ -109,8 +112,8 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` // won't be called) && cnt != 0) { - DLOG("Closing Stream(%p) because of %s(%zd)", stream, - uv_strerror((int)cnt), cnt); + DLOG("Closing Stream (%p): %s (%s)", stream, + uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); @@ -175,7 +178,7 @@ static void read_event(void **argv) if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; - stream->read_cb(stream, stream->buffer, count, stream->data, eof); + stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); } stream->pending_reqs--; if (stream->closed && !stream->pending_reqs) { @@ -185,10 +188,6 @@ static void read_event(void **argv) static void invoke_read_cb(Stream *stream, size_t count, bool eof) { - if (stream->closed) { - return; - } - // Don't let the stream be closed before the event is processed. stream->pending_reqs++; diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index e32608acc0..7fe352edef 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,7 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; - Queue *events; + MultiQueue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 93cc592683..8f9327f3d4 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -103,7 +103,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) // Libuv converts ENOENT to EACCES for Windows compatibility, but if // the parent directory does not exist, ENOENT would be more accurate. *path_tail((char_u *)watcher->addr) = NUL; - if (!os_file_exists((char_u *)watcher->addr)) { + if (!os_path_exists((char_u *)watcher->addr)) { result = -ENOENT; } } @@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; @@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) return result; } - stream_init(NULL, stream, -1, client, data); + stream_init(NULL, stream, -1, client); return 0; } diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index ad59fdbe3a..eb0823c76d 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,7 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; - Queue *events; + MultiQueue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 71582ab357..26083c20f4 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking) return retval; } -void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, - void *data) +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->uvstream->data = stream; } - stream->data = data; stream->internal_data = NULL; stream->fpos = 0; stream->curmem = 0; @@ -71,14 +69,16 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->closed = false; stream->buffer = NULL; stream->events = NULL; + stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close) +void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); stream->closed = true; stream->close_cb = on_stream_close; + stream->close_cb_data = data; if (!stream->pending_reqs) { stream_close_handle(stream); @@ -102,7 +102,7 @@ static void close_cb(uv_handle_t *handle) rbuffer_free(stream->buffer); } if (stream->close_cb) { - stream->close_cb(stream, stream->data); + stream->close_cb(stream, stream->close_cb_data); } if (stream->internal_close_cb) { stream->internal_close_cb(stream, stream->internal_data); diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index c6baac0db7..d27497e4a4 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -44,14 +44,16 @@ struct stream { uv_file fd; stream_read_cb read_cb; stream_write_cb write_cb; + void *cb_data; stream_close_cb close_cb, internal_close_cb; + void *close_cb_data, *internal_data; size_t fpos; size_t curmem; size_t maxmem; size_t pending_reqs; - void *data, *internal_data; + size_t num_bytes; bool closed; - Queue *events; + MultiQueue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index 7bf333bcea..77260546db 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -17,6 +17,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->events = loop->fast_events; + watcher->blockable = false; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -50,6 +51,10 @@ static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; + if (watcher->blockable && !multiqueue_empty(watcher->events)) { + // the timer blocked and there already is an unprocessed event waiting + return; + } CREATE_EVENT(watcher->events, time_event, 1, watcher); } diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index 7882b2b627..a6de89ad6e 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,7 +12,8 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; - Queue *events; + MultiQueue *events; + bool blockable; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 8028e35e6b..fc7aad8eb9 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -22,19 +22,17 @@ typedef struct { # include "event/wstream.c.generated.h" #endif -void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, - void *data) +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); wstream_init(stream, maxmem); } -void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, - void *data) +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); wstream_init(stream, maxmem); } @@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem) /// /// @param stream The `Stream` instance /// @param cb The callback -void wstream_set_write_cb(Stream *stream, stream_write_cb cb) - FUNC_ATTR_NONNULL_ALL +void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data) + FUNC_ATTR_NONNULL_ARG(1, 2) { stream->write_cb = cb; + stream->cb_data = data; } /// Queues data for writing to the backing file descriptor of a `Stream` @@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status) wstream_release_wbuffer(data->buffer); if (data->stream->write_cb) { - data->stream->write_cb(data->stream, data->stream->data, status); + data->stream->write_cb(data->stream, data->stream->cb_data, status); } data->stream->pending_reqs--; |