diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 |
commit | 883b78d29864f39b8032468c4374766dad7d142f (patch) | |
tree | b555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/event | |
parent | d88c93acf390ea9d5e8674283927cff60fb41e0d (diff) | |
parent | aa9cb48bf08af14068178619414590254b263882 (diff) | |
download | rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.gz rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.bz2 rneovim-883b78d29864f39b8032468c4374766dad7d142f.zip |
Merge PR #2980 'Refactor event loop layer'
Helped-by: oni-link <knil.ino@gmail.com>
Reviewed-by: oni-link <knil.ino@gmail.com>
Reviewed-by: Scott Prager <splinterofchaos@gmail.com>
Diffstat (limited to 'src/nvim/event')
-rw-r--r-- | src/nvim/event/loop.c | 144 | ||||
-rw-r--r-- | src/nvim/event/loop.h | 59 | ||||
-rw-r--r-- | src/nvim/event/process.c | 325 | ||||
-rw-r--r-- | src/nvim/event/process.h | 56 | ||||
-rw-r--r-- | src/nvim/event/pty_process.c | 254 | ||||
-rw-r--r-- | src/nvim/event/pty_process.h | 30 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 167 | ||||
-rw-r--r-- | src/nvim/event/rstream.h | 16 | ||||
-rw-r--r-- | src/nvim/event/signal.c | 52 | ||||
-rw-r--r-- | src/nvim/event/signal.h | 22 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 157 | ||||
-rw-r--r-- | src/nvim/event/socket.h | 38 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 108 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 55 | ||||
-rw-r--r-- | src/nvim/event/time.c | 55 | ||||
-rw-r--r-- | src/nvim/event/time.h | 20 | ||||
-rw-r--r-- | src/nvim/event/uv_process.c | 77 | ||||
-rw-r--r-- | src/nvim/event/uv_process.h | 25 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 162 | ||||
-rw-r--r-- | src/nvim/event/wstream.h | 24 |
20 files changed, 1846 insertions, 0 deletions
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c new file mode 100644 index 0000000000..d90565002e --- /dev/null +++ b/src/nvim/event/loop.c @@ -0,0 +1,144 @@ +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/process.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/loop.c.generated.h" +#endif + + +void loop_init(Loop *loop, void *data) +{ + uv_loop_init(&loop->uv); + loop->uv.data = loop; + loop->deferred_events = kl_init(Event); + loop->immediate_events = kl_init(Event); + loop->children = kl_init(WatcherPtr); + loop->children_stop_requests = 0; + uv_signal_init(&loop->uv, &loop->children_watcher); + uv_timer_init(&loop->uv, &loop->children_kill_timer); +} + +void loop_poll_events(Loop *loop, int ms) +{ + static int recursive = 0; + + if (recursive++) { + abort(); // Should not re-enter uv_run + } + + bool wait = true; + uv_timer_t timer; + + if (ms > 0) { + uv_timer_init(&loop->uv, &timer); + // Use a repeating timeout of ms milliseconds to make sure + // we do not block indefinitely for I/O. + uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); + } else if (ms == 0) { + // For ms == 0, we need to do a non-blocking event poll by + // setting the run mode to UV_RUN_NOWAIT. + wait = false; + } + + if (wait) { + loop_run_once(loop); + } else { + loop_run_nowait(loop); + } + + if (ms > 0) { + // Ensure the timer handle is closed and run the event loop + // once more to let libuv perform it's cleanup + uv_timer_stop(&timer); + uv_close((uv_handle_t *)&timer, NULL); + loop_run_nowait(loop); + } + + recursive--; // Can re-enter uv_run now + process_events_from(loop->immediate_events); +} + +bool loop_has_deferred_events(Loop *loop) +{ + return loop->deferred_events_allowed && !kl_empty(loop->deferred_events); +} + +void loop_enable_deferred_events(Loop *loop) +{ + ++loop->deferred_events_allowed; +} + +void loop_disable_deferred_events(Loop *loop) +{ + --loop->deferred_events_allowed; +} + +// Queue an event +void loop_push_event(Loop *loop, Event event, bool deferred) +{ + // Sometimes libuv will run pending callbacks(timer for example) before + // blocking for a poll. If this happens and the callback pushes a event to one + // of the queues, the event would only be processed after the poll + // returns(user hits a key for example). To avoid this scenario, we call + // uv_stop when a event is enqueued. + loop_stop(loop); + kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, + event); +} + +void loop_process_event(Loop *loop) +{ + process_events_from(loop->deferred_events); +} + + +void loop_run(Loop *loop) +{ + uv_run(&loop->uv, UV_RUN_DEFAULT); +} + +void loop_run_once(Loop *loop) +{ + uv_run(&loop->uv, UV_RUN_ONCE); +} + +void loop_run_nowait(Loop *loop) +{ + uv_run(&loop->uv, UV_RUN_NOWAIT); +} + +void loop_stop(Loop *loop) +{ + uv_stop(&loop->uv); +} + +void loop_close(Loop *loop) +{ + uv_close((uv_handle_t *)&loop->children_watcher, NULL); + uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); + do { + uv_run(&loop->uv, UV_RUN_DEFAULT); + } while (uv_loop_close(&loop->uv)); +} + +void loop_process_all_events(Loop *loop) +{ + process_events_from(loop->immediate_events); + process_events_from(loop->deferred_events); +} + +static void process_events_from(klist_t(Event) *queue) +{ + while (!kl_empty(queue)) { + Event event = kl_shift(Event, queue); + event.handler(event); + } +} + +static void timer_cb(uv_timer_t *handle) +{ +} diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h new file mode 100644 index 0000000000..5eb4d32ca8 --- /dev/null +++ b/src/nvim/event/loop.h @@ -0,0 +1,59 @@ +#ifndef NVIM_EVENT_LOOP_H +#define NVIM_EVENT_LOOP_H + +#include <stdint.h> + +#include <uv.h> + +#include "nvim/lib/klist.h" +#include "nvim/os/time.h" + +typedef struct event Event; +typedef void (*event_handler)(Event event); + +struct event { + void *data; + event_handler handler; +}; + +typedef void * WatcherPtr; + +#define _noop(x) +KLIST_INIT(WatcherPtr, WatcherPtr, _noop) +KLIST_INIT(Event, Event, _noop) + +typedef struct loop { + uv_loop_t uv; + klist_t(Event) *deferred_events, *immediate_events; + int deferred_events_allowed; + klist_t(WatcherPtr) *children; + uv_signal_t children_watcher; + uv_timer_t children_kill_timer; + size_t children_stop_requests; +} Loop; + +// Poll for events until a condition or timeout +#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ + do { \ + int remaining = timeout; \ + uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ + while (!(condition)) { \ + loop_poll_events(loop, remaining); \ + if (remaining == 0) { \ + break; \ + } else if (remaining > 0) { \ + uint64_t now = os_hrtime(); \ + remaining -= (int) ((now - before) / 1000000); \ + before = now; \ + if (remaining <= 0) { \ + break; \ + } \ + } \ + } \ + } while (0) + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/loop.h.generated.h" +#endif + +#endif // NVIM_EVENT_LOOP_H diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c new file mode 100644 index 0000000000..2b1f1ae096 --- /dev/null +++ b/src/nvim/event/process.c @@ -0,0 +1,325 @@ +#include <assert.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/os/shell.h" +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" +#include "nvim/globals.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.c.generated.h" +#endif + +// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly +// exit before we send SIGNAL to it +#define TERM_TIMEOUT 1000000000 +#define KILL_TIMEOUT (TERM_TIMEOUT * 2) + +#define CLOSE_PROC_STREAM(proc, stream) \ + do { \ + if (proc->stream && !proc->stream->closed) { \ + stream_close(proc->stream, NULL); \ + } \ + } while (0) + + +bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +{ + proc->loop = loop; + if (proc->in) { + uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); + } + + if (proc->out) { + uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); + } + + if (proc->err) { + uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); + } + + bool success; + switch (proc->type) { + case kProcessTypeUv: + success = uv_process_spawn((UvProcess *)proc); + break; + case kProcessTypePty: + success = pty_process_spawn((PtyProcess *)proc); + break; + default: + abort(); + } + + if (!success) { + if (proc->in) { + uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); + } + if (proc->out) { + uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); + } + if (proc->err) { + uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); + } + process_close(proc); + shell_free_argv(proc->argv); + proc->status = -1; + return false; + } + + void *data = proc->data; + + if (proc->in) { + stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->internal_data = proc; + proc->in->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + if (proc->out) { + stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->internal_data = proc; + proc->out->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + if (proc->err) { + stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->internal_data = proc; + proc->err->internal_close_cb = on_process_stream_close; + proc->refcount++; + } + + proc->internal_exit_cb = on_process_exit; + proc->internal_close_cb = decref; + proc->refcount++; + kl_push(WatcherPtr, loop->children, proc); + return true; +} + +void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL +{ + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + uv_kill(proc->pid, SIGTERM); + proc->term_sent = true; + process_stop(proc); + } + + // Wait until all children exit + LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + pty_process_teardown(loop); +} + +// Wrappers around `stream_close` that protect against double-closing. +void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + process_close_in(proc); + process_close_out(proc); + process_close_err(proc); +} + +void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, in); +} + +void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, out); +} + +void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + CLOSE_PROC_STREAM(proc, err); +} + +/// Synchronously wait for a process to finish +/// +/// @param process The Process instance +/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for +/// waiting until the process quits. +/// @return returns the status code of the exited process. -1 if the process is +/// still running and the `timeout` has expired. Note that this is +/// indistinguishable from the process returning -1 by itself. Which +/// is possible on some OS. Returns -2 if an user has interruped the +/// wait. +int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +{ + // The default status is -1, which represents a timeout + int status = -1; + bool interrupted = false; + + // Increase refcount to stop the exit callback from being called(and possibly + // being freed) before we have a chance to get the status. + proc->refcount++; + LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + // Until... + got_int || // interrupted by the user + proc->refcount == 1); // job exited + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + interrupted = true; + got_int = false; + process_stop(proc); + if (ms == -1) { + // We can only return, if all streams/handles are closed and the job + + // exited. + LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + } else { + loop_poll_events(proc->loop, 0); + } + } + + if (proc->refcount == 1) { + // Job exited, collect status and manually invoke close_cb to free the job + // resources + status = interrupted ? -2 : proc->status; + decref(proc); + } else { + proc->refcount--; + } + + return status; +} + +/// Ask a process to terminate and eventually kill if it doesn't respond +void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL +{ + if (proc->stopped_time) { + return; + } + + proc->stopped_time = os_hrtime(); + switch (proc->type) { + case kProcessTypeUv: + // Close the process's stdin. If the process doesn't close its own + // stdout/stderr, they will be closed when it exits(possibly due to being + // terminated after a timeout) + process_close_in(proc); + break; + case kProcessTypePty: + // close all streams for pty processes to send SIGHUP to the process + process_close_streams(proc); + pty_process_close_master((PtyProcess *)proc); + break; + default: + abort(); + } + + Loop *loop = proc->loop; + if (!loop->children_stop_requests++) { + // When there's at least one stop request pending, start a timer that + // will periodically check if a signal should be send to a to the job + DLOG("Starting job kill timer"); + uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100); + } +} + +/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL +/// to those that didn't die from SIGTERM after a while(exit_timeout is 0). +static void children_kill_cb(uv_timer_t *handle) +{ + Loop *loop = handle->loop->data; + uint64_t now = os_hrtime(); + + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + if (!proc->stopped_time) { + continue; + } + uint64_t elapsed = now - proc->stopped_time; + + if (!proc->term_sent && elapsed >= TERM_TIMEOUT) { + ILOG("Sending SIGTERM to pid %d", proc->pid); + uv_kill(proc->pid, SIGTERM); + proc->term_sent = true; + } else if (elapsed >= KILL_TIMEOUT) { + ILOG("Sending SIGKILL to pid %d", proc->pid); + uv_kill(proc->pid, SIGKILL); + } + } +} + +static void decref(Process *proc) +{ + if (--proc->refcount != 0) { + return; + } + + Loop *loop = proc->loop; + kliter_t(WatcherPtr) **node = NULL; + kl_iter(WatcherPtr, loop->children, current) { + if ((*current)->data == proc) { + node = current; + break; + } + } + + assert(node); + kl_shift_at(WatcherPtr, loop->children, node); + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + +static void process_close(Process *proc) + FUNC_ATTR_NONNULL_ARG(1) +{ + assert(!proc->closed); + proc->closed = true; + switch (proc->type) { + case kProcessTypeUv: + uv_process_close((UvProcess *)proc); + break; + case kProcessTypePty: + pty_process_close((PtyProcess *)proc); + break; + default: + abort(); + } +} + +static void on_process_exit(Process *proc) +{ + if (exiting) { + on_process_exit_event((Event) {.data = proc}); + } else { + loop_push_event(proc->loop, + (Event) {.handler = on_process_exit_event, .data = proc}, false); + } + + Loop *loop = proc->loop; + if (loop->children_stop_requests && !--loop->children_stop_requests) { + // Stop the timer if no more stop requests are pending + DLOG("Stopping process kill timer"); + uv_timer_stop(&loop->children_kill_timer); + } +} + +static void on_process_exit_event(Event event) +{ + Process *proc = event.data; + process_close_streams(proc); + process_close(proc); +} + +static void on_process_stream_close(Stream *stream, void *data) +{ + Process *proc = data; + decref(proc); +} + diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h new file mode 100644 index 0000000000..5c84a7d1d0 --- /dev/null +++ b/src/nvim/event/process.h @@ -0,0 +1,56 @@ +#ifndef NVIM_EVENT_PROCESS_H +#define NVIM_EVENT_PROCESS_H + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +typedef enum { + kProcessTypeUv, + kProcessTypePty +} ProcessType; + +typedef struct process Process; +typedef void (*process_exit_cb)(Process *proc, int status, void *data); +typedef void (*internal_process_cb)(Process *proc); + +struct process { + ProcessType type; + Loop *loop; + void *data; + int pid, status, refcount; + // set to the hrtime of when process_stop was called for the process. + uint64_t stopped_time; + char **argv; + Stream *in, *out, *err; + process_exit_cb cb; + internal_process_cb internal_exit_cb, internal_close_cb; + bool closed, term_sent; +}; + +static inline Process process_init(ProcessType type, void *data) +{ + return (Process) { + .type = type, + .data = data, + .loop = NULL, + .pid = 0, + .status = 0, + .refcount = 0, + .stopped_time = 0, + .argv = NULL, + .in = NULL, + .out = NULL, + .err = NULL, + .cb = NULL, + .closed = false, + .term_sent = false, + .internal_close_cb = NULL, + .internal_exit_cb = NULL + }; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.h.generated.h" +#endif +#endif // NVIM_EVENT_PROCESS_H diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c new file mode 100644 index 0000000000..1e24d7c919 --- /dev/null +++ b/src/nvim/event/pty_process.c @@ -0,0 +1,254 @@ +// Some of the code came from pangoterm and libuv +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +#include <unistd.h> +#include <termios.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <sys/ioctl.h> + +// forkpty is not in POSIX, so headers are platform-specific +#if defined(__FreeBSD__) +# include <libutil.h> +#elif defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__) +# include <util.h> +#else +# include <pty.h> +#endif + +#include <uv.h> + +#include "nvim/lib/klist.h" + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/pty_process.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/pty_process.c.generated.h" +#endif + +static const unsigned int KILL_RETRIES = 5; +static const unsigned int KILL_TIMEOUT = 2; // seconds + +bool pty_process_spawn(PtyProcess *ptyproc) + FUNC_ATTR_NONNULL_ALL +{ + Process *proc = (Process *)ptyproc; + uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); + ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; + struct termios termios; + init_termios(&termios); + uv_disable_stdio_inheritance(); + int master; + int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); + + if (pid < 0) { + ELOG("forkpty failed: %s", strerror(errno)); + return false; + } else if (pid == 0) { + init_child(ptyproc); + abort(); + } + + // make sure the master file descriptor is non blocking + int master_status_flags = fcntl(master, F_GETFL); + if (master_status_flags == -1) { + ELOG("Failed to get master descriptor status flags: %s", strerror(errno)); + goto error; + } + if (fcntl(master, F_SETFL, master_status_flags | O_NONBLOCK) == -1) { + ELOG("Failed to make master descriptor non-blocking: %s", strerror(errno)); + goto error; + } + + if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) { + goto error; + } + if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { + goto error; + } + if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) { + goto error; + } + + ptyproc->tty_fd = master; + proc->pid = pid; + return true; + +error: + close(master); + + // terminate spawned process + kill(pid, SIGTERM); + int status, child; + unsigned int try = 0; + while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) { + sleep(KILL_TIMEOUT); + } + if (child != pid) { + kill(pid, SIGKILL); + waitpid(pid, NULL, 0); + } + + return false; +} + +void pty_process_resize(PtyProcess *ptyproc, uint16_t width, + uint16_t height) + FUNC_ATTR_NONNULL_ALL +{ + ptyproc->winsize = (struct winsize){height, width, 0, 0}; + ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); +} + +void pty_process_close(PtyProcess *ptyproc) + FUNC_ATTR_NONNULL_ALL +{ + pty_process_close_master(ptyproc); + Process *proc = (Process *)ptyproc; + if (proc->internal_close_cb) { + proc->internal_close_cb(proc); + } +} + +void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL +{ + if (ptyproc->tty_fd >= 0) { + close(ptyproc->tty_fd); + ptyproc->tty_fd = -1; + } +} + +void pty_process_teardown(Loop *loop) +{ + uv_signal_stop(&loop->children_watcher); +} + +static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL +{ + unsetenv("COLUMNS"); + unsetenv("LINES"); + unsetenv("TERMCAP"); + unsetenv("COLORTERM"); + unsetenv("COLORFGBG"); + + signal(SIGCHLD, SIG_DFL); + signal(SIGHUP, SIG_DFL); + signal(SIGINT, SIG_DFL); + signal(SIGQUIT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + signal(SIGALRM, SIG_DFL); + + setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1); + execvp(ptyproc->process.argv[0], ptyproc->process.argv); + fprintf(stderr, "execvp failed: %s\n", strerror(errno)); +} + +static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL +{ + memset(termios, 0, sizeof(struct termios)); + // Taken from pangoterm + termios->c_iflag = ICRNL|IXON; + termios->c_oflag = OPOST|ONLCR; +#ifdef TAB0 + termios->c_oflag |= TAB0; +#endif + termios->c_cflag = CS8|CREAD; + termios->c_lflag = ISIG|ICANON|IEXTEN|ECHO|ECHOE|ECHOK; + + cfsetspeed(termios, 38400); + +#ifdef IUTF8 + termios->c_iflag |= IUTF8; +#endif +#ifdef NL0 + termios->c_oflag |= NL0; +#endif +#ifdef CR0 + termios->c_oflag |= CR0; +#endif +#ifdef BS0 + termios->c_oflag |= BS0; +#endif +#ifdef VT0 + termios->c_oflag |= VT0; +#endif +#ifdef FF0 + termios->c_oflag |= FF0; +#endif +#ifdef ECHOCTL + termios->c_lflag |= ECHOCTL; +#endif +#ifdef ECHOKE + termios->c_lflag |= ECHOKE; +#endif + + termios->c_cc[VINTR] = 0x1f & 'C'; + termios->c_cc[VQUIT] = 0x1f & '\\'; + termios->c_cc[VERASE] = 0x7f; + termios->c_cc[VKILL] = 0x1f & 'U'; + termios->c_cc[VEOF] = 0x1f & 'D'; + termios->c_cc[VEOL] = _POSIX_VDISABLE; + termios->c_cc[VEOL2] = _POSIX_VDISABLE; + termios->c_cc[VSTART] = 0x1f & 'Q'; + termios->c_cc[VSTOP] = 0x1f & 'S'; + termios->c_cc[VSUSP] = 0x1f & 'Z'; + termios->c_cc[VREPRINT] = 0x1f & 'R'; + termios->c_cc[VWERASE] = 0x1f & 'W'; + termios->c_cc[VLNEXT] = 0x1f & 'V'; + termios->c_cc[VMIN] = 1; + termios->c_cc[VTIME] = 0; +} + +static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe) + FUNC_ATTR_NONNULL_ALL +{ + int fd_dup = dup(fd); + if (fd_dup < 0) { + ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); + return false; + } + int uv_result = uv_pipe_open(pipe, fd_dup); + if (uv_result) { + ELOG("Failed to set pipe to descriptor %d: %s", + fd_dup, uv_strerror(uv_result)); + close(fd_dup); + return false; + } + return true; +} + +static void chld_handler(uv_signal_t *handle, int signum) +{ + int stat = 0; + int pid; + + do { + pid = waitpid(-1, &stat, WNOHANG); + } while (pid < 0 && errno == EINTR); + + if (pid <= 0) { + return; + } + + Loop *loop = handle->loop->data; + + kl_iter(WatcherPtr, loop->children, current) { + Process *proc = (*current)->data; + if (proc->pid == pid) { + if (WIFEXITED(stat)) { + proc->status = WEXITSTATUS(stat); + } else if (WIFSIGNALED(stat)) { + proc->status = WTERMSIG(stat); + } + proc->internal_exit_cb(proc); + break; + } + } +} diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h new file mode 100644 index 0000000000..a12b5489c5 --- /dev/null +++ b/src/nvim/event/pty_process.h @@ -0,0 +1,30 @@ +#ifndef NVIM_EVENT_PTY_PROCESS_H +#define NVIM_EVENT_PTY_PROCESS_H + +#include <sys/ioctl.h> + +#include "nvim/event/process.h" + +typedef struct pty_process { + Process process; + char *term_name; + uint16_t width, height; + struct winsize winsize; + int tty_fd; +} PtyProcess; + +static inline PtyProcess pty_process_init(void *data) +{ + PtyProcess rv; + rv.process = process_init(kProcessTypePty, data); + rv.term_name = NULL; + rv.width = 80; + rv.height = 24; + rv.tty_fd = -1; + return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/pty_process.h.generated.h" +#endif +#endif // NVIM_EVENT_PTY_PROCESS_H diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c new file mode 100644 index 0000000000..78c044347f --- /dev/null +++ b/src/nvim/event/rstream.c @@ -0,0 +1,167 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/rstream.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/event/loop.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.c.generated.h" +#endif + +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, + void *data) + FUNC_ATTR_NONNULL_ARG(1) + FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(loop, stream, fd, NULL, data); + rstream_init(stream, bufsize); +} + +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, + void *data) + FUNC_ATTR_NONNULL_ARG(1) + FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(NULL, stream, -1, uvstream, data); + rstream_init(stream, bufsize); +} + +void rstream_init(Stream *stream, size_t bufsize) + FUNC_ATTR_NONNULL_ARG(1) +{ + stream->buffer = rbuffer_new(bufsize); + stream->buffer->data = stream; + stream->buffer->full_cb = on_rbuffer_full; + stream->buffer->nonfull_cb = on_rbuffer_nonfull; +} + + +/// Starts watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_start(Stream *stream, stream_read_cb cb) +{ + stream->read_cb = cb; + if (stream->uvstream) { + uv_read_start(stream->uvstream, alloc_cb, read_cb); + } else { + uv_idle_start(&stream->uv.idle, fread_idle_cb); + } +} + +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(Stream *stream) +{ + if (stream->uvstream) { + uv_read_stop(stream->uvstream); + } else { + uv_idle_stop(&stream->uv.idle); + } +} + +static void on_rbuffer_full(RBuffer *buf, void *data) +{ + rstream_stop(data); +} + +static void on_rbuffer_nonfull(RBuffer *buf, void *data) +{ + Stream *stream = data; + assert(stream->read_cb); + rstream_start(stream, stream->read_cb); +} + +// Callbacks used by libuv + +// Called by libuv to allocate memory for reading. +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ + Stream *stream = handle->data; + buf->base = rbuffer_write_ptr(stream->buffer, &buf->len); +} + +// Callback invoked by libuv after it copies the data into the buffer provided +// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a +// 0-length buffer. +static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) +{ + Stream *stream = uvstream->data; + + if (cnt <= 0) { + if (cnt != UV_ENOBUFS + // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: + // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. + // + // We don't need to do anything with the RBuffer because the next call + // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` + // won't be called) + && cnt != 0) { + DLOG("Closing Stream(%p) because of %s(%zd)", stream, + uv_strerror((int)cnt), cnt); + // Read error or EOF, either way stop the stream and invoke the callback + // with eof == true + uv_read_stop(uvstream); + stream->read_cb(stream, stream->buffer, stream->data, true); + } + return; + } + + // at this point we're sure that cnt is positive, no error occurred + size_t nread = (size_t)cnt; + // Data was already written, so all we need is to update 'wpos' to reflect + // the space actually used in the buffer. + rbuffer_produced(stream->buffer, nread); + stream->read_cb(stream, stream->buffer, stream->data, false); +} + +// Called by the by the 'idle' handle to emulate a reading event +static void fread_idle_cb(uv_idle_t *handle) +{ + uv_fs_t req; + Stream *stream = handle->data; + + stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len); + + // the offset argument to uv_fs_read is int64_t, could someone really try + // to read more than 9 quintillion (9e18) bytes? + // upcast is meant to avoid tautological condition warning on 32 bits + uintmax_t fpos_intmax = stream->fpos; + if (fpos_intmax > INT64_MAX) { + ELOG("stream offset overflow"); + preserve_exit(); + } + + // Synchronous read + uv_fs_read( + handle->loop, + &req, + stream->fd, + &stream->uvbuf, + 1, + (int64_t) stream->fpos, + NULL); + + uv_fs_req_cleanup(&req); + + if (req.result <= 0) { + uv_idle_stop(&stream->uv.idle); + stream->read_cb(stream, stream->buffer, stream->data, true); + return; + } + + // no errors (req.result (ssize_t) is positive), it's safe to cast. + size_t nread = (size_t) req.result; + rbuffer_produced(stream->buffer, nread); + stream->fpos += nread; +} diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h new file mode 100644 index 0000000000..f30ad79ee5 --- /dev/null +++ b/src/nvim/event/rstream.h @@ -0,0 +1,16 @@ +#ifndef NVIM_EVENT_RSTREAM_H +#define NVIM_EVENT_RSTREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.h.generated.h" +#endif +#endif // NVIM_EVENT_RSTREAM_H diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c new file mode 100644 index 0000000000..63133b4f57 --- /dev/null +++ b/src/nvim/event/signal.c @@ -0,0 +1,52 @@ +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/signal.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/signal.c.generated.h" +#endif + + +void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + uv_signal_init(&loop->uv, &watcher->uv); + watcher->uv.data = watcher; + watcher->data = data; + watcher->cb = NULL; +} + +void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) + FUNC_ATTR_NONNULL_ALL +{ + watcher->cb = cb; + uv_signal_start(&watcher->uv, signal_watcher_cb, signum); +} + +void signal_watcher_stop(SignalWatcher *watcher) + FUNC_ATTR_NONNULL_ALL +{ + uv_signal_stop(&watcher->uv); +} + +void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) + FUNC_ATTR_NONNULL_ARG(1) +{ + watcher->close_cb = cb; + uv_close((uv_handle_t *)&watcher->uv, close_cb); +} + +static void signal_watcher_cb(uv_signal_t *handle, int signum) +{ + SignalWatcher *watcher = handle->data; + watcher->cb(watcher, signum, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ + SignalWatcher *watcher = handle->data; + if (watcher->close_cb) { + watcher->close_cb(watcher, watcher->data); + } +} diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h new file mode 100644 index 0000000000..c269fa9d95 --- /dev/null +++ b/src/nvim/event/signal.h @@ -0,0 +1,22 @@ +#ifndef NVIM_EVENT_SIGNAL_H +#define NVIM_EVENT_SIGNAL_H + +#include <uv.h> + +#include "nvim/event/loop.h" + +typedef struct signal_watcher SignalWatcher; +typedef void (*signal_cb)(SignalWatcher *watcher, int signum, void *data); +typedef void (*signal_close_cb)(SignalWatcher *watcher, void *data); + +struct signal_watcher { + uv_signal_t uv; + void *data; + signal_cb cb; + signal_close_cb close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/signal.h.generated.h" +#endif +#endif // NVIM_EVENT_SIGNAL_H diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c new file mode 100644 index 0000000000..bdc632abf0 --- /dev/null +++ b/src/nvim/event/socket.c @@ -0,0 +1,157 @@ +#include <assert.h> +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/socket.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/os/os.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/strings.h" +#include "nvim/path.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.c.generated.h" +#endif + +#define NVIM_DEFAULT_TCP_PORT 7450 + +void socket_watcher_init(Loop *loop, SocketWatcher *watcher, + const char *endpoint, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) +{ + // Trim to `ADDRESS_MAX_SIZE` + if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr)) + >= sizeof(watcher->addr)) { + // TODO(aktau): since this is not what the user wanted, perhaps we + // should return an error here + WLOG("Address was too long, truncated to %s", watcher->addr); + } + + bool tcp = true; + char ip[16], *ip_end = xstrchrnul(watcher->addr, ':'); + + // (ip_end - addr) is always > 0, so convert to size_t + size_t addr_len = (size_t)(ip_end - watcher->addr); + + if (addr_len > sizeof(ip) - 1) { + // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) + addr_len = sizeof(ip) - 1; + } + + // Extract the address part + xstrlcpy(ip, watcher->addr, addr_len + 1); + int port = NVIM_DEFAULT_TCP_PORT; + + if (*ip_end == ':') { + // Extract the port + long lport = strtol(ip_end + 1, NULL, 10); // NOLINT + if (lport <= 0 || lport > 0xffff) { + // Invalid port, treat as named pipe or unix socket + tcp = false; + } else { + port = (int) lport; + } + } + + if (tcp) { + // Try to parse ip address + if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) { + // Invalid address, treat as named pipe or unix socket + tcp = false; + } + } + + if (tcp) { + uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle); + watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle; + } else { + uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0); + watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle; + } + + watcher->stream->data = watcher; + watcher->cb = NULL; + watcher->close_cb = NULL; +} + +int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) + FUNC_ATTR_NONNULL_ALL +{ + watcher->cb = cb; + int result; + + if (watcher->stream->type == UV_TCP) { + result = uv_tcp_bind(&watcher->uv.tcp.handle, + (const struct sockaddr *)&watcher->uv.tcp.addr, 0); + } else { + result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr); + } + + if (result == 0) { + result = uv_listen(watcher->stream, backlog, connection_cb); + } + + assert(result <= 0); // libuv should have returned -errno or zero. + if (result < 0) { + if (result == -EACCES) { + // Libuv converts ENOENT to EACCES for Windows compatibility, but if + // the parent directory does not exist, ENOENT would be more accurate. + *path_tail((char_u *)watcher->addr) = NUL; + if (!os_file_exists((char_u *)watcher->addr)) { + result = -ENOENT; + } + } + return result; + } + + return 0; +} + +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +{ + uv_stream_t *client; + + if (watcher->stream->type == UV_TCP) { + client = (uv_stream_t *)&stream->uv.tcp; + uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); + } else { + client = (uv_stream_t *)&stream->uv.pipe; + uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); + } + + int result = uv_accept(watcher->stream, client); + + if (result) { + uv_close((uv_handle_t *)client, NULL); + return result; + } + + stream_init(NULL, stream, -1, client, data); + return 0; +} + +void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) + FUNC_ATTR_NONNULL_ARG(1) +{ + watcher->close_cb = cb; + uv_close((uv_handle_t *)watcher->stream, close_cb); +} + +static void connection_cb(uv_stream_t *handle, int status) +{ + SocketWatcher *watcher = handle->data; + watcher->cb(watcher, status, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ + SocketWatcher *watcher = handle->data; + if (watcher->close_cb) { + watcher->close_cb(watcher, watcher->data); + } +} diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h new file mode 100644 index 0000000000..17fd39f33b --- /dev/null +++ b/src/nvim/event/socket.h @@ -0,0 +1,38 @@ +#ifndef NVIM_EVENT_SOCKET_H +#define NVIM_EVENT_SOCKET_H + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +#define ADDRESS_MAX_SIZE 256 + +typedef struct socket_watcher SocketWatcher; +typedef void (*socket_cb)(SocketWatcher *watcher, int result, void *data); +typedef void (*socket_close_cb)(SocketWatcher *watcher, void *data); + +struct socket_watcher { + // Pipe/socket path, or TCP address string + char addr[ADDRESS_MAX_SIZE]; + // TCP server or unix socket (named pipe on Windows) + union { + struct { + uv_tcp_t handle; + struct sockaddr_in addr; + } tcp; + struct { + uv_pipe_t handle; + } pipe; + } uv; + uv_stream_t *stream; + void *data; + socket_cb cb; + socket_close_cb close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.h.generated.h" +#endif +#endif // NVIM_EVENT_SOCKET_H diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c new file mode 100644 index 0000000000..959b532146 --- /dev/null +++ b/src/nvim/event/stream.c @@ -0,0 +1,108 @@ +#include <assert.h> +#include <stdio.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/rbuffer.h" +#include "nvim/event/stream.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.c.generated.h" +#endif + +/// Sets the stream associated with `fd` to "blocking" mode. +/// +/// @return `0` on success, or `-errno` on failure. +int stream_set_blocking(int fd, bool blocking) +{ + // Private loop to avoid conflict with existing watcher(s): + // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. + uv_loop_t loop; + uv_pipe_t stream; + uv_loop_init(&loop); + uv_pipe_init(&loop, &stream, 0); + uv_pipe_open(&stream, fd); + int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); + uv_close((uv_handle_t *)&stream, NULL); + uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt. + uv_loop_close(&loop); + return retval; +} + +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, + void *data) +{ + stream->uvstream = uvstream; + + if (fd >= 0) { + uv_handle_type type = uv_guess_handle(fd); + stream->fd = fd; + + if (type == UV_FILE) { + // Non-blocking file reads are simulated with an idle handle that reads in + // chunks of the ring buffer size, giving time for other events to be + // processed between reads. + uv_idle_init(&loop->uv, &stream->uv.idle); + stream->uv.idle.data = stream; + } else { + assert(type == UV_NAMED_PIPE || type == UV_TTY); + uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); + uv_pipe_open(&stream->uv.pipe, fd); + stream->uvstream = (uv_stream_t *)&stream->uv.pipe; + } + } + + if (stream->uvstream) { + stream->uvstream->data = stream; + } + + stream->data = data; + stream->internal_data = NULL; + stream->fpos = 0; + stream->curmem = 0; + stream->maxmem = 0; + stream->pending_reqs = 0; + stream->read_cb = NULL; + stream->write_cb = NULL; + stream->close_cb = NULL; + stream->internal_close_cb = NULL; + stream->closed = false; + stream->buffer = NULL; +} + +void stream_close(Stream *stream, stream_close_cb on_stream_close) +{ + assert(!stream->closed); + + if (stream->buffer) { + rbuffer_free(stream->buffer); + } + + stream->closed = true; + stream->close_cb = on_stream_close; + + if (!stream->pending_reqs) { + stream_close_handle(stream); + } +} + +void stream_close_handle(Stream *stream) +{ + if (stream->uvstream) { + uv_close((uv_handle_t *)stream->uvstream, close_cb); + } else { + uv_close((uv_handle_t *)&stream->uv.idle, close_cb); + } +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; + if (stream->close_cb) { + stream->close_cb(stream, stream->data); + } + if (stream->internal_close_cb) { + stream->internal_close_cb(stream, stream->internal_data); + } +} diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h new file mode 100644 index 0000000000..37410b2036 --- /dev/null +++ b/src/nvim/event/stream.h @@ -0,0 +1,55 @@ +#ifndef NVIM_EVENT_STREAM_H +#define NVIM_EVENT_STREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/rbuffer.h" + +typedef struct stream Stream; +/// Type of function called when the Stream buffer is filled with data +/// +/// @param stream The Stream instance +/// @param rbuffer The associated RBuffer instance +/// @param data User-defined data +/// @param eof If the stream reached EOF. +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, + bool eof); + +/// Type of function called when the Stream has information about a write +/// request. +/// +/// @param wstream The Stream instance +/// @param data User-defined data +/// @param status 0 on success, anything else indicates failure +typedef void (*stream_write_cb)(Stream *stream, void *data, int status); +typedef void (*stream_close_cb)(Stream *stream, void *data); + +struct stream { + union { + uv_pipe_t pipe; + uv_tcp_t tcp; + uv_idle_t idle; + } uv; + uv_stream_t *uvstream; + uv_buf_t uvbuf; + RBuffer *buffer; + uv_file fd; + stream_read_cb read_cb; + stream_write_cb write_cb; + stream_close_cb close_cb, internal_close_cb; + size_t fpos; + size_t curmem; + size_t maxmem; + size_t pending_reqs; + void *data, *internal_data; + bool closed; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.h.generated.h" +#endif +#endif // NVIM_EVENT_STREAM_H diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c new file mode 100644 index 0000000000..ce33cdfc10 --- /dev/null +++ b/src/nvim/event/time.c @@ -0,0 +1,55 @@ +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/time.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/time.c.generated.h" +#endif + + +void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + uv_timer_init(&loop->uv, &watcher->uv); + watcher->uv.data = watcher; + watcher->data = data; +} + +void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, + uint64_t repeat) + FUNC_ATTR_NONNULL_ALL +{ + watcher->cb = cb; + uv_timer_start(&watcher->uv, time_watcher_cb, timeout, repeat); +} + +void time_watcher_stop(TimeWatcher *watcher) + FUNC_ATTR_NONNULL_ALL +{ + uv_timer_stop(&watcher->uv); +} + +void time_watcher_close(TimeWatcher *watcher, time_cb cb) + FUNC_ATTR_NONNULL_ARG(1) +{ + watcher->close_cb = cb; + uv_close((uv_handle_t *)&watcher->uv, close_cb); +} + +static void time_watcher_cb(uv_timer_t *handle) + FUNC_ATTR_NONNULL_ALL +{ + TimeWatcher *watcher = handle->data; + watcher->cb(watcher, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ + TimeWatcher *watcher = handle->data; + if (watcher->close_cb) { + watcher->close_cb(watcher, watcher->data); + } +} diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h new file mode 100644 index 0000000000..ee50e53d11 --- /dev/null +++ b/src/nvim/event/time.h @@ -0,0 +1,20 @@ +#ifndef NVIM_EVENT_TIME_H +#define NVIM_EVENT_TIME_H + +#include <uv.h> + +#include "nvim/event/loop.h" + +typedef struct time_watcher TimeWatcher; +typedef void (*time_cb)(TimeWatcher *watcher, void *data); + +struct time_watcher { + uv_timer_t uv; + void *data; + time_cb cb, close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/time.h.generated.h" +#endif +#endif // NVIM_EVENT_TIME_H diff --git a/src/nvim/event/uv_process.c b/src/nvim/event/uv_process.c new file mode 100644 index 0000000000..21c2fd1790 --- /dev/null +++ b/src/nvim/event/uv_process.c @@ -0,0 +1,77 @@ +#include <assert.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.c.generated.h" +#endif + +bool uv_process_spawn(UvProcess *uvproc) + FUNC_ATTR_NONNULL_ALL +{ + Process *proc = (Process *)uvproc; + uvproc->uvopts.file = proc->argv[0]; + uvproc->uvopts.args = proc->argv; + uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; + uvproc->uvopts.exit_cb = exit_cb; + uvproc->uvopts.cwd = NULL; + uvproc->uvopts.env = NULL; + uvproc->uvopts.stdio = uvproc->uvstdio; + uvproc->uvopts.stdio_count = 3; + uvproc->uvstdio[0].flags = UV_IGNORE; + uvproc->uvstdio[1].flags = UV_IGNORE; + uvproc->uvstdio[2].flags = UV_IGNORE; + uvproc->uv.data = proc; + + if (proc->in) { + uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe; + } + + if (proc->out) { + uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe; + } + + if (proc->err) { + uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe; + } + + int status; + if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) { + ELOG("uv_spawn failed: %s", uv_strerror(status)); + return false; + } + + proc->pid = uvproc->uv.pid; + return true; +} + +void uv_process_close(UvProcess *uvproc) + FUNC_ATTR_NONNULL_ARG(1) +{ + uv_close((uv_handle_t *)&uvproc->uv, close_cb); +} + +static void close_cb(uv_handle_t *handle) +{ + Process *proc = handle->data; + if (proc->internal_close_cb) { + proc->internal_close_cb(proc); + } +} + +static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) +{ + Process *proc = handle->data; + proc->status = (int)status; + proc->internal_exit_cb(proc); +} diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h new file mode 100644 index 0000000000..a17f1446b3 --- /dev/null +++ b/src/nvim/event/uv_process.h @@ -0,0 +1,25 @@ +#ifndef NVIM_EVENT_UV_PROCESS_H +#define NVIM_EVENT_UV_PROCESS_H + +#include <uv.h> + +#include "nvim/event/process.h" + +typedef struct uv_process { + Process process; + uv_process_t uv; + uv_process_options_t uvopts; + uv_stdio_container_t uvstdio[3]; +} UvProcess; + +static inline UvProcess uv_process_init(void *data) +{ + UvProcess rv; + rv.process = process_init(kProcessTypeUv, data); + return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.h.generated.h" +#endif +#endif // NVIM_EVENT_UV_PROCESS_H diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c new file mode 100644 index 0000000000..5fcb724fe3 --- /dev/null +++ b/src/nvim/event/wstream.c @@ -0,0 +1,162 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/wstream.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + +typedef struct { + Stream *stream; + WBuffer *buffer; + uv_write_t uv_req; +} WRequest; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.c.generated.h" +#endif + +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, + void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(loop, stream, fd, NULL, data); + wstream_init(stream, maxmem); +} + +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, + void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(NULL, stream, -1, uvstream, data); + wstream_init(stream, maxmem); +} + +void wstream_init(Stream *stream, size_t maxmem) +{ + stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM; +} + +/// Sets a callback that will be called on completion of a write request, +/// indicating failure/success. +/// +/// This affects all requests currently in-flight as well. Overwrites any +/// possible earlier callback. +/// +/// @note This callback will not fire if the write request couldn't even be +/// queued properly (i.e.: when `wstream_write() returns an error`). +/// +/// @param stream The `Stream` instance +/// @param cb The callback +void wstream_set_write_cb(Stream *stream, stream_write_cb cb) + FUNC_ATTR_NONNULL_ALL +{ + stream->write_cb = cb; +} + +/// Queues data for writing to the backing file descriptor of a `Stream` +/// instance. This will fail if the write would cause the Stream use more +/// memory than specified by `maxmem`. +/// +/// @param stream The `Stream` instance +/// @param buffer The buffer which contains data to be written +/// @return false if the write failed +bool wstream_write(Stream *stream, WBuffer *buffer) + FUNC_ATTR_NONNULL_ALL +{ + assert(stream->maxmem); + // This should not be called after a stream was freed + assert(!stream->closed); + + if (stream->curmem > stream->maxmem) { + goto err; + } + + stream->curmem += buffer->size; + + WRequest *data = xmalloc(sizeof(WRequest)); + data->stream = stream; + data->buffer = buffer; + data->uv_req.data = data; + + uv_buf_t uvbuf; + uvbuf.base = buffer->data; + uvbuf.len = buffer->size; + + if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { + xfree(data); + goto err; + } + + stream->pending_reqs++; + return true; + +err: + wstream_release_wbuffer(buffer); + return false; +} + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across Stream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param refcount The number of references for the WBuffer. This will be used +/// by Stream instances to decide when a WBuffer should be freed. +/// @param cb Pointer to function that will be responsible for freeing +/// the buffer data(passing 'free' will work as expected). +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, + size_t size, + size_t refcount, + wbuffer_data_finalizer cb) +{ + WBuffer *rv = xmalloc(sizeof(WBuffer)); + rv->size = size; + rv->refcount = refcount; + rv->cb = cb; + rv->data = data; + + return rv; +} + +static void write_cb(uv_write_t *req, int status) +{ + WRequest *data = req->data; + + data->stream->curmem -= data->buffer->size; + + wstream_release_wbuffer(data->buffer); + + if (data->stream->write_cb) { + data->stream->write_cb(data->stream, data->stream->data, status); + } + + data->stream->pending_reqs--; + + if (data->stream->closed && data->stream->pending_reqs == 0) { + // Last pending write, free the stream; + stream_close_handle(data->stream); + } + + xfree(data); +} + +void wstream_release_wbuffer(WBuffer *buffer) +{ + if (!--buffer->refcount) { + if (buffer->cb) { + buffer->cb(buffer->data); + } + + xfree(buffer); + } +} diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h new file mode 100644 index 0000000000..9008de0d97 --- /dev/null +++ b/src/nvim/event/wstream.h @@ -0,0 +1,24 @@ +#ifndef NVIM_EVENT_WSTREAM_H +#define NVIM_EVENT_WSTREAM_H + +#include <stdint.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + +typedef struct wbuffer WBuffer; +typedef void (*wbuffer_data_finalizer)(void *data); + +struct wbuffer { + size_t size, refcount; + char *data; + wbuffer_data_finalizer cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.h.generated.h" +#endif +#endif // NVIM_EVENT_WSTREAM_H |