diff options
Diffstat (limited to 'src/nvim/event/stream.c')
-rw-r--r-- | src/nvim/event/stream.c | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c new file mode 100644 index 0000000000..959b532146 --- /dev/null +++ b/src/nvim/event/stream.c @@ -0,0 +1,108 @@ +#include <assert.h> +#include <stdio.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/rbuffer.h" +#include "nvim/event/stream.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.c.generated.h" +#endif + +/// Sets the stream associated with `fd` to "blocking" mode. +/// +/// @return `0` on success, or `-errno` on failure. +int stream_set_blocking(int fd, bool blocking) +{ + // Private loop to avoid conflict with existing watcher(s): + // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. + uv_loop_t loop; + uv_pipe_t stream; + uv_loop_init(&loop); + uv_pipe_init(&loop, &stream, 0); + uv_pipe_open(&stream, fd); + int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); + uv_close((uv_handle_t *)&stream, NULL); + uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt. + uv_loop_close(&loop); + return retval; +} + +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, + void *data) +{ + stream->uvstream = uvstream; + + if (fd >= 0) { + uv_handle_type type = uv_guess_handle(fd); + stream->fd = fd; + + if (type == UV_FILE) { + // Non-blocking file reads are simulated with an idle handle that reads in + // chunks of the ring buffer size, giving time for other events to be + // processed between reads. + uv_idle_init(&loop->uv, &stream->uv.idle); + stream->uv.idle.data = stream; + } else { + assert(type == UV_NAMED_PIPE || type == UV_TTY); + uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); + uv_pipe_open(&stream->uv.pipe, fd); + stream->uvstream = (uv_stream_t *)&stream->uv.pipe; + } + } + + if (stream->uvstream) { + stream->uvstream->data = stream; + } + + stream->data = data; + stream->internal_data = NULL; + stream->fpos = 0; + stream->curmem = 0; + stream->maxmem = 0; + stream->pending_reqs = 0; + stream->read_cb = NULL; + stream->write_cb = NULL; + stream->close_cb = NULL; + stream->internal_close_cb = NULL; + stream->closed = false; + stream->buffer = NULL; +} + +void stream_close(Stream *stream, stream_close_cb on_stream_close) +{ + assert(!stream->closed); + + if (stream->buffer) { + rbuffer_free(stream->buffer); + } + + stream->closed = true; + stream->close_cb = on_stream_close; + + if (!stream->pending_reqs) { + stream_close_handle(stream); + } +} + +void stream_close_handle(Stream *stream) +{ + if (stream->uvstream) { + uv_close((uv_handle_t *)stream->uvstream, close_cb); + } else { + uv_close((uv_handle_t *)&stream->uv.idle, close_cb); + } +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; + if (stream->close_cb) { + stream->close_cb(stream, stream->data); + } + if (stream->internal_close_cb) { + stream->internal_close_cb(stream, stream->internal_data); + } +} |