From 1ebc96fe10fbdbec22caa26d5d52a9f095da9687 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Mon, 5 Jun 2017 08:29:10 +0200 Subject: channels: allow bytes sockets and stdio, and buffered bytes output --- src/nvim/event/process.c | 11 ++++------- src/nvim/event/rstream.c | 28 +++++++++++++++------------- src/nvim/event/stream.h | 6 ++---- 3 files changed, 21 insertions(+), 24 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 1bbe4d9cad..8946f049e2 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -92,7 +92,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (in) { stream_init(NULL, &proc->in, -1, STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe)); - proc->in.events = proc->events; proc->in.internal_data = proc; proc->in.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -101,7 +100,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (out) { stream_init(NULL, &proc->out, -1, STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe)); - proc->out.events = proc->events; proc->out.internal_data = proc; proc->out.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -110,7 +108,6 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (err) { stream_init(NULL, &proc->err, -1, STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe)); - proc->err.events = proc->events; proc->err.internal_data = proc; proc->err.internal_close_cb = on_process_stream_close; proc->refcount++; @@ -382,15 +379,15 @@ static void flush_stream(Process *proc, Stream *stream) // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (proc->events) { - multiqueue_process_events(proc->events); + if (stream->events) { + multiqueue_process_events(stream->events); } // Stream can be closed if it is empty. if (num_bytes == stream->num_bytes) { - if (stream->read_cb) { + if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream - // open. + // open. But only send EOF if we haven't already. stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); } break; diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 2c4db08b30..e0500ba828 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -105,20 +105,20 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { Stream *stream = uvstream->data; - if (cnt > 0) { - stream->num_bytes += (size_t)cnt; - } - if (cnt <= 0) { - if (cnt != UV_ENOBUFS - // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: - // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. - // - // We don't need to do anything with the RBuffer because the next call - // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` - // won't be called) - && cnt != 0) { - DLOG("closing Stream: %p: %s (%s)", stream, + // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: + // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. + // + // We don't need to do anything with the RBuffer because the next call + // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` + // won't be called) + if (cnt == UV_ENOBUFS || cnt == 0) { + return; + } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { + // The TTY driver might signal TTY without closing the stream + invoke_read_cb(stream, 0, true); + } else { + DLOG("Closing Stream (%p): %s (%s)", stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true @@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // at this point we're sure that cnt is positive, no error occurred size_t nread = (size_t)cnt; + stream->num_bytes += nread; // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(stream->buffer, nread); @@ -187,6 +188,7 @@ static void read_event(void **argv) if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; + stream->did_eof = eof; stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); } stream->pending_reqs--; diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 70a708ff4d..e713323f5c 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -14,10 +14,7 @@ typedef struct stream Stream; /// /// @param stream The Stream instance /// @param rbuffer The associated RBuffer instance -/// @param count Number of bytes to read. This must be respected if keeping -/// the order of events is a requirement. This is because events -/// may be queued and only processed later when more data is copied -/// into to the buffer, so one read may starve another. +/// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, @@ -34,6 +31,7 @@ typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { bool closed; + bool did_eof; union { uv_pipe_t pipe; uv_tcp_t tcp; -- cgit