aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event/stream.c')
-rw-r--r--src/nvim/event/stream.c108
1 files changed, 108 insertions, 0 deletions
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
new file mode 100644
index 0000000000..959b532146
--- /dev/null
+++ b/src/nvim/event/stream.c
@@ -0,0 +1,108 @@
+#include <assert.h>
+#include <stdio.h>
+#include <stdbool.h>
+
+#include <uv.h>
+
+#include "nvim/rbuffer.h"
+#include "nvim/event/stream.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/stream.c.generated.h"
+#endif
+
+/// Sets the stream associated with `fd` to "blocking" mode.
+///
+/// @return `0` on success, or `-errno` on failure.
+int stream_set_blocking(int fd, bool blocking)
+{
+ // Private loop to avoid conflict with existing watcher(s):
+ // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
+ uv_loop_t loop;
+ uv_pipe_t stream;
+ uv_loop_init(&loop);
+ uv_pipe_init(&loop, &stream, 0);
+ uv_pipe_open(&stream, fd);
+ int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
+ uv_close((uv_handle_t *)&stream, NULL);
+ uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt.
+ uv_loop_close(&loop);
+ return retval;
+}
+
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
+ void *data)
+{
+ stream->uvstream = uvstream;
+
+ if (fd >= 0) {
+ uv_handle_type type = uv_guess_handle(fd);
+ stream->fd = fd;
+
+ if (type == UV_FILE) {
+ // Non-blocking file reads are simulated with an idle handle that reads in
+ // chunks of the ring buffer size, giving time for other events to be
+ // processed between reads.
+ uv_idle_init(&loop->uv, &stream->uv.idle);
+ stream->uv.idle.data = stream;
+ } else {
+ assert(type == UV_NAMED_PIPE || type == UV_TTY);
+ uv_pipe_init(&loop->uv, &stream->uv.pipe, 0);
+ uv_pipe_open(&stream->uv.pipe, fd);
+ stream->uvstream = (uv_stream_t *)&stream->uv.pipe;
+ }
+ }
+
+ if (stream->uvstream) {
+ stream->uvstream->data = stream;
+ }
+
+ stream->data = data;
+ stream->internal_data = NULL;
+ stream->fpos = 0;
+ stream->curmem = 0;
+ stream->maxmem = 0;
+ stream->pending_reqs = 0;
+ stream->read_cb = NULL;
+ stream->write_cb = NULL;
+ stream->close_cb = NULL;
+ stream->internal_close_cb = NULL;
+ stream->closed = false;
+ stream->buffer = NULL;
+}
+
+void stream_close(Stream *stream, stream_close_cb on_stream_close)
+{
+ assert(!stream->closed);
+
+ if (stream->buffer) {
+ rbuffer_free(stream->buffer);
+ }
+
+ stream->closed = true;
+ stream->close_cb = on_stream_close;
+
+ if (!stream->pending_reqs) {
+ stream_close_handle(stream);
+ }
+}
+
+void stream_close_handle(Stream *stream)
+{
+ if (stream->uvstream) {
+ uv_close((uv_handle_t *)stream->uvstream, close_cb);
+ } else {
+ uv_close((uv_handle_t *)&stream->uv.idle, close_cb);
+ }
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Stream *stream = handle->data;
+ if (stream->close_cb) {
+ stream->close_cb(stream, stream->data);
+ }
+ if (stream->internal_close_cb) {
+ stream->internal_close_cb(stream, stream->internal_data);
+ }
+}