aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/defs.h26
-rw-r--r--src/nvim/event/libuv_process.c5
-rw-r--r--src/nvim/event/loop.c46
-rw-r--r--src/nvim/event/loop.h68
-rw-r--r--src/nvim/event/multiqueue.c230
-rw-r--r--src/nvim/event/multiqueue.h19
-rw-r--r--src/nvim/event/process.c119
-rw-r--r--src/nvim/event/process.h4
-rw-r--r--src/nvim/event/pty_process.c240
-rw-r--r--src/nvim/event/pty_process.h30
-rw-r--r--src/nvim/event/queue.c208
-rw-r--r--src/nvim/event/queue.h19
-rw-r--r--src/nvim/event/rstream.c29
-rw-r--r--src/nvim/event/signal.h2
-rw-r--r--src/nvim/event/socket.c6
-rw-r--r--src/nvim/event/socket.h2
-rw-r--r--src/nvim/event/stream.c10
-rw-r--r--src/nvim/event/stream.h6
-rw-r--r--src/nvim/event/time.c5
-rw-r--r--src/nvim/event/time.h3
-rw-r--r--src/nvim/event/wstream.c17
21 files changed, 460 insertions, 634 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
index b802866a3d..e5335d9f25 100644
--- a/src/nvim/event/defs.h
+++ b/src/nvim/event/defs.h
@@ -14,19 +14,19 @@ typedef struct message {
} Event;
typedef void(*event_scheduler)(Event event, void *data);
-#define VA_EVENT_INIT(event, p, h, a) \
- do { \
- assert(a <= EVENT_HANDLER_MAX_ARGC); \
- (event)->priority = p; \
- (event)->handler = h; \
- if (a) { \
- va_list args; \
- va_start(args, a); \
- for (int i = 0; i < a; i++) { \
- (event)->argv[i] = va_arg(args, void *); \
- } \
- va_end(args); \
- } \
+#define VA_EVENT_INIT(event, p, h, a) \
+ do { \
+ assert(a <= EVENT_HANDLER_MAX_ARGC); \
+ (event)->priority = p; \
+ (event)->handler = h; \
+ if (a) { \
+ va_list args; \
+ va_start(args, a); \
+ for (int i = 0; i < a; i++) { \
+ (event)->argv[i] = va_arg(args, void *); \
+ } \
+ va_end(args); \
+ } \
} while (0)
static inline Event event_create(int priority, argv_callback cb, int argc, ...)
diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c
index 9ef3468284..907187aa17 100644
--- a/src/nvim/event/libuv_process.c
+++ b/src/nvim/event/libuv_process.c
@@ -19,13 +19,12 @@ bool libuv_process_spawn(LibuvProcess *uvproc)
Process *proc = (Process *)uvproc;
uvproc->uvopts.file = proc->argv[0];
uvproc->uvopts.args = proc->argv;
- uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE
- | UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS;
+ uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
if (proc->detach) {
uvproc->uvopts.flags |= UV_PROCESS_DETACHED;
}
uvproc->uvopts.exit_cb = exit_cb;
- uvproc->uvopts.cwd = NULL;
+ uvproc->uvopts.cwd = proc->cwd;
uvproc->uvopts.env = NULL;
uvproc->uvopts.stdio = uvproc->uvstdio;
uvproc->uvopts.stdio_count = 3;
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index 6f3e6b9253..0e1775d01b 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -18,9 +18,9 @@ void loop_init(Loop *loop, void *data)
loop->uv.data = loop;
loop->children = kl_init(WatcherPtr);
loop->children_stop_requests = 0;
- loop->events = queue_new_parent(loop_on_put, loop);
- loop->fast_events = queue_new_child(loop->events);
- loop->thread_events = queue_new_parent(NULL, NULL);
+ loop->events = multiqueue_new_parent(loop_on_put, loop);
+ loop->fast_events = multiqueue_new_child(loop->events);
+ loop->thread_events = multiqueue_new_parent(NULL, NULL);
uv_mutex_init(&loop->mutex);
uv_async_init(&loop->uv, &loop->async, async_cb);
uv_signal_init(&loop->uv, &loop->children_watcher);
@@ -53,19 +53,19 @@ void loop_poll_events(Loop *loop, int ms)
}
loop->recursive--; // Can re-enter uv_run now
- queue_process_events(loop->fast_events);
+ multiqueue_process_events(loop->fast_events);
}
// Schedule an event from another thread
void loop_schedule(Loop *loop, Event event)
{
uv_mutex_lock(&loop->mutex);
- queue_put_event(loop->thread_events, event);
+ multiqueue_put_event(loop->thread_events, event);
uv_async_send(&loop->async);
uv_mutex_unlock(&loop->mutex);
}
-void loop_on_put(Queue *queue, void *data)
+void loop_on_put(MultiQueue *queue, void *data)
{
Loop *loop = data;
// Sometimes libuv will run pending callbacks(timer for example) before
@@ -76,7 +76,7 @@ void loop_on_put(Queue *queue, void *data)
uv_stop(&loop->uv);
}
-void loop_close(Loop *loop)
+void loop_close(Loop *loop, bool wait)
{
uv_mutex_destroy(&loop->mutex);
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
@@ -84,21 +84,37 @@ void loop_close(Loop *loop)
uv_close((uv_handle_t *)&loop->poll_timer, NULL);
uv_close((uv_handle_t *)&loop->async, NULL);
do {
- uv_run(&loop->uv, UV_RUN_DEFAULT);
- } while (uv_loop_close(&loop->uv));
- queue_free(loop->fast_events);
- queue_free(loop->thread_events);
- queue_free(loop->events);
+ uv_run(&loop->uv, wait ? UV_RUN_DEFAULT : UV_RUN_NOWAIT);
+ } while (uv_loop_close(&loop->uv) && wait);
+ multiqueue_free(loop->fast_events);
+ multiqueue_free(loop->thread_events);
+ multiqueue_free(loop->events);
kl_destroy(WatcherPtr, loop->children);
}
+void loop_purge(Loop *loop)
+{
+ uv_mutex_lock(&loop->mutex);
+ multiqueue_purge_events(loop->thread_events);
+ multiqueue_purge_events(loop->fast_events);
+ uv_mutex_unlock(&loop->mutex);
+}
+
+size_t loop_size(Loop *loop)
+{
+ uv_mutex_lock(&loop->mutex);
+ size_t rv = multiqueue_size(loop->thread_events);
+ uv_mutex_unlock(&loop->mutex);
+ return rv;
+}
+
static void async_cb(uv_async_t *handle)
{
Loop *l = handle->loop->data;
uv_mutex_lock(&l->mutex);
- while (!queue_empty(l->thread_events)) {
- Event ev = queue_get(l->thread_events);
- queue_put_event(l->fast_events, ev);
+ while (!multiqueue_empty(l->thread_events)) {
+ Event ev = multiqueue_get(l->thread_events);
+ multiqueue_put_event(l->fast_events, ev);
}
uv_mutex_unlock(&l->mutex);
}
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index 0c1fcb5ed9..e7d7bdd483 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -7,7 +7,7 @@
#include "nvim/lib/klist.h"
#include "nvim/os/time.h"
-#include "nvim/event/queue.h"
+#include "nvim/event/multiqueue.h"
typedef void * WatcherPtr;
@@ -16,7 +16,7 @@ KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
typedef struct loop {
uv_loop_t uv;
- Queue *events, *fast_events, *thread_events;
+ MultiQueue *events, *fast_events, *thread_events;
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
uv_timer_t children_kill_timer, poll_timer;
@@ -26,43 +26,43 @@ typedef struct loop {
int recursive;
} Loop;
-#define CREATE_EVENT(queue, handler, argc, ...) \
- do { \
- if (queue) { \
- queue_put((queue), (handler), argc, __VA_ARGS__); \
- } else { \
- void *argv[argc] = {__VA_ARGS__}; \
- (handler)(argv); \
- } \
+#define CREATE_EVENT(multiqueue, handler, argc, ...) \
+ do { \
+ if (multiqueue) { \
+ multiqueue_put((multiqueue), (handler), argc, __VA_ARGS__); \
+ } else { \
+ void *argv[argc] = { __VA_ARGS__ }; \
+ (handler)(argv); \
+ } \
} while (0)
// Poll for events until a condition or timeout
-#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \
- do { \
- int remaining = timeout; \
- uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
- while (!(condition)) { \
- LOOP_PROCESS_EVENTS(loop, queue, remaining); \
- if (remaining == 0) { \
- break; \
- } else if (remaining > 0) { \
- uint64_t now = os_hrtime(); \
- remaining -= (int) ((now - before) / 1000000); \
- before = now; \
- if (remaining <= 0) { \
- break; \
- } \
- } \
- } \
+#define LOOP_PROCESS_EVENTS_UNTIL(loop, multiqueue, timeout, condition) \
+ do { \
+ int remaining = timeout; \
+ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
+ while (!(condition)) { \
+ LOOP_PROCESS_EVENTS(loop, multiqueue, remaining); \
+ if (remaining == 0) { \
+ break; \
+ } else if (remaining > 0) { \
+ uint64_t now = os_hrtime(); \
+ remaining -= (int) ((now - before) / 1000000); \
+ before = now; \
+ if (remaining <= 0) { \
+ break; \
+ } \
+ } \
+ } \
} while (0)
-#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \
- do { \
- if (queue && !queue_empty(queue)) { \
- queue_process_events(queue); \
- } else { \
- loop_poll_events(loop, timeout); \
- } \
+#define LOOP_PROCESS_EVENTS(loop, multiqueue, timeout) \
+ do { \
+ if (multiqueue && !multiqueue_empty(multiqueue)) { \
+ multiqueue_process_events(multiqueue); \
+ } else { \
+ loop_poll_events(loop, timeout); \
+ } \
} while (0)
diff --git a/src/nvim/event/multiqueue.c b/src/nvim/event/multiqueue.c
new file mode 100644
index 0000000000..79b4dd9458
--- /dev/null
+++ b/src/nvim/event/multiqueue.c
@@ -0,0 +1,230 @@
+// Multi-level queue for selective async event processing.
+// Not threadsafe; access must be synchronized externally.
+//
+// Multiqueue supports a parent-child relationship with these properties:
+// - pushing a node to a child queue will push a corresponding link node to the
+// parent queue
+// - removing a link node from a parent queue will remove the next node
+// in the linked child queue
+// - removing a node from a child queue will remove the corresponding link node
+// in the parent queue
+//
+// These properties allow Nvim to organize and process events from different
+// sources with a certain degree of control. How the multiqueue is used:
+//
+// +----------------+
+// | Main loop |
+// +----------------+
+//
+// +----------------+
+// +-------------->| Event loop |<------------+
+// | +--+-------------+ |
+// | ^ ^ |
+// | | | |
+// +-----------+ +-----------+ +---------+ +---------+
+// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
+// +-----------+ +-----------+ +---------+ +---------+
+//
+//
+// The lower boxes represent event emitters, each with its own private queue
+// having the event loop queue as the parent.
+//
+// When idle, the main loop spins the event loop which queues events from many
+// sources (channels, jobs, user...). Each event emitter pushes events to its
+// private queue which is propagated to the event loop queue. When the main loop
+// consumes an event, the corresponding event is removed from the emitter's
+// queue.
+//
+// The main reason for this queue hierarchy is to allow focusing on a single
+// event emitter while blocking the main loop. For example, if the `jobwait`
+// VimL function is called on job1, the main loop will temporarily stop polling
+// the event loop queue and poll job1 queue instead. Same with channels, when
+// calling `rpcrequest` we want to temporarily stop processing events from
+// other sources and focus on a specific channel.
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+
+#include <uv.h>
+
+#include "nvim/event/multiqueue.h"
+#include "nvim/memory.h"
+#include "nvim/os/time.h"
+
+typedef struct multiqueue_item MultiQueueItem;
+struct multiqueue_item {
+ union {
+ MultiQueue *queue;
+ struct {
+ Event event;
+ MultiQueueItem *parent_item;
+ } item;
+ } data;
+ bool link; // true: current item is just a link to a node in a child queue
+ QUEUE node;
+};
+
+struct multiqueue {
+ MultiQueue *parent;
+ QUEUE headtail; // circularly-linked
+ put_callback put_cb;
+ void *data;
+ size_t size;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/multiqueue.c.generated.h"
+#endif
+
+static Event NILEVENT = { .handler = NULL, .argv = {NULL} };
+
+MultiQueue *multiqueue_new_parent(put_callback put_cb, void *data)
+{
+ return multiqueue_new(NULL, put_cb, data);
+}
+
+MultiQueue *multiqueue_new_child(MultiQueue *parent)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(!parent->parent); // parent cannot have a parent, more like a "root"
+ parent->size++;
+ return multiqueue_new(parent, NULL, NULL);
+}
+
+static MultiQueue *multiqueue_new(MultiQueue *parent, put_callback put_cb,
+ void *data)
+{
+ MultiQueue *rv = xmalloc(sizeof(MultiQueue));
+ QUEUE_INIT(&rv->headtail);
+ rv->size = 0;
+ rv->parent = parent;
+ rv->put_cb = put_cb;
+ rv->data = data;
+ return rv;
+}
+
+void multiqueue_free(MultiQueue *this)
+{
+ assert(this);
+ while (!QUEUE_EMPTY(&this->headtail)) {
+ QUEUE *q = QUEUE_HEAD(&this->headtail);
+ MultiQueueItem *item = multiqueue_node_data(q);
+ if (this->parent) {
+ QUEUE_REMOVE(&item->data.item.parent_item->node);
+ xfree(item->data.item.parent_item);
+ }
+ QUEUE_REMOVE(q);
+ xfree(item);
+ }
+
+ xfree(this);
+}
+
+Event multiqueue_get(MultiQueue *this)
+{
+ return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this);
+}
+
+void multiqueue_put_event(MultiQueue *this, Event event)
+{
+ assert(this);
+ multiqueue_push(this, event);
+ if (this->parent && this->parent->put_cb) {
+ this->parent->put_cb(this->parent, this->parent->data);
+ }
+}
+
+void multiqueue_process_events(MultiQueue *this)
+{
+ assert(this);
+ while (!multiqueue_empty(this)) {
+ Event event = multiqueue_get(this);
+ if (event.handler) {
+ event.handler(event.argv);
+ }
+ }
+}
+
+/// Removes all events without processing them.
+void multiqueue_purge_events(MultiQueue *this)
+{
+ assert(this);
+ while (!multiqueue_empty(this)) {
+ (void)multiqueue_remove(this);
+ }
+}
+
+bool multiqueue_empty(MultiQueue *this)
+{
+ assert(this);
+ return QUEUE_EMPTY(&this->headtail);
+}
+
+void multiqueue_replace_parent(MultiQueue *this, MultiQueue *new_parent)
+{
+ assert(multiqueue_empty(this));
+ this->parent = new_parent;
+}
+
+/// Gets the count of all events currently in the queue.
+size_t multiqueue_size(MultiQueue *this)
+{
+ return this->size;
+}
+
+static Event multiqueue_remove(MultiQueue *this)
+{
+ assert(!multiqueue_empty(this));
+ QUEUE *h = QUEUE_HEAD(&this->headtail);
+ QUEUE_REMOVE(h);
+ MultiQueueItem *item = multiqueue_node_data(h);
+ Event rv;
+
+ if (item->link) {
+ assert(!this->parent);
+ // remove the next node in the linked queue
+ MultiQueue *linked = item->data.queue;
+ assert(!multiqueue_empty(linked));
+ MultiQueueItem *child =
+ multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
+ QUEUE_REMOVE(&child->node);
+ rv = child->data.item.event;
+ xfree(child);
+ } else {
+ if (this->parent) {
+ // remove the corresponding link node in the parent queue
+ QUEUE_REMOVE(&item->data.item.parent_item->node);
+ xfree(item->data.item.parent_item);
+ }
+ rv = item->data.item.event;
+ }
+
+ this->size--;
+ xfree(item);
+ return rv;
+}
+
+static void multiqueue_push(MultiQueue *this, Event event)
+{
+ MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem));
+ item->link = false;
+ item->data.item.event = event;
+ QUEUE_INSERT_TAIL(&this->headtail, &item->node);
+ if (this->parent) {
+ // push link node to the parent queue
+ item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem));
+ item->data.item.parent_item->link = true;
+ item->data.item.parent_item->data.queue = this;
+ QUEUE_INSERT_TAIL(&this->parent->headtail,
+ &item->data.item.parent_item->node);
+ }
+ this->size++;
+}
+
+static MultiQueueItem *multiqueue_node_data(QUEUE *q)
+{
+ return QUEUE_DATA(q, MultiQueueItem, node);
+}
diff --git a/src/nvim/event/multiqueue.h b/src/nvim/event/multiqueue.h
new file mode 100644
index 0000000000..def6b95a10
--- /dev/null
+++ b/src/nvim/event/multiqueue.h
@@ -0,0 +1,19 @@
+#ifndef NVIM_EVENT_MULTIQUEUE_H
+#define NVIM_EVENT_MULTIQUEUE_H
+
+#include <uv.h>
+
+#include "nvim/event/defs.h"
+#include "nvim/lib/queue.h"
+
+typedef struct multiqueue MultiQueue;
+typedef void (*put_callback)(MultiQueue *multiq, void *data);
+
+#define multiqueue_put(q, h, ...) \
+ multiqueue_put_event(q, event_create(1, h, __VA_ARGS__));
+
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/multiqueue.h.generated.h"
+#endif
+#endif // NVIM_EVENT_MULTIQUEUE_H
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 9bb62891c7..dc7886469b 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -9,7 +9,7 @@
#include "nvim/event/wstream.h"
#include "nvim/event/process.h"
#include "nvim/event/libuv_process.h"
-#include "nvim/event/pty_process.h"
+#include "nvim/os/pty_process.h"
#include "nvim/globals.h"
#include "nvim/log.h"
@@ -17,16 +17,15 @@
# include "event/process.c.generated.h"
#endif
-// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly
-// exit before we send SIGNAL to it
+// Time (ns) for a process to exit cleanly before we send TERM/KILL.
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-#define CLOSE_PROC_STREAM(proc, stream) \
- do { \
- if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL); \
- } \
+#define CLOSE_PROC_STREAM(proc, stream) \
+ do { \
+ if (proc->stream && !proc->stream->closed) { \
+ stream_close(proc->stream, NULL, NULL); \
+ } \
} while (0)
static bool process_is_tearing_down = false;
@@ -78,10 +77,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return false;
}
- void *data = proc->data;
-
if (proc->in) {
- stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
@@ -89,7 +86,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->out) {
- stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
@@ -97,7 +94,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->err) {
- stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
@@ -116,23 +113,20 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
process_is_tearing_down = true;
kl_iter(WatcherPtr, loop->children, current) {
Process *proc = (*current)->data;
- if (proc->detach) {
+ if (proc->detach || proc->type == kProcessTypePty) {
// Close handles to process without killing it.
CREATE_EVENT(loop->events, process_close_handles, 1, proc);
} else {
- if (proc->type == kProcessTypeUv) {
- uv_kill(proc->pid, SIGTERM);
- proc->term_sent = true;
- process_stop(proc);
- } else { // kProcessTypePty
- process_close_streams(proc);
- pty_process_close_master((PtyProcess *)proc);
- }
+ uv_kill(proc->pid, SIGTERM);
+ proc->term_sent = true;
+ process_stop(proc);
}
}
- // Wait until all children exit
- LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children));
+ // Wait until all children exit and all close events are processed.
+ LOOP_PROCESS_EVENTS_UNTIL(
+ loop, loop->events, -1,
+ kl_empty(loop->children) && multiqueue_empty(loop->events));
pty_process_teardown(loop);
}
@@ -169,14 +163,16 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. Returns -2 if an user has interruped the
/// wait.
-int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1)
+int process_wait(Process *proc, int ms, MultiQueue *events)
+ FUNC_ATTR_NONNULL_ARG(1)
{
// The default status is -1, which represents a timeout
int status = -1;
bool interrupted = false;
if (!proc->refcount) {
+ status = proc->status;
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
- return proc->status;
+ return status;
}
if (!events) {
@@ -214,7 +210,7 @@ int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1)
decref(proc);
if (events) {
// the decref call created an exit event, process it now
- queue_process_events(events);
+ multiqueue_process_events(events);
}
} else {
proc->refcount--;
@@ -315,8 +311,10 @@ static void decref(Process *proc)
static void process_close(Process *proc)
FUNC_ATTR_NONNULL_ARG(1)
{
- if (process_is_tearing_down && proc->detach && proc->closed) {
- // If a detached process dies while tearing down it might get closed twice.
+ if (process_is_tearing_down && (proc->detach || proc->type == kProcessTypePty)
+ && proc->closed) {
+ // If a detached/pty process dies while tearing down it might get closed
+ // twice.
return;
}
assert(!proc->closed);
@@ -333,9 +331,61 @@ static void process_close(Process *proc)
}
}
+/// Flush output stream.
+///
+/// @param proc Process, for which an output stream should be flushed.
+/// @param stream Stream to flush.
+static void flush_stream(Process *proc, Stream *stream)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ if (!stream || stream->closed) {
+ return;
+ }
+
+ // Maximal remaining data size of terminated process is system
+ // buffer size.
+ // Also helps with a child process that keeps the output streams open. If it
+ // keeps sending data, we only accept as much data as the system buffer size.
+ // Otherwise this would block cleanup/teardown.
+ int system_buffer_size = 0;
+ int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe,
+ &system_buffer_size);
+ if (err) {
+ system_buffer_size = (int)rbuffer_capacity(stream->buffer);
+ }
+
+ size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;
+
+ // Read remaining data.
+ while (!stream->closed && stream->num_bytes < max_bytes) {
+ // Remember number of bytes before polling
+ size_t num_bytes = stream->num_bytes;
+
+ // Poll for data and process the generated events.
+ loop_poll_events(proc->loop, 0);
+ if (proc->events) {
+ multiqueue_process_events(proc->events);
+ }
+
+ // Stream can be closed if it is empty.
+ if (num_bytes == stream->num_bytes) {
+ if (stream->read_cb) {
+ // Stream callback could miss EOF handling if a child keeps the stream
+ // open.
+ stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
+ }
+ break;
+ }
+ }
+}
+
static void process_close_handles(void **argv)
{
Process *proc = argv[0];
+
+ flush_stream(proc, proc->out);
+ flush_stream(proc, proc->err);
+
process_close_streams(proc);
process_close(proc);
}
@@ -350,11 +400,12 @@ static void on_process_exit(Process *proc)
uv_timer_stop(&loop->children_kill_timer);
}
- // Process handles are closed in the next event loop tick. This is done to
- // give libuv more time to read data from the OS after the process exits(If
- // process_close_streams is called with data still in the OS buffer, we lose
- // it)
- CREATE_EVENT(proc->events, process_close_handles, 1, proc);
+ // Process has terminated, but there could still be data to be read from the
+ // OS. We are still in the libuv loop, so we cannot call code that polls for
+ // more data directly. Instead delay the reading after the libuv loop by
+ // queueing process_close_handles() as an event.
+ MultiQueue *queue = proc->events ? proc->events : loop->events;
+ CREATE_EVENT(queue, process_close_handles, 1, proc);
}
static void on_process_stream_close(Stream *stream, void *data)
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index e23c8ea60f..5cbf7f9ce7 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -21,12 +21,13 @@ struct process {
int pid, status, refcount;
// set to the hrtime of when process_stop was called for the process.
uint64_t stopped_time;
+ char *cwd;
char **argv;
Stream *in, *out, *err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, term_sent, detach;
- Queue *events;
+ MultiQueue *events;
};
static inline Process process_init(Loop *loop, ProcessType type, void *data)
@@ -40,6 +41,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.status = 0,
.refcount = 0,
.stopped_time = 0,
+ .cwd = NULL,
.argv = NULL,
.in = NULL,
.out = NULL,
diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c
deleted file mode 100644
index 8eef72f12f..0000000000
--- a/src/nvim/event/pty_process.c
+++ /dev/null
@@ -1,240 +0,0 @@
-// Some of the code came from pangoterm and libuv
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <unistd.h>
-#include <termios.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/ioctl.h>
-
-// forkpty is not in POSIX, so headers are platform-specific
-#if defined(__FreeBSD__)
-# include <libutil.h>
-#elif defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__)
-# include <util.h>
-#else
-# include <pty.h>
-#endif
-
-#include <uv.h>
-
-#include "nvim/lib/klist.h"
-
-#include "nvim/event/loop.h"
-#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
-#include "nvim/event/process.h"
-#include "nvim/event/pty_process.h"
-#include "nvim/log.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/pty_process.c.generated.h"
-#endif
-
-bool pty_process_spawn(PtyProcess *ptyproc)
- FUNC_ATTR_NONNULL_ALL
-{
- static struct termios termios;
- if (!termios.c_cflag) {
- init_termios(&termios);
- }
-
- Process *proc = (Process *)ptyproc;
- assert(!proc->err);
- uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
- ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
- uv_disable_stdio_inheritance();
- int master;
- int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
-
- if (pid < 0) {
- ELOG("forkpty failed: %s", strerror(errno));
- return false;
- } else if (pid == 0) {
- init_child(ptyproc);
- abort();
- }
-
- // make sure the master file descriptor is non blocking
- int master_status_flags = fcntl(master, F_GETFL);
- if (master_status_flags == -1) {
- ELOG("Failed to get master descriptor status flags: %s", strerror(errno));
- goto error;
- }
- if (fcntl(master, F_SETFL, master_status_flags | O_NONBLOCK) == -1) {
- ELOG("Failed to make master descriptor non-blocking: %s", strerror(errno));
- goto error;
- }
-
- if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) {
- goto error;
- }
- if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
- goto error;
- }
-
- ptyproc->tty_fd = master;
- proc->pid = pid;
- return true;
-
-error:
- close(master);
- kill(pid, SIGKILL);
- waitpid(pid, NULL, 0);
- return false;
-}
-
-void pty_process_resize(PtyProcess *ptyproc, uint16_t width,
- uint16_t height)
- FUNC_ATTR_NONNULL_ALL
-{
- ptyproc->winsize = (struct winsize){height, width, 0, 0};
- ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
-}
-
-void pty_process_close(PtyProcess *ptyproc)
- FUNC_ATTR_NONNULL_ALL
-{
- pty_process_close_master(ptyproc);
- Process *proc = (Process *)ptyproc;
- if (proc->internal_close_cb) {
- proc->internal_close_cb(proc);
- }
-}
-
-void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
-{
- if (ptyproc->tty_fd >= 0) {
- close(ptyproc->tty_fd);
- ptyproc->tty_fd = -1;
- }
-}
-
-void pty_process_teardown(Loop *loop)
-{
- uv_signal_stop(&loop->children_watcher);
-}
-
-static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
-{
- unsetenv("COLUMNS");
- unsetenv("LINES");
- unsetenv("TERMCAP");
- unsetenv("COLORTERM");
- unsetenv("COLORFGBG");
-
- signal(SIGCHLD, SIG_DFL);
- signal(SIGHUP, SIG_DFL);
- signal(SIGINT, SIG_DFL);
- signal(SIGQUIT, SIG_DFL);
- signal(SIGTERM, SIG_DFL);
- signal(SIGALRM, SIG_DFL);
-
- setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1);
- execvp(ptyproc->process.argv[0], ptyproc->process.argv);
- fprintf(stderr, "execvp failed: %s\n", strerror(errno));
-}
-
-static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
-{
- // Taken from pangoterm
- termios->c_iflag = ICRNL|IXON;
- termios->c_oflag = OPOST|ONLCR;
-#ifdef TAB0
- termios->c_oflag |= TAB0;
-#endif
- termios->c_cflag = CS8|CREAD;
- termios->c_lflag = ISIG|ICANON|IEXTEN|ECHO|ECHOE|ECHOK;
-
- cfsetspeed(termios, 38400);
-
-#ifdef IUTF8
- termios->c_iflag |= IUTF8;
-#endif
-#ifdef NL0
- termios->c_oflag |= NL0;
-#endif
-#ifdef CR0
- termios->c_oflag |= CR0;
-#endif
-#ifdef BS0
- termios->c_oflag |= BS0;
-#endif
-#ifdef VT0
- termios->c_oflag |= VT0;
-#endif
-#ifdef FF0
- termios->c_oflag |= FF0;
-#endif
-#ifdef ECHOCTL
- termios->c_lflag |= ECHOCTL;
-#endif
-#ifdef ECHOKE
- termios->c_lflag |= ECHOKE;
-#endif
-
- termios->c_cc[VINTR] = 0x1f & 'C';
- termios->c_cc[VQUIT] = 0x1f & '\\';
- termios->c_cc[VERASE] = 0x7f;
- termios->c_cc[VKILL] = 0x1f & 'U';
- termios->c_cc[VEOF] = 0x1f & 'D';
- termios->c_cc[VEOL] = _POSIX_VDISABLE;
- termios->c_cc[VEOL2] = _POSIX_VDISABLE;
- termios->c_cc[VSTART] = 0x1f & 'Q';
- termios->c_cc[VSTOP] = 0x1f & 'S';
- termios->c_cc[VSUSP] = 0x1f & 'Z';
- termios->c_cc[VREPRINT] = 0x1f & 'R';
- termios->c_cc[VWERASE] = 0x1f & 'W';
- termios->c_cc[VLNEXT] = 0x1f & 'V';
- termios->c_cc[VMIN] = 1;
- termios->c_cc[VTIME] = 0;
-}
-
-static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe)
- FUNC_ATTR_NONNULL_ALL
-{
- int fd_dup = dup(fd);
- if (fd_dup < 0) {
- ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
- return false;
- }
- int uv_result = uv_pipe_open(pipe, fd_dup);
- if (uv_result) {
- ELOG("Failed to set pipe to descriptor %d: %s",
- fd_dup, uv_strerror(uv_result));
- close(fd_dup);
- return false;
- }
- return true;
-}
-
-static void chld_handler(uv_signal_t *handle, int signum)
-{
- int stat = 0;
- int pid;
-
- do {
- pid = waitpid(-1, &stat, WNOHANG);
- } while (pid < 0 && errno == EINTR);
-
- if (pid <= 0) {
- return;
- }
-
- Loop *loop = handle->loop->data;
-
- kl_iter(WatcherPtr, loop->children, current) {
- Process *proc = (*current)->data;
- if (proc->pid == pid) {
- if (WIFEXITED(stat)) {
- proc->status = WEXITSTATUS(stat);
- } else if (WIFSIGNALED(stat)) {
- proc->status = WTERMSIG(stat);
- }
- proc->internal_exit_cb(proc);
- break;
- }
- }
-}
diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h
deleted file mode 100644
index 446d7fd3c8..0000000000
--- a/src/nvim/event/pty_process.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#ifndef NVIM_EVENT_PTY_PROCESS_H
-#define NVIM_EVENT_PTY_PROCESS_H
-
-#include <sys/ioctl.h>
-
-#include "nvim/event/process.h"
-
-typedef struct pty_process {
- Process process;
- char *term_name;
- uint16_t width, height;
- struct winsize winsize;
- int tty_fd;
-} PtyProcess;
-
-static inline PtyProcess pty_process_init(Loop *loop, void *data)
-{
- PtyProcess rv;
- rv.process = process_init(loop, kProcessTypePty, data);
- rv.term_name = NULL;
- rv.width = 80;
- rv.height = 24;
- rv.tty_fd = -1;
- return rv;
-}
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/pty_process.h.generated.h"
-#endif
-#endif // NVIM_EVENT_PTY_PROCESS_H
diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c
deleted file mode 100644
index c5ef22d426..0000000000
--- a/src/nvim/event/queue.c
+++ /dev/null
@@ -1,208 +0,0 @@
-// Queue for selective async event processing. Instances of this queue support a
-// parent/child relationship with the following properties:
-//
-// - pushing a node to a child queue will push a corresponding link node to the
-// parent queue
-// - removing a link node from a parent queue will remove the next node
-// in the linked child queue
-// - removing a node from a child queue will remove the corresponding link node
-// in the parent queue
-//
-// These properties allow neovim to organize and process events from different
-// sources with a certain degree of control. Here's how the queue is used:
-//
-// +----------------+
-// | Main loop |
-// +----------------+
-// ^
-// |
-// +----------------+
-// +-------------->| Event loop |<------------+
-// | +--+-------------+ |
-// | ^ ^ |
-// | | | |
-// +-----------+ +-----------+ +---------+ +---------+
-// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
-// +-----------+ +-----------+ +---------+ +---------+
-//
-//
-// In the above diagram, the lower boxes represents event emitters, each with
-// it's own private queue that have the event loop queue as the parent.
-//
-// When idle, the main loop spins the event loop which queues events from many
-// sources(channels, jobs, user...). Each event emitter pushes events to its own
-// private queue which is propagated to the event loop queue. When the main loop
-// consumes an event, the corresponding event is removed from the emitter's
-// queue.
-//
-// The main reason for this queue hierarchy is to allow focusing on a single
-// event emitter while blocking the main loop. For example, if the `jobwait`
-// vimscript function is called on job1, the main loop will temporarily stop
-// polling the event loop queue and poll job1 queue instead. Same with channels,
-// when calling `rpcrequest`, we want to temporarily stop processing events from
-// other sources and focus on a specific channel.
-
-#include <assert.h>
-#include <stdarg.h>
-#include <stdbool.h>
-#include <stdint.h>
-
-
-#include <uv.h>
-
-#include "nvim/event/queue.h"
-#include "nvim/memory.h"
-#include "nvim/os/time.h"
-
-typedef struct queue_item QueueItem;
-struct queue_item {
- union {
- Queue *queue;
- struct {
- Event event;
- QueueItem *parent;
- } item;
- } data;
- bool link; // this is just a link to a node in a child queue
- QUEUE node;
-};
-
-struct queue {
- Queue *parent;
- QUEUE headtail;
- put_callback put_cb;
- void *data;
-};
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/queue.c.generated.h"
-#endif
-
-static Event NILEVENT = {.handler = NULL, .argv = {NULL}};
-
-Queue *queue_new_parent(put_callback put_cb, void *data)
-{
- return queue_new(NULL, put_cb, data);
-}
-
-Queue *queue_new_child(Queue *parent)
- FUNC_ATTR_NONNULL_ALL
-{
- assert(!parent->parent);
- return queue_new(parent, NULL, NULL);
-}
-
-static Queue *queue_new(Queue *parent, put_callback put_cb, void *data)
-{
- Queue *rv = xmalloc(sizeof(Queue));
- QUEUE_INIT(&rv->headtail);
- rv->parent = parent;
- rv->put_cb = put_cb;
- rv->data = data;
- return rv;
-}
-
-void queue_free(Queue *queue)
-{
- assert(queue);
- while (!QUEUE_EMPTY(&queue->headtail)) {
- QUEUE *q = QUEUE_HEAD(&queue->headtail);
- QueueItem *item = queue_node_data(q);
- if (queue->parent) {
- QUEUE_REMOVE(&item->data.item.parent->node);
- xfree(item->data.item.parent);
- }
- QUEUE_REMOVE(q);
- xfree(item);
- }
-
- xfree(queue);
-}
-
-Event queue_get(Queue *queue)
-{
- return queue_empty(queue) ? NILEVENT : queue_remove(queue);
-}
-
-void queue_put_event(Queue *queue, Event event)
-{
- assert(queue);
- queue_push(queue, event);
- if (queue->parent && queue->parent->put_cb) {
- queue->parent->put_cb(queue->parent, queue->parent->data);
- }
-}
-
-void queue_process_events(Queue *queue)
-{
- assert(queue);
- while (!queue_empty(queue)) {
- Event event = queue_get(queue);
- if (event.handler) {
- event.handler(event.argv);
- }
- }
-}
-
-bool queue_empty(Queue *queue)
-{
- assert(queue);
- return QUEUE_EMPTY(&queue->headtail);
-}
-
-void queue_replace_parent(Queue *queue, Queue *new_parent)
-{
- assert(queue_empty(queue));
- queue->parent = new_parent;
-}
-
-static Event queue_remove(Queue *queue)
-{
- assert(!queue_empty(queue));
- QUEUE *h = QUEUE_HEAD(&queue->headtail);
- QUEUE_REMOVE(h);
- QueueItem *item = queue_node_data(h);
- Event rv;
-
- if (item->link) {
- assert(!queue->parent);
- // remove the next node in the linked queue
- Queue *linked = item->data.queue;
- assert(!queue_empty(linked));
- QueueItem *child =
- queue_node_data(QUEUE_HEAD(&linked->headtail));
- QUEUE_REMOVE(&child->node);
- rv = child->data.item.event;
- xfree(child);
- } else {
- if (queue->parent) {
- // remove the corresponding link node in the parent queue
- QUEUE_REMOVE(&item->data.item.parent->node);
- xfree(item->data.item.parent);
- }
- rv = item->data.item.event;
- }
-
- xfree(item);
- return rv;
-}
-
-static void queue_push(Queue *queue, Event event)
-{
- QueueItem *item = xmalloc(sizeof(QueueItem));
- item->link = false;
- item->data.item.event = event;
- QUEUE_INSERT_TAIL(&queue->headtail, &item->node);
- if (queue->parent) {
- // push link node to the parent queue
- item->data.item.parent = xmalloc(sizeof(QueueItem));
- item->data.item.parent->link = true;
- item->data.item.parent->data.queue = queue;
- QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node);
- }
-}
-
-static QueueItem *queue_node_data(QUEUE *q)
-{
- return QUEUE_DATA(q, QueueItem, node);
-}
diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h
deleted file mode 100644
index 85fc59f8b2..0000000000
--- a/src/nvim/event/queue.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#ifndef NVIM_EVENT_QUEUE_H
-#define NVIM_EVENT_QUEUE_H
-
-#include <uv.h>
-
-#include "nvim/event/defs.h"
-#include "nvim/lib/queue.h"
-
-typedef struct queue Queue;
-typedef void (*put_callback)(Queue *queue, void *data);
-
-#define queue_put(q, h, ...) \
- queue_put_event(q, event_create(1, h, __VA_ARGS__));
-
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/queue.h.generated.h"
-#endif
-#endif // NVIM_EVENT_QUEUE_H
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 9f3fbc25ff..92efc9fa2e 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -17,21 +17,19 @@
# include "event/rstream.c.generated.h"
#endif
-void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
- void *data)
+void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
rstream_init(stream, bufsize);
}
-void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
- void *data)
+void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
rstream_init(stream, bufsize);
}
@@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_start(Stream *stream, stream_read_cb cb)
+void rstream_start(Stream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
+ stream->cb_data = data;
if (stream->uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb);
} else {
@@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
Stream *stream = data;
assert(stream->read_cb);
- rstream_start(stream, stream->read_cb);
+ rstream_start(stream, stream->read_cb, stream->cb_data);
}
// Callbacks used by libuv
@@ -100,6 +99,10 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
Stream *stream = uvstream->data;
+ if (cnt > 0) {
+ stream->num_bytes += (size_t)cnt;
+ }
+
if (cnt <= 0) {
if (cnt != UV_ENOBUFS
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
@@ -109,8 +112,8 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
// won't be called)
&& cnt != 0) {
- DLOG("Closing Stream(%p) because of %s(%zd)", stream,
- uv_strerror((int)cnt), cnt);
+ DLOG("Closing Stream (%p): %s (%s)", stream,
+ uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
@@ -175,7 +178,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
- stream->read_cb(stream, stream->buffer, count, stream->data, eof);
+ stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
if (stream->closed && !stream->pending_reqs) {
@@ -185,10 +188,6 @@ static void read_event(void **argv)
static void invoke_read_cb(Stream *stream, size_t count, bool eof)
{
- if (stream->closed) {
- return;
- }
-
// Don't let the stream be closed before the event is processed.
stream->pending_reqs++;
diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h
index e32608acc0..7fe352edef 100644
--- a/src/nvim/event/signal.h
+++ b/src/nvim/event/signal.h
@@ -14,7 +14,7 @@ struct signal_watcher {
void *data;
signal_cb cb;
signal_close_cb close_cb;
- Queue *events;
+ MultiQueue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index 93cc592683..8f9327f3d4 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -103,7 +103,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
// Libuv converts ENOENT to EACCES for Windows compatibility, but if
// the parent directory does not exist, ENOENT would be more accurate.
*path_tail((char_u *)watcher->addr) = NUL;
- if (!os_file_exists((char_u *)watcher->addr)) {
+ if (!os_path_exists((char_u *)watcher->addr)) {
result = -ENOENT;
}
}
@@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0;
}
-int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
return result;
}
- stream_init(NULL, stream, -1, client, data);
+ stream_init(NULL, stream, -1, client);
return 0;
}
diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h
index ad59fdbe3a..eb0823c76d 100644
--- a/src/nvim/event/socket.h
+++ b/src/nvim/event/socket.h
@@ -30,7 +30,7 @@ struct socket_watcher {
void *data;
socket_cb cb;
socket_close_cb close_cb;
- Queue *events;
+ MultiQueue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 71582ab357..26083c20f4 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
return retval;
}
-void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
- void *data)
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->uvstream->data = stream;
}
- stream->data = data;
stream->internal_data = NULL;
stream->fpos = 0;
stream->curmem = 0;
@@ -71,14 +69,16 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->closed = false;
stream->buffer = NULL;
stream->events = NULL;
+ stream->num_bytes = 0;
}
-void stream_close(Stream *stream, stream_close_cb on_stream_close)
+void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
stream->closed = true;
stream->close_cb = on_stream_close;
+ stream->close_cb_data = data;
if (!stream->pending_reqs) {
stream_close_handle(stream);
@@ -102,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
rbuffer_free(stream->buffer);
}
if (stream->close_cb) {
- stream->close_cb(stream, stream->data);
+ stream->close_cb(stream, stream->close_cb_data);
}
if (stream->internal_close_cb) {
stream->internal_close_cb(stream, stream->internal_data);
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index c6baac0db7..d27497e4a4 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -44,14 +44,16 @@ struct stream {
uv_file fd;
stream_read_cb read_cb;
stream_write_cb write_cb;
+ void *cb_data;
stream_close_cb close_cb, internal_close_cb;
+ void *close_cb_data, *internal_data;
size_t fpos;
size_t curmem;
size_t maxmem;
size_t pending_reqs;
- void *data, *internal_data;
+ size_t num_bytes;
bool closed;
- Queue *events;
+ MultiQueue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c
index 7bf333bcea..77260546db 100644
--- a/src/nvim/event/time.c
+++ b/src/nvim/event/time.c
@@ -17,6 +17,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data)
watcher->uv.data = watcher;
watcher->data = data;
watcher->events = loop->fast_events;
+ watcher->blockable = false;
}
void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout,
@@ -50,6 +51,10 @@ static void time_watcher_cb(uv_timer_t *handle)
FUNC_ATTR_NONNULL_ALL
{
TimeWatcher *watcher = handle->data;
+ if (watcher->blockable && !multiqueue_empty(watcher->events)) {
+ // the timer blocked and there already is an unprocessed event waiting
+ return;
+ }
CREATE_EVENT(watcher->events, time_event, 1, watcher);
}
diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h
index 7882b2b627..a6de89ad6e 100644
--- a/src/nvim/event/time.h
+++ b/src/nvim/event/time.h
@@ -12,7 +12,8 @@ struct time_watcher {
uv_timer_t uv;
void *data;
time_cb cb, close_cb;
- Queue *events;
+ MultiQueue *events;
+ bool blockable;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index 8028e35e6b..fc7aad8eb9 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -22,19 +22,17 @@ typedef struct {
# include "event/wstream.c.generated.h"
#endif
-void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
- void *data)
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
wstream_init(stream, maxmem);
}
-void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
- void *data)
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
wstream_init(stream, maxmem);
}
@@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
///
/// @param stream The `Stream` instance
/// @param cb The callback
-void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
- FUNC_ATTR_NONNULL_ALL
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
+ FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream->write_cb = cb;
+ stream->cb_data = data;
}
/// Queues data for writing to the backing file descriptor of a `Stream`
@@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
wstream_release_wbuffer(data->buffer);
if (data->stream->write_cb) {
- data->stream->write_cb(data->stream, data->stream->data, status);
+ data->stream->write_cb(data->stream, data->stream->cb_data, status);
}
data->stream->pending_reqs--;