aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:46:34 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:46:34 -0300
commit883b78d29864f39b8032468c4374766dad7d142f (patch)
treeb555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/event
parentd88c93acf390ea9d5e8674283927cff60fb41e0d (diff)
parentaa9cb48bf08af14068178619414590254b263882 (diff)
downloadrneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.gz
rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.bz2
rneovim-883b78d29864f39b8032468c4374766dad7d142f.zip
Merge PR #2980 'Refactor event loop layer'
Helped-by: oni-link <knil.ino@gmail.com> Reviewed-by: oni-link <knil.ino@gmail.com> Reviewed-by: Scott Prager <splinterofchaos@gmail.com>
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/loop.c144
-rw-r--r--src/nvim/event/loop.h59
-rw-r--r--src/nvim/event/process.c325
-rw-r--r--src/nvim/event/process.h56
-rw-r--r--src/nvim/event/pty_process.c254
-rw-r--r--src/nvim/event/pty_process.h30
-rw-r--r--src/nvim/event/rstream.c167
-rw-r--r--src/nvim/event/rstream.h16
-rw-r--r--src/nvim/event/signal.c52
-rw-r--r--src/nvim/event/signal.h22
-rw-r--r--src/nvim/event/socket.c157
-rw-r--r--src/nvim/event/socket.h38
-rw-r--r--src/nvim/event/stream.c108
-rw-r--r--src/nvim/event/stream.h55
-rw-r--r--src/nvim/event/time.c55
-rw-r--r--src/nvim/event/time.h20
-rw-r--r--src/nvim/event/uv_process.c77
-rw-r--r--src/nvim/event/uv_process.h25
-rw-r--r--src/nvim/event/wstream.c162
-rw-r--r--src/nvim/event/wstream.h24
20 files changed, 1846 insertions, 0 deletions
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
new file mode 100644
index 0000000000..d90565002e
--- /dev/null
+++ b/src/nvim/event/loop.c
@@ -0,0 +1,144 @@
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/process.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/loop.c.generated.h"
+#endif
+
+
+void loop_init(Loop *loop, void *data)
+{
+ uv_loop_init(&loop->uv);
+ loop->uv.data = loop;
+ loop->deferred_events = kl_init(Event);
+ loop->immediate_events = kl_init(Event);
+ loop->children = kl_init(WatcherPtr);
+ loop->children_stop_requests = 0;
+ uv_signal_init(&loop->uv, &loop->children_watcher);
+ uv_timer_init(&loop->uv, &loop->children_kill_timer);
+}
+
+void loop_poll_events(Loop *loop, int ms)
+{
+ static int recursive = 0;
+
+ if (recursive++) {
+ abort(); // Should not re-enter uv_run
+ }
+
+ bool wait = true;
+ uv_timer_t timer;
+
+ if (ms > 0) {
+ uv_timer_init(&loop->uv, &timer);
+ // Use a repeating timeout of ms milliseconds to make sure
+ // we do not block indefinitely for I/O.
+ uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
+ } else if (ms == 0) {
+ // For ms == 0, we need to do a non-blocking event poll by
+ // setting the run mode to UV_RUN_NOWAIT.
+ wait = false;
+ }
+
+ if (wait) {
+ loop_run_once(loop);
+ } else {
+ loop_run_nowait(loop);
+ }
+
+ if (ms > 0) {
+ // Ensure the timer handle is closed and run the event loop
+ // once more to let libuv perform it's cleanup
+ uv_timer_stop(&timer);
+ uv_close((uv_handle_t *)&timer, NULL);
+ loop_run_nowait(loop);
+ }
+
+ recursive--; // Can re-enter uv_run now
+ process_events_from(loop->immediate_events);
+}
+
+bool loop_has_deferred_events(Loop *loop)
+{
+ return loop->deferred_events_allowed && !kl_empty(loop->deferred_events);
+}
+
+void loop_enable_deferred_events(Loop *loop)
+{
+ ++loop->deferred_events_allowed;
+}
+
+void loop_disable_deferred_events(Loop *loop)
+{
+ --loop->deferred_events_allowed;
+}
+
+// Queue an event
+void loop_push_event(Loop *loop, Event event, bool deferred)
+{
+ // Sometimes libuv will run pending callbacks(timer for example) before
+ // blocking for a poll. If this happens and the callback pushes a event to one
+ // of the queues, the event would only be processed after the poll
+ // returns(user hits a key for example). To avoid this scenario, we call
+ // uv_stop when a event is enqueued.
+ loop_stop(loop);
+ kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events,
+ event);
+}
+
+void loop_process_event(Loop *loop)
+{
+ process_events_from(loop->deferred_events);
+}
+
+
+void loop_run(Loop *loop)
+{
+ uv_run(&loop->uv, UV_RUN_DEFAULT);
+}
+
+void loop_run_once(Loop *loop)
+{
+ uv_run(&loop->uv, UV_RUN_ONCE);
+}
+
+void loop_run_nowait(Loop *loop)
+{
+ uv_run(&loop->uv, UV_RUN_NOWAIT);
+}
+
+void loop_stop(Loop *loop)
+{
+ uv_stop(&loop->uv);
+}
+
+void loop_close(Loop *loop)
+{
+ uv_close((uv_handle_t *)&loop->children_watcher, NULL);
+ uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
+ do {
+ uv_run(&loop->uv, UV_RUN_DEFAULT);
+ } while (uv_loop_close(&loop->uv));
+}
+
+void loop_process_all_events(Loop *loop)
+{
+ process_events_from(loop->immediate_events);
+ process_events_from(loop->deferred_events);
+}
+
+static void process_events_from(klist_t(Event) *queue)
+{
+ while (!kl_empty(queue)) {
+ Event event = kl_shift(Event, queue);
+ event.handler(event);
+ }
+}
+
+static void timer_cb(uv_timer_t *handle)
+{
+}
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
new file mode 100644
index 0000000000..5eb4d32ca8
--- /dev/null
+++ b/src/nvim/event/loop.h
@@ -0,0 +1,59 @@
+#ifndef NVIM_EVENT_LOOP_H
+#define NVIM_EVENT_LOOP_H
+
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/lib/klist.h"
+#include "nvim/os/time.h"
+
+typedef struct event Event;
+typedef void (*event_handler)(Event event);
+
+struct event {
+ void *data;
+ event_handler handler;
+};
+
+typedef void * WatcherPtr;
+
+#define _noop(x)
+KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
+KLIST_INIT(Event, Event, _noop)
+
+typedef struct loop {
+ uv_loop_t uv;
+ klist_t(Event) *deferred_events, *immediate_events;
+ int deferred_events_allowed;
+ klist_t(WatcherPtr) *children;
+ uv_signal_t children_watcher;
+ uv_timer_t children_kill_timer;
+ size_t children_stop_requests;
+} Loop;
+
+// Poll for events until a condition or timeout
+#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \
+ do { \
+ int remaining = timeout; \
+ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
+ while (!(condition)) { \
+ loop_poll_events(loop, remaining); \
+ if (remaining == 0) { \
+ break; \
+ } else if (remaining > 0) { \
+ uint64_t now = os_hrtime(); \
+ remaining -= (int) ((now - before) / 1000000); \
+ before = now; \
+ if (remaining <= 0) { \
+ break; \
+ } \
+ } \
+ } \
+ } while (0)
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/loop.h.generated.h"
+#endif
+
+#endif // NVIM_EVENT_LOOP_H
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
new file mode 100644
index 0000000000..2b1f1ae096
--- /dev/null
+++ b/src/nvim/event/process.c
@@ -0,0 +1,325 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/os/shell.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/pty_process.h"
+#include "nvim/globals.h"
+#include "nvim/log.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/process.c.generated.h"
+#endif
+
+// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly
+// exit before we send SIGNAL to it
+#define TERM_TIMEOUT 1000000000
+#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
+
+#define CLOSE_PROC_STREAM(proc, stream) \
+ do { \
+ if (proc->stream && !proc->stream->closed) { \
+ stream_close(proc->stream, NULL); \
+ } \
+ } while (0)
+
+
+bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ proc->loop = loop;
+ if (proc->in) {
+ uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0);
+ }
+
+ if (proc->out) {
+ uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0);
+ }
+
+ if (proc->err) {
+ uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0);
+ }
+
+ bool success;
+ switch (proc->type) {
+ case kProcessTypeUv:
+ success = uv_process_spawn((UvProcess *)proc);
+ break;
+ case kProcessTypePty:
+ success = pty_process_spawn((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+
+ if (!success) {
+ if (proc->in) {
+ uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
+ }
+ if (proc->out) {
+ uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
+ }
+ if (proc->err) {
+ uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
+ }
+ process_close(proc);
+ shell_free_argv(proc->argv);
+ proc->status = -1;
+ return false;
+ }
+
+ void *data = proc->data;
+
+ if (proc->in) {
+ stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ proc->in->internal_data = proc;
+ proc->in->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ if (proc->out) {
+ stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ proc->out->internal_data = proc;
+ proc->out->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ if (proc->err) {
+ stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ proc->err->internal_data = proc;
+ proc->err->internal_close_cb = on_process_stream_close;
+ proc->refcount++;
+ }
+
+ proc->internal_exit_cb = on_process_exit;
+ proc->internal_close_cb = decref;
+ proc->refcount++;
+ kl_push(WatcherPtr, loop->children, proc);
+ return true;
+}
+
+void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
+{
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ uv_kill(proc->pid, SIGTERM);
+ proc->term_sent = true;
+ process_stop(proc);
+ }
+
+ // Wait until all children exit
+ LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children));
+ pty_process_teardown(loop);
+}
+
+// Wrappers around `stream_close` that protect against double-closing.
+void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ process_close_in(proc);
+ process_close_out(proc);
+ process_close_err(proc);
+}
+
+void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, in);
+}
+
+void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, out);
+}
+
+void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ CLOSE_PROC_STREAM(proc, err);
+}
+
+/// Synchronously wait for a process to finish
+///
+/// @param process The Process instance
+/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
+/// waiting until the process quits.
+/// @return returns the status code of the exited process. -1 if the process is
+/// still running and the `timeout` has expired. Note that this is
+/// indistinguishable from the process returning -1 by itself. Which
+/// is possible on some OS. Returns -2 if an user has interruped the
+/// wait.
+int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
+{
+ // The default status is -1, which represents a timeout
+ int status = -1;
+ bool interrupted = false;
+
+ // Increase refcount to stop the exit callback from being called(and possibly
+ // being freed) before we have a chance to get the status.
+ proc->refcount++;
+ LOOP_POLL_EVENTS_UNTIL(proc->loop, ms,
+ // Until...
+ got_int || // interrupted by the user
+ proc->refcount == 1); // job exited
+
+ // we'll assume that a user frantically hitting interrupt doesn't like
+ // the current job. Signal that it has to be killed.
+ if (got_int) {
+ interrupted = true;
+ got_int = false;
+ process_stop(proc);
+ if (ms == -1) {
+ // We can only return, if all streams/handles are closed and the job
+
+ // exited.
+ LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1);
+ } else {
+ loop_poll_events(proc->loop, 0);
+ }
+ }
+
+ if (proc->refcount == 1) {
+ // Job exited, collect status and manually invoke close_cb to free the job
+ // resources
+ status = interrupted ? -2 : proc->status;
+ decref(proc);
+ } else {
+ proc->refcount--;
+ }
+
+ return status;
+}
+
+/// Ask a process to terminate and eventually kill if it doesn't respond
+void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
+{
+ if (proc->stopped_time) {
+ return;
+ }
+
+ proc->stopped_time = os_hrtime();
+ switch (proc->type) {
+ case kProcessTypeUv:
+ // Close the process's stdin. If the process doesn't close its own
+ // stdout/stderr, they will be closed when it exits(possibly due to being
+ // terminated after a timeout)
+ process_close_in(proc);
+ break;
+ case kProcessTypePty:
+ // close all streams for pty processes to send SIGHUP to the process
+ process_close_streams(proc);
+ pty_process_close_master((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+
+ Loop *loop = proc->loop;
+ if (!loop->children_stop_requests++) {
+ // When there's at least one stop request pending, start a timer that
+ // will periodically check if a signal should be send to a to the job
+ DLOG("Starting job kill timer");
+ uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100);
+ }
+}
+
+/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL
+/// to those that didn't die from SIGTERM after a while(exit_timeout is 0).
+static void children_kill_cb(uv_timer_t *handle)
+{
+ Loop *loop = handle->loop->data;
+ uint64_t now = os_hrtime();
+
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ if (!proc->stopped_time) {
+ continue;
+ }
+ uint64_t elapsed = now - proc->stopped_time;
+
+ if (!proc->term_sent && elapsed >= TERM_TIMEOUT) {
+ ILOG("Sending SIGTERM to pid %d", proc->pid);
+ uv_kill(proc->pid, SIGTERM);
+ proc->term_sent = true;
+ } else if (elapsed >= KILL_TIMEOUT) {
+ ILOG("Sending SIGKILL to pid %d", proc->pid);
+ uv_kill(proc->pid, SIGKILL);
+ }
+ }
+}
+
+static void decref(Process *proc)
+{
+ if (--proc->refcount != 0) {
+ return;
+ }
+
+ Loop *loop = proc->loop;
+ kliter_t(WatcherPtr) **node = NULL;
+ kl_iter(WatcherPtr, loop->children, current) {
+ if ((*current)->data == proc) {
+ node = current;
+ break;
+ }
+ }
+
+ assert(node);
+ kl_shift_at(WatcherPtr, loop->children, node);
+ shell_free_argv(proc->argv);
+ if (proc->type == kProcessTypePty) {
+ xfree(((PtyProcess *)proc)->term_name);
+ }
+ if (proc->cb) {
+ proc->cb(proc, proc->status, proc->data);
+ }
+}
+
+static void process_close(Process *proc)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ assert(!proc->closed);
+ proc->closed = true;
+ switch (proc->type) {
+ case kProcessTypeUv:
+ uv_process_close((UvProcess *)proc);
+ break;
+ case kProcessTypePty:
+ pty_process_close((PtyProcess *)proc);
+ break;
+ default:
+ abort();
+ }
+}
+
+static void on_process_exit(Process *proc)
+{
+ if (exiting) {
+ on_process_exit_event((Event) {.data = proc});
+ } else {
+ loop_push_event(proc->loop,
+ (Event) {.handler = on_process_exit_event, .data = proc}, false);
+ }
+
+ Loop *loop = proc->loop;
+ if (loop->children_stop_requests && !--loop->children_stop_requests) {
+ // Stop the timer if no more stop requests are pending
+ DLOG("Stopping process kill timer");
+ uv_timer_stop(&loop->children_kill_timer);
+ }
+}
+
+static void on_process_exit_event(Event event)
+{
+ Process *proc = event.data;
+ process_close_streams(proc);
+ process_close(proc);
+}
+
+static void on_process_stream_close(Stream *stream, void *data)
+{
+ Process *proc = data;
+ decref(proc);
+}
+
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
new file mode 100644
index 0000000000..5c84a7d1d0
--- /dev/null
+++ b/src/nvim/event/process.h
@@ -0,0 +1,56 @@
+#ifndef NVIM_EVENT_PROCESS_H
+#define NVIM_EVENT_PROCESS_H
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+
+typedef enum {
+ kProcessTypeUv,
+ kProcessTypePty
+} ProcessType;
+
+typedef struct process Process;
+typedef void (*process_exit_cb)(Process *proc, int status, void *data);
+typedef void (*internal_process_cb)(Process *proc);
+
+struct process {
+ ProcessType type;
+ Loop *loop;
+ void *data;
+ int pid, status, refcount;
+ // set to the hrtime of when process_stop was called for the process.
+ uint64_t stopped_time;
+ char **argv;
+ Stream *in, *out, *err;
+ process_exit_cb cb;
+ internal_process_cb internal_exit_cb, internal_close_cb;
+ bool closed, term_sent;
+};
+
+static inline Process process_init(ProcessType type, void *data)
+{
+ return (Process) {
+ .type = type,
+ .data = data,
+ .loop = NULL,
+ .pid = 0,
+ .status = 0,
+ .refcount = 0,
+ .stopped_time = 0,
+ .argv = NULL,
+ .in = NULL,
+ .out = NULL,
+ .err = NULL,
+ .cb = NULL,
+ .closed = false,
+ .term_sent = false,
+ .internal_close_cb = NULL,
+ .internal_exit_cb = NULL
+ };
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_PROCESS_H
diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c
new file mode 100644
index 0000000000..1e24d7c919
--- /dev/null
+++ b/src/nvim/event/pty_process.c
@@ -0,0 +1,254 @@
+// Some of the code came from pangoterm and libuv
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <unistd.h>
+#include <termios.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/ioctl.h>
+
+// forkpty is not in POSIX, so headers are platform-specific
+#if defined(__FreeBSD__)
+# include <libutil.h>
+#elif defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__)
+# include <util.h>
+#else
+# include <pty.h>
+#endif
+
+#include <uv.h>
+
+#include "nvim/lib/klist.h"
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/pty_process.h"
+#include "nvim/log.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/pty_process.c.generated.h"
+#endif
+
+static const unsigned int KILL_RETRIES = 5;
+static const unsigned int KILL_TIMEOUT = 2; // seconds
+
+bool pty_process_spawn(PtyProcess *ptyproc)
+ FUNC_ATTR_NONNULL_ALL
+{
+ Process *proc = (Process *)ptyproc;
+ uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
+ ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
+ struct termios termios;
+ init_termios(&termios);
+ uv_disable_stdio_inheritance();
+ int master;
+ int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
+
+ if (pid < 0) {
+ ELOG("forkpty failed: %s", strerror(errno));
+ return false;
+ } else if (pid == 0) {
+ init_child(ptyproc);
+ abort();
+ }
+
+ // make sure the master file descriptor is non blocking
+ int master_status_flags = fcntl(master, F_GETFL);
+ if (master_status_flags == -1) {
+ ELOG("Failed to get master descriptor status flags: %s", strerror(errno));
+ goto error;
+ }
+ if (fcntl(master, F_SETFL, master_status_flags | O_NONBLOCK) == -1) {
+ ELOG("Failed to make master descriptor non-blocking: %s", strerror(errno));
+ goto error;
+ }
+
+ if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) {
+ goto error;
+ }
+ if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
+ goto error;
+ }
+ if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {
+ goto error;
+ }
+
+ ptyproc->tty_fd = master;
+ proc->pid = pid;
+ return true;
+
+error:
+ close(master);
+
+ // terminate spawned process
+ kill(pid, SIGTERM);
+ int status, child;
+ unsigned int try = 0;
+ while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) {
+ sleep(KILL_TIMEOUT);
+ }
+ if (child != pid) {
+ kill(pid, SIGKILL);
+ waitpid(pid, NULL, 0);
+ }
+
+ return false;
+}
+
+void pty_process_resize(PtyProcess *ptyproc, uint16_t width,
+ uint16_t height)
+ FUNC_ATTR_NONNULL_ALL
+{
+ ptyproc->winsize = (struct winsize){height, width, 0, 0};
+ ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
+}
+
+void pty_process_close(PtyProcess *ptyproc)
+ FUNC_ATTR_NONNULL_ALL
+{
+ pty_process_close_master(ptyproc);
+ Process *proc = (Process *)ptyproc;
+ if (proc->internal_close_cb) {
+ proc->internal_close_cb(proc);
+ }
+}
+
+void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
+{
+ if (ptyproc->tty_fd >= 0) {
+ close(ptyproc->tty_fd);
+ ptyproc->tty_fd = -1;
+ }
+}
+
+void pty_process_teardown(Loop *loop)
+{
+ uv_signal_stop(&loop->children_watcher);
+}
+
+static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
+{
+ unsetenv("COLUMNS");
+ unsetenv("LINES");
+ unsetenv("TERMCAP");
+ unsetenv("COLORTERM");
+ unsetenv("COLORFGBG");
+
+ signal(SIGCHLD, SIG_DFL);
+ signal(SIGHUP, SIG_DFL);
+ signal(SIGINT, SIG_DFL);
+ signal(SIGQUIT, SIG_DFL);
+ signal(SIGTERM, SIG_DFL);
+ signal(SIGALRM, SIG_DFL);
+
+ setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1);
+ execvp(ptyproc->process.argv[0], ptyproc->process.argv);
+ fprintf(stderr, "execvp failed: %s\n", strerror(errno));
+}
+
+static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
+{
+ memset(termios, 0, sizeof(struct termios));
+ // Taken from pangoterm
+ termios->c_iflag = ICRNL|IXON;
+ termios->c_oflag = OPOST|ONLCR;
+#ifdef TAB0
+ termios->c_oflag |= TAB0;
+#endif
+ termios->c_cflag = CS8|CREAD;
+ termios->c_lflag = ISIG|ICANON|IEXTEN|ECHO|ECHOE|ECHOK;
+
+ cfsetspeed(termios, 38400);
+
+#ifdef IUTF8
+ termios->c_iflag |= IUTF8;
+#endif
+#ifdef NL0
+ termios->c_oflag |= NL0;
+#endif
+#ifdef CR0
+ termios->c_oflag |= CR0;
+#endif
+#ifdef BS0
+ termios->c_oflag |= BS0;
+#endif
+#ifdef VT0
+ termios->c_oflag |= VT0;
+#endif
+#ifdef FF0
+ termios->c_oflag |= FF0;
+#endif
+#ifdef ECHOCTL
+ termios->c_lflag |= ECHOCTL;
+#endif
+#ifdef ECHOKE
+ termios->c_lflag |= ECHOKE;
+#endif
+
+ termios->c_cc[VINTR] = 0x1f & 'C';
+ termios->c_cc[VQUIT] = 0x1f & '\\';
+ termios->c_cc[VERASE] = 0x7f;
+ termios->c_cc[VKILL] = 0x1f & 'U';
+ termios->c_cc[VEOF] = 0x1f & 'D';
+ termios->c_cc[VEOL] = _POSIX_VDISABLE;
+ termios->c_cc[VEOL2] = _POSIX_VDISABLE;
+ termios->c_cc[VSTART] = 0x1f & 'Q';
+ termios->c_cc[VSTOP] = 0x1f & 'S';
+ termios->c_cc[VSUSP] = 0x1f & 'Z';
+ termios->c_cc[VREPRINT] = 0x1f & 'R';
+ termios->c_cc[VWERASE] = 0x1f & 'W';
+ termios->c_cc[VLNEXT] = 0x1f & 'V';
+ termios->c_cc[VMIN] = 1;
+ termios->c_cc[VTIME] = 0;
+}
+
+static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe)
+ FUNC_ATTR_NONNULL_ALL
+{
+ int fd_dup = dup(fd);
+ if (fd_dup < 0) {
+ ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
+ return false;
+ }
+ int uv_result = uv_pipe_open(pipe, fd_dup);
+ if (uv_result) {
+ ELOG("Failed to set pipe to descriptor %d: %s",
+ fd_dup, uv_strerror(uv_result));
+ close(fd_dup);
+ return false;
+ }
+ return true;
+}
+
+static void chld_handler(uv_signal_t *handle, int signum)
+{
+ int stat = 0;
+ int pid;
+
+ do {
+ pid = waitpid(-1, &stat, WNOHANG);
+ } while (pid < 0 && errno == EINTR);
+
+ if (pid <= 0) {
+ return;
+ }
+
+ Loop *loop = handle->loop->data;
+
+ kl_iter(WatcherPtr, loop->children, current) {
+ Process *proc = (*current)->data;
+ if (proc->pid == pid) {
+ if (WIFEXITED(stat)) {
+ proc->status = WEXITSTATUS(stat);
+ } else if (WIFSIGNALED(stat)) {
+ proc->status = WTERMSIG(stat);
+ }
+ proc->internal_exit_cb(proc);
+ break;
+ }
+ }
+}
diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h
new file mode 100644
index 0000000000..a12b5489c5
--- /dev/null
+++ b/src/nvim/event/pty_process.h
@@ -0,0 +1,30 @@
+#ifndef NVIM_EVENT_PTY_PROCESS_H
+#define NVIM_EVENT_PTY_PROCESS_H
+
+#include <sys/ioctl.h>
+
+#include "nvim/event/process.h"
+
+typedef struct pty_process {
+ Process process;
+ char *term_name;
+ uint16_t width, height;
+ struct winsize winsize;
+ int tty_fd;
+} PtyProcess;
+
+static inline PtyProcess pty_process_init(void *data)
+{
+ PtyProcess rv;
+ rv.process = process_init(kProcessTypePty, data);
+ rv.term_name = NULL;
+ rv.width = 80;
+ rv.height = 24;
+ rv.tty_fd = -1;
+ return rv;
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/pty_process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_PTY_PROCESS_H
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
new file mode 100644
index 0000000000..78c044347f
--- /dev/null
+++ b/src/nvim/event/rstream.c
@@ -0,0 +1,167 @@
+#include <assert.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/event/rstream.h"
+#include "nvim/ascii.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+#include "nvim/log.h"
+#include "nvim/misc1.h"
+#include "nvim/event/loop.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/rstream.c.generated.h"
+#endif
+
+void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(loop, stream, fd, NULL, data);
+ rstream_init(stream, bufsize);
+}
+
+void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(NULL, stream, -1, uvstream, data);
+ rstream_init(stream, bufsize);
+}
+
+void rstream_init(Stream *stream, size_t bufsize)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ stream->buffer = rbuffer_new(bufsize);
+ stream->buffer->data = stream;
+ stream->buffer->full_cb = on_rbuffer_full;
+ stream->buffer->nonfull_cb = on_rbuffer_nonfull;
+}
+
+
+/// Starts watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_start(Stream *stream, stream_read_cb cb)
+{
+ stream->read_cb = cb;
+ if (stream->uvstream) {
+ uv_read_start(stream->uvstream, alloc_cb, read_cb);
+ } else {
+ uv_idle_start(&stream->uv.idle, fread_idle_cb);
+ }
+}
+
+/// Stops watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_stop(Stream *stream)
+{
+ if (stream->uvstream) {
+ uv_read_stop(stream->uvstream);
+ } else {
+ uv_idle_stop(&stream->uv.idle);
+ }
+}
+
+static void on_rbuffer_full(RBuffer *buf, void *data)
+{
+ rstream_stop(data);
+}
+
+static void on_rbuffer_nonfull(RBuffer *buf, void *data)
+{
+ Stream *stream = data;
+ assert(stream->read_cb);
+ rstream_start(stream, stream->read_cb);
+}
+
+// Callbacks used by libuv
+
+// Called by libuv to allocate memory for reading.
+static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
+{
+ Stream *stream = handle->data;
+ buf->base = rbuffer_write_ptr(stream->buffer, &buf->len);
+}
+
+// Callback invoked by libuv after it copies the data into the buffer provided
+// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
+// 0-length buffer.
+static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
+{
+ Stream *stream = uvstream->data;
+
+ if (cnt <= 0) {
+ if (cnt != UV_ENOBUFS
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ && cnt != 0) {
+ DLOG("Closing Stream(%p) because of %s(%zd)", stream,
+ uv_strerror((int)cnt), cnt);
+ // Read error or EOF, either way stop the stream and invoke the callback
+ // with eof == true
+ uv_read_stop(uvstream);
+ stream->read_cb(stream, stream->buffer, stream->data, true);
+ }
+ return;
+ }
+
+ // at this point we're sure that cnt is positive, no error occurred
+ size_t nread = (size_t)cnt;
+ // Data was already written, so all we need is to update 'wpos' to reflect
+ // the space actually used in the buffer.
+ rbuffer_produced(stream->buffer, nread);
+ stream->read_cb(stream, stream->buffer, stream->data, false);
+}
+
+// Called by the by the 'idle' handle to emulate a reading event
+static void fread_idle_cb(uv_idle_t *handle)
+{
+ uv_fs_t req;
+ Stream *stream = handle->data;
+
+ stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len);
+
+ // the offset argument to uv_fs_read is int64_t, could someone really try
+ // to read more than 9 quintillion (9e18) bytes?
+ // upcast is meant to avoid tautological condition warning on 32 bits
+ uintmax_t fpos_intmax = stream->fpos;
+ if (fpos_intmax > INT64_MAX) {
+ ELOG("stream offset overflow");
+ preserve_exit();
+ }
+
+ // Synchronous read
+ uv_fs_read(
+ handle->loop,
+ &req,
+ stream->fd,
+ &stream->uvbuf,
+ 1,
+ (int64_t) stream->fpos,
+ NULL);
+
+ uv_fs_req_cleanup(&req);
+
+ if (req.result <= 0) {
+ uv_idle_stop(&stream->uv.idle);
+ stream->read_cb(stream, stream->buffer, stream->data, true);
+ return;
+ }
+
+ // no errors (req.result (ssize_t) is positive), it's safe to cast.
+ size_t nread = (size_t) req.result;
+ rbuffer_produced(stream->buffer, nread);
+ stream->fpos += nread;
+}
diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h
new file mode 100644
index 0000000000..f30ad79ee5
--- /dev/null
+++ b/src/nvim/event/rstream.h
@@ -0,0 +1,16 @@
+#ifndef NVIM_EVENT_RSTREAM_H
+#define NVIM_EVENT_RSTREAM_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/stream.h"
+
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/rstream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_RSTREAM_H
diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c
new file mode 100644
index 0000000000..63133b4f57
--- /dev/null
+++ b/src/nvim/event/signal.c
@@ -0,0 +1,52 @@
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/signal.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/signal.c.generated.h"
+#endif
+
+
+void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ uv_signal_init(&loop->uv, &watcher->uv);
+ watcher->uv.data = watcher;
+ watcher->data = data;
+ watcher->cb = NULL;
+}
+
+void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum)
+ FUNC_ATTR_NONNULL_ALL
+{
+ watcher->cb = cb;
+ uv_signal_start(&watcher->uv, signal_watcher_cb, signum);
+}
+
+void signal_watcher_stop(SignalWatcher *watcher)
+ FUNC_ATTR_NONNULL_ALL
+{
+ uv_signal_stop(&watcher->uv);
+}
+
+void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ watcher->close_cb = cb;
+ uv_close((uv_handle_t *)&watcher->uv, close_cb);
+}
+
+static void signal_watcher_cb(uv_signal_t *handle, int signum)
+{
+ SignalWatcher *watcher = handle->data;
+ watcher->cb(watcher, signum, watcher->data);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ SignalWatcher *watcher = handle->data;
+ if (watcher->close_cb) {
+ watcher->close_cb(watcher, watcher->data);
+ }
+}
diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h
new file mode 100644
index 0000000000..c269fa9d95
--- /dev/null
+++ b/src/nvim/event/signal.h
@@ -0,0 +1,22 @@
+#ifndef NVIM_EVENT_SIGNAL_H
+#define NVIM_EVENT_SIGNAL_H
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+
+typedef struct signal_watcher SignalWatcher;
+typedef void (*signal_cb)(SignalWatcher *watcher, int signum, void *data);
+typedef void (*signal_close_cb)(SignalWatcher *watcher, void *data);
+
+struct signal_watcher {
+ uv_signal_t uv;
+ void *data;
+ signal_cb cb;
+ signal_close_cb close_cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/signal.h.generated.h"
+#endif
+#endif // NVIM_EVENT_SIGNAL_H
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
new file mode 100644
index 0000000000..bdc632abf0
--- /dev/null
+++ b/src/nvim/event/socket.c
@@ -0,0 +1,157 @@
+#include <assert.h>
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/socket.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/os/os.h"
+#include "nvim/ascii.h"
+#include "nvim/vim.h"
+#include "nvim/strings.h"
+#include "nvim/path.h"
+#include "nvim/memory.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/socket.c.generated.h"
+#endif
+
+#define NVIM_DEFAULT_TCP_PORT 7450
+
+void socket_watcher_init(Loop *loop, SocketWatcher *watcher,
+ const char *endpoint, void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3)
+{
+ // Trim to `ADDRESS_MAX_SIZE`
+ if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr))
+ >= sizeof(watcher->addr)) {
+ // TODO(aktau): since this is not what the user wanted, perhaps we
+ // should return an error here
+ WLOG("Address was too long, truncated to %s", watcher->addr);
+ }
+
+ bool tcp = true;
+ char ip[16], *ip_end = xstrchrnul(watcher->addr, ':');
+
+ // (ip_end - addr) is always > 0, so convert to size_t
+ size_t addr_len = (size_t)(ip_end - watcher->addr);
+
+ if (addr_len > sizeof(ip) - 1) {
+ // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255)
+ addr_len = sizeof(ip) - 1;
+ }
+
+ // Extract the address part
+ xstrlcpy(ip, watcher->addr, addr_len + 1);
+ int port = NVIM_DEFAULT_TCP_PORT;
+
+ if (*ip_end == ':') {
+ // Extract the port
+ long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
+ if (lport <= 0 || lport > 0xffff) {
+ // Invalid port, treat as named pipe or unix socket
+ tcp = false;
+ } else {
+ port = (int) lport;
+ }
+ }
+
+ if (tcp) {
+ // Try to parse ip address
+ if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) {
+ // Invalid address, treat as named pipe or unix socket
+ tcp = false;
+ }
+ }
+
+ if (tcp) {
+ uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle);
+ watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle;
+ } else {
+ uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0);
+ watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle;
+ }
+
+ watcher->stream->data = watcher;
+ watcher->cb = NULL;
+ watcher->close_cb = NULL;
+}
+
+int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
+ FUNC_ATTR_NONNULL_ALL
+{
+ watcher->cb = cb;
+ int result;
+
+ if (watcher->stream->type == UV_TCP) {
+ result = uv_tcp_bind(&watcher->uv.tcp.handle,
+ (const struct sockaddr *)&watcher->uv.tcp.addr, 0);
+ } else {
+ result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr);
+ }
+
+ if (result == 0) {
+ result = uv_listen(watcher->stream, backlog, connection_cb);
+ }
+
+ assert(result <= 0); // libuv should have returned -errno or zero.
+ if (result < 0) {
+ if (result == -EACCES) {
+ // Libuv converts ENOENT to EACCES for Windows compatibility, but if
+ // the parent directory does not exist, ENOENT would be more accurate.
+ *path_tail((char_u *)watcher->addr) = NUL;
+ if (!os_file_exists((char_u *)watcher->addr)) {
+ result = -ENOENT;
+ }
+ }
+ return result;
+ }
+
+ return 0;
+}
+
+int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+{
+ uv_stream_t *client;
+
+ if (watcher->stream->type == UV_TCP) {
+ client = (uv_stream_t *)&stream->uv.tcp;
+ uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client);
+ } else {
+ client = (uv_stream_t *)&stream->uv.pipe;
+ uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0);
+ }
+
+ int result = uv_accept(watcher->stream, client);
+
+ if (result) {
+ uv_close((uv_handle_t *)client, NULL);
+ return result;
+ }
+
+ stream_init(NULL, stream, -1, client, data);
+ return 0;
+}
+
+void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ watcher->close_cb = cb;
+ uv_close((uv_handle_t *)watcher->stream, close_cb);
+}
+
+static void connection_cb(uv_stream_t *handle, int status)
+{
+ SocketWatcher *watcher = handle->data;
+ watcher->cb(watcher, status, watcher->data);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ SocketWatcher *watcher = handle->data;
+ if (watcher->close_cb) {
+ watcher->close_cb(watcher, watcher->data);
+ }
+}
diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h
new file mode 100644
index 0000000000..17fd39f33b
--- /dev/null
+++ b/src/nvim/event/socket.h
@@ -0,0 +1,38 @@
+#ifndef NVIM_EVENT_SOCKET_H
+#define NVIM_EVENT_SOCKET_H
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+
+#define ADDRESS_MAX_SIZE 256
+
+typedef struct socket_watcher SocketWatcher;
+typedef void (*socket_cb)(SocketWatcher *watcher, int result, void *data);
+typedef void (*socket_close_cb)(SocketWatcher *watcher, void *data);
+
+struct socket_watcher {
+ // Pipe/socket path, or TCP address string
+ char addr[ADDRESS_MAX_SIZE];
+ // TCP server or unix socket (named pipe on Windows)
+ union {
+ struct {
+ uv_tcp_t handle;
+ struct sockaddr_in addr;
+ } tcp;
+ struct {
+ uv_pipe_t handle;
+ } pipe;
+ } uv;
+ uv_stream_t *stream;
+ void *data;
+ socket_cb cb;
+ socket_close_cb close_cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/socket.h.generated.h"
+#endif
+#endif // NVIM_EVENT_SOCKET_H
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
new file mode 100644
index 0000000000..959b532146
--- /dev/null
+++ b/src/nvim/event/stream.c
@@ -0,0 +1,108 @@
+#include <assert.h>
+#include <stdio.h>
+#include <stdbool.h>
+
+#include <uv.h>
+
+#include "nvim/rbuffer.h"
+#include "nvim/event/stream.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/stream.c.generated.h"
+#endif
+
+/// Sets the stream associated with `fd` to "blocking" mode.
+///
+/// @return `0` on success, or `-errno` on failure.
+int stream_set_blocking(int fd, bool blocking)
+{
+ // Private loop to avoid conflict with existing watcher(s):
+ // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
+ uv_loop_t loop;
+ uv_pipe_t stream;
+ uv_loop_init(&loop);
+ uv_pipe_init(&loop, &stream, 0);
+ uv_pipe_open(&stream, fd);
+ int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
+ uv_close((uv_handle_t *)&stream, NULL);
+ uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt.
+ uv_loop_close(&loop);
+ return retval;
+}
+
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
+ void *data)
+{
+ stream->uvstream = uvstream;
+
+ if (fd >= 0) {
+ uv_handle_type type = uv_guess_handle(fd);
+ stream->fd = fd;
+
+ if (type == UV_FILE) {
+ // Non-blocking file reads are simulated with an idle handle that reads in
+ // chunks of the ring buffer size, giving time for other events to be
+ // processed between reads.
+ uv_idle_init(&loop->uv, &stream->uv.idle);
+ stream->uv.idle.data = stream;
+ } else {
+ assert(type == UV_NAMED_PIPE || type == UV_TTY);
+ uv_pipe_init(&loop->uv, &stream->uv.pipe, 0);
+ uv_pipe_open(&stream->uv.pipe, fd);
+ stream->uvstream = (uv_stream_t *)&stream->uv.pipe;
+ }
+ }
+
+ if (stream->uvstream) {
+ stream->uvstream->data = stream;
+ }
+
+ stream->data = data;
+ stream->internal_data = NULL;
+ stream->fpos = 0;
+ stream->curmem = 0;
+ stream->maxmem = 0;
+ stream->pending_reqs = 0;
+ stream->read_cb = NULL;
+ stream->write_cb = NULL;
+ stream->close_cb = NULL;
+ stream->internal_close_cb = NULL;
+ stream->closed = false;
+ stream->buffer = NULL;
+}
+
+void stream_close(Stream *stream, stream_close_cb on_stream_close)
+{
+ assert(!stream->closed);
+
+ if (stream->buffer) {
+ rbuffer_free(stream->buffer);
+ }
+
+ stream->closed = true;
+ stream->close_cb = on_stream_close;
+
+ if (!stream->pending_reqs) {
+ stream_close_handle(stream);
+ }
+}
+
+void stream_close_handle(Stream *stream)
+{
+ if (stream->uvstream) {
+ uv_close((uv_handle_t *)stream->uvstream, close_cb);
+ } else {
+ uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
+ }
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Stream *stream = handle->data;
+ if (stream->close_cb) {
+ stream->close_cb(stream, stream->data);
+ }
+ if (stream->internal_close_cb) {
+ stream->internal_close_cb(stream, stream->internal_data);
+ }
+}
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
new file mode 100644
index 0000000000..37410b2036
--- /dev/null
+++ b/src/nvim/event/stream.h
@@ -0,0 +1,55 @@
+#ifndef NVIM_EVENT_STREAM_H
+#define NVIM_EVENT_STREAM_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/rbuffer.h"
+
+typedef struct stream Stream;
+/// Type of function called when the Stream buffer is filled with data
+///
+/// @param stream The Stream instance
+/// @param rbuffer The associated RBuffer instance
+/// @param data User-defined data
+/// @param eof If the stream reached EOF.
+typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data,
+ bool eof);
+
+/// Type of function called when the Stream has information about a write
+/// request.
+///
+/// @param wstream The Stream instance
+/// @param data User-defined data
+/// @param status 0 on success, anything else indicates failure
+typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
+typedef void (*stream_close_cb)(Stream *stream, void *data);
+
+struct stream {
+ union {
+ uv_pipe_t pipe;
+ uv_tcp_t tcp;
+ uv_idle_t idle;
+ } uv;
+ uv_stream_t *uvstream;
+ uv_buf_t uvbuf;
+ RBuffer *buffer;
+ uv_file fd;
+ stream_read_cb read_cb;
+ stream_write_cb write_cb;
+ stream_close_cb close_cb, internal_close_cb;
+ size_t fpos;
+ size_t curmem;
+ size_t maxmem;
+ size_t pending_reqs;
+ void *data, *internal_data;
+ bool closed;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/stream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_STREAM_H
diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c
new file mode 100644
index 0000000000..ce33cdfc10
--- /dev/null
+++ b/src/nvim/event/time.c
@@ -0,0 +1,55 @@
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/time.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/time.c.generated.h"
+#endif
+
+
+void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ uv_timer_init(&loop->uv, &watcher->uv);
+ watcher->uv.data = watcher;
+ watcher->data = data;
+}
+
+void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout,
+ uint64_t repeat)
+ FUNC_ATTR_NONNULL_ALL
+{
+ watcher->cb = cb;
+ uv_timer_start(&watcher->uv, time_watcher_cb, timeout, repeat);
+}
+
+void time_watcher_stop(TimeWatcher *watcher)
+ FUNC_ATTR_NONNULL_ALL
+{
+ uv_timer_stop(&watcher->uv);
+}
+
+void time_watcher_close(TimeWatcher *watcher, time_cb cb)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ watcher->close_cb = cb;
+ uv_close((uv_handle_t *)&watcher->uv, close_cb);
+}
+
+static void time_watcher_cb(uv_timer_t *handle)
+ FUNC_ATTR_NONNULL_ALL
+{
+ TimeWatcher *watcher = handle->data;
+ watcher->cb(watcher, watcher->data);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ TimeWatcher *watcher = handle->data;
+ if (watcher->close_cb) {
+ watcher->close_cb(watcher, watcher->data);
+ }
+}
diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h
new file mode 100644
index 0000000000..ee50e53d11
--- /dev/null
+++ b/src/nvim/event/time.h
@@ -0,0 +1,20 @@
+#ifndef NVIM_EVENT_TIME_H
+#define NVIM_EVENT_TIME_H
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+
+typedef struct time_watcher TimeWatcher;
+typedef void (*time_cb)(TimeWatcher *watcher, void *data);
+
+struct time_watcher {
+ uv_timer_t uv;
+ void *data;
+ time_cb cb, close_cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/time.h.generated.h"
+#endif
+#endif // NVIM_EVENT_TIME_H
diff --git a/src/nvim/event/uv_process.c b/src/nvim/event/uv_process.c
new file mode 100644
index 0000000000..21c2fd1790
--- /dev/null
+++ b/src/nvim/event/uv_process.c
@@ -0,0 +1,77 @@
+#include <assert.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/process.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/log.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/uv_process.c.generated.h"
+#endif
+
+bool uv_process_spawn(UvProcess *uvproc)
+ FUNC_ATTR_NONNULL_ALL
+{
+ Process *proc = (Process *)uvproc;
+ uvproc->uvopts.file = proc->argv[0];
+ uvproc->uvopts.args = proc->argv;
+ uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
+ uvproc->uvopts.exit_cb = exit_cb;
+ uvproc->uvopts.cwd = NULL;
+ uvproc->uvopts.env = NULL;
+ uvproc->uvopts.stdio = uvproc->uvstdio;
+ uvproc->uvopts.stdio_count = 3;
+ uvproc->uvstdio[0].flags = UV_IGNORE;
+ uvproc->uvstdio[1].flags = UV_IGNORE;
+ uvproc->uvstdio[2].flags = UV_IGNORE;
+ uvproc->uv.data = proc;
+
+ if (proc->in) {
+ uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
+ uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe;
+ }
+
+ if (proc->out) {
+ uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe;
+ }
+
+ if (proc->err) {
+ uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe;
+ }
+
+ int status;
+ if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) {
+ ELOG("uv_spawn failed: %s", uv_strerror(status));
+ return false;
+ }
+
+ proc->pid = uvproc->uv.pid;
+ return true;
+}
+
+void uv_process_close(UvProcess *uvproc)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ uv_close((uv_handle_t *)&uvproc->uv, close_cb);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Process *proc = handle->data;
+ if (proc->internal_close_cb) {
+ proc->internal_close_cb(proc);
+ }
+}
+
+static void exit_cb(uv_process_t *handle, int64_t status, int term_signal)
+{
+ Process *proc = handle->data;
+ proc->status = (int)status;
+ proc->internal_exit_cb(proc);
+}
diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h
new file mode 100644
index 0000000000..a17f1446b3
--- /dev/null
+++ b/src/nvim/event/uv_process.h
@@ -0,0 +1,25 @@
+#ifndef NVIM_EVENT_UV_PROCESS_H
+#define NVIM_EVENT_UV_PROCESS_H
+
+#include <uv.h>
+
+#include "nvim/event/process.h"
+
+typedef struct uv_process {
+ Process process;
+ uv_process_t uv;
+ uv_process_options_t uvopts;
+ uv_stdio_container_t uvstdio[3];
+} UvProcess;
+
+static inline UvProcess uv_process_init(void *data)
+{
+ UvProcess rv;
+ rv.process = process_init(kProcessTypeUv, data);
+ return rv;
+}
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/uv_process.h.generated.h"
+#endif
+#endif // NVIM_EVENT_UV_PROCESS_H
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
new file mode 100644
index 0000000000..5fcb724fe3
--- /dev/null
+++ b/src/nvim/event/wstream.c
@@ -0,0 +1,162 @@
+#include <assert.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/wstream.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+
+#define DEFAULT_MAXMEM 1024 * 1024 * 10
+
+typedef struct {
+ Stream *stream;
+ WBuffer *buffer;
+ uv_write_t uv_req;
+} WRequest;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/wstream.c.generated.h"
+#endif
+
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(loop, stream, fd, NULL, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(NULL, stream, -1, uvstream, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init(Stream *stream, size_t maxmem)
+{
+ stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM;
+}
+
+/// Sets a callback that will be called on completion of a write request,
+/// indicating failure/success.
+///
+/// This affects all requests currently in-flight as well. Overwrites any
+/// possible earlier callback.
+///
+/// @note This callback will not fire if the write request couldn't even be
+/// queued properly (i.e.: when `wstream_write() returns an error`).
+///
+/// @param stream The `Stream` instance
+/// @param cb The callback
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
+ FUNC_ATTR_NONNULL_ALL
+{
+ stream->write_cb = cb;
+}
+
+/// Queues data for writing to the backing file descriptor of a `Stream`
+/// instance. This will fail if the write would cause the Stream use more
+/// memory than specified by `maxmem`.
+///
+/// @param stream The `Stream` instance
+/// @param buffer The buffer which contains data to be written
+/// @return false if the write failed
+bool wstream_write(Stream *stream, WBuffer *buffer)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(stream->maxmem);
+ // This should not be called after a stream was freed
+ assert(!stream->closed);
+
+ if (stream->curmem > stream->maxmem) {
+ goto err;
+ }
+
+ stream->curmem += buffer->size;
+
+ WRequest *data = xmalloc(sizeof(WRequest));
+ data->stream = stream;
+ data->buffer = buffer;
+ data->uv_req.data = data;
+
+ uv_buf_t uvbuf;
+ uvbuf.base = buffer->data;
+ uvbuf.len = buffer->size;
+
+ if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
+ xfree(data);
+ goto err;
+ }
+
+ stream->pending_reqs++;
+ return true;
+
+err:
+ wstream_release_wbuffer(buffer);
+ return false;
+}
+
+/// Creates a WBuffer object for holding output data. Instances of this
+/// object can be reused across Stream instances, and the memory is freed
+/// automatically when no longer needed(it tracks the number of references
+/// internally)
+///
+/// @param data Data stored by the WBuffer
+/// @param size The size of the data array
+/// @param refcount The number of references for the WBuffer. This will be used
+/// by Stream instances to decide when a WBuffer should be freed.
+/// @param cb Pointer to function that will be responsible for freeing
+/// the buffer data(passing 'free' will work as expected).
+/// @return The allocated WBuffer instance
+WBuffer *wstream_new_buffer(char *data,
+ size_t size,
+ size_t refcount,
+ wbuffer_data_finalizer cb)
+{
+ WBuffer *rv = xmalloc(sizeof(WBuffer));
+ rv->size = size;
+ rv->refcount = refcount;
+ rv->cb = cb;
+ rv->data = data;
+
+ return rv;
+}
+
+static void write_cb(uv_write_t *req, int status)
+{
+ WRequest *data = req->data;
+
+ data->stream->curmem -= data->buffer->size;
+
+ wstream_release_wbuffer(data->buffer);
+
+ if (data->stream->write_cb) {
+ data->stream->write_cb(data->stream, data->stream->data, status);
+ }
+
+ data->stream->pending_reqs--;
+
+ if (data->stream->closed && data->stream->pending_reqs == 0) {
+ // Last pending write, free the stream;
+ stream_close_handle(data->stream);
+ }
+
+ xfree(data);
+}
+
+void wstream_release_wbuffer(WBuffer *buffer)
+{
+ if (!--buffer->refcount) {
+ if (buffer->cb) {
+ buffer->cb(buffer->data);
+ }
+
+ xfree(buffer);
+ }
+}
diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h
new file mode 100644
index 0000000000..9008de0d97
--- /dev/null
+++ b/src/nvim/event/wstream.h
@@ -0,0 +1,24 @@
+#ifndef NVIM_EVENT_WSTREAM_H
+#define NVIM_EVENT_WSTREAM_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/stream.h"
+
+typedef struct wbuffer WBuffer;
+typedef void (*wbuffer_data_finalizer)(void *data);
+
+struct wbuffer {
+ size_t size, refcount;
+ char *data;
+ wbuffer_data_finalizer cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/wstream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_WSTREAM_H