From c13c50b752dca322a5ec77dea6188c9e3694549b Mon Sep 17 00:00:00 2001 From: bfredl Date: Thu, 30 May 2024 12:59:02 +0200 Subject: refactor(io): separate types for read and write streams This is a structural refactor with no logical changes, yet. Done in preparation for simplifying rstream/rbuffer which will require more state inline in RStream. The initial idea was to have RStream and WStream as sub-types symetrically but that doesn't work, as sockets are both reading and writing. Also there is very little write-specific state to start with, so the benefit of a separate WStream struct is a lot smaller. Just document what fields in `Stream` are write specific. --- src/nvim/event/defs.h | 31 +++++++++++++------- src/nvim/event/libuv_process.c | 8 ++--- src/nvim/event/process.c | 46 +++++++++++++++-------------- src/nvim/event/process.h | 4 +-- src/nvim/event/rstream.c | 66 +++++++++++++++++++++++------------------- src/nvim/event/socket.c | 16 +++++----- src/nvim/event/stream.c | 26 +++++++++-------- src/nvim/event/wstream.c | 7 ++++- 8 files changed, 115 insertions(+), 89 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 9b7d8708be..8563006159 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -55,14 +55,15 @@ struct wbuffer { }; typedef struct stream Stream; -/// Type of function called when the Stream buffer is filled with data +typedef struct rstream RStream; +/// Type of function called when the RStream buffer is filled with data /// /// @param stream The Stream instance /// @param buf The associated RBuffer instance /// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof); +typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -71,11 +72,11 @@ typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void /// @param data User-defined data /// @param status 0 on success, anything else indicates failure typedef void (*stream_write_cb)(Stream *stream, void *data, int status); + typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { bool closed; - bool did_eof; union { uv_pipe_t pipe; uv_tcp_t tcp; @@ -85,20 +86,27 @@ struct stream { #endif } uv; uv_stream_t *uvstream; - uv_buf_t uvbuf; - RBuffer *buffer; uv_file fd; - stream_read_cb read_cb; - stream_write_cb write_cb; void *cb_data; stream_close_cb close_cb, internal_close_cb; void *close_cb_data, *internal_data; - size_t fpos; + size_t pending_reqs; + MultiQueue *events; + + // only used for writing: + stream_write_cb write_cb; size_t curmem; size_t maxmem; - size_t pending_reqs; +}; + +struct rstream { + Stream s; + bool did_eof; + RBuffer *buffer; + uv_buf_t uvbuf; + stream_read_cb read_cb; size_t num_bytes; - MultiQueue *events; + size_t fpos; }; #define ADDRESS_MAX_SIZE 256 @@ -147,7 +155,8 @@ struct process { char **argv; const char *exepath; dict_T *env; - Stream in, out, err; + Stream in; + RStream out, err; /// Exit handler. If set, user must call process_free(). process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c index f77d686c10..0dead1f9b4 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_process.c @@ -70,19 +70,19 @@ int libuv_process_spawn(LibuvProcess *uvproc) uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); } - if (!proc->out.closed) { + if (!proc->out.s.closed) { uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; #ifdef MSWIN // pipe must be readable for IOCP to work on Windows. uvproc->uvstdio[1].flags |= proc->overlapped ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; #endif - uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.uv.pipe); + uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe); } - if (!proc->err.closed) { + if (!proc->err.s.closed) { uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.uv.pipe); + uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe); } else if (proc->fwd_err) { uvproc->uvstdio[2].flags = UV_INHERIT_FD; uvproc->uvstdio[2].data.fd = STDERR_FILENO; diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 7460e92766..710376cd62 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -8,7 +8,9 @@ #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" #include "nvim/event/process.h" +#include "nvim/event/rstream.h" #include "nvim/event/stream.h" +#include "nvim/event/wstream.h" #include "nvim/globals.h" #include "nvim/log.h" #include "nvim/main.h" @@ -51,15 +53,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); } else { - proc->out.closed = true; + proc->out.s.closed = true; } if (err) { - uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); } else { - proc->err.closed = true; + proc->err.s.closed = true; } #ifdef USE_GCOV @@ -82,10 +84,10 @@ int process_spawn(Process *proc, bool in, bool out, bool err) uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); } if (out) { - uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); } if (err) { - uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); } if (proc->type == kProcessTypeUv) { @@ -106,16 +108,16 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe); - proc->out.internal_data = proc; - proc->out.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); + proc->out.s.internal_data = proc; + proc->out.s.internal_close_cb = on_process_stream_close; proc->refcount++; } if (err) { - stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe); - proc->err.internal_data = proc; - proc->err.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); + proc->err.s.internal_data = proc; + proc->err.s.internal_close_cb = on_process_stream_close; proc->refcount++; } @@ -148,9 +150,9 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL { - stream_may_close(&proc->in); - stream_may_close(&proc->out); - stream_may_close(&proc->err); + wstream_may_close(&proc->in); + rstream_may_close(&proc->out); + rstream_may_close(&proc->err); } /// Synchronously wait for a process to finish @@ -337,10 +339,10 @@ static void process_close(Process *proc) /// /// @param proc Process, for which an output stream should be flushed. /// @param stream Stream to flush. -static void flush_stream(Process *proc, Stream *stream) +static void flush_stream(Process *proc, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - if (!stream || stream->closed) { + if (!stream || stream->s.closed) { return; } @@ -350,7 +352,7 @@ static void flush_stream(Process *proc, Stream *stream) // keeps sending data, we only accept as much data as the system buffer size. // Otherwise this would block cleanup/teardown. int system_buffer_size = 0; - int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, + int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { system_buffer_size = (int)rbuffer_capacity(stream->buffer); @@ -359,14 +361,14 @@ static void flush_stream(Process *proc, Stream *stream) size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; // Read remaining data. - while (!stream->closed && stream->num_bytes < max_bytes) { + while (!stream->s.closed && stream->num_bytes < max_bytes) { // Remember number of bytes before polling size_t num_bytes = stream->num_bytes; // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (stream->events) { - multiqueue_process_events(stream->events); + if (stream->s.events) { + multiqueue_process_events(stream->s.events); } // Stream can be closed if it is empty. @@ -374,7 +376,7 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream // open. But only send EOF if we haven't already. - stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); + stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); } break; } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 421a470244..74b52cbbb1 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -21,8 +21,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .argv = NULL, .exepath = NULL, .in = { .closed = false }, - .out = { .closed = false }, - .err = { .closed = false }, + .out = { .s.closed = false }, + .err = { .s.closed = false }, .cb = NULL, .closed = false, .internal_close_cb = NULL, diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 6b4ab472e4..6c7fa20bd8 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -19,23 +19,26 @@ # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) +void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(loop, stream, fd, NULL); + stream_init(loop, &stream->s, fd, NULL); rstream_init(stream, bufsize); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) +void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(NULL, stream, -1, uvstream); + stream_init(NULL, &stream->s, -1, uvstream); rstream_init(stream, bufsize); } -void rstream_init(Stream *stream, size_t bufsize) +void rstream_init(RStream *stream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) { + stream->fpos = 0; + stream->read_cb = NULL; + stream->num_bytes = 0; stream->buffer = rbuffer_new(bufsize); stream->buffer->data = stream; stream->buffer->full_cb = on_rbuffer_full; @@ -45,28 +48,28 @@ void rstream_init(Stream *stream, size_t bufsize) /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb, void *data) +void rstream_start(RStream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; - stream->cb_data = data; - if (stream->uvstream) { - uv_read_start(stream->uvstream, alloc_cb, read_cb); + stream->s.cb_data = data; + if (stream->s.uvstream) { + uv_read_start(stream->s.uvstream, alloc_cb, read_cb); } else { - uv_idle_start(&stream->uv.idle, fread_idle_cb); + uv_idle_start(&stream->s.uv.idle, fread_idle_cb); } } /// Stops watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_stop(Stream *stream) +void rstream_stop(RStream *stream) FUNC_ATTR_NONNULL_ALL { - if (stream->uvstream) { - uv_read_stop(stream->uvstream); + if (stream->s.uvstream) { + uv_read_stop(stream->s.uvstream); } else { - uv_idle_stop(&stream->uv.idle); + uv_idle_stop(&stream->s.uv.idle); } } @@ -77,9 +80,9 @@ static void on_rbuffer_full(RBuffer *buf, void *data) static void on_rbuffer_nonfull(RBuffer *buf, void *data) { - Stream *stream = data; + RStream *stream = data; assert(stream->read_cb); - rstream_start(stream, stream->read_cb, stream->cb_data); + rstream_start(stream, stream->read_cb, stream->s.cb_data); } // Callbacks used by libuv @@ -87,7 +90,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) /// Called by libuv to allocate memory for reading. static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { - Stream *stream = handle->data; + RStream *stream = handle->data; // `uv_buf_t.len` happens to have different size on Windows. size_t write_count; buf->base = rbuffer_write_ptr(stream->buffer, &write_count); @@ -99,7 +102,7 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) /// 0-length buffer. static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { - Stream *stream = uvstream->data; + RStream *stream = uvstream->data; if (cnt <= 0) { // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: @@ -141,7 +144,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) static void fread_idle_cb(uv_idle_t *handle) { uv_fs_t req; - Stream *stream = handle->data; + RStream *stream = handle->data; // `uv_buf_t.len` happens to have different size on Windows. size_t write_count; @@ -160,7 +163,7 @@ static void fread_idle_cb(uv_idle_t *handle) // Synchronous read uv_fs_read(handle->loop, &req, - stream->fd, + stream->s.fd, &stream->uvbuf, 1, (int64_t)stream->fpos, @@ -169,7 +172,7 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_req_cleanup(&req); if (req.result <= 0) { - uv_idle_stop(&stream->uv.idle); + uv_idle_stop(&stream->s.uv.idle); invoke_read_cb(stream, 0, true); return; } @@ -183,24 +186,29 @@ static void fread_idle_cb(uv_idle_t *handle) static void read_event(void **argv) { - Stream *stream = argv[0]; + RStream *stream = argv[0]; if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; stream->did_eof = eof; - stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); + stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof); } - stream->pending_reqs--; - if (stream->closed && !stream->pending_reqs) { - stream_close_handle(stream); + stream->s.pending_reqs--; + if (stream->s.closed && !stream->s.pending_reqs) { + stream_close_handle(&stream->s, true); } } -static void invoke_read_cb(Stream *stream, size_t count, bool eof) +static void invoke_read_cb(RStream *stream, size_t count, bool eof) { // Don't let the stream be closed before the event is processed. - stream->pending_reqs++; + stream->s.pending_reqs++; - CREATE_EVENT(stream->events, read_event, + CREATE_EVENT(stream->s.events, read_event, stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); } + +void rstream_may_close(RStream *stream) +{ + stream_may_close(&stream->s, true); +} diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 4e878a2ecf..017f159fa1 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -135,17 +135,17 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) +int socket_watcher_accept(SocketWatcher *watcher, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; if (watcher->stream->type == UV_TCP) { - client = (uv_stream_t *)(&stream->uv.tcp); + client = (uv_stream_t *)(&stream->s.uv.tcp); uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); uv_tcp_nodelay((uv_tcp_t *)client, true); } else { - client = (uv_stream_t *)&stream->uv.pipe; + client = (uv_stream_t *)&stream->s.uv.pipe; uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); } @@ -156,7 +156,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) return result; } - stream_init(NULL, stream, -1, client); + stream_init(NULL, &stream->s, -1, client); return 0; } @@ -197,7 +197,7 @@ static void connect_cb(uv_connect_t *req, int status) } } -bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address, int timeout, +bool socket_connect(Loop *loop, RStream *stream, bool is_tcp, const char *address, int timeout, const char **error) { bool success = false; @@ -206,7 +206,7 @@ bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address req.data = &status; uv_stream_t *uv_stream; - uv_tcp_t *tcp = &stream->uv.tcp; + uv_tcp_t *tcp = &stream->s.uv.tcp; uv_getaddrinfo_t addr_req; addr_req.addrinfo = NULL; const struct addrinfo *addrinfo = NULL; @@ -237,7 +237,7 @@ tcp_retry: uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb); uv_stream = (uv_stream_t *)tcp; } else { - uv_pipe_t *pipe = &stream->uv.pipe; + uv_pipe_t *pipe = &stream->s.uv.pipe; uv_pipe_init(&loop->uv, pipe, 0); uv_pipe_connect(&req, pipe, address, connect_cb); uv_stream = (uv_stream_t *)pipe; @@ -245,7 +245,7 @@ tcp_retry: status = 1; LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1); if (status == 0) { - stream_init(NULL, stream, -1, uv_stream); + stream_init(NULL, &stream->s, -1, uv_stream); success = true; } else if (is_tcp && addrinfo->ai_next) { addrinfo = addrinfo->ai_next; diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 0b9ed4f25b..3d26dd868f 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -85,21 +85,17 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) } stream->internal_data = NULL; - stream->fpos = 0; stream->curmem = 0; stream->maxmem = 0; stream->pending_reqs = 0; - stream->read_cb = NULL; stream->write_cb = NULL; stream->close_cb = NULL; stream->internal_close_cb = NULL; stream->closed = false; - stream->buffer = NULL; stream->events = NULL; - stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) +void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data, bool rstream) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); @@ -116,18 +112,18 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) #endif if (!stream->pending_reqs) { - stream_close_handle(stream); + stream_close_handle(stream, rstream); } } -void stream_may_close(Stream *stream) +void stream_may_close(Stream *stream, bool rstream) { if (!stream->closed) { - stream_close(stream, NULL, NULL); + stream_close(stream, NULL, NULL, rstream); } } -void stream_close_handle(Stream *stream) +void stream_close_handle(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ALL { uv_handle_t *handle = NULL; @@ -145,16 +141,22 @@ void stream_close_handle(Stream *stream) assert(handle != NULL); if (!uv_is_closing(handle)) { - uv_close(handle, close_cb); + uv_close(handle, rstream ? rstream_close_cb : close_cb); } } -static void close_cb(uv_handle_t *handle) +static void rstream_close_cb(uv_handle_t *handle) { - Stream *stream = handle->data; + RStream *stream = handle->data; if (stream->buffer) { rbuffer_free(stream->buffer); } + close_cb(handle); +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; if (stream->close_cb) { stream->close_cb(stream, stream->close_cb_data); } diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index c67a9b96ed..07aab87e4d 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -141,7 +141,7 @@ static void write_cb(uv_write_t *req, int status) if (data->stream->closed && data->stream->pending_reqs == 0) { // Last pending write, free the stream; - stream_close_handle(data->stream); + stream_close_handle(data->stream, false); } xfree(data); @@ -158,3 +158,8 @@ void wstream_release_wbuffer(WBuffer *buffer) xfree(buffer); } } + +void wstream_may_close(Stream *stream) +{ + stream_may_close(stream, false); +} -- cgit From 200e7ad1578619e78c664bd0c6be024168433412 Mon Sep 17 00:00:00 2001 From: James Tirta Halim Date: Mon, 3 Jun 2024 11:10:30 +0700 Subject: fixup: apply the change on more files --- src/nvim/event/socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 017f159fa1..1214c3e336 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -35,7 +35,7 @@ int socket_watcher_init(Loop *loop, SocketWatcher *watcher, const char *endpoint if (host_end && addr != host_end) { // Split user specified address into two strings, addr(hostname) and port. // The port part in watcher->addr will be updated later. - *host_end = '\0'; + *host_end = NUL; char *port = host_end + 1; intmax_t iport; -- cgit From 78d21593a35cf89692224f1000a04d3c9fff8add Mon Sep 17 00:00:00 2001 From: bfredl Date: Fri, 31 May 2024 14:40:53 +0200 Subject: refactor(io): make rstream use a linear buffer If you like it you shouldn't put a ring on it. This is what _every_ consumer of RStream used anyway, either by calling rbuffer_reset, or rbuffer_consumed_compact (same as rbuffer_reset without needing a scratch buffer), or by consuming everything in each stream_read_cb call directly. --- src/nvim/event/defs.h | 16 +++-- src/nvim/event/process.c | 3 +- src/nvim/event/rstream.c | 163 +++++++++++++++++++++++++++-------------------- src/nvim/event/stream.c | 3 +- 4 files changed, 107 insertions(+), 78 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 8563006159..41690ead88 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -6,7 +6,6 @@ #include #include "nvim/eval/typval_defs.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" enum { EVENT_HANDLER_MAX_ARGC = 10, }; @@ -59,11 +58,13 @@ typedef struct rstream RStream; /// Type of function called when the RStream buffer is filled with data /// /// @param stream The Stream instance -/// @param buf The associated RBuffer instance +/// @param read_data data that was read /// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof); +/// @return number of bytes which were consumed +typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data, + bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -102,11 +103,16 @@ struct stream { struct rstream { Stream s; bool did_eof; - RBuffer *buffer; + bool want_read; + bool pending_read; + bool paused_full; + char *buffer; // ARENA_BLOCK_SIZE + char *read_pos; + char *write_pos; uv_buf_t uvbuf; stream_read_cb read_cb; size_t num_bytes; - size_t fpos; + int64_t fpos; }; #define ADDRESS_MAX_SIZE 256 diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 710376cd62..70fc31ba21 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -18,7 +18,6 @@ #include "nvim/os/pty_process.h" #include "nvim/os/shell.h" #include "nvim/os/time.h" -#include "nvim/rbuffer_defs.h" #include "nvim/ui_client.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -355,7 +354,7 @@ static void flush_stream(Process *proc, RStream *stream) int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { - system_buffer_size = (int)rbuffer_capacity(stream->buffer); + system_buffer_size = ARENA_BLOCK_SIZE; } size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 6c7fa20bd8..71290d0c0d 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -11,38 +11,44 @@ #include "nvim/macros_defs.h" #include "nvim/main.h" #include "nvim/os/os_defs.h" -#include "nvim/rbuffer.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize) +void rstream_init_fd(Loop *loop, RStream *stream, int fd) FUNC_ATTR_NONNULL_ARG(1, 2) { stream_init(loop, &stream->s, fd, NULL); - rstream_init(stream, bufsize); + rstream_init(stream); } -void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize) +void rstream_init_stream(RStream *stream, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(1, 2) { stream_init(NULL, &stream->s, -1, uvstream); - rstream_init(stream, bufsize); + rstream_init(stream); } -void rstream_init(RStream *stream, size_t bufsize) +void rstream_init(RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { stream->fpos = 0; stream->read_cb = NULL; stream->num_bytes = 0; - stream->buffer = rbuffer_new(bufsize); - stream->buffer->data = stream; - stream->buffer->full_cb = on_rbuffer_full; - stream->buffer->nonfull_cb = on_rbuffer_nonfull; + stream->buffer = alloc_block(); + stream->read_pos = stream->write_pos = stream->buffer; +} + +void rstream_start_inner(RStream *stream) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (stream->s.uvstream) { + uv_read_start(stream->s.uvstream, alloc_cb, read_cb); + } else { + uv_idle_start(&stream->s.uv.idle, fread_idle_cb); + } } /// Starts watching for events from a `Stream` instance. @@ -53,17 +59,16 @@ void rstream_start(RStream *stream, stream_read_cb cb, void *data) { stream->read_cb = cb; stream->s.cb_data = data; - if (stream->s.uvstream) { - uv_read_start(stream->s.uvstream, alloc_cb, read_cb); - } else { - uv_idle_start(&stream->s.uv.idle, fread_idle_cb); + stream->want_read = true; + if (!stream->paused_full) { + rstream_start_inner(stream); } } /// Stops watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_stop(RStream *stream) +void rstream_stop_inner(RStream *stream) FUNC_ATTR_NONNULL_ALL { if (stream->s.uvstream) { @@ -73,16 +78,14 @@ void rstream_stop(RStream *stream) } } -static void on_rbuffer_full(RBuffer *buf, void *data) -{ - rstream_stop(data); -} - -static void on_rbuffer_nonfull(RBuffer *buf, void *data) +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(RStream *stream) + FUNC_ATTR_NONNULL_ALL { - RStream *stream = data; - assert(stream->read_cb); - rstream_start(stream, stream->read_cb, stream->s.cb_data); + rstream_stop_inner(stream); + stream->want_read = false; } // Callbacks used by libuv @@ -91,10 +94,9 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *stream = handle->data; - // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - buf->base = rbuffer_write_ptr(stream->buffer, &write_count); - buf->len = UV_BUF_LEN(write_count); + buf->base = stream->write_pos; + // `uv_buf_t.len` happens to have different size on Windows (as a treat) + buf->len = UV_BUF_LEN(rstream_space(stream)); } /// Callback invoked by libuv after it copies the data into the buffer provided @@ -108,21 +110,21 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. // - // We don't need to do anything with the RBuffer because the next call + // We don't need to do anything with the buffer because the next call // to `alloc_cb` will return the same unused pointer (`rbuffer_produced` // won't be called) if (cnt == UV_ENOBUFS || cnt == 0) { return; } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { // The TTY driver might signal EOF without closing the stream - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } else { DLOG("closing Stream (%p): %s (%s)", (void *)stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } return; } @@ -130,10 +132,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // at this point we're sure that cnt is positive, no error occurred size_t nread = (size_t)cnt; stream->num_bytes += nread; - // Data was already written, so all we need is to update 'wpos' to reflect - // the space actually used in the buffer. - rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, nread, false); + stream->write_pos += cnt; + invoke_read_cb(stream, false); +} + +static size_t rstream_space(RStream *stream) +{ + return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos); } /// Called by the by the 'idle' handle to emulate a reading event @@ -146,52 +151,37 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *stream = handle->data; + stream->uvbuf.base = stream->write_pos; // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count); - stream->uvbuf.len = UV_BUF_LEN(write_count); - - // the offset argument to uv_fs_read is int64_t, could someone really try - // to read more than 9 quintillion (9e18) bytes? - // upcast is meant to avoid tautological condition warning on 32 bits - uintmax_t fpos_intmax = stream->fpos; - if (fpos_intmax > INT64_MAX) { - ELOG("stream offset overflow"); - preserve_exit("stream offset overflow"); - } + stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream)); // Synchronous read - uv_fs_read(handle->loop, - &req, - stream->s.fd, - &stream->uvbuf, - 1, - (int64_t)stream->fpos, - NULL); + uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL); uv_fs_req_cleanup(&req); if (req.result <= 0) { uv_idle_stop(&stream->s.uv.idle); - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); return; } - // no errors (req.result (ssize_t) is positive), it's safe to cast. - size_t nread = (size_t)req.result; - rbuffer_produced(stream->buffer, nread); - stream->fpos += nread; - invoke_read_cb(stream, nread, false); + // no errors (req.result (ssize_t) is positive), it's safe to use. + stream->write_pos += req.result; + stream->fpos += req.result; + invoke_read_cb(stream, false); } static void read_event(void **argv) { RStream *stream = argv[0]; + stream->pending_read = false; if (stream->read_cb) { - size_t count = (uintptr_t)argv[1]; - bool eof = (uintptr_t)argv[2]; - stream->did_eof = eof; - stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof); + size_t available = rstream_available(stream); + size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data, + stream->did_eof); + assert(consumed <= available); + rstream_consume(stream, consumed); } stream->s.pending_reqs--; if (stream->s.closed && !stream->s.pending_reqs) { @@ -199,13 +189,48 @@ static void read_event(void **argv) } } -static void invoke_read_cb(RStream *stream, size_t count, bool eof) +size_t rstream_available(RStream *stream) { + return (size_t)(stream->write_pos - stream->read_pos); +} + +void rstream_consume(RStream *stream, size_t consumed) +{ + stream->read_pos += consumed; + size_t remaining = (size_t)(stream->write_pos - stream->read_pos); + if (remaining > 0 && stream->read_pos > stream->buffer) { + memmove(stream->buffer, stream->read_pos, remaining); + stream->read_pos = stream->buffer; + stream->write_pos = stream->buffer + remaining; + } else if (remaining == 0) { + stream->read_pos = stream->write_pos = stream->buffer; + } + + if (stream->want_read && stream->paused_full && rstream_space(stream)) { + assert(stream->read_cb); + stream->paused_full = false; + rstream_start_inner(stream); + } +} + +static void invoke_read_cb(RStream *stream, bool eof) +{ + stream->did_eof |= eof; + + if (!rstream_space(stream)) { + rstream_stop_inner(stream); + stream->paused_full = true; + } + + // we cannot use pending_reqs as a socket can have both pending reads and writes + if (stream->pending_read) { + return; + } + // Don't let the stream be closed before the event is processed. stream->s.pending_reqs++; - - CREATE_EVENT(stream->s.events, read_event, - stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); + stream->pending_read = true; + CREATE_EVENT(stream->s.events, read_event, stream); } void rstream_may_close(RStream *stream) diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 3d26dd868f..bc1b503f4c 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -8,7 +8,6 @@ #include "nvim/event/loop.h" #include "nvim/event/stream.h" #include "nvim/log.h" -#include "nvim/rbuffer.h" #include "nvim/types_defs.h" #ifdef MSWIN # include "nvim/os/os_win_console.h" @@ -149,7 +148,7 @@ static void rstream_close_cb(uv_handle_t *handle) { RStream *stream = handle->data; if (stream->buffer) { - rbuffer_free(stream->buffer); + free_block(stream->buffer); } close_cb(handle); } -- cgit From 5d7853f22903a4f42d52f565f6a662c3ef178a8c Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Tue, 10 Sep 2024 01:14:18 -0700 Subject: refactor(os/input.c): rename os_inchar => input_get #30327 Problem: The name `os_inchar` (from Vim's old `mch_inchar`) is ambiguous: "inchar" sounds like it could be reading or enqueuing (setting) input. Its docstring is also ambiguous. Solution: - Rename `os_inchar` to `input_get`. - Write some mf'ing docstrings. - Add assert() in TRY_READ(). --- src/nvim/event/loop.h | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 6ecc7cb781..da4852b836 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -16,15 +16,14 @@ struct loop { uv_loop_t uv; MultiQueue *events; MultiQueue *thread_events; - // Immediate events: - // "Processed after exiting uv_run() (to avoid recursion), but before - // returning from loop_poll_events()." 502aee690c98 - // Practical consequence (for main_loop): these events are processed by - // state_enter()..os_inchar() - // whereas "regular" events (main_loop.events) are processed by - // state_enter()..VimState.execute() - // But state_enter()..os_inchar() can be "too early" if you want the event - // to trigger UI updates and other user-activity-related side-effects. + // Immediate events. + // - "Processed after exiting `uv_run()` (to avoid recursion), but before returning from + // `loop_poll_events()`." 502aee690c98 + // - Practical consequence (for `main_loop`): + // - these are processed by `state_enter()..input_get()` whereas "regular" events + // (`main_loop.events`) are processed by `state_enter()..VimState.execute()` + // - `state_enter()..input_get()` can be "too early" if you want the event to trigger UI + // updates and other user-activity-related side-effects. MultiQueue *fast_events; // used by process/job-control subsystem -- cgit From deac7df80a1491ae65b68a1a1047902bcd775adc Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Thu, 12 Sep 2024 09:16:57 -0700 Subject: refactor(stream.c): unused params in stream_close #30356 --- src/nvim/event/stream.c | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index bc1b503f4c..d9c44e06be 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -94,14 +94,17 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) stream->events = NULL; } -void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data, bool rstream) +void stream_may_close(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ARG(1) { + if (stream->closed) { + return; + } assert(!stream->closed); DLOG("closing Stream: %p", (void *)stream); stream->closed = true; - stream->close_cb = on_stream_close; - stream->close_cb_data = data; + stream->close_cb = NULL; + stream->close_cb_data = NULL; #ifdef MSWIN if (UV_TTY == uv_guess_handle(stream->fd)) { @@ -115,13 +118,6 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data, b } } -void stream_may_close(Stream *stream, bool rstream) -{ - if (!stream->closed) { - stream_close(stream, NULL, NULL, rstream); - } -} - void stream_close_handle(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ALL { -- cgit From 057d27a9d6ef0bb2ee5130704c45b9e9197e7c36 Mon Sep 17 00:00:00 2001 From: "Justin M. Keyes" Date: Sun, 15 Sep 2024 12:20:58 -0700 Subject: refactor: rename "process" => "proc" #30387 Problem: - "process" is often used as a verb (`multiqueue_process_events`), which is ambiguous for cases where it's used as a topic. - The documented naming convention for processes is "proc". - `:help dev-name-common` - Shorter is better, when it doesn't harm readability or discoverability. Solution: Rename "process" => "proc" in all C symbols and module names. --- src/nvim/event/defs.h | 25 +-- src/nvim/event/libuv_proc.c | 139 +++++++++++++ src/nvim/event/libuv_proc.h | 16 ++ src/nvim/event/libuv_process.c | 139 ------------- src/nvim/event/libuv_process.h | 16 -- src/nvim/event/proc.c | 451 +++++++++++++++++++++++++++++++++++++++++ src/nvim/event/proc.h | 49 +++++ src/nvim/event/process.c | 451 ----------------------------------------- src/nvim/event/process.h | 49 ----- 9 files changed, 668 insertions(+), 667 deletions(-) create mode 100644 src/nvim/event/libuv_proc.c create mode 100644 src/nvim/event/libuv_proc.h delete mode 100644 src/nvim/event/libuv_process.c delete mode 100644 src/nvim/event/libuv_process.h create mode 100644 src/nvim/event/proc.c create mode 100644 src/nvim/event/proc.h delete mode 100644 src/nvim/event/process.c delete mode 100644 src/nvim/event/process.h (limited to 'src/nvim/event') diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 41690ead88..33e2f9cecb 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -142,30 +142,31 @@ struct socket_watcher { }; typedef enum { - kProcessTypeUv, - kProcessTypePty, -} ProcessType; + kProcTypeUv, + kProcTypePty, +} ProcType; -typedef struct process Process; -typedef void (*process_exit_cb)(Process *proc, int status, void *data); -typedef void (*internal_process_cb)(Process *proc); +/// OS process +typedef struct proc Proc; +typedef void (*proc_exit_cb)(Proc *proc, int status, void *data); +typedef void (*internal_proc_cb)(Proc *proc); -struct process { - ProcessType type; +struct proc { + ProcType type; Loop *loop; void *data; int pid, status, refcount; uint8_t exit_signal; // Signal used when killing (on Windows). - uint64_t stopped_time; // process_stop() timestamp + uint64_t stopped_time; // proc_stop() timestamp const char *cwd; char **argv; const char *exepath; dict_T *env; Stream in; RStream out, err; - /// Exit handler. If set, user must call process_free(). - process_exit_cb cb; - internal_process_cb internal_exit_cb, internal_close_cb; + /// Exit handler. If set, user must call proc_free(). + proc_exit_cb cb; + internal_proc_cb internal_exit_cb, internal_close_cb; bool closed, detach, overlapped, fwd_err; MultiQueue *events; }; diff --git a/src/nvim/event/libuv_proc.c b/src/nvim/event/libuv_proc.c new file mode 100644 index 0000000000..5b445cdda7 --- /dev/null +++ b/src/nvim/event/libuv_proc.c @@ -0,0 +1,139 @@ +#include +#include +#include +#include + +#include "nvim/eval/typval.h" +#include "nvim/event/defs.h" +#include "nvim/event/libuv_proc.h" +#include "nvim/event/loop.h" +#include "nvim/event/proc.h" +#include "nvim/log.h" +#include "nvim/os/os.h" +#include "nvim/os/os_defs.h" +#include "nvim/types_defs.h" +#include "nvim/ui_client.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/libuv_proc.c.generated.h" +#endif + +/// @returns zero on success, or negative error code +int libuv_proc_spawn(LibuvProc *uvproc) + FUNC_ATTR_NONNULL_ALL +{ + Proc *proc = (Proc *)uvproc; + uvproc->uvopts.file = proc_get_exepath(proc); + uvproc->uvopts.args = proc->argv; + uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; +#ifdef MSWIN + // libuv collapses the argv to a CommandLineToArgvW()-style string. cmd.exe + // expects a different syntax (must be prepared by the caller before now). + if (os_shell_is_cmdexe(proc->argv[0])) { + uvproc->uvopts.flags |= UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS; + } + if (proc->detach) { + uvproc->uvopts.flags |= UV_PROCESS_DETACHED; + } +#else + // Always setsid() on unix-likes. #8107 + uvproc->uvopts.flags |= UV_PROCESS_DETACHED; +#endif + uvproc->uvopts.exit_cb = exit_cb; + uvproc->uvopts.cwd = proc->cwd; + + uvproc->uvopts.stdio = uvproc->uvstdio; + uvproc->uvopts.stdio_count = 3; + uvproc->uvstdio[0].flags = UV_IGNORE; + uvproc->uvstdio[1].flags = UV_IGNORE; + uvproc->uvstdio[2].flags = UV_IGNORE; + + if (ui_client_forward_stdin) { + assert(UI_CLIENT_STDIN_FD == 3); + uvproc->uvopts.stdio_count = 4; + uvproc->uvstdio[3].data.fd = 0; + uvproc->uvstdio[3].flags = UV_INHERIT_FD; + } + uvproc->uv.data = proc; + + if (proc->env) { + uvproc->uvopts.env = tv_dict_to_env(proc->env); + } else { + uvproc->uvopts.env = NULL; + } + + if (!proc->in.closed) { + uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; +#ifdef MSWIN + uvproc->uvstdio[0].flags |= proc->overlapped ? UV_OVERLAPPED_PIPE : 0; +#endif + uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); + } + + if (!proc->out.s.closed) { + uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; +#ifdef MSWIN + // pipe must be readable for IOCP to work on Windows. + uvproc->uvstdio[1].flags |= proc->overlapped + ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; +#endif + uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe); + } + + if (!proc->err.s.closed) { + uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe); + } else if (proc->fwd_err) { + uvproc->uvstdio[2].flags = UV_INHERIT_FD; + uvproc->uvstdio[2].data.fd = STDERR_FILENO; + } + + int status; + if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) { + ILOG("uv_spawn(%s) failed: %s", uvproc->uvopts.file, uv_strerror(status)); + if (uvproc->uvopts.env) { + os_free_fullenv(uvproc->uvopts.env); + } + return status; + } + + proc->pid = uvproc->uv.pid; + return status; +} + +void libuv_proc_close(LibuvProc *uvproc) + FUNC_ATTR_NONNULL_ARG(1) +{ + uv_close((uv_handle_t *)&uvproc->uv, close_cb); +} + +static void close_cb(uv_handle_t *handle) +{ + Proc *proc = handle->data; + if (proc->internal_close_cb) { + proc->internal_close_cb(proc); + } + LibuvProc *uvproc = (LibuvProc *)proc; + if (uvproc->uvopts.env) { + os_free_fullenv(uvproc->uvopts.env); + } +} + +static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) +{ + Proc *proc = handle->data; +#if defined(MSWIN) + // Use stored/expected signal. + term_signal = proc->exit_signal; +#endif + proc->status = term_signal ? 128 + term_signal : (int)status; + proc->internal_exit_cb(proc); +} + +LibuvProc libuv_proc_init(Loop *loop, void *data) +{ + LibuvProc rv = { + .proc = proc_init(loop, kProcTypeUv, data) + }; + return rv; +} diff --git a/src/nvim/event/libuv_proc.h b/src/nvim/event/libuv_proc.h new file mode 100644 index 0000000000..3127e166c0 --- /dev/null +++ b/src/nvim/event/libuv_proc.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +#include "nvim/event/defs.h" + +typedef struct { + Proc proc; + uv_process_t uv; + uv_process_options_t uvopts; + uv_stdio_container_t uvstdio[4]; +} LibuvProc; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/libuv_proc.h.generated.h" +#endif diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c deleted file mode 100644 index 0dead1f9b4..0000000000 --- a/src/nvim/event/libuv_process.c +++ /dev/null @@ -1,139 +0,0 @@ -#include -#include -#include -#include - -#include "nvim/eval/typval.h" -#include "nvim/event/defs.h" -#include "nvim/event/libuv_process.h" -#include "nvim/event/loop.h" -#include "nvim/event/process.h" -#include "nvim/log.h" -#include "nvim/os/os.h" -#include "nvim/os/os_defs.h" -#include "nvim/types_defs.h" -#include "nvim/ui_client.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/libuv_process.c.generated.h" -#endif - -/// @returns zero on success, or negative error code -int libuv_process_spawn(LibuvProcess *uvproc) - FUNC_ATTR_NONNULL_ALL -{ - Process *proc = (Process *)uvproc; - uvproc->uvopts.file = process_get_exepath(proc); - uvproc->uvopts.args = proc->argv; - uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; -#ifdef MSWIN - // libuv collapses the argv to a CommandLineToArgvW()-style string. cmd.exe - // expects a different syntax (must be prepared by the caller before now). - if (os_shell_is_cmdexe(proc->argv[0])) { - uvproc->uvopts.flags |= UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS; - } - if (proc->detach) { - uvproc->uvopts.flags |= UV_PROCESS_DETACHED; - } -#else - // Always setsid() on unix-likes. #8107 - uvproc->uvopts.flags |= UV_PROCESS_DETACHED; -#endif - uvproc->uvopts.exit_cb = exit_cb; - uvproc->uvopts.cwd = proc->cwd; - - uvproc->uvopts.stdio = uvproc->uvstdio; - uvproc->uvopts.stdio_count = 3; - uvproc->uvstdio[0].flags = UV_IGNORE; - uvproc->uvstdio[1].flags = UV_IGNORE; - uvproc->uvstdio[2].flags = UV_IGNORE; - - if (ui_client_forward_stdin) { - assert(UI_CLIENT_STDIN_FD == 3); - uvproc->uvopts.stdio_count = 4; - uvproc->uvstdio[3].data.fd = 0; - uvproc->uvstdio[3].flags = UV_INHERIT_FD; - } - uvproc->uv.data = proc; - - if (proc->env) { - uvproc->uvopts.env = tv_dict_to_env(proc->env); - } else { - uvproc->uvopts.env = NULL; - } - - if (!proc->in.closed) { - uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; -#ifdef MSWIN - uvproc->uvstdio[0].flags |= proc->overlapped ? UV_OVERLAPPED_PIPE : 0; -#endif - uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); - } - - if (!proc->out.s.closed) { - uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -#ifdef MSWIN - // pipe must be readable for IOCP to work on Windows. - uvproc->uvstdio[1].flags |= proc->overlapped - ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; -#endif - uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe); - } - - if (!proc->err.s.closed) { - uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe); - } else if (proc->fwd_err) { - uvproc->uvstdio[2].flags = UV_INHERIT_FD; - uvproc->uvstdio[2].data.fd = STDERR_FILENO; - } - - int status; - if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) { - ILOG("uv_spawn(%s) failed: %s", uvproc->uvopts.file, uv_strerror(status)); - if (uvproc->uvopts.env) { - os_free_fullenv(uvproc->uvopts.env); - } - return status; - } - - proc->pid = uvproc->uv.pid; - return status; -} - -void libuv_process_close(LibuvProcess *uvproc) - FUNC_ATTR_NONNULL_ARG(1) -{ - uv_close((uv_handle_t *)&uvproc->uv, close_cb); -} - -static void close_cb(uv_handle_t *handle) -{ - Process *proc = handle->data; - if (proc->internal_close_cb) { - proc->internal_close_cb(proc); - } - LibuvProcess *uvproc = (LibuvProcess *)proc; - if (uvproc->uvopts.env) { - os_free_fullenv(uvproc->uvopts.env); - } -} - -static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) -{ - Process *proc = handle->data; -#if defined(MSWIN) - // Use stored/expected signal. - term_signal = proc->exit_signal; -#endif - proc->status = term_signal ? 128 + term_signal : (int)status; - proc->internal_exit_cb(proc); -} - -LibuvProcess libuv_process_init(Loop *loop, void *data) -{ - LibuvProcess rv = { - .process = process_init(loop, kProcessTypeUv, data) - }; - return rv; -} diff --git a/src/nvim/event/libuv_process.h b/src/nvim/event/libuv_process.h deleted file mode 100644 index 12401dbb35..0000000000 --- a/src/nvim/event/libuv_process.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include - -#include "nvim/event/defs.h" - -typedef struct { - Process process; - uv_process_t uv; - uv_process_options_t uvopts; - uv_stdio_container_t uvstdio[4]; -} LibuvProcess; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/libuv_process.h.generated.h" -#endif diff --git a/src/nvim/event/proc.c b/src/nvim/event/proc.c new file mode 100644 index 0000000000..808bf794f0 --- /dev/null +++ b/src/nvim/event/proc.c @@ -0,0 +1,451 @@ +#include +#include +#include +#include + +#include "klib/klist.h" +#include "nvim/event/libuv_proc.h" +#include "nvim/event/loop.h" +#include "nvim/event/multiqueue.h" +#include "nvim/event/proc.h" +#include "nvim/event/rstream.h" +#include "nvim/event/stream.h" +#include "nvim/event/wstream.h" +#include "nvim/globals.h" +#include "nvim/log.h" +#include "nvim/main.h" +#include "nvim/os/proc.h" +#include "nvim/os/pty_proc.h" +#include "nvim/os/shell.h" +#include "nvim/os/time.h" +#include "nvim/ui_client.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/proc.c.generated.h" +#endif + +// Time for a process to exit cleanly before we send KILL. +// For PTY processes SIGTERM is sent first (in case SIGHUP was not enough). +#define KILL_TIMEOUT_MS 2000 + +/// Externally defined with gcov. +#ifdef USE_GCOV +void __gcov_flush(void); +#endif + +static bool proc_is_tearing_down = false; + +// Delay exit until handles are closed, to avoid deadlocks +static int exit_need_delay = 0; + +/// @returns zero on success, or negative error code +int proc_spawn(Proc *proc, bool in, bool out, bool err) + FUNC_ATTR_NONNULL_ALL +{ + // forwarding stderr contradicts with processing it internally + assert(!(err && proc->fwd_err)); + + if (in) { + uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0); + } else { + proc->in.closed = true; + } + + if (out) { + uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); + } else { + proc->out.s.closed = true; + } + + if (err) { + uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); + } else { + proc->err.s.closed = true; + } + +#ifdef USE_GCOV + // Flush coverage data before forking, to avoid "Merge mismatch" errors. + __gcov_flush(); +#endif + + int status; + switch (proc->type) { + case kProcTypeUv: + status = libuv_proc_spawn((LibuvProc *)proc); + break; + case kProcTypePty: + status = pty_proc_spawn((PtyProc *)proc); + break; + } + + if (status) { + if (in) { + uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); + } + if (out) { + uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); + } + if (err) { + uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); + } + + if (proc->type == kProcTypeUv) { + uv_close((uv_handle_t *)&(((LibuvProc *)proc)->uv), NULL); + } else { + proc_close(proc); + } + proc_free(proc); + proc->status = -1; + return status; + } + + if (in) { + stream_init(NULL, &proc->in, -1, (uv_stream_t *)&proc->in.uv.pipe); + proc->in.internal_data = proc; + proc->in.internal_close_cb = on_proc_stream_close; + proc->refcount++; + } + + if (out) { + stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); + proc->out.s.internal_data = proc; + proc->out.s.internal_close_cb = on_proc_stream_close; + proc->refcount++; + } + + if (err) { + stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); + proc->err.s.internal_data = proc; + proc->err.s.internal_close_cb = on_proc_stream_close; + proc->refcount++; + } + + proc->internal_exit_cb = on_proc_exit; + proc->internal_close_cb = decref; + proc->refcount++; + kl_push(WatcherPtr, proc->loop->children, proc); + DLOG("new: pid=%d exepath=[%s]", proc->pid, proc_get_exepath(proc)); + return 0; +} + +void proc_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL +{ + proc_is_tearing_down = true; + kl_iter(WatcherPtr, loop->children, current) { + Proc *proc = (*current)->data; + if (proc->detach || proc->type == kProcTypePty) { + // Close handles to process without killing it. + CREATE_EVENT(loop->events, proc_close_handles, proc); + } else { + proc_stop(proc); + } + } + + // Wait until all children exit and all close events are processed. + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, + kl_empty(loop->children) && multiqueue_empty(loop->events)); + pty_proc_teardown(loop); +} + +void proc_close_streams(Proc *proc) FUNC_ATTR_NONNULL_ALL +{ + wstream_may_close(&proc->in); + rstream_may_close(&proc->out); + rstream_may_close(&proc->err); +} + +/// Synchronously wait for a process to finish +/// +/// @param process Process instance +/// @param ms Time in milliseconds to wait for the process. +/// 0 for no wait. -1 to wait until the process quits. +/// @return Exit code of the process. proc->status will have the same value. +/// -1 if the timeout expired while the process is still running. +/// -2 if the user interrupted the wait. +int proc_wait(Proc *proc, int ms, MultiQueue *events) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (!proc->refcount) { + int status = proc->status; + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return status; + } + + if (!events) { + events = proc->events; + } + + // Increase refcount to stop the exit callback from being called (and possibly + // freed) before we have a chance to get the status. + proc->refcount++; + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, + // Until... + got_int // interrupted by the user + || proc->refcount == 1); // job exited + + // Assume that a user hitting CTRL-C does not like the current job. Kill it. + if (got_int) { + got_int = false; + proc_stop(proc); + if (ms == -1) { + // We can only return if all streams/handles are closed and the job + // exited. + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, + proc->refcount == 1); + } else { + LOOP_PROCESS_EVENTS(proc->loop, events, 0); + } + + proc->status = -2; + } + + if (proc->refcount == 1) { + // Job exited, free its resources. + decref(proc); + if (proc->events) { + // decref() created an exit event, process it now. + multiqueue_process_events(proc->events); + } + } else { + proc->refcount--; + } + + return proc->status; +} + +/// Ask a process to terminate and eventually kill if it doesn't respond +void proc_stop(Proc *proc) FUNC_ATTR_NONNULL_ALL +{ + bool exited = (proc->status >= 0); + if (exited || proc->stopped_time) { + return; + } + proc->stopped_time = os_hrtime(); + proc->exit_signal = SIGTERM; + + switch (proc->type) { + case kProcTypeUv: + os_proc_tree_kill(proc->pid, SIGTERM); + break; + case kProcTypePty: + // close all streams for pty processes to send SIGHUP to the process + proc_close_streams(proc); + pty_proc_close_master((PtyProc *)proc); + break; + } + + // (Re)start timer to verify that stopped process(es) died. + uv_timer_start(&proc->loop->children_kill_timer, children_kill_cb, + KILL_TIMEOUT_MS, 0); +} + +/// Frees process-owned resources. +void proc_free(Proc *proc) FUNC_ATTR_NONNULL_ALL +{ + if (proc->argv != NULL) { + shell_free_argv(proc->argv); + proc->argv = NULL; + } +} + +/// Sends SIGKILL (or SIGTERM..SIGKILL for PTY jobs) to processes that did +/// not terminate after proc_stop(). +static void children_kill_cb(uv_timer_t *handle) +{ + Loop *loop = handle->loop->data; + + kl_iter(WatcherPtr, loop->children, current) { + Proc *proc = (*current)->data; + bool exited = (proc->status >= 0); + if (exited || !proc->stopped_time) { + continue; + } + uint64_t term_sent = UINT64_MAX == proc->stopped_time; + if (kProcTypePty != proc->type || term_sent) { + proc->exit_signal = SIGKILL; + os_proc_tree_kill(proc->pid, SIGKILL); + } else { + proc->exit_signal = SIGTERM; + os_proc_tree_kill(proc->pid, SIGTERM); + proc->stopped_time = UINT64_MAX; // Flag: SIGTERM was sent. + // Restart timer. + uv_timer_start(&proc->loop->children_kill_timer, children_kill_cb, + KILL_TIMEOUT_MS, 0); + } + } +} + +static void proc_close_event(void **argv) +{ + Proc *proc = argv[0]; + if (proc->cb) { + // User (hint: channel_job_start) is responsible for calling + // proc_free(). + proc->cb(proc, proc->status, proc->data); + } else { + proc_free(proc); + } +} + +static void decref(Proc *proc) +{ + if (--proc->refcount != 0) { + return; + } + + Loop *loop = proc->loop; + kliter_t(WatcherPtr) **node = NULL; + kl_iter(WatcherPtr, loop->children, current) { + if ((*current)->data == proc) { + node = current; + break; + } + } + assert(node); + kl_shift_at(WatcherPtr, loop->children, node); + CREATE_EVENT(proc->events, proc_close_event, proc); +} + +static void proc_close(Proc *proc) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (proc_is_tearing_down && (proc->detach || proc->type == kProcTypePty) + && proc->closed) { + // If a detached/pty process dies while tearing down it might get closed + // twice. + return; + } + assert(!proc->closed); + proc->closed = true; + + if (proc->detach) { + if (proc->type == kProcTypeUv) { + uv_unref((uv_handle_t *)&(((LibuvProc *)proc)->uv)); + } + } + + switch (proc->type) { + case kProcTypeUv: + libuv_proc_close((LibuvProc *)proc); + break; + case kProcTypePty: + pty_proc_close((PtyProc *)proc); + break; + } +} + +/// Flush output stream. +/// +/// @param proc Process, for which an output stream should be flushed. +/// @param stream Stream to flush. +static void flush_stream(Proc *proc, RStream *stream) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (!stream || stream->s.closed) { + return; + } + + // Maximal remaining data size of terminated process is system + // buffer size. + // Also helps with a child process that keeps the output streams open. If it + // keeps sending data, we only accept as much data as the system buffer size. + // Otherwise this would block cleanup/teardown. + int system_buffer_size = 0; + int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, + &system_buffer_size); + if (err) { + system_buffer_size = ARENA_BLOCK_SIZE; + } + + size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; + + // Read remaining data. + while (!stream->s.closed && stream->num_bytes < max_bytes) { + // Remember number of bytes before polling + size_t num_bytes = stream->num_bytes; + + // Poll for data and process the generated events. + loop_poll_events(proc->loop, 0); + if (stream->s.events) { + multiqueue_process_events(stream->s.events); + } + + // Stream can be closed if it is empty. + if (num_bytes == stream->num_bytes) { + if (stream->read_cb && !stream->did_eof) { + // Stream callback could miss EOF handling if a child keeps the stream + // open. But only send EOF if we haven't already. + stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); + } + break; + } + } +} + +static void proc_close_handles(void **argv) +{ + Proc *proc = argv[0]; + + exit_need_delay++; + flush_stream(proc, &proc->out); + flush_stream(proc, &proc->err); + + proc_close_streams(proc); + proc_close(proc); + exit_need_delay--; +} + +static void exit_delay_cb(uv_timer_t *handle) +{ + uv_timer_stop(&main_loop.exit_delay_timer); + multiqueue_put(main_loop.fast_events, exit_event, main_loop.exit_delay_timer.data); +} + +static void exit_event(void **argv) +{ + int status = (int)(intptr_t)argv[0]; + if (exit_need_delay) { + main_loop.exit_delay_timer.data = argv[0]; + uv_timer_start(&main_loop.exit_delay_timer, exit_delay_cb, 0, 0); + return; + } + + if (!exiting) { + if (ui_client_channel_id) { + ui_client_exit_status = status; + os_exit(status); + } else { + assert(status == 0); // Called from rpc_close(), which passes 0 as status. + preserve_exit(NULL); + } + } +} + +void exit_from_channel(int status) +{ + multiqueue_put(main_loop.fast_events, exit_event, (void *)(intptr_t)status); +} + +static void on_proc_exit(Proc *proc) +{ + Loop *loop = proc->loop; + ILOG("exited: pid=%d status=%d stoptime=%" PRIu64, proc->pid, proc->status, + proc->stopped_time); + + if (ui_client_channel_id) { + exit_from_channel(proc->status); + } + + // Process has terminated, but there could still be data to be read from the + // OS. We are still in the libuv loop, so we cannot call code that polls for + // more data directly. Instead delay the reading after the libuv loop by + // queueing proc_close_handles() as an event. + MultiQueue *queue = proc->events ? proc->events : loop->events; + CREATE_EVENT(queue, proc_close_handles, proc); +} + +static void on_proc_stream_close(Stream *stream, void *data) +{ + Proc *proc = data; + decref(proc); +} diff --git a/src/nvim/event/proc.h b/src/nvim/event/proc.h new file mode 100644 index 0000000000..f525d46f87 --- /dev/null +++ b/src/nvim/event/proc.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include + +#include "nvim/event/defs.h" // IWYU pragma: keep +#include "nvim/types_defs.h" + +static inline Proc proc_init(Loop *loop, ProcType type, void *data) +{ + return (Proc) { + .type = type, + .data = data, + .loop = loop, + .events = NULL, + .pid = 0, + .status = -1, + .refcount = 0, + .stopped_time = 0, + .cwd = NULL, + .argv = NULL, + .exepath = NULL, + .in = { .closed = false }, + .out = { .s.closed = false }, + .err = { .s.closed = false }, + .cb = NULL, + .closed = false, + .internal_close_cb = NULL, + .internal_exit_cb = NULL, + .detach = false, + .fwd_err = false, + }; +} + +/// Get the path to the executable of the process. +static inline const char *proc_get_exepath(Proc *proc) +{ + return proc->exepath != NULL ? proc->exepath : proc->argv[0]; +} + +static inline bool proc_is_stopped(Proc *proc) +{ + bool exited = (proc->status >= 0); + return exited || (proc->stopped_time != 0); +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/proc.h.generated.h" +#endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c deleted file mode 100644 index 70fc31ba21..0000000000 --- a/src/nvim/event/process.c +++ /dev/null @@ -1,451 +0,0 @@ -#include -#include -#include -#include - -#include "klib/klist.h" -#include "nvim/event/libuv_process.h" -#include "nvim/event/loop.h" -#include "nvim/event/multiqueue.h" -#include "nvim/event/process.h" -#include "nvim/event/rstream.h" -#include "nvim/event/stream.h" -#include "nvim/event/wstream.h" -#include "nvim/globals.h" -#include "nvim/log.h" -#include "nvim/main.h" -#include "nvim/os/process.h" -#include "nvim/os/pty_process.h" -#include "nvim/os/shell.h" -#include "nvim/os/time.h" -#include "nvim/ui_client.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/process.c.generated.h" -#endif - -// Time for a process to exit cleanly before we send KILL. -// For PTY processes SIGTERM is sent first (in case SIGHUP was not enough). -#define KILL_TIMEOUT_MS 2000 - -/// Externally defined with gcov. -#ifdef USE_GCOV -void __gcov_flush(void); -#endif - -static bool process_is_tearing_down = false; - -// Delay exit until handles are closed, to avoid deadlocks -static int exit_need_delay = 0; - -/// @returns zero on success, or negative error code -int process_spawn(Process *proc, bool in, bool out, bool err) - FUNC_ATTR_NONNULL_ALL -{ - // forwarding stderr contradicts with processing it internally - assert(!(err && proc->fwd_err)); - - if (in) { - uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0); - } else { - proc->in.closed = true; - } - - if (out) { - uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); - } else { - proc->out.s.closed = true; - } - - if (err) { - uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); - } else { - proc->err.s.closed = true; - } - -#ifdef USE_GCOV - // Flush coverage data before forking, to avoid "Merge mismatch" errors. - __gcov_flush(); -#endif - - int status; - switch (proc->type) { - case kProcessTypeUv: - status = libuv_process_spawn((LibuvProcess *)proc); - break; - case kProcessTypePty: - status = pty_process_spawn((PtyProcess *)proc); - break; - } - - if (status) { - if (in) { - uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); - } - if (out) { - uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); - } - if (err) { - uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); - } - - if (proc->type == kProcessTypeUv) { - uv_close((uv_handle_t *)&(((LibuvProcess *)proc)->uv), NULL); - } else { - process_close(proc); - } - process_free(proc); - proc->status = -1; - return status; - } - - if (in) { - stream_init(NULL, &proc->in, -1, (uv_stream_t *)&proc->in.uv.pipe); - proc->in.internal_data = proc; - proc->in.internal_close_cb = on_process_stream_close; - proc->refcount++; - } - - if (out) { - stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); - proc->out.s.internal_data = proc; - proc->out.s.internal_close_cb = on_process_stream_close; - proc->refcount++; - } - - if (err) { - stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); - proc->err.s.internal_data = proc; - proc->err.s.internal_close_cb = on_process_stream_close; - proc->refcount++; - } - - proc->internal_exit_cb = on_process_exit; - proc->internal_close_cb = decref; - proc->refcount++; - kl_push(WatcherPtr, proc->loop->children, proc); - DLOG("new: pid=%d exepath=[%s]", proc->pid, process_get_exepath(proc)); - return 0; -} - -void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL -{ - process_is_tearing_down = true; - kl_iter(WatcherPtr, loop->children, current) { - Process *proc = (*current)->data; - if (proc->detach || proc->type == kProcessTypePty) { - // Close handles to process without killing it. - CREATE_EVENT(loop->events, process_close_handles, proc); - } else { - process_stop(proc); - } - } - - // Wait until all children exit and all close events are processed. - LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, - kl_empty(loop->children) && multiqueue_empty(loop->events)); - pty_process_teardown(loop); -} - -void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL -{ - wstream_may_close(&proc->in); - rstream_may_close(&proc->out); - rstream_may_close(&proc->err); -} - -/// Synchronously wait for a process to finish -/// -/// @param process Process instance -/// @param ms Time in milliseconds to wait for the process. -/// 0 for no wait. -1 to wait until the process quits. -/// @return Exit code of the process. proc->status will have the same value. -/// -1 if the timeout expired while the process is still running. -/// -2 if the user interrupted the wait. -int process_wait(Process *proc, int ms, MultiQueue *events) - FUNC_ATTR_NONNULL_ARG(1) -{ - if (!proc->refcount) { - int status = proc->status; - LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); - return status; - } - - if (!events) { - events = proc->events; - } - - // Increase refcount to stop the exit callback from being called (and possibly - // freed) before we have a chance to get the status. - proc->refcount++; - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, - // Until... - got_int // interrupted by the user - || proc->refcount == 1); // job exited - - // Assume that a user hitting CTRL-C does not like the current job. Kill it. - if (got_int) { - got_int = false; - process_stop(proc); - if (ms == -1) { - // We can only return if all streams/handles are closed and the job - // exited. - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, - proc->refcount == 1); - } else { - LOOP_PROCESS_EVENTS(proc->loop, events, 0); - } - - proc->status = -2; - } - - if (proc->refcount == 1) { - // Job exited, free its resources. - decref(proc); - if (proc->events) { - // decref() created an exit event, process it now. - multiqueue_process_events(proc->events); - } - } else { - proc->refcount--; - } - - return proc->status; -} - -/// Ask a process to terminate and eventually kill if it doesn't respond -void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL -{ - bool exited = (proc->status >= 0); - if (exited || proc->stopped_time) { - return; - } - proc->stopped_time = os_hrtime(); - proc->exit_signal = SIGTERM; - - switch (proc->type) { - case kProcessTypeUv: - os_proc_tree_kill(proc->pid, SIGTERM); - break; - case kProcessTypePty: - // close all streams for pty processes to send SIGHUP to the process - process_close_streams(proc); - pty_process_close_master((PtyProcess *)proc); - break; - } - - // (Re)start timer to verify that stopped process(es) died. - uv_timer_start(&proc->loop->children_kill_timer, children_kill_cb, - KILL_TIMEOUT_MS, 0); -} - -/// Frees process-owned resources. -void process_free(Process *proc) FUNC_ATTR_NONNULL_ALL -{ - if (proc->argv != NULL) { - shell_free_argv(proc->argv); - proc->argv = NULL; - } -} - -/// Sends SIGKILL (or SIGTERM..SIGKILL for PTY jobs) to processes that did -/// not terminate after process_stop(). -static void children_kill_cb(uv_timer_t *handle) -{ - Loop *loop = handle->loop->data; - - kl_iter(WatcherPtr, loop->children, current) { - Process *proc = (*current)->data; - bool exited = (proc->status >= 0); - if (exited || !proc->stopped_time) { - continue; - } - uint64_t term_sent = UINT64_MAX == proc->stopped_time; - if (kProcessTypePty != proc->type || term_sent) { - proc->exit_signal = SIGKILL; - os_proc_tree_kill(proc->pid, SIGKILL); - } else { - proc->exit_signal = SIGTERM; - os_proc_tree_kill(proc->pid, SIGTERM); - proc->stopped_time = UINT64_MAX; // Flag: SIGTERM was sent. - // Restart timer. - uv_timer_start(&proc->loop->children_kill_timer, children_kill_cb, - KILL_TIMEOUT_MS, 0); - } - } -} - -static void process_close_event(void **argv) -{ - Process *proc = argv[0]; - if (proc->cb) { - // User (hint: channel_job_start) is responsible for calling - // process_free(). - proc->cb(proc, proc->status, proc->data); - } else { - process_free(proc); - } -} - -static void decref(Process *proc) -{ - if (--proc->refcount != 0) { - return; - } - - Loop *loop = proc->loop; - kliter_t(WatcherPtr) **node = NULL; - kl_iter(WatcherPtr, loop->children, current) { - if ((*current)->data == proc) { - node = current; - break; - } - } - assert(node); - kl_shift_at(WatcherPtr, loop->children, node); - CREATE_EVENT(proc->events, process_close_event, proc); -} - -static void process_close(Process *proc) - FUNC_ATTR_NONNULL_ARG(1) -{ - if (process_is_tearing_down && (proc->detach || proc->type == kProcessTypePty) - && proc->closed) { - // If a detached/pty process dies while tearing down it might get closed - // twice. - return; - } - assert(!proc->closed); - proc->closed = true; - - if (proc->detach) { - if (proc->type == kProcessTypeUv) { - uv_unref((uv_handle_t *)&(((LibuvProcess *)proc)->uv)); - } - } - - switch (proc->type) { - case kProcessTypeUv: - libuv_process_close((LibuvProcess *)proc); - break; - case kProcessTypePty: - pty_process_close((PtyProcess *)proc); - break; - } -} - -/// Flush output stream. -/// -/// @param proc Process, for which an output stream should be flushed. -/// @param stream Stream to flush. -static void flush_stream(Process *proc, RStream *stream) - FUNC_ATTR_NONNULL_ARG(1) -{ - if (!stream || stream->s.closed) { - return; - } - - // Maximal remaining data size of terminated process is system - // buffer size. - // Also helps with a child process that keeps the output streams open. If it - // keeps sending data, we only accept as much data as the system buffer size. - // Otherwise this would block cleanup/teardown. - int system_buffer_size = 0; - int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, - &system_buffer_size); - if (err) { - system_buffer_size = ARENA_BLOCK_SIZE; - } - - size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; - - // Read remaining data. - while (!stream->s.closed && stream->num_bytes < max_bytes) { - // Remember number of bytes before polling - size_t num_bytes = stream->num_bytes; - - // Poll for data and process the generated events. - loop_poll_events(proc->loop, 0); - if (stream->s.events) { - multiqueue_process_events(stream->s.events); - } - - // Stream can be closed if it is empty. - if (num_bytes == stream->num_bytes) { - if (stream->read_cb && !stream->did_eof) { - // Stream callback could miss EOF handling if a child keeps the stream - // open. But only send EOF if we haven't already. - stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); - } - break; - } - } -} - -static void process_close_handles(void **argv) -{ - Process *proc = argv[0]; - - exit_need_delay++; - flush_stream(proc, &proc->out); - flush_stream(proc, &proc->err); - - process_close_streams(proc); - process_close(proc); - exit_need_delay--; -} - -static void exit_delay_cb(uv_timer_t *handle) -{ - uv_timer_stop(&main_loop.exit_delay_timer); - multiqueue_put(main_loop.fast_events, exit_event, main_loop.exit_delay_timer.data); -} - -static void exit_event(void **argv) -{ - int status = (int)(intptr_t)argv[0]; - if (exit_need_delay) { - main_loop.exit_delay_timer.data = argv[0]; - uv_timer_start(&main_loop.exit_delay_timer, exit_delay_cb, 0, 0); - return; - } - - if (!exiting) { - if (ui_client_channel_id) { - ui_client_exit_status = status; - os_exit(status); - } else { - assert(status == 0); // Called from rpc_close(), which passes 0 as status. - preserve_exit(NULL); - } - } -} - -void exit_from_channel(int status) -{ - multiqueue_put(main_loop.fast_events, exit_event, (void *)(intptr_t)status); -} - -static void on_process_exit(Process *proc) -{ - Loop *loop = proc->loop; - ILOG("exited: pid=%d status=%d stoptime=%" PRIu64, proc->pid, proc->status, - proc->stopped_time); - - if (ui_client_channel_id) { - exit_from_channel(proc->status); - } - - // Process has terminated, but there could still be data to be read from the - // OS. We are still in the libuv loop, so we cannot call code that polls for - // more data directly. Instead delay the reading after the libuv loop by - // queueing process_close_handles() as an event. - MultiQueue *queue = proc->events ? proc->events : loop->events; - CREATE_EVENT(queue, process_close_handles, proc); -} - -static void on_process_stream_close(Stream *stream, void *data) -{ - Process *proc = data; - decref(proc); -} diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h deleted file mode 100644 index 74b52cbbb1..0000000000 --- a/src/nvim/event/process.h +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include -#include - -#include "nvim/event/defs.h" // IWYU pragma: keep -#include "nvim/types_defs.h" - -static inline Process process_init(Loop *loop, ProcessType type, void *data) -{ - return (Process) { - .type = type, - .data = data, - .loop = loop, - .events = NULL, - .pid = 0, - .status = -1, - .refcount = 0, - .stopped_time = 0, - .cwd = NULL, - .argv = NULL, - .exepath = NULL, - .in = { .closed = false }, - .out = { .s.closed = false }, - .err = { .s.closed = false }, - .cb = NULL, - .closed = false, - .internal_close_cb = NULL, - .internal_exit_cb = NULL, - .detach = false, - .fwd_err = false, - }; -} - -/// Get the path to the executable of the process. -static inline const char *process_get_exepath(Process *proc) -{ - return proc->exepath != NULL ? proc->exepath : proc->argv[0]; -} - -static inline bool process_is_stopped(Process *proc) -{ - bool exited = (proc->status >= 0); - return exited || (proc->stopped_time != 0); -} - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/process.h.generated.h" -#endif -- cgit From 046e0956eeb35e4688daf4eb76461e0a73727be4 Mon Sep 17 00:00:00 2001 From: dundargoc Date: Wed, 25 Sep 2024 15:20:24 +0200 Subject: build: fix or silence new clang-tidy warnings --- src/nvim/event/loop.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index da4852b836..b86e83de3c 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -9,8 +9,8 @@ typedef void *WatcherPtr; -#define _NOOP(x) -KLIST_INIT(WatcherPtr, WatcherPtr, _NOOP) +#define NOOP(x) +KLIST_INIT(WatcherPtr, WatcherPtr, NOOP) struct loop { uv_loop_t uv; -- cgit From 80709882476206b8f1ab3c004c82a1efd039c684 Mon Sep 17 00:00:00 2001 From: zeertzjq Date: Thu, 26 Sep 2024 16:36:50 +0800 Subject: fix(channel): handle writing to file instead of pipe (#30519) --- src/nvim/event/defs.h | 6 +++--- src/nvim/event/rstream.c | 5 ++--- src/nvim/event/stream.c | 3 +++ src/nvim/event/wstream.c | 24 ++++++++++++++++++++---- 4 files changed, 28 insertions(+), 10 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 33e2f9cecb..20724f9263 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -86,8 +86,9 @@ struct stream { uv_tty_t tty; #endif } uv; - uv_stream_t *uvstream; - uv_file fd; + uv_stream_t *uvstream; ///< NULL when the stream is a file + uv_file fd; ///< When the stream is a file, this is its file descriptor + int64_t fpos; ///< When the stream is a file, this is the position in file void *cb_data; stream_close_cb close_cb, internal_close_cb; void *close_cb_data, *internal_data; @@ -112,7 +113,6 @@ struct rstream { uv_buf_t uvbuf; stream_read_cb read_cb; size_t num_bytes; - int64_t fpos; }; #define ADDRESS_MAX_SIZE 256 diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 71290d0c0d..15bdc547d5 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -34,7 +34,6 @@ void rstream_init_stream(RStream *stream, uv_stream_t *uvstream) void rstream_init(RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - stream->fpos = 0; stream->read_cb = NULL; stream->num_bytes = 0; stream->buffer = alloc_block(); @@ -156,7 +155,7 @@ static void fread_idle_cb(uv_idle_t *handle) stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream)); // Synchronous read - uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL); + uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->s.fpos, NULL); uv_fs_req_cleanup(&req); @@ -168,7 +167,7 @@ static void fread_idle_cb(uv_idle_t *handle) // no errors (req.result (ssize_t) is positive), it's safe to use. stream->write_pos += req.result; - stream->fpos += req.result; + stream->s.fpos += req.result; invoke_read_cb(stream, false); } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index d9c44e06be..71de6ee1ba 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -44,6 +44,8 @@ int stream_set_blocking(int fd, bool blocking) void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(2) { + // The underlying stream is either a file or an existing uv stream. + assert(uvstream == NULL ? fd >= 0 : fd < 0); stream->uvstream = uvstream; if (fd >= 0) { @@ -83,6 +85,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) stream->uvstream->data = stream; } + stream->fpos = 0; stream->internal_data = NULL; stream->curmem = 0; stream->maxmem = 0; diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 07aab87e4d..5005c4e84f 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -73,6 +73,26 @@ bool wstream_write(Stream *stream, WBuffer *buffer) // This should not be called after a stream was freed assert(!stream->closed); + uv_buf_t uvbuf; + uvbuf.base = buffer->data; + uvbuf.len = UV_BUF_LEN(buffer->size); + + if (!stream->uvstream) { + uv_fs_t req; + + // Synchronous write + uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL); + + uv_fs_req_cleanup(&req); + + wstream_release_wbuffer(buffer); + + assert(stream->write_cb == NULL); + + stream->fpos += MAX(req.result, 0); + return req.result > 0; + } + if (stream->curmem > stream->maxmem) { goto err; } @@ -84,10 +104,6 @@ bool wstream_write(Stream *stream, WBuffer *buffer) data->buffer = buffer; data->uv_req.data = data; - uv_buf_t uvbuf; - uvbuf.base = buffer->data; - uvbuf.len = UV_BUF_LEN(buffer->size); - if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { xfree(data); goto err; -- cgit From 76163590f0b1a39e281446b6b6b17d00b0dcae15 Mon Sep 17 00:00:00 2001 From: bfredl Date: Sat, 28 Sep 2024 11:56:08 +0200 Subject: refactor(event): change last use of klist to kvec loop->children might have been a linked list because used to be modified in place while looped over. However the loops that exists rather schedules events to be processed later, outside of the loop, so this can not happen anymore. When a linked list is otherwise useful it is better to use lib/queue_defs.h which defines an _intrusive_ linked list (i e it doesn't need to do allocations for list items like klist ). --- src/nvim/event/loop.c | 4 ++-- src/nvim/event/loop.h | 9 ++------- src/nvim/event/proc.c | 29 ++++++++++++++++------------- 3 files changed, 20 insertions(+), 22 deletions(-) (limited to 'src/nvim/event') diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index e1ebcecbd6..15d993cc62 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -20,7 +20,7 @@ void loop_init(Loop *loop, void *data) loop->recursive = 0; loop->closing = false; loop->uv.data = loop; - loop->children = kl_init(WatcherPtr); + kv_init(loop->children); loop->events = multiqueue_new_parent(loop_on_put, loop); loop->fast_events = multiqueue_new_child(loop->events); loop->thread_events = multiqueue_new_parent(NULL, NULL); @@ -187,7 +187,7 @@ bool loop_close(Loop *loop, bool wait) multiqueue_free(loop->fast_events); multiqueue_free(loop->thread_events); multiqueue_free(loop->events); - kl_destroy(WatcherPtr, loop->children); + kv_destroy(loop->children); return rv; } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index b86e83de3c..563b254a0b 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -3,15 +3,10 @@ #include #include -#include "klib/klist.h" +#include "klib/kvec.h" #include "nvim/event/defs.h" // IWYU pragma: keep #include "nvim/types_defs.h" // IWYU pragma: keep -typedef void *WatcherPtr; - -#define NOOP(x) -KLIST_INIT(WatcherPtr, WatcherPtr, NOOP) - struct loop { uv_loop_t uv; MultiQueue *events; @@ -27,7 +22,7 @@ struct loop { MultiQueue *fast_events; // used by process/job-control subsystem - klist_t(WatcherPtr) *children; + kvec_t(Proc *) children; uv_signal_t children_watcher; uv_timer_t children_kill_timer; diff --git a/src/nvim/event/proc.c b/src/nvim/event/proc.c index 808bf794f0..5ae3bd8c2d 100644 --- a/src/nvim/event/proc.c +++ b/src/nvim/event/proc.c @@ -3,7 +3,6 @@ #include #include -#include "klib/klist.h" #include "nvim/event/libuv_proc.h" #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" @@ -123,7 +122,7 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err) proc->internal_exit_cb = on_proc_exit; proc->internal_close_cb = decref; proc->refcount++; - kl_push(WatcherPtr, proc->loop->children, proc); + kv_push(proc->loop->children, proc); DLOG("new: pid=%d exepath=[%s]", proc->pid, proc_get_exepath(proc)); return 0; } @@ -131,8 +130,8 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err) void proc_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL { proc_is_tearing_down = true; - kl_iter(WatcherPtr, loop->children, current) { - Proc *proc = (*current)->data; + for (size_t i = 0; i < kv_size(loop->children); i++) { + Proc *proc = kv_A(loop->children, i); if (proc->detach || proc->type == kProcTypePty) { // Close handles to process without killing it. CREATE_EVENT(loop->events, proc_close_handles, proc); @@ -143,7 +142,7 @@ void proc_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL // Wait until all children exit and all close events are processed. LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, - kl_empty(loop->children) && multiqueue_empty(loop->events)); + kv_size(loop->children) == 0 && multiqueue_empty(loop->events)); pty_proc_teardown(loop); } @@ -254,8 +253,8 @@ static void children_kill_cb(uv_timer_t *handle) { Loop *loop = handle->loop->data; - kl_iter(WatcherPtr, loop->children, current) { - Proc *proc = (*current)->data; + for (size_t i = 0; i < kv_size(loop->children); i++) { + Proc *proc = kv_A(loop->children, i); bool exited = (proc->status >= 0); if (exited || !proc->stopped_time) { continue; @@ -294,15 +293,19 @@ static void decref(Proc *proc) } Loop *loop = proc->loop; - kliter_t(WatcherPtr) **node = NULL; - kl_iter(WatcherPtr, loop->children, current) { - if ((*current)->data == proc) { - node = current; + size_t i; + for (i = 0; i < kv_size(loop->children); i++) { + Proc *current = kv_A(loop->children, i); + if (current == proc) { break; } } - assert(node); - kl_shift_at(WatcherPtr, loop->children, node); + assert(i < kv_size(loop->children)); // element found + if (i < kv_size(loop->children) - 1) { + memmove(&kv_A(loop->children, i), &kv_A(loop->children, i + 1), + sizeof(&kv_A(loop->children, i)) * (kv_size(loop->children) - (i + 1))); + } + kv_size(loop->children)--; CREATE_EVENT(proc->events, proc_close_event, proc); } -- cgit