diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-16 23:10:15 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:19:55 -0300 |
commit | ac2bd0256183fe4255e5fcccf37f860f037d43a6 (patch) | |
tree | 09bcdf6eefd9b18a58159f06a20b97c938d4f367 /src/nvim/event | |
parent | 991d3ec1e679bb6407f2a5820910d2968424183c (diff) | |
download | rneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.tar.gz rneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.tar.bz2 rneovim-ac2bd0256183fe4255e5fcccf37f860f037d43a6.zip |
rstream/wstream: Unify structures and simplify API
- Simplify RStream/WStream API and make it more consistent with libuv.
- Move into the event loop layer(event subdirectory)
- Remove uv_helpers module.
- Simplify job/process internal modules/API.
- Unify RStream and WStream into a single structure. This is necessary because
libuv streams can be readable and writable at the same time(and because the
uv_helpers.c hack to associate multiple streams with libuv handle was removed)
- Make struct definition public, allowing more flexible/simple memory
management by users of the module.
- Adapt channel/job modules to cope with the changes.
Diffstat (limited to 'src/nvim/event')
-rw-r--r-- | src/nvim/event/rstream.c | 167 | ||||
-rw-r--r-- | src/nvim/event/rstream.h | 16 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 108 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 55 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 162 | ||||
-rw-r--r-- | src/nvim/event/wstream.h | 24 |
6 files changed, 532 insertions, 0 deletions
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c new file mode 100644 index 0000000000..78c044347f --- /dev/null +++ b/src/nvim/event/rstream.c @@ -0,0 +1,167 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/rstream.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/event/loop.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.c.generated.h" +#endif + +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, + void *data) + FUNC_ATTR_NONNULL_ARG(1) + FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(loop, stream, fd, NULL, data); + rstream_init(stream, bufsize); +} + +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, + void *data) + FUNC_ATTR_NONNULL_ARG(1) + FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(NULL, stream, -1, uvstream, data); + rstream_init(stream, bufsize); +} + +void rstream_init(Stream *stream, size_t bufsize) + FUNC_ATTR_NONNULL_ARG(1) +{ + stream->buffer = rbuffer_new(bufsize); + stream->buffer->data = stream; + stream->buffer->full_cb = on_rbuffer_full; + stream->buffer->nonfull_cb = on_rbuffer_nonfull; +} + + +/// Starts watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_start(Stream *stream, stream_read_cb cb) +{ + stream->read_cb = cb; + if (stream->uvstream) { + uv_read_start(stream->uvstream, alloc_cb, read_cb); + } else { + uv_idle_start(&stream->uv.idle, fread_idle_cb); + } +} + +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(Stream *stream) +{ + if (stream->uvstream) { + uv_read_stop(stream->uvstream); + } else { + uv_idle_stop(&stream->uv.idle); + } +} + +static void on_rbuffer_full(RBuffer *buf, void *data) +{ + rstream_stop(data); +} + +static void on_rbuffer_nonfull(RBuffer *buf, void *data) +{ + Stream *stream = data; + assert(stream->read_cb); + rstream_start(stream, stream->read_cb); +} + +// Callbacks used by libuv + +// Called by libuv to allocate memory for reading. +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ + Stream *stream = handle->data; + buf->base = rbuffer_write_ptr(stream->buffer, &buf->len); +} + +// Callback invoked by libuv after it copies the data into the buffer provided +// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a +// 0-length buffer. +static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) +{ + Stream *stream = uvstream->data; + + if (cnt <= 0) { + if (cnt != UV_ENOBUFS + // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: + // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. + // + // We don't need to do anything with the RBuffer because the next call + // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` + // won't be called) + && cnt != 0) { + DLOG("Closing Stream(%p) because of %s(%zd)", stream, + uv_strerror((int)cnt), cnt); + // Read error or EOF, either way stop the stream and invoke the callback + // with eof == true + uv_read_stop(uvstream); + stream->read_cb(stream, stream->buffer, stream->data, true); + } + return; + } + + // at this point we're sure that cnt is positive, no error occurred + size_t nread = (size_t)cnt; + // Data was already written, so all we need is to update 'wpos' to reflect + // the space actually used in the buffer. + rbuffer_produced(stream->buffer, nread); + stream->read_cb(stream, stream->buffer, stream->data, false); +} + +// Called by the by the 'idle' handle to emulate a reading event +static void fread_idle_cb(uv_idle_t *handle) +{ + uv_fs_t req; + Stream *stream = handle->data; + + stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len); + + // the offset argument to uv_fs_read is int64_t, could someone really try + // to read more than 9 quintillion (9e18) bytes? + // upcast is meant to avoid tautological condition warning on 32 bits + uintmax_t fpos_intmax = stream->fpos; + if (fpos_intmax > INT64_MAX) { + ELOG("stream offset overflow"); + preserve_exit(); + } + + // Synchronous read + uv_fs_read( + handle->loop, + &req, + stream->fd, + &stream->uvbuf, + 1, + (int64_t) stream->fpos, + NULL); + + uv_fs_req_cleanup(&req); + + if (req.result <= 0) { + uv_idle_stop(&stream->uv.idle); + stream->read_cb(stream, stream->buffer, stream->data, true); + return; + } + + // no errors (req.result (ssize_t) is positive), it's safe to cast. + size_t nread = (size_t) req.result; + rbuffer_produced(stream->buffer, nread); + stream->fpos += nread; +} diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h new file mode 100644 index 0000000000..f30ad79ee5 --- /dev/null +++ b/src/nvim/event/rstream.h @@ -0,0 +1,16 @@ +#ifndef NVIM_EVENT_RSTREAM_H +#define NVIM_EVENT_RSTREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.h.generated.h" +#endif +#endif // NVIM_EVENT_RSTREAM_H diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c new file mode 100644 index 0000000000..959b532146 --- /dev/null +++ b/src/nvim/event/stream.c @@ -0,0 +1,108 @@ +#include <assert.h> +#include <stdio.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/rbuffer.h" +#include "nvim/event/stream.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.c.generated.h" +#endif + +/// Sets the stream associated with `fd` to "blocking" mode. +/// +/// @return `0` on success, or `-errno` on failure. +int stream_set_blocking(int fd, bool blocking) +{ + // Private loop to avoid conflict with existing watcher(s): + // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. + uv_loop_t loop; + uv_pipe_t stream; + uv_loop_init(&loop); + uv_pipe_init(&loop, &stream, 0); + uv_pipe_open(&stream, fd); + int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); + uv_close((uv_handle_t *)&stream, NULL); + uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt. + uv_loop_close(&loop); + return retval; +} + +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, + void *data) +{ + stream->uvstream = uvstream; + + if (fd >= 0) { + uv_handle_type type = uv_guess_handle(fd); + stream->fd = fd; + + if (type == UV_FILE) { + // Non-blocking file reads are simulated with an idle handle that reads in + // chunks of the ring buffer size, giving time for other events to be + // processed between reads. + uv_idle_init(&loop->uv, &stream->uv.idle); + stream->uv.idle.data = stream; + } else { + assert(type == UV_NAMED_PIPE || type == UV_TTY); + uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); + uv_pipe_open(&stream->uv.pipe, fd); + stream->uvstream = (uv_stream_t *)&stream->uv.pipe; + } + } + + if (stream->uvstream) { + stream->uvstream->data = stream; + } + + stream->data = data; + stream->internal_data = NULL; + stream->fpos = 0; + stream->curmem = 0; + stream->maxmem = 0; + stream->pending_reqs = 0; + stream->read_cb = NULL; + stream->write_cb = NULL; + stream->close_cb = NULL; + stream->internal_close_cb = NULL; + stream->closed = false; + stream->buffer = NULL; +} + +void stream_close(Stream *stream, stream_close_cb on_stream_close) +{ + assert(!stream->closed); + + if (stream->buffer) { + rbuffer_free(stream->buffer); + } + + stream->closed = true; + stream->close_cb = on_stream_close; + + if (!stream->pending_reqs) { + stream_close_handle(stream); + } +} + +void stream_close_handle(Stream *stream) +{ + if (stream->uvstream) { + uv_close((uv_handle_t *)stream->uvstream, close_cb); + } else { + uv_close((uv_handle_t *)&stream->uv.idle, close_cb); + } +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; + if (stream->close_cb) { + stream->close_cb(stream, stream->data); + } + if (stream->internal_close_cb) { + stream->internal_close_cb(stream, stream->internal_data); + } +} diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h new file mode 100644 index 0000000000..37410b2036 --- /dev/null +++ b/src/nvim/event/stream.h @@ -0,0 +1,55 @@ +#ifndef NVIM_EVENT_STREAM_H +#define NVIM_EVENT_STREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/rbuffer.h" + +typedef struct stream Stream; +/// Type of function called when the Stream buffer is filled with data +/// +/// @param stream The Stream instance +/// @param rbuffer The associated RBuffer instance +/// @param data User-defined data +/// @param eof If the stream reached EOF. +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, + bool eof); + +/// Type of function called when the Stream has information about a write +/// request. +/// +/// @param wstream The Stream instance +/// @param data User-defined data +/// @param status 0 on success, anything else indicates failure +typedef void (*stream_write_cb)(Stream *stream, void *data, int status); +typedef void (*stream_close_cb)(Stream *stream, void *data); + +struct stream { + union { + uv_pipe_t pipe; + uv_tcp_t tcp; + uv_idle_t idle; + } uv; + uv_stream_t *uvstream; + uv_buf_t uvbuf; + RBuffer *buffer; + uv_file fd; + stream_read_cb read_cb; + stream_write_cb write_cb; + stream_close_cb close_cb, internal_close_cb; + size_t fpos; + size_t curmem; + size_t maxmem; + size_t pending_reqs; + void *data, *internal_data; + bool closed; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.h.generated.h" +#endif +#endif // NVIM_EVENT_STREAM_H diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c new file mode 100644 index 0000000000..5fcb724fe3 --- /dev/null +++ b/src/nvim/event/wstream.c @@ -0,0 +1,162 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/wstream.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + +typedef struct { + Stream *stream; + WBuffer *buffer; + uv_write_t uv_req; +} WRequest; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.c.generated.h" +#endif + +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, + void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(loop, stream, fd, NULL, data); + wstream_init(stream, maxmem); +} + +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, + void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ + stream_init(NULL, stream, -1, uvstream, data); + wstream_init(stream, maxmem); +} + +void wstream_init(Stream *stream, size_t maxmem) +{ + stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM; +} + +/// Sets a callback that will be called on completion of a write request, +/// indicating failure/success. +/// +/// This affects all requests currently in-flight as well. Overwrites any +/// possible earlier callback. +/// +/// @note This callback will not fire if the write request couldn't even be +/// queued properly (i.e.: when `wstream_write() returns an error`). +/// +/// @param stream The `Stream` instance +/// @param cb The callback +void wstream_set_write_cb(Stream *stream, stream_write_cb cb) + FUNC_ATTR_NONNULL_ALL +{ + stream->write_cb = cb; +} + +/// Queues data for writing to the backing file descriptor of a `Stream` +/// instance. This will fail if the write would cause the Stream use more +/// memory than specified by `maxmem`. +/// +/// @param stream The `Stream` instance +/// @param buffer The buffer which contains data to be written +/// @return false if the write failed +bool wstream_write(Stream *stream, WBuffer *buffer) + FUNC_ATTR_NONNULL_ALL +{ + assert(stream->maxmem); + // This should not be called after a stream was freed + assert(!stream->closed); + + if (stream->curmem > stream->maxmem) { + goto err; + } + + stream->curmem += buffer->size; + + WRequest *data = xmalloc(sizeof(WRequest)); + data->stream = stream; + data->buffer = buffer; + data->uv_req.data = data; + + uv_buf_t uvbuf; + uvbuf.base = buffer->data; + uvbuf.len = buffer->size; + + if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { + xfree(data); + goto err; + } + + stream->pending_reqs++; + return true; + +err: + wstream_release_wbuffer(buffer); + return false; +} + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across Stream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param refcount The number of references for the WBuffer. This will be used +/// by Stream instances to decide when a WBuffer should be freed. +/// @param cb Pointer to function that will be responsible for freeing +/// the buffer data(passing 'free' will work as expected). +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, + size_t size, + size_t refcount, + wbuffer_data_finalizer cb) +{ + WBuffer *rv = xmalloc(sizeof(WBuffer)); + rv->size = size; + rv->refcount = refcount; + rv->cb = cb; + rv->data = data; + + return rv; +} + +static void write_cb(uv_write_t *req, int status) +{ + WRequest *data = req->data; + + data->stream->curmem -= data->buffer->size; + + wstream_release_wbuffer(data->buffer); + + if (data->stream->write_cb) { + data->stream->write_cb(data->stream, data->stream->data, status); + } + + data->stream->pending_reqs--; + + if (data->stream->closed && data->stream->pending_reqs == 0) { + // Last pending write, free the stream; + stream_close_handle(data->stream); + } + + xfree(data); +} + +void wstream_release_wbuffer(WBuffer *buffer) +{ + if (!--buffer->refcount) { + if (buffer->cb) { + buffer->cb(buffer->data); + } + + xfree(buffer); + } +} diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h new file mode 100644 index 0000000000..9008de0d97 --- /dev/null +++ b/src/nvim/event/wstream.h @@ -0,0 +1,24 @@ +#ifndef NVIM_EVENT_WSTREAM_H +#define NVIM_EVENT_WSTREAM_H + +#include <stdint.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + +typedef struct wbuffer WBuffer; +typedef void (*wbuffer_data_finalizer)(void *data); + +struct wbuffer { + size_t size, refcount; + char *data; + wbuffer_data_finalizer cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.h.generated.h" +#endif +#endif // NVIM_EVENT_WSTREAM_H |