aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/defs.h70
-rw-r--r--src/nvim/event/libuv_proc.c (renamed from src/nvim/event/libuv_process.c)34
-rw-r--r--src/nvim/event/libuv_proc.h (renamed from src/nvim/event/libuv_process.h)6
-rw-r--r--src/nvim/event/loop.c4
-rw-r--r--src/nvim/event/loop.h26
-rw-r--r--src/nvim/event/proc.c (renamed from src/nvim/event/process.c)194
-rw-r--r--src/nvim/event/proc.h (renamed from src/nvim/event/process.h)14
-rw-r--r--src/nvim/event/rstream.c200
-rw-r--r--src/nvim/event/socket.c18
-rw-r--r--src/nvim/event/stream.c42
-rw-r--r--src/nvim/event/wstream.c31
11 files changed, 353 insertions, 286 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
index 9b7d8708be..20724f9263 100644
--- a/src/nvim/event/defs.h
+++ b/src/nvim/event/defs.h
@@ -6,7 +6,6 @@
#include <uv.h>
#include "nvim/eval/typval_defs.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
enum { EVENT_HANDLER_MAX_ARGC = 10, };
@@ -55,14 +54,17 @@ struct wbuffer {
};
typedef struct stream Stream;
-/// Type of function called when the Stream buffer is filled with data
+typedef struct rstream RStream;
+/// Type of function called when the RStream buffer is filled with data
///
/// @param stream The Stream instance
-/// @param buf The associated RBuffer instance
+/// @param read_data data that was read
/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
-typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof);
+/// @return number of bytes which were consumed
+typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data,
+ bool eof);
/// Type of function called when the Stream has information about a write
/// request.
@@ -71,11 +73,11 @@ typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void
/// @param data User-defined data
/// @param status 0 on success, anything else indicates failure
typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
+
typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
bool closed;
- bool did_eof;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
@@ -84,21 +86,33 @@ struct stream {
uv_tty_t tty;
#endif
} uv;
- uv_stream_t *uvstream;
- uv_buf_t uvbuf;
- RBuffer *buffer;
- uv_file fd;
- stream_read_cb read_cb;
- stream_write_cb write_cb;
+ uv_stream_t *uvstream; ///< NULL when the stream is a file
+ uv_file fd; ///< When the stream is a file, this is its file descriptor
+ int64_t fpos; ///< When the stream is a file, this is the position in file
void *cb_data;
stream_close_cb close_cb, internal_close_cb;
void *close_cb_data, *internal_data;
- size_t fpos;
+ size_t pending_reqs;
+ MultiQueue *events;
+
+ // only used for writing:
+ stream_write_cb write_cb;
size_t curmem;
size_t maxmem;
- size_t pending_reqs;
+};
+
+struct rstream {
+ Stream s;
+ bool did_eof;
+ bool want_read;
+ bool pending_read;
+ bool paused_full;
+ char *buffer; // ARENA_BLOCK_SIZE
+ char *read_pos;
+ char *write_pos;
+ uv_buf_t uvbuf;
+ stream_read_cb read_cb;
size_t num_bytes;
- MultiQueue *events;
};
#define ADDRESS_MAX_SIZE 256
@@ -128,29 +142,31 @@ struct socket_watcher {
};
typedef enum {
- kProcessTypeUv,
- kProcessTypePty,
-} ProcessType;
+ kProcTypeUv,
+ kProcTypePty,
+} ProcType;
-typedef struct process Process;
-typedef void (*process_exit_cb)(Process *proc, int status, void *data);
-typedef void (*internal_process_cb)(Process *proc);
+/// OS process
+typedef struct proc Proc;
+typedef void (*proc_exit_cb)(Proc *proc, int status, void *data);
+typedef void (*internal_proc_cb)(Proc *proc);
-struct process {
- ProcessType type;
+struct proc {
+ ProcType type;
Loop *loop;
void *data;
int pid, status, refcount;
uint8_t exit_signal; // Signal used when killing (on Windows).
- uint64_t stopped_time; // process_stop() timestamp
+ uint64_t stopped_time; // proc_stop() timestamp
const char *cwd;
char **argv;
const char *exepath;
dict_T *env;
- Stream in, out, err;
- /// Exit handler. If set, user must call process_free().
- process_exit_cb cb;
- internal_process_cb internal_exit_cb, internal_close_cb;
+ Stream in;
+ RStream out, err;
+ /// Exit handler. If set, user must call proc_free().
+ proc_exit_cb cb;
+ internal_proc_cb internal_exit_cb, internal_close_cb;
bool closed, detach, overlapped, fwd_err;
MultiQueue *events;
};
diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_proc.c
index f77d686c10..5b445cdda7 100644
--- a/src/nvim/event/libuv_process.c
+++ b/src/nvim/event/libuv_proc.c
@@ -5,9 +5,9 @@
#include "nvim/eval/typval.h"
#include "nvim/event/defs.h"
-#include "nvim/event/libuv_process.h"
+#include "nvim/event/libuv_proc.h"
#include "nvim/event/loop.h"
-#include "nvim/event/process.h"
+#include "nvim/event/proc.h"
#include "nvim/log.h"
#include "nvim/os/os.h"
#include "nvim/os/os_defs.h"
@@ -15,15 +15,15 @@
#include "nvim/ui_client.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/libuv_process.c.generated.h"
+# include "event/libuv_proc.c.generated.h"
#endif
/// @returns zero on success, or negative error code
-int libuv_process_spawn(LibuvProcess *uvproc)
+int libuv_proc_spawn(LibuvProc *uvproc)
FUNC_ATTR_NONNULL_ALL
{
- Process *proc = (Process *)uvproc;
- uvproc->uvopts.file = process_get_exepath(proc);
+ Proc *proc = (Proc *)uvproc;
+ uvproc->uvopts.file = proc_get_exepath(proc);
uvproc->uvopts.args = proc->argv;
uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
#ifdef MSWIN
@@ -70,19 +70,19 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe);
}
- if (!proc->out.closed) {
+ if (!proc->out.s.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
#ifdef MSWIN
// pipe must be readable for IOCP to work on Windows.
uvproc->uvstdio[1].flags |= proc->overlapped
? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0;
#endif
- uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.uv.pipe);
+ uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe);
}
- if (!proc->err.closed) {
+ if (!proc->err.s.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.uv.pipe);
+ uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe);
} else if (proc->fwd_err) {
uvproc->uvstdio[2].flags = UV_INHERIT_FD;
uvproc->uvstdio[2].data.fd = STDERR_FILENO;
@@ -101,7 +101,7 @@ int libuv_process_spawn(LibuvProcess *uvproc)
return status;
}
-void libuv_process_close(LibuvProcess *uvproc)
+void libuv_proc_close(LibuvProc *uvproc)
FUNC_ATTR_NONNULL_ARG(1)
{
uv_close((uv_handle_t *)&uvproc->uv, close_cb);
@@ -109,11 +109,11 @@ void libuv_process_close(LibuvProcess *uvproc)
static void close_cb(uv_handle_t *handle)
{
- Process *proc = handle->data;
+ Proc *proc = handle->data;
if (proc->internal_close_cb) {
proc->internal_close_cb(proc);
}
- LibuvProcess *uvproc = (LibuvProcess *)proc;
+ LibuvProc *uvproc = (LibuvProc *)proc;
if (uvproc->uvopts.env) {
os_free_fullenv(uvproc->uvopts.env);
}
@@ -121,7 +121,7 @@ static void close_cb(uv_handle_t *handle)
static void exit_cb(uv_process_t *handle, int64_t status, int term_signal)
{
- Process *proc = handle->data;
+ Proc *proc = handle->data;
#if defined(MSWIN)
// Use stored/expected signal.
term_signal = proc->exit_signal;
@@ -130,10 +130,10 @@ static void exit_cb(uv_process_t *handle, int64_t status, int term_signal)
proc->internal_exit_cb(proc);
}
-LibuvProcess libuv_process_init(Loop *loop, void *data)
+LibuvProc libuv_proc_init(Loop *loop, void *data)
{
- LibuvProcess rv = {
- .process = process_init(loop, kProcessTypeUv, data)
+ LibuvProc rv = {
+ .proc = proc_init(loop, kProcTypeUv, data)
};
return rv;
}
diff --git a/src/nvim/event/libuv_process.h b/src/nvim/event/libuv_proc.h
index 12401dbb35..3127e166c0 100644
--- a/src/nvim/event/libuv_process.h
+++ b/src/nvim/event/libuv_proc.h
@@ -5,12 +5,12 @@
#include "nvim/event/defs.h"
typedef struct {
- Process process;
+ Proc proc;
uv_process_t uv;
uv_process_options_t uvopts;
uv_stdio_container_t uvstdio[4];
-} LibuvProcess;
+} LibuvProc;
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/libuv_process.h.generated.h"
+# include "event/libuv_proc.h.generated.h"
#endif
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index e1ebcecbd6..15d993cc62 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -20,7 +20,7 @@ void loop_init(Loop *loop, void *data)
loop->recursive = 0;
loop->closing = false;
loop->uv.data = loop;
- loop->children = kl_init(WatcherPtr);
+ kv_init(loop->children);
loop->events = multiqueue_new_parent(loop_on_put, loop);
loop->fast_events = multiqueue_new_child(loop->events);
loop->thread_events = multiqueue_new_parent(NULL, NULL);
@@ -187,7 +187,7 @@ bool loop_close(Loop *loop, bool wait)
multiqueue_free(loop->fast_events);
multiqueue_free(loop->thread_events);
multiqueue_free(loop->events);
- kl_destroy(WatcherPtr, loop->children);
+ kv_destroy(loop->children);
return rv;
}
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index 6ecc7cb781..563b254a0b 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -3,32 +3,26 @@
#include <stdbool.h>
#include <uv.h>
-#include "klib/klist.h"
+#include "klib/kvec.h"
#include "nvim/event/defs.h" // IWYU pragma: keep
#include "nvim/types_defs.h" // IWYU pragma: keep
-typedef void *WatcherPtr;
-
-#define _NOOP(x)
-KLIST_INIT(WatcherPtr, WatcherPtr, _NOOP)
-
struct loop {
uv_loop_t uv;
MultiQueue *events;
MultiQueue *thread_events;
- // Immediate events:
- // "Processed after exiting uv_run() (to avoid recursion), but before
- // returning from loop_poll_events()." 502aee690c98
- // Practical consequence (for main_loop): these events are processed by
- // state_enter()..os_inchar()
- // whereas "regular" events (main_loop.events) are processed by
- // state_enter()..VimState.execute()
- // But state_enter()..os_inchar() can be "too early" if you want the event
- // to trigger UI updates and other user-activity-related side-effects.
+ // Immediate events.
+ // - "Processed after exiting `uv_run()` (to avoid recursion), but before returning from
+ // `loop_poll_events()`." 502aee690c98
+ // - Practical consequence (for `main_loop`):
+ // - these are processed by `state_enter()..input_get()` whereas "regular" events
+ // (`main_loop.events`) are processed by `state_enter()..VimState.execute()`
+ // - `state_enter()..input_get()` can be "too early" if you want the event to trigger UI
+ // updates and other user-activity-related side-effects.
MultiQueue *fast_events;
// used by process/job-control subsystem
- klist_t(WatcherPtr) *children;
+ kvec_t(Proc *) children;
uv_signal_t children_watcher;
uv_timer_t children_kill_timer;
diff --git a/src/nvim/event/process.c b/src/nvim/event/proc.c
index 7460e92766..5ae3bd8c2d 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/proc.c
@@ -3,24 +3,24 @@
#include <signal.h>
#include <uv.h>
-#include "klib/klist.h"
-#include "nvim/event/libuv_process.h"
+#include "nvim/event/libuv_proc.h"
#include "nvim/event/loop.h"
#include "nvim/event/multiqueue.h"
-#include "nvim/event/process.h"
+#include "nvim/event/proc.h"
+#include "nvim/event/rstream.h"
#include "nvim/event/stream.h"
+#include "nvim/event/wstream.h"
#include "nvim/globals.h"
#include "nvim/log.h"
#include "nvim/main.h"
-#include "nvim/os/process.h"
-#include "nvim/os/pty_process.h"
+#include "nvim/os/proc.h"
+#include "nvim/os/pty_proc.h"
#include "nvim/os/shell.h"
#include "nvim/os/time.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/ui_client.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/process.c.generated.h"
+# include "event/proc.c.generated.h"
#endif
// Time for a process to exit cleanly before we send KILL.
@@ -32,13 +32,13 @@
void __gcov_flush(void);
#endif
-static bool process_is_tearing_down = false;
+static bool proc_is_tearing_down = false;
// Delay exit until handles are closed, to avoid deadlocks
static int exit_need_delay = 0;
/// @returns zero on success, or negative error code
-int process_spawn(Process *proc, bool in, bool out, bool err)
+int proc_spawn(Proc *proc, bool in, bool out, bool err)
FUNC_ATTR_NONNULL_ALL
{
// forwarding stderr contradicts with processing it internally
@@ -51,15 +51,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
}
if (out) {
- uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
+ uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0);
} else {
- proc->out.closed = true;
+ proc->out.s.closed = true;
}
if (err) {
- uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
+ uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0);
} else {
- proc->err.closed = true;
+ proc->err.s.closed = true;
}
#ifdef USE_GCOV
@@ -69,11 +69,11 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
int status;
switch (proc->type) {
- case kProcessTypeUv:
- status = libuv_process_spawn((LibuvProcess *)proc);
+ case kProcTypeUv:
+ status = libuv_proc_spawn((LibuvProc *)proc);
break;
- case kProcessTypePty:
- status = pty_process_spawn((PtyProcess *)proc);
+ case kProcTypePty:
+ status = pty_proc_spawn((PtyProc *)proc);
break;
}
@@ -82,18 +82,18 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
if (out) {
- uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
+ uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL);
}
if (err) {
- uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
+ uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL);
}
- if (proc->type == kProcessTypeUv) {
- uv_close((uv_handle_t *)&(((LibuvProcess *)proc)->uv), NULL);
+ if (proc->type == kProcTypeUv) {
+ uv_close((uv_handle_t *)&(((LibuvProc *)proc)->uv), NULL);
} else {
- process_close(proc);
+ proc_close(proc);
}
- process_free(proc);
+ proc_free(proc);
proc->status = -1;
return status;
}
@@ -101,56 +101,56 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
if (in) {
stream_init(NULL, &proc->in, -1, (uv_stream_t *)&proc->in.uv.pipe);
proc->in.internal_data = proc;
- proc->in.internal_close_cb = on_process_stream_close;
+ proc->in.internal_close_cb = on_proc_stream_close;
proc->refcount++;
}
if (out) {
- stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe);
- proc->out.internal_data = proc;
- proc->out.internal_close_cb = on_process_stream_close;
+ stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe);
+ proc->out.s.internal_data = proc;
+ proc->out.s.internal_close_cb = on_proc_stream_close;
proc->refcount++;
}
if (err) {
- stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe);
- proc->err.internal_data = proc;
- proc->err.internal_close_cb = on_process_stream_close;
+ stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe);
+ proc->err.s.internal_data = proc;
+ proc->err.s.internal_close_cb = on_proc_stream_close;
proc->refcount++;
}
- proc->internal_exit_cb = on_process_exit;
+ proc->internal_exit_cb = on_proc_exit;
proc->internal_close_cb = decref;
proc->refcount++;
- kl_push(WatcherPtr, proc->loop->children, proc);
- DLOG("new: pid=%d exepath=[%s]", proc->pid, process_get_exepath(proc));
+ kv_push(proc->loop->children, proc);
+ DLOG("new: pid=%d exepath=[%s]", proc->pid, proc_get_exepath(proc));
return 0;
}
-void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
+void proc_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
{
- process_is_tearing_down = true;
- kl_iter(WatcherPtr, loop->children, current) {
- Process *proc = (*current)->data;
- if (proc->detach || proc->type == kProcessTypePty) {
+ proc_is_tearing_down = true;
+ for (size_t i = 0; i < kv_size(loop->children); i++) {
+ Proc *proc = kv_A(loop->children, i);
+ if (proc->detach || proc->type == kProcTypePty) {
// Close handles to process without killing it.
- CREATE_EVENT(loop->events, process_close_handles, proc);
+ CREATE_EVENT(loop->events, proc_close_handles, proc);
} else {
- process_stop(proc);
+ proc_stop(proc);
}
}
// Wait until all children exit and all close events are processed.
LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1,
- kl_empty(loop->children) && multiqueue_empty(loop->events));
- pty_process_teardown(loop);
+ kv_size(loop->children) == 0 && multiqueue_empty(loop->events));
+ pty_proc_teardown(loop);
}
-void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
+void proc_close_streams(Proc *proc) FUNC_ATTR_NONNULL_ALL
{
- stream_may_close(&proc->in);
- stream_may_close(&proc->out);
- stream_may_close(&proc->err);
+ wstream_may_close(&proc->in);
+ rstream_may_close(&proc->out);
+ rstream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@@ -161,7 +161,7 @@ void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
/// @return Exit code of the process. proc->status will have the same value.
/// -1 if the timeout expired while the process is still running.
/// -2 if the user interrupted the wait.
-int process_wait(Process *proc, int ms, MultiQueue *events)
+int proc_wait(Proc *proc, int ms, MultiQueue *events)
FUNC_ATTR_NONNULL_ARG(1)
{
if (!proc->refcount) {
@@ -185,7 +185,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
// Assume that a user hitting CTRL-C does not like the current job. Kill it.
if (got_int) {
got_int = false;
- process_stop(proc);
+ proc_stop(proc);
if (ms == -1) {
// We can only return if all streams/handles are closed and the job
// exited.
@@ -213,7 +213,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
}
/// Ask a process to terminate and eventually kill if it doesn't respond
-void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
+void proc_stop(Proc *proc) FUNC_ATTR_NONNULL_ALL
{
bool exited = (proc->status >= 0);
if (exited || proc->stopped_time) {
@@ -223,13 +223,13 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
proc->exit_signal = SIGTERM;
switch (proc->type) {
- case kProcessTypeUv:
+ case kProcTypeUv:
os_proc_tree_kill(proc->pid, SIGTERM);
break;
- case kProcessTypePty:
+ case kProcTypePty:
// close all streams for pty processes to send SIGHUP to the process
- process_close_streams(proc);
- pty_process_close_master((PtyProcess *)proc);
+ proc_close_streams(proc);
+ pty_proc_close_master((PtyProc *)proc);
break;
}
@@ -239,7 +239,7 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
}
/// Frees process-owned resources.
-void process_free(Process *proc) FUNC_ATTR_NONNULL_ALL
+void proc_free(Proc *proc) FUNC_ATTR_NONNULL_ALL
{
if (proc->argv != NULL) {
shell_free_argv(proc->argv);
@@ -248,19 +248,19 @@ void process_free(Process *proc) FUNC_ATTR_NONNULL_ALL
}
/// Sends SIGKILL (or SIGTERM..SIGKILL for PTY jobs) to processes that did
-/// not terminate after process_stop().
+/// not terminate after proc_stop().
static void children_kill_cb(uv_timer_t *handle)
{
Loop *loop = handle->loop->data;
- kl_iter(WatcherPtr, loop->children, current) {
- Process *proc = (*current)->data;
+ for (size_t i = 0; i < kv_size(loop->children); i++) {
+ Proc *proc = kv_A(loop->children, i);
bool exited = (proc->status >= 0);
if (exited || !proc->stopped_time) {
continue;
}
uint64_t term_sent = UINT64_MAX == proc->stopped_time;
- if (kProcessTypePty != proc->type || term_sent) {
+ if (kProcTypePty != proc->type || term_sent) {
proc->exit_signal = SIGKILL;
os_proc_tree_kill(proc->pid, SIGKILL);
} else {
@@ -274,41 +274,45 @@ static void children_kill_cb(uv_timer_t *handle)
}
}
-static void process_close_event(void **argv)
+static void proc_close_event(void **argv)
{
- Process *proc = argv[0];
+ Proc *proc = argv[0];
if (proc->cb) {
// User (hint: channel_job_start) is responsible for calling
- // process_free().
+ // proc_free().
proc->cb(proc, proc->status, proc->data);
} else {
- process_free(proc);
+ proc_free(proc);
}
}
-static void decref(Process *proc)
+static void decref(Proc *proc)
{
if (--proc->refcount != 0) {
return;
}
Loop *loop = proc->loop;
- kliter_t(WatcherPtr) **node = NULL;
- kl_iter(WatcherPtr, loop->children, current) {
- if ((*current)->data == proc) {
- node = current;
+ size_t i;
+ for (i = 0; i < kv_size(loop->children); i++) {
+ Proc *current = kv_A(loop->children, i);
+ if (current == proc) {
break;
}
}
- assert(node);
- kl_shift_at(WatcherPtr, loop->children, node);
- CREATE_EVENT(proc->events, process_close_event, proc);
+ assert(i < kv_size(loop->children)); // element found
+ if (i < kv_size(loop->children) - 1) {
+ memmove(&kv_A(loop->children, i), &kv_A(loop->children, i + 1),
+ sizeof(&kv_A(loop->children, i)) * (kv_size(loop->children) - (i + 1)));
+ }
+ kv_size(loop->children)--;
+ CREATE_EVENT(proc->events, proc_close_event, proc);
}
-static void process_close(Process *proc)
+static void proc_close(Proc *proc)
FUNC_ATTR_NONNULL_ARG(1)
{
- if (process_is_tearing_down && (proc->detach || proc->type == kProcessTypePty)
+ if (proc_is_tearing_down && (proc->detach || proc->type == kProcTypePty)
&& proc->closed) {
// If a detached/pty process dies while tearing down it might get closed
// twice.
@@ -318,17 +322,17 @@ static void process_close(Process *proc)
proc->closed = true;
if (proc->detach) {
- if (proc->type == kProcessTypeUv) {
- uv_unref((uv_handle_t *)&(((LibuvProcess *)proc)->uv));
+ if (proc->type == kProcTypeUv) {
+ uv_unref((uv_handle_t *)&(((LibuvProc *)proc)->uv));
}
}
switch (proc->type) {
- case kProcessTypeUv:
- libuv_process_close((LibuvProcess *)proc);
+ case kProcTypeUv:
+ libuv_proc_close((LibuvProc *)proc);
break;
- case kProcessTypePty:
- pty_process_close((PtyProcess *)proc);
+ case kProcTypePty:
+ pty_proc_close((PtyProc *)proc);
break;
}
}
@@ -337,10 +341,10 @@ static void process_close(Process *proc)
///
/// @param proc Process, for which an output stream should be flushed.
/// @param stream Stream to flush.
-static void flush_stream(Process *proc, Stream *stream)
+static void flush_stream(Proc *proc, RStream *stream)
FUNC_ATTR_NONNULL_ARG(1)
{
- if (!stream || stream->closed) {
+ if (!stream || stream->s.closed) {
return;
}
@@ -350,23 +354,23 @@ static void flush_stream(Process *proc, Stream *stream)
// keeps sending data, we only accept as much data as the system buffer size.
// Otherwise this would block cleanup/teardown.
int system_buffer_size = 0;
- int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe,
+ int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe,
&system_buffer_size);
if (err) {
- system_buffer_size = (int)rbuffer_capacity(stream->buffer);
+ system_buffer_size = ARENA_BLOCK_SIZE;
}
size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;
// Read remaining data.
- while (!stream->closed && stream->num_bytes < max_bytes) {
+ while (!stream->s.closed && stream->num_bytes < max_bytes) {
// Remember number of bytes before polling
size_t num_bytes = stream->num_bytes;
// Poll for data and process the generated events.
loop_poll_events(proc->loop, 0);
- if (stream->events) {
- multiqueue_process_events(stream->events);
+ if (stream->s.events) {
+ multiqueue_process_events(stream->s.events);
}
// Stream can be closed if it is empty.
@@ -374,23 +378,23 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream
// open. But only send EOF if we haven't already.
- stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
+ stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true);
}
break;
}
}
}
-static void process_close_handles(void **argv)
+static void proc_close_handles(void **argv)
{
- Process *proc = argv[0];
+ Proc *proc = argv[0];
exit_need_delay++;
flush_stream(proc, &proc->out);
flush_stream(proc, &proc->err);
- process_close_streams(proc);
- process_close(proc);
+ proc_close_streams(proc);
+ proc_close(proc);
exit_need_delay--;
}
@@ -425,7 +429,7 @@ void exit_from_channel(int status)
multiqueue_put(main_loop.fast_events, exit_event, (void *)(intptr_t)status);
}
-static void on_process_exit(Process *proc)
+static void on_proc_exit(Proc *proc)
{
Loop *loop = proc->loop;
ILOG("exited: pid=%d status=%d stoptime=%" PRIu64, proc->pid, proc->status,
@@ -438,13 +442,13 @@ static void on_process_exit(Process *proc)
// Process has terminated, but there could still be data to be read from the
// OS. We are still in the libuv loop, so we cannot call code that polls for
// more data directly. Instead delay the reading after the libuv loop by
- // queueing process_close_handles() as an event.
+ // queueing proc_close_handles() as an event.
MultiQueue *queue = proc->events ? proc->events : loop->events;
- CREATE_EVENT(queue, process_close_handles, proc);
+ CREATE_EVENT(queue, proc_close_handles, proc);
}
-static void on_process_stream_close(Stream *stream, void *data)
+static void on_proc_stream_close(Stream *stream, void *data)
{
- Process *proc = data;
+ Proc *proc = data;
decref(proc);
}
diff --git a/src/nvim/event/process.h b/src/nvim/event/proc.h
index 421a470244..f525d46f87 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/proc.h
@@ -6,9 +6,9 @@
#include "nvim/event/defs.h" // IWYU pragma: keep
#include "nvim/types_defs.h"
-static inline Process process_init(Loop *loop, ProcessType type, void *data)
+static inline Proc proc_init(Loop *loop, ProcType type, void *data)
{
- return (Process) {
+ return (Proc) {
.type = type,
.data = data,
.loop = loop,
@@ -21,8 +21,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.argv = NULL,
.exepath = NULL,
.in = { .closed = false },
- .out = { .closed = false },
- .err = { .closed = false },
+ .out = { .s.closed = false },
+ .err = { .s.closed = false },
.cb = NULL,
.closed = false,
.internal_close_cb = NULL,
@@ -33,17 +33,17 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
}
/// Get the path to the executable of the process.
-static inline const char *process_get_exepath(Process *proc)
+static inline const char *proc_get_exepath(Proc *proc)
{
return proc->exepath != NULL ? proc->exepath : proc->argv[0];
}
-static inline bool process_is_stopped(Process *proc)
+static inline bool proc_is_stopped(Proc *proc)
{
bool exited = (proc->status >= 0);
return exited || (proc->stopped_time != 0);
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "event/process.h.generated.h"
+# include "event/proc.h.generated.h"
#endif
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 6b4ab472e4..15bdc547d5 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -11,75 +11,80 @@
#include "nvim/macros_defs.h"
#include "nvim/main.h"
#include "nvim/os/os_defs.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/rstream.c.generated.h"
#endif
-void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
+void rstream_init_fd(Loop *loop, RStream *stream, int fd)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
- stream_init(loop, stream, fd, NULL);
- rstream_init(stream, bufsize);
+ stream_init(loop, &stream->s, fd, NULL);
+ rstream_init(stream);
}
-void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
+void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
- stream_init(NULL, stream, -1, uvstream);
- rstream_init(stream, bufsize);
+ stream_init(NULL, &stream->s, -1, uvstream);
+ rstream_init(stream);
}
-void rstream_init(Stream *stream, size_t bufsize)
+void rstream_init(RStream *stream)
FUNC_ATTR_NONNULL_ARG(1)
{
- stream->buffer = rbuffer_new(bufsize);
- stream->buffer->data = stream;
- stream->buffer->full_cb = on_rbuffer_full;
- stream->buffer->nonfull_cb = on_rbuffer_nonfull;
+ stream->read_cb = NULL;
+ stream->num_bytes = 0;
+ stream->buffer = alloc_block();
+ stream->read_pos = stream->write_pos = stream->buffer;
+}
+
+void rstream_start_inner(RStream *stream)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ if (stream->s.uvstream) {
+ uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
+ } else {
+ uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
+ }
}
/// Starts watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_start(Stream *stream, stream_read_cb cb, void *data)
+void rstream_start(RStream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
- stream->cb_data = data;
- if (stream->uvstream) {
- uv_read_start(stream->uvstream, alloc_cb, read_cb);
- } else {
- uv_idle_start(&stream->uv.idle, fread_idle_cb);
+ stream->s.cb_data = data;
+ stream->want_read = true;
+ if (!stream->paused_full) {
+ rstream_start_inner(stream);
}
}
/// Stops watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_stop(Stream *stream)
+void rstream_stop_inner(RStream *stream)
FUNC_ATTR_NONNULL_ALL
{
- if (stream->uvstream) {
- uv_read_stop(stream->uvstream);
+ if (stream->s.uvstream) {
+ uv_read_stop(stream->s.uvstream);
} else {
- uv_idle_stop(&stream->uv.idle);
+ uv_idle_stop(&stream->s.uv.idle);
}
}
-static void on_rbuffer_full(RBuffer *buf, void *data)
-{
- rstream_stop(data);
-}
-
-static void on_rbuffer_nonfull(RBuffer *buf, void *data)
+/// Stops watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_stop(RStream *stream)
+ FUNC_ATTR_NONNULL_ALL
{
- Stream *stream = data;
- assert(stream->read_cb);
- rstream_start(stream, stream->read_cb, stream->cb_data);
+ rstream_stop_inner(stream);
+ stream->want_read = false;
}
// Callbacks used by libuv
@@ -87,11 +92,10 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
/// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
- Stream *stream = handle->data;
- // `uv_buf_t.len` happens to have different size on Windows.
- size_t write_count;
- buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
- buf->len = UV_BUF_LEN(write_count);
+ RStream *stream = handle->data;
+ buf->base = stream->write_pos;
+ // `uv_buf_t.len` happens to have different size on Windows (as a treat)
+ buf->len = UV_BUF_LEN(rstream_space(stream));
}
/// Callback invoked by libuv after it copies the data into the buffer provided
@@ -99,27 +103,27 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
/// 0-length buffer.
static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
- Stream *stream = uvstream->data;
+ RStream *stream = uvstream->data;
if (cnt <= 0) {
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
//
- // We don't need to do anything with the RBuffer because the next call
+ // We don't need to do anything with the buffer because the next call
// to `alloc_cb` will return the same unused pointer (`rbuffer_produced`
// won't be called)
if (cnt == UV_ENOBUFS || cnt == 0) {
return;
} else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
// The TTY driver might signal EOF without closing the stream
- invoke_read_cb(stream, 0, true);
+ invoke_read_cb(stream, true);
} else {
DLOG("closing Stream (%p): %s (%s)", (void *)stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
- invoke_read_cb(stream, 0, true);
+ invoke_read_cb(stream, true);
}
return;
}
@@ -127,10 +131,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
stream->num_bytes += nread;
- // Data was already written, so all we need is to update 'wpos' to reflect
- // the space actually used in the buffer.
- rbuffer_produced(stream->buffer, nread);
- invoke_read_cb(stream, nread, false);
+ stream->write_pos += cnt;
+ invoke_read_cb(stream, false);
+}
+
+static size_t rstream_space(RStream *stream)
+{
+ return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos);
}
/// Called by the by the 'idle' handle to emulate a reading event
@@ -141,66 +148,91 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
static void fread_idle_cb(uv_idle_t *handle)
{
uv_fs_t req;
- Stream *stream = handle->data;
+ RStream *stream = handle->data;
+ stream->uvbuf.base = stream->write_pos;
// `uv_buf_t.len` happens to have different size on Windows.
- size_t write_count;
- stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count);
- stream->uvbuf.len = UV_BUF_LEN(write_count);
-
- // the offset argument to uv_fs_read is int64_t, could someone really try
- // to read more than 9 quintillion (9e18) bytes?
- // upcast is meant to avoid tautological condition warning on 32 bits
- uintmax_t fpos_intmax = stream->fpos;
- if (fpos_intmax > INT64_MAX) {
- ELOG("stream offset overflow");
- preserve_exit("stream offset overflow");
- }
+ stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream));
// Synchronous read
- uv_fs_read(handle->loop,
- &req,
- stream->fd,
- &stream->uvbuf,
- 1,
- (int64_t)stream->fpos,
- NULL);
+ uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->s.fpos, NULL);
uv_fs_req_cleanup(&req);
if (req.result <= 0) {
- uv_idle_stop(&stream->uv.idle);
- invoke_read_cb(stream, 0, true);
+ uv_idle_stop(&stream->s.uv.idle);
+ invoke_read_cb(stream, true);
return;
}
- // no errors (req.result (ssize_t) is positive), it's safe to cast.
- size_t nread = (size_t)req.result;
- rbuffer_produced(stream->buffer, nread);
- stream->fpos += nread;
- invoke_read_cb(stream, nread, false);
+ // no errors (req.result (ssize_t) is positive), it's safe to use.
+ stream->write_pos += req.result;
+ stream->s.fpos += req.result;
+ invoke_read_cb(stream, false);
}
static void read_event(void **argv)
{
- Stream *stream = argv[0];
+ RStream *stream = argv[0];
+ stream->pending_read = false;
if (stream->read_cb) {
- size_t count = (uintptr_t)argv[1];
- bool eof = (uintptr_t)argv[2];
- stream->did_eof = eof;
- stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
+ size_t available = rstream_available(stream);
+ size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data,
+ stream->did_eof);
+ assert(consumed <= available);
+ rstream_consume(stream, consumed);
+ }
+ stream->s.pending_reqs--;
+ if (stream->s.closed && !stream->s.pending_reqs) {
+ stream_close_handle(&stream->s, true);
+ }
+}
+
+size_t rstream_available(RStream *stream)
+{
+ return (size_t)(stream->write_pos - stream->read_pos);
+}
+
+void rstream_consume(RStream *stream, size_t consumed)
+{
+ stream->read_pos += consumed;
+ size_t remaining = (size_t)(stream->write_pos - stream->read_pos);
+ if (remaining > 0 && stream->read_pos > stream->buffer) {
+ memmove(stream->buffer, stream->read_pos, remaining);
+ stream->read_pos = stream->buffer;
+ stream->write_pos = stream->buffer + remaining;
+ } else if (remaining == 0) {
+ stream->read_pos = stream->write_pos = stream->buffer;
}
- stream->pending_reqs--;
- if (stream->closed && !stream->pending_reqs) {
- stream_close_handle(stream);
+
+ if (stream->want_read && stream->paused_full && rstream_space(stream)) {
+ assert(stream->read_cb);
+ stream->paused_full = false;
+ rstream_start_inner(stream);
}
}
-static void invoke_read_cb(Stream *stream, size_t count, bool eof)
+static void invoke_read_cb(RStream *stream, bool eof)
{
+ stream->did_eof |= eof;
+
+ if (!rstream_space(stream)) {
+ rstream_stop_inner(stream);
+ stream->paused_full = true;
+ }
+
+ // we cannot use pending_reqs as a socket can have both pending reads and writes
+ if (stream->pending_read) {
+ return;
+ }
+
// Don't let the stream be closed before the event is processed.
- stream->pending_reqs++;
+ stream->s.pending_reqs++;
+ stream->pending_read = true;
+ CREATE_EVENT(stream->s.events, read_event, stream);
+}
- CREATE_EVENT(stream->events, read_event,
- stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
+void rstream_may_close(RStream *stream)
+{
+ stream_may_close(&stream->s, true);
}
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index 4e878a2ecf..1214c3e336 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -35,7 +35,7 @@ int socket_watcher_init(Loop *loop, SocketWatcher *watcher, const char *endpoint
if (host_end && addr != host_end) {
// Split user specified address into two strings, addr(hostname) and port.
// The port part in watcher->addr will be updated later.
- *host_end = '\0';
+ *host_end = NUL;
char *port = host_end + 1;
intmax_t iport;
@@ -135,17 +135,17 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0;
}
-int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
+int socket_watcher_accept(SocketWatcher *watcher, RStream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
if (watcher->stream->type == UV_TCP) {
- client = (uv_stream_t *)(&stream->uv.tcp);
+ client = (uv_stream_t *)(&stream->s.uv.tcp);
uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client);
uv_tcp_nodelay((uv_tcp_t *)client, true);
} else {
- client = (uv_stream_t *)&stream->uv.pipe;
+ client = (uv_stream_t *)&stream->s.uv.pipe;
uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0);
}
@@ -156,7 +156,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
return result;
}
- stream_init(NULL, stream, -1, client);
+ stream_init(NULL, &stream->s, -1, client);
return 0;
}
@@ -197,7 +197,7 @@ static void connect_cb(uv_connect_t *req, int status)
}
}
-bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address, int timeout,
+bool socket_connect(Loop *loop, RStream *stream, bool is_tcp, const char *address, int timeout,
const char **error)
{
bool success = false;
@@ -206,7 +206,7 @@ bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address
req.data = &status;
uv_stream_t *uv_stream;
- uv_tcp_t *tcp = &stream->uv.tcp;
+ uv_tcp_t *tcp = &stream->s.uv.tcp;
uv_getaddrinfo_t addr_req;
addr_req.addrinfo = NULL;
const struct addrinfo *addrinfo = NULL;
@@ -237,7 +237,7 @@ tcp_retry:
uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb);
uv_stream = (uv_stream_t *)tcp;
} else {
- uv_pipe_t *pipe = &stream->uv.pipe;
+ uv_pipe_t *pipe = &stream->s.uv.pipe;
uv_pipe_init(&loop->uv, pipe, 0);
uv_pipe_connect(&req, pipe, address, connect_cb);
uv_stream = (uv_stream_t *)pipe;
@@ -245,7 +245,7 @@ tcp_retry:
status = 1;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
if (status == 0) {
- stream_init(NULL, stream, -1, uv_stream);
+ stream_init(NULL, &stream->s, -1, uv_stream);
success = true;
} else if (is_tcp && addrinfo->ai_next) {
addrinfo = addrinfo->ai_next;
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 0b9ed4f25b..71de6ee1ba 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -8,7 +8,6 @@
#include "nvim/event/loop.h"
#include "nvim/event/stream.h"
#include "nvim/log.h"
-#include "nvim/rbuffer.h"
#include "nvim/types_defs.h"
#ifdef MSWIN
# include "nvim/os/os_win_console.h"
@@ -45,6 +44,8 @@ int stream_set_blocking(int fd, bool blocking)
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
+ // The underlying stream is either a file or an existing uv stream.
+ assert(uvstream == NULL ? fd >= 0 : fd < 0);
stream->uvstream = uvstream;
if (fd >= 0) {
@@ -84,29 +85,29 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
stream->uvstream->data = stream;
}
- stream->internal_data = NULL;
stream->fpos = 0;
+ stream->internal_data = NULL;
stream->curmem = 0;
stream->maxmem = 0;
stream->pending_reqs = 0;
- stream->read_cb = NULL;
stream->write_cb = NULL;
stream->close_cb = NULL;
stream->internal_close_cb = NULL;
stream->closed = false;
- stream->buffer = NULL;
stream->events = NULL;
- stream->num_bytes = 0;
}
-void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
+void stream_may_close(Stream *stream, bool rstream)
FUNC_ATTR_NONNULL_ARG(1)
{
+ if (stream->closed) {
+ return;
+ }
assert(!stream->closed);
DLOG("closing Stream: %p", (void *)stream);
stream->closed = true;
- stream->close_cb = on_stream_close;
- stream->close_cb_data = data;
+ stream->close_cb = NULL;
+ stream->close_cb_data = NULL;
#ifdef MSWIN
if (UV_TTY == uv_guess_handle(stream->fd)) {
@@ -116,18 +117,11 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
#endif
if (!stream->pending_reqs) {
- stream_close_handle(stream);
- }
-}
-
-void stream_may_close(Stream *stream)
-{
- if (!stream->closed) {
- stream_close(stream, NULL, NULL);
+ stream_close_handle(stream, rstream);
}
}
-void stream_close_handle(Stream *stream)
+void stream_close_handle(Stream *stream, bool rstream)
FUNC_ATTR_NONNULL_ALL
{
uv_handle_t *handle = NULL;
@@ -145,16 +139,22 @@ void stream_close_handle(Stream *stream)
assert(handle != NULL);
if (!uv_is_closing(handle)) {
- uv_close(handle, close_cb);
+ uv_close(handle, rstream ? rstream_close_cb : close_cb);
}
}
-static void close_cb(uv_handle_t *handle)
+static void rstream_close_cb(uv_handle_t *handle)
{
- Stream *stream = handle->data;
+ RStream *stream = handle->data;
if (stream->buffer) {
- rbuffer_free(stream->buffer);
+ free_block(stream->buffer);
}
+ close_cb(handle);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ Stream *stream = handle->data;
if (stream->close_cb) {
stream->close_cb(stream, stream->close_cb_data);
}
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index c67a9b96ed..5005c4e84f 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -73,6 +73,26 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
// This should not be called after a stream was freed
assert(!stream->closed);
+ uv_buf_t uvbuf;
+ uvbuf.base = buffer->data;
+ uvbuf.len = UV_BUF_LEN(buffer->size);
+
+ if (!stream->uvstream) {
+ uv_fs_t req;
+
+ // Synchronous write
+ uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL);
+
+ uv_fs_req_cleanup(&req);
+
+ wstream_release_wbuffer(buffer);
+
+ assert(stream->write_cb == NULL);
+
+ stream->fpos += MAX(req.result, 0);
+ return req.result > 0;
+ }
+
if (stream->curmem > stream->maxmem) {
goto err;
}
@@ -84,10 +104,6 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
data->buffer = buffer;
data->uv_req.data = data;
- uv_buf_t uvbuf;
- uvbuf.base = buffer->data;
- uvbuf.len = UV_BUF_LEN(buffer->size);
-
if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
xfree(data);
goto err;
@@ -141,7 +157,7 @@ static void write_cb(uv_write_t *req, int status)
if (data->stream->closed && data->stream->pending_reqs == 0) {
// Last pending write, free the stream;
- stream_close_handle(data->stream);
+ stream_close_handle(data->stream, false);
}
xfree(data);
@@ -158,3 +174,8 @@ void wstream_release_wbuffer(WBuffer *buffer)
xfree(buffer);
}
}
+
+void wstream_may_close(Stream *stream)
+{
+ stream_may_close(stream, false);
+}