aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/defs.h39
-rw-r--r--src/nvim/event/loop.c95
-rw-r--r--src/nvim/event/loop.h39
-rw-r--r--src/nvim/event/process.c92
-rw-r--r--src/nvim/event/process.h6
-rw-r--r--src/nvim/event/pty_process.c30
-rw-r--r--src/nvim/event/pty_process.h4
-rw-r--r--src/nvim/event/queue.c208
-rw-r--r--src/nvim/event/queue.h19
-rw-r--r--src/nvim/event/rstream.c23
-rw-r--r--src/nvim/event/signal.c9
-rw-r--r--src/nvim/event/signal.h1
-rw-r--r--src/nvim/event/socket.c12
-rw-r--r--src/nvim/event/socket.h1
-rw-r--r--src/nvim/event/stream.c13
-rw-r--r--src/nvim/event/stream.h9
-rw-r--r--src/nvim/event/time.c9
-rw-r--r--src/nvim/event/time.h1
-rw-r--r--src/nvim/event/uv_process.h4
-rw-r--r--src/nvim/event/wstream.c2
20 files changed, 444 insertions, 172 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
new file mode 100644
index 0000000000..5126d52241
--- /dev/null
+++ b/src/nvim/event/defs.h
@@ -0,0 +1,39 @@
+#ifndef NVIM_EVENT_DEFS_H
+#define NVIM_EVENT_DEFS_H
+
+#include <assert.h>
+#include <stdarg.h>
+
+#define EVENT_HANDLER_MAX_ARGC 4
+
+typedef void (*argv_callback)(void **argv);
+typedef struct message {
+ int priority;
+ argv_callback handler;
+ void *argv[EVENT_HANDLER_MAX_ARGC];
+} Event;
+
+#define VA_EVENT_INIT(event, p, h, a) \
+ do { \
+ assert(a <= EVENT_HANDLER_MAX_ARGC); \
+ (event)->priority = p; \
+ (event)->handler = h; \
+ if (a) { \
+ va_list args; \
+ va_start(args, a); \
+ for (int i = 0; i < a; i++) { \
+ (event)->argv[i] = va_arg(args, void *); \
+ } \
+ va_end(args); \
+ } \
+ } while (0)
+
+static inline Event event_create(int priority, argv_callback cb, int argc, ...)
+{
+ assert(argc <= EVENT_HANDLER_MAX_ARGC);
+ Event event;
+ VA_EVENT_INIT(&event, priority, cb, argc);
+ return event;
+}
+
+#endif // NVIM_EVENT_DEFS_H
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index d90565002e..3d3288f858 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -1,3 +1,4 @@
+#include <stdarg.h>
#include <stdint.h>
#include <uv.h>
@@ -9,17 +10,23 @@
# include "event/loop.c.generated.h"
#endif
+typedef struct idle_event {
+ uv_idle_t idle;
+ Event event;
+} IdleEvent;
+
void loop_init(Loop *loop, void *data)
{
uv_loop_init(&loop->uv);
loop->uv.data = loop;
- loop->deferred_events = kl_init(Event);
- loop->immediate_events = kl_init(Event);
loop->children = kl_init(WatcherPtr);
loop->children_stop_requests = 0;
+ loop->events = queue_new_parent(loop_on_put, loop);
+ loop->fast_events = queue_new_child(loop->events);
uv_signal_init(&loop->uv, &loop->children_watcher);
uv_timer_init(&loop->uv, &loop->children_kill_timer);
+ uv_timer_init(&loop->uv, &loop->poll_timer);
}
void loop_poll_events(Loop *loop, int ms)
@@ -30,89 +37,36 @@ void loop_poll_events(Loop *loop, int ms)
abort(); // Should not re-enter uv_run
}
- bool wait = true;
- uv_timer_t timer;
+ uv_run_mode mode = UV_RUN_ONCE;
if (ms > 0) {
- uv_timer_init(&loop->uv, &timer);
// Use a repeating timeout of ms milliseconds to make sure
// we do not block indefinitely for I/O.
- uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
+ uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
} else if (ms == 0) {
// For ms == 0, we need to do a non-blocking event poll by
// setting the run mode to UV_RUN_NOWAIT.
- wait = false;
+ mode = UV_RUN_NOWAIT;
}
- if (wait) {
- loop_run_once(loop);
- } else {
- loop_run_nowait(loop);
- }
+ uv_run(&loop->uv, mode);
if (ms > 0) {
- // Ensure the timer handle is closed and run the event loop
- // once more to let libuv perform it's cleanup
- uv_timer_stop(&timer);
- uv_close((uv_handle_t *)&timer, NULL);
- loop_run_nowait(loop);
+ uv_timer_stop(&loop->poll_timer);
}
recursive--; // Can re-enter uv_run now
- process_events_from(loop->immediate_events);
-}
-
-bool loop_has_deferred_events(Loop *loop)
-{
- return loop->deferred_events_allowed && !kl_empty(loop->deferred_events);
+ queue_process_events(loop->fast_events);
}
-void loop_enable_deferred_events(Loop *loop)
-{
- ++loop->deferred_events_allowed;
-}
-
-void loop_disable_deferred_events(Loop *loop)
-{
- --loop->deferred_events_allowed;
-}
-
-// Queue an event
-void loop_push_event(Loop *loop, Event event, bool deferred)
+void loop_on_put(Queue *queue, void *data)
{
+ Loop *loop = data;
// Sometimes libuv will run pending callbacks(timer for example) before
// blocking for a poll. If this happens and the callback pushes a event to one
// of the queues, the event would only be processed after the poll
// returns(user hits a key for example). To avoid this scenario, we call
// uv_stop when a event is enqueued.
- loop_stop(loop);
- kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events,
- event);
-}
-
-void loop_process_event(Loop *loop)
-{
- process_events_from(loop->deferred_events);
-}
-
-
-void loop_run(Loop *loop)
-{
- uv_run(&loop->uv, UV_RUN_DEFAULT);
-}
-
-void loop_run_once(Loop *loop)
-{
- uv_run(&loop->uv, UV_RUN_ONCE);
-}
-
-void loop_run_nowait(Loop *loop)
-{
- uv_run(&loop->uv, UV_RUN_NOWAIT);
-}
-
-void loop_stop(Loop *loop)
-{
uv_stop(&loop->uv);
}
@@ -120,25 +74,12 @@ void loop_close(Loop *loop)
{
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
+ uv_close((uv_handle_t *)&loop->poll_timer, NULL);
do {
uv_run(&loop->uv, UV_RUN_DEFAULT);
} while (uv_loop_close(&loop->uv));
}
-void loop_process_all_events(Loop *loop)
-{
- process_events_from(loop->immediate_events);
- process_events_from(loop->deferred_events);
-}
-
-static void process_events_from(klist_t(Event) *queue)
-{
- while (!kl_empty(queue)) {
- Event event = kl_shift(Event, queue);
- event.handler(event);
- }
-}
-
static void timer_cb(uv_timer_t *handle)
{
}
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index 5eb4d32ca8..9212a45aa4 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -7,38 +7,39 @@
#include "nvim/lib/klist.h"
#include "nvim/os/time.h"
-
-typedef struct event Event;
-typedef void (*event_handler)(Event event);
-
-struct event {
- void *data;
- event_handler handler;
-};
+#include "nvim/event/queue.h"
typedef void * WatcherPtr;
#define _noop(x)
KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
-KLIST_INIT(Event, Event, _noop)
typedef struct loop {
uv_loop_t uv;
- klist_t(Event) *deferred_events, *immediate_events;
- int deferred_events_allowed;
+ Queue *events, *fast_events;
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
- uv_timer_t children_kill_timer;
+ uv_timer_t children_kill_timer, poll_timer;
size_t children_stop_requests;
} Loop;
+#define CREATE_EVENT(queue, handler, argc, ...) \
+ do { \
+ if (queue) { \
+ queue_put((queue), (handler), argc, __VA_ARGS__); \
+ } else { \
+ void *argv[argc] = {__VA_ARGS__}; \
+ (handler)(argv); \
+ } \
+ } while (0)
+
// Poll for events until a condition or timeout
-#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \
+#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \
do { \
int remaining = timeout; \
uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
while (!(condition)) { \
- loop_poll_events(loop, remaining); \
+ LOOP_PROCESS_EVENTS(loop, queue, remaining); \
if (remaining == 0) { \
break; \
} else if (remaining > 0) { \
@@ -52,6 +53,16 @@ typedef struct loop {
} \
} while (0)
+#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \
+ do { \
+ if (queue && !queue_empty(queue)) { \
+ queue_process_events(queue); \
+ } else { \
+ loop_poll_events(loop, timeout); \
+ } \
+ } while (0)
+
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/loop.h.generated.h"
#endif
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 2b1f1ae096..81d4e690c3 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -22,7 +22,7 @@
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-#define CLOSE_PROC_STREAM(proc, stream) \
+#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL); \
@@ -30,19 +30,18 @@
} while (0)
-bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
+bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
{
- proc->loop = loop;
if (proc->in) {
- uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0);
+ uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
}
if (proc->out) {
- uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0);
+ uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
}
if (proc->err) {
- uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0);
+ uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
}
bool success;
@@ -77,6 +76,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->in) {
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
proc->refcount++;
@@ -84,6 +84,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->out) {
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
proc->refcount++;
@@ -91,6 +92,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->err) {
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
proc->refcount++;
@@ -99,7 +101,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
proc->internal_exit_cb = on_process_exit;
proc->internal_close_cb = decref;
proc->refcount++;
- kl_push(WatcherPtr, loop->children, proc);
+ kl_push(WatcherPtr, proc->loop->children, proc);
return true;
}
@@ -113,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
}
// Wait until all children exit
- LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children));
+ LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children));
pty_process_teardown(loop);
}
@@ -150,16 +152,24 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. Returns -2 if an user has interruped the
/// wait.
-int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
+int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1)
{
// The default status is -1, which represents a timeout
int status = -1;
bool interrupted = false;
+ if (!proc->refcount) {
+ LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
+ return proc->status;
+ }
+
+ if (!events) {
+ events = proc->events;
+ }
// Increase refcount to stop the exit callback from being called(and possibly
// being freed) before we have a chance to get the status.
proc->refcount++;
- LOOP_POLL_EVENTS_UNTIL(proc->loop, ms,
+ LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms,
// Until...
got_int || // interrupted by the user
proc->refcount == 1); // job exited
@@ -171,12 +181,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
got_int = false;
process_stop(proc);
if (ms == -1) {
- // We can only return, if all streams/handles are closed and the job
-
+ // We can only return if all streams/handles are closed and the job
// exited.
- LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1);
+ LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1,
+ proc->refcount == 1);
} else {
- loop_poll_events(proc->loop, 0);
+ LOOP_PROCESS_EVENTS(proc->loop, events, 0);
}
}
@@ -185,6 +195,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
// resources
status = interrupted ? -2 : proc->status;
decref(proc);
+ if (events) {
+ // the decref call created an exit event, process it now
+ queue_process_events(events);
+ }
} else {
proc->refcount--;
}
@@ -250,6 +264,18 @@ static void children_kill_cb(uv_timer_t *handle)
}
}
+static void process_close_event(void **argv)
+{
+ Process *proc = argv[0];
+ shell_free_argv(proc->argv);
+ if (proc->type == kProcessTypePty) {
+ xfree(((PtyProcess *)proc)->term_name);
+ }
+ if (proc->cb) {
+ proc->cb(proc, proc->status, proc->data);
+ }
+}
+
static void decref(Process *proc)
{
if (--proc->refcount != 0) {
@@ -264,16 +290,9 @@ static void decref(Process *proc)
break;
}
}
-
assert(node);
kl_shift_at(WatcherPtr, loop->children, node);
- shell_free_argv(proc->argv);
- if (proc->type == kProcessTypePty) {
- xfree(((PtyProcess *)proc)->term_name);
- }
- if (proc->cb) {
- proc->cb(proc, proc->status, proc->data);
- }
+ CREATE_EVENT(proc->events, process_close_event, 1, proc);
}
static void process_close(Process *proc)
@@ -293,28 +312,27 @@ static void process_close(Process *proc)
}
}
-static void on_process_exit(Process *proc)
+static void process_close_handles(void **argv)
{
- if (exiting) {
- on_process_exit_event((Event) {.data = proc});
- } else {
- loop_push_event(proc->loop,
- (Event) {.handler = on_process_exit_event, .data = proc}, false);
- }
+ Process *proc = argv[0];
+ process_close_streams(proc);
+ process_close(proc);
+}
+static void on_process_exit(Process *proc)
+{
Loop *loop = proc->loop;
- if (loop->children_stop_requests && !--loop->children_stop_requests) {
+ if (proc->stopped_time && loop->children_stop_requests
+ && !--loop->children_stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping process kill timer");
uv_timer_stop(&loop->children_kill_timer);
}
-}
-
-static void on_process_exit_event(Event event)
-{
- Process *proc = event.data;
- process_close_streams(proc);
- process_close(proc);
+ // Process handles are closed in the next event loop tick. This is done to
+ // give libuv more time to read data from the OS after the process exits(If
+ // process_close_streams is called with data still in the OS buffer, we lose
+ // it)
+ CREATE_EVENT(proc->events, process_close_handles, 1, proc);
}
static void on_process_stream_close(Stream *stream, void *data)
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c84a7d1d0..45edc46b95 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -26,14 +26,16 @@ struct process {
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, term_sent;
+ Queue *events;
};
-static inline Process process_init(ProcessType type, void *data)
+static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
.type = type,
.data = data,
- .loop = NULL,
+ .loop = loop,
+ .events = NULL,
.pid = 0,
.status = 0,
.refcount = 0,
diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c
index 1e24d7c919..8eef72f12f 100644
--- a/src/nvim/event/pty_process.c
+++ b/src/nvim/event/pty_process.c
@@ -33,17 +33,18 @@
# include "event/pty_process.c.generated.h"
#endif
-static const unsigned int KILL_RETRIES = 5;
-static const unsigned int KILL_TIMEOUT = 2; // seconds
-
bool pty_process_spawn(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
+ static struct termios termios;
+ if (!termios.c_cflag) {
+ init_termios(&termios);
+ }
+
Process *proc = (Process *)ptyproc;
+ assert(!proc->err);
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
- struct termios termios;
- init_termios(&termios);
uv_disable_stdio_inheritance();
int master;
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
@@ -73,9 +74,6 @@ bool pty_process_spawn(PtyProcess *ptyproc)
if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
goto error;
}
- if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {
- goto error;
- }
ptyproc->tty_fd = master;
proc->pid = pid;
@@ -83,19 +81,8 @@ bool pty_process_spawn(PtyProcess *ptyproc)
error:
close(master);
-
- // terminate spawned process
- kill(pid, SIGTERM);
- int status, child;
- unsigned int try = 0;
- while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) {
- sleep(KILL_TIMEOUT);
- }
- if (child != pid) {
- kill(pid, SIGKILL);
- waitpid(pid, NULL, 0);
- }
-
+ kill(pid, SIGKILL);
+ waitpid(pid, NULL, 0);
return false;
}
@@ -152,7 +139,6 @@ static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
{
- memset(termios, 0, sizeof(struct termios));
// Taken from pangoterm
termios->c_iflag = ICRNL|IXON;
termios->c_oflag = OPOST|ONLCR;
diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h
index a12b5489c5..446d7fd3c8 100644
--- a/src/nvim/event/pty_process.h
+++ b/src/nvim/event/pty_process.h
@@ -13,10 +13,10 @@ typedef struct pty_process {
int tty_fd;
} PtyProcess;
-static inline PtyProcess pty_process_init(void *data)
+static inline PtyProcess pty_process_init(Loop *loop, void *data)
{
PtyProcess rv;
- rv.process = process_init(kProcessTypePty, data);
+ rv.process = process_init(loop, kProcessTypePty, data);
rv.term_name = NULL;
rv.width = 80;
rv.height = 24;
diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c
new file mode 100644
index 0000000000..19eca14144
--- /dev/null
+++ b/src/nvim/event/queue.c
@@ -0,0 +1,208 @@
+// Queue for selective async event processing. Instances of this queue support a
+// parent/child relationship with the following properties:
+//
+// - pushing a node to a child queue will push a corresponding link node to the
+// parent queue
+// - removing a link node from a parent queue will remove the next node
+// in the linked child queue
+// - removing a node from a child queue will remove the corresponding link node
+// in the parent queue
+//
+// These properties allow neovim to organize and process events from different
+// sources with a certain degree of control. Here's how the queue is used:
+//
+// +----------------+
+// | Main loop |
+// +----------------+
+// ^
+// |
+// +----------------+
+// +-------------->| Event loop |<------------+
+// | +--+-------------+ |
+// | ^ ^ |
+// | | | |
+// +-----------+ +-----------+ +---------+ +---------+
+// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
+// +-----------+ +-----------+ +---------+ +---------+
+//
+//
+// In the above diagram, the lower boxes represents event emitters, each with
+// it's own private queue that have the event loop queue as the parent.
+//
+// When idle, the main loop spins the event loop which queues events from many
+// sources(channels, jobs, user...). Each event emitter pushes events to its own
+// private queue which is propagated to the event loop queue. When the main loop
+// consumes an event, the corresponding event is removed from the emitter's
+// queue.
+//
+// The main reason for this queue hierarchy is to allow focusing on a single
+// event emitter while blocking the main loop. For example, if the `jobwait`
+// vimscript function is called on job1, the main loop will temporarily stop
+// polling the event loop queue and poll job1 queue instead. Same with channels,
+// when calling `rpcrequest`, we want to temporarily stop processing events from
+// other sources and focus on a specific channel.
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+
+#include <uv.h>
+
+#include "nvim/event/queue.h"
+#include "nvim/memory.h"
+#include "nvim/os/time.h"
+
+typedef struct queue_item QueueItem;
+struct queue_item {
+ union {
+ Queue *queue;
+ struct {
+ Event event;
+ QueueItem *parent;
+ } item;
+ } data;
+ bool link; // this is just a link to a node in a child queue
+ QUEUE node;
+};
+
+struct queue {
+ Queue *parent;
+ QUEUE headtail;
+ put_callback put_cb;
+ void *data;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/queue.c.generated.h"
+#endif
+
+static Event NILEVENT = {.handler = NULL, .argv = {NULL}};
+
+Queue *queue_new_parent(put_callback put_cb, void *data)
+{
+ return queue_new(NULL, put_cb, data);
+}
+
+Queue *queue_new_child(Queue *parent)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(!parent->parent);
+ return queue_new(parent, NULL, NULL);
+}
+
+static Queue *queue_new(Queue *parent, put_callback put_cb, void *data)
+{
+ Queue *rv = xmalloc(sizeof(Queue));
+ QUEUE_INIT(&rv->headtail);
+ rv->parent = parent;
+ rv->put_cb = put_cb;
+ rv->data = data;
+ return rv;
+}
+
+void queue_free(Queue *queue)
+{
+ assert(queue);
+ if (queue->parent) {
+ while (!QUEUE_EMPTY(&queue->headtail)) {
+ QUEUE *q = QUEUE_HEAD(&queue->headtail);
+ QueueItem *item = queue_node_data(q);
+ assert(!item->link);
+ QUEUE_REMOVE(&item->data.item.parent->node);
+ xfree(item->data.item.parent);
+ QUEUE_REMOVE(q);
+ xfree(item);
+ }
+ }
+
+ xfree(queue);
+}
+
+Event queue_get(Queue *queue)
+{
+ return queue_empty(queue) ? NILEVENT : queue_remove(queue);
+}
+
+void queue_put_event(Queue *queue, Event event)
+{
+ assert(queue);
+ assert(queue->parent); // don't push directly to the parent queue
+ queue_push(queue, event);
+ if (queue->parent->put_cb) {
+ queue->parent->put_cb(queue->parent, queue->parent->data);
+ }
+}
+
+void queue_process_events(Queue *queue)
+{
+ assert(queue);
+ while (!queue_empty(queue)) {
+ Event event = queue_get(queue);
+ if (event.handler) {
+ event.handler(event.argv);
+ }
+ }
+}
+
+bool queue_empty(Queue *queue)
+{
+ assert(queue);
+ return QUEUE_EMPTY(&queue->headtail);
+}
+
+void queue_replace_parent(Queue *queue, Queue *new_parent)
+{
+ assert(queue_empty(queue));
+ queue->parent = new_parent;
+}
+
+static Event queue_remove(Queue *queue)
+{
+ assert(!queue_empty(queue));
+ QUEUE *h = QUEUE_HEAD(&queue->headtail);
+ QUEUE_REMOVE(h);
+ QueueItem *item = queue_node_data(h);
+ Event rv;
+
+ if (item->link) {
+ assert(!queue->parent);
+ // remove the next node in the linked queue
+ Queue *linked = item->data.queue;
+ assert(!queue_empty(linked));
+ QueueItem *child =
+ queue_node_data(QUEUE_HEAD(&linked->headtail));
+ QUEUE_REMOVE(&child->node);
+ rv = child->data.item.event;
+ xfree(child);
+ } else {
+ assert(queue->parent);
+ assert(!queue_empty(queue->parent));
+ // remove the corresponding link node in the parent queue
+ QUEUE_REMOVE(&item->data.item.parent->node);
+ xfree(item->data.item.parent);
+ rv = item->data.item.event;
+ }
+
+ xfree(item);
+ return rv;
+}
+
+static void queue_push(Queue *queue, Event event)
+{
+ QueueItem *item = xmalloc(sizeof(QueueItem));
+ item->link = false;
+ item->data.item.event = event;
+ QUEUE_INSERT_TAIL(&queue->headtail, &item->node);
+ // push link node to the parent queue
+ item->data.item.parent = xmalloc(sizeof(QueueItem));
+ item->data.item.parent->link = true;
+ item->data.item.parent->data.queue = queue;
+ QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node);
+}
+
+static QueueItem *queue_node_data(QUEUE *q)
+{
+ return QUEUE_DATA(q, QueueItem, node);
+}
diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h
new file mode 100644
index 0000000000..85fc59f8b2
--- /dev/null
+++ b/src/nvim/event/queue.h
@@ -0,0 +1,19 @@
+#ifndef NVIM_EVENT_QUEUE_H
+#define NVIM_EVENT_QUEUE_H
+
+#include <uv.h>
+
+#include "nvim/event/defs.h"
+#include "nvim/lib/queue.h"
+
+typedef struct queue Queue;
+typedef void (*put_callback)(Queue *queue, void *data);
+
+#define queue_put(q, h, ...) \
+ queue_put_event(q, event_create(1, h, __VA_ARGS__));
+
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/queue.h.generated.h"
+#endif
+#endif // NVIM_EVENT_QUEUE_H
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 9d2439ac2b..0a720bb852 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -49,6 +49,7 @@ void rstream_init(Stream *stream, size_t bufsize)
///
/// @param stream The `Stream` instance
void rstream_start(Stream *stream, stream_read_cb cb)
+ FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
if (stream->uvstream) {
@@ -62,6 +63,7 @@ void rstream_start(Stream *stream, stream_read_cb cb)
///
/// @param stream The `Stream` instance
void rstream_stop(Stream *stream)
+ FUNC_ATTR_NONNULL_ALL
{
if (stream->uvstream) {
uv_read_stop(stream->uvstream);
@@ -112,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
- invoke_read_cb(stream, true);
+ invoke_read_cb(stream, 0, true);
}
return;
}
@@ -122,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
- invoke_read_cb(stream, false);
+ invoke_read_cb(stream, nread, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -156,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(&stream->uv.idle);
- invoke_read_cb(stream, true);
+ invoke_read_cb(stream, 0, true);
return;
}
@@ -164,12 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(stream->buffer, nread);
stream->fpos += nread;
- invoke_read_cb(stream, false);
+ invoke_read_cb(stream, nread, false);
}
-static void invoke_read_cb(Stream *stream, bool eof)
+static void read_event(void **argv)
{
+ Stream *stream = argv[0];
if (stream->read_cb) {
- stream->read_cb(stream, stream->buffer, stream->data, eof);
+ size_t count = (uintptr_t)argv[1];
+ bool eof = (uintptr_t)argv[2];
+ stream->read_cb(stream, stream->buffer, count, stream->data, eof);
}
}
+
+static void invoke_read_cb(Stream *stream, size_t count, bool eof)
+{
+ CREATE_EVENT(stream->events, read_event, 3, stream,
+ (void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
+}
diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c
index 63133b4f57..11ce15a882 100644
--- a/src/nvim/event/signal.c
+++ b/src/nvim/event/signal.c
@@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data)
watcher->uv.data = watcher;
watcher->data = data;
watcher->cb = NULL;
+ watcher->events = loop->fast_events;
}
void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum)
@@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb)
uv_close((uv_handle_t *)&watcher->uv, close_cb);
}
+static void signal_event(void **argv)
+{
+ SignalWatcher *watcher = argv[0];
+ watcher->cb(watcher, watcher->uv.signum, watcher->data);
+}
+
static void signal_watcher_cb(uv_signal_t *handle, int signum)
{
SignalWatcher *watcher = handle->data;
- watcher->cb(watcher, signum, watcher->data);
+ CREATE_EVENT(watcher->events, signal_event, 1, watcher);
}
static void close_cb(uv_handle_t *handle)
diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h
index c269fa9d95..e32608acc0 100644
--- a/src/nvim/event/signal.h
+++ b/src/nvim/event/signal.h
@@ -14,6 +14,7 @@ struct signal_watcher {
void *data;
signal_cb cb;
signal_close_cb close_cb;
+ Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index bdc632abf0..347e464d25 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher,
watcher->stream->data = watcher;
watcher->cb = NULL;
watcher->close_cb = NULL;
+ watcher->events = NULL;
}
int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
@@ -113,6 +114,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
}
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@@ -142,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb)
uv_close((uv_handle_t *)watcher->stream, close_cb);
}
+static void connection_event(void **argv)
+{
+ SocketWatcher *watcher = argv[0];
+ int status = (int)(uintptr_t)(argv[1]);
+ watcher->cb(watcher, status, watcher->data);
+}
+
static void connection_cb(uv_stream_t *handle, int status)
{
SocketWatcher *watcher = handle->data;
- watcher->cb(watcher, status, watcher->data);
+ CREATE_EVENT(watcher->events, connection_event, 2, watcher,
+ (void *)(uintptr_t)status);
}
static void close_cb(uv_handle_t *handle)
diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h
index 17fd39f33b..ad59fdbe3a 100644
--- a/src/nvim/event/socket.h
+++ b/src/nvim/event/socket.h
@@ -30,6 +30,7 @@ struct socket_watcher {
void *data;
socket_cb cb;
socket_close_cb close_cb;
+ Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 959b532146..6caad6fdcc 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -32,6 +32,7 @@ int stream_set_blocking(int fd, bool blocking)
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
void *data)
+ FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@@ -55,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
if (stream->uvstream) {
stream->uvstream->data = stream;
+ loop = stream->uvstream->loop->data;
}
stream->data = data;
@@ -69,16 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->internal_close_cb = NULL;
stream->closed = false;
stream->buffer = NULL;
+ stream->events = NULL;
}
void stream_close(Stream *stream, stream_close_cb on_stream_close)
+ FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
-
- if (stream->buffer) {
- rbuffer_free(stream->buffer);
- }
-
stream->closed = true;
stream->close_cb = on_stream_close;
@@ -88,6 +87,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close)
}
void stream_close_handle(Stream *stream)
+ FUNC_ATTR_NONNULL_ALL
{
if (stream->uvstream) {
uv_close((uv_handle_t *)stream->uvstream, close_cb);
@@ -99,6 +99,9 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
+ if (stream->buffer) {
+ rbuffer_free(stream->buffer);
+ }
if (stream->close_cb) {
stream->close_cb(stream, stream->data);
}
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index 37410b2036..c6baac0db7 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -14,10 +14,14 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
+/// @param count Number of bytes to read. This must be respected if keeping
+/// the order of events is a requirement. This is because events
+/// may be queued and only processed later when more data is copied
+/// into to the buffer, so one read may starve another.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
-typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data,
- bool eof);
+typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof);
/// Type of function called when the Stream has information about a write
/// request.
@@ -47,6 +51,7 @@ struct stream {
size_t pending_reqs;
void *data, *internal_data;
bool closed;
+ Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c
index ce33cdfc10..7bf333bcea 100644
--- a/src/nvim/event/time.c
+++ b/src/nvim/event/time.c
@@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data)
uv_timer_init(&loop->uv, &watcher->uv);
watcher->uv.data = watcher;
watcher->data = data;
+ watcher->events = loop->fast_events;
}
void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout,
@@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb)
uv_close((uv_handle_t *)&watcher->uv, close_cb);
}
+static void time_event(void **argv)
+{
+ TimeWatcher *watcher = argv[0];
+ watcher->cb(watcher, watcher->data);
+}
+
static void time_watcher_cb(uv_timer_t *handle)
FUNC_ATTR_NONNULL_ALL
{
TimeWatcher *watcher = handle->data;
- watcher->cb(watcher, watcher->data);
+ CREATE_EVENT(watcher->events, time_event, 1, watcher);
}
static void close_cb(uv_handle_t *handle)
diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h
index ee50e53d11..7882b2b627 100644
--- a/src/nvim/event/time.h
+++ b/src/nvim/event/time.h
@@ -12,6 +12,7 @@ struct time_watcher {
uv_timer_t uv;
void *data;
time_cb cb, close_cb;
+ Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h
index a17f1446b3..5ee73044b5 100644
--- a/src/nvim/event/uv_process.h
+++ b/src/nvim/event/uv_process.h
@@ -12,10 +12,10 @@ typedef struct uv_process {
uv_stdio_container_t uvstdio[3];
} UvProcess;
-static inline UvProcess uv_process_init(void *data)
+static inline UvProcess uv_process_init(Loop *loop, void *data)
{
UvProcess rv;
- rv.process = process_init(kProcessTypeUv, data);
+ rv.process = process_init(loop, kProcessTypeUv, data);
return rv;
}
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index 5fcb724fe3..8028e35e6b 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -118,6 +118,7 @@ WBuffer *wstream_new_buffer(char *data,
size_t size,
size_t refcount,
wbuffer_data_finalizer cb)
+ FUNC_ATTR_NONNULL_ARG(1)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
@@ -151,6 +152,7 @@ static void write_cb(uv_write_t *req, int status)
}
void wstream_release_wbuffer(WBuffer *buffer)
+ FUNC_ATTR_NONNULL_ALL
{
if (!--buffer->refcount) {
if (buffer->cb) {