aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event/multiqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event/multiqueue.c')
-rw-r--r--src/nvim/event/multiqueue.c230
1 files changed, 230 insertions, 0 deletions
diff --git a/src/nvim/event/multiqueue.c b/src/nvim/event/multiqueue.c
new file mode 100644
index 0000000000..79b4dd9458
--- /dev/null
+++ b/src/nvim/event/multiqueue.c
@@ -0,0 +1,230 @@
+// Multi-level queue for selective async event processing.
+// Not threadsafe; access must be synchronized externally.
+//
+// Multiqueue supports a parent-child relationship with these properties:
+// - pushing a node to a child queue will push a corresponding link node to the
+// parent queue
+// - removing a link node from a parent queue will remove the next node
+// in the linked child queue
+// - removing a node from a child queue will remove the corresponding link node
+// in the parent queue
+//
+// These properties allow Nvim to organize and process events from different
+// sources with a certain degree of control. How the multiqueue is used:
+//
+// +----------------+
+// | Main loop |
+// +----------------+
+//
+// +----------------+
+// +-------------->| Event loop |<------------+
+// | +--+-------------+ |
+// | ^ ^ |
+// | | | |
+// +-----------+ +-----------+ +---------+ +---------+
+// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
+// +-----------+ +-----------+ +---------+ +---------+
+//
+//
+// The lower boxes represent event emitters, each with its own private queue
+// having the event loop queue as the parent.
+//
+// When idle, the main loop spins the event loop which queues events from many
+// sources (channels, jobs, user...). Each event emitter pushes events to its
+// private queue which is propagated to the event loop queue. When the main loop
+// consumes an event, the corresponding event is removed from the emitter's
+// queue.
+//
+// The main reason for this queue hierarchy is to allow focusing on a single
+// event emitter while blocking the main loop. For example, if the `jobwait`
+// VimL function is called on job1, the main loop will temporarily stop polling
+// the event loop queue and poll job1 queue instead. Same with channels, when
+// calling `rpcrequest` we want to temporarily stop processing events from
+// other sources and focus on a specific channel.
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+
+#include <uv.h>
+
+#include "nvim/event/multiqueue.h"
+#include "nvim/memory.h"
+#include "nvim/os/time.h"
+
+typedef struct multiqueue_item MultiQueueItem;
+struct multiqueue_item {
+ union {
+ MultiQueue *queue;
+ struct {
+ Event event;
+ MultiQueueItem *parent_item;
+ } item;
+ } data;
+ bool link; // true: current item is just a link to a node in a child queue
+ QUEUE node;
+};
+
+struct multiqueue {
+ MultiQueue *parent;
+ QUEUE headtail; // circularly-linked
+ put_callback put_cb;
+ void *data;
+ size_t size;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/multiqueue.c.generated.h"
+#endif
+
+static Event NILEVENT = { .handler = NULL, .argv = {NULL} };
+
+MultiQueue *multiqueue_new_parent(put_callback put_cb, void *data)
+{
+ return multiqueue_new(NULL, put_cb, data);
+}
+
+MultiQueue *multiqueue_new_child(MultiQueue *parent)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(!parent->parent); // parent cannot have a parent, more like a "root"
+ parent->size++;
+ return multiqueue_new(parent, NULL, NULL);
+}
+
+static MultiQueue *multiqueue_new(MultiQueue *parent, put_callback put_cb,
+ void *data)
+{
+ MultiQueue *rv = xmalloc(sizeof(MultiQueue));
+ QUEUE_INIT(&rv->headtail);
+ rv->size = 0;
+ rv->parent = parent;
+ rv->put_cb = put_cb;
+ rv->data = data;
+ return rv;
+}
+
+void multiqueue_free(MultiQueue *this)
+{
+ assert(this);
+ while (!QUEUE_EMPTY(&this->headtail)) {
+ QUEUE *q = QUEUE_HEAD(&this->headtail);
+ MultiQueueItem *item = multiqueue_node_data(q);
+ if (this->parent) {
+ QUEUE_REMOVE(&item->data.item.parent_item->node);
+ xfree(item->data.item.parent_item);
+ }
+ QUEUE_REMOVE(q);
+ xfree(item);
+ }
+
+ xfree(this);
+}
+
+Event multiqueue_get(MultiQueue *this)
+{
+ return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this);
+}
+
+void multiqueue_put_event(MultiQueue *this, Event event)
+{
+ assert(this);
+ multiqueue_push(this, event);
+ if (this->parent && this->parent->put_cb) {
+ this->parent->put_cb(this->parent, this->parent->data);
+ }
+}
+
+void multiqueue_process_events(MultiQueue *this)
+{
+ assert(this);
+ while (!multiqueue_empty(this)) {
+ Event event = multiqueue_get(this);
+ if (event.handler) {
+ event.handler(event.argv);
+ }
+ }
+}
+
+/// Removes all events without processing them.
+void multiqueue_purge_events(MultiQueue *this)
+{
+ assert(this);
+ while (!multiqueue_empty(this)) {
+ (void)multiqueue_remove(this);
+ }
+}
+
+bool multiqueue_empty(MultiQueue *this)
+{
+ assert(this);
+ return QUEUE_EMPTY(&this->headtail);
+}
+
+void multiqueue_replace_parent(MultiQueue *this, MultiQueue *new_parent)
+{
+ assert(multiqueue_empty(this));
+ this->parent = new_parent;
+}
+
+/// Gets the count of all events currently in the queue.
+size_t multiqueue_size(MultiQueue *this)
+{
+ return this->size;
+}
+
+static Event multiqueue_remove(MultiQueue *this)
+{
+ assert(!multiqueue_empty(this));
+ QUEUE *h = QUEUE_HEAD(&this->headtail);
+ QUEUE_REMOVE(h);
+ MultiQueueItem *item = multiqueue_node_data(h);
+ Event rv;
+
+ if (item->link) {
+ assert(!this->parent);
+ // remove the next node in the linked queue
+ MultiQueue *linked = item->data.queue;
+ assert(!multiqueue_empty(linked));
+ MultiQueueItem *child =
+ multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
+ QUEUE_REMOVE(&child->node);
+ rv = child->data.item.event;
+ xfree(child);
+ } else {
+ if (this->parent) {
+ // remove the corresponding link node in the parent queue
+ QUEUE_REMOVE(&item->data.item.parent_item->node);
+ xfree(item->data.item.parent_item);
+ }
+ rv = item->data.item.event;
+ }
+
+ this->size--;
+ xfree(item);
+ return rv;
+}
+
+static void multiqueue_push(MultiQueue *this, Event event)
+{
+ MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem));
+ item->link = false;
+ item->data.item.event = event;
+ QUEUE_INSERT_TAIL(&this->headtail, &item->node);
+ if (this->parent) {
+ // push link node to the parent queue
+ item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem));
+ item->data.item.parent_item->link = true;
+ item->data.item.parent_item->data.queue = this;
+ QUEUE_INSERT_TAIL(&this->parent->headtail,
+ &item->data.item.parent_item->node);
+ }
+ this->size++;
+}
+
+static MultiQueueItem *multiqueue_node_data(QUEUE *q)
+{
+ return QUEUE_DATA(q, MultiQueueItem, node);
+}