diff options
-rw-r--r-- | clint-files.txt | 3 | ||||
-rw-r--r-- | src/os/rstream.c | 232 | ||||
-rw-r--r-- | src/os/rstream.h | 75 | ||||
-rw-r--r-- | src/os/rstream_defs.h | 14 |
4 files changed, 324 insertions, 0 deletions
diff --git a/clint-files.txt b/clint-files.txt index 8304059bc5..68c2358f85 100644 --- a/clint-files.txt +++ b/clint-files.txt @@ -6,6 +6,9 @@ src/os/event_defs.h src/os/event.h src/os/input.c src/os/input.h +src/os/rstream.c +src/os/rstream_defs.h +src/os/rstream.h src/os/job.c src/os/job_defs.h src/os/job.h diff --git a/src/os/rstream.c b/src/os/rstream.c new file mode 100644 index 0000000000..1d3716284c --- /dev/null +++ b/src/os/rstream.c @@ -0,0 +1,232 @@ +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "os/rstream_defs.h" +#include "os/rstream.h" +#include "vim.h" +#include "memory.h" + +struct rstream { + uv_buf_t uvbuf; + void *data; + char *buffer; + uv_stream_t *stream; + uv_idle_t *fread_idle; + uv_handle_type file_type; + uv_file fd; + rstream_cb cb; + uint32_t buffer_size, rpos, wpos, fpos; + bool reading, free_handle; +}; + +// Callbacks used by libuv +static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *); +static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *); +static void fread_idle_cb(uv_idle_t *); + +RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data) +{ + RStream *rv = xmalloc(sizeof(RStream)); + rv->buffer = xmalloc(buffer_size); + rv->buffer_size = buffer_size; + rv->data = data; + rv->cb = cb; + rv->rpos = rv->wpos = rv->fpos = 0; + rv->stream = NULL; + rv->fread_idle = NULL; + rv->free_handle = false; + + return rv; +} + +void rstream_free(RStream *rstream) +{ + if (rstream->free_handle) { + if (rstream->fread_idle != NULL) { + uv_close((uv_handle_t *)rstream->fread_idle, NULL); + free(rstream->fread_idle); + } else { + uv_close((uv_handle_t *)rstream->stream, NULL); + free(rstream->stream); + } + } + + free(rstream->buffer); + free(rstream); +} + +void rstream_set_stream(RStream *rstream, uv_stream_t *stream) +{ + stream->data = rstream; + rstream->stream = stream; +} + +void rstream_set_file(RStream *rstream, uv_file file) +{ + rstream->file_type = uv_guess_handle(file); + + if (rstream->free_handle) { + // If this is the second time we're calling this function, free the + // previously allocated memory + if (rstream->fread_idle != NULL) { + uv_close((uv_handle_t *)rstream->fread_idle, NULL); + free(rstream->fread_idle); + } else { + uv_close((uv_handle_t *)rstream->stream, NULL); + free(rstream->stream); + } + } + + if (rstream->file_type == UV_FILE) { + // Non-blocking file reads are simulated with a idle handle that reads + // in chunks of rstream->buffer_size, giving time for other events to + // be processed between reads. + rstream->fread_idle = xmalloc(sizeof(uv_idle_t)); + uv_idle_init(uv_default_loop(), rstream->fread_idle); + rstream->fread_idle->data = rstream; + } else { + // Only pipes are supported for now + assert(rstream->file_type == UV_NAMED_PIPE + || rstream->file_type == UV_TTY); + rstream->stream = xmalloc(sizeof(uv_pipe_t)); + uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0); + uv_pipe_open((uv_pipe_t *)rstream->stream, file); + rstream->stream->data = rstream; + } + + rstream->fd = file; + rstream->free_handle = true; +} + +bool rstream_is_regular_file(RStream *rstream) +{ + return rstream->file_type == UV_FILE; +} + +void rstream_start(RStream *rstream) +{ + if (rstream->file_type == UV_FILE) { + uv_idle_start(rstream->fread_idle, fread_idle_cb); + } else { + rstream->reading = false; + uv_read_start(rstream->stream, alloc_cb, read_cb); + } +} + +void rstream_stop(RStream *rstream) +{ + if (rstream->file_type == UV_FILE) { + uv_idle_stop(rstream->fread_idle); + } else { + uv_read_stop(rstream->stream); + } +} + +uint32_t rstream_read(RStream *rstream, char *buf, uint32_t count) +{ + uint32_t read_count = rstream->wpos - rstream->rpos; + + if (count < read_count) { + read_count = count; + } + + if (read_count > 0) { + memcpy(buf, rstream->buffer + rstream->rpos, read_count); + rstream->rpos += read_count; + } + + if (rstream->wpos == rstream->buffer_size) { + // `wpos` is at the end of the buffer, so free some space by moving unread + // data... + memmove( + rstream->buffer, // ...To the beginning of the buffer(rpos 0) + rstream->buffer + rstream->rpos, // ...From the first unread position + rstream->wpos - rstream->rpos); // ...By the number of unread bytes + rstream->wpos -= rstream->rpos; + rstream->rpos = 0; + } + + return read_count; +} + +uint32_t rstream_available(RStream *rstream) +{ + return rstream->wpos - rstream->rpos; +} + +// Called by libuv to allocate memory for reading. +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ + RStream *rstream = handle->data; + + if (rstream->reading) { + buf->len = 0; + return; + } + + buf->base = rstream->buffer + rstream->wpos; + buf->len = rstream->buffer_size - rstream->wpos; + // Avoid `alloc_cb`, `alloc_cb` sequences on windows + rstream->reading = true; +} + +// Callback invoked by libuv after it copies the data into the buffer provided +// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a +// 0-length buffer. +static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) +{ + RStream *rstream = stream->data; + + if (cnt <= 0) { + if (cnt != UV_ENOBUFS) { + // Read error or EOF, either way stop the stream and invoke the callback + // with eof == true + uv_read_stop(stream); + rstream->cb(rstream, rstream->data, true); + } + return; + } + + // Data was already written, so all we need is to update 'wpos' to reflect + // the space actually used in the buffer. + rstream->wpos += cnt; + // Invoke the callback passing in the number of bytes available and data + // associated with the stream + rstream->cb(rstream, rstream->data, false); + rstream->reading = false; +} + +// Called by the by the 'idle' handle to emulate a reading event +static void fread_idle_cb(uv_idle_t *handle) +{ + uv_fs_t req; + RStream *rstream = handle->data; + + rstream->uvbuf.base = rstream->buffer + rstream->wpos; + rstream->uvbuf.len = rstream->buffer_size - rstream->wpos; + + // Synchronous read + uv_fs_read( + uv_default_loop(), + &req, + rstream->fd, + &rstream->uvbuf, + 1, + rstream->fpos, + NULL); + + uv_fs_req_cleanup(&req); + + if (req.result <= 0) { + uv_idle_stop(rstream->fread_idle); + rstream->cb(rstream, rstream->data, true); + return; + } + + rstream->wpos += req.result; + rstream->fpos += req.result; + rstream->cb(rstream, rstream->data, false); +} diff --git a/src/os/rstream.h b/src/os/rstream.h new file mode 100644 index 0000000000..1b3b679f9f --- /dev/null +++ b/src/os/rstream.h @@ -0,0 +1,75 @@ +#ifndef NEOVIM_OS_RSTREAM_H +#define NEOVIM_OS_RSTREAM_H + +#include <stdbool.h> +#include <stdint.h> +#include <uv.h> + +#include "os/rstream_defs.h" + +/// Creates a new RStream instance. A RStream encapsulates all the boilerplate +/// necessary for reading from a libuv stream. +/// +/// @param cb A function that will be called whenever some data is available +/// for reading with `rstream_read` +/// @param buffer_size Size in bytes of the internal buffer. +/// @param data Some state to associate with the `RStream` instance +/// @return The newly-allocated `RStream` instance +RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data); + +/// Frees all memory allocated for a RStream instance +/// +/// @param rstream The `RStream` instance +void rstream_free(RStream *rstream); + +/// Sets the underlying `uv_stream_t` instance +/// +/// @param rstream The `RStream` instance +/// @param stream The new `uv_stream_t` instance +void rstream_set_stream(RStream *rstream, uv_stream_t *stream); + +/// Sets the underlying `uv_file_t` instance +/// +/// @param rstream The `RStream` instance +/// @param stream The new `uv_stream_t` instance +void rstream_set_stream(RStream *rstream, uv_stream_t *stream); + +/// Sets the underlying file descriptor that will be read from. Only pipes +/// and regular files are supported for now. +/// +/// @param rstream The `RStream` instance +/// @param file The file descriptor +void rstream_set_file(RStream *rstream, uv_file file); + +/// Tests if the stream is backed by a regular file +/// +/// @param rstream The `RStream` instance +/// @return True if the underlying file descriptor represents a regular file +bool rstream_is_regular_file(RStream *rstream); + +/// Starts watching for events from a `RStream` instance. +/// +/// @param rstream The `RStream` instance +void rstream_start(RStream *rstream); + +/// Stops watching for events from a `RStream` instance. +/// +/// @param rstream The `RStream` instance +void rstream_stop(RStream *rstream); + +/// Reads data from a `RStream` instance into a buffer. +/// +/// @param rstream The `RStream` instance +/// @param buffer The buffer which will receive the data +/// @param count Number of bytes that `buffer` can accept +/// @return The number of bytes copied into `buffer` +uint32_t rstream_read(RStream *rstream, char *buffer, uint32_t count); + +/// Returns the number of bytes available for reading from `rstream` +/// +/// @param rstream The `RStream` instance +/// @return The number of bytes available +uint32_t rstream_available(RStream *rstream); + +#endif // NEOVIM_OS_RSTREAM_H + diff --git a/src/os/rstream_defs.h b/src/os/rstream_defs.h new file mode 100644 index 0000000000..62c910d041 --- /dev/null +++ b/src/os/rstream_defs.h @@ -0,0 +1,14 @@ +#ifndef NEOVIM_OS_RSTREAM_DEFS_H +#define NEOVIM_OS_RSTREAM_DEFS_H + +typedef struct rstream RStream; + +/// Function called when the RStream receives data +/// +/// @param rstream The RStream instance +/// @param data State associated with the RStream instance +/// @param eof If the stream reached EOF. +typedef void (*rstream_cb)(RStream *rstream, void *data, bool eof); + +#endif // NEOVIM_OS_RSTREAM_DEFS_H + |