diff options
| author | Josh Rahm <joshuarahm@gmail.com> | 2024-11-19 22:57:13 +0000 |
|---|---|---|
| committer | Josh Rahm <joshuarahm@gmail.com> | 2024-11-19 22:57:13 +0000 |
| commit | 9be89f131f87608f224f0ee06d199fcd09d32176 (patch) | |
| tree | 11022dcfa9e08cb4ac5581b16734196128688d48 /src/nvim/event | |
| parent | ff7ed8f586589d620a806c3758fac4a47a8e7e15 (diff) | |
| parent | 88085c2e80a7e3ac29aabb6b5420377eed99b8b6 (diff) | |
| download | rneovim-9be89f131f87608f224f0ee06d199fcd09d32176.tar.gz rneovim-9be89f131f87608f224f0ee06d199fcd09d32176.tar.bz2 rneovim-9be89f131f87608f224f0ee06d199fcd09d32176.zip | |
Merge remote-tracking branch 'upstream/master' into mix_20240309
Diffstat (limited to 'src/nvim/event')
| -rw-r--r-- | src/nvim/event/defs.h | 70 | ||||
| -rw-r--r-- | src/nvim/event/libuv_proc.c (renamed from src/nvim/event/libuv_process.c) | 34 | ||||
| -rw-r--r-- | src/nvim/event/libuv_proc.h (renamed from src/nvim/event/libuv_process.h) | 6 | ||||
| -rw-r--r-- | src/nvim/event/loop.c | 4 | ||||
| -rw-r--r-- | src/nvim/event/loop.h | 26 | ||||
| -rw-r--r-- | src/nvim/event/proc.c (renamed from src/nvim/event/process.c) | 194 | ||||
| -rw-r--r-- | src/nvim/event/proc.h (renamed from src/nvim/event/process.h) | 14 | ||||
| -rw-r--r-- | src/nvim/event/rstream.c | 200 | ||||
| -rw-r--r-- | src/nvim/event/socket.c | 18 | ||||
| -rw-r--r-- | src/nvim/event/stream.c | 42 | ||||
| -rw-r--r-- | src/nvim/event/wstream.c | 31 |
11 files changed, 353 insertions, 286 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 9b7d8708be..20724f9263 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -6,7 +6,6 @@ #include <uv.h> #include "nvim/eval/typval_defs.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" enum { EVENT_HANDLER_MAX_ARGC = 10, }; @@ -55,14 +54,17 @@ struct wbuffer { }; typedef struct stream Stream; -/// Type of function called when the Stream buffer is filled with data +typedef struct rstream RStream; +/// Type of function called when the RStream buffer is filled with data /// /// @param stream The Stream instance -/// @param buf The associated RBuffer instance +/// @param read_data data that was read /// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof); +/// @return number of bytes which were consumed +typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data, + bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -71,11 +73,11 @@ typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void /// @param data User-defined data /// @param status 0 on success, anything else indicates failure typedef void (*stream_write_cb)(Stream *stream, void *data, int status); + typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { bool closed; - bool did_eof; union { uv_pipe_t pipe; uv_tcp_t tcp; @@ -84,21 +86,33 @@ struct stream { uv_tty_t tty; #endif } uv; - uv_stream_t *uvstream; - uv_buf_t uvbuf; - RBuffer *buffer; - uv_file fd; - stream_read_cb read_cb; - stream_write_cb write_cb; + uv_stream_t *uvstream; ///< NULL when the stream is a file + uv_file fd; ///< When the stream is a file, this is its file descriptor + int64_t fpos; ///< When the stream is a file, this is the position in file void *cb_data; stream_close_cb close_cb, internal_close_cb; void *close_cb_data, *internal_data; - size_t fpos; + size_t pending_reqs; + MultiQueue *events; + + // only used for writing: + stream_write_cb write_cb; size_t curmem; size_t maxmem; - size_t pending_reqs; +}; + +struct rstream { + Stream s; + bool did_eof; + bool want_read; + bool pending_read; + bool paused_full; + char *buffer; // ARENA_BLOCK_SIZE + char *read_pos; + char *write_pos; + uv_buf_t uvbuf; + stream_read_cb read_cb; size_t num_bytes; - MultiQueue *events; }; #define ADDRESS_MAX_SIZE 256 @@ -128,29 +142,31 @@ struct socket_watcher { }; typedef enum { - kProcessTypeUv, - kProcessTypePty, -} ProcessType; + kProcTypeUv, + kProcTypePty, +} ProcType; -typedef struct process Process; -typedef void (*process_exit_cb)(Process *proc, int status, void *data); -typedef void (*internal_process_cb)(Process *proc); +/// OS process +typedef struct proc Proc; +typedef void (*proc_exit_cb)(Proc *proc, int status, void *data); +typedef void (*internal_proc_cb)(Proc *proc); -struct process { - ProcessType type; +struct proc { + ProcType type; Loop *loop; void *data; int pid, status, refcount; uint8_t exit_signal; // Signal used when killing (on Windows). - uint64_t stopped_time; // process_stop() timestamp + uint64_t stopped_time; // proc_stop() timestamp const char *cwd; char **argv; const char *exepath; dict_T *env; - Stream in, out, err; - /// Exit handler. If set, user must call process_free(). - process_exit_cb cb; - internal_process_cb internal_exit_cb, internal_close_cb; + Stream in; + RStream out, err; + /// Exit handler. If set, user must call proc_free(). + proc_exit_cb cb; + internal_proc_cb internal_exit_cb, internal_close_cb; bool closed, detach, overlapped, fwd_err; MultiQueue *events; }; diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_proc.c index f77d686c10..5b445cdda7 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_proc.c @@ -5,9 +5,9 @@ #include "nvim/eval/typval.h" #include "nvim/event/defs.h" -#include "nvim/event/libuv_process.h" +#include "nvim/event/libuv_proc.h" #include "nvim/event/loop.h" -#include "nvim/event/process.h" +#include "nvim/event/proc.h" #include "nvim/log.h" #include "nvim/os/os.h" #include "nvim/os/os_defs.h" @@ -15,15 +15,15 @@ #include "nvim/ui_client.h" #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/libuv_process.c.generated.h" +# include "event/libuv_proc.c.generated.h" #endif /// @returns zero on success, or negative error code -int libuv_process_spawn(LibuvProcess *uvproc) +int libuv_proc_spawn(LibuvProc *uvproc) FUNC_ATTR_NONNULL_ALL { - Process *proc = (Process *)uvproc; - uvproc->uvopts.file = process_get_exepath(proc); + Proc *proc = (Proc *)uvproc; + uvproc->uvopts.file = proc_get_exepath(proc); uvproc->uvopts.args = proc->argv; uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; #ifdef MSWIN @@ -70,19 +70,19 @@ int libuv_process_spawn(LibuvProcess *uvproc) uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); } - if (!proc->out.closed) { + if (!proc->out.s.closed) { uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; #ifdef MSWIN // pipe must be readable for IOCP to work on Windows. uvproc->uvstdio[1].flags |= proc->overlapped ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; #endif - uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.uv.pipe); + uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe); } - if (!proc->err.closed) { + if (!proc->err.s.closed) { uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.uv.pipe); + uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe); } else if (proc->fwd_err) { uvproc->uvstdio[2].flags = UV_INHERIT_FD; uvproc->uvstdio[2].data.fd = STDERR_FILENO; @@ -101,7 +101,7 @@ int libuv_process_spawn(LibuvProcess *uvproc) return status; } -void libuv_process_close(LibuvProcess *uvproc) +void libuv_proc_close(LibuvProc *uvproc) FUNC_ATTR_NONNULL_ARG(1) { uv_close((uv_handle_t *)&uvproc->uv, close_cb); @@ -109,11 +109,11 @@ void libuv_process_close(LibuvProcess *uvproc) static void close_cb(uv_handle_t *handle) { - Process *proc = handle->data; + Proc *proc = handle->data; if (proc->internal_close_cb) { proc->internal_close_cb(proc); } - LibuvProcess *uvproc = (LibuvProcess *)proc; + LibuvProc *uvproc = (LibuvProc *)proc; if (uvproc->uvopts.env) { os_free_fullenv(uvproc->uvopts.env); } @@ -121,7 +121,7 @@ static void close_cb(uv_handle_t *handle) static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) { - Process *proc = handle->data; + Proc *proc = handle->data; #if defined(MSWIN) // Use stored/expected signal. term_signal = proc->exit_signal; @@ -130,10 +130,10 @@ static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) proc->internal_exit_cb(proc); } -LibuvProcess libuv_process_init(Loop *loop, void *data) +LibuvProc libuv_proc_init(Loop *loop, void *data) { - LibuvProcess rv = { - .process = process_init(loop, kProcessTypeUv, data) + LibuvProc rv = { + .proc = proc_init(loop, kProcTypeUv, data) }; return rv; } diff --git a/src/nvim/event/libuv_process.h b/src/nvim/event/libuv_proc.h index 12401dbb35..3127e166c0 100644 --- a/src/nvim/event/libuv_process.h +++ b/src/nvim/event/libuv_proc.h @@ -5,12 +5,12 @@ #include "nvim/event/defs.h" typedef struct { - Process process; + Proc proc; uv_process_t uv; uv_process_options_t uvopts; uv_stdio_container_t uvstdio[4]; -} LibuvProcess; +} LibuvProc; #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/libuv_process.h.generated.h" +# include "event/libuv_proc.h.generated.h" #endif diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index e1ebcecbd6..15d993cc62 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -20,7 +20,7 @@ void loop_init(Loop *loop, void *data) loop->recursive = 0; loop->closing = false; loop->uv.data = loop; - loop->children = kl_init(WatcherPtr); + kv_init(loop->children); loop->events = multiqueue_new_parent(loop_on_put, loop); loop->fast_events = multiqueue_new_child(loop->events); loop->thread_events = multiqueue_new_parent(NULL, NULL); @@ -187,7 +187,7 @@ bool loop_close(Loop *loop, bool wait) multiqueue_free(loop->fast_events); multiqueue_free(loop->thread_events); multiqueue_free(loop->events); - kl_destroy(WatcherPtr, loop->children); + kv_destroy(loop->children); return rv; } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 6ecc7cb781..563b254a0b 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -3,32 +3,26 @@ #include <stdbool.h> #include <uv.h> -#include "klib/klist.h" +#include "klib/kvec.h" #include "nvim/event/defs.h" // IWYU pragma: keep #include "nvim/types_defs.h" // IWYU pragma: keep -typedef void *WatcherPtr; - -#define _NOOP(x) -KLIST_INIT(WatcherPtr, WatcherPtr, _NOOP) - struct loop { uv_loop_t uv; MultiQueue *events; MultiQueue *thread_events; - // Immediate events: - // "Processed after exiting uv_run() (to avoid recursion), but before - // returning from loop_poll_events()." 502aee690c98 - // Practical consequence (for main_loop): these events are processed by - // state_enter()..os_inchar() - // whereas "regular" events (main_loop.events) are processed by - // state_enter()..VimState.execute() - // But state_enter()..os_inchar() can be "too early" if you want the event - // to trigger UI updates and other user-activity-related side-effects. + // Immediate events. + // - "Processed after exiting `uv_run()` (to avoid recursion), but before returning from + // `loop_poll_events()`." 502aee690c98 + // - Practical consequence (for `main_loop`): + // - these are processed by `state_enter()..input_get()` whereas "regular" events + // (`main_loop.events`) are processed by `state_enter()..VimState.execute()` + // - `state_enter()..input_get()` can be "too early" if you want the event to trigger UI + // updates and other user-activity-related side-effects. MultiQueue *fast_events; // used by process/job-control subsystem - klist_t(WatcherPtr) *children; + kvec_t(Proc *) children; uv_signal_t children_watcher; uv_timer_t children_kill_timer; diff --git a/src/nvim/event/process.c b/src/nvim/event/proc.c index 7460e92766..5ae3bd8c2d 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/proc.c @@ -3,24 +3,24 @@ #include <signal.h> #include <uv.h> -#include "klib/klist.h" -#include "nvim/event/libuv_process.h" +#include "nvim/event/libuv_proc.h" #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" -#include "nvim/event/process.h" +#include "nvim/event/proc.h" +#include "nvim/event/rstream.h" #include "nvim/event/stream.h" +#include "nvim/event/wstream.h" #include "nvim/globals.h" #include "nvim/log.h" #include "nvim/main.h" -#include "nvim/os/process.h" -#include "nvim/os/pty_process.h" +#include "nvim/os/proc.h" +#include "nvim/os/pty_proc.h" #include "nvim/os/shell.h" #include "nvim/os/time.h" -#include "nvim/rbuffer_defs.h" #include "nvim/ui_client.h" #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/process.c.generated.h" +# include "event/proc.c.generated.h" #endif // Time for a process to exit cleanly before we send KILL. @@ -32,13 +32,13 @@ void __gcov_flush(void); #endif -static bool process_is_tearing_down = false; +static bool proc_is_tearing_down = false; // Delay exit until handles are closed, to avoid deadlocks static int exit_need_delay = 0; /// @returns zero on success, or negative error code -int process_spawn(Process *proc, bool in, bool out, bool err) +int proc_spawn(Proc *proc, bool in, bool out, bool err) FUNC_ATTR_NONNULL_ALL { // forwarding stderr contradicts with processing it internally @@ -51,15 +51,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); } else { - proc->out.closed = true; + proc->out.s.closed = true; } if (err) { - uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); } else { - proc->err.closed = true; + proc->err.s.closed = true; } #ifdef USE_GCOV @@ -69,11 +69,11 @@ int process_spawn(Process *proc, bool in, bool out, bool err) int status; switch (proc->type) { - case kProcessTypeUv: - status = libuv_process_spawn((LibuvProcess *)proc); + case kProcTypeUv: + status = libuv_proc_spawn((LibuvProc *)proc); break; - case kProcessTypePty: - status = pty_process_spawn((PtyProcess *)proc); + case kProcTypePty: + status = pty_proc_spawn((PtyProc *)proc); break; } @@ -82,18 +82,18 @@ int process_spawn(Process *proc, bool in, bool out, bool err) uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); } if (out) { - uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); } if (err) { - uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); } - if (proc->type == kProcessTypeUv) { - uv_close((uv_handle_t *)&(((LibuvProcess *)proc)->uv), NULL); + if (proc->type == kProcTypeUv) { + uv_close((uv_handle_t *)&(((LibuvProc *)proc)->uv), NULL); } else { - process_close(proc); + proc_close(proc); } - process_free(proc); + proc_free(proc); proc->status = -1; return status; } @@ -101,56 +101,56 @@ int process_spawn(Process *proc, bool in, bool out, bool err) if (in) { stream_init(NULL, &proc->in, -1, (uv_stream_t *)&proc->in.uv.pipe); proc->in.internal_data = proc; - proc->in.internal_close_cb = on_process_stream_close; + proc->in.internal_close_cb = on_proc_stream_close; proc->refcount++; } if (out) { - stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe); - proc->out.internal_data = proc; - proc->out.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); + proc->out.s.internal_data = proc; + proc->out.s.internal_close_cb = on_proc_stream_close; proc->refcount++; } if (err) { - stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe); - proc->err.internal_data = proc; - proc->err.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); + proc->err.s.internal_data = proc; + proc->err.s.internal_close_cb = on_proc_stream_close; proc->refcount++; } - proc->internal_exit_cb = on_process_exit; + proc->internal_exit_cb = on_proc_exit; proc->internal_close_cb = decref; proc->refcount++; - kl_push(WatcherPtr, proc->loop->children, proc); - DLOG("new: pid=%d exepath=[%s]", proc->pid, process_get_exepath(proc)); + kv_push(proc->loop->children, proc); + DLOG("new: pid=%d exepath=[%s]", proc->pid, proc_get_exepath(proc)); return 0; } -void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL +void proc_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL { - process_is_tearing_down = true; - kl_iter(WatcherPtr, loop->children, current) { - Process *proc = (*current)->data; - if (proc->detach || proc->type == kProcessTypePty) { + proc_is_tearing_down = true; + for (size_t i = 0; i < kv_size(loop->children); i++) { + Proc *proc = kv_A(loop->children, i); + if (proc->detach || proc->type == kProcTypePty) { // Close handles to process without killing it. - CREATE_EVENT(loop->events, process_close_handles, proc); + CREATE_EVENT(loop->events, proc_close_handles, proc); } else { - process_stop(proc); + proc_stop(proc); } } // Wait until all children exit and all close events are processed. LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, - kl_empty(loop->children) && multiqueue_empty(loop->events)); - pty_process_teardown(loop); + kv_size(loop->children) == 0 && multiqueue_empty(loop->events)); + pty_proc_teardown(loop); } -void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL +void proc_close_streams(Proc *proc) FUNC_ATTR_NONNULL_ALL { - stream_may_close(&proc->in); - stream_may_close(&proc->out); - stream_may_close(&proc->err); + wstream_may_close(&proc->in); + rstream_may_close(&proc->out); + rstream_may_close(&proc->err); } /// Synchronously wait for a process to finish @@ -161,7 +161,7 @@ void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL /// @return Exit code of the process. proc->status will have the same value. /// -1 if the timeout expired while the process is still running. /// -2 if the user interrupted the wait. -int process_wait(Process *proc, int ms, MultiQueue *events) +int proc_wait(Proc *proc, int ms, MultiQueue *events) FUNC_ATTR_NONNULL_ARG(1) { if (!proc->refcount) { @@ -185,7 +185,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events) // Assume that a user hitting CTRL-C does not like the current job. Kill it. if (got_int) { got_int = false; - process_stop(proc); + proc_stop(proc); if (ms == -1) { // We can only return if all streams/handles are closed and the job // exited. @@ -213,7 +213,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events) } /// Ask a process to terminate and eventually kill if it doesn't respond -void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL +void proc_stop(Proc *proc) FUNC_ATTR_NONNULL_ALL { bool exited = (proc->status >= 0); if (exited || proc->stopped_time) { @@ -223,13 +223,13 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL proc->exit_signal = SIGTERM; switch (proc->type) { - case kProcessTypeUv: + case kProcTypeUv: os_proc_tree_kill(proc->pid, SIGTERM); break; - case kProcessTypePty: + case kProcTypePty: // close all streams for pty processes to send SIGHUP to the process - process_close_streams(proc); - pty_process_close_master((PtyProcess *)proc); + proc_close_streams(proc); + pty_proc_close_master((PtyProc *)proc); break; } @@ -239,7 +239,7 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL } /// Frees process-owned resources. -void process_free(Process *proc) FUNC_ATTR_NONNULL_ALL +void proc_free(Proc *proc) FUNC_ATTR_NONNULL_ALL { if (proc->argv != NULL) { shell_free_argv(proc->argv); @@ -248,19 +248,19 @@ void process_free(Process *proc) FUNC_ATTR_NONNULL_ALL } /// Sends SIGKILL (or SIGTERM..SIGKILL for PTY jobs) to processes that did -/// not terminate after process_stop(). +/// not terminate after proc_stop(). static void children_kill_cb(uv_timer_t *handle) { Loop *loop = handle->loop->data; - kl_iter(WatcherPtr, loop->children, current) { - Process *proc = (*current)->data; + for (size_t i = 0; i < kv_size(loop->children); i++) { + Proc *proc = kv_A(loop->children, i); bool exited = (proc->status >= 0); if (exited || !proc->stopped_time) { continue; } uint64_t term_sent = UINT64_MAX == proc->stopped_time; - if (kProcessTypePty != proc->type || term_sent) { + if (kProcTypePty != proc->type || term_sent) { proc->exit_signal = SIGKILL; os_proc_tree_kill(proc->pid, SIGKILL); } else { @@ -274,41 +274,45 @@ static void children_kill_cb(uv_timer_t *handle) } } -static void process_close_event(void **argv) +static void proc_close_event(void **argv) { - Process *proc = argv[0]; + Proc *proc = argv[0]; if (proc->cb) { // User (hint: channel_job_start) is responsible for calling - // process_free(). + // proc_free(). proc->cb(proc, proc->status, proc->data); } else { - process_free(proc); + proc_free(proc); } } -static void decref(Process *proc) +static void decref(Proc *proc) { if (--proc->refcount != 0) { return; } Loop *loop = proc->loop; - kliter_t(WatcherPtr) **node = NULL; - kl_iter(WatcherPtr, loop->children, current) { - if ((*current)->data == proc) { - node = current; + size_t i; + for (i = 0; i < kv_size(loop->children); i++) { + Proc *current = kv_A(loop->children, i); + if (current == proc) { break; } } - assert(node); - kl_shift_at(WatcherPtr, loop->children, node); - CREATE_EVENT(proc->events, process_close_event, proc); + assert(i < kv_size(loop->children)); // element found + if (i < kv_size(loop->children) - 1) { + memmove(&kv_A(loop->children, i), &kv_A(loop->children, i + 1), + sizeof(&kv_A(loop->children, i)) * (kv_size(loop->children) - (i + 1))); + } + kv_size(loop->children)--; + CREATE_EVENT(proc->events, proc_close_event, proc); } -static void process_close(Process *proc) +static void proc_close(Proc *proc) FUNC_ATTR_NONNULL_ARG(1) { - if (process_is_tearing_down && (proc->detach || proc->type == kProcessTypePty) + if (proc_is_tearing_down && (proc->detach || proc->type == kProcTypePty) && proc->closed) { // If a detached/pty process dies while tearing down it might get closed // twice. @@ -318,17 +322,17 @@ static void process_close(Process *proc) proc->closed = true; if (proc->detach) { - if (proc->type == kProcessTypeUv) { - uv_unref((uv_handle_t *)&(((LibuvProcess *)proc)->uv)); + if (proc->type == kProcTypeUv) { + uv_unref((uv_handle_t *)&(((LibuvProc *)proc)->uv)); } } switch (proc->type) { - case kProcessTypeUv: - libuv_process_close((LibuvProcess *)proc); + case kProcTypeUv: + libuv_proc_close((LibuvProc *)proc); break; - case kProcessTypePty: - pty_process_close((PtyProcess *)proc); + case kProcTypePty: + pty_proc_close((PtyProc *)proc); break; } } @@ -337,10 +341,10 @@ static void process_close(Process *proc) /// /// @param proc Process, for which an output stream should be flushed. /// @param stream Stream to flush. -static void flush_stream(Process *proc, Stream *stream) +static void flush_stream(Proc *proc, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - if (!stream || stream->closed) { + if (!stream || stream->s.closed) { return; } @@ -350,23 +354,23 @@ static void flush_stream(Process *proc, Stream *stream) // keeps sending data, we only accept as much data as the system buffer size. // Otherwise this would block cleanup/teardown. int system_buffer_size = 0; - int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, + int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { - system_buffer_size = (int)rbuffer_capacity(stream->buffer); + system_buffer_size = ARENA_BLOCK_SIZE; } size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; // Read remaining data. - while (!stream->closed && stream->num_bytes < max_bytes) { + while (!stream->s.closed && stream->num_bytes < max_bytes) { // Remember number of bytes before polling size_t num_bytes = stream->num_bytes; // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (stream->events) { - multiqueue_process_events(stream->events); + if (stream->s.events) { + multiqueue_process_events(stream->s.events); } // Stream can be closed if it is empty. @@ -374,23 +378,23 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream // open. But only send EOF if we haven't already. - stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); + stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); } break; } } } -static void process_close_handles(void **argv) +static void proc_close_handles(void **argv) { - Process *proc = argv[0]; + Proc *proc = argv[0]; exit_need_delay++; flush_stream(proc, &proc->out); flush_stream(proc, &proc->err); - process_close_streams(proc); - process_close(proc); + proc_close_streams(proc); + proc_close(proc); exit_need_delay--; } @@ -425,7 +429,7 @@ void exit_from_channel(int status) multiqueue_put(main_loop.fast_events, exit_event, (void *)(intptr_t)status); } -static void on_process_exit(Process *proc) +static void on_proc_exit(Proc *proc) { Loop *loop = proc->loop; ILOG("exited: pid=%d status=%d stoptime=%" PRIu64, proc->pid, proc->status, @@ -438,13 +442,13 @@ static void on_process_exit(Process *proc) // Process has terminated, but there could still be data to be read from the // OS. We are still in the libuv loop, so we cannot call code that polls for // more data directly. Instead delay the reading after the libuv loop by - // queueing process_close_handles() as an event. + // queueing proc_close_handles() as an event. MultiQueue *queue = proc->events ? proc->events : loop->events; - CREATE_EVENT(queue, process_close_handles, proc); + CREATE_EVENT(queue, proc_close_handles, proc); } -static void on_process_stream_close(Stream *stream, void *data) +static void on_proc_stream_close(Stream *stream, void *data) { - Process *proc = data; + Proc *proc = data; decref(proc); } diff --git a/src/nvim/event/process.h b/src/nvim/event/proc.h index 421a470244..f525d46f87 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/proc.h @@ -6,9 +6,9 @@ #include "nvim/event/defs.h" // IWYU pragma: keep #include "nvim/types_defs.h" -static inline Process process_init(Loop *loop, ProcessType type, void *data) +static inline Proc proc_init(Loop *loop, ProcType type, void *data) { - return (Process) { + return (Proc) { .type = type, .data = data, .loop = loop, @@ -21,8 +21,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .argv = NULL, .exepath = NULL, .in = { .closed = false }, - .out = { .closed = false }, - .err = { .closed = false }, + .out = { .s.closed = false }, + .err = { .s.closed = false }, .cb = NULL, .closed = false, .internal_close_cb = NULL, @@ -33,17 +33,17 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) } /// Get the path to the executable of the process. -static inline const char *process_get_exepath(Process *proc) +static inline const char *proc_get_exepath(Proc *proc) { return proc->exepath != NULL ? proc->exepath : proc->argv[0]; } -static inline bool process_is_stopped(Process *proc) +static inline bool proc_is_stopped(Proc *proc) { bool exited = (proc->status >= 0); return exited || (proc->stopped_time != 0); } #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "event/process.h.generated.h" +# include "event/proc.h.generated.h" #endif diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 6b4ab472e4..15bdc547d5 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -11,75 +11,80 @@ #include "nvim/macros_defs.h" #include "nvim/main.h" #include "nvim/os/os_defs.h" -#include "nvim/rbuffer.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) +void rstream_init_fd(Loop *loop, RStream *stream, int fd) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(loop, stream, fd, NULL); - rstream_init(stream, bufsize); + stream_init(loop, &stream->s, fd, NULL); + rstream_init(stream); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) +void rstream_init_stream(RStream *stream, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(1, 2) { - stream_init(NULL, stream, -1, uvstream); - rstream_init(stream, bufsize); + stream_init(NULL, &stream->s, -1, uvstream); + rstream_init(stream); } -void rstream_init(Stream *stream, size_t bufsize) +void rstream_init(RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - stream->buffer = rbuffer_new(bufsize); - stream->buffer->data = stream; - stream->buffer->full_cb = on_rbuffer_full; - stream->buffer->nonfull_cb = on_rbuffer_nonfull; + stream->read_cb = NULL; + stream->num_bytes = 0; + stream->buffer = alloc_block(); + stream->read_pos = stream->write_pos = stream->buffer; +} + +void rstream_start_inner(RStream *stream) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (stream->s.uvstream) { + uv_read_start(stream->s.uvstream, alloc_cb, read_cb); + } else { + uv_idle_start(&stream->s.uv.idle, fread_idle_cb); + } } /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb, void *data) +void rstream_start(RStream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; - stream->cb_data = data; - if (stream->uvstream) { - uv_read_start(stream->uvstream, alloc_cb, read_cb); - } else { - uv_idle_start(&stream->uv.idle, fread_idle_cb); + stream->s.cb_data = data; + stream->want_read = true; + if (!stream->paused_full) { + rstream_start_inner(stream); } } /// Stops watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_stop(Stream *stream) +void rstream_stop_inner(RStream *stream) FUNC_ATTR_NONNULL_ALL { - if (stream->uvstream) { - uv_read_stop(stream->uvstream); + if (stream->s.uvstream) { + uv_read_stop(stream->s.uvstream); } else { - uv_idle_stop(&stream->uv.idle); + uv_idle_stop(&stream->s.uv.idle); } } -static void on_rbuffer_full(RBuffer *buf, void *data) -{ - rstream_stop(data); -} - -static void on_rbuffer_nonfull(RBuffer *buf, void *data) +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(RStream *stream) + FUNC_ATTR_NONNULL_ALL { - Stream *stream = data; - assert(stream->read_cb); - rstream_start(stream, stream->read_cb, stream->cb_data); + rstream_stop_inner(stream); + stream->want_read = false; } // Callbacks used by libuv @@ -87,11 +92,10 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) /// Called by libuv to allocate memory for reading. static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { - Stream *stream = handle->data; - // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - buf->base = rbuffer_write_ptr(stream->buffer, &write_count); - buf->len = UV_BUF_LEN(write_count); + RStream *stream = handle->data; + buf->base = stream->write_pos; + // `uv_buf_t.len` happens to have different size on Windows (as a treat) + buf->len = UV_BUF_LEN(rstream_space(stream)); } /// Callback invoked by libuv after it copies the data into the buffer provided @@ -99,27 +103,27 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) /// 0-length buffer. static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) { - Stream *stream = uvstream->data; + RStream *stream = uvstream->data; if (cnt <= 0) { // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. // - // We don't need to do anything with the RBuffer because the next call + // We don't need to do anything with the buffer because the next call // to `alloc_cb` will return the same unused pointer (`rbuffer_produced` // won't be called) if (cnt == UV_ENOBUFS || cnt == 0) { return; } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { // The TTY driver might signal EOF without closing the stream - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } else { DLOG("closing Stream (%p): %s (%s)", (void *)stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } return; } @@ -127,10 +131,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // at this point we're sure that cnt is positive, no error occurred size_t nread = (size_t)cnt; stream->num_bytes += nread; - // Data was already written, so all we need is to update 'wpos' to reflect - // the space actually used in the buffer. - rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, nread, false); + stream->write_pos += cnt; + invoke_read_cb(stream, false); +} + +static size_t rstream_space(RStream *stream) +{ + return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos); } /// Called by the by the 'idle' handle to emulate a reading event @@ -141,66 +148,91 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) static void fread_idle_cb(uv_idle_t *handle) { uv_fs_t req; - Stream *stream = handle->data; + RStream *stream = handle->data; + stream->uvbuf.base = stream->write_pos; // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count); - stream->uvbuf.len = UV_BUF_LEN(write_count); - - // the offset argument to uv_fs_read is int64_t, could someone really try - // to read more than 9 quintillion (9e18) bytes? - // upcast is meant to avoid tautological condition warning on 32 bits - uintmax_t fpos_intmax = stream->fpos; - if (fpos_intmax > INT64_MAX) { - ELOG("stream offset overflow"); - preserve_exit("stream offset overflow"); - } + stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream)); // Synchronous read - uv_fs_read(handle->loop, - &req, - stream->fd, - &stream->uvbuf, - 1, - (int64_t)stream->fpos, - NULL); + uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->s.fpos, NULL); uv_fs_req_cleanup(&req); if (req.result <= 0) { - uv_idle_stop(&stream->uv.idle); - invoke_read_cb(stream, 0, true); + uv_idle_stop(&stream->s.uv.idle); + invoke_read_cb(stream, true); return; } - // no errors (req.result (ssize_t) is positive), it's safe to cast. - size_t nread = (size_t)req.result; - rbuffer_produced(stream->buffer, nread); - stream->fpos += nread; - invoke_read_cb(stream, nread, false); + // no errors (req.result (ssize_t) is positive), it's safe to use. + stream->write_pos += req.result; + stream->s.fpos += req.result; + invoke_read_cb(stream, false); } static void read_event(void **argv) { - Stream *stream = argv[0]; + RStream *stream = argv[0]; + stream->pending_read = false; if (stream->read_cb) { - size_t count = (uintptr_t)argv[1]; - bool eof = (uintptr_t)argv[2]; - stream->did_eof = eof; - stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); + size_t available = rstream_available(stream); + size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data, + stream->did_eof); + assert(consumed <= available); + rstream_consume(stream, consumed); + } + stream->s.pending_reqs--; + if (stream->s.closed && !stream->s.pending_reqs) { + stream_close_handle(&stream->s, true); + } +} + +size_t rstream_available(RStream *stream) +{ + return (size_t)(stream->write_pos - stream->read_pos); +} + +void rstream_consume(RStream *stream, size_t consumed) +{ + stream->read_pos += consumed; + size_t remaining = (size_t)(stream->write_pos - stream->read_pos); + if (remaining > 0 && stream->read_pos > stream->buffer) { + memmove(stream->buffer, stream->read_pos, remaining); + stream->read_pos = stream->buffer; + stream->write_pos = stream->buffer + remaining; + } else if (remaining == 0) { + stream->read_pos = stream->write_pos = stream->buffer; } - stream->pending_reqs--; - if (stream->closed && !stream->pending_reqs) { - stream_close_handle(stream); + + if (stream->want_read && stream->paused_full && rstream_space(stream)) { + assert(stream->read_cb); + stream->paused_full = false; + rstream_start_inner(stream); } } -static void invoke_read_cb(Stream *stream, size_t count, bool eof) +static void invoke_read_cb(RStream *stream, bool eof) { + stream->did_eof |= eof; + + if (!rstream_space(stream)) { + rstream_stop_inner(stream); + stream->paused_full = true; + } + + // we cannot use pending_reqs as a socket can have both pending reads and writes + if (stream->pending_read) { + return; + } + // Don't let the stream be closed before the event is processed. - stream->pending_reqs++; + stream->s.pending_reqs++; + stream->pending_read = true; + CREATE_EVENT(stream->s.events, read_event, stream); +} - CREATE_EVENT(stream->events, read_event, - stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); +void rstream_may_close(RStream *stream) +{ + stream_may_close(&stream->s, true); } diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 4e878a2ecf..1214c3e336 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -35,7 +35,7 @@ int socket_watcher_init(Loop *loop, SocketWatcher *watcher, const char *endpoint if (host_end && addr != host_end) { // Split user specified address into two strings, addr(hostname) and port. // The port part in watcher->addr will be updated later. - *host_end = '\0'; + *host_end = NUL; char *port = host_end + 1; intmax_t iport; @@ -135,17 +135,17 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) +int socket_watcher_accept(SocketWatcher *watcher, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; if (watcher->stream->type == UV_TCP) { - client = (uv_stream_t *)(&stream->uv.tcp); + client = (uv_stream_t *)(&stream->s.uv.tcp); uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); uv_tcp_nodelay((uv_tcp_t *)client, true); } else { - client = (uv_stream_t *)&stream->uv.pipe; + client = (uv_stream_t *)&stream->s.uv.pipe; uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); } @@ -156,7 +156,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) return result; } - stream_init(NULL, stream, -1, client); + stream_init(NULL, &stream->s, -1, client); return 0; } @@ -197,7 +197,7 @@ static void connect_cb(uv_connect_t *req, int status) } } -bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address, int timeout, +bool socket_connect(Loop *loop, RStream *stream, bool is_tcp, const char *address, int timeout, const char **error) { bool success = false; @@ -206,7 +206,7 @@ bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address req.data = &status; uv_stream_t *uv_stream; - uv_tcp_t *tcp = &stream->uv.tcp; + uv_tcp_t *tcp = &stream->s.uv.tcp; uv_getaddrinfo_t addr_req; addr_req.addrinfo = NULL; const struct addrinfo *addrinfo = NULL; @@ -237,7 +237,7 @@ tcp_retry: uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb); uv_stream = (uv_stream_t *)tcp; } else { - uv_pipe_t *pipe = &stream->uv.pipe; + uv_pipe_t *pipe = &stream->s.uv.pipe; uv_pipe_init(&loop->uv, pipe, 0); uv_pipe_connect(&req, pipe, address, connect_cb); uv_stream = (uv_stream_t *)pipe; @@ -245,7 +245,7 @@ tcp_retry: status = 1; LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1); if (status == 0) { - stream_init(NULL, stream, -1, uv_stream); + stream_init(NULL, &stream->s, -1, uv_stream); success = true; } else if (is_tcp && addrinfo->ai_next) { addrinfo = addrinfo->ai_next; diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 0b9ed4f25b..71de6ee1ba 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -8,7 +8,6 @@ #include "nvim/event/loop.h" #include "nvim/event/stream.h" #include "nvim/log.h" -#include "nvim/rbuffer.h" #include "nvim/types_defs.h" #ifdef MSWIN # include "nvim/os/os_win_console.h" @@ -45,6 +44,8 @@ int stream_set_blocking(int fd, bool blocking) void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(2) { + // The underlying stream is either a file or an existing uv stream. + assert(uvstream == NULL ? fd >= 0 : fd < 0); stream->uvstream = uvstream; if (fd >= 0) { @@ -84,29 +85,29 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) stream->uvstream->data = stream; } - stream->internal_data = NULL; stream->fpos = 0; + stream->internal_data = NULL; stream->curmem = 0; stream->maxmem = 0; stream->pending_reqs = 0; - stream->read_cb = NULL; stream->write_cb = NULL; stream->close_cb = NULL; stream->internal_close_cb = NULL; stream->closed = false; - stream->buffer = NULL; stream->events = NULL; - stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) +void stream_may_close(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ARG(1) { + if (stream->closed) { + return; + } assert(!stream->closed); DLOG("closing Stream: %p", (void *)stream); stream->closed = true; - stream->close_cb = on_stream_close; - stream->close_cb_data = data; + stream->close_cb = NULL; + stream->close_cb_data = NULL; #ifdef MSWIN if (UV_TTY == uv_guess_handle(stream->fd)) { @@ -116,18 +117,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) #endif if (!stream->pending_reqs) { - stream_close_handle(stream); - } -} - -void stream_may_close(Stream *stream) -{ - if (!stream->closed) { - stream_close(stream, NULL, NULL); + stream_close_handle(stream, rstream); } } -void stream_close_handle(Stream *stream) +void stream_close_handle(Stream *stream, bool rstream) FUNC_ATTR_NONNULL_ALL { uv_handle_t *handle = NULL; @@ -145,16 +139,22 @@ void stream_close_handle(Stream *stream) assert(handle != NULL); if (!uv_is_closing(handle)) { - uv_close(handle, close_cb); + uv_close(handle, rstream ? rstream_close_cb : close_cb); } } -static void close_cb(uv_handle_t *handle) +static void rstream_close_cb(uv_handle_t *handle) { - Stream *stream = handle->data; + RStream *stream = handle->data; if (stream->buffer) { - rbuffer_free(stream->buffer); + free_block(stream->buffer); } + close_cb(handle); +} + +static void close_cb(uv_handle_t *handle) +{ + Stream *stream = handle->data; if (stream->close_cb) { stream->close_cb(stream, stream->close_cb_data); } diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index c67a9b96ed..5005c4e84f 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -73,6 +73,26 @@ bool wstream_write(Stream *stream, WBuffer *buffer) // This should not be called after a stream was freed assert(!stream->closed); + uv_buf_t uvbuf; + uvbuf.base = buffer->data; + uvbuf.len = UV_BUF_LEN(buffer->size); + + if (!stream->uvstream) { + uv_fs_t req; + + // Synchronous write + uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL); + + uv_fs_req_cleanup(&req); + + wstream_release_wbuffer(buffer); + + assert(stream->write_cb == NULL); + + stream->fpos += MAX(req.result, 0); + return req.result > 0; + } + if (stream->curmem > stream->maxmem) { goto err; } @@ -84,10 +104,6 @@ bool wstream_write(Stream *stream, WBuffer *buffer) data->buffer = buffer; data->uv_req.data = data; - uv_buf_t uvbuf; - uvbuf.base = buffer->data; - uvbuf.len = UV_BUF_LEN(buffer->size); - if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { xfree(data); goto err; @@ -141,7 +157,7 @@ static void write_cb(uv_write_t *req, int status) if (data->stream->closed && data->stream->pending_reqs == 0) { // Last pending write, free the stream; - stream_close_handle(data->stream); + stream_close_handle(data->stream, false); } xfree(data); @@ -158,3 +174,8 @@ void wstream_release_wbuffer(WBuffer *buffer) xfree(buffer); } } + +void wstream_may_close(Stream *stream) +{ + stream_may_close(stream, false); +} |