aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/rstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/rstream.c')
-rw-r--r--src/nvim/os/rstream.c297
1 files changed, 297 insertions, 0 deletions
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
new file mode 100644
index 0000000000..e14075c8db
--- /dev/null
+++ b/src/nvim/os/rstream.c
@@ -0,0 +1,297 @@
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "os/uv_helpers.h"
+#include "os/rstream_defs.h"
+#include "os/rstream.h"
+#include "os/event_defs.h"
+#include "os/event.h"
+#include "vim.h"
+#include "memory.h"
+
+struct rstream {
+ uv_buf_t uvbuf;
+ void *data;
+ char *buffer;
+ uv_stream_t *stream;
+ uv_idle_t *fread_idle;
+ uv_handle_type file_type;
+ uv_file fd;
+ rstream_cb cb;
+ size_t buffer_size, rpos, wpos, fpos;
+ bool reading, free_handle, async;
+};
+
+// Callbacks used by libuv
+static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *);
+static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *);
+static void fread_idle_cb(uv_idle_t *);
+static void close_cb(uv_handle_t *handle);
+static void emit_read_event(RStream *rstream, bool eof);
+
+RStream * rstream_new(rstream_cb cb,
+ uint32_t buffer_size,
+ void *data,
+ bool async)
+{
+ RStream *rv = xmalloc(sizeof(RStream));
+ rv->buffer = xmalloc(buffer_size);
+ rv->buffer_size = buffer_size;
+ rv->data = data;
+ rv->async = async;
+ rv->cb = cb;
+ rv->rpos = rv->wpos = rv->fpos = 0;
+ rv->stream = NULL;
+ rv->fread_idle = NULL;
+ rv->free_handle = false;
+ rv->file_type = UV_UNKNOWN_HANDLE;
+
+ return rv;
+}
+
+void rstream_free(RStream *rstream)
+{
+ if (rstream->free_handle) {
+ if (rstream->fread_idle != NULL) {
+ uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
+ } else {
+ uv_close((uv_handle_t *)rstream->stream, close_cb);
+ }
+ }
+
+ free(rstream->buffer);
+ free(rstream);
+}
+
+void rstream_set_stream(RStream *rstream, uv_stream_t *stream)
+{
+ handle_set_rstream((uv_handle_t *)stream, rstream);
+ rstream->stream = stream;
+}
+
+void rstream_set_file(RStream *rstream, uv_file file)
+{
+ rstream->file_type = uv_guess_handle(file);
+
+ if (rstream->free_handle) {
+ // If this is the second time we're calling this function, free the
+ // previously allocated memory
+ if (rstream->fread_idle != NULL) {
+ uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
+ } else {
+ uv_close((uv_handle_t *)rstream->stream, close_cb);
+ }
+ }
+
+ if (rstream->file_type == UV_FILE) {
+ // Non-blocking file reads are simulated with a idle handle that reads
+ // in chunks of rstream->buffer_size, giving time for other events to
+ // be processed between reads.
+ rstream->fread_idle = xmalloc(sizeof(uv_idle_t));
+ uv_idle_init(uv_default_loop(), rstream->fread_idle);
+ rstream->fread_idle->data = NULL;
+ handle_set_rstream((uv_handle_t *)rstream->fread_idle, rstream);
+ } else {
+ // Only pipes are supported for now
+ assert(rstream->file_type == UV_NAMED_PIPE
+ || rstream->file_type == UV_TTY);
+ rstream->stream = xmalloc(sizeof(uv_pipe_t));
+ uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0);
+ uv_pipe_open((uv_pipe_t *)rstream->stream, file);
+ rstream->stream->data = NULL;
+ handle_set_rstream((uv_handle_t *)rstream->stream, rstream);
+ }
+
+ rstream->fd = file;
+ rstream->free_handle = true;
+}
+
+bool rstream_is_regular_file(RStream *rstream)
+{
+ return rstream->file_type == UV_FILE;
+}
+
+void rstream_start(RStream *rstream)
+{
+ if (rstream->file_type == UV_FILE) {
+ uv_idle_start(rstream->fread_idle, fread_idle_cb);
+ } else {
+ rstream->reading = false;
+ uv_read_start(rstream->stream, alloc_cb, read_cb);
+ }
+}
+
+void rstream_stop(RStream *rstream)
+{
+ if (rstream->file_type == UV_FILE) {
+ uv_idle_stop(rstream->fread_idle);
+ } else {
+ uv_read_stop(rstream->stream);
+ }
+}
+
+size_t rstream_read(RStream *rstream, char *buf, uint32_t count)
+{
+ size_t read_count = rstream->wpos - rstream->rpos;
+
+ if (count < read_count) {
+ read_count = count;
+ }
+
+ if (read_count > 0) {
+ memcpy(buf, rstream->buffer + rstream->rpos, read_count);
+ rstream->rpos += read_count;
+ }
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // `wpos` is at the end of the buffer, so free some space by moving unread
+ // data...
+ memmove(
+ rstream->buffer, // ...To the beginning of the buffer(rpos 0)
+ rstream->buffer + rstream->rpos, // ...From the first unread position
+ rstream->wpos - rstream->rpos); // ...By the number of unread bytes
+ rstream->wpos -= rstream->rpos;
+ rstream->rpos = 0;
+
+ if (rstream->wpos < rstream->buffer_size) {
+ // Restart reading since we have freed some space
+ rstream_start(rstream);
+ }
+ }
+
+ return read_count;
+}
+
+size_t rstream_available(RStream *rstream)
+{
+ return rstream->wpos - rstream->rpos;
+}
+
+void rstream_read_event(Event event)
+{
+ RStream *rstream = event.data.rstream.ptr;
+
+ rstream->cb(rstream, rstream->data, event.data.rstream.eof);
+}
+
+// Called by libuv to allocate memory for reading.
+static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
+{
+ RStream *rstream = handle_get_rstream(handle);
+
+ if (rstream->reading) {
+ buf->len = 0;
+ return;
+ }
+
+ buf->len = rstream->buffer_size - rstream->wpos;
+ buf->base = rstream->buffer + rstream->wpos;
+
+ // Avoid `alloc_cb`, `alloc_cb` sequences on windows
+ rstream->reading = true;
+}
+
+// Callback invoked by libuv after it copies the data into the buffer provided
+// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
+// 0-length buffer.
+static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
+{
+ RStream *rstream = handle_get_rstream((uv_handle_t *)stream);
+
+ if (cnt <= 0) {
+ if (cnt != UV_ENOBUFS) {
+ // Read error or EOF, either way stop the stream and invoke the callback
+ // with eof == true
+ uv_read_stop(stream);
+ emit_read_event(rstream, true);
+ }
+ return;
+ }
+
+ // at this point we're sure that cnt is positive, no error occurred
+ size_t nread = (size_t) cnt;
+
+ // Data was already written, so all we need is to update 'wpos' to reflect
+ // the space actually used in the buffer.
+ rstream->wpos += nread;
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // The last read filled the buffer, stop reading for now
+ rstream_stop(rstream);
+ }
+
+ rstream->reading = false;
+ emit_read_event(rstream, false);
+}
+
+// Called by the by the 'idle' handle to emulate a reading event
+static void fread_idle_cb(uv_idle_t *handle)
+{
+ uv_fs_t req;
+ RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
+
+ rstream->uvbuf.base = rstream->buffer + rstream->wpos;
+ rstream->uvbuf.len = rstream->buffer_size - rstream->wpos;
+
+ // the offset argument to uv_fs_read is int64_t, could someone really try
+ // to read more than 9 quintillion (9e18) bytes?
+ // upcast is meant to avoid tautological condition warning on 32 bits
+ uintmax_t fpos_intmax = rstream->fpos;
+ assert(fpos_intmax <= INT64_MAX);
+
+ // Synchronous read
+ uv_fs_read(
+ uv_default_loop(),
+ &req,
+ rstream->fd,
+ &rstream->uvbuf,
+ 1,
+ (int64_t) rstream->fpos,
+ NULL);
+
+ uv_fs_req_cleanup(&req);
+
+ if (req.result <= 0) {
+ uv_idle_stop(rstream->fread_idle);
+ emit_read_event(rstream, true);
+ return;
+ }
+
+ // no errors (req.result (ssize_t) is positive), it's safe to cast.
+ size_t nread = (size_t) req.result;
+
+ rstream->wpos += nread;
+ rstream->fpos += nread;
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // The last read filled the buffer, stop reading for now
+ rstream_stop(rstream);
+ }
+
+ emit_read_event(rstream, false);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ free(handle->data);
+ free(handle);
+}
+
+static void emit_read_event(RStream *rstream, bool eof)
+{
+ if (rstream->async) {
+ Event event;
+
+ event.type = kEventRStreamData;
+ event.data.rstream.ptr = rstream;
+ event.data.rstream.eof = eof;
+ event_push(event);
+ } else {
+ // Invoke the callback passing in the number of bytes available and data
+ // associated with the stream
+ rstream->cb(rstream, rstream->data, eof);
+ }
+}