aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
authorJames McCoy <jamessan@jamessan.com>2018-03-28 21:52:06 -0400
committerJames McCoy <jamessan@jamessan.com>2018-03-28 21:54:39 -0400
commit79f9c2d9c650ceab27cdc6707fd6d7fa1de29fc1 (patch)
tree4e0589d75801f3ff6a9678f84f5009102766661e /src/nvim/event
parent4403864da3c48412595d439f36458d1e6ccfc49f (diff)
parent3f3de9b1a95d273463a87516365510dbffcaf3d2 (diff)
downloadrneovim-79f9c2d9c650ceab27cdc6707fd6d7fa1de29fc1.tar.gz
rneovim-79f9c2d9c650ceab27cdc6707fd6d7fa1de29fc1.tar.bz2
rneovim-79f9c2d9c650ceab27cdc6707fd6d7fa1de29fc1.zip
Merge branch 'master' into yagebu/option-fixes
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/libuv_process.c23
-rw-r--r--src/nvim/event/loop.c49
-rw-r--r--src/nvim/event/loop.h21
-rw-r--r--src/nvim/event/process.c161
-rw-r--r--src/nvim/event/process.h19
-rw-r--r--src/nvim/event/rstream.c30
-rw-r--r--src/nvim/event/socket.c9
-rw-r--r--src/nvim/event/stream.c9
-rw-r--r--src/nvim/event/stream.h8
-rw-r--r--src/nvim/event/wstream.c3
10 files changed, 196 insertions, 136 deletions
diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c
index f6a567a520..0c2bf397e5 100644
--- a/src/nvim/event/libuv_process.c
+++ b/src/nvim/event/libuv_process.c
@@ -26,19 +26,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvopts.file = proc->argv[0];
uvproc->uvopts.args = proc->argv;
uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE;
- if (proc->detach) {
- uvproc->uvopts.flags |= UV_PROCESS_DETACHED;
- }
#ifdef WIN32
// libuv collapses the argv to a CommandLineToArgvW()-style string. cmd.exe
// expects a different syntax (must be prepared by the caller before now).
if (os_shell_is_cmdexe(proc->argv[0])) {
uvproc->uvopts.flags |= UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS;
}
+ if (proc->detach) {
+ uvproc->uvopts.flags |= UV_PROCESS_DETACHED;
+ }
+#else
+ // Always setsid() on unix-likes. #8107
+ uvproc->uvopts.flags |= UV_PROCESS_DETACHED;
#endif
uvproc->uvopts.exit_cb = exit_cb;
uvproc->uvopts.cwd = proc->cwd;
- uvproc->uvopts.env = NULL;
+ uvproc->uvopts.env = NULL; // Inherits the parent (nvim) env.
uvproc->uvopts.stdio = uvproc->uvstdio;
uvproc->uvopts.stdio_count = 3;
uvproc->uvstdio[0].flags = UV_IGNORE;
@@ -46,22 +49,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[2].flags = UV_IGNORE;
uvproc->uv.data = proc;
- if (proc->in) {
+ if (!proc->in.closed) {
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->in->uv.pipe);
+ &proc->in.uv.pipe);
}
- if (proc->out) {
+ if (!proc->out.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->out->uv.pipe);
+ &proc->out.uv.pipe);
}
- if (proc->err) {
+ if (!proc->err.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->err->uv.pipe);
+ &proc->err.uv.pipe);
}
int status;
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index 25701a1621..d92464f17b 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -30,19 +30,25 @@ void loop_init(Loop *loop, void *data)
uv_signal_init(&loop->uv, &loop->children_watcher);
uv_timer_init(&loop->uv, &loop->children_kill_timer);
uv_timer_init(&loop->uv, &loop->poll_timer);
+ loop->poll_timer.data = xmalloc(sizeof(bool)); // "timeout expired" flag
}
-void loop_poll_events(Loop *loop, int ms)
+/// Processes one `Loop.uv` event (at most).
+/// Processes all `Loop.fast_events` events.
+///
+/// @returns true if `ms` timeout was reached
+bool loop_poll_events(Loop *loop, int ms)
{
if (loop->recursive++) {
abort(); // Should not re-enter uv_run
}
uv_run_mode mode = UV_RUN_ONCE;
+ bool timeout_expired = false;
if (ms > 0) {
- // Use a repeating timeout of ms milliseconds to make sure
- // we do not block indefinitely for I/O.
+ *((bool *)loop->poll_timer.data) = false; // reset "timeout expired" flag
+ // Dummy timer to ensure UV_RUN_ONCE does not block indefinitely for I/O.
uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
} else if (ms == 0) {
// For ms == 0, do a non-blocking event poll.
@@ -52,14 +58,23 @@ void loop_poll_events(Loop *loop, int ms)
uv_run(&loop->uv, mode);
if (ms > 0) {
+ timeout_expired = *((bool *)loop->poll_timer.data);
uv_timer_stop(&loop->poll_timer);
}
loop->recursive--; // Can re-enter uv_run now
multiqueue_process_events(loop->fast_events);
+ return timeout_expired;
}
-// Schedule an event from another thread
+/// Schedules an event from another thread.
+///
+/// @note Event is queued into `fast_events`, which is processed outside of the
+/// primary `events` queue by loop_poll_events(). For `main_loop`, that
+/// means `fast_events` is NOT processed in an "editor mode"
+/// (VimState.execute), so redraw and other side-effects are likely to be
+/// skipped.
+/// @see loop_schedule_deferred
void loop_schedule(Loop *loop, Event event)
{
uv_mutex_lock(&loop->mutex);
@@ -68,6 +83,24 @@ void loop_schedule(Loop *loop, Event event)
uv_mutex_unlock(&loop->mutex);
}
+/// Schedules an event from another thread. Unlike loop_schedule(), the event
+/// is forwarded to `Loop.events`, instead of being processed immediately.
+///
+/// @see loop_schedule
+void loop_schedule_deferred(Loop *loop, Event event)
+{
+ Event *eventp = xmalloc(sizeof(*eventp));
+ *eventp = event;
+ loop_schedule(loop, event_create(loop_deferred_event, 2, loop, eventp));
+}
+static void loop_deferred_event(void **argv)
+{
+ Loop *loop = argv[0];
+ Event *eventp = argv[1];
+ multiqueue_put_event(loop->events, *eventp);
+ xfree(eventp);
+}
+
void loop_on_put(MultiQueue *queue, void *data)
{
Loop *loop = data;
@@ -86,7 +119,7 @@ bool loop_close(Loop *loop, bool wait)
uv_mutex_destroy(&loop->mutex);
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
- uv_close((uv_handle_t *)&loop->poll_timer, NULL);
+ uv_close((uv_handle_t *)&loop->poll_timer, timer_close_cb);
uv_close((uv_handle_t *)&loop->async, NULL);
uint64_t start = wait ? os_hrtime() : 0;
while (true) {
@@ -138,5 +171,11 @@ static void async_cb(uv_async_t *handle)
static void timer_cb(uv_timer_t *handle)
{
+ bool *timeout_expired = handle->data;
+ *timeout_expired = true;
}
+static void timer_close_cb(uv_handle_t *handle)
+{
+ xfree(handle->data);
+}
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index e7d7bdd483..d1a40d5cc9 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -16,10 +16,27 @@ KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
typedef struct loop {
uv_loop_t uv;
- MultiQueue *events, *fast_events, *thread_events;
+ MultiQueue *events;
+ MultiQueue *thread_events;
+ // Immediate events:
+ // "Processed after exiting uv_run() (to avoid recursion), but before
+ // returning from loop_poll_events()." 502aee690c98
+ // Practical consequence (for main_loop): these events are processed by
+ // state_enter()..os_inchar()
+ // whereas "regular" events (main_loop.events) are processed by
+ // state_enter()..VimState.execute()
+ // But state_enter()..os_inchar() can be "too early" if you want the event
+ // to trigger UI updates and other user-activity-related side-effects.
+ MultiQueue *fast_events;
+
+ // used by process/job-control subsystem
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
- uv_timer_t children_kill_timer, poll_timer;
+ uv_timer_t children_kill_timer;
+
+ // generic timer, used by loop_poll_events()
+ uv_timer_t poll_timer;
+
size_t children_stop_requests;
uv_async_t async;
uv_mutex_t mutex;
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index cad49e2007..60650344ce 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -12,6 +12,7 @@
#include "nvim/event/wstream.h"
#include "nvim/event/process.h"
#include "nvim/event/libuv_process.h"
+#include "nvim/os/process.h"
#include "nvim/os/pty_process.h"
#include "nvim/globals.h"
#include "nvim/macros.h"
@@ -21,32 +22,32 @@
# include "event/process.c.generated.h"
#endif
-// Time (ns) for a process to exit cleanly before we send TERM/KILL.
-#define TERM_TIMEOUT 1000000000
-#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-
-#define CLOSE_PROC_STREAM(proc, stream) \
- do { \
- if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL, NULL); \
- } \
- } while (0)
+// Time for a process to exit cleanly before we send KILL.
+// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
+#define KILL_TIMEOUT_MS 2000
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
-int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
+int process_spawn(Process *proc, bool in, bool out, bool err)
+ FUNC_ATTR_NONNULL_ALL
{
- if (proc->in) {
- uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
+ if (in) {
+ uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
+ } else {
+ proc->in.closed = true;
}
- if (proc->out) {
- uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
+ if (out) {
+ uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
+ } else {
+ proc->out.closed = true;
}
- if (proc->err) {
- uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
+ if (err) {
+ uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
+ } else {
+ proc->err.closed = true;
}
int status;
@@ -62,14 +63,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (status) {
- if (proc->in) {
- uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
+ if (in) {
+ uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
- if (proc->out) {
- uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
+ if (out) {
+ uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
}
- if (proc->err) {
- uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
+ if (err) {
+ uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
}
if (proc->type == kProcessTypeUv) {
@@ -82,30 +83,27 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return status;
}
- if (proc->in) {
- stream_init(NULL, proc->in, -1,
- STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe));
- proc->in->events = proc->events;
- proc->in->internal_data = proc;
- proc->in->internal_close_cb = on_process_stream_close;
+ if (in) {
+ stream_init(NULL, &proc->in, -1,
+ STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe));
+ proc->in.internal_data = proc;
+ proc->in.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->out) {
- stream_init(NULL, proc->out, -1,
- STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe));
- proc->out->events = proc->events;
- proc->out->internal_data = proc;
- proc->out->internal_close_cb = on_process_stream_close;
+ if (out) {
+ stream_init(NULL, &proc->out, -1,
+ STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe));
+ proc->out.internal_data = proc;
+ proc->out.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->err) {
- stream_init(NULL, proc->err, -1,
- STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe));
- proc->err->events = proc->events;
- proc->err->internal_data = proc;
- proc->err->internal_close_cb = on_process_stream_close;
+ if (err) {
+ stream_init(NULL, &proc->err, -1,
+ STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe));
+ proc->err.internal_data = proc;
+ proc->err.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
@@ -125,8 +123,6 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
// Close handles to process without killing it.
CREATE_EVENT(loop->events, process_close_handles, 1, proc);
} else {
- uv_kill(proc->pid, SIGTERM);
- proc->term_sent = true;
process_stop(proc);
}
}
@@ -138,27 +134,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
-// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
- process_close_in(proc);
- process_close_out(proc);
- process_close_err(proc);
-}
-
-void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, in);
-}
-
-void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, out);
-}
-
-void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, err);
+ stream_may_close(&proc->in);
+ stream_may_close(&proc->out);
+ stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@@ -166,16 +146,14 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// @param process Process instance
/// @param ms Time in milliseconds to wait for the process.
/// 0 for no wait. -1 to wait until the process quits.
-/// @return Exit code of the process.
+/// @return Exit code of the process. proc->status will have the same value.
/// -1 if the timeout expired while the process is still running.
/// -2 if the user interruped the wait.
int process_wait(Process *proc, int ms, MultiQueue *events)
FUNC_ATTR_NONNULL_ARG(1)
{
- int status = -1; // default
- bool interrupted = false;
if (!proc->refcount) {
- status = proc->status;
+ int status = proc->status;
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
return status;
}
@@ -195,7 +173,6 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
if (got_int) {
- interrupted = true;
got_int = false;
process_stop(proc);
if (ms == -1) {
@@ -206,12 +183,13 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
} else {
LOOP_PROCESS_EVENTS(proc->loop, events, 0);
}
+
+ proc->status = -2;
}
if (proc->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
- status = interrupted ? -2 : proc->status;
decref(proc);
if (events) {
// the decref call created an exit event, process it now
@@ -221,7 +199,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
proc->refcount--;
}
- return status;
+ return proc->status;
}
/// Ask a process to terminate and eventually kill if it doesn't respond
@@ -237,7 +215,8 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
- process_close_in(proc);
+ stream_may_close(&proc->in);
+ os_proc_tree_kill(proc->pid, SIGTERM);
break;
case kProcessTypePty:
// close all streams for pty processes to send SIGHUP to the process
@@ -251,9 +230,10 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
Loop *loop = proc->loop;
if (!loop->children_stop_requests++) {
// When there's at least one stop request pending, start a timer that
- // will periodically check if a signal should be send to a to the job
- DLOG("Starting job kill timer");
- uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100);
+ // will periodically check if a signal should be send to the job.
+ ILOG("starting job kill timer");
+ uv_timer_start(&loop->children_kill_timer, children_kill_cb,
+ KILL_TIMEOUT_MS, KILL_TIMEOUT_MS);
}
}
@@ -269,15 +249,13 @@ static void children_kill_cb(uv_timer_t *handle)
if (!proc->stopped_time) {
continue;
}
- uint64_t elapsed = now - proc->stopped_time;
-
- if (!proc->term_sent && elapsed >= TERM_TIMEOUT) {
- ILOG("Sending SIGTERM to pid %d", proc->pid);
- uv_kill(proc->pid, SIGTERM);
- proc->term_sent = true;
- } else if (elapsed >= KILL_TIMEOUT) {
- ILOG("Sending SIGKILL to pid %d", proc->pid);
- uv_kill(proc->pid, SIGKILL);
+ uint64_t elapsed = (now - proc->stopped_time) / 1000000 + 1;
+
+ if (elapsed >= KILL_TIMEOUT_MS) {
+ int sig = proc->type == kProcessTypePty && elapsed < KILL_TIMEOUT_MS * 2
+ ? SIGTERM
+ : SIGKILL;
+ os_proc_tree_kill(proc->pid, sig);
}
}
}
@@ -324,6 +302,13 @@ static void process_close(Process *proc)
}
assert(!proc->closed);
proc->closed = true;
+
+ if (proc->detach) {
+ if (proc->type == kProcessTypeUv) {
+ uv_unref((uv_handle_t *)&(((LibuvProcess *)proc)->uv));
+ }
+ }
+
switch (proc->type) {
case kProcessTypeUv:
libuv_process_close((LibuvProcess *)proc);
@@ -368,15 +353,15 @@ static void flush_stream(Process *proc, Stream *stream)
// Poll for data and process the generated events.
loop_poll_events(proc->loop, 0);
- if (proc->events) {
- multiqueue_process_events(proc->events);
+ if (stream->events) {
+ multiqueue_process_events(stream->events);
}
// Stream can be closed if it is empty.
if (num_bytes == stream->num_bytes) {
- if (stream->read_cb) {
+ if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream
- // open.
+ // open. But only send EOF if we haven't already.
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
@@ -388,8 +373,8 @@ static void process_close_handles(void **argv)
{
Process *proc = argv[0];
- flush_stream(proc, proc->out);
- flush_stream(proc, proc->err);
+ flush_stream(proc, &proc->out);
+ flush_stream(proc, &proc->err);
process_close_streams(proc);
process_close(proc);
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 26d70a5e6d..033ce3604b 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -23,13 +23,14 @@ struct process {
uint64_t stopped_time;
const char *cwd;
char **argv;
- Stream *in, *out, *err;
+ Stream in, out, err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
- bool closed, term_sent, detach;
+ bool closed, detach;
MultiQueue *events;
};
+
static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
@@ -38,23 +39,27 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.loop = loop,
.events = NULL,
.pid = 0,
- .status = 0,
+ .status = -1,
.refcount = 0,
.stopped_time = 0,
.cwd = NULL,
.argv = NULL,
- .in = NULL,
- .out = NULL,
- .err = NULL,
+ .in = { .closed = false },
+ .out = { .closed = false },
+ .err = { .closed = false },
.cb = NULL,
.closed = false,
- .term_sent = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
.detach = false
};
}
+static inline bool process_is_stopped(Process *proc)
+{
+ return proc->stopped_time != 0;
+}
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.h.generated.h"
#endif
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 854af474b2..2fbe7f6773 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -95,7 +95,7 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
// `uv_buf_t.len` happens to have different size on Windows.
size_t write_count;
buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
- buf->len = write_count;
+ buf->len = UV_BUF_LEN(write_count);
}
// Callback invoked by libuv after it copies the data into the buffer provided
@@ -105,19 +105,19 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
Stream *stream = uvstream->data;
- if (cnt > 0) {
- stream->num_bytes += (size_t)cnt;
- }
-
if (cnt <= 0) {
- if (cnt != UV_ENOBUFS
- // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
- // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
- //
- // We don't need to do anything with the RBuffer because the next call
- // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
- // won't be called)
- && cnt != 0) {
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ if (cnt == UV_ENOBUFS || cnt == 0) {
+ return;
+ } else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
+ // The TTY driver might signal TTY without closing the stream
+ invoke_read_cb(stream, 0, true);
+ } else {
DLOG("Closing Stream (%p): %s (%s)", stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
@@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
+ stream->num_bytes += nread;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
@@ -145,7 +146,7 @@ static void fread_idle_cb(uv_idle_t *handle)
// `uv_buf_t.len` happens to have different size on Windows.
size_t write_count;
stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count);
- stream->uvbuf.len = write_count;
+ stream->uvbuf.len = UV_BUF_LEN(write_count);
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
@@ -187,6 +188,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
+ stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index a796f303ab..6f45b09fce 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -105,9 +105,10 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
// contain 0 in this case, unless uv_tcp_getsockname() is used first.
uv_tcp_getsockname(&watcher->uv.tcp.handle, (struct sockaddr *)&sas,
&(int){ sizeof(sas) });
- uint16_t port = (uint16_t)((sas.ss_family == AF_INET)
- ? ((struct sockaddr_in *)&sas)->sin_port
- : ((struct sockaddr_in6 *)&sas)->sin6_port);
+ uint16_t port = (uint16_t)(
+ (sas.ss_family == AF_INET)
+ ? (STRUCT_CAST(struct sockaddr_in, &sas))->sin_port
+ : (STRUCT_CAST(struct sockaddr_in6, &sas))->sin6_port);
// v:servername uses the string from watcher->addr
size_t len = strlen(watcher->addr);
snprintf(watcher->addr+len, sizeof(watcher->addr)-len, ":%" PRIu16,
@@ -247,7 +248,7 @@ tcp_retry:
uv_pipe_t *pipe = &stream->uv.pipe;
uv_pipe_init(&loop->uv, pipe, 0);
uv_pipe_connect(&req, pipe, address, connect_cb);
- uv_stream = (uv_stream_t *)pipe;
+ uv_stream = STRUCT_CAST(uv_stream_t, pipe);
}
status = 1;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 60ceff9b24..ba25b76ec7 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -7,6 +7,7 @@
#include <uv.h>
+#include "nvim/log.h"
#include "nvim/rbuffer.h"
#include "nvim/macros.h"
#include "nvim/event/stream.h"
@@ -81,6 +82,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
+ DLOG("closing Stream: %p", stream);
stream->closed = true;
stream->close_cb = on_stream_close;
stream->close_cb_data = data;
@@ -90,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
+void stream_may_close(Stream *stream)
+{
+ if (!stream->closed) {
+ stream_close(stream, NULL, NULL);
+ }
+}
+
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index d27497e4a4..e713323f5c 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -14,10 +14,7 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
-/// @param count Number of bytes to read. This must be respected if keeping
-/// the order of events is a requirement. This is because events
-/// may be queued and only processed later when more data is copied
-/// into to the buffer, so one read may starve another.
+/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
@@ -33,6 +30,8 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
+ bool closed;
+ bool did_eof;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
@@ -52,7 +51,6 @@ struct stream {
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
- bool closed;
MultiQueue *events;
};
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index f453e5898d..d2fb52243c 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -8,6 +8,7 @@
#include <uv.h>
+#include "nvim/log.h"
#include "nvim/event/loop.h"
#include "nvim/event/wstream.h"
#include "nvim/vim.h"
@@ -89,7 +90,7 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
uv_buf_t uvbuf;
uvbuf.base = buffer->data;
- uvbuf.len = buffer->size;
+ uvbuf.len = UV_BUF_LEN(buffer->size);
if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
xfree(data);