diff options
Diffstat (limited to 'src/nvim/os')
| -rw-r--r-- | src/nvim/os/channel.c | 598 | ||||
| -rw-r--r-- | src/nvim/os/channel.h | 15 | ||||
| -rw-r--r-- | src/nvim/os/event.c | 126 | ||||
| -rw-r--r-- | src/nvim/os/event.h | 21 | ||||
| -rw-r--r-- | src/nvim/os/event_defs.h | 25 | ||||
| -rw-r--r-- | src/nvim/os/fs.c | 28 | ||||
| -rw-r--r-- | src/nvim/os/input.c | 82 | ||||
| -rw-r--r-- | src/nvim/os/job.c | 204 | ||||
| -rw-r--r-- | src/nvim/os/msgpack_rpc.c | 188 | ||||
| -rw-r--r-- | src/nvim/os/msgpack_rpc.h | 51 | ||||
| -rw-r--r-- | src/nvim/os/msgpack_rpc_helpers.c | 289 | ||||
| -rw-r--r-- | src/nvim/os/msgpack_rpc_helpers.h | 16 | ||||
| -rw-r--r-- | src/nvim/os/provider.c | 2 | ||||
| -rw-r--r-- | src/nvim/os/rstream.c | 60 | ||||
| -rw-r--r-- | src/nvim/os/server.c | 273 | ||||
| -rw-r--r-- | src/nvim/os/server.h | 7 | ||||
| -rw-r--r-- | src/nvim/os/server_defs.h | 7 | ||||
| -rw-r--r-- | src/nvim/os/shell.c | 31 | ||||
| -rw-r--r-- | src/nvim/os/signal.c | 81 | ||||
| -rw-r--r-- | src/nvim/os/time.c | 38 |
20 files changed, 265 insertions, 1877 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c deleted file mode 100644 index 959fbc6e73..0000000000 --- a/src/nvim/os/channel.c +++ /dev/null @@ -1,598 +0,0 @@ -#include <stdbool.h> -#include <string.h> -#include <inttypes.h> - -#include <uv.h> -#include <msgpack.h> - -#include "nvim/api/private/helpers.h" -#include "nvim/api/vim.h" -#include "nvim/os/channel.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/ascii.h" -#include "nvim/memory.h" -#include "nvim/os_unix.h" -#include "nvim/message.h" -#include "nvim/term.h" -#include "nvim/map.h" -#include "nvim/log.h" -#include "nvim/misc1.h" -#include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff - -typedef struct { - uint64_t request_id; - bool errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - PMap(cstr_t) *subscribed_events; - bool is_job, enabled; - msgpack_unpacker *unpacker; - union { - Job *job; - struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; -} Channel; - -static uint64_t next_id = 1; -static PMap(uint64_t) *channels = NULL; -static PMap(cstr_t) *event_strings = NULL; -static msgpack_sbuffer out_buffer; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.c.generated.h" -#endif - -/// Initializes the module -void channel_init(void) -{ - channels = pmap_new(uint64_t)(); - event_strings = pmap_new(cstr_t)(); - msgpack_sbuffer_init(&out_buffer); - - if (embedded_mode) { - channel_from_stdio(); - } -} - -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a job and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. -/// -/// @param argv The argument vector for the process -/// @return The channel id -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(); - channel->is_job = true; - - int status; - channel->data.job = job_start(argv, - channel, - job_out, - job_err, - NULL, - 0, - &status); - - if (status <= 0) { - close_channel(channel); - return 0; - } - - return channel->id; -} - -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection -/// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) -{ - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; -} - -bool channel_exists(uint64_t id) -{ - Channel *channel; - return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; -} - -/// Sends event/arguments to channel -/// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments -/// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) -{ - Channel *channel = NULL; - - if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_free_array(args); - return false; - } - send_event(channel, name, args); - } else { - broadcast_event(name, args); - } - - return true; -} - -/// Sends a method call to a channel -/// -/// @param id The channel id -/// @param method_name The method name, an arbitrary string -/// @param args Array with method arguments -/// @param[out] error True if the return value is an error -/// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) -{ - Channel *channel = NULL; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); - api_free_array(args); - return NIL; - } - - if (kv_size(channel->call_stack) > 20) { - // 20 stack depth is more than anyone should ever need for RPC calls - api_set_error(err, - Exception, - _("Channel %" PRIu64 " crossed maximum stack depth"), - channel->id); - api_free_array(args); - return NIL; - } - - uint64_t request_id = channel->next_request_id++; - // Send the msgpack-rpc request - send_request(channel, request_id, method_name, args); - - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - - // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; - kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); - - do { - event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return - - if (frame.errored) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); - return NIL; - } - - return frame.result; -} - -/// Subscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_subscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - char *event_string = pmap_get(cstr_t)(event_strings, event); - - if (!event_string) { - event_string = xstrdup(event); - pmap_put(cstr_t)(event_strings, event_string, event_string); - } - - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); -} - -/// Unsubscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - unsubscribe(channel, event); -} - -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - return false; - } - - channel_kill(channel); - channel->enabled = false; - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -static void channel_from_stdio(void) -{ - Channel *channel = register_channel(); - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(rstream, job_data(job), eof); -} - -static void job_err(RStream *rstream, void *data, bool eof) -{ - size_t count; - char buf[256]; - Channel *channel = job_data(data); - - while ((count = rstream_pending(rstream))) { - size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); - } -} - -static void parse_msgpack(RStream *rstream, void *data, bool eof) -{ - Channel *channel = data; - channel->rpc_call_level++; - - if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); - goto end; - } - - uint32_t count = rstream_pending(rstream); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", - count, - rstream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); - - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); - } else { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Channel %" PRIu64 " returned a response that doesn't have " - " a matching id for the current RPC call. Ensure the client " - " is properly synchronized", - channel->id); - call_set_error(channel, buf); - } - msgpack_unpacked_destroy(&unpacked); - // Bail out from this event loop iteration - goto end; - } - - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } - } - - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - OUT_STR(e_outofmem); - out_char('\n'); - preserve_exit(); - } - - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - send_error(channel, 0, "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); - } - -end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); - } -} - -static bool channel_write(Channel *channel, WBuffer *buffer) -{ - bool success; - - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); - } - - if (!success) { - // If the write failed for any reason, close the channel - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed due to a failed write", - channel->id); - call_set_error(channel, buf); - } - - return success; -} - -static void send_error(Channel *channel, uint64_t id, char *err) -{ - Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); -} - -static void send_request(Channel *channel, - uint64_t id, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); -} - -static void send_event(Channel *channel, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); -} - -static void broadcast_event(char *name, Array args) -{ - kvec_t(Channel *) subscribed; - kv_init(subscribed); - Channel *channel; - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { - kv_push(Channel *, subscribed, channel); - } - }); - - if (!kv_size(subscribed)) { - api_free_array(args); - goto end; - } - - String method = {.size = strlen(name), .data = name}; - WBuffer *buffer = serialize_request(0, - method, - args, - &out_buffer, - kv_size(subscribed)); - - for (size_t i = 0; i < kv_size(subscribed); i++) { - channel_write(kv_A(subscribed, i), buffer); - } - -end: - kv_destroy(subscribed); -} - -static void unsubscribe(Channel *channel, char *event) -{ - char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { - return; - } - }); - - // Since the string is no longer used by other channels, release it's memory - pmap_del(cstr_t)(event_strings, event_string); - free(event_string); -} - -static void close_channel(Channel *channel) -{ - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); - } else { - // When the stdin channel closes, it's time to go - mch_exit(0); - } - } -} - -static void close_cb(uv_handle_t *handle) -{ - free(handle->data); - free(handle); -} - -static Channel *register_channel(void) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; -} - -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint64_t response_id = obj->via.array.ptr[1].via.u64; - // Must be equal to the frame at the stack's bottom - return response_id == kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1)->request_id; -} - -static void call_stack_pop(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg) -{ - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = true; - frame->result = STRING_OBJ(cstr_to_string(msg)); - } - - channel->enabled = false; -} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h deleted file mode 100644 index bb409bfde9..0000000000 --- a/src/nvim/os/channel.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef NVIM_OS_CHANNEL_H -#define NVIM_OS_CHANNEL_H - -#include <stdbool.h> -#include <uv.h> - -#include "nvim/api/private/defs.h" -#include "nvim/vim.h" - -#define METHOD_MAXLEN 512 - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.h.generated.h" -#endif -#endif // NVIM_OS_CHANNEL_H diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index a460b2db96..2dee529452 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -7,8 +7,10 @@ #include "nvim/os/event.h" #include "nvim/os/input.h" -#include "nvim/os/channel.h" -#include "nvim/os/server.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/os/provider.h" #include "nvim/os/signal.h" #include "nvim/os/rstream.h" @@ -25,25 +27,22 @@ KLIST_INIT(Event, Event, _destroy_event) typedef struct { bool timed_out; - int32_t ms; + int ms; uv_timer_t *timer; } TimerData; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.c.generated.h" #endif -static klist_t(Event) *deferred_events, *immediate_events; -// NULL-terminated array of event sources that we should process immediately. -// -// Events from sources that are not contained in this array are processed -// later when `event_process` is called -static EventSource *immediate_sources = NULL; +static klist_t(Event) *pending_events; void event_init(void) { + // early msgpack-rpc initialization + msgpack_rpc_init_method_table(); + msgpack_rpc_helpers_init(); // Initialize the event queues - deferred_events = kl_init(Event); - immediate_events = kl_init(Event); + pending_events = kl_init(Event); // Initialize input events input_init(); // Timer to wake the event loop if a timeout argument is passed to @@ -52,9 +51,8 @@ void event_init(void) signal_init(); // Jobs job_init(); - // Channels + // finish mspgack-rpc initialization channel_init(); - // Servers server_init(); // Providers provider_init(); @@ -68,8 +66,7 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms, EventSource sources[]) - FUNC_ATTR_NONNULL_ARG(2) +void event_poll(int ms) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -100,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[]) run_mode = UV_RUN_NOWAIT; } - size_t processed_events; - - do { - // Run one event loop iteration, blocking for events if run_mode is - // UV_RUN_ONCE - processed_events = loop(run_mode, sources); - } while ( - // Continue running if ... - !processed_events && // we didn't process any immediate events - !event_has_deferred() && // no events are waiting to be processed - run_mode != UV_RUN_NOWAIT && // ms != 0 - !timer_data.timed_out); // we didn't get a timeout + loop(run_mode); if (!(--recursive)) { // Again, only stop when we leave the top-level invocation @@ -123,68 +109,29 @@ bool event_poll(int32_t ms, EventSource sources[]) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - processed_events += loop(UV_RUN_NOWAIT, sources); + loop(UV_RUN_NOWAIT); } - - return !timer_data.timed_out && (processed_events || event_has_deferred()); } bool event_has_deferred(void) { - return !kl_empty(deferred_events); + return !kl_empty(pending_events); } // Queue an event void event_push(Event event) { - bool defer = true; - - if (immediate_sources) { - size_t i; - EventSource src; - - for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { - if (src == event.source) { - defer = false; - break; - } - } - } - - *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; + *kl_pushp(Event, pending_events) = event; } -void event_process(void) -{ - process_from(deferred_events); -} -// Runs the appropriate action for each queued event -static size_t process_from(klist_t(Event) *queue) +void event_process(void) { - size_t count = 0; Event event; - while (kl_shift(Event, queue, &event) == 0) { - switch (event.type) { - case kEventSignal: - signal_handle(event); - break; - case kEventRStreamData: - rstream_read_event(event); - break; - case kEventJobExit: - job_exit_event(event); - break; - default: - abort(); - } - count++; + while (kl_shift(Event, pending_events, &event) == 0) { + event.handler(event); } - - DLOG("Processed %u events", count); - - return count; } // Set a flag in the `event_poll` loop for signaling of a timeout @@ -202,42 +149,9 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static void requeue_deferred_events(void) +static void loop(uv_run_mode run_mode) { - size_t remaining = deferred_events->size; - - DLOG("Number of deferred events: %u", remaining); - - while (remaining--) { - // Re-push each deferred event to ensure it will be in the right queue - Event event; - kl_shift(Event, deferred_events, &event); - event_push(event); - DLOG("Re-queueing event"); - } - - DLOG("Number of deferred events: %u", deferred_events->size); -} - -static size_t loop(uv_run_mode run_mode, EventSource *sources) -{ - size_t count; - immediate_sources = sources; - // It's possible that some events from the immediate sources are waiting - // in the deferred queue. If so, move them to the immediate queue so they - // will be processed in order of arrival by the next `process_from` call. - requeue_deferred_events(); - count = process_from(immediate_events); - - if (count) { - // No need to enter libuv, events were already processed - return count; - } - DLOG("Enter event loop"); uv_run(uv_default_loop(), run_mode); DLOG("Exit event loop"); - immediate_sources = NULL; - count = process_from(immediate_events); - return count; } diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h index 29e304adc8..f8139e978d 100644 --- a/src/nvim/os/event.h +++ b/src/nvim/os/event.h @@ -6,6 +6,27 @@ #include "nvim/os/event_defs.h" #include "nvim/os/job_defs.h" +#include "nvim/os/time.h" + +// Poll for events until a condition is true or a timeout has passed +#define event_poll_until(timeout, condition) \ + do { \ + int remaining = timeout; \ + uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ + while (!(condition)) { \ + event_poll(remaining); \ + if (remaining == 0) { \ + break; \ + } else if (remaining > 0) { \ + uint64_t now = os_hrtime(); \ + remaining -= (int) ((now - before) / 1000000); \ + before = now; \ + if (remaining <= 0) { \ + break; \ + } \ + } \ + } \ + } while (0) #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.h.generated.h" diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index dbee3e2ba7..2dd9403d9f 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,25 +6,12 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" -typedef void * EventSource; +typedef struct event Event; +typedef void (*event_handler)(Event event); -typedef enum { - kEventSignal, - kEventRStreamData, - kEventJobExit -} EventType; - -typedef struct { - EventSource source; - EventType type; - union { - int signum; - struct { - RStream *ptr; - bool eof; - } rstream; - Job *job; - } data; -} Event; +struct event { + void *data; + event_handler handler; +}; #endif // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/fs.c b/src/nvim/os/fs.c index 36c2bb6d9b..bdaf9ecdda 100644 --- a/src/nvim/os/fs.c +++ b/src/nvim/os/fs.c @@ -23,6 +23,7 @@ static const int kLibuvSuccess = 0; /// /// @return `0` on success, a libuv error code on failure. int os_chdir(const char *path) + FUNC_ATTR_NONNULL_ALL { if (p_verbose >= 5) { verbose_enter(); @@ -38,6 +39,7 @@ int os_chdir(const char *path) /// @param len Length of `buf`. /// @return `OK` for success, `FAIL` for failure. int os_dirname(char_u *buf, size_t len) + FUNC_ATTR_NONNULL_ALL { assert(buf && len); @@ -53,6 +55,7 @@ int os_dirname(char_u *buf, size_t len) /// /// @return `true` if `fname` is a directory. bool os_isdir(const char_u *name) + FUNC_ATTR_NONNULL_ALL { int32_t mode = os_getperm(name); if (mode < 0) { @@ -78,6 +81,7 @@ bool os_isdir(const char_u *name) /// /// @return `false` otherwise. bool os_can_exe(const char_u *name, char_u **abspath) + FUNC_ATTR_NONNULL_ARG(1) { // If it's an absolute or relative path don't need to use $PATH. if (path_is_absolute_path(name) || @@ -100,6 +104,7 @@ bool os_can_exe(const char_u *name, char_u **abspath) // Return true if "name" is an executable file, false if not or it doesn't // exist. static bool is_executable(const char_u *name) + FUNC_ATTR_NONNULL_ALL { int32_t mode = os_getperm(name); @@ -121,6 +126,7 @@ static bool is_executable(const char_u *name) /// /// @return `true` if `name` is an executable inside `$PATH`. static bool is_executable_in_path(const char_u *name, char_u **abspath) + FUNC_ATTR_NONNULL_ARG(1) { const char *path = getenv("PATH"); // PATH environment variable does not exist or is empty. @@ -176,6 +182,7 @@ static bool is_executable_in_path(const char_u *name, char_u **abspath) /// not `O_CREAT` or `O_TMPFILE`), subject to the current umask /// @return file descriptor, or negative `errno` on failure int os_open(const char* path, int flags, int mode) + FUNC_ATTR_NONNULL_ALL { uv_fs_t open_req; int r = uv_fs_open(uv_default_loop(), &open_req, path, flags, mode, NULL); @@ -188,6 +195,7 @@ int os_open(const char* path, int flags, int mode) /// /// @return OK on success, FAIL if a failure occurred. static bool os_stat(const char *name, uv_stat_t *statbuf) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_stat(uv_default_loop(), &request, name, NULL); @@ -200,6 +208,7 @@ static bool os_stat(const char *name, uv_stat_t *statbuf) /// /// @return `-1` when `name` doesn't exist. int32_t os_getperm(const char_u *name) + FUNC_ATTR_NONNULL_ALL { uv_stat_t statbuf; if (os_stat((char *)name, &statbuf)) { @@ -213,6 +222,7 @@ int32_t os_getperm(const char_u *name) /// /// @return `OK` for success, `FAIL` for failure. int os_setperm(const char_u *name, int perm) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_chmod(uv_default_loop(), &request, @@ -233,6 +243,7 @@ int os_setperm(const char_u *name, int perm) /// @note If the `owner` or `group` is specified as `-1`, then that ID is not /// changed. int os_fchown(int file_descriptor, uv_uid_t owner, uv_gid_t group) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_fchown(uv_default_loop(), &request, file_descriptor, @@ -245,6 +256,7 @@ int os_fchown(int file_descriptor, uv_uid_t owner, uv_gid_t group) /// /// @return `true` if `name` exists. bool os_file_exists(const char_u *name) + FUNC_ATTR_NONNULL_ALL { uv_stat_t statbuf; return os_stat((char *)name, &statbuf); @@ -254,6 +266,7 @@ bool os_file_exists(const char_u *name) /// /// @return `true` if `name` is readonly. bool os_file_is_readonly(const char *name) + FUNC_ATTR_NONNULL_ALL { return access(name, W_OK) != 0; } @@ -264,6 +277,7 @@ bool os_file_is_readonly(const char *name) /// @return `1` if `name` is writable, /// @return `2` for a directory which we have rights to write into. int os_file_is_writable(const char *name) + FUNC_ATTR_NONNULL_ALL { if (access(name, W_OK) == 0) { if (os_isdir((char_u *)name)) { @@ -278,6 +292,7 @@ int os_file_is_writable(const char *name) /// /// @return `OK` for success, `FAIL` for failure. int os_rename(const char_u *path, const char_u *new_path) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_rename(uv_default_loop(), &request, @@ -295,6 +310,7 @@ int os_rename(const char_u *path, const char_u *new_path) /// /// @return `0` for success, non-zero for failure. int os_mkdir(const char *path, int32_t mode) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_mkdir(uv_default_loop(), &request, path, mode, NULL); @@ -310,6 +326,7 @@ int os_mkdir(const char *path, int32_t mode) /// failure. /// @return `0` for success, non-zero for failure. int os_mkdtemp(const char *template, char *path) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_mkdtemp(uv_default_loop(), &request, template, NULL); @@ -324,6 +341,7 @@ int os_mkdtemp(const char *template, char *path) /// /// @return `0` for success, non-zero for failure. int os_rmdir(const char *path) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_rmdir(uv_default_loop(), &request, path, NULL); @@ -335,6 +353,7 @@ int os_rmdir(const char *path) /// /// @return `0` for success, non-zero for failure. int os_remove(const char *path) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_unlink(uv_default_loop(), &request, path, NULL); @@ -348,6 +367,7 @@ int os_remove(const char *path) /// @param[out] file_info Pointer to a FileInfo to put the information in. /// @return `true` on success, `false` for failure. bool os_fileinfo(const char *path, FileInfo *file_info) + FUNC_ATTR_NONNULL_ALL { return os_stat(path, &(file_info->stat)); } @@ -358,6 +378,7 @@ bool os_fileinfo(const char *path, FileInfo *file_info) /// @param[out] file_info Pointer to a FileInfo to put the information in. /// @return `true` on success, `false` for failure. bool os_fileinfo_link(const char *path, FileInfo *file_info) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_lstat(uv_default_loop(), &request, path, NULL); @@ -372,6 +393,7 @@ bool os_fileinfo_link(const char *path, FileInfo *file_info) /// @param[out] file_info Pointer to a FileInfo to put the information in. /// @return `true` on success, `false` for failure. bool os_fileinfo_fd(int file_descriptor, FileInfo *file_info) + FUNC_ATTR_NONNULL_ALL { uv_fs_t request; int result = uv_fs_fstat(uv_default_loop(), &request, file_descriptor, NULL); @@ -385,6 +407,7 @@ bool os_fileinfo_fd(int file_descriptor, FileInfo *file_info) /// @return `true` if the two FileInfos represent the same file. bool os_fileinfo_id_equal(const FileInfo *file_info_1, const FileInfo *file_info_2) + FUNC_ATTR_NONNULL_ALL { return file_info_1->stat.st_ino == file_info_2->stat.st_ino && file_info_1->stat.st_dev == file_info_2->stat.st_dev; @@ -395,6 +418,7 @@ bool os_fileinfo_id_equal(const FileInfo *file_info_1, /// @param file_info Pointer to the `FileInfo` /// @param[out] file_id Pointer to a `FileID` void os_fileinfo_id(const FileInfo *file_info, FileID *file_id) + FUNC_ATTR_NONNULL_ALL { file_id->inode = file_info->stat.st_ino; file_id->device_id = file_info->stat.st_dev; @@ -406,6 +430,7 @@ void os_fileinfo_id(const FileInfo *file_info, FileID *file_id) /// @param file_info Pointer to the `FileInfo` /// @return the inode number uint64_t os_fileinfo_inode(const FileInfo *file_info) + FUNC_ATTR_NONNULL_ALL { return file_info->stat.st_ino; } @@ -443,6 +468,7 @@ uint64_t os_fileinfo_blocksize(const FileInfo *file_info) /// @param[out] file_info Pointer to a `FileID` to fill in. /// @return `true` on sucess, `false` for failure. bool os_fileid(const char *path, FileID *file_id) + FUNC_ATTR_NONNULL_ALL { uv_stat_t statbuf; if (os_stat(path, &statbuf)) { @@ -459,6 +485,7 @@ bool os_fileid(const char *path, FileID *file_id) /// @param file_id_2 Pointer to second `FileID` /// @return `true` if the two `FileID`s represent te same file. bool os_fileid_equal(const FileID *file_id_1, const FileID *file_id_2) + FUNC_ATTR_NONNULL_ALL { return file_id_1->inode == file_id_2->inode && file_id_1->device_id == file_id_2->device_id; @@ -471,6 +498,7 @@ bool os_fileid_equal(const FileID *file_id_1, const FileID *file_id_2) /// @return `true` if the `FileID` and the `FileInfo` represent te same file. bool os_fileid_equal_fileinfo(const FileID *file_id, const FileInfo *file_info) + FUNC_ATTR_NONNULL_ALL { return file_id->inode == file_info->stat.st_ino && file_id->device_id == file_info->stat.st_dev; diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a18d735ce6..d948a48b64 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -1,3 +1,4 @@ +#include <assert.h> #include <string.h> #include <stdint.h> #include <stdbool.h> @@ -7,7 +8,6 @@ #include "nvim/api/private/defs.h" #include "nvim/os/input.h" #include "nvim/os/event.h" -#include "nvim/os/signal.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" #include "nvim/ascii.h" @@ -20,8 +20,8 @@ #include "nvim/getchar.h" #include "nvim/term.h" -#define READ_BUFFER_SIZE 0xffff -#define INPUT_BUFFER_SIZE 4096 +#define READ_BUFFER_SIZE 0xfff +#define INPUT_BUFFER_SIZE (READ_BUFFER_SIZE * 4) typedef enum { kInputNone, @@ -48,10 +48,7 @@ void input_init(void) } read_buffer = rbuffer_new(READ_BUFFER_SIZE); - read_stream = rstream_new(read_cb, - read_buffer, - NULL, - NULL); + read_stream = rstream_new(read_cb, read_buffer, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -76,7 +73,7 @@ void input_stop(void) } // Low level input function. -int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) +int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) { InbufPollResult result; @@ -90,7 +87,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) return 0; } } else { - if ((result = inbuf_poll(p_ut)) == kInputNone) { + if ((result = inbuf_poll((int)p_ut)) == kInputNone) { if (trigger_cursorhold() && maxlen >= 3 && !typebuf_changed(tb_change_cnt)) { buf[0] = K_SPECIAL; @@ -119,8 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) return 0; } - convert_input(); - return rbuffer_read(input_buffer, (char *)buf, maxlen); + // Safe to convert rbuffer_read to int, it will never overflow since + // we use relatively small buffers. + return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } // Check if a character is available for reading @@ -133,8 +131,8 @@ bool os_char_avail(void) // In cooked mode we should get SIGINT, no need to check. void os_breakcheck(void) { - if (curr_tmode == TMODE_RAW && input_poll(0)) - convert_input(); + if (curr_tmode == TMODE_RAW) + input_poll(0); } /// Test whether a file descriptor refers to a terminal. @@ -167,23 +165,21 @@ void input_buffer_restore(String str) free(str.data); } -static bool input_poll(int32_t ms) +size_t input_enqueue(String keys) { - if (embedded_mode) { - EventSource input_sources[] = { signal_event_source(), NULL }; - return event_poll(ms, input_sources); - } - - EventSource input_sources[] = { - rstream_event_source(read_stream), - NULL - }; + size_t rv = rbuffer_write(input_buffer, keys.data, keys.size); + process_interrupts(); + return rv; +} - return input_ready() || event_poll(ms, input_sources) || input_ready(); +static bool input_poll(int ms) +{ + event_poll_until(ms, input_ready()); + return input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c -static InbufPollResult inbuf_poll(int32_t ms) +static InbufPollResult inbuf_poll(int ms) { if (typebuf_was_filled || rbuffer_pending(input_buffer)) { return kInputAvail; @@ -230,12 +226,14 @@ static void read_cb(RStream *rstream, void *data, bool at_eof) } } + convert_input(); + process_interrupts(); started_reading = true; } static void convert_input(void) { - if (!rbuffer_available(input_buffer)) { + if (embedded_mode || !rbuffer_available(input_buffer)) { // No input buffer space return; } @@ -248,24 +246,32 @@ static void convert_input(void) if (convert) { // Perform input conversion according to `input_conv` - size_t unconverted_length; + size_t unconverted_length = 0; data = (char *)string_convert_ext(&input_conv, (uint8_t *)data, (int *)&converted_length, (int *)&unconverted_length); - data_length = rbuffer_pending(read_buffer) - unconverted_length; + data_length -= unconverted_length; } - // Write processed data to input buffer - size_t consumed = rbuffer_write(input_buffer, data, data_length); + // The conversion code will be gone eventually, for now assume `input_buffer` + // always has space for the converted data(it's many times the size of + // `read_buffer`, so it's hard to imagine a scenario where the converted data + // doesn't fit) + assert(converted_length <= rbuffer_available(input_buffer)); + // Write processed data to input buffer. + (void)rbuffer_write(input_buffer, data, converted_length); // Adjust raw buffer pointers - rbuffer_consumed(read_buffer, consumed); + rbuffer_consumed(read_buffer, data_length); if (convert) { // data points to memory allocated by `string_convert_ext`, free it. free(data); } +} +static void process_interrupts(void) +{ if (!ctrl_c_interrupts) { return; } @@ -273,17 +279,17 @@ static void convert_input(void) char *inbuf = rbuffer_read_ptr(input_buffer); size_t count = rbuffer_pending(input_buffer), consume_count = 0; - for (int i = count - 1; i >= 0; i--) { + for (int i = (int)count - 1; i >= 0; i--) { if (inbuf[i] == 3) { - consume_count = i + 1; + got_int = true; + consume_count = (size_t)i; break; } } - if (consume_count) { + if (got_int) { // Remove everything typed before the CTRL-C rbuffer_consumed(input_buffer, consume_count); - got_int = true; } } @@ -304,6 +310,10 @@ static int push_event_key(uint8_t *buf, int maxlen) // Check if there's pending input static bool input_ready(void) { - return rstream_pending(read_stream) > 0 || eof; + return typebuf_was_filled || // API call filled typeahead + event_has_deferred() || // Events must be processed + (!embedded_mode && ( + rbuffer_pending(input_buffer) > 0 || // Stdin input + eof)); // Stdin closed } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 2ca1023290..f8ad6874c9 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -12,17 +12,29 @@ #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/time.h" #include "nvim/os/shell.h" -#include "nvim/os/signal.h" #include "nvim/vim.h" #include "nvim/memory.h" -#include "nvim/term.h" #define EXIT_TIMEOUT 25 #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 0xFFFF +#define close_job_stream(job, stream, type) \ + do { \ + if (job->stream) { \ + type##stream_free(job->stream); \ + job->stream = NULL; \ + if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \ + uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \ + } \ + } \ + } while (0) + +#define close_job_in(job) close_job_stream(job, in, w) +#define close_job_out(job) close_job_stream(job, out, r) +#define close_job_err(job) close_job_stream(job, err, r) + struct job { // Job id the index in the job table plus one. int id; @@ -30,13 +42,9 @@ struct job { int64_t status; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; - // exit_cb may be called while there's still pending data from stdout/stderr. - // We use this reference count to ensure the JobExit event is only emitted - // when stdout/stderr are drained - int pending_refs; - // Same as above, but for freeing the job memory which contains - // libuv handles. Only after all are closed the job can be safely freed. - int pending_closes; + // Number of references to the job. The job resources will only be freed by + // close_cb when this is 0 + int refcount; // If the job was already stopped bool stopped; // Data associated with the job @@ -99,25 +107,28 @@ void job_teardown(void) // their status with `wait` or handling SIGCHLD. libuv does that // automatically (and then calls `exit_cb`) but we have to give it a chance // by running the loop one more time - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL) { - continue; - } + job = table[i]; // Still alive - while (is_alive(job) && remaining_tries--) { + while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); + // It's possible that the event_poll call removed the job from the table, + // reset 'job' so the next iteration won't run in that case. + job = table[i]; } - if (is_alive(job)) { + if (job && is_alive(job)) { uv_process_kill(&job->proc, SIGKILL); } } + // Last run to ensure all children were removed + event_poll(0); } /// Tries to start a new job. @@ -163,8 +174,7 @@ Job *job_start(char **argv, job->id = i + 1; *status = job->id; job->status = -1; - job->pending_refs = 3; - job->pending_closes = 4; + job->refcount = 4; job->data = data; job->stdout_cb = stdout_cb; job->stderr_cb = stderr_cb; @@ -205,7 +215,6 @@ Job *job_start(char **argv, // Spawn the job if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { - free_job(job); *status = -1; return NULL; } @@ -213,14 +222,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); - job->err = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -273,51 +276,30 @@ void job_stop(Job *job) /// is possible on some OS. int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL { - // switch to cooked so `got_int` will be set if the user interrupts - int old_mode = cur_tmode; - settmode(TMODE_COOK); - - EventSource sources[] = {job_event_source(job), signal_event_source(), NULL}; - - // keep track of the elapsed time if ms > 0 - uint64_t before = (ms > 0) ? os_hrtime() : 0; - - while (1) { - // check if the job has exited (and the status is available). - if (job->pending_refs == 0) { - break; - } - - event_poll(ms, sources); - - // we'll assume that a user frantically hitting interrupt doesn't like - // the current job. Signal that it has to be killed. - if (got_int) { - job_stop(job); - } - - if (ms == 0) { - break; - } - - // check if the poll timed out, if not, decrease the ms to wait for the - // next run - if (ms > 0) { - uint64_t now = os_hrtime(); - ms -= (int) ((now - before) / 1000000); - before = now; - - // if the time elapsed is greater than the `ms` wait time, break - if (ms <= 0) { - break; - } - } + // Increase refcount to stop the job from being freed before we have a + // chance to get the status. + job->refcount++; + event_poll_until(ms, + // Until... + got_int || // interrupted by the user + job->refcount == 1); // job exited + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + job_stop(job); + event_poll(0); } - settmode(old_mode); + if (!--job->refcount) { + int status = (int) job->status; + // Manually invoke close_cb to free the job resources + close_cb((uv_handle_t *)&job->proc); + return status; + } - // return -1 for a timeout, the job status otherwise - return (job->pending_refs) ? -1 : (int) job->status; + // return -1 for a timeout + return -1; } /// Close the pipe used to write to the job. @@ -331,15 +313,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL /// @param job The job instance void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL { - if (!job->in) { - return; - } - - // let other functions in the job module know that the in pipe is no more - wstream_free(job->in); - job->in = NULL; - - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); + close_job_in(job); } /// All writes that complete after calling this function will be reported @@ -369,14 +343,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Runs the read callback associated with the job exit event -/// -/// @param event Object containing data necessary to invoke the callback -void job_exit_event(Event event) -{ - job_exit_callback(event.data.job); -} - /// Get the job id /// /// @param job A pointer to the job @@ -395,11 +361,6 @@ void *job_data(Job *job) return job->data; } -EventSource job_event_source(Job *job) -{ - return job; -} - static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -411,9 +372,6 @@ static void job_exit_callback(Job *job) job->exit_cb(job, job->data); } - // Free the job resources - free_job(job); - // Stop polling job status if this was the last job_count--; if (job_count == 0) { @@ -426,16 +384,6 @@ static bool is_alive(Job *job) return uv_process_kill(&job->proc, 0) == 0; } -static void free_job(Job *job) -{ - uv_close((uv_handle_t *)&job->proc_stdout, close_cb); - if (job->in) { - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); - } - uv_close((uv_handle_t *)&job->proc_stderr, close_cb); - uv_close((uv_handle_t *)&job->proc, close_cb); -} - /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// that didn't die from SIGTERM after a while(exit_timeout is 0). static void job_prepare_cb(uv_prepare_t *handle) @@ -465,12 +413,14 @@ static void read_cb(RStream *rstream, void *data, bool eof) if (rstream == job->out) { job->stdout_cb(rstream, data, eof); + if (eof) { + close_job_out(job); + } } else { job->stderr_cb(rstream, data, eof); - } - - if (eof && --job->pending_refs == 0) { - emit_exit_event(job); + if (eof) { + close_job_err(job); + } } } @@ -480,41 +430,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) Job *job = handle_get_job((uv_handle_t *)proc); job->status = status; - if (--job->pending_refs == 0) { - emit_exit_event(job); - } -} - -static void emit_exit_event(Job *job) -{ - Event event = { - .source = job_event_source(job), - .type = kEventJobExit, - .data.job = job - }; - event_push(event); + uv_close((uv_handle_t *)&job->proc, close_cb); } static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); - if (--job->pending_closes == 0) { - // Only free the job memory after all the associated handles are properly - // closed by libuv - rstream_free(job->out); - rstream_free(job->err); - if (job->in) { - wstream_free(job->in); - } + if (handle == (uv_handle_t *)&job->proc) { + // Make sure all streams are properly closed to trigger callback invocation + // when job->proc is closed + close_job_in(job); + close_job_out(job); + close_job_err(job); + } - // Free data memory of process and pipe handles, that was allocated - // by handle_set_job in job_start. + if (--job->refcount == 0) { + // Invoke the exit_cb + job_exit_callback(job); + // Free all memory allocated for the job free(job->proc.data); free(job->proc_stdin.data); free(job->proc_stdout.data); free(job->proc_stderr.data); - shell_free_argv(job->proc_opts.args); free(job); } diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c deleted file mode 100644 index 55bc006ad1..0000000000 --- a/src/nvim/os/msgpack_rpc.c +++ /dev/null @@ -1,188 +0,0 @@ -#include <stdint.h> -#include <stdbool.h> -#include <inttypes.h> - -#include <msgpack.h> - -#include "nvim/vim.h" -#include "nvim/log.h" -#include "nvim/memory.h" -#include "nvim/os/wstream.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/api/private/helpers.h" -#include "nvim/func_attr.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.c.generated.h" -#endif - -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param channel_id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -WBuffer *msgpack_rpc_call(uint64_t channel_id, - msgpack_object *req, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2) - FUNC_ATTR_NONNULL_ARG(3) -{ - uint64_t response_id; - Error error = ERROR_INIT; - msgpack_rpc_validate(&response_id, req, &error); - - if (error.set) { - return serialize_response(response_id, &error, NIL, sbuffer); - } - - // dispatch the call - Object rv = msgpack_rpc_dispatch(channel_id, req, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - - if (error.set) { - ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", - error.msg, - response_id); - return serialize_response(response_id, &error, NIL, sbuffer); - } - - DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", - response_id); - return serialize_response(response_id, &error, rv, sbuffer); -} - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_bin(res, len); - msgpack_pack_bin_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -/// Handler executed when an invalid method name is passed -Object msgpack_rpc_handle_missing_method(uint64_t channel_id, - msgpack_object *req, - Error *error) -{ - snprintf(error->msg, sizeof(error->msg), "Invalid method name"); - error->set = true; - return NIL; -} - -/// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) - FUNC_ATTR_NONNULL_ARG(4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); - - if (request_id) { - msgpack_pack_uint64(&pac, request_id); - } - - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -/// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2, 4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); - - if (err->set) { - // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); - // Nil result - msgpack_pack_nil(&pac); - } else { - // Nil error - msgpack_pack_nil(&pac); - // Return value - msgpack_rpc_from_object(arg, &pac); - } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -static void msgpack_rpc_validate(uint64_t *response_id, - msgpack_object *req, - Error *err) -{ - // response id not known yet - - *response_id = 0; - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Request is not an array")); - } - - if (req->via.array.size != 4) { - api_set_error(err, Validation, _("Request array size should be 4")); - } - - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Id must be a positive integer")); - } - - // Set the response id, which is the same as the request - *response_id = req->via.array.ptr[1].via.u64; - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); - } - - if (req->via.array.ptr[0].via.u64 != 0) { - api_set_error(err, Validation, _("Message type must be 0")); - } - - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN - && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { - api_set_error(err, Validation, _("Method must be a string")); - } - - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Paremeters must be an array")); - } -} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h deleted file mode 100644 index 3476d791ea..0000000000 --- a/src/nvim/os/msgpack_rpc.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_H -#define NVIM_OS_MSGPACK_RPC_H - -#include <stdint.h> - -#include <msgpack.h> - -#include "nvim/func_attr.h" -#include "nvim/api/private/defs.h" -#include "nvim/os/wstream.h" - -typedef enum { - kUnpackResultOk, /// Successfully parsed a document - kUnpackResultFail, /// Got unexpected input - kUnpackResultNeedMore /// Need more data -} UnpackResult; - -/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores -/// functions of this type. -typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, - msgpack_object *req, - Error *error); - - -/// Initializes the msgpack-rpc method table -void msgpack_rpc_init(void); - -void msgpack_rpc_init_function_metadata(Dictionary *metadata); - -/// Dispatches to the actual API function after basic payload validation by -/// `msgpack_rpc_call`. It is responsible for validating/converting arguments -/// to C types, and converting the return value back to msgpack types. -/// The implementation is generated at compile time with metadata extracted -/// from the api/*.h headers, -/// -/// @param channel_id The channel id -/// @param method_id The method id -/// @param req The parsed request object -/// @param error Pointer to error structure -/// @return Some object -Object msgpack_rpc_dispatch(uint64_t channel_id, - msgpack_object *req, - Error *error) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c deleted file mode 100644 index b14de8245c..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.c +++ /dev/null @@ -1,289 +0,0 @@ -#include <stdint.h> -#include <stdbool.h> - -#include <msgpack.h> - -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.c.generated.h" -#endif - -static msgpack_zone zone; -static msgpack_sbuffer sbuffer; - -#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ - FUNC_ATTR_NONNULL_ALL \ - { \ - if (obj->type != MSGPACK_OBJECT_EXT \ - || obj->via.ext.type != kObjectType##t) { \ - return false; \ - } \ - \ - msgpack_object data; \ - msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \ - obj->via.ext.size, \ - NULL, \ - &zone, \ - &data); \ - \ - if (ret != MSGPACK_UNPACK_SUCCESS) { \ - return false; \ - } \ - \ - *arg = data.via.u64; \ - return true; \ - } \ - \ - void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \ - FUNC_ATTR_NONNULL_ARG(2) \ - { \ - msgpack_packer pac; \ - msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ - msgpack_pack_uint64(&pac, o); \ - msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ - msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ - msgpack_sbuffer_clear(&sbuffer); \ - } - -void msgpack_rpc_helpers_init(void) -{ - msgpack_zone_init(&zone, 0xfff); - msgpack_sbuffer_init(&sbuffer); -} - -HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer) -HANDLE_TYPE_CONVERSION_IMPL(Window, window) -HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage) - -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.boolean; - return obj->type == MSGPACK_OBJECT_BOOLEAN; -} - -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.u64 <= INT64_MAX) { - *arg = (int64_t)obj->via.u64; - return true; - } - - *arg = obj->via.i64; - return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; -} - -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.dec; - return obj->type == MSGPACK_OBJECT_DOUBLE; -} - -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) { - arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size); - arg->size = obj->via.bin.size; - } else { - return false; - } - - return true; -} - -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) - FUNC_ATTR_NONNULL_ALL -{ - switch (obj->type) { - case MSGPACK_OBJECT_NIL: - arg->type = kObjectTypeNil; - return true; - - case MSGPACK_OBJECT_BOOLEAN: - arg->type = kObjectTypeBoolean; - return msgpack_rpc_to_boolean(obj, &arg->data.boolean); - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - arg->type = kObjectTypeInteger; - return msgpack_rpc_to_integer(obj, &arg->data.integer); - - case MSGPACK_OBJECT_DOUBLE: - arg->type = kObjectTypeFloat; - return msgpack_rpc_to_float(obj, &arg->data.floating); - - case MSGPACK_OBJECT_BIN: - case MSGPACK_OBJECT_STR: - arg->type = kObjectTypeString; - return msgpack_rpc_to_string(obj, &arg->data.string); - - case MSGPACK_OBJECT_ARRAY: - arg->type = kObjectTypeArray; - return msgpack_rpc_to_array(obj, &arg->data.array); - - case MSGPACK_OBJECT_MAP: - arg->type = kObjectTypeDictionary; - return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); - - case MSGPACK_OBJECT_EXT: - switch (obj->via.ext.type) { - case kObjectTypeBuffer: - return msgpack_rpc_to_buffer(obj, &arg->data.buffer); - case kObjectTypeWindow: - return msgpack_rpc_to_window(obj, &arg->data.window); - case kObjectTypeTabpage: - return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); - } - default: - return false; - } -} - -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_ARRAY) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.array.size, sizeof(Object)); - - for (uint32_t i = 0; i < obj->via.array.size; i++) { - if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { - return false; - } - } - - return true; -} - -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_MAP) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - - - for (uint32_t i = 0; i < obj->via.map.size; i++) { - if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, - &arg->items[i].key)) { - return false; - } - - if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, - &arg->items[i].value)) { - return false; - } - } - - return true; -} - -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - if (result) { - msgpack_pack_true(res); - } else { - msgpack_pack_false(res); - } -} - -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_int64(res, result); -} - -void msgpack_rpc_from_float(Float result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_double(res, result); -} - -void msgpack_rpc_from_string(String result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_bin(res, result.size); - msgpack_pack_bin_body(res, result.data, result.size); -} - -void msgpack_rpc_from_object(Object result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - switch (result.type) { - case kObjectTypeNil: - msgpack_pack_nil(res); - break; - - case kObjectTypeBoolean: - msgpack_rpc_from_boolean(result.data.boolean, res); - break; - - case kObjectTypeInteger: - msgpack_rpc_from_integer(result.data.integer, res); - break; - - case kObjectTypeFloat: - msgpack_rpc_from_float(result.data.floating, res); - break; - - case kObjectTypeString: - msgpack_rpc_from_string(result.data.string, res); - break; - - case kObjectTypeArray: - msgpack_rpc_from_array(result.data.array, res); - break; - - case kObjectTypeBuffer: - msgpack_rpc_from_buffer(result.data.buffer, res); - break; - - case kObjectTypeWindow: - msgpack_rpc_from_window(result.data.window, res); - break; - - case kObjectTypeTabpage: - msgpack_rpc_from_tabpage(result.data.tabpage, res); - break; - - case kObjectTypeDictionary: - msgpack_rpc_from_dictionary(result.data.dictionary, res); - break; - } -} - -void msgpack_rpc_from_array(Array result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_array(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_object(result.items[i], res); - } -} - -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_map(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_string(result.items[i].key, res); - msgpack_rpc_from_object(result.items[i].value, res); - } -} diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h deleted file mode 100644 index aede6b1587..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H -#define NVIM_OS_MSGPACK_RPC_HELPERS_H - -#include <stdint.h> -#include <stdbool.h> - -#include <msgpack.h> - -#include "nvim/api/private/defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H - diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c index d4fffaa053..414c8841fa 100644 --- a/src/nvim/os/provider.c +++ b/src/nvim/os/provider.c @@ -8,7 +8,7 @@ #include "nvim/api/vim.h" #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/shell.h" #include "nvim/os/os.h" #include "nvim/log.h" diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 8f1c30de50..beff404fd0 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -8,8 +8,6 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/ascii.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -33,7 +31,6 @@ struct rstream { uv_file fd; rstream_cb cb; bool free_handle; - EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -76,18 +73,14 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count) void rbuffer_produced(RBuffer *rbuffer, size_t count) { rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rbuffer->rstream, - rstream_event_source(rbuffer->rstream)); + DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream); rbuffer_relocate(rbuffer); if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { // The last read filled the buffer, stop reading for now + // rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); + DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream); } } @@ -180,13 +173,8 @@ void rbuffer_free(RBuffer *rbuffer) /// for reading with `rstream_read` /// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance -/// @param source_override Replacement for the default source used in events -/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, - RBuffer *buffer, - void *data, - EventSource source_override) +RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = buffer; @@ -198,7 +186,6 @@ RStream * rstream_new(rstream_cb cb, rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; - rv->source_override = source_override ? source_override : rv; return rv; } @@ -322,21 +309,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count) return rbuffer_read(rstream->buffer, buffer, count); } -/// Runs the read callback associated with the rstream -/// -/// @param event Object containing data necessary to invoke the callback -void rstream_read_event(Event event) -{ - RStream *rstream = event.data.rstream.ptr; - - rstream->cb(rstream, rstream->data, event.data.rstream.eof); -} - -EventSource rstream_event_source(RStream *rstream) -{ - return rstream->source_override; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -357,13 +329,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { - DLOG("Closing RStream(address: %p, source: %p)", - rstream, - rstream_event_source(rstream)); + DLOG("Closing RStream(%p)", rstream); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - emit_read_event(rstream, true); + rstream->cb(rstream, rstream->data, true); } return; } @@ -374,7 +344,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - emit_read_event(rstream, false); + rstream->cb(rstream, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -409,7 +379,6 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - emit_read_event(rstream, true); return; } @@ -417,7 +386,6 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - emit_read_event(rstream, false); } static void close_cb(uv_handle_t *handle) @@ -426,21 +394,9 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static void emit_read_event(RStream *rstream, bool eof) -{ - Event event = { - .source = rstream_event_source(rstream), - .type = kEventRStreamData, - .data.rstream = { - .ptr = rstream, - .eof = eof - } - }; - event_push(event); -} - static void rbuffer_relocate(RBuffer *rbuffer) { + assert(rbuffer->rpos <= rbuffer->wpos); // Move data ... memmove( rbuffer->data, // ...to the beginning of the buffer(rpos 0) diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c deleted file mode 100644 index 9f7f5b34da..0000000000 --- a/src/nvim/os/server.c +++ /dev/null @@ -1,273 +0,0 @@ -#include <assert.h> -#include <stdlib.h> -#include <string.h> -#include <stdint.h> - -#include <uv.h> - -#include "nvim/os/channel.h" -#include "nvim/os/server.h" -#include "nvim/os/os.h" -#include "nvim/ascii.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/message.h" -#include "nvim/tempfile.h" -#include "nvim/map.h" -#include "nvim/path.h" - -#define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NEOVIM_DEFAULT_TCP_PORT 7450 -#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" - -typedef enum { - kServerTypeTcp, - kServerTypePipe -} ServerType; - -typedef struct { - // Type of the union below - ServerType type; - - // This is either a tcp server or unix socket(named pipe on windows) - union { - struct { - uv_tcp_t handle; - struct sockaddr_in addr; - } tcp; - struct { - uv_pipe_t handle; - char addr[ADDRESS_MAX_SIZE]; - } pipe; - } socket; -} Server; - -static PMap(cstr_t) *servers = NULL; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.c.generated.h" -#endif - -/// Initializes the module -bool server_init(void) -{ - servers = pmap_new(cstr_t)(); - - if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { - char *listen_address = (char *)vim_tempname(); - os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); - free(listen_address); - } - - return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; -} - -/// Teardown the server module -void server_teardown(void) -{ - if (!servers) { - return; - } - - Server *server; - - map_foreach_value(servers, server, { - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - }); -} - -/// Starts listening on arbitrary tcp/unix addresses specified by -/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will -/// be determined by parsing `endpoint`: If it's a valid tcp address in the -/// 'ip[:port]' format, then it will be tcp socket. The port is optional -/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will -/// be a unix socket or named pipe. -/// -/// @param endpoint Address of the server. Either a 'ip[:port]' string or an -/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or -/// named pipe. -/// @returns zero if successful, one on a regular error, and negative errno -/// on failure to bind or connect. -int server_start(const char *endpoint) - FUNC_ATTR_NONNULL_ALL -{ - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { - // TODO(aktau): since this is not what the user wanted, perhaps we - // should return an error here - EMSG2("Address was too long, truncated to %s", addr); - } - - // Check if the server already exists - if (pmap_has(cstr_t)(servers, addr)) { - EMSG2("Already listening on %s", addr); - return 1; - } - - ServerType server_type = kServerTypeTcp; - Server *server = xmalloc(sizeof(Server)); - char ip[16], *ip_end = strrchr(addr, ':'); - - if (!ip_end) { - ip_end = strchr(addr, NUL); - } - - uint32_t addr_len = ip_end - addr; - - if (addr_len > sizeof(ip) - 1) { - // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) - addr_len = sizeof(ip) - 1; - } - - // Extract the address part - xstrlcpy(ip, addr, addr_len + 1); - - int port = NEOVIM_DEFAULT_TCP_PORT; - - if (*ip_end == ':') { - // Extract the port - long lport = strtol(ip_end + 1, NULL, 10); // NOLINT - if (lport <= 0 || lport > 0xffff) { - // Invalid port, treat as named pipe or unix socket - server_type = kServerTypePipe; - } else { - port = (int) lport; - } - } - - if (server_type == kServerTypeTcp) { - // Try to parse ip address - if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { - // Invalid address, treat as named pipe or unix socket - server_type = kServerTypePipe; - } - } - - int result; - - if (server_type == kServerTypeTcp) { - // Listen on tcp address/port - uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); - server->socket.tcp.handle.data = server; - result = uv_tcp_bind(&server->socket.tcp.handle, - (const struct sockaddr *)&server->socket.tcp.addr, - 0); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, - MAX_CONNECTIONS, - connection_cb); - if (result) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } - } - } else { - // Listen on named pipe or unix socket - xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); - uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); - server->socket.pipe.handle.data = server; - result = uv_pipe_bind(&server->socket.pipe.handle, - server->socket.pipe.addr); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.pipe.handle, - MAX_CONNECTIONS, - connection_cb); - - if (result) { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - } - } - - assert(result <= 0); // libuv should have returned -errno or zero. - if (result < 0) { - if (result == -EACCES) { - // Libuv converts ENOENT to EACCES for Windows compatibility, but if - // the parent directory does not exist, ENOENT would be more accurate. - *path_tail((char_u *) addr) = NUL; - if (!os_file_exists((char_u *) addr)) { - result = -ENOENT; - } - } - EMSG2("Failed to start server: %s", uv_strerror(result)); - free(server); - return result; - } - - server->type = server_type; - - // Add the server to the hash table - pmap_put(cstr_t)(servers, addr, server); - - return 0; -} - -/// Stops listening on the address specified by `endpoint`. -/// -/// @param endpoint Address of the server. -void server_stop(char *endpoint) -{ - Server *server; - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - xstrlcpy(addr, endpoint, sizeof(addr)); - - if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { - EMSG2("Not listening on %s", addr); - return; - } - - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - - pmap_del(cstr_t)(servers, addr); -} - -static void connection_cb(uv_stream_t *server, int status) -{ - int result; - uv_stream_t *client; - Server *srv = server->data; - - if (status < 0) { - abort(); - } - - if (srv->type == kServerTypeTcp) { - client = xmalloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); - } else { - client = xmalloc(sizeof(uv_pipe_t)); - uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); - } - - result = uv_accept(server, client); - - if (result) { - EMSG2("Failed to accept connection: %s", uv_strerror(result)); - uv_close((uv_handle_t *)client, free_client); - return; - } - - channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ - free(handle); -} - -static void free_server(uv_handle_t *handle) -{ - free(handle->data); -} diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h deleted file mode 100644 index 43592a91e4..0000000000 --- a/src/nvim/os/server.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_H -#define NVIM_OS_SERVER_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.h.generated.h" -#endif -#endif // NVIM_OS_SERVER_H diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h deleted file mode 100644 index 08cdf55428..0000000000 --- a/src/nvim/os/server_defs.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_DEFS_H -#define NVIM_OS_SERVER_DEFS_H - -typedef struct server Server; - -#endif // NVIM_OS_SERVER_DEFS_H - diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 453cc6d605..d5464f7975 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -1,4 +1,5 @@ #include <string.h> +#include <assert.h> #include <stdbool.h> #include <stdlib.h> @@ -7,6 +8,7 @@ #include "nvim/ascii.h" #include "nvim/lib/kvec.h" #include "nvim/log.h" +#include "nvim/os/event.h" #include "nvim/os/job.h" #include "nvim/os/rstream.h" #include "nvim/os/shell.h" @@ -58,11 +60,11 @@ typedef struct { /// `shell_free_argv` when no longer needed. char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt) { - int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); + size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *))); // Split 'shell' - int i = tokenize(p_sh, rv); + size_t i = tokenize(p_sh, rv); if (extra_shell_opt != NULL) { // Push a copy of `extra_shell_opt` @@ -212,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg) // Keep running the loop until all three handles are completely closed while (pdata.exited < expected_exits) { - uv_run(uv_default_loop(), UV_RUN_ONCE); + event_poll(0); if (got_int) { // Forward SIGINT to the shell @@ -356,9 +358,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof) /// @param argv The vector that will be filled with copies of the parsed /// words. It can be NULL if the caller only needs to count words. /// @return The number of words parsed. -static int tokenize(const char_u *str, char **argv) +static size_t tokenize(const char_u *str, char **argv) { - int argc = 0, len; + size_t argc = 0, len; char_u *p = (char_u *) str; while (*p != NUL) { @@ -383,11 +385,11 @@ static int tokenize(const char_u *str, char **argv) /// /// @param str A pointer to the first character of the word /// @return The offset from `str` at which the word ends. -static int word_length(const char_u *str) +static size_t word_length(const char_u *str) { const char_u *p = str; bool inquote = false; - int length = 0; + size_t length = 0; // Move `p` to the end of shell word by advancing the pointer while it's // inside a quote or it's a non-whitespace character @@ -418,15 +420,15 @@ static void write_selection(uv_write_t *req) // TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and // only after filled we should start allocating memory(skip unnecessary // allocations for small writes) - int buflen = BUFFER_LENGTH; + size_t buflen = BUFFER_LENGTH; pdata->wbuffer = (char *)xmalloc(buflen); uv_buf_t uvbuf; linenr_T lnum = curbuf->b_op_start.lnum; - int off = 0; - int written = 0; + size_t off = 0; + size_t written = 0; char_u *lp = ml_get(lnum); - int l; - int len; + size_t l; + size_t len; for (;;) { l = strlen((char *)lp + written); @@ -443,7 +445,7 @@ static void write_selection(uv_write_t *req) pdata->wbuffer[off++] = NUL; } else { char_u *s = vim_strchr(lp + written, NL); - len = s == NULL ? l : s - (lp + written); + len = s == NULL ? l : (size_t)(s - (lp + written)); while (off + len >= buflen) { // Resize the buffer buflen *= 2; @@ -584,6 +586,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { ProcessData *data = (ProcessData *)proc->data; data->exited++; - data->exit_status = status; + assert(status <= INT_MAX); + data->exit_status = (int)status; uv_close((uv_handle_t *)proc, NULL); } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 2f93cfb08a..36f7b37c48 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -12,8 +12,6 @@ #include "nvim/memory.h" #include "nvim/misc1.h" #include "nvim/misc2.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/os/signal.h" static uv_signal_t sint, spipe, shup, squit, sterm, swinch; @@ -72,45 +70,6 @@ void signal_accept_deadly(void) rejecting_deadly = false; } -void signal_handle(Event event) -{ - int signum = event.data.signum; - - switch (signum) { - case SIGINT: - got_int = true; - break; -#ifdef SIGPWR - case SIGPWR: - // Signal of a power failure(eg batteries low), flush the swap files to - // be safe - ml_sync_all(false, false); - break; -#endif - case SIGPIPE: - // Ignore - break; - case SIGWINCH: - shell_resized(); - break; - case SIGTERM: - case SIGQUIT: - case SIGHUP: - if (!rejecting_deadly) { - deadly_signal(signum); - } - break; - default: - fprintf(stderr, "Invalid signal %d", signum); - break; - } -} - -EventSource signal_event_source(void) -{ - return &sint; -} - static char * signal_name(int signum) { switch (signum) { @@ -154,20 +113,32 @@ static void deadly_signal(int signum) static void signal_cb(uv_signal_t *handle, int signum) { - if (rejecting_deadly) { - if (signum == SIGINT) { + switch (signum) { + case SIGINT: got_int = true; - } - - return; + break; +#ifdef SIGPWR + case SIGPWR: + // Signal of a power failure(eg batteries low), flush the swap files to + // be safe + ml_sync_all(false, false); + break; +#endif + case SIGPIPE: + // Ignore + break; + case SIGWINCH: + shell_resized(); + break; + case SIGTERM: + case SIGQUIT: + case SIGHUP: + if (!rejecting_deadly) { + deadly_signal(signum); + } + break; + default: + fprintf(stderr, "Invalid signal %d", signum); + break; } - - Event event = { - .source = signal_event_source(), - .type = kEventSignal, - .data = { - .signum = signum - } - }; - event_push(event); } diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index e3b76ac833..a4871ef499 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -1,3 +1,4 @@ +#include <assert.h> #include <stdint.h> #include <stdbool.h> #include <time.h> @@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput) } } -static void microdelay(uint64_t microseconds) -{ - uint64_t hrtime; - int64_t ns = microseconds * 1000; // convert to nanoseconds - - uv_mutex_lock(&delay_mutex); - - while (ns > 0) { - hrtime = uv_hrtime(); - if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT) - break; - ns -= uv_hrtime() - hrtime; - } - - uv_mutex_unlock(&delay_mutex); -} - /// Portable version of POSIX localtime_r() /// /// @return NULL in case of error @@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL time_t rawtime = time(NULL); return os_localtime_r(&rawtime, result); } + +static void microdelay(uint64_t microseconds) +{ + uint64_t elapsed = 0; + uint64_t ns = microseconds * 1000; // convert to nanoseconds + uint64_t base = uv_hrtime(); + + uv_mutex_lock(&delay_mutex); + + while (elapsed < ns) { + if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed) + == UV_ETIMEDOUT) + break; + uint64_t now = uv_hrtime(); + elapsed += now - base; + base = now; + } + + uv_mutex_unlock(&delay_mutex); +} |