aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-16 23:10:15 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:19:55 -0300
commitac2bd0256183fe4255e5fcccf37f860f037d43a6 (patch)
tree09bcdf6eefd9b18a58159f06a20b97c938d4f367 /src/nvim/event
parent991d3ec1e679bb6407f2a5820910d2968424183c (diff)
downloadrneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.tar.gz
rneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.tar.bz2
rneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.zip
rstream/wstream: Unify structures and simplify API
- Simplify RStream/WStream API and make it more consistent with libuv. - Move into the event loop layer(event subdirectory) - Remove uv_helpers module. - Simplify job/process internal modules/API. - Unify RStream and WStream into a single structure. This is necessary because libuv streams can be readable and writable at the same time(and because the uv_helpers.c hack to associate multiple streams with libuv handle was removed) - Make struct definition public, allowing more flexible/simple memory management by users of the module. - Adapt channel/job modules to cope with the changes.
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/rstream.c167
-rw-r--r--src/nvim/event/rstream.h16
-rw-r--r--src/nvim/event/stream.c108
-rw-r--r--src/nvim/event/stream.h55
-rw-r--r--src/nvim/event/wstream.c162
-rw-r--r--src/nvim/event/wstream.h24
6 files changed, 532 insertions, 0 deletions
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
new file mode 100644
index 0000000000..78c044347f
--- /dev/null
+++ b/src/nvim/event/rstream.c
@@ -0,0 +1,167 @@
+#include <assert.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/event/rstream.h"
+#include "nvim/ascii.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+#include "nvim/log.h"
+#include "nvim/misc1.h"
+#include "nvim/event/loop.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/rstream.c.generated.h"
+#endif
+
+void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(loop, stream, fd, NULL, data);
+ rstream_init(stream, bufsize);
+}
+
+void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(NULL, stream, -1, uvstream, data);
+ rstream_init(stream, bufsize);
+}
+
+void rstream_init(Stream *stream, size_t bufsize)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ stream->buffer = rbuffer_new(bufsize);
+ stream->buffer->data = stream;
+ stream->buffer->full_cb = on_rbuffer_full;
+ stream->buffer->nonfull_cb = on_rbuffer_nonfull;
+}
+
+
+/// Starts watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_start(Stream *stream, stream_read_cb cb)
+{
+ stream->read_cb = cb;
+ if (stream->uvstream) {
+ uv_read_start(stream->uvstream, alloc_cb, read_cb);
+ } else {
+ uv_idle_start(&stream->uv.idle, fread_idle_cb);
+ }
+}
+
+/// Stops watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_stop(Stream *stream)
+{
+ if (stream->uvstream) {
+ uv_read_stop(stream->uvstream);
+ } else {
+ uv_idle_stop(&stream->uv.idle);
+ }
+}
+
+static void on_rbuffer_full(RBuffer *buf, void *data)
+{
+ rstream_stop(data);
+}
+
+static void on_rbuffer_nonfull(RBuffer *buf, void *data)
+{
+ Stream *stream = data;
+ assert(stream->read_cb);
+ rstream_start(stream, stream->read_cb);
+}
+
+// Callbacks used by libuv
+
+// Called by libuv to allocate memory for reading.
+static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
+{
+ Stream *stream = handle->data;
+ buf->base = rbuffer_write_ptr(stream->buffer, &buf->len);
+}
+
+// Callback invoked by libuv after it copies the data into the buffer provided
+// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
+// 0-length buffer.
+static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
+{
+ Stream *stream = uvstream->data;
+
+ if (cnt <= 0) {
+ if (cnt != UV_ENOBUFS
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ && cnt != 0) {
+ DLOG("Closing Stream(%p) because of %s(%zd)", stream,
+ uv_strerror((int)cnt), cnt);
+ // Read error or EOF, either way stop the stream and invoke the callback
+ // with eof == true
+ uv_read_stop(uvstream);
+ stream->read_cb(stream, stream->buffer, stream->data, true);
+ }
+ return;
+ }
+
+ // at this point we're sure that cnt is positive, no error occurred
+ size_t nread = (size_t)cnt;
+ // Data was already written, so all we need is to update 'wpos' to reflect
+ // the space actually used in the buffer.
+ rbuffer_produced(stream->buffer, nread);
+ stream->read_cb(stream, stream->buffer, stream->data, false);
+}
+
+// Called by the by the 'idle' handle to emulate a reading event
+static void fread_idle_cb(uv_idle_t *handle)
+{
+ uv_fs_t req;
+ Stream *stream = handle->data;
+
+ stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len);
+
+ // the offset argument to uv_fs_read is int64_t, could someone really try
+ // to read more than 9 quintillion (9e18) bytes?
+ // upcast is meant to avoid tautological condition warning on 32 bits
+ uintmax_t fpos_intmax = stream->fpos;
+ if (fpos_intmax > INT64_MAX) {
+ ELOG("stream offset overflow");
+ preserve_exit();
+ }
+
+ // Synchronous read
+ uv_fs_read(
+ handle->loop,
+ &req,
+ stream->fd,
+ &stream->uvbuf,
+ 1,
+ (int64_t) stream->fpos,
+ NULL);
+
+ uv_fs_req_cleanup(&req);
+
+ if (req.result <= 0) {
+ uv_idle_stop(&stream->uv.idle);
+ stream->read_cb(stream, stream->buffer, stream->data, true);
+ return;
+ }
+
+ // no errors (req.result (ssize_t) is positive), it's safe to cast.
+ size_t nread = (size_t) req.result;
+ rbuffer_produced(stream->buffer, nread);
+ stream->fpos += nread;
+}
diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h
new file mode 100644
index 0000000000..f30ad79ee5
--- /dev/null
+++ b/src/nvim/event/rstream.h
@@ -0,0 +1,16 @@
+#ifndef NVIM_EVENT_RSTREAM_H
+#define NVIM_EVENT_RSTREAM_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/stream.h"
+
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/rstream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_RSTREAM_H
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
new file mode 100644
index 0000000000..959b532146
--- /dev/null
+++ b/src/nvim/event/stream.c
@@ -0,0 +1,108 @@
+#include <assert.h>
+#include <stdio.h>
+#include <stdbool.h>
+
+#include <uv.h>
+
+#include "nvim/rbuffer.h"
+#include "nvim/event/stream.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/stream.c.generated.h"
+#endif
+
+/// Sets the stream associated with `fd` to "blocking" mode.
+///
+/// @return `0` on success, or `-errno` on failure.
+int stream_set_blocking(int fd, bool blocking)
+{
+ // Private loop to avoid conflict with existing watcher(s):
+ // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
+ uv_loop_t loop;
+ uv_pipe_t stream;
+ uv_loop_init(&loop);
+ uv_pipe_init(&loop, &stream, 0);
+ uv_pipe_open(&stream, fd);
+ int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
+ uv_close((uv_handle_t *)&stream, NULL);
+ uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt.
+ uv_loop_close(&loop);
+ return retval;
+}
+
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
+ void *data)
+{
+ stream->uvstream = uvstream;
+
+ if (fd >= 0) {
+ uv_handle_type type = uv_guess_handle(fd);
+ stream->fd = fd;
+
+ if (type == UV_FILE) {
+ // Non-blocking file reads are simulated with an idle handle that reads in
+ // chunks of the ring buffer size, giving time for other events to be
+ // processed between reads.
+ uv_idle_init(&loop->uv, &stream->uv.idle);
+ stream->uv.idle.data = stream;
+ } else {
+ assert(type == UV_NAMED_PIPE || type == UV_TTY);
+ uv_pipe_init(&loop->uv, &stream->uv.pipe, 0);
+ uv_pipe_open(&stream->uv.pipe, fd);
+ stream->uvstream = (uv_stream_t *)&stream->uv.pipe;
+ }
+ }
+
+ if (stream->uvstream) {
+ stream->uvstream->data = stream;
+ }
+
+ stream->data = data;
+ stream->internal_data = NULL;
+ stream->fpos = 0;
+ stream->curmem = 0;
+ stream->maxmem = 0;
+ stream->pending_reqs = 0;
+ stream->read_cb = NULL;
+ stream->write_cb = NULL;
+ stream->close_cb = NULL;
+ stream->internal_close_cb = NULL;
+ stream->closed = false;
+ stream->buffer = NULL;
+}
+
+void stream_close(Stream *stream, stream_close_cb on_stream_close)
+{
+ assert(!stream->closed);
+
+ if (stream->buffer) {
+ rbuffer_free(stream->buffer);
+ }
+
+ stream->closed = true;
+ stream->close_cb = on_stream_close;
+
+ if (!stream->pending_reqs) {
+ stream_close_handle(stream);
+ }
+}
+
+void stream_close_handle(Stream *stream)
+{
+ if (stream->uvstream) {
+ uv_close((uv_handle_t *)stream->uvstream, close_cb);
+ } else {
+ uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
+ }
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Stream *stream = handle->data;
+ if (stream->close_cb) {
+ stream->close_cb(stream, stream->data);
+ }
+ if (stream->internal_close_cb) {
+ stream->internal_close_cb(stream, stream->internal_data);
+ }
+}
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
new file mode 100644
index 0000000000..37410b2036
--- /dev/null
+++ b/src/nvim/event/stream.h
@@ -0,0 +1,55 @@
+#ifndef NVIM_EVENT_STREAM_H
+#define NVIM_EVENT_STREAM_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/rbuffer.h"
+
+typedef struct stream Stream;
+/// Type of function called when the Stream buffer is filled with data
+///
+/// @param stream The Stream instance
+/// @param rbuffer The associated RBuffer instance
+/// @param data User-defined data
+/// @param eof If the stream reached EOF.
+typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data,
+ bool eof);
+
+/// Type of function called when the Stream has information about a write
+/// request.
+///
+/// @param wstream The Stream instance
+/// @param data User-defined data
+/// @param status 0 on success, anything else indicates failure
+typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
+typedef void (*stream_close_cb)(Stream *stream, void *data);
+
+struct stream {
+ union {
+ uv_pipe_t pipe;
+ uv_tcp_t tcp;
+ uv_idle_t idle;
+ } uv;
+ uv_stream_t *uvstream;
+ uv_buf_t uvbuf;
+ RBuffer *buffer;
+ uv_file fd;
+ stream_read_cb read_cb;
+ stream_write_cb write_cb;
+ stream_close_cb close_cb, internal_close_cb;
+ size_t fpos;
+ size_t curmem;
+ size_t maxmem;
+ size_t pending_reqs;
+ void *data, *internal_data;
+ bool closed;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/stream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_STREAM_H
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
new file mode 100644
index 0000000000..5fcb724fe3
--- /dev/null
+++ b/src/nvim/event/wstream.c
@@ -0,0 +1,162 @@
+#include <assert.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/wstream.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+
+#define DEFAULT_MAXMEM 1024 * 1024 * 10
+
+typedef struct {
+ Stream *stream;
+ WBuffer *buffer;
+ uv_write_t uv_req;
+} WRequest;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/wstream.c.generated.h"
+#endif
+
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(loop, stream, fd, NULL, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(NULL, stream, -1, uvstream, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init(Stream *stream, size_t maxmem)
+{
+ stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM;
+}
+
+/// Sets a callback that will be called on completion of a write request,
+/// indicating failure/success.
+///
+/// This affects all requests currently in-flight as well. Overwrites any
+/// possible earlier callback.
+///
+/// @note This callback will not fire if the write request couldn't even be
+/// queued properly (i.e.: when `wstream_write() returns an error`).
+///
+/// @param stream The `Stream` instance
+/// @param cb The callback
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
+ FUNC_ATTR_NONNULL_ALL
+{
+ stream->write_cb = cb;
+}
+
+/// Queues data for writing to the backing file descriptor of a `Stream`
+/// instance. This will fail if the write would cause the Stream use more
+/// memory than specified by `maxmem`.
+///
+/// @param stream The `Stream` instance
+/// @param buffer The buffer which contains data to be written
+/// @return false if the write failed
+bool wstream_write(Stream *stream, WBuffer *buffer)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(stream->maxmem);
+ // This should not be called after a stream was freed
+ assert(!stream->closed);
+
+ if (stream->curmem > stream->maxmem) {
+ goto err;
+ }
+
+ stream->curmem += buffer->size;
+
+ WRequest *data = xmalloc(sizeof(WRequest));
+ data->stream = stream;
+ data->buffer = buffer;
+ data->uv_req.data = data;
+
+ uv_buf_t uvbuf;
+ uvbuf.base = buffer->data;
+ uvbuf.len = buffer->size;
+
+ if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
+ xfree(data);
+ goto err;
+ }
+
+ stream->pending_reqs++;
+ return true;
+
+err:
+ wstream_release_wbuffer(buffer);
+ return false;
+}
+
+/// Creates a WBuffer object for holding output data. Instances of this
+/// object can be reused across Stream instances, and the memory is freed
+/// automatically when no longer needed(it tracks the number of references
+/// internally)
+///
+/// @param data Data stored by the WBuffer
+/// @param size The size of the data array
+/// @param refcount The number of references for the WBuffer. This will be used
+/// by Stream instances to decide when a WBuffer should be freed.
+/// @param cb Pointer to function that will be responsible for freeing
+/// the buffer data(passing 'free' will work as expected).
+/// @return The allocated WBuffer instance
+WBuffer *wstream_new_buffer(char *data,
+ size_t size,
+ size_t refcount,
+ wbuffer_data_finalizer cb)
+{
+ WBuffer *rv = xmalloc(sizeof(WBuffer));
+ rv->size = size;
+ rv->refcount = refcount;
+ rv->cb = cb;
+ rv->data = data;
+
+ return rv;
+}
+
+static void write_cb(uv_write_t *req, int status)
+{
+ WRequest *data = req->data;
+
+ data->stream->curmem -= data->buffer->size;
+
+ wstream_release_wbuffer(data->buffer);
+
+ if (data->stream->write_cb) {
+ data->stream->write_cb(data->stream, data->stream->data, status);
+ }
+
+ data->stream->pending_reqs--;
+
+ if (data->stream->closed && data->stream->pending_reqs == 0) {
+ // Last pending write, free the stream;
+ stream_close_handle(data->stream);
+ }
+
+ xfree(data);
+}
+
+void wstream_release_wbuffer(WBuffer *buffer)
+{
+ if (!--buffer->refcount) {
+ if (buffer->cb) {
+ buffer->cb(buffer->data);
+ }
+
+ xfree(buffer);
+ }
+}
diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h
new file mode 100644
index 0000000000..9008de0d97
--- /dev/null
+++ b/src/nvim/event/wstream.h
@@ -0,0 +1,24 @@
+#ifndef NVIM_EVENT_WSTREAM_H
+#define NVIM_EVENT_WSTREAM_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/stream.h"
+
+typedef struct wbuffer WBuffer;
+typedef void (*wbuffer_data_finalizer)(void *data);
+
+struct wbuffer {
+ size_t size, refcount;
+ char *data;
+ wbuffer_data_finalizer cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/wstream.h.generated.h"
+#endif
+#endif // NVIM_EVENT_WSTREAM_H