aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
authorZyX <kp-pav@yandex.ru>2017-12-03 16:49:30 +0300
committerZyX <kp-pav@yandex.ru>2017-12-03 16:49:30 +0300
commitc49e22d3964d6c7ae1c24e8ad01b5fec4ca40b57 (patch)
treeb7e59c416d1435725c65f8952b6e55c70544d97e /src/nvim/event
parent62108c3b0be46936c83f6d4c98b44ceb5e6f77fd (diff)
parent27a577586eace687c47e7398845178208cae524a (diff)
downloadrneovim-c49e22d3964d6c7ae1c24e8ad01b5fec4ca40b57.tar.gz
rneovim-c49e22d3964d6c7ae1c24e8ad01b5fec4ca40b57.tar.bz2
rneovim-c49e22d3964d6c7ae1c24e8ad01b5fec4ca40b57.zip
Merge branch 'master' into s-dash-stdin
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/defs.h8
-rw-r--r--src/nvim/event/libuv_process.c29
-rw-r--r--src/nvim/event/loop.c54
-rw-r--r--src/nvim/event/loop.h21
-rw-r--r--src/nvim/event/multiqueue.c51
-rw-r--r--src/nvim/event/multiqueue.h2
-rw-r--r--src/nvim/event/process.c163
-rw-r--r--src/nvim/event/process.h21
-rw-r--r--src/nvim/event/rstream.c39
-rw-r--r--src/nvim/event/signal.c3
-rw-r--r--src/nvim/event/socket.c210
-rw-r--r--src/nvim/event/socket.h2
-rw-r--r--src/nvim/event/stream.c20
-rw-r--r--src/nvim/event/stream.h8
-rw-r--r--src/nvim/event/time.c3
-rw-r--r--src/nvim/event/wstream.c4
16 files changed, 432 insertions, 206 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
index e5335d9f25..cc875d74b9 100644
--- a/src/nvim/event/defs.h
+++ b/src/nvim/event/defs.h
@@ -8,16 +8,14 @@
typedef void (*argv_callback)(void **argv);
typedef struct message {
- int priority;
argv_callback handler;
void *argv[EVENT_HANDLER_MAX_ARGC];
} Event;
typedef void(*event_scheduler)(Event event, void *data);
-#define VA_EVENT_INIT(event, p, h, a) \
+#define VA_EVENT_INIT(event, h, a) \
do { \
assert(a <= EVENT_HANDLER_MAX_ARGC); \
- (event)->priority = p; \
(event)->handler = h; \
if (a) { \
va_list args; \
@@ -29,11 +27,11 @@ typedef void(*event_scheduler)(Event event, void *data);
} \
} while (0)
-static inline Event event_create(int priority, argv_callback cb, int argc, ...)
+static inline Event event_create(argv_callback cb, int argc, ...)
{
assert(argc <= EVENT_HANDLER_MAX_ARGC);
Event event;
- VA_EVENT_INIT(&event, priority, cb, argc);
+ VA_EVENT_INIT(&event, cb, argc);
return event;
}
diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c
index 3da0c386b4..c101cb1bb9 100644
--- a/src/nvim/event/libuv_process.c
+++ b/src/nvim/event/libuv_process.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <uv.h>
@@ -8,6 +11,8 @@
#include "nvim/event/process.h"
#include "nvim/event/libuv_process.h"
#include "nvim/log.h"
+#include "nvim/macros.h"
+#include "nvim/os/os.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/libuv_process.c.generated.h"
@@ -24,9 +29,16 @@ int libuv_process_spawn(LibuvProcess *uvproc)
if (proc->detach) {
uvproc->uvopts.flags |= UV_PROCESS_DETACHED;
}
+#ifdef WIN32
+ // libuv collapses the argv to a CommandLineToArgvW()-style string. cmd.exe
+ // expects a different syntax (must be prepared by the caller before now).
+ if (os_shell_is_cmdexe(proc->argv[0])) {
+ uvproc->uvopts.flags |= UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS;
+ }
+#endif
uvproc->uvopts.exit_cb = exit_cb;
uvproc->uvopts.cwd = proc->cwd;
- uvproc->uvopts.env = NULL;
+ uvproc->uvopts.env = NULL; // Inherits the parent (nvim) env.
uvproc->uvopts.stdio = uvproc->uvstdio;
uvproc->uvopts.stdio_count = 3;
uvproc->uvstdio[0].flags = UV_IGNORE;
@@ -34,19 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[2].flags = UV_IGNORE;
uvproc->uv.data = proc;
- if (proc->in) {
+ if (!proc->in.closed) {
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
- uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe;
+ uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t,
+ &proc->in.uv.pipe);
}
- if (proc->out) {
+ if (!proc->out.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe;
+ uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t,
+ &proc->out.uv.pipe);
}
- if (proc->err) {
+ if (!proc->err.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe;
+ uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t,
+ &proc->err.uv.pipe);
}
int status;
diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c
index 0e1775d01b..5adf16c0f3 100644
--- a/src/nvim/event/loop.c
+++ b/src/nvim/event/loop.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdarg.h>
#include <stdint.h>
@@ -5,6 +8,7 @@
#include "nvim/event/loop.h"
#include "nvim/event/process.h"
+#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/loop.c.generated.h"
@@ -41,8 +45,7 @@ void loop_poll_events(Loop *loop, int ms)
// we do not block indefinitely for I/O.
uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
} else if (ms == 0) {
- // For ms == 0, we need to do a non-blocking event poll by
- // setting the run mode to UV_RUN_NOWAIT.
+ // For ms == 0, do a non-blocking event poll.
mode = UV_RUN_NOWAIT;
}
@@ -56,7 +59,14 @@ void loop_poll_events(Loop *loop, int ms)
multiqueue_process_events(loop->fast_events);
}
-// Schedule an event from another thread
+/// Schedules an event from another thread.
+///
+/// @note Event is queued into `fast_events`, which is processed outside of the
+/// primary `events` queue by loop_poll_events(). For `main_loop`, that
+/// means `fast_events` is NOT processed in an "editor mode"
+/// (VimState.execute), so redraw and other side-effects are likely to be
+/// skipped.
+/// @see loop_schedule_deferred
void loop_schedule(Loop *loop, Event event)
{
uv_mutex_lock(&loop->mutex);
@@ -65,6 +75,24 @@ void loop_schedule(Loop *loop, Event event)
uv_mutex_unlock(&loop->mutex);
}
+/// Schedules an event from another thread. Unlike loop_schedule(), the event
+/// is forwarded to `Loop.events`, instead of being processed immediately.
+///
+/// @see loop_schedule
+void loop_schedule_deferred(Loop *loop, Event event)
+{
+ Event *eventp = xmalloc(sizeof(*eventp));
+ *eventp = event;
+ loop_schedule(loop, event_create(loop_deferred_event, 2, loop, eventp));
+}
+static void loop_deferred_event(void **argv)
+{
+ Loop *loop = argv[0];
+ Event *eventp = argv[1];
+ multiqueue_put_event(loop->events, *eventp);
+ xfree(eventp);
+}
+
void loop_on_put(MultiQueue *queue, void *data)
{
Loop *loop = data;
@@ -76,20 +104,34 @@ void loop_on_put(MultiQueue *queue, void *data)
uv_stop(&loop->uv);
}
-void loop_close(Loop *loop, bool wait)
+/// @returns false if the loop could not be closed gracefully
+bool loop_close(Loop *loop, bool wait)
{
+ bool rv = true;
uv_mutex_destroy(&loop->mutex);
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
uv_close((uv_handle_t *)&loop->poll_timer, NULL);
uv_close((uv_handle_t *)&loop->async, NULL);
- do {
+ uint64_t start = wait ? os_hrtime() : 0;
+ while (true) {
uv_run(&loop->uv, wait ? UV_RUN_DEFAULT : UV_RUN_NOWAIT);
- } while (uv_loop_close(&loop->uv) && wait);
+ if (!uv_loop_close(&loop->uv) || !wait) {
+ break;
+ }
+ if (os_hrtime() - start >= 2 * 1000000000) {
+ // Some libuv resource was not correctly deref'd. Log and bail.
+ rv = false;
+ ELOG("uv_loop_close() hang?");
+ log_uv_handles(&loop->uv);
+ break;
+ }
+ }
multiqueue_free(loop->fast_events);
multiqueue_free(loop->thread_events);
multiqueue_free(loop->events);
kl_destroy(WatcherPtr, loop->children);
+ return rv;
}
void loop_purge(Loop *loop)
diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h
index e7d7bdd483..d1a40d5cc9 100644
--- a/src/nvim/event/loop.h
+++ b/src/nvim/event/loop.h
@@ -16,10 +16,27 @@ KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
typedef struct loop {
uv_loop_t uv;
- MultiQueue *events, *fast_events, *thread_events;
+ MultiQueue *events;
+ MultiQueue *thread_events;
+ // Immediate events:
+ // "Processed after exiting uv_run() (to avoid recursion), but before
+ // returning from loop_poll_events()." 502aee690c98
+ // Practical consequence (for main_loop): these events are processed by
+ // state_enter()..os_inchar()
+ // whereas "regular" events (main_loop.events) are processed by
+ // state_enter()..VimState.execute()
+ // But state_enter()..os_inchar() can be "too early" if you want the event
+ // to trigger UI updates and other user-activity-related side-effects.
+ MultiQueue *fast_events;
+
+ // used by process/job-control subsystem
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
- uv_timer_t children_kill_timer, poll_timer;
+ uv_timer_t children_kill_timer;
+
+ // generic timer, used by loop_poll_events()
+ uv_timer_t poll_timer;
+
size_t children_stop_requests;
uv_async_t async;
uv_mutex_t mutex;
diff --git a/src/nvim/event/multiqueue.c b/src/nvim/event/multiqueue.c
index 79b4dd9458..ef9f3f1870 100644
--- a/src/nvim/event/multiqueue.c
+++ b/src/nvim/event/multiqueue.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
// Multi-level queue for selective async event processing.
// Not threadsafe; access must be synchronized externally.
//
@@ -123,6 +126,7 @@ void multiqueue_free(MultiQueue *this)
xfree(this);
}
+/// Removes the next item and returns its Event.
Event multiqueue_get(MultiQueue *this)
{
return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this);
@@ -141,7 +145,7 @@ void multiqueue_process_events(MultiQueue *this)
{
assert(this);
while (!multiqueue_empty(this)) {
- Event event = multiqueue_get(this);
+ Event event = multiqueue_remove(this);
if (event.handler) {
event.handler(event.argv);
}
@@ -175,36 +179,48 @@ size_t multiqueue_size(MultiQueue *this)
return this->size;
}
-static Event multiqueue_remove(MultiQueue *this)
+/// Gets an Event from an item.
+///
+/// @param remove Remove the node from its queue, and free it.
+static Event multiqueueitem_get_event(MultiQueueItem *item, bool remove)
{
- assert(!multiqueue_empty(this));
- QUEUE *h = QUEUE_HEAD(&this->headtail);
- QUEUE_REMOVE(h);
- MultiQueueItem *item = multiqueue_node_data(h);
- Event rv;
-
+ assert(item != NULL);
+ Event ev;
if (item->link) {
- assert(!this->parent);
- // remove the next node in the linked queue
+ // get the next node in the linked queue
MultiQueue *linked = item->data.queue;
assert(!multiqueue_empty(linked));
MultiQueueItem *child =
multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
- QUEUE_REMOVE(&child->node);
- rv = child->data.item.event;
- xfree(child);
+ ev = child->data.item.event;
+ // remove the child node
+ if (remove) {
+ QUEUE_REMOVE(&child->node);
+ xfree(child);
+ }
} else {
- if (this->parent) {
- // remove the corresponding link node in the parent queue
+ // remove the corresponding link node in the parent queue
+ if (remove && item->data.item.parent_item) {
QUEUE_REMOVE(&item->data.item.parent_item->node);
xfree(item->data.item.parent_item);
+ item->data.item.parent_item = NULL;
}
- rv = item->data.item.event;
+ ev = item->data.item.event;
}
+ return ev;
+}
+static Event multiqueue_remove(MultiQueue *this)
+{
+ assert(!multiqueue_empty(this));
+ QUEUE *h = QUEUE_HEAD(&this->headtail);
+ QUEUE_REMOVE(h);
+ MultiQueueItem *item = multiqueue_node_data(h);
+ assert(!item->link || !this->parent); // Only a parent queue has link-nodes
+ Event ev = multiqueueitem_get_event(item, true);
this->size--;
xfree(item);
- return rv;
+ return ev;
}
static void multiqueue_push(MultiQueue *this, Event event)
@@ -212,6 +228,7 @@ static void multiqueue_push(MultiQueue *this, Event event)
MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem));
item->link = false;
item->data.item.event = event;
+ item->data.item.parent_item = NULL;
QUEUE_INSERT_TAIL(&this->headtail, &item->node);
if (this->parent) {
// push link node to the parent queue
diff --git a/src/nvim/event/multiqueue.h b/src/nvim/event/multiqueue.h
index def6b95a10..a688107665 100644
--- a/src/nvim/event/multiqueue.h
+++ b/src/nvim/event/multiqueue.h
@@ -10,7 +10,7 @@ typedef struct multiqueue MultiQueue;
typedef void (*put_callback)(MultiQueue *multiq, void *data);
#define multiqueue_put(q, h, ...) \
- multiqueue_put_event(q, event_create(1, h, __VA_ARGS__));
+ multiqueue_put_event(q, event_create(h, __VA_ARGS__));
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 4429a65f92..4eb2dd0baf 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdlib.h>
@@ -11,38 +14,39 @@
#include "nvim/event/libuv_process.h"
#include "nvim/os/pty_process.h"
#include "nvim/globals.h"
+#include "nvim/macros.h"
#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.c.generated.h"
#endif
-// Time (ns) for a process to exit cleanly before we send TERM/KILL.
-#define TERM_TIMEOUT 1000000000
-#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-
-#define CLOSE_PROC_STREAM(proc, stream) \
- do { \
- if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL, NULL); \
- } \
- } while (0)
+// Time for a process to exit cleanly before we send KILL.
+// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
+#define KILL_TIMEOUT_MS 2000
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
-int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
+int process_spawn(Process *proc, bool in, bool out, bool err)
+ FUNC_ATTR_NONNULL_ALL
{
- if (proc->in) {
- uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
+ if (in) {
+ uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
+ } else {
+ proc->in.closed = true;
}
- if (proc->out) {
- uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
+ if (out) {
+ uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
+ } else {
+ proc->out.closed = true;
}
- if (proc->err) {
- uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
+ if (err) {
+ uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
+ } else {
+ proc->err.closed = true;
}
int status;
@@ -58,14 +62,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (status) {
- if (proc->in) {
- uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
+ if (in) {
+ uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
- if (proc->out) {
- uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
+ if (out) {
+ uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
}
- if (proc->err) {
- uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
+ if (err) {
+ uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
}
if (proc->type == kProcessTypeUv) {
@@ -78,27 +82,27 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return status;
}
- if (proc->in) {
- stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
- proc->in->events = proc->events;
- proc->in->internal_data = proc;
- proc->in->internal_close_cb = on_process_stream_close;
+ if (in) {
+ stream_init(NULL, &proc->in, -1,
+ STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe));
+ proc->in.internal_data = proc;
+ proc->in.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->out) {
- stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
- proc->out->events = proc->events;
- proc->out->internal_data = proc;
- proc->out->internal_close_cb = on_process_stream_close;
+ if (out) {
+ stream_init(NULL, &proc->out, -1,
+ STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe));
+ proc->out.internal_data = proc;
+ proc->out.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->err) {
- stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
- proc->err->events = proc->events;
- proc->err->internal_data = proc;
- proc->err->internal_close_cb = on_process_stream_close;
+ if (err) {
+ stream_init(NULL, &proc->err, -1,
+ STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe));
+ proc->err.internal_data = proc;
+ proc->err.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
@@ -118,8 +122,6 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
// Close handles to process without killing it.
CREATE_EVENT(loop->events, process_close_handles, 1, proc);
} else {
- uv_kill(proc->pid, SIGTERM);
- proc->term_sent = true;
process_stop(proc);
}
}
@@ -131,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
-// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
- process_close_in(proc);
- process_close_out(proc);
- process_close_err(proc);
-}
-
-void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, in);
-}
-
-void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, out);
-}
-
-void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, err);
+ stream_may_close(&proc->in);
+ stream_may_close(&proc->out);
+ stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@@ -159,16 +145,15 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// @param process Process instance
/// @param ms Time in milliseconds to wait for the process.
/// 0 for no wait. -1 to wait until the process quits.
-/// @return Exit code of the process.
+/// @return Exit code of the process. proc->status will have the same value.
/// -1 if the timeout expired while the process is still running.
/// -2 if the user interruped the wait.
int process_wait(Process *proc, int ms, MultiQueue *events)
FUNC_ATTR_NONNULL_ARG(1)
{
- int status = -1; // default
bool interrupted = false;
if (!proc->refcount) {
- status = proc->status;
+ int status = proc->status;
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
return status;
}
@@ -204,7 +189,9 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
if (proc->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
- status = interrupted ? -2 : proc->status;
+ if (interrupted) {
+ proc->status = -2;
+ }
decref(proc);
if (events) {
// the decref call created an exit event, process it now
@@ -214,7 +201,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
proc->refcount--;
}
- return status;
+ return proc->status;
}
/// Ask a process to terminate and eventually kill if it doesn't respond
@@ -230,7 +217,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
- process_close_in(proc);
+ stream_may_close(&proc->in);
+ ILOG("Sending SIGTERM to pid %d", proc->pid);
+ uv_kill(proc->pid, SIGTERM);
break;
case kProcessTypePty:
// close all streams for pty processes to send SIGHUP to the process
@@ -244,9 +233,10 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
Loop *loop = proc->loop;
if (!loop->children_stop_requests++) {
// When there's at least one stop request pending, start a timer that
- // will periodically check if a signal should be send to a to the job
- DLOG("Starting job kill timer");
- uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100);
+ // will periodically check if a signal should be send to the job.
+ ILOG("Starting job kill timer");
+ uv_timer_start(&loop->children_kill_timer, children_kill_cb,
+ KILL_TIMEOUT_MS, KILL_TIMEOUT_MS);
}
}
@@ -262,15 +252,15 @@ static void children_kill_cb(uv_timer_t *handle)
if (!proc->stopped_time) {
continue;
}
- uint64_t elapsed = now - proc->stopped_time;
-
- if (!proc->term_sent && elapsed >= TERM_TIMEOUT) {
- ILOG("Sending SIGTERM to pid %d", proc->pid);
- uv_kill(proc->pid, SIGTERM);
- proc->term_sent = true;
- } else if (elapsed >= KILL_TIMEOUT) {
- ILOG("Sending SIGKILL to pid %d", proc->pid);
- uv_kill(proc->pid, SIGKILL);
+ uint64_t elapsed = (now - proc->stopped_time) / 1000000 + 1;
+
+ if (elapsed >= KILL_TIMEOUT_MS) {
+ int sig = proc->type == kProcessTypePty && elapsed < KILL_TIMEOUT_MS * 2
+ ? SIGTERM
+ : SIGKILL;
+ ILOG("Sending %s to pid %d", sig == SIGTERM ? "SIGTERM" : "SIGKILL",
+ proc->pid);
+ uv_kill(proc->pid, sig);
}
}
}
@@ -317,6 +307,13 @@ static void process_close(Process *proc)
}
assert(!proc->closed);
proc->closed = true;
+
+ if (proc->detach) {
+ if (proc->type == kProcessTypeUv) {
+ uv_unref((uv_handle_t *)&(((LibuvProcess *)proc)->uv));
+ }
+ }
+
switch (proc->type) {
case kProcessTypeUv:
libuv_process_close((LibuvProcess *)proc);
@@ -361,15 +358,15 @@ static void flush_stream(Process *proc, Stream *stream)
// Poll for data and process the generated events.
loop_poll_events(proc->loop, 0);
- if (proc->events) {
- multiqueue_process_events(proc->events);
+ if (stream->events) {
+ multiqueue_process_events(stream->events);
}
// Stream can be closed if it is empty.
if (num_bytes == stream->num_bytes) {
- if (stream->read_cb) {
+ if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream
- // open.
+ // open. But only send EOF if we haven't already.
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
@@ -381,8 +378,8 @@ static void process_close_handles(void **argv)
{
Process *proc = argv[0];
- flush_stream(proc, proc->out);
- flush_stream(proc, proc->err);
+ flush_stream(proc, &proc->out);
+ flush_stream(proc, &proc->err);
process_close_streams(proc);
process_close(proc);
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5cbf7f9ce7..033ce3604b 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -21,15 +21,16 @@ struct process {
int pid, status, refcount;
// set to the hrtime of when process_stop was called for the process.
uint64_t stopped_time;
- char *cwd;
+ const char *cwd;
char **argv;
- Stream *in, *out, *err;
+ Stream in, out, err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
- bool closed, term_sent, detach;
+ bool closed, detach;
MultiQueue *events;
};
+
static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
@@ -38,23 +39,27 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.loop = loop,
.events = NULL,
.pid = 0,
- .status = 0,
+ .status = -1,
.refcount = 0,
.stopped_time = 0,
.cwd = NULL,
.argv = NULL,
- .in = NULL,
- .out = NULL,
- .err = NULL,
+ .in = { .closed = false },
+ .out = { .closed = false },
+ .err = { .closed = false },
.cb = NULL,
.closed = false,
- .term_sent = false,
.internal_close_cb = NULL,
.internal_exit_cb = NULL,
.detach = false
};
}
+static inline bool process_is_stopped(Process *proc)
+{
+ return proc->stopped_time != 0;
+}
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.h.generated.h"
#endif
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 92efc9fa2e..e0500ba828 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdint.h>
#include <stdbool.h>
@@ -89,7 +92,10 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
Stream *stream = handle->data;
- buf->base = rbuffer_write_ptr(stream->buffer, &buf->len);
+ // `uv_buf_t.len` happens to have different size on Windows.
+ size_t write_count;
+ buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
+ buf->len = write_count;
}
// Callback invoked by libuv after it copies the data into the buffer provided
@@ -99,19 +105,19 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
Stream *stream = uvstream->data;
- if (cnt > 0) {
- stream->num_bytes += (size_t)cnt;
- }
-
if (cnt <= 0) {
- if (cnt != UV_ENOBUFS
- // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
- // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
- //
- // We don't need to do anything with the RBuffer because the next call
- // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
- // won't be called)
- && cnt != 0) {
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ if (cnt == UV_ENOBUFS || cnt == 0) {
+ return;
+ } else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
+ // The TTY driver might signal TTY without closing the stream
+ invoke_read_cb(stream, 0, true);
+ } else {
DLOG("Closing Stream (%p): %s (%s)", stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
@@ -124,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
+ stream->num_bytes += nread;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
@@ -136,7 +143,10 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_t req;
Stream *stream = handle->data;
- stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len);
+ // `uv_buf_t.len` happens to have different size on Windows.
+ size_t write_count;
+ stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count);
+ stream->uvbuf.len = write_count;
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
@@ -178,6 +188,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
+ stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c
index 11ce15a882..fec46da4ff 100644
--- a/src/nvim/event/signal.c
+++ b/src/nvim/event/signal.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <uv.h>
#include "nvim/event/loop.h"
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index 8f9327f3d4..6f45b09fce 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdint.h>
@@ -12,99 +15,123 @@
#include "nvim/vim.h"
#include "nvim/strings.h"
#include "nvim/path.h"
+#include "nvim/main.h"
#include "nvim/memory.h"
+#include "nvim/macros.h"
+#include "nvim/charset.h"
+#include "nvim/log.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/socket.c.generated.h"
#endif
-#define NVIM_DEFAULT_TCP_PORT 7450
-
-void socket_watcher_init(Loop *loop, SocketWatcher *watcher,
- const char *endpoint, void *data)
- FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3)
+int socket_watcher_init(Loop *loop, SocketWatcher *watcher,
+ const char *endpoint)
+ FUNC_ATTR_NONNULL_ALL
{
- // Trim to `ADDRESS_MAX_SIZE`
- if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr))
- >= sizeof(watcher->addr)) {
- // TODO(aktau): since this is not what the user wanted, perhaps we
- // should return an error here
- WLOG("Address was too long, truncated to %s", watcher->addr);
- }
-
- bool tcp = true;
- char ip[16], *ip_end = xstrchrnul(watcher->addr, ':');
+ xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr));
+ char *addr = watcher->addr;
+ char *host_end = strrchr(addr, ':');
- // (ip_end - addr) is always > 0, so convert to size_t
- size_t addr_len = (size_t)(ip_end - watcher->addr);
+ if (host_end && addr != host_end) {
+ // Split user specified address into two strings, addr(hostname) and port.
+ // The port part in watcher->addr will be updated later.
+ *host_end = '\0';
+ char *port = host_end + 1;
+ intmax_t iport;
- if (addr_len > sizeof(ip) - 1) {
- // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255)
- addr_len = sizeof(ip) - 1;
- }
+ int ret = getdigits_safe(&(char_u *){ (char_u *)port }, &iport);
+ if (ret == FAIL || iport < 0 || iport > UINT16_MAX) {
+ ELOG("Invalid port: %s", port);
+ return UV_EINVAL;
+ }
- // Extract the address part
- xstrlcpy(ip, watcher->addr, addr_len + 1);
- int port = NVIM_DEFAULT_TCP_PORT;
-
- if (*ip_end == ':') {
- // Extract the port
- long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
- if (lport <= 0 || lport > 0xffff) {
- // Invalid port, treat as named pipe or unix socket
- tcp = false;
- } else {
- port = (int) lport;
+ if (*port == NUL) {
+ // When no port is given, (uv_)getaddrinfo expects NULL otherwise the
+ // implementation may attempt to lookup the service by name (and fail)
+ port = NULL;
}
- }
- if (tcp) {
- // Try to parse ip address
- if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) {
- // Invalid address, treat as named pipe or unix socket
- tcp = false;
+ uv_getaddrinfo_t request;
+
+ int retval = uv_getaddrinfo(&loop->uv, &request, NULL, addr, port,
+ &(struct addrinfo){
+ .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_STREAM,
+ });
+ if (retval != 0) {
+ ELOG("Host lookup failed: %s", endpoint);
+ return retval;
}
- }
+ watcher->uv.tcp.addrinfo = request.addrinfo;
- if (tcp) {
uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle);
- watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle;
+ uv_tcp_nodelay(&watcher->uv.tcp.handle, true);
+ watcher->stream = STRUCT_CAST(uv_stream_t, &watcher->uv.tcp.handle);
} else {
uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0);
- watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle;
+ watcher->stream = STRUCT_CAST(uv_stream_t, &watcher->uv.pipe.handle);
}
watcher->stream->data = watcher;
watcher->cb = NULL;
watcher->close_cb = NULL;
watcher->events = NULL;
+ watcher->data = NULL;
+
+ return 0;
}
int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
FUNC_ATTR_NONNULL_ALL
{
watcher->cb = cb;
- int result;
+ int result = UV_EINVAL;
if (watcher->stream->type == UV_TCP) {
- result = uv_tcp_bind(&watcher->uv.tcp.handle,
- (const struct sockaddr *)&watcher->uv.tcp.addr, 0);
+ struct addrinfo *ai = watcher->uv.tcp.addrinfo;
+
+ for (; ai; ai = ai->ai_next) {
+ result = uv_tcp_bind(&watcher->uv.tcp.handle, ai->ai_addr, 0);
+ if (result != 0) {
+ continue;
+ }
+ result = uv_listen(watcher->stream, backlog, connection_cb);
+ if (result == 0) {
+ struct sockaddr_storage sas;
+
+ // When the endpoint in socket_watcher_init() didn't specify a port
+ // number, a free random port number will be assigned. sin_port will
+ // contain 0 in this case, unless uv_tcp_getsockname() is used first.
+ uv_tcp_getsockname(&watcher->uv.tcp.handle, (struct sockaddr *)&sas,
+ &(int){ sizeof(sas) });
+ uint16_t port = (uint16_t)(
+ (sas.ss_family == AF_INET)
+ ? (STRUCT_CAST(struct sockaddr_in, &sas))->sin_port
+ : (STRUCT_CAST(struct sockaddr_in6, &sas))->sin6_port);
+ // v:servername uses the string from watcher->addr
+ size_t len = strlen(watcher->addr);
+ snprintf(watcher->addr+len, sizeof(watcher->addr)-len, ":%" PRIu16,
+ ntohs(port));
+ break;
+ }
+ }
+ uv_freeaddrinfo(watcher->uv.tcp.addrinfo);
} else {
result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr);
- }
-
- if (result == 0) {
- result = uv_listen(watcher->stream, backlog, connection_cb);
+ if (result == 0) {
+ result = uv_listen(watcher->stream, backlog, connection_cb);
+ }
}
assert(result <= 0); // libuv should return negative error code or zero.
if (result < 0) {
- if (result == -EACCES) {
+ if (result == UV_EACCES) {
// Libuv converts ENOENT to EACCES for Windows compatibility, but if
// the parent directory does not exist, ENOENT would be more accurate.
*path_tail((char_u *)watcher->addr) = NUL;
if (!os_path_exists((char_u *)watcher->addr)) {
- result = -ENOENT;
+ result = UV_ENOENT;
}
}
return result;
@@ -119,10 +146,11 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
uv_stream_t *client;
if (watcher->stream->type == UV_TCP) {
- client = (uv_stream_t *)&stream->uv.tcp;
+ client = STRUCT_CAST(uv_stream_t, &stream->uv.tcp);
uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client);
+ uv_tcp_nodelay((uv_tcp_t *)client, true);
} else {
- client = (uv_stream_t *)&stream->uv.pipe;
+ client = STRUCT_CAST(uv_stream_t, &stream->uv.pipe);
uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0);
}
@@ -165,3 +193,77 @@ static void close_cb(uv_handle_t *handle)
watcher->close_cb(watcher, watcher->data);
}
}
+
+static void connect_cb(uv_connect_t *req, int status)
+{
+ int *ret_status = req->data;
+ *ret_status = status;
+ if (status != 0) {
+ uv_close((uv_handle_t *)req->handle, NULL);
+ }
+}
+
+bool socket_connect(Loop *loop, Stream *stream,
+ bool is_tcp, const char *address,
+ int timeout, const char **error)
+{
+ bool success = false;
+ int status;
+ uv_connect_t req;
+ req.data = &status;
+ uv_stream_t *uv_stream;
+
+ uv_tcp_t *tcp = &stream->uv.tcp;
+ uv_getaddrinfo_t addr_req;
+ addr_req.addrinfo = NULL;
+ const struct addrinfo *addrinfo = NULL;
+ char *addr = NULL;
+ if (is_tcp) {
+ addr = xstrdup(address);
+ char *host_end = strrchr(addr, ':');
+ if (!host_end) {
+ *error = _("tcp address must be host:port");
+ goto cleanup;
+ }
+ *host_end = NUL;
+
+ const struct addrinfo hints = { .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_STREAM,
+ .ai_flags = AI_NUMERICSERV };
+ int retval = uv_getaddrinfo(&loop->uv, &addr_req, NULL,
+ addr, host_end+1, &hints);
+ if (retval != 0) {
+ *error = _("failed to lookup host or port");
+ goto cleanup;
+ }
+ addrinfo = addr_req.addrinfo;
+
+tcp_retry:
+ uv_tcp_init(&loop->uv, tcp);
+ uv_tcp_nodelay(tcp, true);
+ uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb);
+ uv_stream = (uv_stream_t *)tcp;
+
+ } else {
+ uv_pipe_t *pipe = &stream->uv.pipe;
+ uv_pipe_init(&loop->uv, pipe, 0);
+ uv_pipe_connect(&req, pipe, address, connect_cb);
+ uv_stream = STRUCT_CAST(uv_stream_t, pipe);
+ }
+ status = 1;
+ LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
+ if (status == 0) {
+ stream_init(NULL, stream, -1, uv_stream);
+ success = true;
+ } else if (is_tcp && addrinfo->ai_next) {
+ addrinfo = addrinfo->ai_next;
+ goto tcp_retry;
+ } else {
+ *error = _("connection refused");
+ }
+
+cleanup:
+ xfree(addr);
+ uv_freeaddrinfo(addr_req.addrinfo);
+ return success;
+}
diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h
index eb0823c76d..d30ae45502 100644
--- a/src/nvim/event/socket.h
+++ b/src/nvim/event/socket.h
@@ -20,7 +20,7 @@ struct socket_watcher {
union {
struct {
uv_tcp_t handle;
- struct sockaddr_in addr;
+ struct addrinfo *addrinfo;
} tcp;
struct {
uv_pipe_t handle;
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 26083c20f4..ba25b76ec7 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -1,10 +1,15 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdio.h>
#include <stdbool.h>
#include <uv.h>
+#include "nvim/log.h"
#include "nvim/rbuffer.h"
+#include "nvim/macros.h"
#include "nvim/event/stream.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -23,8 +28,9 @@ int stream_set_blocking(int fd, bool blocking)
uv_loop_init(&loop);
uv_pipe_init(&loop, &stream, 0);
uv_pipe_open(&stream, fd);
- int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
- uv_close((uv_handle_t *)&stream, NULL);
+ int retval = uv_stream_set_blocking(STRUCT_CAST(uv_stream_t, &stream),
+ blocking);
+ uv_close(STRUCT_CAST(uv_handle_t, &stream), NULL);
uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt.
uv_loop_close(&loop);
return retval;
@@ -49,7 +55,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
assert(type == UV_NAMED_PIPE || type == UV_TTY);
uv_pipe_init(&loop->uv, &stream->uv.pipe, 0);
uv_pipe_open(&stream->uv.pipe, fd);
- stream->uvstream = (uv_stream_t *)&stream->uv.pipe;
+ stream->uvstream = STRUCT_CAST(uv_stream_t, &stream->uv.pipe);
}
}
@@ -76,6 +82,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
+ DLOG("closing Stream: %p", stream);
stream->closed = true;
stream->close_cb = on_stream_close;
stream->close_cb_data = data;
@@ -85,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
+void stream_may_close(Stream *stream)
+{
+ if (!stream->closed) {
+ stream_close(stream, NULL, NULL);
+ }
+}
+
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index d27497e4a4..e713323f5c 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -14,10 +14,7 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
-/// @param count Number of bytes to read. This must be respected if keeping
-/// the order of events is a requirement. This is because events
-/// may be queued and only processed later when more data is copied
-/// into to the buffer, so one read may starve another.
+/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
@@ -33,6 +30,8 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
+ bool closed;
+ bool did_eof;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
@@ -52,7 +51,6 @@ struct stream {
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
- bool closed;
MultiQueue *events;
};
diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c
index 77260546db..80289c27d1 100644
--- a/src/nvim/event/time.c
+++ b/src/nvim/event/time.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdint.h>
#include <uv.h>
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index fc7aad8eb9..320006890d 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdint.h>
#include <stdbool.h>
@@ -5,6 +8,7 @@
#include <uv.h>
+#include "nvim/log.h"
#include "nvim/event/loop.h"
#include "nvim/event/wstream.h"
#include "nvim/vim.h"