From b280308ac649da61e2a0f40a222eae21af5352c9 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 07:35:10 -0300 Subject: msgpack-rpc: Create subdirectory for msgpack-rpc modules Create the msgpack_rpc subdirectory and move all modules that deal with msgpack-rpc to it. Also merge msgpack_rpc.c into msgpack_rpc/helpers.c --- src/nvim/CMakeLists.txt | 8 +- src/nvim/api/vim.c | 2 +- src/nvim/eval.c | 2 +- src/nvim/main.c | 2 +- src/nvim/map.c | 2 +- src/nvim/map.h | 2 +- src/nvim/msgpack_rpc/channel.c | 597 +++++++++++++++++++++++++++++++++++++ src/nvim/msgpack_rpc/channel.h | 15 + src/nvim/msgpack_rpc/defs.h | 34 +++ src/nvim/msgpack_rpc/helpers.c | 463 +++++++++++++++++++++++++++++ src/nvim/msgpack_rpc/helpers.h | 17 ++ src/nvim/msgpack_rpc/server.c | 273 +++++++++++++++++ src/nvim/msgpack_rpc/server.h | 7 + src/nvim/os/channel.c | 598 -------------------------------------- src/nvim/os/channel.h | 15 - src/nvim/os/event.c | 12 +- src/nvim/os/msgpack_rpc.c | 188 ------------ src/nvim/os/msgpack_rpc.h | 51 ---- src/nvim/os/msgpack_rpc_helpers.c | 289 ------------------ src/nvim/os/msgpack_rpc_helpers.h | 16 - src/nvim/os/provider.c | 2 +- src/nvim/os/server.c | 273 ----------------- src/nvim/os/server.h | 7 - src/nvim/os/server_defs.h | 7 - src/nvim/os_unix.c | 6 +- 25 files changed, 1427 insertions(+), 1461 deletions(-) create mode 100644 src/nvim/msgpack_rpc/channel.c create mode 100644 src/nvim/msgpack_rpc/channel.h create mode 100644 src/nvim/msgpack_rpc/defs.h create mode 100644 src/nvim/msgpack_rpc/helpers.c create mode 100644 src/nvim/msgpack_rpc/helpers.h create mode 100644 src/nvim/msgpack_rpc/server.c create mode 100644 src/nvim/msgpack_rpc/server.h delete mode 100644 src/nvim/os/channel.c delete mode 100644 src/nvim/os/channel.h delete mode 100644 src/nvim/os/msgpack_rpc.c delete mode 100644 src/nvim/os/msgpack_rpc.h delete mode 100644 src/nvim/os/msgpack_rpc_helpers.c delete mode 100644 src/nvim/os/msgpack_rpc_helpers.h delete mode 100644 src/nvim/os/server.c delete mode 100644 src/nvim/os/server.h delete mode 100644 src/nvim/os/server_defs.h (limited to 'src') diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index 208df31596..1cedeebb37 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -3,7 +3,7 @@ include(CheckLibraryExists) set(GENERATED_DIR ${PROJECT_BINARY_DIR}/src/nvim/auto) set(DISPATCH_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/msgpack-gen.lua) file(GLOB API_HEADERS api/*.h) -set(MSGPACK_RPC_HEADER ${PROJECT_SOURCE_DIR}/src/nvim/os/msgpack_rpc.h) +file(GLOB MSGPACK_RPC_HEADERS msgpack_rpc/*.h) set(MSGPACK_DISPATCH ${GENERATED_DIR}/msgpack_dispatch.c) set(HEADER_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/gendeclarations.lua) set(GENERATED_INCLUDES_DIR ${PROJECT_BINARY_DIR}/include) @@ -19,12 +19,14 @@ file(MAKE_DIRECTORY ${GENERATED_DIR}) file(MAKE_DIRECTORY ${GENERATED_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc) -file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c) +file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c) file(GLOB_RECURSE NEOVIM_HEADERS *.h) foreach(sfile ${NEOVIM_SOURCES}) @@ -126,7 +128,7 @@ add_custom_command(OUTPUT ${MSGPACK_DISPATCH} COMMAND ${LUA_PRG} ${DISPATCH_GENERATOR} ${API_HEADERS} ${MSGPACK_DISPATCH} DEPENDS ${API_HEADERS} - ${MSGPACK_RPC_HEADER} + ${MSGPACK_RPC_HEADERS} ${DISPATCH_GENERATOR} ) diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index 5e0f3e0c32..c7b5b1cfbf 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -10,7 +10,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/buffer.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/provider.h" #include "nvim/vim.h" #include "nvim/buffer.h" diff --git a/src/nvim/eval.c b/src/nvim/eval.c index e3bd37a03f..c8f0799d5a 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -81,7 +81,7 @@ #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/time.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/os/dl.h" diff --git a/src/nvim/main.c b/src/nvim/main.c index 128d1a784c..a63ffb4a31 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -59,7 +59,7 @@ #include "nvim/os/input.h" #include "nvim/os/os.h" #include "nvim/os/signal.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/private/helpers.h" diff --git a/src/nvim/map.c b/src/nvim/map.c index 24aa38d67d..24a869e2e6 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -6,7 +6,7 @@ #include "nvim/map_defs.h" #include "nvim/vim.h" #include "nvim/memory.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #include "nvim/lib/khash.h" diff --git a/src/nvim/map.h b/src/nvim/map.h index 616516c3e1..78f4218a72 100644 --- a/src/nvim/map.h +++ b/src/nvim/map.h @@ -5,7 +5,7 @@ #include "nvim/map_defs.h" #include "nvim/api/private/defs.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #define MAP_DECLS(T, U) \ KHASH_DECLARE(T##_##U##_map, T, U) \ diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c new file mode 100644 index 0000000000..dcd7e41737 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel.c @@ -0,0 +1,597 @@ +#include +#include +#include + +#include +#include + +#include "nvim/api/private/helpers.h" +#include "nvim/api/vim.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/os/event.h" +#include "nvim/os/rstream.h" +#include "nvim/os/rstream_defs.h" +#include "nvim/os/wstream.h" +#include "nvim/os/wstream_defs.h" +#include "nvim/os/job.h" +#include "nvim/os/job_defs.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/vim.h" +#include "nvim/ascii.h" +#include "nvim/memory.h" +#include "nvim/os_unix.h" +#include "nvim/message.h" +#include "nvim/term.h" +#include "nvim/map.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/lib/kvec.h" + +#define CHANNEL_BUFFER_SIZE 0xffff + +typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + +typedef struct { + uint64_t id; + PMap(cstr_t) *subscribed_events; + bool is_job, enabled; + msgpack_unpacker *unpacker; + union { + Job *job; + struct { + RStream *read; + WStream *write; + uv_stream_t *uv; + } streams; + } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; +} Channel; + +static uint64_t next_id = 1; +static PMap(uint64_t) *channels = NULL; +static PMap(cstr_t) *event_strings = NULL; +static msgpack_sbuffer out_buffer; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/channel.c.generated.h" +#endif + +/// Initializes the module +void channel_init(void) +{ + channels = pmap_new(uint64_t)(); + event_strings = pmap_new(cstr_t)(); + msgpack_sbuffer_init(&out_buffer); + + if (embedded_mode) { + channel_from_stdio(); + } +} + +/// Teardown the module +void channel_teardown(void) +{ + if (!channels) { + return; + } + + Channel *channel; + + map_foreach_value(channels, channel, { + close_channel(channel); + }); +} + +/// Creates an API channel by starting a job and connecting to its +/// stdin/stdout. stderr is forwarded to the editor error stream. +/// +/// @param argv The argument vector for the process +/// @return The channel id +uint64_t channel_from_job(char **argv) +{ + Channel *channel = register_channel(); + channel->is_job = true; + + int status; + channel->data.job = job_start(argv, + channel, + job_out, + job_err, + NULL, + 0, + &status); + + if (status <= 0) { + close_channel(channel); + return 0; + } + + return channel->id; +} + +/// Creates an API channel from a libuv stream representing a tcp or +/// pipe/socket client connection +/// +/// @param stream The established connection +void channel_from_stream(uv_stream_t *stream) +{ + Channel *channel = register_channel(); + stream->data = NULL; + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_stream(channel->data.streams.read, stream); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_stream(channel->data.streams.write, stream); + channel->data.streams.uv = stream; +} + +bool channel_exists(uint64_t id) +{ + Channel *channel; + return (channel = pmap_get(uint64_t)(channels, id)) != NULL + && channel->enabled; +} + +/// Sends event/arguments to channel +/// +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type +/// @param name The event name, an arbitrary string +/// @param args Array with event arguments +/// @return True if the event was sent successfully, false otherwise. +bool channel_send_event(uint64_t id, char *name, Array args) +{ + Channel *channel = NULL; + + if (id > 0) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_free_array(args); + return false; + } + send_event(channel, name, args); + } else { + broadcast_event(name, args); + } + + return true; +} + +/// Sends a method call to a channel +/// +/// @param id The channel id +/// @param method_name The method name, an arbitrary string +/// @param args Array with method arguments +/// @param[out] error True if the return value is an error +/// @return Whatever the remote method returned +Object channel_send_call(uint64_t id, + char *method_name, + Array args, + Error *err) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + api_free_array(args); + return NIL; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + api_set_error(err, + Exception, + _("Channel %" PRIu64 " crossed maximum stack depth"), + channel->id); + api_free_array(args); + return NIL; + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + send_request(channel, request_id, method_name, args); + + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1, sources); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (frame.errored) { + api_set_error(err, Exception, "%s", frame.result.data.string.data); + return NIL; + } + + return frame.result; +} + +/// Subscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + char *event_string = pmap_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + pmap_put(cstr_t)(event_strings, event_string, event_string); + } + + pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +/// Unsubscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + unsubscribe(channel, event); +} + +/// Closes a channel +/// +/// @param id The channel id +/// @return true if successful, false otherwise +bool channel_close(uint64_t id) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + return false; + } + + channel_kill(channel); + channel->enabled = false; + return true; +} + +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim +static void channel_from_stdio(void) +{ + Channel *channel = register_channel(); + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_file(channel->data.streams.read, 0); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_file(channel->data.streams.write, 1); + channel->data.streams.uv = NULL; +} + +static void job_out(RStream *rstream, void *data, bool eof) +{ + Job *job = data; + parse_msgpack(rstream, job_data(job), eof); +} + +static void job_err(RStream *rstream, void *data, bool eof) +{ + size_t count; + char buf[256]; + Channel *channel = job_data(data); + + while ((count = rstream_pending(rstream))) { + size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); + buf[read] = NUL; + ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); + } +} + +static void parse_msgpack(RStream *rstream, void *data, bool eof) +{ + Channel *channel = data; + channel->rpc_call_level++; + + if (eof) { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + call_set_error(channel, buf); + goto end; + } + + uint32_t count = rstream_pending(rstream); + DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + count, + rstream); + + // Feed the unpacker with data + msgpack_unpacker_reserve_buffer(channel->unpacker, count); + rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); + msgpack_unpacker_buffer_consumed(channel->unpacker, count); + + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_return result; + + // Deserialize everything we can. + while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_set_error(channel, buf); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } + + // Perform the call + WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); + // write the response + if (!channel_write(channel, resp)) { + goto end; + } + } + + if (result == MSGPACK_UNPACK_NOMEM_ERROR) { + OUT_STR(e_outofmem); + out_char('\n'); + preserve_exit(); + } + + if (result == MSGPACK_UNPACK_PARSE_ERROR) { + // See src/msgpack/unpack_template.h in msgpack source tree for + // causes for this error(search for 'goto _failed') + // + // A not so uncommon cause for this might be deserializing objects with + // a high nesting level: msgpack will break when it's internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + send_error(channel, 0, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); + } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); + } +} + +static bool channel_write(Channel *channel, WBuffer *buffer) +{ + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + call_set_error(channel, buf); + } + + return success; +} + +static void send_error(Channel *channel, uint64_t id, char *err) +{ + Error e = ERROR_INIT; + api_set_error(&e, Exception, "%s", err); + channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); +} + +static void send_request(Channel *channel, + uint64_t id, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); +} + +static void send_event(Channel *channel, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); +} + +static void broadcast_event(char *name, Array args) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + api_free_array(args); + goto end; + } + + String method = {.size = strlen(name), .data = name}; + WBuffer *buffer = serialize_request(0, + method, + args, + &out_buffer, + kv_size(subscribed)); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + channel_write(kv_A(subscribed, i), buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = pmap_get(cstr_t)(event_strings, event); + pmap_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + pmap_del(cstr_t)(event_strings, event_string); + free(event_string); +} + +static void close_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + channel_kill(channel); + + free(channel); +} + +static void channel_kill(Channel *channel) +{ + if (channel->is_job) { + if (channel->data.job) { + job_stop(channel->data.job); + } + } else { + rstream_free(channel->data.streams.read); + wstream_free(channel->data.streams.write); + if (channel->data.streams.uv) { + uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + } else { + // When the stdin channel closes, it's time to go + mch_exit(0); + } + } +} + +static void close_cb(uv_handle_t *handle) +{ + free(handle->data); + free(handle); +} + +static Channel *register_channel(void) +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->enabled = true; + rv->rpc_call_level = 0; + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->id = next_id++; + rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); + pmap_put(uint64_t)(channels, rv->id, rv); + return rv; +} + +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_set_error(Channel *channel, char *msg) +{ + for (size_t i = 0; i < kv_size(channel->call_stack); i++) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(cstr_to_string(msg)); + } + + channel->enabled = false; +} diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h new file mode 100644 index 0000000000..df742fe368 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel.h @@ -0,0 +1,15 @@ +#ifndef NVIM_MSGPACK_RPC_CHANNEL_H +#define NVIM_MSGPACK_RPC_CHANNEL_H + +#include +#include + +#include "nvim/api/private/defs.h" +#include "nvim/vim.h" + +#define METHOD_MAXLEN 512 + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/channel.h.generated.h" +#endif +#endif // NVIM_MSGPACK_RPC_CHANNEL_H diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h new file mode 100644 index 0000000000..5eec4ced54 --- /dev/null +++ b/src/nvim/msgpack_rpc/defs.h @@ -0,0 +1,34 @@ +#ifndef NVIM_MSGPACK_RPC_DEFS_H +#define NVIM_MSGPACK_RPC_DEFS_H + +#include + + +/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores +/// functions of this type. +typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, + msgpack_object *req, + Error *error); + +/// Initializes the msgpack-rpc method table +void msgpack_rpc_init_method_table(void); + +void msgpack_rpc_init_function_metadata(Dictionary *metadata); + +/// Dispatches to the actual API function after basic payload validation by +/// `msgpack_rpc_call`. It is responsible for validating/converting arguments +/// to C types, and converting the return value back to msgpack types. +/// The implementation is generated at compile time with metadata extracted +/// from the api/*.h headers, +/// +/// @param channel_id The channel id +/// @param method_id The method id +/// @param req The parsed request object +/// @param error Pointer to error structure +/// @return Some object +Object msgpack_rpc_dispatch(uint64_t channel_id, + msgpack_object *req, + Error *error) + FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); + +#endif // NVIM_MSGPACK_RPC_DEFS_H diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c new file mode 100644 index 0000000000..4b96e4985e --- /dev/null +++ b/src/nvim/msgpack_rpc/helpers.c @@ -0,0 +1,463 @@ +#include +#include +#include + +#include + +#include "nvim/api/private/helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/vim.h" +#include "nvim/log.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/helpers.c.generated.h" +#endif + +static msgpack_zone zone; +static msgpack_sbuffer sbuffer; + +#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ + FUNC_ATTR_NONNULL_ALL \ + { \ + if (obj->type != MSGPACK_OBJECT_EXT \ + || obj->via.ext.type != kObjectType##t) { \ + return false; \ + } \ + \ + msgpack_object data; \ + msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \ + obj->via.ext.size, \ + NULL, \ + &zone, \ + &data); \ + \ + if (ret != MSGPACK_UNPACK_SUCCESS) { \ + return false; \ + } \ + \ + *arg = data.via.u64; \ + return true; \ + } \ + \ + void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \ + FUNC_ATTR_NONNULL_ARG(2) \ + { \ + msgpack_packer pac; \ + msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ + msgpack_pack_uint64(&pac, o); \ + msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ + msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ + msgpack_sbuffer_clear(&sbuffer); \ + } + +void msgpack_rpc_helpers_init(void) +{ + msgpack_zone_init(&zone, 0xfff); + msgpack_sbuffer_init(&sbuffer); +} + +HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer) +HANDLE_TYPE_CONVERSION_IMPL(Window, window) +HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage) + +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) + FUNC_ATTR_NONNULL_ALL +{ + *arg = obj->via.boolean; + return obj->type == MSGPACK_OBJECT_BOOLEAN; +} + +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.u64 <= INT64_MAX) { + *arg = (int64_t)obj->via.u64; + return true; + } + + *arg = obj->via.i64; + return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; +} + +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) + FUNC_ATTR_NONNULL_ALL +{ + *arg = obj->via.dec; + return obj->type == MSGPACK_OBJECT_DOUBLE; +} + +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) { + arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size); + arg->size = obj->via.bin.size; + } else { + return false; + } + + return true; +} + +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) + FUNC_ATTR_NONNULL_ALL +{ + switch (obj->type) { + case MSGPACK_OBJECT_NIL: + arg->type = kObjectTypeNil; + return true; + + case MSGPACK_OBJECT_BOOLEAN: + arg->type = kObjectTypeBoolean; + return msgpack_rpc_to_boolean(obj, &arg->data.boolean); + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + arg->type = kObjectTypeInteger; + return msgpack_rpc_to_integer(obj, &arg->data.integer); + + case MSGPACK_OBJECT_DOUBLE: + arg->type = kObjectTypeFloat; + return msgpack_rpc_to_float(obj, &arg->data.floating); + + case MSGPACK_OBJECT_BIN: + case MSGPACK_OBJECT_STR: + arg->type = kObjectTypeString; + return msgpack_rpc_to_string(obj, &arg->data.string); + + case MSGPACK_OBJECT_ARRAY: + arg->type = kObjectTypeArray; + return msgpack_rpc_to_array(obj, &arg->data.array); + + case MSGPACK_OBJECT_MAP: + arg->type = kObjectTypeDictionary; + return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); + + case MSGPACK_OBJECT_EXT: + switch (obj->via.ext.type) { + case kObjectTypeBuffer: + return msgpack_rpc_to_buffer(obj, &arg->data.buffer); + case kObjectTypeWindow: + return msgpack_rpc_to_window(obj, &arg->data.window); + case kObjectTypeTabpage: + return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); + } + default: + return false; + } +} + +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.array.size, sizeof(Object)); + + for (uint32_t i = 0; i < obj->via.array.size; i++) { + if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { + return false; + } + } + + return true; +} + +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type != MSGPACK_OBJECT_MAP) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); + + + for (uint32_t i = 0; i < obj->via.map.size; i++) { + if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, + &arg->items[i].key)) { + return false; + } + + if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, + &arg->items[i].value)) { + return false; + } + } + + return true; +} + +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + if (result) { + msgpack_pack_true(res); + } else { + msgpack_pack_false(res); + } +} + +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_int64(res, result); +} + +void msgpack_rpc_from_float(Float result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_double(res, result); +} + +void msgpack_rpc_from_string(String result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_bin(res, result.size); + msgpack_pack_bin_body(res, result.data, result.size); +} + +void msgpack_rpc_from_object(Object result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + switch (result.type) { + case kObjectTypeNil: + msgpack_pack_nil(res); + break; + + case kObjectTypeBoolean: + msgpack_rpc_from_boolean(result.data.boolean, res); + break; + + case kObjectTypeInteger: + msgpack_rpc_from_integer(result.data.integer, res); + break; + + case kObjectTypeFloat: + msgpack_rpc_from_float(result.data.floating, res); + break; + + case kObjectTypeString: + msgpack_rpc_from_string(result.data.string, res); + break; + + case kObjectTypeArray: + msgpack_rpc_from_array(result.data.array, res); + break; + + case kObjectTypeBuffer: + msgpack_rpc_from_buffer(result.data.buffer, res); + break; + + case kObjectTypeWindow: + msgpack_rpc_from_window(result.data.window, res); + break; + + case kObjectTypeTabpage: + msgpack_rpc_from_tabpage(result.data.tabpage, res); + break; + + case kObjectTypeDictionary: + msgpack_rpc_from_dictionary(result.data.dictionary, res); + break; + } +} + +void msgpack_rpc_from_array(Array result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_array(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_object(result.items[i], res); + } +} + +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_map(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_string(result.items[i].key, res); + msgpack_rpc_from_object(result.items[i].value, res); + } +} + +/// Validates the basic structure of the msgpack-rpc call and fills `res` +/// with the basic response structure. +/// +/// @param channel_id The channel id +/// @param req The parsed request object +/// @param res A packer that contains the response +WBuffer *msgpack_rpc_call(uint64_t channel_id, + msgpack_object *req, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2) + FUNC_ATTR_NONNULL_ARG(3) +{ + uint64_t response_id; + Error error = ERROR_INIT; + msgpack_rpc_validate(&response_id, req, &error); + + if (error.set) { + return serialize_response(response_id, &error, NIL, sbuffer); + } + + // dispatch the call + Object rv = msgpack_rpc_dispatch(channel_id, req, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); + + if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + response_id); + return serialize_response(response_id, &error, NIL, sbuffer); + } + + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + response_id); + return serialize_response(response_id, &error, rv, sbuffer); +} + +/// Finishes the msgpack-rpc call with an error message. +/// +/// @param msg The error message +/// @param res A packer that contains the response +void msgpack_rpc_error(char *msg, msgpack_packer *res) + FUNC_ATTR_NONNULL_ALL +{ + size_t len = strlen(msg); + + // error message + msgpack_pack_bin(res, len); + msgpack_pack_bin_body(res, msg, len); + // Nil result + msgpack_pack_nil(res); +} + +/// Handler executed when an invalid method name is passed +Object msgpack_rpc_handle_missing_method(uint64_t channel_id, + msgpack_object *req, + Error *error) +{ + snprintf(error->msg, sizeof(error->msg), "Invalid method name"); + error->set = true; + return NIL; +} + +/// Serializes a msgpack-rpc request or notification(id == 0) +WBuffer *serialize_request(uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) + FUNC_ATTR_NONNULL_ARG(4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, request_id ? 4 : 3); + msgpack_pack_int(&pac, request_id ? 0 : 2); + + if (request_id) { + msgpack_pack_uint64(&pac, request_id); + } + + msgpack_pack_bin(&pac, method.size); + msgpack_pack_bin_body(&pac, method.data, method.size); + msgpack_rpc_from_array(args, &pac); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + api_free_array(args); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +/// Serializes a msgpack-rpc response +WBuffer *serialize_response(uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2, 4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, response_id); + + if (err->set) { + // error represented by a [type, message] array + msgpack_pack_array(&pac, 2); + msgpack_rpc_from_integer(err->type, &pac); + msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); + // Nil result + msgpack_pack_nil(&pac); + } else { + // Nil error + msgpack_pack_nil(&pac); + // Return value + msgpack_rpc_from_object(arg, &pac); + } + + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + api_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +void msgpack_rpc_validate(uint64_t *response_id, + msgpack_object *req, + Error *err) +{ + // response id not known yet + + *response_id = 0; + // Validate the basic structure of the msgpack-rpc payload + if (req->type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Request is not an array")); + } + + if (req->via.array.size != 4) { + api_set_error(err, Validation, _("Request array size should be 4")); + } + + if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Id must be a positive integer")); + } + + // Set the response id, which is the same as the request + *response_id = req->via.array.ptr[1].via.u64; + + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Message type must be an integer")); + } + + if (req->via.array.ptr[0].via.u64 != 0) { + api_set_error(err, Validation, _("Message type must be 0")); + } + + if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN + && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { + api_set_error(err, Validation, _("Method must be a string")); + } + + if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Paremeters must be an array")); + } +} diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h new file mode 100644 index 0000000000..bf161d54e0 --- /dev/null +++ b/src/nvim/msgpack_rpc/helpers.h @@ -0,0 +1,17 @@ +#ifndef NVIM_MSGPACK_RPC_HELPERS_H +#define NVIM_MSGPACK_RPC_HELPERS_H + +#include +#include + +#include + +#include "nvim/os/wstream.h" +#include "nvim/api/private/defs.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/helpers.h.generated.h" +#endif + +#endif // NVIM_MSGPACK_RPC_HELPERS_H + diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c new file mode 100644 index 0000000000..33e01fe562 --- /dev/null +++ b/src/nvim/msgpack_rpc/server.c @@ -0,0 +1,273 @@ +#include +#include +#include +#include + +#include + +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/os/os.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/message.h" +#include "nvim/tempfile.h" +#include "nvim/map.h" +#include "nvim/path.h" + +#define MAX_CONNECTIONS 32 +#define ADDRESS_MAX_SIZE 256 +#define NEOVIM_DEFAULT_TCP_PORT 7450 +#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" + +typedef enum { + kServerTypeTcp, + kServerTypePipe +} ServerType; + +typedef struct { + // Type of the union below + ServerType type; + + // This is either a tcp server or unix socket(named pipe on windows) + union { + struct { + uv_tcp_t handle; + struct sockaddr_in addr; + } tcp; + struct { + uv_pipe_t handle; + char addr[ADDRESS_MAX_SIZE]; + } pipe; + } socket; +} Server; + +static PMap(cstr_t) *servers = NULL; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/server.c.generated.h" +#endif + +/// Initializes the module +bool server_init(void) +{ + servers = pmap_new(cstr_t)(); + + if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { + char *listen_address = (char *)vim_tempname(); + os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); + free(listen_address); + } + + return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; +} + +/// Teardown the server module +void server_teardown(void) +{ + if (!servers) { + return; + } + + Server *server; + + map_foreach_value(servers, server, { + if (server->type == kServerTypeTcp) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } else { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + }); +} + +/// Starts listening on arbitrary tcp/unix addresses specified by +/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will +/// be determined by parsing `endpoint`: If it's a valid tcp address in the +/// 'ip[:port]' format, then it will be tcp socket. The port is optional +/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will +/// be a unix socket or named pipe. +/// +/// @param endpoint Address of the server. Either a 'ip[:port]' string or an +/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or +/// named pipe. +/// @returns zero if successful, one on a regular error, and negative errno +/// on failure to bind or connect. +int server_start(const char *endpoint) + FUNC_ATTR_NONNULL_ALL +{ + char addr[ADDRESS_MAX_SIZE]; + + // Trim to `ADDRESS_MAX_SIZE` + if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { + // TODO(aktau): since this is not what the user wanted, perhaps we + // should return an error here + EMSG2("Address was too long, truncated to %s", addr); + } + + // Check if the server already exists + if (pmap_has(cstr_t)(servers, addr)) { + EMSG2("Already listening on %s", addr); + return 1; + } + + ServerType server_type = kServerTypeTcp; + Server *server = xmalloc(sizeof(Server)); + char ip[16], *ip_end = strrchr(addr, ':'); + + if (!ip_end) { + ip_end = strchr(addr, NUL); + } + + uint32_t addr_len = ip_end - addr; + + if (addr_len > sizeof(ip) - 1) { + // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) + addr_len = sizeof(ip) - 1; + } + + // Extract the address part + xstrlcpy(ip, addr, addr_len + 1); + + int port = NEOVIM_DEFAULT_TCP_PORT; + + if (*ip_end == ':') { + // Extract the port + long lport = strtol(ip_end + 1, NULL, 10); // NOLINT + if (lport <= 0 || lport > 0xffff) { + // Invalid port, treat as named pipe or unix socket + server_type = kServerTypePipe; + } else { + port = (int) lport; + } + } + + if (server_type == kServerTypeTcp) { + // Try to parse ip address + if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { + // Invalid address, treat as named pipe or unix socket + server_type = kServerTypePipe; + } + } + + int result; + + if (server_type == kServerTypeTcp) { + // Listen on tcp address/port + uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); + server->socket.tcp.handle.data = server; + result = uv_tcp_bind(&server->socket.tcp.handle, + (const struct sockaddr *)&server->socket.tcp.addr, + 0); + if (result == 0) { + result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, + MAX_CONNECTIONS, + connection_cb); + if (result) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } + } + } else { + // Listen on named pipe or unix socket + xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); + uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); + server->socket.pipe.handle.data = server; + result = uv_pipe_bind(&server->socket.pipe.handle, + server->socket.pipe.addr); + if (result == 0) { + result = uv_listen((uv_stream_t *)&server->socket.pipe.handle, + MAX_CONNECTIONS, + connection_cb); + + if (result) { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + } + } + + assert(result <= 0); // libuv should have returned -errno or zero. + if (result < 0) { + if (result == -EACCES) { + // Libuv converts ENOENT to EACCES for Windows compatibility, but if + // the parent directory does not exist, ENOENT would be more accurate. + *path_tail((char_u *) addr) = NUL; + if (!os_file_exists((char_u *) addr)) { + result = -ENOENT; + } + } + EMSG2("Failed to start server: %s", uv_strerror(result)); + free(server); + return result; + } + + server->type = server_type; + + // Add the server to the hash table + pmap_put(cstr_t)(servers, addr, server); + + return 0; +} + +/// Stops listening on the address specified by `endpoint`. +/// +/// @param endpoint Address of the server. +void server_stop(char *endpoint) +{ + Server *server; + char addr[ADDRESS_MAX_SIZE]; + + // Trim to `ADDRESS_MAX_SIZE` + xstrlcpy(addr, endpoint, sizeof(addr)); + + if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { + EMSG2("Not listening on %s", addr); + return; + } + + if (server->type == kServerTypeTcp) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } else { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + + pmap_del(cstr_t)(servers, addr); +} + +static void connection_cb(uv_stream_t *server, int status) +{ + int result; + uv_stream_t *client; + Server *srv = server->data; + + if (status < 0) { + abort(); + } + + if (srv->type == kServerTypeTcp) { + client = xmalloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); + } else { + client = xmalloc(sizeof(uv_pipe_t)); + uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); + } + + result = uv_accept(server, client); + + if (result) { + EMSG2("Failed to accept connection: %s", uv_strerror(result)); + uv_close((uv_handle_t *)client, free_client); + return; + } + + channel_from_stream(client); +} + +static void free_client(uv_handle_t *handle) +{ + free(handle); +} + +static void free_server(uv_handle_t *handle) +{ + free(handle->data); +} diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h new file mode 100644 index 0000000000..f1a6703938 --- /dev/null +++ b/src/nvim/msgpack_rpc/server.h @@ -0,0 +1,7 @@ +#ifndef NVIM_MSGPACK_RPC_SERVER_H +#define NVIM_MSGPACK_RPC_SERVER_H + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/server.h.generated.h" +#endif +#endif // NVIM_MSGPACK_RPC_SERVER_H diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c deleted file mode 100644 index 959fbc6e73..0000000000 --- a/src/nvim/os/channel.c +++ /dev/null @@ -1,598 +0,0 @@ -#include -#include -#include - -#include -#include - -#include "nvim/api/private/helpers.h" -#include "nvim/api/vim.h" -#include "nvim/os/channel.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/ascii.h" -#include "nvim/memory.h" -#include "nvim/os_unix.h" -#include "nvim/message.h" -#include "nvim/term.h" -#include "nvim/map.h" -#include "nvim/log.h" -#include "nvim/misc1.h" -#include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff - -typedef struct { - uint64_t request_id; - bool errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - PMap(cstr_t) *subscribed_events; - bool is_job, enabled; - msgpack_unpacker *unpacker; - union { - Job *job; - struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; -} Channel; - -static uint64_t next_id = 1; -static PMap(uint64_t) *channels = NULL; -static PMap(cstr_t) *event_strings = NULL; -static msgpack_sbuffer out_buffer; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.c.generated.h" -#endif - -/// Initializes the module -void channel_init(void) -{ - channels = pmap_new(uint64_t)(); - event_strings = pmap_new(cstr_t)(); - msgpack_sbuffer_init(&out_buffer); - - if (embedded_mode) { - channel_from_stdio(); - } -} - -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a job and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. -/// -/// @param argv The argument vector for the process -/// @return The channel id -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(); - channel->is_job = true; - - int status; - channel->data.job = job_start(argv, - channel, - job_out, - job_err, - NULL, - 0, - &status); - - if (status <= 0) { - close_channel(channel); - return 0; - } - - return channel->id; -} - -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection -/// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) -{ - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; -} - -bool channel_exists(uint64_t id) -{ - Channel *channel; - return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; -} - -/// Sends event/arguments to channel -/// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments -/// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) -{ - Channel *channel = NULL; - - if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_free_array(args); - return false; - } - send_event(channel, name, args); - } else { - broadcast_event(name, args); - } - - return true; -} - -/// Sends a method call to a channel -/// -/// @param id The channel id -/// @param method_name The method name, an arbitrary string -/// @param args Array with method arguments -/// @param[out] error True if the return value is an error -/// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) -{ - Channel *channel = NULL; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); - api_free_array(args); - return NIL; - } - - if (kv_size(channel->call_stack) > 20) { - // 20 stack depth is more than anyone should ever need for RPC calls - api_set_error(err, - Exception, - _("Channel %" PRIu64 " crossed maximum stack depth"), - channel->id); - api_free_array(args); - return NIL; - } - - uint64_t request_id = channel->next_request_id++; - // Send the msgpack-rpc request - send_request(channel, request_id, method_name, args); - - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - - // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; - kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); - - do { - event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return - - if (frame.errored) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); - return NIL; - } - - return frame.result; -} - -/// Subscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_subscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - char *event_string = pmap_get(cstr_t)(event_strings, event); - - if (!event_string) { - event_string = xstrdup(event); - pmap_put(cstr_t)(event_strings, event_string, event_string); - } - - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); -} - -/// Unsubscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - unsubscribe(channel, event); -} - -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - return false; - } - - channel_kill(channel); - channel->enabled = false; - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -static void channel_from_stdio(void) -{ - Channel *channel = register_channel(); - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(rstream, job_data(job), eof); -} - -static void job_err(RStream *rstream, void *data, bool eof) -{ - size_t count; - char buf[256]; - Channel *channel = job_data(data); - - while ((count = rstream_pending(rstream))) { - size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); - } -} - -static void parse_msgpack(RStream *rstream, void *data, bool eof) -{ - Channel *channel = data; - channel->rpc_call_level++; - - if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); - goto end; - } - - uint32_t count = rstream_pending(rstream); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", - count, - rstream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); - - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); - } else { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Channel %" PRIu64 " returned a response that doesn't have " - " a matching id for the current RPC call. Ensure the client " - " is properly synchronized", - channel->id); - call_set_error(channel, buf); - } - msgpack_unpacked_destroy(&unpacked); - // Bail out from this event loop iteration - goto end; - } - - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } - } - - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - OUT_STR(e_outofmem); - out_char('\n'); - preserve_exit(); - } - - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - send_error(channel, 0, "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); - } - -end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); - } -} - -static bool channel_write(Channel *channel, WBuffer *buffer) -{ - bool success; - - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); - } - - if (!success) { - // If the write failed for any reason, close the channel - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed due to a failed write", - channel->id); - call_set_error(channel, buf); - } - - return success; -} - -static void send_error(Channel *channel, uint64_t id, char *err) -{ - Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); -} - -static void send_request(Channel *channel, - uint64_t id, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); -} - -static void send_event(Channel *channel, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); -} - -static void broadcast_event(char *name, Array args) -{ - kvec_t(Channel *) subscribed; - kv_init(subscribed); - Channel *channel; - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { - kv_push(Channel *, subscribed, channel); - } - }); - - if (!kv_size(subscribed)) { - api_free_array(args); - goto end; - } - - String method = {.size = strlen(name), .data = name}; - WBuffer *buffer = serialize_request(0, - method, - args, - &out_buffer, - kv_size(subscribed)); - - for (size_t i = 0; i < kv_size(subscribed); i++) { - channel_write(kv_A(subscribed, i), buffer); - } - -end: - kv_destroy(subscribed); -} - -static void unsubscribe(Channel *channel, char *event) -{ - char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { - return; - } - }); - - // Since the string is no longer used by other channels, release it's memory - pmap_del(cstr_t)(event_strings, event_string); - free(event_string); -} - -static void close_channel(Channel *channel) -{ - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); - } else { - // When the stdin channel closes, it's time to go - mch_exit(0); - } - } -} - -static void close_cb(uv_handle_t *handle) -{ - free(handle->data); - free(handle); -} - -static Channel *register_channel(void) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; -} - -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint64_t response_id = obj->via.array.ptr[1].via.u64; - // Must be equal to the frame at the stack's bottom - return response_id == kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1)->request_id; -} - -static void call_stack_pop(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg) -{ - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = true; - frame->result = STRING_OBJ(cstr_to_string(msg)); - } - - channel->enabled = false; -} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h deleted file mode 100644 index bb409bfde9..0000000000 --- a/src/nvim/os/channel.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef NVIM_OS_CHANNEL_H -#define NVIM_OS_CHANNEL_H - -#include -#include - -#include "nvim/api/private/defs.h" -#include "nvim/vim.h" - -#define METHOD_MAXLEN 512 - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.h.generated.h" -#endif -#endif // NVIM_OS_CHANNEL_H diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index a460b2db96..f20d43c6a4 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -7,8 +7,10 @@ #include "nvim/os/event.h" #include "nvim/os/input.h" -#include "nvim/os/channel.h" -#include "nvim/os/server.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/os/provider.h" #include "nvim/os/signal.h" #include "nvim/os/rstream.h" @@ -41,6 +43,9 @@ static EventSource *immediate_sources = NULL; void event_init(void) { + // early msgpack-rpc initialization + msgpack_rpc_init_method_table(); + msgpack_rpc_helpers_init(); // Initialize the event queues deferred_events = kl_init(Event); immediate_events = kl_init(Event); @@ -52,9 +57,8 @@ void event_init(void) signal_init(); // Jobs job_init(); - // Channels + // finish mspgack-rpc initialization channel_init(); - // Servers server_init(); // Providers provider_init(); diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c deleted file mode 100644 index 55bc006ad1..0000000000 --- a/src/nvim/os/msgpack_rpc.c +++ /dev/null @@ -1,188 +0,0 @@ -#include -#include -#include - -#include - -#include "nvim/vim.h" -#include "nvim/log.h" -#include "nvim/memory.h" -#include "nvim/os/wstream.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/api/private/helpers.h" -#include "nvim/func_attr.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.c.generated.h" -#endif - -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param channel_id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -WBuffer *msgpack_rpc_call(uint64_t channel_id, - msgpack_object *req, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2) - FUNC_ATTR_NONNULL_ARG(3) -{ - uint64_t response_id; - Error error = ERROR_INIT; - msgpack_rpc_validate(&response_id, req, &error); - - if (error.set) { - return serialize_response(response_id, &error, NIL, sbuffer); - } - - // dispatch the call - Object rv = msgpack_rpc_dispatch(channel_id, req, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - - if (error.set) { - ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", - error.msg, - response_id); - return serialize_response(response_id, &error, NIL, sbuffer); - } - - DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", - response_id); - return serialize_response(response_id, &error, rv, sbuffer); -} - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_bin(res, len); - msgpack_pack_bin_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -/// Handler executed when an invalid method name is passed -Object msgpack_rpc_handle_missing_method(uint64_t channel_id, - msgpack_object *req, - Error *error) -{ - snprintf(error->msg, sizeof(error->msg), "Invalid method name"); - error->set = true; - return NIL; -} - -/// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) - FUNC_ATTR_NONNULL_ARG(4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); - - if (request_id) { - msgpack_pack_uint64(&pac, request_id); - } - - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -/// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2, 4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); - - if (err->set) { - // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); - // Nil result - msgpack_pack_nil(&pac); - } else { - // Nil error - msgpack_pack_nil(&pac); - // Return value - msgpack_rpc_from_object(arg, &pac); - } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -static void msgpack_rpc_validate(uint64_t *response_id, - msgpack_object *req, - Error *err) -{ - // response id not known yet - - *response_id = 0; - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Request is not an array")); - } - - if (req->via.array.size != 4) { - api_set_error(err, Validation, _("Request array size should be 4")); - } - - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Id must be a positive integer")); - } - - // Set the response id, which is the same as the request - *response_id = req->via.array.ptr[1].via.u64; - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); - } - - if (req->via.array.ptr[0].via.u64 != 0) { - api_set_error(err, Validation, _("Message type must be 0")); - } - - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN - && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { - api_set_error(err, Validation, _("Method must be a string")); - } - - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Paremeters must be an array")); - } -} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h deleted file mode 100644 index 3476d791ea..0000000000 --- a/src/nvim/os/msgpack_rpc.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_H -#define NVIM_OS_MSGPACK_RPC_H - -#include - -#include - -#include "nvim/func_attr.h" -#include "nvim/api/private/defs.h" -#include "nvim/os/wstream.h" - -typedef enum { - kUnpackResultOk, /// Successfully parsed a document - kUnpackResultFail, /// Got unexpected input - kUnpackResultNeedMore /// Need more data -} UnpackResult; - -/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores -/// functions of this type. -typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, - msgpack_object *req, - Error *error); - - -/// Initializes the msgpack-rpc method table -void msgpack_rpc_init(void); - -void msgpack_rpc_init_function_metadata(Dictionary *metadata); - -/// Dispatches to the actual API function after basic payload validation by -/// `msgpack_rpc_call`. It is responsible for validating/converting arguments -/// to C types, and converting the return value back to msgpack types. -/// The implementation is generated at compile time with metadata extracted -/// from the api/*.h headers, -/// -/// @param channel_id The channel id -/// @param method_id The method id -/// @param req The parsed request object -/// @param error Pointer to error structure -/// @return Some object -Object msgpack_rpc_dispatch(uint64_t channel_id, - msgpack_object *req, - Error *error) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c deleted file mode 100644 index b14de8245c..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.c +++ /dev/null @@ -1,289 +0,0 @@ -#include -#include - -#include - -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.c.generated.h" -#endif - -static msgpack_zone zone; -static msgpack_sbuffer sbuffer; - -#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ - FUNC_ATTR_NONNULL_ALL \ - { \ - if (obj->type != MSGPACK_OBJECT_EXT \ - || obj->via.ext.type != kObjectType##t) { \ - return false; \ - } \ - \ - msgpack_object data; \ - msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \ - obj->via.ext.size, \ - NULL, \ - &zone, \ - &data); \ - \ - if (ret != MSGPACK_UNPACK_SUCCESS) { \ - return false; \ - } \ - \ - *arg = data.via.u64; \ - return true; \ - } \ - \ - void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \ - FUNC_ATTR_NONNULL_ARG(2) \ - { \ - msgpack_packer pac; \ - msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ - msgpack_pack_uint64(&pac, o); \ - msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ - msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ - msgpack_sbuffer_clear(&sbuffer); \ - } - -void msgpack_rpc_helpers_init(void) -{ - msgpack_zone_init(&zone, 0xfff); - msgpack_sbuffer_init(&sbuffer); -} - -HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer) -HANDLE_TYPE_CONVERSION_IMPL(Window, window) -HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage) - -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.boolean; - return obj->type == MSGPACK_OBJECT_BOOLEAN; -} - -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.u64 <= INT64_MAX) { - *arg = (int64_t)obj->via.u64; - return true; - } - - *arg = obj->via.i64; - return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; -} - -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.dec; - return obj->type == MSGPACK_OBJECT_DOUBLE; -} - -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) { - arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size); - arg->size = obj->via.bin.size; - } else { - return false; - } - - return true; -} - -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) - FUNC_ATTR_NONNULL_ALL -{ - switch (obj->type) { - case MSGPACK_OBJECT_NIL: - arg->type = kObjectTypeNil; - return true; - - case MSGPACK_OBJECT_BOOLEAN: - arg->type = kObjectTypeBoolean; - return msgpack_rpc_to_boolean(obj, &arg->data.boolean); - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - arg->type = kObjectTypeInteger; - return msgpack_rpc_to_integer(obj, &arg->data.integer); - - case MSGPACK_OBJECT_DOUBLE: - arg->type = kObjectTypeFloat; - return msgpack_rpc_to_float(obj, &arg->data.floating); - - case MSGPACK_OBJECT_BIN: - case MSGPACK_OBJECT_STR: - arg->type = kObjectTypeString; - return msgpack_rpc_to_string(obj, &arg->data.string); - - case MSGPACK_OBJECT_ARRAY: - arg->type = kObjectTypeArray; - return msgpack_rpc_to_array(obj, &arg->data.array); - - case MSGPACK_OBJECT_MAP: - arg->type = kObjectTypeDictionary; - return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); - - case MSGPACK_OBJECT_EXT: - switch (obj->via.ext.type) { - case kObjectTypeBuffer: - return msgpack_rpc_to_buffer(obj, &arg->data.buffer); - case kObjectTypeWindow: - return msgpack_rpc_to_window(obj, &arg->data.window); - case kObjectTypeTabpage: - return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); - } - default: - return false; - } -} - -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_ARRAY) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.array.size, sizeof(Object)); - - for (uint32_t i = 0; i < obj->via.array.size; i++) { - if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { - return false; - } - } - - return true; -} - -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_MAP) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - - - for (uint32_t i = 0; i < obj->via.map.size; i++) { - if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, - &arg->items[i].key)) { - return false; - } - - if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, - &arg->items[i].value)) { - return false; - } - } - - return true; -} - -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - if (result) { - msgpack_pack_true(res); - } else { - msgpack_pack_false(res); - } -} - -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_int64(res, result); -} - -void msgpack_rpc_from_float(Float result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_double(res, result); -} - -void msgpack_rpc_from_string(String result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_bin(res, result.size); - msgpack_pack_bin_body(res, result.data, result.size); -} - -void msgpack_rpc_from_object(Object result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - switch (result.type) { - case kObjectTypeNil: - msgpack_pack_nil(res); - break; - - case kObjectTypeBoolean: - msgpack_rpc_from_boolean(result.data.boolean, res); - break; - - case kObjectTypeInteger: - msgpack_rpc_from_integer(result.data.integer, res); - break; - - case kObjectTypeFloat: - msgpack_rpc_from_float(result.data.floating, res); - break; - - case kObjectTypeString: - msgpack_rpc_from_string(result.data.string, res); - break; - - case kObjectTypeArray: - msgpack_rpc_from_array(result.data.array, res); - break; - - case kObjectTypeBuffer: - msgpack_rpc_from_buffer(result.data.buffer, res); - break; - - case kObjectTypeWindow: - msgpack_rpc_from_window(result.data.window, res); - break; - - case kObjectTypeTabpage: - msgpack_rpc_from_tabpage(result.data.tabpage, res); - break; - - case kObjectTypeDictionary: - msgpack_rpc_from_dictionary(result.data.dictionary, res); - break; - } -} - -void msgpack_rpc_from_array(Array result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_array(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_object(result.items[i], res); - } -} - -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_map(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_string(result.items[i].key, res); - msgpack_rpc_from_object(result.items[i].value, res); - } -} diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h deleted file mode 100644 index aede6b1587..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H -#define NVIM_OS_MSGPACK_RPC_HELPERS_H - -#include -#include - -#include - -#include "nvim/api/private/defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H - diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c index d4fffaa053..414c8841fa 100644 --- a/src/nvim/os/provider.c +++ b/src/nvim/os/provider.c @@ -8,7 +8,7 @@ #include "nvim/api/vim.h" #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/shell.h" #include "nvim/os/os.h" #include "nvim/log.h" diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c deleted file mode 100644 index 9f7f5b34da..0000000000 --- a/src/nvim/os/server.c +++ /dev/null @@ -1,273 +0,0 @@ -#include -#include -#include -#include - -#include - -#include "nvim/os/channel.h" -#include "nvim/os/server.h" -#include "nvim/os/os.h" -#include "nvim/ascii.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/message.h" -#include "nvim/tempfile.h" -#include "nvim/map.h" -#include "nvim/path.h" - -#define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NEOVIM_DEFAULT_TCP_PORT 7450 -#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" - -typedef enum { - kServerTypeTcp, - kServerTypePipe -} ServerType; - -typedef struct { - // Type of the union below - ServerType type; - - // This is either a tcp server or unix socket(named pipe on windows) - union { - struct { - uv_tcp_t handle; - struct sockaddr_in addr; - } tcp; - struct { - uv_pipe_t handle; - char addr[ADDRESS_MAX_SIZE]; - } pipe; - } socket; -} Server; - -static PMap(cstr_t) *servers = NULL; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.c.generated.h" -#endif - -/// Initializes the module -bool server_init(void) -{ - servers = pmap_new(cstr_t)(); - - if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { - char *listen_address = (char *)vim_tempname(); - os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); - free(listen_address); - } - - return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; -} - -/// Teardown the server module -void server_teardown(void) -{ - if (!servers) { - return; - } - - Server *server; - - map_foreach_value(servers, server, { - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - }); -} - -/// Starts listening on arbitrary tcp/unix addresses specified by -/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will -/// be determined by parsing `endpoint`: If it's a valid tcp address in the -/// 'ip[:port]' format, then it will be tcp socket. The port is optional -/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will -/// be a unix socket or named pipe. -/// -/// @param endpoint Address of the server. Either a 'ip[:port]' string or an -/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or -/// named pipe. -/// @returns zero if successful, one on a regular error, and negative errno -/// on failure to bind or connect. -int server_start(const char *endpoint) - FUNC_ATTR_NONNULL_ALL -{ - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { - // TODO(aktau): since this is not what the user wanted, perhaps we - // should return an error here - EMSG2("Address was too long, truncated to %s", addr); - } - - // Check if the server already exists - if (pmap_has(cstr_t)(servers, addr)) { - EMSG2("Already listening on %s", addr); - return 1; - } - - ServerType server_type = kServerTypeTcp; - Server *server = xmalloc(sizeof(Server)); - char ip[16], *ip_end = strrchr(addr, ':'); - - if (!ip_end) { - ip_end = strchr(addr, NUL); - } - - uint32_t addr_len = ip_end - addr; - - if (addr_len > sizeof(ip) - 1) { - // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) - addr_len = sizeof(ip) - 1; - } - - // Extract the address part - xstrlcpy(ip, addr, addr_len + 1); - - int port = NEOVIM_DEFAULT_TCP_PORT; - - if (*ip_end == ':') { - // Extract the port - long lport = strtol(ip_end + 1, NULL, 10); // NOLINT - if (lport <= 0 || lport > 0xffff) { - // Invalid port, treat as named pipe or unix socket - server_type = kServerTypePipe; - } else { - port = (int) lport; - } - } - - if (server_type == kServerTypeTcp) { - // Try to parse ip address - if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { - // Invalid address, treat as named pipe or unix socket - server_type = kServerTypePipe; - } - } - - int result; - - if (server_type == kServerTypeTcp) { - // Listen on tcp address/port - uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); - server->socket.tcp.handle.data = server; - result = uv_tcp_bind(&server->socket.tcp.handle, - (const struct sockaddr *)&server->socket.tcp.addr, - 0); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, - MAX_CONNECTIONS, - connection_cb); - if (result) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } - } - } else { - // Listen on named pipe or unix socket - xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); - uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); - server->socket.pipe.handle.data = server; - result = uv_pipe_bind(&server->socket.pipe.handle, - server->socket.pipe.addr); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.pipe.handle, - MAX_CONNECTIONS, - connection_cb); - - if (result) { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - } - } - - assert(result <= 0); // libuv should have returned -errno or zero. - if (result < 0) { - if (result == -EACCES) { - // Libuv converts ENOENT to EACCES for Windows compatibility, but if - // the parent directory does not exist, ENOENT would be more accurate. - *path_tail((char_u *) addr) = NUL; - if (!os_file_exists((char_u *) addr)) { - result = -ENOENT; - } - } - EMSG2("Failed to start server: %s", uv_strerror(result)); - free(server); - return result; - } - - server->type = server_type; - - // Add the server to the hash table - pmap_put(cstr_t)(servers, addr, server); - - return 0; -} - -/// Stops listening on the address specified by `endpoint`. -/// -/// @param endpoint Address of the server. -void server_stop(char *endpoint) -{ - Server *server; - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - xstrlcpy(addr, endpoint, sizeof(addr)); - - if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { - EMSG2("Not listening on %s", addr); - return; - } - - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - - pmap_del(cstr_t)(servers, addr); -} - -static void connection_cb(uv_stream_t *server, int status) -{ - int result; - uv_stream_t *client; - Server *srv = server->data; - - if (status < 0) { - abort(); - } - - if (srv->type == kServerTypeTcp) { - client = xmalloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); - } else { - client = xmalloc(sizeof(uv_pipe_t)); - uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); - } - - result = uv_accept(server, client); - - if (result) { - EMSG2("Failed to accept connection: %s", uv_strerror(result)); - uv_close((uv_handle_t *)client, free_client); - return; - } - - channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ - free(handle); -} - -static void free_server(uv_handle_t *handle) -{ - free(handle->data); -} diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h deleted file mode 100644 index 43592a91e4..0000000000 --- a/src/nvim/os/server.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_H -#define NVIM_OS_SERVER_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.h.generated.h" -#endif -#endif // NVIM_OS_SERVER_H diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h deleted file mode 100644 index 08cdf55428..0000000000 --- a/src/nvim/os/server_defs.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_DEFS_H -#define NVIM_OS_SERVER_DEFS_H - -typedef struct server Server; - -#endif // NVIM_OS_SERVER_DEFS_H - diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c index 52f57f8262..0ad15bc433 100644 --- a/src/nvim/os_unix.c +++ b/src/nvim/os_unix.c @@ -54,8 +54,8 @@ #include "nvim/os/shell.h" #include "nvim/os/signal.h" #include "nvim/os/job.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" #if defined(HAVE_SYS_IOCTL_H) # include @@ -166,8 +166,6 @@ void mch_init(void) mac_conv_init(); #endif - msgpack_rpc_init(); - msgpack_rpc_helpers_init(); event_init(); } -- cgit From cf6f60ce4dc376570e8d71facea76299ca951604 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 07:46:19 -0300 Subject: channel: Simplify resource management - Remove unused rpc_call_level field - Add `returned` field to ChannelCallFrame. This is set when the call returns and is the only condition checked by `channel_send_call`. - Add job_exit callback for properly closing channels created from job(the job_exit callback is only called after all read callbacks, so it's the only safe place to free the channel). --- src/nvim/msgpack_rpc/channel.c | 109 ++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index dcd7e41737..83e7900a54 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,14 +31,14 @@ typedef struct { uint64_t request_id; - bool errored; + bool returned, errored; Object result; } ChannelCallFrame; typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, enabled; + bool is_job, closed; msgpack_unpacker *unpacker; union { Job *job; @@ -50,7 +50,6 @@ typedef struct { } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; } Channel; static uint64_t next_id = 1; @@ -103,12 +102,12 @@ uint64_t channel_from_job(char **argv) channel, job_out, job_err, - NULL, + job_exit, 0, &status); if (status <= 0) { - close_channel(channel); + free_channel(channel); return 0; } @@ -141,7 +140,7 @@ bool channel_exists(uint64_t id) { Channel *channel; return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; + && !channel->closed; } /// Sends event/arguments to channel @@ -156,7 +155,7 @@ bool channel_send_event(uint64_t id, char *name, Array args) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_free_array(args); return false; } @@ -182,7 +181,7 @@ Object channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); api_free_array(args); return NIL; @@ -208,16 +207,14 @@ Object channel_send_call(uint64_t id, EventSource sources[] = {channel_source, NULL}; // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; + ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); do { event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return + } while (!frame.returned); + + (void)kv_pop(channel->call_stack); if (frame.errored) { api_set_error(err, Exception, "%s", frame.result.data.string.data); @@ -235,7 +232,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -257,7 +254,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -272,12 +269,11 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { return false; } - channel_kill(channel); - channel->enabled = false; + close_channel(channel); return true; } @@ -319,19 +315,16 @@ static void job_err(RStream *rstream, void *data, bool eof) } } +static void job_exit(Job *job, void *data) +{ + free_channel((Channel *)data); +} + static void parse_msgpack(RStream *rstream, void *data, bool eof) { Channel *channel = data; - channel->rpc_call_level++; if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); goto end; } @@ -354,7 +347,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) MSGPACK_UNPACK_SUCCESS) { if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); + complete_call(&unpacked.data, channel); } else { char buf[256]; snprintf(buf, @@ -397,10 +390,11 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); + if (eof && !channel->is_job && !kv_size(channel->call_stack)) { + // The free_channel call is deferred for jobs because it's possible that + // job_stderr will called after this. For non-job channels, this is the + // last callback so it must be freed now. + free_channel(channel); } } @@ -500,26 +494,11 @@ static void unsubscribe(Channel *channel, char *event) free(event_string); } +/// Close the channel streams/job. The channel resources will be freed by +/// free_channel later. static void close_channel(Channel *channel) { - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ + channel->closed = true; if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -536,6 +515,22 @@ static void channel_kill(Channel *channel) } } +static void free_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + free(channel); +} + static void close_cb(uv_handle_t *handle) { free(handle->data); @@ -545,8 +540,7 @@ static void close_cb(uv_handle_t *handle) static Channel *register_channel(void) { Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; + rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); @@ -573,9 +567,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) kv_size(channel->call_stack) - 1)->request_id; } -static void call_stack_pop(msgpack_object *obj, Channel *channel) +static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; if (frame->errored) { @@ -588,10 +584,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg) { for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, i); + frame->returned = true; frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } - channel->enabled = false; + close_channel(channel); } -- cgit From 77cc078c41c4348a3649cc366a262e6fab43980b Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 08:44:46 -0300 Subject: event: Remove EventType enum and pass a callback to `event_push` This approach is more flexible because we don't need to support a fixed set of "event types", any module can push events to be handled in main loop by simply passing a callback to the Event structure. --- src/nvim/os/event.c | 14 +------------- src/nvim/os/event_defs.h | 14 +++++--------- src/nvim/os/job.c | 2 +- src/nvim/os/rstream.c | 2 +- src/nvim/os/signal.c | 2 +- 5 files changed, 9 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index f20d43c6a4..43c02b13b2 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -170,19 +170,7 @@ static size_t process_from(klist_t(Event) *queue) Event event; while (kl_shift(Event, queue, &event) == 0) { - switch (event.type) { - case kEventSignal: - signal_handle(event); - break; - case kEventRStreamData: - rstream_read_event(event); - break; - case kEventJobExit: - job_exit_event(event); - break; - default: - abort(); - } + event.handler(event); count++; } diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index dbee3e2ba7..553d4e3125 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -7,16 +7,12 @@ #include "nvim/os/rstream_defs.h" typedef void * EventSource; +typedef struct event Event; +typedef void (*event_handler)(Event event); -typedef enum { - kEventSignal, - kEventRStreamData, - kEventJobExit -} EventType; - -typedef struct { +struct event { EventSource source; - EventType type; + event_handler handler; union { int signum; struct { @@ -25,6 +21,6 @@ typedef struct { } rstream; Job *job; } data; -} Event; +}; #endif // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 2ca1023290..d0ac82c047 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -489,7 +489,7 @@ static void emit_exit_event(Job *job) { Event event = { .source = job_event_source(job), - .type = kEventJobExit, + .handler = job_exit_event, .data.job = job }; event_push(event); diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 8f1c30de50..b95098cc52 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -430,7 +430,7 @@ static void emit_read_event(RStream *rstream, bool eof) { Event event = { .source = rstream_event_source(rstream), - .type = kEventRStreamData, + .handler = rstream_read_event, .data.rstream = { .ptr = rstream, .eof = eof diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 2f93cfb08a..b330c7f788 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -164,7 +164,7 @@ static void signal_cb(uv_signal_t *handle, int signum) Event event = { .source = signal_event_source(), - .type = kEventSignal, + .handler = signal_handle, .data = { .signum = signum } -- cgit From 264e0d872c598062be2b2a118d38c89a6ed5a023 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 09:25:58 -0300 Subject: event: Remove automatic event deferall This is how asynchronous events are currently handled by Nvim: - Libuv event loop is entered when Nvim blocks for user input(os_inchar is called) - Any event delivered by libuv that is not user input is queued for processing - The `K_EVENT` special key code is returned by os_inchar - `K_EVENT` is returned to a loop that is reading keys for the current Nvim mode, which will be handled by calling event_process() This approach has the advantage of integrating nicely with the current codebase, eg: vimscript code can be executed asynchronously with little surprises(Its the same as if the user typed a key). The problem with using keys to represent any event is that it also interferes with operators, and not every event needs or should do that. For example, consider this scenario: - A msgpack-rpc client calls vim_feedkeys("d") - Nvim processes K_EVENT, pushing "d" to the input queue - Nvim processes "d", entering operator-pending mode to wait for a motion - The client calls vim_feedkeys("w"), expecting Nvim to delete a word - Nvim processes K_EVENT, breaking out of operator-pending and pushing "w" - Nvim processes "w", moving a word This commit fixes the above problem by removing all automatic calls to `event_push`(which is what generates K_EVENT input). Right now this also breaks redrawing initiated by asynchronous events(and possibly other stuff too, Nvim is a complex state machine and we can't simply run vimscript code anywhere). In future commits the calls to `event_push` will be inserted only where it's absolutely necessary to run code in "key reading loops", such as when executing vimscript code or mutating editor data structures in ways that currently can only be done by the user. --- src/nvim/msgpack_rpc/channel.c | 13 ++---- src/nvim/os/event.c | 98 +++++------------------------------------- src/nvim/os/event_defs.h | 13 +----- src/nvim/os/input.c | 18 ++------ src/nvim/os/job.c | 55 ++++++------------------ src/nvim/os/rstream.c | 58 +++---------------------- src/nvim/os/signal.c | 81 +++++++++++----------------------- 7 files changed, 65 insertions(+), 271 deletions(-) (limited to 'src') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 83e7900a54..d31e404c23 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -126,8 +126,7 @@ void channel_from_stream(uv_stream_t *stream) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -201,17 +200,12 @@ Object channel_send_call(uint64_t id, // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - // Push the frame ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); do { - event_poll(-1, sources); + event_poll(-1); } while (!frame.returned); (void)kv_pop(channel->call_stack); @@ -286,8 +280,7 @@ static void channel_from_stdio(void) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_file(channel->data.streams.read, 0); rstream_start(channel->data.streams.read); // write stream diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 43c02b13b2..00920fc5cf 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -34,12 +34,7 @@ typedef struct { #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.c.generated.h" #endif -static klist_t(Event) *deferred_events, *immediate_events; -// NULL-terminated array of event sources that we should process immediately. -// -// Events from sources that are not contained in this array are processed -// later when `event_process` is called -static EventSource *immediate_sources = NULL; +static klist_t(Event) *pending_events; void event_init(void) { @@ -47,8 +42,7 @@ void event_init(void) msgpack_rpc_init_method_table(); msgpack_rpc_helpers_init(); // Initialize the event queues - deferred_events = kl_init(Event); - immediate_events = kl_init(Event); + pending_events = kl_init(Event); // Initialize input events input_init(); // Timer to wake the event loop if a timeout argument is passed to @@ -72,8 +66,7 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms, EventSource sources[]) - FUNC_ATTR_NONNULL_ARG(2) +bool event_poll(int32_t ms) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -104,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[]) run_mode = UV_RUN_NOWAIT; } - size_t processed_events; - - do { - // Run one event loop iteration, blocking for events if run_mode is - // UV_RUN_ONCE - processed_events = loop(run_mode, sources); - } while ( - // Continue running if ... - !processed_events && // we didn't process any immediate events - !event_has_deferred() && // no events are waiting to be processed - run_mode != UV_RUN_NOWAIT && // ms != 0 - !timer_data.timed_out); // we didn't get a timeout + loop(run_mode); if (!(--recursive)) { // Again, only stop when we leave the top-level invocation @@ -127,56 +109,31 @@ bool event_poll(int32_t ms, EventSource sources[]) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - processed_events += loop(UV_RUN_NOWAIT, sources); + loop(UV_RUN_NOWAIT); } - return !timer_data.timed_out && (processed_events || event_has_deferred()); + return !timer_data.timed_out && event_has_deferred(); } bool event_has_deferred(void) { - return !kl_empty(deferred_events); + return !kl_empty(pending_events); } // Queue an event void event_push(Event event) { - bool defer = true; - - if (immediate_sources) { - size_t i; - EventSource src; - - for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { - if (src == event.source) { - defer = false; - break; - } - } - } - - *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; + *kl_pushp(Event, pending_events) = event; } -void event_process(void) -{ - process_from(deferred_events); -} -// Runs the appropriate action for each queued event -static size_t process_from(klist_t(Event) *queue) +void event_process(void) { - size_t count = 0; Event event; - while (kl_shift(Event, queue, &event) == 0) { + while (kl_shift(Event, pending_events, &event) == 0) { event.handler(event); - count++; } - - DLOG("Processed %u events", count); - - return count; } // Set a flag in the `event_poll` loop for signaling of a timeout @@ -194,42 +151,9 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static void requeue_deferred_events(void) +static void loop(uv_run_mode run_mode) { - size_t remaining = deferred_events->size; - - DLOG("Number of deferred events: %u", remaining); - - while (remaining--) { - // Re-push each deferred event to ensure it will be in the right queue - Event event; - kl_shift(Event, deferred_events, &event); - event_push(event); - DLOG("Re-queueing event"); - } - - DLOG("Number of deferred events: %u", deferred_events->size); -} - -static size_t loop(uv_run_mode run_mode, EventSource *sources) -{ - size_t count; - immediate_sources = sources; - // It's possible that some events from the immediate sources are waiting - // in the deferred queue. If so, move them to the immediate queue so they - // will be processed in order of arrival by the next `process_from` call. - requeue_deferred_events(); - count = process_from(immediate_events); - - if (count) { - // No need to enter libuv, events were already processed - return count; - } - DLOG("Enter event loop"); uv_run(uv_default_loop(), run_mode); DLOG("Exit event loop"); - immediate_sources = NULL; - count = process_from(immediate_events); - return count; } diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index 553d4e3125..2dd9403d9f 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,21 +6,12 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" -typedef void * EventSource; typedef struct event Event; typedef void (*event_handler)(Event event); struct event { - EventSource source; + void *data; event_handler handler; - union { - int signum; - struct { - RStream *ptr; - bool eof; - } rstream; - Job *job; - } data; -}; +}; #endif // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a18d735ce6..d718bf95da 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -7,7 +7,6 @@ #include "nvim/api/private/defs.h" #include "nvim/os/input.h" #include "nvim/os/event.h" -#include "nvim/os/signal.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" #include "nvim/ascii.h" @@ -48,10 +47,7 @@ void input_init(void) } read_buffer = rbuffer_new(READ_BUFFER_SIZE); - read_stream = rstream_new(read_cb, - read_buffer, - NULL, - NULL); + read_stream = rstream_new(read_cb, read_buffer, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -170,16 +166,10 @@ void input_buffer_restore(String str) static bool input_poll(int32_t ms) { if (embedded_mode) { - EventSource input_sources[] = { signal_event_source(), NULL }; - return event_poll(ms, input_sources); + return event_poll(ms); } - EventSource input_sources[] = { - rstream_event_source(read_stream), - NULL - }; - - return input_ready() || event_poll(ms, input_sources) || input_ready(); + return input_ready() || event_poll(ms) || input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c @@ -235,7 +225,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof) static void convert_input(void) { - if (!rbuffer_available(input_buffer)) { + if (embedded_mode || !rbuffer_available(input_buffer)) { // No input buffer space return; } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index d0ac82c047..091da5d213 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -14,7 +14,6 @@ #include "nvim/os/event_defs.h" #include "nvim/os/time.h" #include "nvim/os/shell.h" -#include "nvim/os/signal.h" #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/term.h" @@ -103,21 +102,24 @@ void job_teardown(void) // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL) { - continue; - } + job = table[i]; // Still alive - while (is_alive(job) && remaining_tries--) { + while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits uv_run(uv_default_loop(), UV_RUN_NOWAIT); + // It's possible that the uv_run call removed the job from the table, + // reset 'job' so the next iteration won't run in that case. + job = table[i]; } - if (is_alive(job)) { + if (job && is_alive(job)) { uv_process_kill(&job->proc, SIGKILL); } } + // Last run to ensure all children were removed + uv_run(uv_default_loop(), UV_RUN_NOWAIT); } /// Tries to start a new job. @@ -213,14 +215,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); - job->err = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -277,8 +273,6 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int old_mode = cur_tmode; settmode(TMODE_COOK); - EventSource sources[] = {job_event_source(job), signal_event_source(), NULL}; - // keep track of the elapsed time if ms > 0 uint64_t before = (ms > 0) ? os_hrtime() : 0; @@ -288,7 +282,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL break; } - event_poll(ms, sources); + event_poll(ms); // we'll assume that a user frantically hitting interrupt doesn't like // the current job. Signal that it has to be killed. @@ -369,14 +363,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Runs the read callback associated with the job exit event -/// -/// @param event Object containing data necessary to invoke the callback -void job_exit_event(Event event) -{ - job_exit_callback(event.data.job); -} - /// Get the job id /// /// @param job A pointer to the job @@ -395,11 +381,6 @@ void *job_data(Job *job) return job->data; } -EventSource job_event_source(Job *job) -{ - return job; -} - static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -470,7 +451,7 @@ static void read_cb(RStream *rstream, void *data, bool eof) } if (eof && --job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } @@ -481,20 +462,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) job->status = status; if (--job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } -static void emit_exit_event(Job *job) -{ - Event event = { - .source = job_event_source(job), - .handler = job_exit_event, - .data.job = job - }; - event_push(event); -} - static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index b95098cc52..8cfd9d1b75 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -8,8 +8,6 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/ascii.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -33,7 +31,6 @@ struct rstream { uv_file fd; rstream_cb cb; bool free_handle; - EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count) void rbuffer_produced(RBuffer *rbuffer, size_t count) { rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rbuffer->rstream, - rstream_event_source(rbuffer->rstream)); + DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream); rbuffer_relocate(rbuffer); if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { // The last read filled the buffer, stop reading for now rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); + DLOG("Buffer for RStream(%p) is full, stopping it", rstream); } } @@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer) /// for reading with `rstream_read` /// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance -/// @param source_override Replacement for the default source used in events -/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, - RBuffer *buffer, - void *data, - EventSource source_override) +RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = buffer; @@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb, rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; - rv->source_override = source_override ? source_override : rv; return rv; } @@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count) return rbuffer_read(rstream->buffer, buffer, count); } -/// Runs the read callback associated with the rstream -/// -/// @param event Object containing data necessary to invoke the callback -void rstream_read_event(Event event) -{ - RStream *rstream = event.data.rstream.ptr; - - rstream->cb(rstream, rstream->data, event.data.rstream.eof); -} - -EventSource rstream_event_source(RStream *rstream) -{ - return rstream->source_override; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { - DLOG("Closing RStream(address: %p, source: %p)", - rstream, - rstream_event_source(rstream)); + DLOG("Closing RStream(%p)", rstream); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - emit_read_event(rstream, true); + rstream->cb(rstream, rstream->data, true); } return; } @@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - emit_read_event(rstream, false); + rstream->cb(rstream, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - emit_read_event(rstream, true); return; } @@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - emit_read_event(rstream, false); } static void close_cb(uv_handle_t *handle) @@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static void emit_read_event(RStream *rstream, bool eof) -{ - Event event = { - .source = rstream_event_source(rstream), - .handler = rstream_read_event, - .data.rstream = { - .ptr = rstream, - .eof = eof - } - }; - event_push(event); -} - static void rbuffer_relocate(RBuffer *rbuffer) { // Move data ... diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index b330c7f788..36f7b37c48 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -12,8 +12,6 @@ #include "nvim/memory.h" #include "nvim/misc1.h" #include "nvim/misc2.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/os/signal.h" static uv_signal_t sint, spipe, shup, squit, sterm, swinch; @@ -72,45 +70,6 @@ void signal_accept_deadly(void) rejecting_deadly = false; } -void signal_handle(Event event) -{ - int signum = event.data.signum; - - switch (signum) { - case SIGINT: - got_int = true; - break; -#ifdef SIGPWR - case SIGPWR: - // Signal of a power failure(eg batteries low), flush the swap files to - // be safe - ml_sync_all(false, false); - break; -#endif - case SIGPIPE: - // Ignore - break; - case SIGWINCH: - shell_resized(); - break; - case SIGTERM: - case SIGQUIT: - case SIGHUP: - if (!rejecting_deadly) { - deadly_signal(signum); - } - break; - default: - fprintf(stderr, "Invalid signal %d", signum); - break; - } -} - -EventSource signal_event_source(void) -{ - return &sint; -} - static char * signal_name(int signum) { switch (signum) { @@ -154,20 +113,32 @@ static void deadly_signal(int signum) static void signal_cb(uv_signal_t *handle, int signum) { - if (rejecting_deadly) { - if (signum == SIGINT) { + switch (signum) { + case SIGINT: got_int = true; - } - - return; + break; +#ifdef SIGPWR + case SIGPWR: + // Signal of a power failure(eg batteries low), flush the swap files to + // be safe + ml_sync_all(false, false); + break; +#endif + case SIGPIPE: + // Ignore + break; + case SIGWINCH: + shell_resized(); + break; + case SIGTERM: + case SIGQUIT: + case SIGHUP: + if (!rejecting_deadly) { + deadly_signal(signum); + } + break; + default: + fprintf(stderr, "Invalid signal %d", signum); + break; } - - Event event = { - .source = signal_event_source(), - .handler = signal_handle, - .data = { - .signum = signum - } - }; - event_push(event); } -- cgit From b527ac752fd5ebcc74c06306e7009e2b98e4ee01 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 10:39:54 -0300 Subject: event: Extract event_poll loops to `event_poll_until` macro A pattern that is becoming common across the project is to poll for events until a certain condition is true, optionally passing a timeout. To address this scenario, the event_poll_until macro was created and the job/channel/input modules were refactored to use it on their blocking functions. --- src/nvim/msgpack_rpc/channel.c | 6 +---- src/nvim/os/event.c | 6 ++--- src/nvim/os/event.h | 21 +++++++++++++++ src/nvim/os/input.c | 15 ++++++----- src/nvim/os/job.c | 59 ++++++++++++++++-------------------------- 5 files changed, 55 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index d31e404c23..91c26ca21e 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -203,11 +203,7 @@ Object channel_send_call(uint64_t id, // Push the frame ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - - do { - event_poll(-1); - } while (!frame.returned); - + event_poll_until(-1, frame.returned); (void)kv_pop(channel->call_stack); if (frame.errored) { diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 00920fc5cf..2dee529452 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -27,7 +27,7 @@ KLIST_INIT(Event, Event, _destroy_event) typedef struct { bool timed_out; - int32_t ms; + int ms; uv_timer_t *timer; } TimerData; @@ -66,7 +66,7 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms) +void event_poll(int ms) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -111,8 +111,6 @@ bool event_poll(int32_t ms) uv_close((uv_handle_t *)&timer_prepare, NULL); loop(UV_RUN_NOWAIT); } - - return !timer_data.timed_out && event_has_deferred(); } bool event_has_deferred(void) diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h index 29e304adc8..f8139e978d 100644 --- a/src/nvim/os/event.h +++ b/src/nvim/os/event.h @@ -6,6 +6,27 @@ #include "nvim/os/event_defs.h" #include "nvim/os/job_defs.h" +#include "nvim/os/time.h" + +// Poll for events until a condition is true or a timeout has passed +#define event_poll_until(timeout, condition) \ + do { \ + int remaining = timeout; \ + uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ + while (!(condition)) { \ + event_poll(remaining); \ + if (remaining == 0) { \ + break; \ + } else if (remaining > 0) { \ + uint64_t now = os_hrtime(); \ + remaining -= (int) ((now - before) / 1000000); \ + before = now; \ + if (remaining <= 0) { \ + break; \ + } \ + } \ + } \ + } while (0) #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.h.generated.h" diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index d718bf95da..d9dae2b44e 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -163,13 +163,10 @@ void input_buffer_restore(String str) free(str.data); } -static bool input_poll(int32_t ms) +static bool input_poll(int ms) { - if (embedded_mode) { - return event_poll(ms); - } - - return input_ready() || event_poll(ms) || input_ready(); + event_poll_until(ms, input_ready()); + return input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c @@ -294,6 +291,10 @@ static int push_event_key(uint8_t *buf, int maxlen) // Check if there's pending input static bool input_ready(void) { - return rstream_pending(read_stream) > 0 || eof; + return typebuf_was_filled || // API call filled typeahead + event_has_deferred() || // Events must be processed + (!embedded_mode && ( + rstream_pending(read_stream) > 0 || // Stdin input + eof)); // Stdin closed } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 091da5d213..c18a83e817 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -12,7 +12,6 @@ #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/time.h" #include "nvim/os/shell.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -273,45 +272,33 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int old_mode = cur_tmode; settmode(TMODE_COOK); - // keep track of the elapsed time if ms > 0 - uint64_t before = (ms > 0) ? os_hrtime() : 0; - - while (1) { - // check if the job has exited (and the status is available). - if (job->pending_refs == 0) { - break; - } - - event_poll(ms); - - // we'll assume that a user frantically hitting interrupt doesn't like - // the current job. Signal that it has to be killed. - if (got_int) { - job_stop(job); - } - - if (ms == 0) { - break; - } - - // check if the poll timed out, if not, decrease the ms to wait for the - // next run - if (ms > 0) { - uint64_t now = os_hrtime(); - ms -= (int) ((now - before) / 1000000); - before = now; - - // if the time elapsed is greater than the `ms` wait time, break - if (ms <= 0) { - break; - } - } + // Increase pending_refs to stop the exit_cb from being called, which + // could result in the job being freed before we have a chance + // to get the status. + job->pending_refs++; + event_poll_until(ms, + // Until... + got_int || // interrupted by the user + job->pending_refs == 1); // job exited + job->pending_refs--; + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + job_stop(job); + event_poll(0); } settmode(old_mode); - // return -1 for a timeout, the job status otherwise - return (job->pending_refs) ? -1 : (int) job->status; + if (!job->pending_refs) { + int status = (int) job->status; + job_exit_callback(job); + return status; + } + + // return -1 for a timeout + return -1; } /// Close the pipe used to write to the job. -- cgit From 72f028abcb167b2ca7e2d6d770af81a18ef58a0a Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 16:07:30 -0300 Subject: eval: Defer execution of JobActivity autocommands JobActivity autocommands run vimscript and must be executed on Nvim main loop. Since the previous commit removed automatic calls to `event_push` on RStream/Job callbacks, this adds it back, but in eval.c where job control is implemented. --- src/nvim/eval.c | 56 +++++++++++++++++++++++++++++++++++++++++----------- src/nvim/lib/klist.h | 2 ++ 2 files changed, 47 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index c8f0799d5a..3fc4104258 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -18,6 +18,8 @@ #include #include +#include "nvim/lib/klist.h" + #include "nvim/assert.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -86,6 +88,7 @@ #include "nvim/api/vim.h" #include "nvim/os/dl.h" #include "nvim/os/provider.h" +#include "nvim/os/event.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ @@ -443,6 +446,16 @@ static dictitem_T vimvars_var; /* variable used for v: */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ +// Memory pool for reusing JobEvent structures +typedef struct { + Job *job; + RStream *rstream; + char *type; +} JobEvent; +#define JobEventFreer(x) +KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer) +kmempool_t(JobEventPool) *job_event_pool = NULL; + /* * Initialize the global and v: variables. */ @@ -478,6 +491,7 @@ void eval_init(void) set_vim_var_nr(VV_HLSEARCH, 1L); set_reg_var(0); /* default for v:register is not 0 but '"' */ + job_event_pool = kmp_init(JobEventPool); } #if defined(EXITFREE) || defined(PROTO) @@ -19508,35 +19522,55 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) return ret; } +// JobActivity autocommands will execute vimscript code, so it must be executed +// on Nvim main loop +#define push_job_event(j, r, t) \ + do { \ + JobEvent *event_data = kmp_alloc(JobEventPool, job_event_pool); \ + event_data->job = j; \ + event_data->rstream = r; \ + event_data->type = t; \ + event_push((Event) { \ + .handler = on_job_event, \ + .data = event_data \ + }); \ + } while(0) + static void on_job_stdout(RStream *rstream, void *data, bool eof) { if (!eof) { - on_job_data(rstream, data, eof, "stdout"); + push_job_event(data, rstream, "stdout"); } } static void on_job_stderr(RStream *rstream, void *data, bool eof) { if (!eof) { - on_job_data(rstream, data, eof, "stderr"); + push_job_event(data, rstream, "stderr"); } } static void on_job_exit(Job *job, void *data) { - apply_job_autocmds(job, data, "exit", NULL); - free(data); + push_job_event(data, NULL, "exit"); } -static void on_job_data(RStream *rstream, void *data, bool eof, char *type) +static void on_job_event(Event event) { - Job *job = data; - uint32_t read_count = rstream_pending(rstream); - char *str = xmalloc(read_count + 1); + JobEvent *data = event.data; + Job *job = data->job; + char *str = NULL; + + if (data->rstream) { + // Read event + size_t read_count = rstream_pending(data->rstream); + str = xmalloc(read_count + 1); - rstream_read(rstream, str, read_count); - str[read_count] = NUL; - apply_job_autocmds(job, job_data(job), type, str); + rstream_read(data->rstream, str, read_count); + str[read_count] = NUL; + } + apply_job_autocmds(job, job_data(job), data->type, str); + kmp_free(JobEventPool, job_event_pool, data); } static void apply_job_autocmds(Job *job, char *name, char *type, char *str) diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h index e4a90fef33..f8dc7d4c43 100644 --- a/src/nvim/lib/klist.h +++ b/src/nvim/lib/klist.h @@ -39,6 +39,8 @@ static inline kmp_##name##_t *kmp_init_##name(void) { \ return xcalloc(1, sizeof(kmp_##name##_t)); \ } \ + static inline void kmp_destroy_##name(kmp_##name##_t *mp) \ + REAL_FATTR_UNUSED; \ static inline void kmp_destroy_##name(kmp_##name##_t *mp) { \ size_t k; \ for (k = 0; k < mp->n; ++k) { \ -- cgit From 72e3e57bf1aa128b02724e853365f65fd9451f0b Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 20:07:01 -0300 Subject: msgpack-rpc: Allow selective deferral API calls Since all API functions now run immediately after a msgpack-rpc request is parsed by libuv callbacks, a mechanism was added to override this behavior and allow certain functions to run in Nvim main loop. The mechanism is simple: Any API function tagged with the FUNC_ATTR_DEFERRED (a "dummy" attribute only used by msgpack-gen.lua) will be called when Nvim main loop receives a K_EVENT key. To implement this mechanism it was necessary some restructuration on the msgpack-rpc modules, especially in the msgpack-gen.lua script. --- src/nvim/func_attr.h | 1 + src/nvim/map.c | 3 +- src/nvim/map.h | 2 +- src/nvim/msgpack_rpc/channel.c | 99 +++++++++++++++++++++++++++++++++++++++--- src/nvim/msgpack_rpc/defs.h | 15 +++++-- src/nvim/msgpack_rpc/helpers.c | 44 +++---------------- 6 files changed, 114 insertions(+), 50 deletions(-) (limited to 'src') diff --git a/src/nvim/func_attr.h b/src/nvim/func_attr.h index c75d0ab312..519f61c763 100644 --- a/src/nvim/func_attr.h +++ b/src/nvim/func_attr.h @@ -179,6 +179,7 @@ #endif #ifdef DEFINE_FUNC_ATTRIBUTES + #define FUNC_ATTR_DEFERRED #define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC #define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x) #define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y) diff --git a/src/nvim/map.c b/src/nvim/map.c index 24a869e2e6..3f485cb952 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -108,4 +108,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER) MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER) -MAP_IMPL(String, rpc_method_handler_fn, DEFAULT_INITIALIZER) +#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false} +MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER) diff --git a/src/nvim/map.h b/src/nvim/map.h index 78f4218a72..5ade6dcf15 100644 --- a/src/nvim/map.h +++ b/src/nvim/map.h @@ -25,7 +25,7 @@ MAP_DECLS(cstr_t, uint64_t) MAP_DECLS(cstr_t, ptr_t) MAP_DECLS(ptr_t, ptr_t) MAP_DECLS(uint64_t, ptr_t) -MAP_DECLS(String, rpc_method_handler_fn) +MAP_DECLS(String, MsgpackRpcRequestHandler) #define map_new(T, U) map_##T##_##U##_new #define map_free(T, U) map_##T##_##U##_free diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 91c26ca21e..6ddda10c5f 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -5,6 +5,8 @@ #include #include +#include "nvim/lib/klist.h" + #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/msgpack_rpc/channel.h" @@ -52,6 +54,17 @@ typedef struct { kvec_t(ChannelCallFrame *) call_stack; } Channel; +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +#define RequestEventFreer(x) +KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer) +kmempool_t(RequestEventPool) *request_event_pool = NULL; + static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; @@ -64,6 +77,7 @@ static msgpack_sbuffer out_buffer; /// Initializes the module void channel_init(void) { + request_event_pool = kmp_init(RequestEventPool); channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); @@ -352,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } + handle_request(channel, &unpacked.data); } if (result == MSGPACK_UNPACK_NOMEM_ERROR) { @@ -387,6 +396,84 @@ end: } } +static void handle_request(Channel *channel, msgpack_object *request) + FUNC_ATTR_NONNULL_ALL +{ + uint64_t request_id; + Error error = ERROR_INIT; + msgpack_rpc_validate(&request_id, request, &error); + + if (error.set) { + // Validation failed, send response with error + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + return; + } + + // Retrieve the request handler + MsgpackRpcRequestHandler handler; + msgpack_object method = request->via.array.ptr[2]; + + if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) { + handler = msgpack_rpc_get_handler_for(method.via.bin.ptr, + method.via.bin.size); + } else { + handler.fn = msgpack_rpc_handle_missing_method; + handler.defer = false; + } + + Array args; + msgpack_rpc_to_array(request->via.array.ptr + 3, &args); + + if (kv_size(channel->call_stack) || !handler.defer) { + call_request_handler(channel, handler, args, request_id); + return; + } + + // Defer calling the request handler. + RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool); + event_data->channel = channel; + event_data->handler = handler; + event_data->args = args; + event_data->request_id = request_id; + event_push((Event) { + .handler = on_request_event, + .data = event_data + }); +} + +static void on_request_event(Event event) +{ + RequestEvent *e = event.data; + call_request_handler(e->channel, e->handler, e->args, e->request_id); + kmp_free(RequestEventPool, request_event_pool, e); +} + +static void call_request_handler(Channel *channel, + MsgpackRpcRequestHandler handler, + Array args, + uint64_t request_id) +{ + Error error = ERROR_INIT; + Object result = handler.fn(channel->id, request_id, args, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); + + if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + request_id); + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + } + + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + request_id); + channel_write(channel, + serialize_response(request_id, &error, result, &out_buffer)); +} + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h index 5eec4ced54..13067fb7b4 100644 --- a/src/nvim/msgpack_rpc/defs.h +++ b/src/nvim/msgpack_rpc/defs.h @@ -6,9 +6,15 @@ /// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores /// functions of this type. -typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, - msgpack_object *req, - Error *error); +typedef struct { + Object (*fn)(uint64_t channel_id, + uint64_t request_id, + Array args, + Error *error); + bool defer; // Should the call be deferred to the main loop? This should + // be true if the function mutates editor data structures such + // as buffers, windows, tabs, or if it executes vimscript code. +} MsgpackRpcRequestHandler; /// Initializes the msgpack-rpc method table void msgpack_rpc_init_method_table(void); @@ -31,4 +37,7 @@ Object msgpack_rpc_dispatch(uint64_t channel_id, Error *error) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); +MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name, + size_t name_len) + FUNC_ATTR_NONNULL_ARG(1); #endif // NVIM_MSGPACK_RPC_DEFS_H diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 4b96e4985e..6be221b912 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -140,10 +140,13 @@ bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) case MSGPACK_OBJECT_EXT: switch (obj->via.ext.type) { case kObjectTypeBuffer: + arg->type = kObjectTypeBuffer; return msgpack_rpc_to_buffer(obj, &arg->data.buffer); case kObjectTypeWindow: + arg->type = kObjectTypeWindow; return msgpack_rpc_to_window(obj, &arg->data.window); case kObjectTypeTabpage: + arg->type = kObjectTypeTabpage; return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); } default: @@ -292,44 +295,6 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) } } -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param channel_id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -WBuffer *msgpack_rpc_call(uint64_t channel_id, - msgpack_object *req, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2) - FUNC_ATTR_NONNULL_ARG(3) -{ - uint64_t response_id; - Error error = ERROR_INIT; - msgpack_rpc_validate(&response_id, req, &error); - - if (error.set) { - return serialize_response(response_id, &error, NIL, sbuffer); - } - - // dispatch the call - Object rv = msgpack_rpc_dispatch(channel_id, req, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - - if (error.set) { - ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", - error.msg, - response_id); - return serialize_response(response_id, &error, NIL, sbuffer); - } - - DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", - response_id); - return serialize_response(response_id, &error, rv, sbuffer); -} - /// Finishes the msgpack-rpc call with an error message. /// /// @param msg The error message @@ -348,7 +313,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res) /// Handler executed when an invalid method name is passed Object msgpack_rpc_handle_missing_method(uint64_t channel_id, - msgpack_object *req, + uint64_t request_id, + Array args, Error *error) { snprintf(error->msg, sizeof(error->msg), "Invalid method name"); -- cgit From cf9571b7b144f37b61ceaf3b17e84806913fd969 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 20:21:32 -0300 Subject: api: Add FUNC_ATTR_DEFERRED attribute to a number of functions Any function that can directly mutate the screen or execute vimscript had the attribute applied. --- src/nvim/api/buffer.c | 7 +++++++ src/nvim/api/tabpage.c | 1 + src/nvim/api/vim.c | 20 ++++++++++++-------- src/nvim/api/window.c | 5 +++++ 4 files changed, 25 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c index 4ff5845bd4..982003a31a 100644 --- a/src/nvim/api/buffer.c +++ b/src/nvim/api/buffer.c @@ -69,6 +69,7 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err) /// @param line The new line. /// @param[out] err Details of an error that may have occurred void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) + FUNC_ATTR_DEFERRED { Object l = STRING_OBJ(line); Array array = {.items = &l, .size = 1}; @@ -81,6 +82,7 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) /// @param index The line index /// @param[out] err Details of an error that may have occurred void buffer_del_line(Buffer buffer, Integer index, Error *err) + FUNC_ATTR_DEFERRED { Array array = ARRAY_DICT_INIT; buffer_set_line_slice(buffer, index, index, true, true, array, err); @@ -163,6 +165,7 @@ void buffer_set_line_slice(Buffer buffer, Boolean include_end, ArrayOf(String) replacement, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -314,6 +317,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object buffer_set_var(Buffer buffer, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -349,6 +353,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void buffer_set_option(Buffer buffer, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -399,6 +404,7 @@ String buffer_get_name(Buffer buffer, Error *err) /// @param name The buffer name /// @param[out] err Details of an error that may have occurred void buffer_set_name(Buffer buffer, String name, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -444,6 +450,7 @@ void buffer_insert(Buffer buffer, Integer lnum, ArrayOf(String) lines, Error *err) + FUNC_ATTR_DEFERRED { buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err); } diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c index 3e5d00671a..cb06825731 100644 --- a/src/nvim/api/tabpage.c +++ b/src/nvim/api/tabpage.c @@ -62,6 +62,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The tab page handle Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { tabpage_T *tab = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index c7b5b1cfbf..c90e7039ce 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -31,19 +31,12 @@ # include "api/vim.c.generated.h" #endif -/// Send keys to vim input buffer, simulating user input. -/// -/// @param str The keys to send -void vim_push_keys(String str) -{ - abort(); -} - /// Executes an ex-mode command str /// /// @param str The command str /// @param[out] err Details of an error that may have occurred void vim_command(String str, Error *err) + FUNC_ATTR_DEFERRED { // Run the command try_start(); @@ -111,6 +104,7 @@ String vim_replace_termcodes(String str, Boolean from_part, Boolean do_lt, /// @param[out] err Details of an error that may have occurred /// @return The expanded object Object vim_eval(String str, Error *err) + FUNC_ATTR_DEFERRED { Object rv; // Evaluate the expression @@ -230,6 +224,7 @@ String vim_get_current_line(Error *err) /// @param line The line contents /// @param[out] err Details of an error that may have occurred void vim_set_current_line(String line, Error *err) + FUNC_ATTR_DEFERRED { buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err); } @@ -238,6 +233,7 @@ void vim_set_current_line(String line, Error *err) /// /// @param[out] err Details of an error that may have occurred void vim_del_current_line(Error *err) + FUNC_ATTR_DEFERRED { buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err); } @@ -259,6 +255,7 @@ Object vim_get_var(String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return the old value if any Object vim_set_var(String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { return dict_set_value(&globvardict, name, value, err); } @@ -289,6 +286,7 @@ Object vim_get_option(String name, Error *err) /// @param value The new option value /// @param[out] err Details of an error that may have occurred void vim_set_option(String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { set_option_to(NULL, SREQ_GLOBAL, name, value, err); } @@ -297,6 +295,7 @@ void vim_set_option(String name, Object value, Error *err) /// /// @param str The message void vim_out_write(String str) + FUNC_ATTR_DEFERRED { write_msg(str, false); } @@ -305,6 +304,7 @@ void vim_out_write(String str) /// /// @param str The message void vim_err_write(String str) + FUNC_ATTR_DEFERRED { write_msg(str, true); } @@ -314,6 +314,7 @@ void vim_err_write(String str) /// /// @param str The message void vim_report_error(String str) + FUNC_ATTR_DEFERRED { vim_err_write(str); vim_err_write((String) {.data = "\n", .size = 1}); @@ -357,6 +358,7 @@ Buffer vim_get_current_buffer(void) /// @param id The buffer handle /// @param[out] err Details of an error that may have occurred void vim_set_current_buffer(Buffer buffer, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -407,6 +409,7 @@ Window vim_get_current_window(void) /// /// @param handle The window handle void vim_set_current_window(Window window, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -462,6 +465,7 @@ Tabpage vim_get_current_tabpage(void) /// @param handle The tab page handle /// @param[out] err Details of an error that may have occurred void vim_set_current_tabpage(Tabpage tabpage, Error *err) + FUNC_ATTR_DEFERRED { tabpage_T *tp = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c index 751518424b..fde1ebfa4c 100644 --- a/src/nvim/api/window.c +++ b/src/nvim/api/window.c @@ -52,6 +52,7 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err) /// @param pos the (row, col) tuple representing the new position /// @param[out] err Details of an error that may have occurred void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -111,6 +112,7 @@ Integer window_get_height(Window window, Error *err) /// @param height the new height in rows /// @param[out] err Details of an error that may have occurred void window_set_height(Window window, Integer height, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -154,6 +156,7 @@ Integer window_get_width(Window window, Error *err) /// @param width the new width in columns /// @param[out] err Details of an error that may have occurred void window_set_width(Window window, Integer width, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -199,6 +202,7 @@ Object window_get_var(Window window, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object window_set_var(Window window, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -234,6 +238,7 @@ Object window_get_option(Window window, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void window_set_option(Window window, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); -- cgit From 79b7263f793206167260fcbc99bd76f73bfeb2c7 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Tue, 21 Oct 2014 08:53:55 -0300 Subject: compilation: Add -Wconversion to more files and validate CONV_SOURCES All files under the os, api and msgpack_rpc directories have -Wconversion automatically applied. CONV_SOURCES is also checked for missing files(when renaming, for example) --- src/nvim/CMakeLists.txt | 36 ++++++++++++++---------------------- src/nvim/msgpack_rpc/channel.c | 2 +- src/nvim/msgpack_rpc/server.c | 3 ++- src/nvim/os/input.c | 14 ++++++++------ src/nvim/os/shell.c | 28 +++++++++++++++------------- src/nvim/os/time.c | 38 +++++++++++++++++++++----------------- 6 files changed, 61 insertions(+), 60 deletions(-) (limited to 'src') diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index 1cedeebb37..a77e5e27a0 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -38,8 +38,7 @@ endforeach() list(REMOVE_ITEM NEOVIM_SOURCES ${to_remove}) -set(CONV_SRCS - api.c +set(CONV_SOURCES arabic.c cursor.c garray.c @@ -48,31 +47,24 @@ set(CONV_SRCS map.c memory.c misc2.c - map.c profile.c - os/env.c - os/event.c - os/job.c - os/mem.c - os/rstream.c - os/signal.c - os/users.c - os/provider.c - os/uv_helpers.c - os/wstream.c - os/msgpack_rpc.c tempfile.c - api/buffer.c - api/private/helpers.c - api/private/handle.c - api/tabpage.c - api/window.c - api/vim.h - api/vim.c ) +foreach(sfile ${CONV_SOURCES}) + if(NOT EXISTS "${PROJECT_SOURCE_DIR}/src/nvim/${sfile}") + message(FATAL_ERROR "${sfile} doesn't exist(it was added to CONV_SOURCES)") + endif() +endforeach() + +file(GLOB_RECURSE EXTRA_CONV_SOURCES os/*.c api/*.c msgpack_rpc/*.c) +foreach(sfile ${EXTRA_CONV_SOURCES}) + file(RELATIVE_PATH f "${PROJECT_SOURCE_DIR}/src/nvim" "${sfile}") + list(APPEND CONV_SOURCES ${f}) +endforeach() + set_source_files_properties( - ${CONV_SRCS} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion") + ${CONV_SOURCES} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion") if(CMAKE_C_COMPILER_ID MATCHES "Clang") if(DEFINED ENV{SANITIZE}) diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6ddda10c5f..a1ab12f7c3 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -331,7 +331,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - uint32_t count = rstream_pending(rstream); + size_t count = rstream_pending(rstream); DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", count, rstream); diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index 33e01fe562..087ba24111 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -119,7 +119,8 @@ int server_start(const char *endpoint) ip_end = strchr(addr, NUL); } - uint32_t addr_len = ip_end - addr; + // (ip_end - addr) is always > 0, so convert to size_t + size_t addr_len = (size_t)(ip_end - addr); if (addr_len > sizeof(ip) - 1) { // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index d9dae2b44e..cc693b9f1b 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -72,7 +72,7 @@ void input_stop(void) } // Low level input function. -int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) +int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) { InbufPollResult result; @@ -86,7 +86,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) return 0; } } else { - if ((result = inbuf_poll(p_ut)) == kInputNone) { + if ((result = inbuf_poll((int)p_ut)) == kInputNone) { if (trigger_cursorhold() && maxlen >= 3 && !typebuf_changed(tb_change_cnt)) { buf[0] = K_SPECIAL; @@ -116,7 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) } convert_input(); - return rbuffer_read(input_buffer, (char *)buf, maxlen); + // Safe to convert rbuffer_read to int, it will never overflow since + // we use relatively small buffers. + return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } // Check if a character is available for reading @@ -170,7 +172,7 @@ static bool input_poll(int ms) } // This is a replacement for the old `WaitForChar` function in os_unix.c -static InbufPollResult inbuf_poll(int32_t ms) +static InbufPollResult inbuf_poll(int ms) { if (typebuf_was_filled || rbuffer_pending(input_buffer)) { return kInputAvail; @@ -260,9 +262,9 @@ static void convert_input(void) char *inbuf = rbuffer_read_ptr(input_buffer); size_t count = rbuffer_pending(input_buffer), consume_count = 0; - for (int i = count - 1; i >= 0; i--) { + for (int i = (int)count - 1; i >= 0; i--) { if (inbuf[i] == 3) { - consume_count = i + 1; + consume_count = (size_t)(i + 1); break; } } diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 453cc6d605..a127597e52 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -1,4 +1,5 @@ #include +#include #include #include @@ -58,11 +59,11 @@ typedef struct { /// `shell_free_argv` when no longer needed. char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt) { - int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); + size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *))); // Split 'shell' - int i = tokenize(p_sh, rv); + size_t i = tokenize(p_sh, rv); if (extra_shell_opt != NULL) { // Push a copy of `extra_shell_opt` @@ -356,9 +357,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof) /// @param argv The vector that will be filled with copies of the parsed /// words. It can be NULL if the caller only needs to count words. /// @return The number of words parsed. -static int tokenize(const char_u *str, char **argv) +static size_t tokenize(const char_u *str, char **argv) { - int argc = 0, len; + size_t argc = 0, len; char_u *p = (char_u *) str; while (*p != NUL) { @@ -383,11 +384,11 @@ static int tokenize(const char_u *str, char **argv) /// /// @param str A pointer to the first character of the word /// @return The offset from `str` at which the word ends. -static int word_length(const char_u *str) +static size_t word_length(const char_u *str) { const char_u *p = str; bool inquote = false; - int length = 0; + size_t length = 0; // Move `p` to the end of shell word by advancing the pointer while it's // inside a quote or it's a non-whitespace character @@ -418,15 +419,15 @@ static void write_selection(uv_write_t *req) // TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and // only after filled we should start allocating memory(skip unnecessary // allocations for small writes) - int buflen = BUFFER_LENGTH; + size_t buflen = BUFFER_LENGTH; pdata->wbuffer = (char *)xmalloc(buflen); uv_buf_t uvbuf; linenr_T lnum = curbuf->b_op_start.lnum; - int off = 0; - int written = 0; + size_t off = 0; + size_t written = 0; char_u *lp = ml_get(lnum); - int l; - int len; + size_t l; + size_t len; for (;;) { l = strlen((char *)lp + written); @@ -443,7 +444,7 @@ static void write_selection(uv_write_t *req) pdata->wbuffer[off++] = NUL; } else { char_u *s = vim_strchr(lp + written, NL); - len = s == NULL ? l : s - (lp + written); + len = s == NULL ? l : (size_t)(s - (lp + written)); while (off + len >= buflen) { // Resize the buffer buflen *= 2; @@ -584,6 +585,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { ProcessData *data = (ProcessData *)proc->data; data->exited++; - data->exit_status = status; + assert(status <= INT_MAX); + data->exit_status = (int)status; uv_close((uv_handle_t *)proc, NULL); } diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index e3b76ac833..a4871ef499 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput) } } -static void microdelay(uint64_t microseconds) -{ - uint64_t hrtime; - int64_t ns = microseconds * 1000; // convert to nanoseconds - - uv_mutex_lock(&delay_mutex); - - while (ns > 0) { - hrtime = uv_hrtime(); - if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT) - break; - ns -= uv_hrtime() - hrtime; - } - - uv_mutex_unlock(&delay_mutex); -} - /// Portable version of POSIX localtime_r() /// /// @return NULL in case of error @@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL time_t rawtime = time(NULL); return os_localtime_r(&rawtime, result); } + +static void microdelay(uint64_t microseconds) +{ + uint64_t elapsed = 0; + uint64_t ns = microseconds * 1000; // convert to nanoseconds + uint64_t base = uv_hrtime(); + + uv_mutex_lock(&delay_mutex); + + while (elapsed < ns) { + if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed) + == UV_ETIMEDOUT) + break; + uint64_t now = uv_hrtime(); + elapsed += now - base; + base = now; + } + + uv_mutex_unlock(&delay_mutex); +} -- cgit From b6c9883169202c2b8ae4ca82a6e70ff3e8cd396f Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Tue, 21 Oct 2014 12:27:25 -0300 Subject: event: Remove direct calls to `uv_run` from job.c/shell.c --- src/nvim/os/job.c | 8 ++++---- src/nvim/os/shell.c | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index c18a83e817..4c01829159 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -97,7 +97,7 @@ void job_teardown(void) // their status with `wait` or handling SIGCHLD. libuv does that // automatically (and then calls `exit_cb`) but we have to give it a chance // by running the loop one more time - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { @@ -107,8 +107,8 @@ void job_teardown(void) while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - // It's possible that the uv_run call removed the job from the table, + event_poll(0); + // It's possible that the event_poll call removed the job from the table, // reset 'job' so the next iteration won't run in that case. job = table[i]; } @@ -118,7 +118,7 @@ void job_teardown(void) } } // Last run to ensure all children were removed - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); } /// Tries to start a new job. diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index a127597e52..d5464f7975 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -8,6 +8,7 @@ #include "nvim/ascii.h" #include "nvim/lib/kvec.h" #include "nvim/log.h" +#include "nvim/os/event.h" #include "nvim/os/job.h" #include "nvim/os/rstream.h" #include "nvim/os/shell.h" @@ -213,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg) // Keep running the loop until all three handles are completely closed while (pdata.exited < expected_exits) { - uv_run(uv_default_loop(), UV_RUN_ONCE); + event_poll(0); if (got_int) { // Forward SIGINT to the shell -- cgit From b49460f930edc52be80522fa5da3d5b6a1260629 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Tue, 21 Oct 2014 21:20:31 -0300 Subject: input: Don't remove Ctrl+C from the input_buffer --- src/nvim/os/input.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index cc693b9f1b..b7eba47d5e 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -264,7 +264,7 @@ static void convert_input(void) for (int i = (int)count - 1; i >= 0; i--) { if (inbuf[i] == 3) { - consume_count = (size_t)(i + 1); + consume_count = (size_t)i; break; } } -- cgit