aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os')
-rw-r--r--src/nvim/os/channel.c598
-rw-r--r--src/nvim/os/channel.h15
-rw-r--r--src/nvim/os/event.c126
-rw-r--r--src/nvim/os/event.h21
-rw-r--r--src/nvim/os/event_defs.h25
-rw-r--r--src/nvim/os/fs.c28
-rw-r--r--src/nvim/os/input.c82
-rw-r--r--src/nvim/os/job.c204
-rw-r--r--src/nvim/os/msgpack_rpc.c188
-rw-r--r--src/nvim/os/msgpack_rpc.h51
-rw-r--r--src/nvim/os/msgpack_rpc_helpers.c289
-rw-r--r--src/nvim/os/msgpack_rpc_helpers.h16
-rw-r--r--src/nvim/os/provider.c2
-rw-r--r--src/nvim/os/rstream.c60
-rw-r--r--src/nvim/os/server.c273
-rw-r--r--src/nvim/os/server.h7
-rw-r--r--src/nvim/os/server_defs.h7
-rw-r--r--src/nvim/os/shell.c31
-rw-r--r--src/nvim/os/signal.c81
-rw-r--r--src/nvim/os/time.c38
20 files changed, 265 insertions, 1877 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
deleted file mode 100644
index 959fbc6e73..0000000000
--- a/src/nvim/os/channel.c
+++ /dev/null
@@ -1,598 +0,0 @@
-#include <stdbool.h>
-#include <string.h>
-#include <inttypes.h>
-
-#include <uv.h>
-#include <msgpack.h>
-
-#include "nvim/api/private/helpers.h"
-#include "nvim/api/vim.h"
-#include "nvim/os/channel.h"
-#include "nvim/os/event.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/msgpack_rpc.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
-#include "nvim/vim.h"
-#include "nvim/ascii.h"
-#include "nvim/memory.h"
-#include "nvim/os_unix.h"
-#include "nvim/message.h"
-#include "nvim/term.h"
-#include "nvim/map.h"
-#include "nvim/log.h"
-#include "nvim/misc1.h"
-#include "nvim/lib/kvec.h"
-
-#define CHANNEL_BUFFER_SIZE 0xffff
-
-typedef struct {
- uint64_t request_id;
- bool errored;
- Object result;
-} ChannelCallFrame;
-
-typedef struct {
- uint64_t id;
- PMap(cstr_t) *subscribed_events;
- bool is_job, enabled;
- msgpack_unpacker *unpacker;
- union {
- Job *job;
- struct {
- RStream *read;
- WStream *write;
- uv_stream_t *uv;
- } streams;
- } data;
- uint64_t next_request_id;
- kvec_t(ChannelCallFrame *) call_stack;
- size_t rpc_call_level;
-} Channel;
-
-static uint64_t next_id = 1;
-static PMap(uint64_t) *channels = NULL;
-static PMap(cstr_t) *event_strings = NULL;
-static msgpack_sbuffer out_buffer;
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/channel.c.generated.h"
-#endif
-
-/// Initializes the module
-void channel_init(void)
-{
- channels = pmap_new(uint64_t)();
- event_strings = pmap_new(cstr_t)();
- msgpack_sbuffer_init(&out_buffer);
-
- if (embedded_mode) {
- channel_from_stdio();
- }
-}
-
-/// Teardown the module
-void channel_teardown(void)
-{
- if (!channels) {
- return;
- }
-
- Channel *channel;
-
- map_foreach_value(channels, channel, {
- close_channel(channel);
- });
-}
-
-/// Creates an API channel by starting a job and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
-///
-/// @param argv The argument vector for the process
-/// @return The channel id
-uint64_t channel_from_job(char **argv)
-{
- Channel *channel = register_channel();
- channel->is_job = true;
-
- int status;
- channel->data.job = job_start(argv,
- channel,
- job_out,
- job_err,
- NULL,
- 0,
- &status);
-
- if (status <= 0) {
- close_channel(channel);
- return 0;
- }
-
- return channel->id;
-}
-
-/// Creates an API channel from a libuv stream representing a tcp or
-/// pipe/socket client connection
-///
-/// @param stream The established connection
-void channel_from_stream(uv_stream_t *stream)
-{
- Channel *channel = register_channel();
- stream->data = NULL;
- channel->is_job = false;
- // read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
- rstream_set_stream(channel->data.streams.read, stream);
- rstream_start(channel->data.streams.read);
- // write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_stream(channel->data.streams.write, stream);
- channel->data.streams.uv = stream;
-}
-
-bool channel_exists(uint64_t id)
-{
- Channel *channel;
- return (channel = pmap_get(uint64_t)(channels, id)) != NULL
- && channel->enabled;
-}
-
-/// Sends event/arguments to channel
-///
-/// @param id The channel id. If 0, the event will be sent to all
-/// channels that have subscribed to the event type
-/// @param name The event name, an arbitrary string
-/// @param args Array with event arguments
-/// @return True if the event was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *name, Array args)
-{
- Channel *channel = NULL;
-
- if (id > 0) {
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
- api_free_array(args);
- return false;
- }
- send_event(channel, name, args);
- } else {
- broadcast_event(name, args);
- }
-
- return true;
-}
-
-/// Sends a method call to a channel
-///
-/// @param id The channel id
-/// @param method_name The method name, an arbitrary string
-/// @param args Array with method arguments
-/// @param[out] error True if the return value is an error
-/// @return Whatever the remote method returned
-Object channel_send_call(uint64_t id,
- char *method_name,
- Array args,
- Error *err)
-{
- Channel *channel = NULL;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
- api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
- api_free_array(args);
- return NIL;
- }
-
- if (kv_size(channel->call_stack) > 20) {
- // 20 stack depth is more than anyone should ever need for RPC calls
- api_set_error(err,
- Exception,
- _("Channel %" PRIu64 " crossed maximum stack depth"),
- channel->id);
- api_free_array(args);
- return NIL;
- }
-
- uint64_t request_id = channel->next_request_id++;
- // Send the msgpack-rpc request
- send_request(channel, request_id, method_name, args);
-
- EventSource channel_source = channel->is_job
- ? job_event_source(channel->data.job)
- : rstream_event_source(channel->data.streams.read);
- EventSource sources[] = {channel_source, NULL};
-
- // Push the frame
- ChannelCallFrame frame = {request_id, false, NIL};
- kv_push(ChannelCallFrame *, channel->call_stack, &frame);
- size_t size = kv_size(channel->call_stack);
-
- do {
- event_poll(-1, sources);
- } while (
- // Continue running if ...
- channel->enabled && // the channel is still enabled
- kv_size(channel->call_stack) >= size); // the call didn't return
-
- if (frame.errored) {
- api_set_error(err, Exception, "%s", frame.result.data.string.data);
- return NIL;
- }
-
- return frame.result;
-}
-
-/// Subscribes to event broadcasts
-///
-/// @param id The channel id
-/// @param event The event type string
-void channel_subscribe(uint64_t id, char *event)
-{
- Channel *channel;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
- abort();
- }
-
- char *event_string = pmap_get(cstr_t)(event_strings, event);
-
- if (!event_string) {
- event_string = xstrdup(event);
- pmap_put(cstr_t)(event_strings, event_string, event_string);
- }
-
- pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
-}
-
-/// Unsubscribes to event broadcasts
-///
-/// @param id The channel id
-/// @param event The event type string
-void channel_unsubscribe(uint64_t id, char *event)
-{
- Channel *channel;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
- abort();
- }
-
- unsubscribe(channel, event);
-}
-
-/// Closes a channel
-///
-/// @param id The channel id
-/// @return true if successful, false otherwise
-bool channel_close(uint64_t id)
-{
- Channel *channel;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
- return false;
- }
-
- channel_kill(channel);
- channel->enabled = false;
- return true;
-}
-
-/// Creates an API channel from stdin/stdout. This is used when embedding
-/// Neovim
-static void channel_from_stdio(void)
-{
- Channel *channel = register_channel();
- channel->is_job = false;
- // read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
- rstream_set_file(channel->data.streams.read, 0);
- rstream_start(channel->data.streams.read);
- // write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_file(channel->data.streams.write, 1);
- channel->data.streams.uv = NULL;
-}
-
-static void job_out(RStream *rstream, void *data, bool eof)
-{
- Job *job = data;
- parse_msgpack(rstream, job_data(job), eof);
-}
-
-static void job_err(RStream *rstream, void *data, bool eof)
-{
- size_t count;
- char buf[256];
- Channel *channel = job_data(data);
-
- while ((count = rstream_pending(rstream))) {
- size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
- }
-}
-
-static void parse_msgpack(RStream *rstream, void *data, bool eof)
-{
- Channel *channel = data;
- channel->rpc_call_level++;
-
- if (eof) {
- char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed by the client",
- channel->id);
- call_set_error(channel, buf);
- goto end;
- }
-
- uint32_t count = rstream_pending(rstream);
- DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
- count,
- rstream);
-
- // Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->unpacker, count);
- rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
- msgpack_unpacker_buffer_consumed(channel->unpacker, count);
-
- msgpack_unpacked unpacked;
- msgpack_unpacked_init(&unpacked);
- msgpack_unpack_return result;
-
- // Deserialize everything we can.
- while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
- MSGPACK_UNPACK_SUCCESS) {
- if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
- if (is_valid_rpc_response(&unpacked.data, channel)) {
- call_stack_pop(&unpacked.data, channel);
- } else {
- char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Channel %" PRIu64 " returned a response that doesn't have "
- " a matching id for the current RPC call. Ensure the client "
- " is properly synchronized",
- channel->id);
- call_set_error(channel, buf);
- }
- msgpack_unpacked_destroy(&unpacked);
- // Bail out from this event loop iteration
- goto end;
- }
-
- // Perform the call
- WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
- // write the response
- if (!channel_write(channel, resp)) {
- goto end;
- }
- }
-
- if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
- OUT_STR(e_outofmem);
- out_char('\n');
- preserve_exit();
- }
-
- if (result == MSGPACK_UNPACK_PARSE_ERROR) {
- // See src/msgpack/unpack_template.h in msgpack source tree for
- // causes for this error(search for 'goto _failed')
- //
- // A not so uncommon cause for this might be deserializing objects with
- // a high nesting level: msgpack will break when it's internal parse stack
- // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
- send_error(channel, 0, "Invalid msgpack payload. "
- "This error can also happen when deserializing "
- "an object with high level of nesting");
- }
-
-end:
- channel->rpc_call_level--;
- if (!channel->enabled && !kv_size(channel->call_stack)) {
- // Now it's safe to destroy the channel
- close_channel(channel);
- }
-}
-
-static bool channel_write(Channel *channel, WBuffer *buffer)
-{
- bool success;
-
- if (channel->is_job) {
- success = job_write(channel->data.job, buffer);
- } else {
- success = wstream_write(channel->data.streams.write, buffer);
- }
-
- if (!success) {
- // If the write failed for any reason, close the channel
- char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed due to a failed write",
- channel->id);
- call_set_error(channel, buf);
- }
-
- return success;
-}
-
-static void send_error(Channel *channel, uint64_t id, char *err)
-{
- Error e = ERROR_INIT;
- api_set_error(&e, Exception, "%s", err);
- channel_write(channel, serialize_response(id, &e, NIL, &out_buffer));
-}
-
-static void send_request(Channel *channel,
- uint64_t id,
- char *name,
- Array args)
-{
- String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, args, &out_buffer, 1));
-}
-
-static void send_event(Channel *channel,
- char *name,
- Array args)
-{
- String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, args, &out_buffer, 1));
-}
-
-static void broadcast_event(char *name, Array args)
-{
- kvec_t(Channel *) subscribed;
- kv_init(subscribed);
- Channel *channel;
-
- map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
- kv_push(Channel *, subscribed, channel);
- }
- });
-
- if (!kv_size(subscribed)) {
- api_free_array(args);
- goto end;
- }
-
- String method = {.size = strlen(name), .data = name};
- WBuffer *buffer = serialize_request(0,
- method,
- args,
- &out_buffer,
- kv_size(subscribed));
-
- for (size_t i = 0; i < kv_size(subscribed); i++) {
- channel_write(kv_A(subscribed, i), buffer);
- }
-
-end:
- kv_destroy(subscribed);
-}
-
-static void unsubscribe(Channel *channel, char *event)
-{
- char *event_string = pmap_get(cstr_t)(event_strings, event);
- pmap_del(cstr_t)(channel->subscribed_events, event_string);
-
- map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
- return;
- }
- });
-
- // Since the string is no longer used by other channels, release it's memory
- pmap_del(cstr_t)(event_strings, event_string);
- free(event_string);
-}
-
-static void close_channel(Channel *channel)
-{
- pmap_del(uint64_t)(channels, channel->id);
- msgpack_unpacker_free(channel->unpacker);
-
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
- channel_kill(channel);
-
- free(channel);
-}
-
-static void channel_kill(Channel *channel)
-{
- if (channel->is_job) {
- if (channel->data.job) {
- job_stop(channel->data.job);
- }
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- if (channel->data.streams.uv) {
- uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
- } else {
- // When the stdin channel closes, it's time to go
- mch_exit(0);
- }
- }
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- free(handle->data);
- free(handle);
-}
-
-static Channel *register_channel(void)
-{
- Channel *rv = xmalloc(sizeof(Channel));
- rv->enabled = true;
- rv->rpc_call_level = 0;
- rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
- rv->subscribed_events = pmap_new(cstr_t)();
- rv->next_request_id = 1;
- kv_init(rv->call_stack);
- pmap_put(uint64_t)(channels, rv->id, rv);
- return rv;
-}
-
-static bool is_rpc_response(msgpack_object *obj)
-{
- return obj->type == MSGPACK_OBJECT_ARRAY
- && obj->via.array.size == 4
- && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
- && obj->via.array.ptr[0].via.u64 == 1
- && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
-}
-
-static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
-{
- uint64_t response_id = obj->via.array.ptr[1].via.u64;
- // Must be equal to the frame at the stack's bottom
- return response_id == kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1)->request_id;
-}
-
-static void call_stack_pop(msgpack_object *obj, Channel *channel)
-{
- ChannelCallFrame *frame = kv_pop(channel->call_stack);
- frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
-
- if (frame->errored) {
- msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
- } else {
- msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
- }
-}
-
-static void call_set_error(Channel *channel, char *msg)
-{
- for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
- ChannelCallFrame *frame = kv_pop(channel->call_stack);
- frame->errored = true;
- frame->result = STRING_OBJ(cstr_to_string(msg));
- }
-
- channel->enabled = false;
-}
diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h
deleted file mode 100644
index bb409bfde9..0000000000
--- a/src/nvim/os/channel.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#ifndef NVIM_OS_CHANNEL_H
-#define NVIM_OS_CHANNEL_H
-
-#include <stdbool.h>
-#include <uv.h>
-
-#include "nvim/api/private/defs.h"
-#include "nvim/vim.h"
-
-#define METHOD_MAXLEN 512
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/channel.h.generated.h"
-#endif
-#endif // NVIM_OS_CHANNEL_H
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index a460b2db96..2dee529452 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -7,8 +7,10 @@
#include "nvim/os/event.h"
#include "nvim/os/input.h"
-#include "nvim/os/channel.h"
-#include "nvim/os/server.h"
+#include "nvim/msgpack_rpc/defs.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
+#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
@@ -25,25 +27,22 @@ KLIST_INIT(Event, Event, _destroy_event)
typedef struct {
bool timed_out;
- int32_t ms;
+ int ms;
uv_timer_t *timer;
} TimerData;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
-static klist_t(Event) *deferred_events, *immediate_events;
-// NULL-terminated array of event sources that we should process immediately.
-//
-// Events from sources that are not contained in this array are processed
-// later when `event_process` is called
-static EventSource *immediate_sources = NULL;
+static klist_t(Event) *pending_events;
void event_init(void)
{
+ // early msgpack-rpc initialization
+ msgpack_rpc_init_method_table();
+ msgpack_rpc_helpers_init();
// Initialize the event queues
- deferred_events = kl_init(Event);
- immediate_events = kl_init(Event);
+ pending_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@@ -52,9 +51,8 @@ void event_init(void)
signal_init();
// Jobs
job_init();
- // Channels
+ // finish mspgack-rpc initialization
channel_init();
- // Servers
server_init();
// Providers
provider_init();
@@ -68,8 +66,7 @@ void event_teardown(void)
}
// Wait for some event
-bool event_poll(int32_t ms, EventSource sources[])
- FUNC_ATTR_NONNULL_ARG(2)
+void event_poll(int ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@@ -100,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[])
run_mode = UV_RUN_NOWAIT;
}
- size_t processed_events;
-
- do {
- // Run one event loop iteration, blocking for events if run_mode is
- // UV_RUN_ONCE
- processed_events = loop(run_mode, sources);
- } while (
- // Continue running if ...
- !processed_events && // we didn't process any immediate events
- !event_has_deferred() && // no events are waiting to be processed
- run_mode != UV_RUN_NOWAIT && // ms != 0
- !timer_data.timed_out); // we didn't get a timeout
+ loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
@@ -123,68 +109,29 @@ bool event_poll(int32_t ms, EventSource sources[])
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
- processed_events += loop(UV_RUN_NOWAIT, sources);
+ loop(UV_RUN_NOWAIT);
}
-
- return !timer_data.timed_out && (processed_events || event_has_deferred());
}
bool event_has_deferred(void)
{
- return !kl_empty(deferred_events);
+ return !kl_empty(pending_events);
}
// Queue an event
void event_push(Event event)
{
- bool defer = true;
-
- if (immediate_sources) {
- size_t i;
- EventSource src;
-
- for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
- if (src == event.source) {
- defer = false;
- break;
- }
- }
- }
-
- *kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
+ *kl_pushp(Event, pending_events) = event;
}
-void event_process(void)
-{
- process_from(deferred_events);
-}
-// Runs the appropriate action for each queued event
-static size_t process_from(klist_t(Event) *queue)
+void event_process(void)
{
- size_t count = 0;
Event event;
- while (kl_shift(Event, queue, &event) == 0) {
- switch (event.type) {
- case kEventSignal:
- signal_handle(event);
- break;
- case kEventRStreamData:
- rstream_read_event(event);
- break;
- case kEventJobExit:
- job_exit_event(event);
- break;
- default:
- abort();
- }
- count++;
+ while (kl_shift(Event, pending_events, &event) == 0) {
+ event.handler(event);
}
-
- DLOG("Processed %u events", count);
-
- return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@@ -202,42 +149,9 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
-static void requeue_deferred_events(void)
+static void loop(uv_run_mode run_mode)
{
- size_t remaining = deferred_events->size;
-
- DLOG("Number of deferred events: %u", remaining);
-
- while (remaining--) {
- // Re-push each deferred event to ensure it will be in the right queue
- Event event;
- kl_shift(Event, deferred_events, &event);
- event_push(event);
- DLOG("Re-queueing event");
- }
-
- DLOG("Number of deferred events: %u", deferred_events->size);
-}
-
-static size_t loop(uv_run_mode run_mode, EventSource *sources)
-{
- size_t count;
- immediate_sources = sources;
- // It's possible that some events from the immediate sources are waiting
- // in the deferred queue. If so, move them to the immediate queue so they
- // will be processed in order of arrival by the next `process_from` call.
- requeue_deferred_events();
- count = process_from(immediate_events);
-
- if (count) {
- // No need to enter libuv, events were already processed
- return count;
- }
-
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
- immediate_sources = NULL;
- count = process_from(immediate_events);
- return count;
}
diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h
index 29e304adc8..f8139e978d 100644
--- a/src/nvim/os/event.h
+++ b/src/nvim/os/event.h
@@ -6,6 +6,27 @@
#include "nvim/os/event_defs.h"
#include "nvim/os/job_defs.h"
+#include "nvim/os/time.h"
+
+// Poll for events until a condition is true or a timeout has passed
+#define event_poll_until(timeout, condition) \
+ do { \
+ int remaining = timeout; \
+ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
+ while (!(condition)) { \
+ event_poll(remaining); \
+ if (remaining == 0) { \
+ break; \
+ } else if (remaining > 0) { \
+ uint64_t now = os_hrtime(); \
+ remaining -= (int) ((now - before) / 1000000); \
+ before = now; \
+ if (remaining <= 0) { \
+ break; \
+ } \
+ } \
+ } \
+ } while (0)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.h.generated.h"
diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h
index dbee3e2ba7..2dd9403d9f 100644
--- a/src/nvim/os/event_defs.h
+++ b/src/nvim/os/event_defs.h
@@ -6,25 +6,12 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
-typedef void * EventSource;
+typedef struct event Event;
+typedef void (*event_handler)(Event event);
-typedef enum {
- kEventSignal,
- kEventRStreamData,
- kEventJobExit
-} EventType;
-
-typedef struct {
- EventSource source;
- EventType type;
- union {
- int signum;
- struct {
- RStream *ptr;
- bool eof;
- } rstream;
- Job *job;
- } data;
-} Event;
+struct event {
+ void *data;
+ event_handler handler;
+};
#endif // NVIM_OS_EVENT_DEFS_H
diff --git a/src/nvim/os/fs.c b/src/nvim/os/fs.c
index 36c2bb6d9b..bdaf9ecdda 100644
--- a/src/nvim/os/fs.c
+++ b/src/nvim/os/fs.c
@@ -23,6 +23,7 @@ static const int kLibuvSuccess = 0;
///
/// @return `0` on success, a libuv error code on failure.
int os_chdir(const char *path)
+ FUNC_ATTR_NONNULL_ALL
{
if (p_verbose >= 5) {
verbose_enter();
@@ -38,6 +39,7 @@ int os_chdir(const char *path)
/// @param len Length of `buf`.
/// @return `OK` for success, `FAIL` for failure.
int os_dirname(char_u *buf, size_t len)
+ FUNC_ATTR_NONNULL_ALL
{
assert(buf && len);
@@ -53,6 +55,7 @@ int os_dirname(char_u *buf, size_t len)
///
/// @return `true` if `fname` is a directory.
bool os_isdir(const char_u *name)
+ FUNC_ATTR_NONNULL_ALL
{
int32_t mode = os_getperm(name);
if (mode < 0) {
@@ -78,6 +81,7 @@ bool os_isdir(const char_u *name)
///
/// @return `false` otherwise.
bool os_can_exe(const char_u *name, char_u **abspath)
+ FUNC_ATTR_NONNULL_ARG(1)
{
// If it's an absolute or relative path don't need to use $PATH.
if (path_is_absolute_path(name) ||
@@ -100,6 +104,7 @@ bool os_can_exe(const char_u *name, char_u **abspath)
// Return true if "name" is an executable file, false if not or it doesn't
// exist.
static bool is_executable(const char_u *name)
+ FUNC_ATTR_NONNULL_ALL
{
int32_t mode = os_getperm(name);
@@ -121,6 +126,7 @@ static bool is_executable(const char_u *name)
///
/// @return `true` if `name` is an executable inside `$PATH`.
static bool is_executable_in_path(const char_u *name, char_u **abspath)
+ FUNC_ATTR_NONNULL_ARG(1)
{
const char *path = getenv("PATH");
// PATH environment variable does not exist or is empty.
@@ -176,6 +182,7 @@ static bool is_executable_in_path(const char_u *name, char_u **abspath)
/// not `O_CREAT` or `O_TMPFILE`), subject to the current umask
/// @return file descriptor, or negative `errno` on failure
int os_open(const char* path, int flags, int mode)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t open_req;
int r = uv_fs_open(uv_default_loop(), &open_req, path, flags, mode, NULL);
@@ -188,6 +195,7 @@ int os_open(const char* path, int flags, int mode)
///
/// @return OK on success, FAIL if a failure occurred.
static bool os_stat(const char *name, uv_stat_t *statbuf)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_stat(uv_default_loop(), &request, name, NULL);
@@ -200,6 +208,7 @@ static bool os_stat(const char *name, uv_stat_t *statbuf)
///
/// @return `-1` when `name` doesn't exist.
int32_t os_getperm(const char_u *name)
+ FUNC_ATTR_NONNULL_ALL
{
uv_stat_t statbuf;
if (os_stat((char *)name, &statbuf)) {
@@ -213,6 +222,7 @@ int32_t os_getperm(const char_u *name)
///
/// @return `OK` for success, `FAIL` for failure.
int os_setperm(const char_u *name, int perm)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_chmod(uv_default_loop(), &request,
@@ -233,6 +243,7 @@ int os_setperm(const char_u *name, int perm)
/// @note If the `owner` or `group` is specified as `-1`, then that ID is not
/// changed.
int os_fchown(int file_descriptor, uv_uid_t owner, uv_gid_t group)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_fchown(uv_default_loop(), &request, file_descriptor,
@@ -245,6 +256,7 @@ int os_fchown(int file_descriptor, uv_uid_t owner, uv_gid_t group)
///
/// @return `true` if `name` exists.
bool os_file_exists(const char_u *name)
+ FUNC_ATTR_NONNULL_ALL
{
uv_stat_t statbuf;
return os_stat((char *)name, &statbuf);
@@ -254,6 +266,7 @@ bool os_file_exists(const char_u *name)
///
/// @return `true` if `name` is readonly.
bool os_file_is_readonly(const char *name)
+ FUNC_ATTR_NONNULL_ALL
{
return access(name, W_OK) != 0;
}
@@ -264,6 +277,7 @@ bool os_file_is_readonly(const char *name)
/// @return `1` if `name` is writable,
/// @return `2` for a directory which we have rights to write into.
int os_file_is_writable(const char *name)
+ FUNC_ATTR_NONNULL_ALL
{
if (access(name, W_OK) == 0) {
if (os_isdir((char_u *)name)) {
@@ -278,6 +292,7 @@ int os_file_is_writable(const char *name)
///
/// @return `OK` for success, `FAIL` for failure.
int os_rename(const char_u *path, const char_u *new_path)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_rename(uv_default_loop(), &request,
@@ -295,6 +310,7 @@ int os_rename(const char_u *path, const char_u *new_path)
///
/// @return `0` for success, non-zero for failure.
int os_mkdir(const char *path, int32_t mode)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_mkdir(uv_default_loop(), &request, path, mode, NULL);
@@ -310,6 +326,7 @@ int os_mkdir(const char *path, int32_t mode)
/// failure.
/// @return `0` for success, non-zero for failure.
int os_mkdtemp(const char *template, char *path)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_mkdtemp(uv_default_loop(), &request, template, NULL);
@@ -324,6 +341,7 @@ int os_mkdtemp(const char *template, char *path)
///
/// @return `0` for success, non-zero for failure.
int os_rmdir(const char *path)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_rmdir(uv_default_loop(), &request, path, NULL);
@@ -335,6 +353,7 @@ int os_rmdir(const char *path)
///
/// @return `0` for success, non-zero for failure.
int os_remove(const char *path)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_unlink(uv_default_loop(), &request, path, NULL);
@@ -348,6 +367,7 @@ int os_remove(const char *path)
/// @param[out] file_info Pointer to a FileInfo to put the information in.
/// @return `true` on success, `false` for failure.
bool os_fileinfo(const char *path, FileInfo *file_info)
+ FUNC_ATTR_NONNULL_ALL
{
return os_stat(path, &(file_info->stat));
}
@@ -358,6 +378,7 @@ bool os_fileinfo(const char *path, FileInfo *file_info)
/// @param[out] file_info Pointer to a FileInfo to put the information in.
/// @return `true` on success, `false` for failure.
bool os_fileinfo_link(const char *path, FileInfo *file_info)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_lstat(uv_default_loop(), &request, path, NULL);
@@ -372,6 +393,7 @@ bool os_fileinfo_link(const char *path, FileInfo *file_info)
/// @param[out] file_info Pointer to a FileInfo to put the information in.
/// @return `true` on success, `false` for failure.
bool os_fileinfo_fd(int file_descriptor, FileInfo *file_info)
+ FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
int result = uv_fs_fstat(uv_default_loop(), &request, file_descriptor, NULL);
@@ -385,6 +407,7 @@ bool os_fileinfo_fd(int file_descriptor, FileInfo *file_info)
/// @return `true` if the two FileInfos represent the same file.
bool os_fileinfo_id_equal(const FileInfo *file_info_1,
const FileInfo *file_info_2)
+ FUNC_ATTR_NONNULL_ALL
{
return file_info_1->stat.st_ino == file_info_2->stat.st_ino
&& file_info_1->stat.st_dev == file_info_2->stat.st_dev;
@@ -395,6 +418,7 @@ bool os_fileinfo_id_equal(const FileInfo *file_info_1,
/// @param file_info Pointer to the `FileInfo`
/// @param[out] file_id Pointer to a `FileID`
void os_fileinfo_id(const FileInfo *file_info, FileID *file_id)
+ FUNC_ATTR_NONNULL_ALL
{
file_id->inode = file_info->stat.st_ino;
file_id->device_id = file_info->stat.st_dev;
@@ -406,6 +430,7 @@ void os_fileinfo_id(const FileInfo *file_info, FileID *file_id)
/// @param file_info Pointer to the `FileInfo`
/// @return the inode number
uint64_t os_fileinfo_inode(const FileInfo *file_info)
+ FUNC_ATTR_NONNULL_ALL
{
return file_info->stat.st_ino;
}
@@ -443,6 +468,7 @@ uint64_t os_fileinfo_blocksize(const FileInfo *file_info)
/// @param[out] file_info Pointer to a `FileID` to fill in.
/// @return `true` on sucess, `false` for failure.
bool os_fileid(const char *path, FileID *file_id)
+ FUNC_ATTR_NONNULL_ALL
{
uv_stat_t statbuf;
if (os_stat(path, &statbuf)) {
@@ -459,6 +485,7 @@ bool os_fileid(const char *path, FileID *file_id)
/// @param file_id_2 Pointer to second `FileID`
/// @return `true` if the two `FileID`s represent te same file.
bool os_fileid_equal(const FileID *file_id_1, const FileID *file_id_2)
+ FUNC_ATTR_NONNULL_ALL
{
return file_id_1->inode == file_id_2->inode
&& file_id_1->device_id == file_id_2->device_id;
@@ -471,6 +498,7 @@ bool os_fileid_equal(const FileID *file_id_1, const FileID *file_id_2)
/// @return `true` if the `FileID` and the `FileInfo` represent te same file.
bool os_fileid_equal_fileinfo(const FileID *file_id,
const FileInfo *file_info)
+ FUNC_ATTR_NONNULL_ALL
{
return file_id->inode == file_info->stat.st_ino
&& file_id->device_id == file_info->stat.st_dev;
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index a18d735ce6..d948a48b64 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -1,3 +1,4 @@
+#include <assert.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
@@ -7,7 +8,6 @@
#include "nvim/api/private/defs.h"
#include "nvim/os/input.h"
#include "nvim/os/event.h"
-#include "nvim/os/signal.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/ascii.h"
@@ -20,8 +20,8 @@
#include "nvim/getchar.h"
#include "nvim/term.h"
-#define READ_BUFFER_SIZE 0xffff
-#define INPUT_BUFFER_SIZE 4096
+#define READ_BUFFER_SIZE 0xfff
+#define INPUT_BUFFER_SIZE (READ_BUFFER_SIZE * 4)
typedef enum {
kInputNone,
@@ -48,10 +48,7 @@ void input_init(void)
}
read_buffer = rbuffer_new(READ_BUFFER_SIZE);
- read_stream = rstream_new(read_cb,
- read_buffer,
- NULL,
- NULL);
+ read_stream = rstream_new(read_cb, read_buffer, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@@ -76,7 +73,7 @@ void input_stop(void)
}
// Low level input function.
-int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
+int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
{
InbufPollResult result;
@@ -90,7 +87,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
return 0;
}
} else {
- if ((result = inbuf_poll(p_ut)) == kInputNone) {
+ if ((result = inbuf_poll((int)p_ut)) == kInputNone) {
if (trigger_cursorhold() && maxlen >= 3
&& !typebuf_changed(tb_change_cnt)) {
buf[0] = K_SPECIAL;
@@ -119,8 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
return 0;
}
- convert_input();
- return rbuffer_read(input_buffer, (char *)buf, maxlen);
+ // Safe to convert rbuffer_read to int, it will never overflow since
+ // we use relatively small buffers.
+ return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
}
// Check if a character is available for reading
@@ -133,8 +131,8 @@ bool os_char_avail(void)
// In cooked mode we should get SIGINT, no need to check.
void os_breakcheck(void)
{
- if (curr_tmode == TMODE_RAW && input_poll(0))
- convert_input();
+ if (curr_tmode == TMODE_RAW)
+ input_poll(0);
}
/// Test whether a file descriptor refers to a terminal.
@@ -167,23 +165,21 @@ void input_buffer_restore(String str)
free(str.data);
}
-static bool input_poll(int32_t ms)
+size_t input_enqueue(String keys)
{
- if (embedded_mode) {
- EventSource input_sources[] = { signal_event_source(), NULL };
- return event_poll(ms, input_sources);
- }
-
- EventSource input_sources[] = {
- rstream_event_source(read_stream),
- NULL
- };
+ size_t rv = rbuffer_write(input_buffer, keys.data, keys.size);
+ process_interrupts();
+ return rv;
+}
- return input_ready() || event_poll(ms, input_sources) || input_ready();
+static bool input_poll(int ms)
+{
+ event_poll_until(ms, input_ready());
+ return input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
-static InbufPollResult inbuf_poll(int32_t ms)
+static InbufPollResult inbuf_poll(int ms)
{
if (typebuf_was_filled || rbuffer_pending(input_buffer)) {
return kInputAvail;
@@ -230,12 +226,14 @@ static void read_cb(RStream *rstream, void *data, bool at_eof)
}
}
+ convert_input();
+ process_interrupts();
started_reading = true;
}
static void convert_input(void)
{
- if (!rbuffer_available(input_buffer)) {
+ if (embedded_mode || !rbuffer_available(input_buffer)) {
// No input buffer space
return;
}
@@ -248,24 +246,32 @@ static void convert_input(void)
if (convert) {
// Perform input conversion according to `input_conv`
- size_t unconverted_length;
+ size_t unconverted_length = 0;
data = (char *)string_convert_ext(&input_conv,
(uint8_t *)data,
(int *)&converted_length,
(int *)&unconverted_length);
- data_length = rbuffer_pending(read_buffer) - unconverted_length;
+ data_length -= unconverted_length;
}
- // Write processed data to input buffer
- size_t consumed = rbuffer_write(input_buffer, data, data_length);
+ // The conversion code will be gone eventually, for now assume `input_buffer`
+ // always has space for the converted data(it's many times the size of
+ // `read_buffer`, so it's hard to imagine a scenario where the converted data
+ // doesn't fit)
+ assert(converted_length <= rbuffer_available(input_buffer));
+ // Write processed data to input buffer.
+ (void)rbuffer_write(input_buffer, data, converted_length);
// Adjust raw buffer pointers
- rbuffer_consumed(read_buffer, consumed);
+ rbuffer_consumed(read_buffer, data_length);
if (convert) {
// data points to memory allocated by `string_convert_ext`, free it.
free(data);
}
+}
+static void process_interrupts(void)
+{
if (!ctrl_c_interrupts) {
return;
}
@@ -273,17 +279,17 @@ static void convert_input(void)
char *inbuf = rbuffer_read_ptr(input_buffer);
size_t count = rbuffer_pending(input_buffer), consume_count = 0;
- for (int i = count - 1; i >= 0; i--) {
+ for (int i = (int)count - 1; i >= 0; i--) {
if (inbuf[i] == 3) {
- consume_count = i + 1;
+ got_int = true;
+ consume_count = (size_t)i;
break;
}
}
- if (consume_count) {
+ if (got_int) {
// Remove everything typed before the CTRL-C
rbuffer_consumed(input_buffer, consume_count);
- got_int = true;
}
}
@@ -304,6 +310,10 @@ static int push_event_key(uint8_t *buf, int maxlen)
// Check if there's pending input
static bool input_ready(void)
{
- return rstream_pending(read_stream) > 0 || eof;
+ return typebuf_was_filled || // API call filled typeahead
+ event_has_deferred() || // Events must be processed
+ (!embedded_mode && (
+ rbuffer_pending(input_buffer) > 0 || // Stdin input
+ eof)); // Stdin closed
}
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 2ca1023290..f8ad6874c9 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -12,17 +12,29 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
-#include "nvim/os/time.h"
#include "nvim/os/shell.h"
-#include "nvim/os/signal.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
-#include "nvim/term.h"
#define EXIT_TIMEOUT 25
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF
+#define close_job_stream(job, stream, type) \
+ do { \
+ if (job->stream) { \
+ type##stream_free(job->stream); \
+ job->stream = NULL; \
+ if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \
+ uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \
+ } \
+ } \
+ } while (0)
+
+#define close_job_in(job) close_job_stream(job, in, w)
+#define close_job_out(job) close_job_stream(job, out, r)
+#define close_job_err(job) close_job_stream(job, err, r)
+
struct job {
// Job id the index in the job table plus one.
int id;
@@ -30,13 +42,9 @@ struct job {
int64_t status;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
- // exit_cb may be called while there's still pending data from stdout/stderr.
- // We use this reference count to ensure the JobExit event is only emitted
- // when stdout/stderr are drained
- int pending_refs;
- // Same as above, but for freeing the job memory which contains
- // libuv handles. Only after all are closed the job can be safely freed.
- int pending_closes;
+ // Number of references to the job. The job resources will only be freed by
+ // close_cb when this is 0
+ int refcount;
// If the job was already stopped
bool stopped;
// Data associated with the job
@@ -99,25 +107,28 @@ void job_teardown(void)
// their status with `wait` or handling SIGCHLD. libuv does that
// automatically (and then calls `exit_cb`) but we have to give it a chance
// by running the loop one more time
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL) {
- continue;
- }
+ job = table[i];
// Still alive
- while (is_alive(job) && remaining_tries--) {
+ while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
+ // It's possible that the event_poll call removed the job from the table,
+ // reset 'job' so the next iteration won't run in that case.
+ job = table[i];
}
- if (is_alive(job)) {
+ if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
+ // Last run to ensure all children were removed
+ event_poll(0);
}
/// Tries to start a new job.
@@ -163,8 +174,7 @@ Job *job_start(char **argv,
job->id = i + 1;
*status = job->id;
job->status = -1;
- job->pending_refs = 3;
- job->pending_closes = 4;
+ job->refcount = 4;
job->data = data;
job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb;
@@ -205,7 +215,6 @@ Job *job_start(char **argv,
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
- free_job(job);
*status = -1;
return NULL;
}
@@ -213,14 +222,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
- job->err = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
+ job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -273,51 +276,30 @@ void job_stop(Job *job)
/// is possible on some OS.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{
- // switch to cooked so `got_int` will be set if the user interrupts
- int old_mode = cur_tmode;
- settmode(TMODE_COOK);
-
- EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
-
- // keep track of the elapsed time if ms > 0
- uint64_t before = (ms > 0) ? os_hrtime() : 0;
-
- while (1) {
- // check if the job has exited (and the status is available).
- if (job->pending_refs == 0) {
- break;
- }
-
- event_poll(ms, sources);
-
- // we'll assume that a user frantically hitting interrupt doesn't like
- // the current job. Signal that it has to be killed.
- if (got_int) {
- job_stop(job);
- }
-
- if (ms == 0) {
- break;
- }
-
- // check if the poll timed out, if not, decrease the ms to wait for the
- // next run
- if (ms > 0) {
- uint64_t now = os_hrtime();
- ms -= (int) ((now - before) / 1000000);
- before = now;
-
- // if the time elapsed is greater than the `ms` wait time, break
- if (ms <= 0) {
- break;
- }
- }
+ // Increase refcount to stop the job from being freed before we have a
+ // chance to get the status.
+ job->refcount++;
+ event_poll_until(ms,
+ // Until...
+ got_int || // interrupted by the user
+ job->refcount == 1); // job exited
+
+ // we'll assume that a user frantically hitting interrupt doesn't like
+ // the current job. Signal that it has to be killed.
+ if (got_int) {
+ job_stop(job);
+ event_poll(0);
}
- settmode(old_mode);
+ if (!--job->refcount) {
+ int status = (int) job->status;
+ // Manually invoke close_cb to free the job resources
+ close_cb((uv_handle_t *)&job->proc);
+ return status;
+ }
- // return -1 for a timeout, the job status otherwise
- return (job->pending_refs) ? -1 : (int) job->status;
+ // return -1 for a timeout
+ return -1;
}
/// Close the pipe used to write to the job.
@@ -331,15 +313,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
/// @param job The job instance
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
{
- if (!job->in) {
- return;
- }
-
- // let other functions in the job module know that the in pipe is no more
- wstream_free(job->in);
- job->in = NULL;
-
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
+ close_job_in(job);
}
/// All writes that complete after calling this function will be reported
@@ -369,14 +343,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
-/// Runs the read callback associated with the job exit event
-///
-/// @param event Object containing data necessary to invoke the callback
-void job_exit_event(Event event)
-{
- job_exit_callback(event.data.job);
-}
-
/// Get the job id
///
/// @param job A pointer to the job
@@ -395,11 +361,6 @@ void *job_data(Job *job)
return job->data;
}
-EventSource job_event_source(Job *job)
-{
- return job;
-}
-
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@@ -411,9 +372,6 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data);
}
- // Free the job resources
- free_job(job);
-
// Stop polling job status if this was the last
job_count--;
if (job_count == 0) {
@@ -426,16 +384,6 @@ static bool is_alive(Job *job)
return uv_process_kill(&job->proc, 0) == 0;
}
-static void free_job(Job *job)
-{
- uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
- if (job->in) {
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
- }
- uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
- uv_close((uv_handle_t *)&job->proc, close_cb);
-}
-
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_prepare_cb(uv_prepare_t *handle)
@@ -465,12 +413,14 @@ static void read_cb(RStream *rstream, void *data, bool eof)
if (rstream == job->out) {
job->stdout_cb(rstream, data, eof);
+ if (eof) {
+ close_job_out(job);
+ }
} else {
job->stderr_cb(rstream, data, eof);
- }
-
- if (eof && --job->pending_refs == 0) {
- emit_exit_event(job);
+ if (eof) {
+ close_job_err(job);
+ }
}
}
@@ -480,41 +430,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
Job *job = handle_get_job((uv_handle_t *)proc);
job->status = status;
- if (--job->pending_refs == 0) {
- emit_exit_event(job);
- }
-}
-
-static void emit_exit_event(Job *job)
-{
- Event event = {
- .source = job_event_source(job),
- .type = kEventJobExit,
- .data.job = job
- };
- event_push(event);
+ uv_close((uv_handle_t *)&job->proc, close_cb);
}
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);
- if (--job->pending_closes == 0) {
- // Only free the job memory after all the associated handles are properly
- // closed by libuv
- rstream_free(job->out);
- rstream_free(job->err);
- if (job->in) {
- wstream_free(job->in);
- }
+ if (handle == (uv_handle_t *)&job->proc) {
+ // Make sure all streams are properly closed to trigger callback invocation
+ // when job->proc is closed
+ close_job_in(job);
+ close_job_out(job);
+ close_job_err(job);
+ }
- // Free data memory of process and pipe handles, that was allocated
- // by handle_set_job in job_start.
+ if (--job->refcount == 0) {
+ // Invoke the exit_cb
+ job_exit_callback(job);
+ // Free all memory allocated for the job
free(job->proc.data);
free(job->proc_stdin.data);
free(job->proc_stdout.data);
free(job->proc_stderr.data);
-
shell_free_argv(job->proc_opts.args);
free(job);
}
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
deleted file mode 100644
index 55bc006ad1..0000000000
--- a/src/nvim/os/msgpack_rpc.c
+++ /dev/null
@@ -1,188 +0,0 @@
-#include <stdint.h>
-#include <stdbool.h>
-#include <inttypes.h>
-
-#include <msgpack.h>
-
-#include "nvim/vim.h"
-#include "nvim/log.h"
-#include "nvim/memory.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/msgpack_rpc.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
-#include "nvim/api/private/helpers.h"
-#include "nvim/func_attr.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc.c.generated.h"
-#endif
-
-/// Validates the basic structure of the msgpack-rpc call and fills `res`
-/// with the basic response structure.
-///
-/// @param channel_id The channel id
-/// @param req The parsed request object
-/// @param res A packer that contains the response
-WBuffer *msgpack_rpc_call(uint64_t channel_id,
- msgpack_object *req,
- msgpack_sbuffer *sbuffer)
- FUNC_ATTR_NONNULL_ARG(2)
- FUNC_ATTR_NONNULL_ARG(3)
-{
- uint64_t response_id;
- Error error = ERROR_INIT;
- msgpack_rpc_validate(&response_id, req, &error);
-
- if (error.set) {
- return serialize_response(response_id, &error, NIL, sbuffer);
- }
-
- // dispatch the call
- Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
- // send the response
- msgpack_packer response;
- msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
-
- if (error.set) {
- ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
- error.msg,
- response_id);
- return serialize_response(response_id, &error, NIL, sbuffer);
- }
-
- DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
- response_id);
- return serialize_response(response_id, &error, rv, sbuffer);
-}
-
-/// Finishes the msgpack-rpc call with an error message.
-///
-/// @param msg The error message
-/// @param res A packer that contains the response
-void msgpack_rpc_error(char *msg, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ALL
-{
- size_t len = strlen(msg);
-
- // error message
- msgpack_pack_bin(res, len);
- msgpack_pack_bin_body(res, msg, len);
- // Nil result
- msgpack_pack_nil(res);
-}
-
-/// Handler executed when an invalid method name is passed
-Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
- msgpack_object *req,
- Error *error)
-{
- snprintf(error->msg, sizeof(error->msg), "Invalid method name");
- error->set = true;
- return NIL;
-}
-
-/// Serializes a msgpack-rpc request or notification(id == 0)
-WBuffer *serialize_request(uint64_t request_id,
- String method,
- Array args,
- msgpack_sbuffer *sbuffer,
- size_t refcount)
- FUNC_ATTR_NONNULL_ARG(4)
-{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, request_id ? 4 : 3);
- msgpack_pack_int(&pac, request_id ? 0 : 2);
-
- if (request_id) {
- msgpack_pack_uint64(&pac, request_id);
- }
-
- msgpack_pack_bin(&pac, method.size);
- msgpack_pack_bin_body(&pac, method.data, method.size);
- msgpack_rpc_from_array(args, &pac);
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- refcount,
- free);
- api_free_array(args);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
-}
-
-/// Serializes a msgpack-rpc response
-WBuffer *serialize_response(uint64_t response_id,
- Error *err,
- Object arg,
- msgpack_sbuffer *sbuffer)
- FUNC_ATTR_NONNULL_ARG(2, 4)
-{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, 4);
- msgpack_pack_int(&pac, 1);
- msgpack_pack_uint64(&pac, response_id);
-
- if (err->set) {
- // error represented by a [type, message] array
- msgpack_pack_array(&pac, 2);
- msgpack_rpc_from_integer(err->type, &pac);
- msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
- // Nil result
- msgpack_pack_nil(&pac);
- } else {
- // Nil error
- msgpack_pack_nil(&pac);
- // Return value
- msgpack_rpc_from_object(arg, &pac);
- }
-
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- 1, // responses only go though 1 channel
- free);
- api_free_object(arg);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
-}
-
-static void msgpack_rpc_validate(uint64_t *response_id,
- msgpack_object *req,
- Error *err)
-{
- // response id not known yet
-
- *response_id = 0;
- // Validate the basic structure of the msgpack-rpc payload
- if (req->type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Request is not an array"));
- }
-
- if (req->via.array.size != 4) {
- api_set_error(err, Validation, _("Request array size should be 4"));
- }
-
- if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Id must be a positive integer"));
- }
-
- // Set the response id, which is the same as the request
- *response_id = req->via.array.ptr[1].via.u64;
-
- if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Message type must be an integer"));
- }
-
- if (req->via.array.ptr[0].via.u64 != 0) {
- api_set_error(err, Validation, _("Message type must be 0"));
- }
-
- if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
- && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
- api_set_error(err, Validation, _("Method must be a string"));
- }
-
- if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Paremeters must be an array"));
- }
-}
diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h
deleted file mode 100644
index 3476d791ea..0000000000
--- a/src/nvim/os/msgpack_rpc.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef NVIM_OS_MSGPACK_RPC_H
-#define NVIM_OS_MSGPACK_RPC_H
-
-#include <stdint.h>
-
-#include <msgpack.h>
-
-#include "nvim/func_attr.h"
-#include "nvim/api/private/defs.h"
-#include "nvim/os/wstream.h"
-
-typedef enum {
- kUnpackResultOk, /// Successfully parsed a document
- kUnpackResultFail, /// Got unexpected input
- kUnpackResultNeedMore /// Need more data
-} UnpackResult;
-
-/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
-/// functions of this type.
-typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
- msgpack_object *req,
- Error *error);
-
-
-/// Initializes the msgpack-rpc method table
-void msgpack_rpc_init(void);
-
-void msgpack_rpc_init_function_metadata(Dictionary *metadata);
-
-/// Dispatches to the actual API function after basic payload validation by
-/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
-/// to C types, and converting the return value back to msgpack types.
-/// The implementation is generated at compile time with metadata extracted
-/// from the api/*.h headers,
-///
-/// @param channel_id The channel id
-/// @param method_id The method id
-/// @param req The parsed request object
-/// @param error Pointer to error structure
-/// @return Some object
-Object msgpack_rpc_dispatch(uint64_t channel_id,
- msgpack_object *req,
- Error *error)
- FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
-
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc.h.generated.h"
-#endif
-
-#endif // NVIM_OS_MSGPACK_RPC_H
diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c
deleted file mode 100644
index b14de8245c..0000000000
--- a/src/nvim/os/msgpack_rpc_helpers.c
+++ /dev/null
@@ -1,289 +0,0 @@
-#include <stdint.h>
-#include <stdbool.h>
-
-#include <msgpack.h>
-
-#include "nvim/os/msgpack_rpc_helpers.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc_helpers.c.generated.h"
-#endif
-
-static msgpack_zone zone;
-static msgpack_sbuffer sbuffer;
-
-#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \
- bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
- FUNC_ATTR_NONNULL_ALL \
- { \
- if (obj->type != MSGPACK_OBJECT_EXT \
- || obj->via.ext.type != kObjectType##t) { \
- return false; \
- } \
- \
- msgpack_object data; \
- msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \
- obj->via.ext.size, \
- NULL, \
- &zone, \
- &data); \
- \
- if (ret != MSGPACK_UNPACK_SUCCESS) { \
- return false; \
- } \
- \
- *arg = data.via.u64; \
- return true; \
- } \
- \
- void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \
- FUNC_ATTR_NONNULL_ARG(2) \
- { \
- msgpack_packer pac; \
- msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \
- msgpack_pack_uint64(&pac, o); \
- msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \
- msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \
- msgpack_sbuffer_clear(&sbuffer); \
- }
-
-void msgpack_rpc_helpers_init(void)
-{
- msgpack_zone_init(&zone, 0xfff);
- msgpack_sbuffer_init(&sbuffer);
-}
-
-HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer)
-HANDLE_TYPE_CONVERSION_IMPL(Window, window)
-HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage)
-
-bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- *arg = obj->via.boolean;
- return obj->type == MSGPACK_OBJECT_BOOLEAN;
-}
-
-bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
- && obj->via.u64 <= INT64_MAX) {
- *arg = (int64_t)obj->via.u64;
- return true;
- }
-
- *arg = obj->via.i64;
- return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
-}
-
-bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- *arg = obj->via.dec;
- return obj->type == MSGPACK_OBJECT_DOUBLE;
-}
-
-bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) {
- arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size);
- arg->size = obj->via.bin.size;
- } else {
- return false;
- }
-
- return true;
-}
-
-bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- switch (obj->type) {
- case MSGPACK_OBJECT_NIL:
- arg->type = kObjectTypeNil;
- return true;
-
- case MSGPACK_OBJECT_BOOLEAN:
- arg->type = kObjectTypeBoolean;
- return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
-
- case MSGPACK_OBJECT_POSITIVE_INTEGER:
- case MSGPACK_OBJECT_NEGATIVE_INTEGER:
- arg->type = kObjectTypeInteger;
- return msgpack_rpc_to_integer(obj, &arg->data.integer);
-
- case MSGPACK_OBJECT_DOUBLE:
- arg->type = kObjectTypeFloat;
- return msgpack_rpc_to_float(obj, &arg->data.floating);
-
- case MSGPACK_OBJECT_BIN:
- case MSGPACK_OBJECT_STR:
- arg->type = kObjectTypeString;
- return msgpack_rpc_to_string(obj, &arg->data.string);
-
- case MSGPACK_OBJECT_ARRAY:
- arg->type = kObjectTypeArray;
- return msgpack_rpc_to_array(obj, &arg->data.array);
-
- case MSGPACK_OBJECT_MAP:
- arg->type = kObjectTypeDictionary;
- return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
-
- case MSGPACK_OBJECT_EXT:
- switch (obj->via.ext.type) {
- case kObjectTypeBuffer:
- return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
- case kObjectTypeWindow:
- return msgpack_rpc_to_window(obj, &arg->data.window);
- case kObjectTypeTabpage:
- return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
- }
- default:
- return false;
- }
-}
-
-bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- if (obj->type != MSGPACK_OBJECT_ARRAY) {
- return false;
- }
-
- arg->size = obj->via.array.size;
- arg->items = xcalloc(obj->via.array.size, sizeof(Object));
-
- for (uint32_t i = 0; i < obj->via.array.size; i++) {
- if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
- return false;
- }
- }
-
- return true;
-}
-
-bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
- FUNC_ATTR_NONNULL_ALL
-{
- if (obj->type != MSGPACK_OBJECT_MAP) {
- return false;
- }
-
- arg->size = obj->via.array.size;
- arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
-
-
- for (uint32_t i = 0; i < obj->via.map.size; i++) {
- if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
- &arg->items[i].key)) {
- return false;
- }
-
- if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
- &arg->items[i].value)) {
- return false;
- }
- }
-
- return true;
-}
-
-void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- if (result) {
- msgpack_pack_true(res);
- } else {
- msgpack_pack_false(res);
- }
-}
-
-void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- msgpack_pack_int64(res, result);
-}
-
-void msgpack_rpc_from_float(Float result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- msgpack_pack_double(res, result);
-}
-
-void msgpack_rpc_from_string(String result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- msgpack_pack_bin(res, result.size);
- msgpack_pack_bin_body(res, result.data, result.size);
-}
-
-void msgpack_rpc_from_object(Object result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- switch (result.type) {
- case kObjectTypeNil:
- msgpack_pack_nil(res);
- break;
-
- case kObjectTypeBoolean:
- msgpack_rpc_from_boolean(result.data.boolean, res);
- break;
-
- case kObjectTypeInteger:
- msgpack_rpc_from_integer(result.data.integer, res);
- break;
-
- case kObjectTypeFloat:
- msgpack_rpc_from_float(result.data.floating, res);
- break;
-
- case kObjectTypeString:
- msgpack_rpc_from_string(result.data.string, res);
- break;
-
- case kObjectTypeArray:
- msgpack_rpc_from_array(result.data.array, res);
- break;
-
- case kObjectTypeBuffer:
- msgpack_rpc_from_buffer(result.data.buffer, res);
- break;
-
- case kObjectTypeWindow:
- msgpack_rpc_from_window(result.data.window, res);
- break;
-
- case kObjectTypeTabpage:
- msgpack_rpc_from_tabpage(result.data.tabpage, res);
- break;
-
- case kObjectTypeDictionary:
- msgpack_rpc_from_dictionary(result.data.dictionary, res);
- break;
- }
-}
-
-void msgpack_rpc_from_array(Array result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- msgpack_pack_array(res, result.size);
-
- for (size_t i = 0; i < result.size; i++) {
- msgpack_rpc_from_object(result.items[i], res);
- }
-}
-
-void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2)
-{
- msgpack_pack_map(res, result.size);
-
- for (size_t i = 0; i < result.size; i++) {
- msgpack_rpc_from_string(result.items[i].key, res);
- msgpack_rpc_from_object(result.items[i].value, res);
- }
-}
diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h
deleted file mode 100644
index aede6b1587..0000000000
--- a/src/nvim/os/msgpack_rpc_helpers.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H
-#define NVIM_OS_MSGPACK_RPC_HELPERS_H
-
-#include <stdint.h>
-#include <stdbool.h>
-
-#include <msgpack.h>
-
-#include "nvim/api/private/defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc_helpers.h.generated.h"
-#endif
-
-#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H
-
diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c
index d4fffaa053..414c8841fa 100644
--- a/src/nvim/os/provider.c
+++ b/src/nvim/os/provider.c
@@ -8,7 +8,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
-#include "nvim/os/channel.h"
+#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/shell.h"
#include "nvim/os/os.h"
#include "nvim/log.h"
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 8f1c30de50..beff404fd0 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -8,8 +8,6 @@
#include "nvim/os/uv_helpers.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@@ -33,7 +31,6 @@ struct rstream {
uv_file fd;
rstream_cb cb;
bool free_handle;
- EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -76,18 +73,14 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count)
void rbuffer_produced(RBuffer *rbuffer, size_t count)
{
rbuffer->wpos += count;
- DLOG("Received %u bytes from RStream(address: %p, source: %p)",
- (size_t)cnt,
- rbuffer->rstream,
- rstream_event_source(rbuffer->rstream));
+ DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream);
rbuffer_relocate(rbuffer);
if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
// The last read filled the buffer, stop reading for now
+ //
rstream_stop(rbuffer->rstream);
- DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream);
}
}
@@ -180,13 +173,8 @@ void rbuffer_free(RBuffer *rbuffer)
/// for reading with `rstream_read`
/// @param buffer RBuffer instance to associate with the RStream
/// @param data Some state to associate with the `RStream` instance
-/// @param source_override Replacement for the default source used in events
-/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
-RStream * rstream_new(rstream_cb cb,
- RBuffer *buffer,
- void *data,
- EventSource source_override)
+RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = buffer;
@@ -198,7 +186,6 @@ RStream * rstream_new(rstream_cb cb,
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
- rv->source_override = source_override ? source_override : rv;
return rv;
}
@@ -322,21 +309,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)
return rbuffer_read(rstream->buffer, buffer, count);
}
-/// Runs the read callback associated with the rstream
-///
-/// @param event Object containing data necessary to invoke the callback
-void rstream_read_event(Event event)
-{
- RStream *rstream = event.data.rstream.ptr;
-
- rstream->cb(rstream, rstream->data, event.data.rstream.eof);
-}
-
-EventSource rstream_event_source(RStream *rstream)
-{
- return rstream->source_override;
-}
-
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@@ -357,13 +329,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
- DLOG("Closing RStream(address: %p, source: %p)",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Closing RStream(%p)", rstream);
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- emit_read_event(rstream, true);
+ rstream->cb(rstream, rstream->data, true);
}
return;
}
@@ -374,7 +344,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
- emit_read_event(rstream, false);
+ rstream->cb(rstream, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -409,7 +379,6 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- emit_read_event(rstream, true);
return;
}
@@ -417,7 +386,6 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(rstream->buffer, nread);
rstream->fpos += nread;
- emit_read_event(rstream, false);
}
static void close_cb(uv_handle_t *handle)
@@ -426,21 +394,9 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static void emit_read_event(RStream *rstream, bool eof)
-{
- Event event = {
- .source = rstream_event_source(rstream),
- .type = kEventRStreamData,
- .data.rstream = {
- .ptr = rstream,
- .eof = eof
- }
- };
- event_push(event);
-}
-
static void rbuffer_relocate(RBuffer *rbuffer)
{
+ assert(rbuffer->rpos <= rbuffer->wpos);
// Move data ...
memmove(
rbuffer->data, // ...to the beginning of the buffer(rpos 0)
diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c
deleted file mode 100644
index 9f7f5b34da..0000000000
--- a/src/nvim/os/server.c
+++ /dev/null
@@ -1,273 +0,0 @@
-#include <assert.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
-
-#include <uv.h>
-
-#include "nvim/os/channel.h"
-#include "nvim/os/server.h"
-#include "nvim/os/os.h"
-#include "nvim/ascii.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-#include "nvim/message.h"
-#include "nvim/tempfile.h"
-#include "nvim/map.h"
-#include "nvim/path.h"
-
-#define MAX_CONNECTIONS 32
-#define ADDRESS_MAX_SIZE 256
-#define NEOVIM_DEFAULT_TCP_PORT 7450
-#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS"
-
-typedef enum {
- kServerTypeTcp,
- kServerTypePipe
-} ServerType;
-
-typedef struct {
- // Type of the union below
- ServerType type;
-
- // This is either a tcp server or unix socket(named pipe on windows)
- union {
- struct {
- uv_tcp_t handle;
- struct sockaddr_in addr;
- } tcp;
- struct {
- uv_pipe_t handle;
- char addr[ADDRESS_MAX_SIZE];
- } pipe;
- } socket;
-} Server;
-
-static PMap(cstr_t) *servers = NULL;
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/server.c.generated.h"
-#endif
-
-/// Initializes the module
-bool server_init(void)
-{
- servers = pmap_new(cstr_t)();
-
- if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) {
- char *listen_address = (char *)vim_tempname();
- os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1);
- free(listen_address);
- }
-
- return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0;
-}
-
-/// Teardown the server module
-void server_teardown(void)
-{
- if (!servers) {
- return;
- }
-
- Server *server;
-
- map_foreach_value(servers, server, {
- if (server->type == kServerTypeTcp) {
- uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
- } else {
- uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
- }
- });
-}
-
-/// Starts listening on arbitrary tcp/unix addresses specified by
-/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will
-/// be determined by parsing `endpoint`: If it's a valid tcp address in the
-/// 'ip[:port]' format, then it will be tcp socket. The port is optional
-/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will
-/// be a unix socket or named pipe.
-///
-/// @param endpoint Address of the server. Either a 'ip[:port]' string or an
-/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or
-/// named pipe.
-/// @returns zero if successful, one on a regular error, and negative errno
-/// on failure to bind or connect.
-int server_start(const char *endpoint)
- FUNC_ATTR_NONNULL_ALL
-{
- char addr[ADDRESS_MAX_SIZE];
-
- // Trim to `ADDRESS_MAX_SIZE`
- if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
- // TODO(aktau): since this is not what the user wanted, perhaps we
- // should return an error here
- EMSG2("Address was too long, truncated to %s", addr);
- }
-
- // Check if the server already exists
- if (pmap_has(cstr_t)(servers, addr)) {
- EMSG2("Already listening on %s", addr);
- return 1;
- }
-
- ServerType server_type = kServerTypeTcp;
- Server *server = xmalloc(sizeof(Server));
- char ip[16], *ip_end = strrchr(addr, ':');
-
- if (!ip_end) {
- ip_end = strchr(addr, NUL);
- }
-
- uint32_t addr_len = ip_end - addr;
-
- if (addr_len > sizeof(ip) - 1) {
- // Maximum length of an IP address buffer is 15(eg: 255.255.255.255)
- addr_len = sizeof(ip) - 1;
- }
-
- // Extract the address part
- xstrlcpy(ip, addr, addr_len + 1);
-
- int port = NEOVIM_DEFAULT_TCP_PORT;
-
- if (*ip_end == ':') {
- // Extract the port
- long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
- if (lport <= 0 || lport > 0xffff) {
- // Invalid port, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- } else {
- port = (int) lport;
- }
- }
-
- if (server_type == kServerTypeTcp) {
- // Try to parse ip address
- if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) {
- // Invalid address, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- }
- }
-
- int result;
-
- if (server_type == kServerTypeTcp) {
- // Listen on tcp address/port
- uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
- server->socket.tcp.handle.data = server;
- result = uv_tcp_bind(&server->socket.tcp.handle,
- (const struct sockaddr *)&server->socket.tcp.addr,
- 0);
- if (result == 0) {
- result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
- MAX_CONNECTIONS,
- connection_cb);
- if (result) {
- uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
- }
- }
- } else {
- // Listen on named pipe or unix socket
- xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
- uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
- server->socket.pipe.handle.data = server;
- result = uv_pipe_bind(&server->socket.pipe.handle,
- server->socket.pipe.addr);
- if (result == 0) {
- result = uv_listen((uv_stream_t *)&server->socket.pipe.handle,
- MAX_CONNECTIONS,
- connection_cb);
-
- if (result) {
- uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
- }
- }
- }
-
- assert(result <= 0); // libuv should have returned -errno or zero.
- if (result < 0) {
- if (result == -EACCES) {
- // Libuv converts ENOENT to EACCES for Windows compatibility, but if
- // the parent directory does not exist, ENOENT would be more accurate.
- *path_tail((char_u *) addr) = NUL;
- if (!os_file_exists((char_u *) addr)) {
- result = -ENOENT;
- }
- }
- EMSG2("Failed to start server: %s", uv_strerror(result));
- free(server);
- return result;
- }
-
- server->type = server_type;
-
- // Add the server to the hash table
- pmap_put(cstr_t)(servers, addr, server);
-
- return 0;
-}
-
-/// Stops listening on the address specified by `endpoint`.
-///
-/// @param endpoint Address of the server.
-void server_stop(char *endpoint)
-{
- Server *server;
- char addr[ADDRESS_MAX_SIZE];
-
- // Trim to `ADDRESS_MAX_SIZE`
- xstrlcpy(addr, endpoint, sizeof(addr));
-
- if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) {
- EMSG2("Not listening on %s", addr);
- return;
- }
-
- if (server->type == kServerTypeTcp) {
- uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
- } else {
- uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
- }
-
- pmap_del(cstr_t)(servers, addr);
-}
-
-static void connection_cb(uv_stream_t *server, int status)
-{
- int result;
- uv_stream_t *client;
- Server *srv = server->data;
-
- if (status < 0) {
- abort();
- }
-
- if (srv->type == kServerTypeTcp) {
- client = xmalloc(sizeof(uv_tcp_t));
- uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client);
- } else {
- client = xmalloc(sizeof(uv_pipe_t));
- uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0);
- }
-
- result = uv_accept(server, client);
-
- if (result) {
- EMSG2("Failed to accept connection: %s", uv_strerror(result));
- uv_close((uv_handle_t *)client, free_client);
- return;
- }
-
- channel_from_stream(client);
-}
-
-static void free_client(uv_handle_t *handle)
-{
- free(handle);
-}
-
-static void free_server(uv_handle_t *handle)
-{
- free(handle->data);
-}
diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h
deleted file mode 100644
index 43592a91e4..0000000000
--- a/src/nvim/os/server.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_SERVER_H
-#define NVIM_OS_SERVER_H
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/server.h.generated.h"
-#endif
-#endif // NVIM_OS_SERVER_H
diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h
deleted file mode 100644
index 08cdf55428..0000000000
--- a/src/nvim/os/server_defs.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_SERVER_DEFS_H
-#define NVIM_OS_SERVER_DEFS_H
-
-typedef struct server Server;
-
-#endif // NVIM_OS_SERVER_DEFS_H
-
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 453cc6d605..d5464f7975 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -1,4 +1,5 @@
#include <string.h>
+#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
@@ -7,6 +8,7 @@
#include "nvim/ascii.h"
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
+#include "nvim/os/event.h"
#include "nvim/os/job.h"
#include "nvim/os/rstream.h"
#include "nvim/os/shell.h"
@@ -58,11 +60,11 @@ typedef struct {
/// `shell_free_argv` when no longer needed.
char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt)
{
- int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
+ size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *)));
// Split 'shell'
- int i = tokenize(p_sh, rv);
+ size_t i = tokenize(p_sh, rv);
if (extra_shell_opt != NULL) {
// Push a copy of `extra_shell_opt`
@@ -212,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)
// Keep running the loop until all three handles are completely closed
while (pdata.exited < expected_exits) {
- uv_run(uv_default_loop(), UV_RUN_ONCE);
+ event_poll(0);
if (got_int) {
// Forward SIGINT to the shell
@@ -356,9 +358,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof)
/// @param argv The vector that will be filled with copies of the parsed
/// words. It can be NULL if the caller only needs to count words.
/// @return The number of words parsed.
-static int tokenize(const char_u *str, char **argv)
+static size_t tokenize(const char_u *str, char **argv)
{
- int argc = 0, len;
+ size_t argc = 0, len;
char_u *p = (char_u *) str;
while (*p != NUL) {
@@ -383,11 +385,11 @@ static int tokenize(const char_u *str, char **argv)
///
/// @param str A pointer to the first character of the word
/// @return The offset from `str` at which the word ends.
-static int word_length(const char_u *str)
+static size_t word_length(const char_u *str)
{
const char_u *p = str;
bool inquote = false;
- int length = 0;
+ size_t length = 0;
// Move `p` to the end of shell word by advancing the pointer while it's
// inside a quote or it's a non-whitespace character
@@ -418,15 +420,15 @@ static void write_selection(uv_write_t *req)
// TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and
// only after filled we should start allocating memory(skip unnecessary
// allocations for small writes)
- int buflen = BUFFER_LENGTH;
+ size_t buflen = BUFFER_LENGTH;
pdata->wbuffer = (char *)xmalloc(buflen);
uv_buf_t uvbuf;
linenr_T lnum = curbuf->b_op_start.lnum;
- int off = 0;
- int written = 0;
+ size_t off = 0;
+ size_t written = 0;
char_u *lp = ml_get(lnum);
- int l;
- int len;
+ size_t l;
+ size_t len;
for (;;) {
l = strlen((char *)lp + written);
@@ -443,7 +445,7 @@ static void write_selection(uv_write_t *req)
pdata->wbuffer[off++] = NUL;
} else {
char_u *s = vim_strchr(lp + written, NL);
- len = s == NULL ? l : s - (lp + written);
+ len = s == NULL ? l : (size_t)(s - (lp + written));
while (off + len >= buflen) {
// Resize the buffer
buflen *= 2;
@@ -584,6 +586,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
ProcessData *data = (ProcessData *)proc->data;
data->exited++;
- data->exit_status = status;
+ assert(status <= INT_MAX);
+ data->exit_status = (int)status;
uv_close((uv_handle_t *)proc, NULL);
}
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index 2f93cfb08a..36f7b37c48 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -12,8 +12,6 @@
#include "nvim/memory.h"
#include "nvim/misc1.h"
#include "nvim/misc2.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/os/signal.h"
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
@@ -72,45 +70,6 @@ void signal_accept_deadly(void)
rejecting_deadly = false;
}
-void signal_handle(Event event)
-{
- int signum = event.data.signum;
-
- switch (signum) {
- case SIGINT:
- got_int = true;
- break;
-#ifdef SIGPWR
- case SIGPWR:
- // Signal of a power failure(eg batteries low), flush the swap files to
- // be safe
- ml_sync_all(false, false);
- break;
-#endif
- case SIGPIPE:
- // Ignore
- break;
- case SIGWINCH:
- shell_resized();
- break;
- case SIGTERM:
- case SIGQUIT:
- case SIGHUP:
- if (!rejecting_deadly) {
- deadly_signal(signum);
- }
- break;
- default:
- fprintf(stderr, "Invalid signal %d", signum);
- break;
- }
-}
-
-EventSource signal_event_source(void)
-{
- return &sint;
-}
-
static char * signal_name(int signum)
{
switch (signum) {
@@ -154,20 +113,32 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
- if (rejecting_deadly) {
- if (signum == SIGINT) {
+ switch (signum) {
+ case SIGINT:
got_int = true;
- }
-
- return;
+ break;
+#ifdef SIGPWR
+ case SIGPWR:
+ // Signal of a power failure(eg batteries low), flush the swap files to
+ // be safe
+ ml_sync_all(false, false);
+ break;
+#endif
+ case SIGPIPE:
+ // Ignore
+ break;
+ case SIGWINCH:
+ shell_resized();
+ break;
+ case SIGTERM:
+ case SIGQUIT:
+ case SIGHUP:
+ if (!rejecting_deadly) {
+ deadly_signal(signum);
+ }
+ break;
+ default:
+ fprintf(stderr, "Invalid signal %d", signum);
+ break;
}
-
- Event event = {
- .source = signal_event_source(),
- .type = kEventSignal,
- .data = {
- .signum = signum
- }
- };
- event_push(event);
}
diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c
index e3b76ac833..a4871ef499 100644
--- a/src/nvim/os/time.c
+++ b/src/nvim/os/time.c
@@ -1,3 +1,4 @@
+#include <assert.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
@@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput)
}
}
-static void microdelay(uint64_t microseconds)
-{
- uint64_t hrtime;
- int64_t ns = microseconds * 1000; // convert to nanoseconds
-
- uv_mutex_lock(&delay_mutex);
-
- while (ns > 0) {
- hrtime = uv_hrtime();
- if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT)
- break;
- ns -= uv_hrtime() - hrtime;
- }
-
- uv_mutex_unlock(&delay_mutex);
-}
-
/// Portable version of POSIX localtime_r()
///
/// @return NULL in case of error
@@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL
time_t rawtime = time(NULL);
return os_localtime_r(&rawtime, result);
}
+
+static void microdelay(uint64_t microseconds)
+{
+ uint64_t elapsed = 0;
+ uint64_t ns = microseconds * 1000; // convert to nanoseconds
+ uint64_t base = uv_hrtime();
+
+ uv_mutex_lock(&delay_mutex);
+
+ while (elapsed < ns) {
+ if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed)
+ == UV_ETIMEDOUT)
+ break;
+ uint64_t now = uv_hrtime();
+ elapsed += now - base;
+ base = now;
+ }
+
+ uv_mutex_unlock(&delay_mutex);
+}