aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--clint-files.txt3
-rw-r--r--src/os/rstream.c232
-rw-r--r--src/os/rstream.h75
-rw-r--r--src/os/rstream_defs.h14
4 files changed, 324 insertions, 0 deletions
diff --git a/clint-files.txt b/clint-files.txt
index 8304059bc5..68c2358f85 100644
--- a/clint-files.txt
+++ b/clint-files.txt
@@ -6,6 +6,9 @@ src/os/event_defs.h
src/os/event.h
src/os/input.c
src/os/input.h
+src/os/rstream.c
+src/os/rstream_defs.h
+src/os/rstream.h
src/os/job.c
src/os/job_defs.h
src/os/job.h
diff --git a/src/os/rstream.c b/src/os/rstream.c
new file mode 100644
index 0000000000..1d3716284c
--- /dev/null
+++ b/src/os/rstream.c
@@ -0,0 +1,232 @@
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "os/rstream_defs.h"
+#include "os/rstream.h"
+#include "vim.h"
+#include "memory.h"
+
+struct rstream {
+ uv_buf_t uvbuf;
+ void *data;
+ char *buffer;
+ uv_stream_t *stream;
+ uv_idle_t *fread_idle;
+ uv_handle_type file_type;
+ uv_file fd;
+ rstream_cb cb;
+ uint32_t buffer_size, rpos, wpos, fpos;
+ bool reading, free_handle;
+};
+
+// Callbacks used by libuv
+static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *);
+static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *);
+static void fread_idle_cb(uv_idle_t *);
+
+RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data)
+{
+ RStream *rv = xmalloc(sizeof(RStream));
+ rv->buffer = xmalloc(buffer_size);
+ rv->buffer_size = buffer_size;
+ rv->data = data;
+ rv->cb = cb;
+ rv->rpos = rv->wpos = rv->fpos = 0;
+ rv->stream = NULL;
+ rv->fread_idle = NULL;
+ rv->free_handle = false;
+
+ return rv;
+}
+
+void rstream_free(RStream *rstream)
+{
+ if (rstream->free_handle) {
+ if (rstream->fread_idle != NULL) {
+ uv_close((uv_handle_t *)rstream->fread_idle, NULL);
+ free(rstream->fread_idle);
+ } else {
+ uv_close((uv_handle_t *)rstream->stream, NULL);
+ free(rstream->stream);
+ }
+ }
+
+ free(rstream->buffer);
+ free(rstream);
+}
+
+void rstream_set_stream(RStream *rstream, uv_stream_t *stream)
+{
+ stream->data = rstream;
+ rstream->stream = stream;
+}
+
+void rstream_set_file(RStream *rstream, uv_file file)
+{
+ rstream->file_type = uv_guess_handle(file);
+
+ if (rstream->free_handle) {
+ // If this is the second time we're calling this function, free the
+ // previously allocated memory
+ if (rstream->fread_idle != NULL) {
+ uv_close((uv_handle_t *)rstream->fread_idle, NULL);
+ free(rstream->fread_idle);
+ } else {
+ uv_close((uv_handle_t *)rstream->stream, NULL);
+ free(rstream->stream);
+ }
+ }
+
+ if (rstream->file_type == UV_FILE) {
+ // Non-blocking file reads are simulated with a idle handle that reads
+ // in chunks of rstream->buffer_size, giving time for other events to
+ // be processed between reads.
+ rstream->fread_idle = xmalloc(sizeof(uv_idle_t));
+ uv_idle_init(uv_default_loop(), rstream->fread_idle);
+ rstream->fread_idle->data = rstream;
+ } else {
+ // Only pipes are supported for now
+ assert(rstream->file_type == UV_NAMED_PIPE
+ || rstream->file_type == UV_TTY);
+ rstream->stream = xmalloc(sizeof(uv_pipe_t));
+ uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0);
+ uv_pipe_open((uv_pipe_t *)rstream->stream, file);
+ rstream->stream->data = rstream;
+ }
+
+ rstream->fd = file;
+ rstream->free_handle = true;
+}
+
+bool rstream_is_regular_file(RStream *rstream)
+{
+ return rstream->file_type == UV_FILE;
+}
+
+void rstream_start(RStream *rstream)
+{
+ if (rstream->file_type == UV_FILE) {
+ uv_idle_start(rstream->fread_idle, fread_idle_cb);
+ } else {
+ rstream->reading = false;
+ uv_read_start(rstream->stream, alloc_cb, read_cb);
+ }
+}
+
+void rstream_stop(RStream *rstream)
+{
+ if (rstream->file_type == UV_FILE) {
+ uv_idle_stop(rstream->fread_idle);
+ } else {
+ uv_read_stop(rstream->stream);
+ }
+}
+
+uint32_t rstream_read(RStream *rstream, char *buf, uint32_t count)
+{
+ uint32_t read_count = rstream->wpos - rstream->rpos;
+
+ if (count < read_count) {
+ read_count = count;
+ }
+
+ if (read_count > 0) {
+ memcpy(buf, rstream->buffer + rstream->rpos, read_count);
+ rstream->rpos += read_count;
+ }
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // `wpos` is at the end of the buffer, so free some space by moving unread
+ // data...
+ memmove(
+ rstream->buffer, // ...To the beginning of the buffer(rpos 0)
+ rstream->buffer + rstream->rpos, // ...From the first unread position
+ rstream->wpos - rstream->rpos); // ...By the number of unread bytes
+ rstream->wpos -= rstream->rpos;
+ rstream->rpos = 0;
+ }
+
+ return read_count;
+}
+
+uint32_t rstream_available(RStream *rstream)
+{
+ return rstream->wpos - rstream->rpos;
+}
+
+// Called by libuv to allocate memory for reading.
+static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
+{
+ RStream *rstream = handle->data;
+
+ if (rstream->reading) {
+ buf->len = 0;
+ return;
+ }
+
+ buf->base = rstream->buffer + rstream->wpos;
+ buf->len = rstream->buffer_size - rstream->wpos;
+ // Avoid `alloc_cb`, `alloc_cb` sequences on windows
+ rstream->reading = true;
+}
+
+// Callback invoked by libuv after it copies the data into the buffer provided
+// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
+// 0-length buffer.
+static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
+{
+ RStream *rstream = stream->data;
+
+ if (cnt <= 0) {
+ if (cnt != UV_ENOBUFS) {
+ // Read error or EOF, either way stop the stream and invoke the callback
+ // with eof == true
+ uv_read_stop(stream);
+ rstream->cb(rstream, rstream->data, true);
+ }
+ return;
+ }
+
+ // Data was already written, so all we need is to update 'wpos' to reflect
+ // the space actually used in the buffer.
+ rstream->wpos += cnt;
+ // Invoke the callback passing in the number of bytes available and data
+ // associated with the stream
+ rstream->cb(rstream, rstream->data, false);
+ rstream->reading = false;
+}
+
+// Called by the by the 'idle' handle to emulate a reading event
+static void fread_idle_cb(uv_idle_t *handle)
+{
+ uv_fs_t req;
+ RStream *rstream = handle->data;
+
+ rstream->uvbuf.base = rstream->buffer + rstream->wpos;
+ rstream->uvbuf.len = rstream->buffer_size - rstream->wpos;
+
+ // Synchronous read
+ uv_fs_read(
+ uv_default_loop(),
+ &req,
+ rstream->fd,
+ &rstream->uvbuf,
+ 1,
+ rstream->fpos,
+ NULL);
+
+ uv_fs_req_cleanup(&req);
+
+ if (req.result <= 0) {
+ uv_idle_stop(rstream->fread_idle);
+ rstream->cb(rstream, rstream->data, true);
+ return;
+ }
+
+ rstream->wpos += req.result;
+ rstream->fpos += req.result;
+ rstream->cb(rstream, rstream->data, false);
+}
diff --git a/src/os/rstream.h b/src/os/rstream.h
new file mode 100644
index 0000000000..1b3b679f9f
--- /dev/null
+++ b/src/os/rstream.h
@@ -0,0 +1,75 @@
+#ifndef NEOVIM_OS_RSTREAM_H
+#define NEOVIM_OS_RSTREAM_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <uv.h>
+
+#include "os/rstream_defs.h"
+
+/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
+/// necessary for reading from a libuv stream.
+///
+/// @param cb A function that will be called whenever some data is available
+/// for reading with `rstream_read`
+/// @param buffer_size Size in bytes of the internal buffer.
+/// @param data Some state to associate with the `RStream` instance
+/// @return The newly-allocated `RStream` instance
+RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data);
+
+/// Frees all memory allocated for a RStream instance
+///
+/// @param rstream The `RStream` instance
+void rstream_free(RStream *rstream);
+
+/// Sets the underlying `uv_stream_t` instance
+///
+/// @param rstream The `RStream` instance
+/// @param stream The new `uv_stream_t` instance
+void rstream_set_stream(RStream *rstream, uv_stream_t *stream);
+
+/// Sets the underlying `uv_file_t` instance
+///
+/// @param rstream The `RStream` instance
+/// @param stream The new `uv_stream_t` instance
+void rstream_set_stream(RStream *rstream, uv_stream_t *stream);
+
+/// Sets the underlying file descriptor that will be read from. Only pipes
+/// and regular files are supported for now.
+///
+/// @param rstream The `RStream` instance
+/// @param file The file descriptor
+void rstream_set_file(RStream *rstream, uv_file file);
+
+/// Tests if the stream is backed by a regular file
+///
+/// @param rstream The `RStream` instance
+/// @return True if the underlying file descriptor represents a regular file
+bool rstream_is_regular_file(RStream *rstream);
+
+/// Starts watching for events from a `RStream` instance.
+///
+/// @param rstream The `RStream` instance
+void rstream_start(RStream *rstream);
+
+/// Stops watching for events from a `RStream` instance.
+///
+/// @param rstream The `RStream` instance
+void rstream_stop(RStream *rstream);
+
+/// Reads data from a `RStream` instance into a buffer.
+///
+/// @param rstream The `RStream` instance
+/// @param buffer The buffer which will receive the data
+/// @param count Number of bytes that `buffer` can accept
+/// @return The number of bytes copied into `buffer`
+uint32_t rstream_read(RStream *rstream, char *buffer, uint32_t count);
+
+/// Returns the number of bytes available for reading from `rstream`
+///
+/// @param rstream The `RStream` instance
+/// @return The number of bytes available
+uint32_t rstream_available(RStream *rstream);
+
+#endif // NEOVIM_OS_RSTREAM_H
+
diff --git a/src/os/rstream_defs.h b/src/os/rstream_defs.h
new file mode 100644
index 0000000000..62c910d041
--- /dev/null
+++ b/src/os/rstream_defs.h
@@ -0,0 +1,14 @@
+#ifndef NEOVIM_OS_RSTREAM_DEFS_H
+#define NEOVIM_OS_RSTREAM_DEFS_H
+
+typedef struct rstream RStream;
+
+/// Function called when the RStream receives data
+///
+/// @param rstream The RStream instance
+/// @param data State associated with the RStream instance
+/// @param eof If the stream reached EOF.
+typedef void (*rstream_cb)(RStream *rstream, void *data, bool eof);
+
+#endif // NEOVIM_OS_RSTREAM_DEFS_H
+