diff options
author | bfredl <bjorn.linse@gmail.com> | 2024-05-30 12:59:02 +0200 |
---|---|---|
committer | bfredl <bjorn.linse@gmail.com> | 2024-05-31 15:01:13 +0200 |
commit | c13c50b752dca322a5ec77dea6188c9e3694549b (patch) | |
tree | 048342b08d13f6df0b9c29b9b0b5d470322e6a0c /src | |
parent | 6566a59b3a6c8dabfa40f8debd0de96d875825e9 (diff) | |
download | rneovim-c13c50b752dca322a5ec77dea6188c9e3694549b.tar.gz rneovim-c13c50b752dca322a5ec77dea6188c9e3694549b.tar.bz2 rneovim-c13c50b752dca322a5ec77dea6188c9e3694549b.zip |
refactor(io): separate types for read and write streams
This is a structural refactor with no logical changes, yet. Done in
preparation for simplifying rstream/rbuffer which will require more
state inline in RStream.
The initial idea was to have RStream and WStream as sub-types
symetrically but that doesn't work, as sockets are both reading and
writing. Also there is very little write-specific state to start with,
so the benefit of a separate WStream struct is a lot smaller. Just
document what fields in `Stream` are write specific.
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/channel.c | 32 | ||||
-rw-r--r-- | src/nvim/channel.h | 8 | ||||
-rw-r--r-- | src/nvim/channel_defs.h | 2 | ||||
-rw-r--r-- | src/nvim/event/defs.h | 31 | ||||
-rw-r--r-- | src/nvim/event/libuv_process.c | 8 | ||||
-rw-r--r-- | src/nvim/event/process.c | 46 | ||||
-rw-r--r-- | src/nvim/event/process.h | 4 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 66 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 16 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 26 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 7 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 4 | ||||
-rw-r--r-- | src/nvim/os/input.c | 12 | ||||
-rw-r--r-- | src/nvim/os/pty_process_unix.c | 6 | ||||
-rw-r--r-- | src/nvim/os/pty_process_win.c | 8 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 6 | ||||
-rw-r--r-- | src/nvim/tui/input.c | 4 | ||||
-rw-r--r-- | src/nvim/tui/input.h | 2 |
18 files changed, 157 insertions, 131 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 41635747f8..e5492caf45 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -126,19 +126,19 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error) *error = e_invstream; return false; } - stream_may_close(&chan->stream.socket); + rstream_may_close(&chan->stream.socket); break; case kChannelStreamProc: proc = &chan->stream.proc; if (part == kChannelPartStdin || close_main) { - stream_may_close(&proc->in); + wstream_may_close(&proc->in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&proc->out); + rstream_may_close(&proc->out); } if (part == kChannelPartStderr || part == kChannelPartAll) { - stream_may_close(&proc->err); + rstream_may_close(&proc->err); } if (proc->type == kProcessTypePty && part == kChannelPartAll) { pty_process_close_master(&chan->stream.pty); @@ -148,10 +148,10 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error) case kChannelStreamStdio: if (part == kChannelPartStdin || close_main) { - stream_may_close(&chan->stream.stdio.in); + rstream_may_close(&chan->stream.stdio.in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&chan->stream.stdio.out); + wstream_may_close(&chan->stream.stdio.out); } if (part == kChannelPartStderr) { *error = e_invstream; @@ -480,9 +480,9 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader return 0; } - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); rstream_init(&channel->stream.socket, 0); if (rpc) { @@ -505,9 +505,9 @@ void channel_from_connection(SocketWatcher *watcher) { Channel *channel = channel_alloc(kChannelStreamSocket); socket_watcher_accept(watcher, &channel->stream.socket); - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); rstream_init(&channel->stream.socket, 0); rpc_start(channel); channel_create_event(channel, watcher->addr); @@ -647,19 +647,19 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) return l; } -void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +void on_channel_data(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, eof, &chan->on_data); } -void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +void on_job_stderr(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, eof, &chan->on_stderr); } -static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof, +static void on_channel_output(RStream *stream, Channel *chan, RBuffer *buf, bool eof, CallbackReader *reader) { size_t count; @@ -864,7 +864,7 @@ static void term_resize(uint16_t width, uint16_t height, void *data) static inline void term_delayed_free(void **argv) { Channel *chan = argv[0]; - if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { + if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) { multiqueue_put(chan->events, term_delayed_free, chan); return; } diff --git a/src/nvim/channel.h b/src/nvim/channel.h index 35d369e513..72480db0d5 100644 --- a/src/nvim/channel.h +++ b/src/nvim/channel.h @@ -30,7 +30,7 @@ struct Channel { Process proc; LibuvProcess uv; PtyProcess pty; - Stream socket; + RStream socket; StdioPair stdio; StderrState err; InternalState internal; @@ -73,7 +73,7 @@ static inline Stream *channel_instream(Channel *chan) return &chan->stream.proc.in; case kChannelStreamSocket: - return &chan->stream.socket; + return &chan->stream.socket.s; case kChannelStreamStdio: return &chan->stream.stdio.out; @@ -85,10 +85,10 @@ static inline Stream *channel_instream(Channel *chan) abort(); } -static inline Stream *channel_outstream(Channel *chan) +static inline RStream *channel_outstream(Channel *chan) REAL_FATTR_NONNULL_ALL; -static inline Stream *channel_outstream(Channel *chan) +static inline RStream *channel_outstream(Channel *chan) { switch (chan->streamtype) { case kChannelStreamProc: diff --git a/src/nvim/channel_defs.h b/src/nvim/channel_defs.h index d4f1895420..2df6edea7a 100644 --- a/src/nvim/channel_defs.h +++ b/src/nvim/channel_defs.h @@ -30,7 +30,7 @@ typedef enum { } ChannelStdinMode; typedef struct { - Stream in; + RStream in; Stream out; } StdioPair; diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 9b7d8708be..8563006159 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -55,14 +55,15 @@ struct wbuffer { }; typedef struct stream Stream; -/// Type of function called when the Stream buffer is filled with data +typedef struct rstream RStream; +/// Type of function called when the RStream buffer is filled with data /// /// @param stream The Stream instance /// @param buf The associated RBuffer instance /// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof); +typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -71,11 +72,11 @@ typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void /// @param data User-defined data /// @param status 0 on success, anything else indicates failure typedef void (*stream_write_cb)(Stream *stream, void *data, int status); + typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { bool closed; - bool did_eof; union { uv_pipe_t pipe; uv_tcp_t tcp; @@ -85,20 +86,27 @@ struct stream { #endif } uv; uv_stream_t *uvstream; - uv_buf_t uvbuf; - RBuffer *buffer; uv_file fd; - stream_read_cb read_cb; - stream_write_cb write_cb; void *cb_data; stream_close_cb close_cb, internal_close_cb; void *close_cb_data, *internal_data; - size_t fpos; + size_t pending_reqs; + MultiQueue *events; + + // only used for writing: + stream_write_cb write_cb; size_t curmem; size_t maxmem; - size_t pending_reqs; +}; + +struct rstream { + Stream s; + bool did_eof; + RBuffer *buffer; + uv_buf_t uvbuf; + stream_read_cb read_cb; size_t num_bytes; - MultiQueue *events; + size_t fpos; }; #define ADDRESS_MAX_SIZE 256 @@ -147,7 +155,8 @@ struct process { char **argv; const char *exepath; dict_T *env; - Stream in, out, err; + Stream in; + RStream out, err; /// Exit handler. If set, user must call process_free(). process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c index f77d686c10..0dead1f9b4 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_process.c @@ -70,19 +70,19 @@ int libuv_process_spawn(LibuvProcess *uvproc) uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); } - if (!proc->out.closed) { + if (!proc->out.s.closed) { uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; #ifdef MSWIN // pipe must be readable for IOCP to work on Windows. uvproc->uvstdio[1].flags |= proc->overlapped ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; #endif - uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.uv.pipe); + uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe); } - if (!proc->err.closed) { + if (!proc->err.s.closed) { uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.uv.pipe); + uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe); } else if (proc->fwd_err) { uvproc->uvstdio[2].flags = UV_INHERIT_FD; uvproc->uvstdio[2].data.fd = STDERR_FILENO; diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 7460e92766..710376cd62 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -8,7 +8,9 @@ #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" #include "nvim/event/process.h" +#include "nvim/event/rstream.h" #include "nvim/event/stream.h" +#include "nvim/event/wstream.h" #include "nvim/globals.h" #include "nvim/log.h" #include "nvim/main.h" @@ -51,15 +53,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); } else { - proc->out.closed = true; + proc->out.s.closed = true; } if (err) { - uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); } else { - proc->err.closed = true; + proc->err.s.closed = true; } #ifdef USE_GCOV @@ -82,10 +84,10 @@ int process_spawn(Process *proc, bool in, bool out, bool err) uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); } if (out) { - uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); } if (err) { - uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); } if (proc->type == kProcessTypeUv) { @@ -106,16 +108,16 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe); - proc->out.internal_data = proc; - proc->out.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); + proc->out.s.internal_data = proc; + proc->out.s.internal_close_cb = on_process_stream_close; proc->refcount++; } if (err) { - stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe); - proc->err.internal_data = proc; - proc->err.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); + proc->err.s.internal_data = proc; + proc->err.s.internal_close_cb = on_process_stream_close; proc->refcount++; } @@ -148,9 +150,9 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL { - stream_may_close(&proc->in); - stream_may_close(&proc->out); - stream_may_close(&proc->err); + wstream_may_close(&proc->in); + rstream_may_close(&proc->out); + rstream_may_close(&proc->err); } /// Synchronously wait for a process to finish @@ -337,10 +339,10 @@ static void process_close(Process *proc) /// /// @param proc Process, for which an output stream should be flushed. /// @param stream Stream to flush. -static void flush_stream(Process *proc, Stream *stream) +static void flush_stream(Process *proc, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - if (!stream || stream->closed) { + if (!stream || stream->s.closed) { return; } @@ -350,7 +352,7 @@ static void flush_stream(Process *proc, Stream *stream) // keeps sending data, we only accept as much data as the system buffer size. // Otherwise this would block cleanup/teardown. int system_buffer_size = 0; - int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, + int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { system_buffer_size = (int)rbuffer_capacity(stream->buffer); @@ -359,14 +361,14 @@ static void flush_stream(Process *proc, Stream *stream) size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; // Read remaining data. - while (!stream->closed && stream->num_bytes < max_bytes) { + while (!stream->s.closed && stream->num_bytes < max_bytes) { // Remember number of bytes before polling size_t num_bytes = stream->num_bytes; // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (stream->events) { - multiqueue_process_events(stream->events); + if (stream->s.events) { + multiqueue_process_events(stream->s.events); } // Stream can be closed if it is empty. @@ -374,7 +376,7 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream // open. But only send EOF if we haven't already. - stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); + stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); } break; } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 421a470244..74b52cbbb1 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -21,8 +21,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .argv = NULL, .exepath = NULL, .in = { .closed = false }, - .out = { .closed = false }, - .err = { .closed = false }, + .out = { .s.closed = false }, + .err = { .s.closed = false }, .cb = NULL, .closed = false, .internal_close_cb = NULL, diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 6b4ab472e4..6c7fa20bd8 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -19,23 +19,26 @@ # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) +void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(loop, stream, fd, NULL); + stream_init(loop, &stream->s, fd, NULL); rstream_init(stream, bufsize); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) +void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(NULL, stream, -1, uvstream); + stream_init(NULL, &stream->s, -1, uvstream); rstream_init(stream, bufsize); } -void rstream_init(Stream *stream, size_t bufsize) +void rstream_init(RStream *stream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) { + stream->fpos = 0; + stream->read_cb = NULL; + stream->num_bytes = 0; stream->buffer = rbuffer_new(bufsize); stream->buffer->data = stream; stream->buffer->full_cb = on_rbuffer_full; @@ -45,28 +48,28 @@ void rstream_init(Stream *stream, size_t bufsize) /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb, void *data) +void rstream_start(RStream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; - stream->cb_data = data; - if (stream->uvstream) { - uv_read_start(stream->uvstream, alloc_cb, read_cb); + stream->s.cb_data = data; + if (stream->s.uvstream) { + uv_read_start(stream->s.uvstream, alloc_cb, read_cb); } else { - uv_idle_start(&stream->uv.idle, fread_idle_cb); + uv_idle_start(&stream->s.uv.idle, fread_idle_cb); } } /// Stops watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_stop(Stream *stream) +void rstream_stop(RStream *stream) FUNC_ATTR_NONNULL_ALL { - if (stream->uvstream) { - uv_read_stop(stream->uvstream); + if (stream->s.uvstream) { + uv_read_stop(stream->s.uvstream); } else { - uv_idle_stop(&stream->uv.idle); + uv_idle_stop(&stream->s.uv.idle); } } @@ -77,9 +80,9 @@ static void on_rbuffer_full(RBuffer *buf, void *data) static void on_rbuffer_nonfull(RBuffer *buf, void *data) { - Stream *stream = data; + RStream *stream = data; assert(stream->read_cb); - rstream_start(stream, stream->read_cb, stream->cb_data); + rstream_start(stream, stream->read_cb, stream->s.cb_data); } // Callbacks used by libuv @@ -87,7 +90,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) /// Called by libuv to allocate memory for reading. static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { - Stream *stream = handle->data; + RStream *stream = handle->data; // `uv_buf_t.len` happens to have different size on Windows. size_t write_count; buf->base = rbuffer_write_ptr(stream->buffer, &write_count); @@ -99,7 +102,7 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) /// 0-length buffer. static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { - Stream *stream = uvstream->data; + RStream *stream = uvstream->data; if (cnt <= 0) { // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: @@ -141,7 +144,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) static void fread_idle_cb(uv_idle_t *handle) { uv_fs_t req; - Stream *stream = handle->data; + RStream *stream = handle->data; // `uv_buf_t.len` happens to have different size on Windows. size_t write_count; @@ -160,7 +163,7 @@ static void fread_idle_cb(uv_idle_t *handle) // Synchronous read uv_fs_read(handle->loop, &req, - stream->fd, + stream->s.fd, &stream->uvbuf, 1, (int64_t)stream->fpos, @@ -169,7 +172,7 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_req_cleanup(&req); if (req.result <= 0) { - uv_idle_stop(&stream->uv.idle); + uv_idle_stop(&stream->s.uv.idle); invoke_read_cb(stream, 0, true); return; } @@ -183,24 +186,29 @@ static void fread_idle_cb(uv_idle_t *handle) static void read_event(void **argv) { - Stream *stream = argv[0]; + RStream *stream = argv[0]; if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; stream->did_eof = eof; - stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); + stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof); } - stream->pending_reqs--; - if (stream->closed && !stream->pending_reqs) { - stream_close_handle(stream); + stream->s.pending_reqs--; + if (stream->s.closed && !stream->s.pending_reqs) { + stream_close_handle(&stream->s, true); } } -static void invoke_read_cb(Stream *stream, size_t count, bool eof) +static void invoke_read_cb(RStream *stream, size_t count, bool eof) { // Don't let the stream be closed before the event is processed. - stream->pending_reqs++; + stream->s.pending_reqs++; - CREATE_EVENT(stream->events, read_event, + CREATE_EVENT(stream->s.events, read_event, stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); } + +void rstream_may_close(RStream *stream) +{ + stream_may_close(&stream->s, true); +} diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 4e878a2ecf..017f159fa1 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -135,17 +135,17 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) +int socket_watcher_accept(SocketWatcher *watcher, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; if (watcher->stream->type == UV_TCP) { - client = (uv_stream_t *)(&stream->uv.tcp); + client = (uv_stream_t *)(&stream->s.uv.tcp); uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); uv_tcp_nodelay((uv_tcp_t *)client, true); } else { - client = (uv_stream_t *)&stream->uv.pipe; + client = (uv_stream_t *)&stream->s.uv.pipe; uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); } @@ -156,7 +156,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) return result; } - stream_init(NULL, stream, -1, client); + stream_init(NULL, &stream->s, -1, client); return 0; } @@ -197,7 +197,7 @@ static void connect_cb(uv_connect_t *req, int status) } } -bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address, int timeout, +bool socket_connect(Loop *loop, RStream *stream, bool is_tcp, const char *address, int timeout, const char **error) { bool success = false; @@ -206,7 +206,7 @@ bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address req.data = &status; uv_stream_t *uv_stream; - uv_tcp_t *tcp = &stream->uv.tcp; + uv_tcp_t *tcp = &stream->s.uv.tcp; uv_getaddrinfo_t addr_req; addr_req.addrinfo = NULL; const struct addrinfo *addrinfo = NULL; @@ -237,7 +237,7 @@ tcp_retry: uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb); uv_stream = (uv_stream_t *)tcp; } else { - uv_pipe_t *pipe = &stream->uv.pipe; + uv_pipe_t *pipe = &stream->s.uv.pipe; uv_pipe_init(&loop->uv, pipe, 0); uv_pipe_connect(&req, pipe, address, connect_cb); uv_stream = (uv_stream_t *)pipe; @@ -245,7 +245,7 @@ tcp_retry: status = 1; LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1); if (status == 0) { - stream_init(NULL, stream, -1, uv_stream); + stream_init(NULL, &stream->s, -1, uv_stream); success = true; } else if (is_tcp && addrinfo->ai_next) { addrinfo = addrinfo->ai_next; diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 0b9ed4f25b..3d26dd868f 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -85,21 +85,17 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) } stream->internal_data = NULL; - stream->fpos = 0; stream->curmem = 0; stream->maxmem = 0; stream->pending_reqs = 0; - stream->read_cb = NULL; stream->write_cb = NULL; stream->close_cb = NULL; stream->internal_close_cb = NULL; stream->closed = false; - stream->buffer = NULL; stream->events = NULL; - stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) +void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data, bool rstream) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); @@ -116,18 +112,18 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) #endif if (!stream->pending_reqs) { - stream_close_handle(stream); + stream_close_handle(stream, rstream); } } -void stream_may_close(Stream *stream) +void stream_may_close(Stream *stream, bool rstream) { if (!stream->closed) { - stream_close(stream, NULL, NULL); + stream_close(stream, NULL, NULL, rstream); } } -void stream_close_handle(Stream *stream) +void stream_close_handle(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ALL { uv_handle_t *handle = NULL; @@ -145,16 +141,22 @@ void stream_close_handle(Stream *stream) assert(handle != NULL); if (!uv_is_closing(handle)) { - uv_close(handle, close_cb); + uv_close(handle, rstream ? rstream_close_cb : close_cb); } } -static void close_cb(uv_handle_t *handle) +static void rstream_close_cb(uv_handle_t *handle) { - Stream *stream = handle->data; + RStream *stream = handle->data; if (stream->buffer) { rbuffer_free(stream->buffer); } + close_cb(handle); +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; if (stream->close_cb) { stream->close_cb(stream, stream->close_cb_data); } diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index c67a9b96ed..07aab87e4d 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -141,7 +141,7 @@ static void write_cb(uv_write_t *req, int status) if (data->stream->closed && data->stream->pending_reqs == 0) { // Last pending write, free the stream; - stream_close_handle(data->stream); + stream_close_handle(data->stream, false); } xfree(data); @@ -158,3 +158,8 @@ void wstream_release_wbuffer(WBuffer *buffer) xfree(buffer); } } + +void wstream_may_close(Stream *stream) +{ + stream_may_close(stream, false); +} diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5737a0440f..98d5d8c6cb 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -89,7 +89,7 @@ void rpc_start(Channel *channel) kv_init(rpc->call_stack); if (channel->streamtype != kChannelStreamInternal) { - Stream *out = channel_outstream(channel); + RStream *out = channel_outstream(channel); #ifdef NVIM_LOG_DEBUG Stream *in = channel_instream(channel); DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, @@ -202,7 +202,7 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem return frame.errored ? NIL : frame.result; } -static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) +static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { Channel *channel = data; channel_incref(channel); diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 60b5b48745..cfe8696cdd 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -41,7 +41,7 @@ typedef enum { kInputEof, } InbufPollResult; -static Stream read_stream = { .closed = true }; // Input before UI starts. +static RStream read_stream = { .s.closed = true }; // Input before UI starts. static RBuffer *input_buffer = NULL; static bool input_eof = false; static bool blocking = false; @@ -59,7 +59,7 @@ void input_init(void) void input_start(void) { - if (!read_stream.closed) { + if (!read_stream.s.closed) { return; } @@ -70,12 +70,12 @@ void input_start(void) void input_stop(void) { - if (read_stream.closed) { + if (read_stream.s.closed) { return; } rstream_stop(&read_stream); - stream_close(&read_stream, NULL, NULL); + rstream_may_close(&read_stream); } #ifdef EXITFREE @@ -138,7 +138,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt, MultiQueue *e uint64_t wait_start = os_hrtime(); cursorhold_time = MIN(cursorhold_time, (int)p_ut); if ((result = inbuf_poll((int)p_ut - cursorhold_time, events)) == kInputNone) { - if (read_stream.closed && silent_mode) { + if (read_stream.s.closed && silent_mode) { // Drained eventloop & initial input; exit silent/batch-mode (-es/-Es). read_error_exit(); } @@ -489,7 +489,7 @@ bool input_available(void) return rbuffer_size(input_buffer) != 0; } -static void input_read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, bool at_eof) +static void input_read_cb(RStream *stream, RBuffer *buf, size_t c, void *data, bool at_eof) { if (at_eof) { input_eof = true; diff --git a/src/nvim/os/pty_process_unix.c b/src/nvim/os/pty_process_unix.c index 4d34e8fac4..cfa4dcada7 100644 --- a/src/nvim/os/pty_process_unix.c +++ b/src/nvim/os/pty_process_unix.c @@ -169,7 +169,7 @@ int pty_process_spawn(PtyProcess *ptyproc) int status = 0; // zero or negative error code (libuv convention) Process *proc = (Process *)ptyproc; - assert(proc->err.closed); + assert(proc->err.s.closed); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; uv_disable_stdio_inheritance(); @@ -208,8 +208,8 @@ int pty_process_spawn(PtyProcess *ptyproc) && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) { goto error; } - if (!proc->out.closed - && (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) { + if (!proc->out.s.closed + && (status = set_duplicating_descriptor(master, &proc->out.s.uv.pipe))) { goto error; } diff --git a/src/nvim/os/pty_process_win.c b/src/nvim/os/pty_process_win.c index 12831ff05f..f73baed490 100644 --- a/src/nvim/os/pty_process_win.c +++ b/src/nvim/os/pty_process_win.c @@ -55,7 +55,7 @@ int pty_process_spawn(PtyProcess *ptyproc) wchar_t *env = NULL; const char *emsg = NULL; - assert(proc->err.closed); + assert(proc->err.s.closed); if (!os_has_conpty_working() || (conpty_object = os_conpty_init(&in_name, &out_name, ptyproc->width, @@ -72,10 +72,10 @@ int pty_process_spawn(PtyProcess *ptyproc) pty_process_connect_cb); } - if (!proc->out.closed) { + if (!proc->out.s.closed) { out_req = xmalloc(sizeof(uv_connect_t)); uv_pipe_connect(out_req, - &proc->out.uv.pipe, + &proc->out.s.uv.pipe, out_name, pty_process_connect_cb); } @@ -216,7 +216,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer) Process *proc = (Process *)ptyproc; assert(ptyproc->finish_wait != NULL); - if (proc->out.closed || proc->out.did_eof || !uv_is_readable(proc->out.uvstream)) { + if (proc->out.s.closed || proc->out.did_eof || !uv_is_readable(proc->out.s.uvstream)) { uv_timer_stop(&ptyproc->wait_eof_timer); pty_process_finish2(ptyproc); } diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 2a10510b0f..958faa4d22 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -987,7 +987,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired) buf->data = xrealloc(buf->data, buf->cap); } -static void system_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +static void system_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof) { DynamicBuffer *dbuf = data; @@ -1151,7 +1151,7 @@ end: ui_flush(); } -static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +static void out_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof) { size_t cnt; char *ptr = rbuffer_read_ptr(buf, &cnt); @@ -1331,7 +1331,7 @@ static void shell_write_cb(Stream *stream, void *data, int status) msg_schedule_semsg(_("E5677: Error writing input to shell-command: %s"), uv_err_name(status)); } - stream_close(stream, NULL, NULL); + stream_close(stream, NULL, NULL, false); } /// Applies 'shellxescape' (p_sxe) and 'shellxquote' (p_sxq) to a command. diff --git a/src/nvim/tui/input.c b/src/nvim/tui/input.c index f1594dfcb9..588fed2d90 100644 --- a/src/nvim/tui/input.c +++ b/src/nvim/tui/input.c @@ -167,7 +167,7 @@ void tinput_destroy(TermInput *input) map_destroy(int, &kitty_key_map); rbuffer_free(input->key_buffer); uv_close((uv_handle_t *)&input->timer_handle, NULL); - stream_close(&input->read_stream, NULL, NULL); + rstream_may_close(&input->read_stream); termkey_destroy(input->tk); } @@ -737,7 +737,7 @@ static void handle_raw_buffer(TermInput *input, bool force) } } -static void tinput_read_cb(Stream *stream, RBuffer *buf, size_t count_, void *data, bool eof) +static void tinput_read_cb(RStream *stream, RBuffer *buf, size_t count_, void *data, bool eof) { TermInput *input = data; diff --git a/src/nvim/tui/input.h b/src/nvim/tui/input.h index bf6d0f2978..646fbdd16a 100644 --- a/src/nvim/tui/input.h +++ b/src/nvim/tui/input.h @@ -33,7 +33,7 @@ typedef struct { TermKey_Terminfo_Getstr_Hook *tk_ti_hook_fn; ///< libtermkey terminfo hook uv_timer_t timer_handle; Loop *loop; - Stream read_stream; + RStream read_stream; RBuffer *key_buffer; TUIData *tui_data; } TermInput; |