From b280308ac649da61e2a0f40a222eae21af5352c9 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 07:35:10 -0300 Subject: msgpack-rpc: Create subdirectory for msgpack-rpc modules Create the msgpack_rpc subdirectory and move all modules that deal with msgpack-rpc to it. Also merge msgpack_rpc.c into msgpack_rpc/helpers.c --- src/nvim/CMakeLists.txt | 8 +- src/nvim/api/vim.c | 2 +- src/nvim/eval.c | 2 +- src/nvim/main.c | 2 +- src/nvim/map.c | 2 +- src/nvim/map.h | 2 +- src/nvim/msgpack_rpc/channel.c | 597 +++++++++++++++++++++++++++++++++++++ src/nvim/msgpack_rpc/channel.h | 15 + src/nvim/msgpack_rpc/defs.h | 34 +++ src/nvim/msgpack_rpc/helpers.c | 463 +++++++++++++++++++++++++++++ src/nvim/msgpack_rpc/helpers.h | 17 ++ src/nvim/msgpack_rpc/server.c | 273 +++++++++++++++++ src/nvim/msgpack_rpc/server.h | 7 + src/nvim/os/channel.c | 598 -------------------------------------- src/nvim/os/channel.h | 15 - src/nvim/os/event.c | 12 +- src/nvim/os/msgpack_rpc.c | 188 ------------ src/nvim/os/msgpack_rpc.h | 51 ---- src/nvim/os/msgpack_rpc_helpers.c | 289 ------------------ src/nvim/os/msgpack_rpc_helpers.h | 16 - src/nvim/os/provider.c | 2 +- src/nvim/os/server.c | 273 ----------------- src/nvim/os/server.h | 7 - src/nvim/os/server_defs.h | 7 - src/nvim/os_unix.c | 6 +- 25 files changed, 1427 insertions(+), 1461 deletions(-) create mode 100644 src/nvim/msgpack_rpc/channel.c create mode 100644 src/nvim/msgpack_rpc/channel.h create mode 100644 src/nvim/msgpack_rpc/defs.h create mode 100644 src/nvim/msgpack_rpc/helpers.c create mode 100644 src/nvim/msgpack_rpc/helpers.h create mode 100644 src/nvim/msgpack_rpc/server.c create mode 100644 src/nvim/msgpack_rpc/server.h delete mode 100644 src/nvim/os/channel.c delete mode 100644 src/nvim/os/channel.h delete mode 100644 src/nvim/os/msgpack_rpc.c delete mode 100644 src/nvim/os/msgpack_rpc.h delete mode 100644 src/nvim/os/msgpack_rpc_helpers.c delete mode 100644 src/nvim/os/msgpack_rpc_helpers.h delete mode 100644 src/nvim/os/server.c delete mode 100644 src/nvim/os/server.h delete mode 100644 src/nvim/os/server_defs.h (limited to 'src') diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index 208df31596..1cedeebb37 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -3,7 +3,7 @@ include(CheckLibraryExists) set(GENERATED_DIR ${PROJECT_BINARY_DIR}/src/nvim/auto) set(DISPATCH_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/msgpack-gen.lua) file(GLOB API_HEADERS api/*.h) -set(MSGPACK_RPC_HEADER ${PROJECT_SOURCE_DIR}/src/nvim/os/msgpack_rpc.h) +file(GLOB MSGPACK_RPC_HEADERS msgpack_rpc/*.h) set(MSGPACK_DISPATCH ${GENERATED_DIR}/msgpack_dispatch.c) set(HEADER_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/gendeclarations.lua) set(GENERATED_INCLUDES_DIR ${PROJECT_BINARY_DIR}/include) @@ -19,12 +19,14 @@ file(MAKE_DIRECTORY ${GENERATED_DIR}) file(MAKE_DIRECTORY ${GENERATED_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc) -file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c) +file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c) file(GLOB_RECURSE NEOVIM_HEADERS *.h) foreach(sfile ${NEOVIM_SOURCES}) @@ -126,7 +128,7 @@ add_custom_command(OUTPUT ${MSGPACK_DISPATCH} COMMAND ${LUA_PRG} ${DISPATCH_GENERATOR} ${API_HEADERS} ${MSGPACK_DISPATCH} DEPENDS ${API_HEADERS} - ${MSGPACK_RPC_HEADER} + ${MSGPACK_RPC_HEADERS} ${DISPATCH_GENERATOR} ) diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index 5e0f3e0c32..c7b5b1cfbf 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -10,7 +10,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/buffer.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/provider.h" #include "nvim/vim.h" #include "nvim/buffer.h" diff --git a/src/nvim/eval.c b/src/nvim/eval.c index e3bd37a03f..c8f0799d5a 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -81,7 +81,7 @@ #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/time.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/os/dl.h" diff --git a/src/nvim/main.c b/src/nvim/main.c index 128d1a784c..a63ffb4a31 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -59,7 +59,7 @@ #include "nvim/os/input.h" #include "nvim/os/os.h" #include "nvim/os/signal.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/private/helpers.h" diff --git a/src/nvim/map.c b/src/nvim/map.c index 24aa38d67d..24a869e2e6 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -6,7 +6,7 @@ #include "nvim/map_defs.h" #include "nvim/vim.h" #include "nvim/memory.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #include "nvim/lib/khash.h" diff --git a/src/nvim/map.h b/src/nvim/map.h index 616516c3e1..78f4218a72 100644 --- a/src/nvim/map.h +++ b/src/nvim/map.h @@ -5,7 +5,7 @@ #include "nvim/map_defs.h" #include "nvim/api/private/defs.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #define MAP_DECLS(T, U) \ KHASH_DECLARE(T##_##U##_map, T, U) \ diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c new file mode 100644 index 0000000000..dcd7e41737 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel.c @@ -0,0 +1,597 @@ +#include +#include +#include + +#include +#include + +#include "nvim/api/private/helpers.h" +#include "nvim/api/vim.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/os/event.h" +#include "nvim/os/rstream.h" +#include "nvim/os/rstream_defs.h" +#include "nvim/os/wstream.h" +#include "nvim/os/wstream_defs.h" +#include "nvim/os/job.h" +#include "nvim/os/job_defs.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/vim.h" +#include "nvim/ascii.h" +#include "nvim/memory.h" +#include "nvim/os_unix.h" +#include "nvim/message.h" +#include "nvim/term.h" +#include "nvim/map.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/lib/kvec.h" + +#define CHANNEL_BUFFER_SIZE 0xffff + +typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + +typedef struct { + uint64_t id; + PMap(cstr_t) *subscribed_events; + bool is_job, enabled; + msgpack_unpacker *unpacker; + union { + Job *job; + struct { + RStream *read; + WStream *write; + uv_stream_t *uv; + } streams; + } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; +} Channel; + +static uint64_t next_id = 1; +static PMap(uint64_t) *channels = NULL; +static PMap(cstr_t) *event_strings = NULL; +static msgpack_sbuffer out_buffer; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/channel.c.generated.h" +#endif + +/// Initializes the module +void channel_init(void) +{ + channels = pmap_new(uint64_t)(); + event_strings = pmap_new(cstr_t)(); + msgpack_sbuffer_init(&out_buffer); + + if (embedded_mode) { + channel_from_stdio(); + } +} + +/// Teardown the module +void channel_teardown(void) +{ + if (!channels) { + return; + } + + Channel *channel; + + map_foreach_value(channels, channel, { + close_channel(channel); + }); +} + +/// Creates an API channel by starting a job and connecting to its +/// stdin/stdout. stderr is forwarded to the editor error stream. +/// +/// @param argv The argument vector for the process +/// @return The channel id +uint64_t channel_from_job(char **argv) +{ + Channel *channel = register_channel(); + channel->is_job = true; + + int status; + channel->data.job = job_start(argv, + channel, + job_out, + job_err, + NULL, + 0, + &status); + + if (status <= 0) { + close_channel(channel); + return 0; + } + + return channel->id; +} + +/// Creates an API channel from a libuv stream representing a tcp or +/// pipe/socket client connection +/// +/// @param stream The established connection +void channel_from_stream(uv_stream_t *stream) +{ + Channel *channel = register_channel(); + stream->data = NULL; + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_stream(channel->data.streams.read, stream); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_stream(channel->data.streams.write, stream); + channel->data.streams.uv = stream; +} + +bool channel_exists(uint64_t id) +{ + Channel *channel; + return (channel = pmap_get(uint64_t)(channels, id)) != NULL + && channel->enabled; +} + +/// Sends event/arguments to channel +/// +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type +/// @param name The event name, an arbitrary string +/// @param args Array with event arguments +/// @return True if the event was sent successfully, false otherwise. +bool channel_send_event(uint64_t id, char *name, Array args) +{ + Channel *channel = NULL; + + if (id > 0) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_free_array(args); + return false; + } + send_event(channel, name, args); + } else { + broadcast_event(name, args); + } + + return true; +} + +/// Sends a method call to a channel +/// +/// @param id The channel id +/// @param method_name The method name, an arbitrary string +/// @param args Array with method arguments +/// @param[out] error True if the return value is an error +/// @return Whatever the remote method returned +Object channel_send_call(uint64_t id, + char *method_name, + Array args, + Error *err) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + api_free_array(args); + return NIL; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + api_set_error(err, + Exception, + _("Channel %" PRIu64 " crossed maximum stack depth"), + channel->id); + api_free_array(args); + return NIL; + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + send_request(channel, request_id, method_name, args); + + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1, sources); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (frame.errored) { + api_set_error(err, Exception, "%s", frame.result.data.string.data); + return NIL; + } + + return frame.result; +} + +/// Subscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + char *event_string = pmap_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + pmap_put(cstr_t)(event_strings, event_string, event_string); + } + + pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +/// Unsubscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + unsubscribe(channel, event); +} + +/// Closes a channel +/// +/// @param id The channel id +/// @return true if successful, false otherwise +bool channel_close(uint64_t id) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + return false; + } + + channel_kill(channel); + channel->enabled = false; + return true; +} + +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim +static void channel_from_stdio(void) +{ + Channel *channel = register_channel(); + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_file(channel->data.streams.read, 0); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_file(channel->data.streams.write, 1); + channel->data.streams.uv = NULL; +} + +static void job_out(RStream *rstream, void *data, bool eof) +{ + Job *job = data; + parse_msgpack(rstream, job_data(job), eof); +} + +static void job_err(RStream *rstream, void *data, bool eof) +{ + size_t count; + char buf[256]; + Channel *channel = job_data(data); + + while ((count = rstream_pending(rstream))) { + size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); + buf[read] = NUL; + ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); + } +} + +static void parse_msgpack(RStream *rstream, void *data, bool eof) +{ + Channel *channel = data; + channel->rpc_call_level++; + + if (eof) { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + call_set_error(channel, buf); + goto end; + } + + uint32_t count = rstream_pending(rstream); + DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + count, + rstream); + + // Feed the unpacker with data + msgpack_unpacker_reserve_buffer(channel->unpacker, count); + rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); + msgpack_unpacker_buffer_consumed(channel->unpacker, count); + + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_return result; + + // Deserialize everything we can. + while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_set_error(channel, buf); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } + + // Perform the call + WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); + // write the response + if (!channel_write(channel, resp)) { + goto end; + } + } + + if (result == MSGPACK_UNPACK_NOMEM_ERROR) { + OUT_STR(e_outofmem); + out_char('\n'); + preserve_exit(); + } + + if (result == MSGPACK_UNPACK_PARSE_ERROR) { + // See src/msgpack/unpack_template.h in msgpack source tree for + // causes for this error(search for 'goto _failed') + // + // A not so uncommon cause for this might be deserializing objects with + // a high nesting level: msgpack will break when it's internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + send_error(channel, 0, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); + } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); + } +} + +static bool channel_write(Channel *channel, WBuffer *buffer) +{ + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + call_set_error(channel, buf); + } + + return success; +} + +static void send_error(Channel *channel, uint64_t id, char *err) +{ + Error e = ERROR_INIT; + api_set_error(&e, Exception, "%s", err); + channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); +} + +static void send_request(Channel *channel, + uint64_t id, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); +} + +static void send_event(Channel *channel, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); +} + +static void broadcast_event(char *name, Array args) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + api_free_array(args); + goto end; + } + + String method = {.size = strlen(name), .data = name}; + WBuffer *buffer = serialize_request(0, + method, + args, + &out_buffer, + kv_size(subscribed)); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + channel_write(kv_A(subscribed, i), buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = pmap_get(cstr_t)(event_strings, event); + pmap_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + pmap_del(cstr_t)(event_strings, event_string); + free(event_string); +} + +static void close_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + channel_kill(channel); + + free(channel); +} + +static void channel_kill(Channel *channel) +{ + if (channel->is_job) { + if (channel->data.job) { + job_stop(channel->data.job); + } + } else { + rstream_free(channel->data.streams.read); + wstream_free(channel->data.streams.write); + if (channel->data.streams.uv) { + uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + } else { + // When the stdin channel closes, it's time to go + mch_exit(0); + } + } +} + +static void close_cb(uv_handle_t *handle) +{ + free(handle->data); + free(handle); +} + +static Channel *register_channel(void) +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->enabled = true; + rv->rpc_call_level = 0; + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->id = next_id++; + rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); + pmap_put(uint64_t)(channels, rv->id, rv); + return rv; +} + +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_set_error(Channel *channel, char *msg) +{ + for (size_t i = 0; i < kv_size(channel->call_stack); i++) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(cstr_to_string(msg)); + } + + channel->enabled = false; +} diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h new file mode 100644 index 0000000000..df742fe368 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel.h @@ -0,0 +1,15 @@ +#ifndef NVIM_MSGPACK_RPC_CHANNEL_H +#define NVIM_MSGPACK_RPC_CHANNEL_H + +#include +#include + +#include "nvim/api/private/defs.h" +#include "nvim/vim.h" + +#define METHOD_MAXLEN 512 + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/channel.h.generated.h" +#endif +#endif // NVIM_MSGPACK_RPC_CHANNEL_H diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h new file mode 100644 index 0000000000..5eec4ced54 --- /dev/null +++ b/src/nvim/msgpack_rpc/defs.h @@ -0,0 +1,34 @@ +#ifndef NVIM_MSGPACK_RPC_DEFS_H +#define NVIM_MSGPACK_RPC_DEFS_H + +#include + + +/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores +/// functions of this type. +typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, + msgpack_object *req, + Error *error); + +/// Initializes the msgpack-rpc method table +void msgpack_rpc_init_method_table(void); + +void msgpack_rpc_init_function_metadata(Dictionary *metadata); + +/// Dispatches to the actual API function after basic payload validation by +/// `msgpack_rpc_call`. It is responsible for validating/converting arguments +/// to C types, and converting the return value back to msgpack types. +/// The implementation is generated at compile time with metadata extracted +/// from the api/*.h headers, +/// +/// @param channel_id The channel id +/// @param method_id The method id +/// @param req The parsed request object +/// @param error Pointer to error structure +/// @return Some object +Object msgpack_rpc_dispatch(uint64_t channel_id, + msgpack_object *req, + Error *error) + FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); + +#endif // NVIM_MSGPACK_RPC_DEFS_H diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c new file mode 100644 index 0000000000..4b96e4985e --- /dev/null +++ b/src/nvim/msgpack_rpc/helpers.c @@ -0,0 +1,463 @@ +#include +#include +#include + +#include + +#include "nvim/api/private/helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/vim.h" +#include "nvim/log.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/helpers.c.generated.h" +#endif + +static msgpack_zone zone; +static msgpack_sbuffer sbuffer; + +#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ + FUNC_ATTR_NONNULL_ALL \ + { \ + if (obj->type != MSGPACK_OBJECT_EXT \ + || obj->via.ext.type != kObjectType##t) { \ + return false; \ + } \ + \ + msgpack_object data; \ + msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \ + obj->via.ext.size, \ + NULL, \ + &zone, \ + &data); \ + \ + if (ret != MSGPACK_UNPACK_SUCCESS) { \ + return false; \ + } \ + \ + *arg = data.via.u64; \ + return true; \ + } \ + \ + void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \ + FUNC_ATTR_NONNULL_ARG(2) \ + { \ + msgpack_packer pac; \ + msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ + msgpack_pack_uint64(&pac, o); \ + msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ + msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ + msgpack_sbuffer_clear(&sbuffer); \ + } + +void msgpack_rpc_helpers_init(void) +{ + msgpack_zone_init(&zone, 0xfff); + msgpack_sbuffer_init(&sbuffer); +} + +HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer) +HANDLE_TYPE_CONVERSION_IMPL(Window, window) +HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage) + +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) + FUNC_ATTR_NONNULL_ALL +{ + *arg = obj->via.boolean; + return obj->type == MSGPACK_OBJECT_BOOLEAN; +} + +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.u64 <= INT64_MAX) { + *arg = (int64_t)obj->via.u64; + return true; + } + + *arg = obj->via.i64; + return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; +} + +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) + FUNC_ATTR_NONNULL_ALL +{ + *arg = obj->via.dec; + return obj->type == MSGPACK_OBJECT_DOUBLE; +} + +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) { + arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size); + arg->size = obj->via.bin.size; + } else { + return false; + } + + return true; +} + +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) + FUNC_ATTR_NONNULL_ALL +{ + switch (obj->type) { + case MSGPACK_OBJECT_NIL: + arg->type = kObjectTypeNil; + return true; + + case MSGPACK_OBJECT_BOOLEAN: + arg->type = kObjectTypeBoolean; + return msgpack_rpc_to_boolean(obj, &arg->data.boolean); + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + arg->type = kObjectTypeInteger; + return msgpack_rpc_to_integer(obj, &arg->data.integer); + + case MSGPACK_OBJECT_DOUBLE: + arg->type = kObjectTypeFloat; + return msgpack_rpc_to_float(obj, &arg->data.floating); + + case MSGPACK_OBJECT_BIN: + case MSGPACK_OBJECT_STR: + arg->type = kObjectTypeString; + return msgpack_rpc_to_string(obj, &arg->data.string); + + case MSGPACK_OBJECT_ARRAY: + arg->type = kObjectTypeArray; + return msgpack_rpc_to_array(obj, &arg->data.array); + + case MSGPACK_OBJECT_MAP: + arg->type = kObjectTypeDictionary; + return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); + + case MSGPACK_OBJECT_EXT: + switch (obj->via.ext.type) { + case kObjectTypeBuffer: + return msgpack_rpc_to_buffer(obj, &arg->data.buffer); + case kObjectTypeWindow: + return msgpack_rpc_to_window(obj, &arg->data.window); + case kObjectTypeTabpage: + return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); + } + default: + return false; + } +} + +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.array.size, sizeof(Object)); + + for (uint32_t i = 0; i < obj->via.array.size; i++) { + if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { + return false; + } + } + + return true; +} + +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) + FUNC_ATTR_NONNULL_ALL +{ + if (obj->type != MSGPACK_OBJECT_MAP) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); + + + for (uint32_t i = 0; i < obj->via.map.size; i++) { + if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, + &arg->items[i].key)) { + return false; + } + + if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, + &arg->items[i].value)) { + return false; + } + } + + return true; +} + +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + if (result) { + msgpack_pack_true(res); + } else { + msgpack_pack_false(res); + } +} + +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_int64(res, result); +} + +void msgpack_rpc_from_float(Float result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_double(res, result); +} + +void msgpack_rpc_from_string(String result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_bin(res, result.size); + msgpack_pack_bin_body(res, result.data, result.size); +} + +void msgpack_rpc_from_object(Object result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + switch (result.type) { + case kObjectTypeNil: + msgpack_pack_nil(res); + break; + + case kObjectTypeBoolean: + msgpack_rpc_from_boolean(result.data.boolean, res); + break; + + case kObjectTypeInteger: + msgpack_rpc_from_integer(result.data.integer, res); + break; + + case kObjectTypeFloat: + msgpack_rpc_from_float(result.data.floating, res); + break; + + case kObjectTypeString: + msgpack_rpc_from_string(result.data.string, res); + break; + + case kObjectTypeArray: + msgpack_rpc_from_array(result.data.array, res); + break; + + case kObjectTypeBuffer: + msgpack_rpc_from_buffer(result.data.buffer, res); + break; + + case kObjectTypeWindow: + msgpack_rpc_from_window(result.data.window, res); + break; + + case kObjectTypeTabpage: + msgpack_rpc_from_tabpage(result.data.tabpage, res); + break; + + case kObjectTypeDictionary: + msgpack_rpc_from_dictionary(result.data.dictionary, res); + break; + } +} + +void msgpack_rpc_from_array(Array result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_array(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_object(result.items[i], res); + } +} + +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) +{ + msgpack_pack_map(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_string(result.items[i].key, res); + msgpack_rpc_from_object(result.items[i].value, res); + } +} + +/// Validates the basic structure of the msgpack-rpc call and fills `res` +/// with the basic response structure. +/// +/// @param channel_id The channel id +/// @param req The parsed request object +/// @param res A packer that contains the response +WBuffer *msgpack_rpc_call(uint64_t channel_id, + msgpack_object *req, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2) + FUNC_ATTR_NONNULL_ARG(3) +{ + uint64_t response_id; + Error error = ERROR_INIT; + msgpack_rpc_validate(&response_id, req, &error); + + if (error.set) { + return serialize_response(response_id, &error, NIL, sbuffer); + } + + // dispatch the call + Object rv = msgpack_rpc_dispatch(channel_id, req, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); + + if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + response_id); + return serialize_response(response_id, &error, NIL, sbuffer); + } + + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + response_id); + return serialize_response(response_id, &error, rv, sbuffer); +} + +/// Finishes the msgpack-rpc call with an error message. +/// +/// @param msg The error message +/// @param res A packer that contains the response +void msgpack_rpc_error(char *msg, msgpack_packer *res) + FUNC_ATTR_NONNULL_ALL +{ + size_t len = strlen(msg); + + // error message + msgpack_pack_bin(res, len); + msgpack_pack_bin_body(res, msg, len); + // Nil result + msgpack_pack_nil(res); +} + +/// Handler executed when an invalid method name is passed +Object msgpack_rpc_handle_missing_method(uint64_t channel_id, + msgpack_object *req, + Error *error) +{ + snprintf(error->msg, sizeof(error->msg), "Invalid method name"); + error->set = true; + return NIL; +} + +/// Serializes a msgpack-rpc request or notification(id == 0) +WBuffer *serialize_request(uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) + FUNC_ATTR_NONNULL_ARG(4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, request_id ? 4 : 3); + msgpack_pack_int(&pac, request_id ? 0 : 2); + + if (request_id) { + msgpack_pack_uint64(&pac, request_id); + } + + msgpack_pack_bin(&pac, method.size); + msgpack_pack_bin_body(&pac, method.data, method.size); + msgpack_rpc_from_array(args, &pac); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + api_free_array(args); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +/// Serializes a msgpack-rpc response +WBuffer *serialize_response(uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2, 4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, response_id); + + if (err->set) { + // error represented by a [type, message] array + msgpack_pack_array(&pac, 2); + msgpack_rpc_from_integer(err->type, &pac); + msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); + // Nil result + msgpack_pack_nil(&pac); + } else { + // Nil error + msgpack_pack_nil(&pac); + // Return value + msgpack_rpc_from_object(arg, &pac); + } + + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + api_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +void msgpack_rpc_validate(uint64_t *response_id, + msgpack_object *req, + Error *err) +{ + // response id not known yet + + *response_id = 0; + // Validate the basic structure of the msgpack-rpc payload + if (req->type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Request is not an array")); + } + + if (req->via.array.size != 4) { + api_set_error(err, Validation, _("Request array size should be 4")); + } + + if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Id must be a positive integer")); + } + + // Set the response id, which is the same as the request + *response_id = req->via.array.ptr[1].via.u64; + + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Message type must be an integer")); + } + + if (req->via.array.ptr[0].via.u64 != 0) { + api_set_error(err, Validation, _("Message type must be 0")); + } + + if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN + && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { + api_set_error(err, Validation, _("Method must be a string")); + } + + if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Paremeters must be an array")); + } +} diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h new file mode 100644 index 0000000000..bf161d54e0 --- /dev/null +++ b/src/nvim/msgpack_rpc/helpers.h @@ -0,0 +1,17 @@ +#ifndef NVIM_MSGPACK_RPC_HELPERS_H +#define NVIM_MSGPACK_RPC_HELPERS_H + +#include +#include + +#include + +#include "nvim/os/wstream.h" +#include "nvim/api/private/defs.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/helpers.h.generated.h" +#endif + +#endif // NVIM_MSGPACK_RPC_HELPERS_H + diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c new file mode 100644 index 0000000000..33e01fe562 --- /dev/null +++ b/src/nvim/msgpack_rpc/server.c @@ -0,0 +1,273 @@ +#include +#include +#include +#include + +#include + +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/os/os.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/message.h" +#include "nvim/tempfile.h" +#include "nvim/map.h" +#include "nvim/path.h" + +#define MAX_CONNECTIONS 32 +#define ADDRESS_MAX_SIZE 256 +#define NEOVIM_DEFAULT_TCP_PORT 7450 +#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" + +typedef enum { + kServerTypeTcp, + kServerTypePipe +} ServerType; + +typedef struct { + // Type of the union below + ServerType type; + + // This is either a tcp server or unix socket(named pipe on windows) + union { + struct { + uv_tcp_t handle; + struct sockaddr_in addr; + } tcp; + struct { + uv_pipe_t handle; + char addr[ADDRESS_MAX_SIZE]; + } pipe; + } socket; +} Server; + +static PMap(cstr_t) *servers = NULL; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/server.c.generated.h" +#endif + +/// Initializes the module +bool server_init(void) +{ + servers = pmap_new(cstr_t)(); + + if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { + char *listen_address = (char *)vim_tempname(); + os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); + free(listen_address); + } + + return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; +} + +/// Teardown the server module +void server_teardown(void) +{ + if (!servers) { + return; + } + + Server *server; + + map_foreach_value(servers, server, { + if (server->type == kServerTypeTcp) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } else { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + }); +} + +/// Starts listening on arbitrary tcp/unix addresses specified by +/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will +/// be determined by parsing `endpoint`: If it's a valid tcp address in the +/// 'ip[:port]' format, then it will be tcp socket. The port is optional +/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will +/// be a unix socket or named pipe. +/// +/// @param endpoint Address of the server. Either a 'ip[:port]' string or an +/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or +/// named pipe. +/// @returns zero if successful, one on a regular error, and negative errno +/// on failure to bind or connect. +int server_start(const char *endpoint) + FUNC_ATTR_NONNULL_ALL +{ + char addr[ADDRESS_MAX_SIZE]; + + // Trim to `ADDRESS_MAX_SIZE` + if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { + // TODO(aktau): since this is not what the user wanted, perhaps we + // should return an error here + EMSG2("Address was too long, truncated to %s", addr); + } + + // Check if the server already exists + if (pmap_has(cstr_t)(servers, addr)) { + EMSG2("Already listening on %s", addr); + return 1; + } + + ServerType server_type = kServerTypeTcp; + Server *server = xmalloc(sizeof(Server)); + char ip[16], *ip_end = strrchr(addr, ':'); + + if (!ip_end) { + ip_end = strchr(addr, NUL); + } + + uint32_t addr_len = ip_end - addr; + + if (addr_len > sizeof(ip) - 1) { + // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) + addr_len = sizeof(ip) - 1; + } + + // Extract the address part + xstrlcpy(ip, addr, addr_len + 1); + + int port = NEOVIM_DEFAULT_TCP_PORT; + + if (*ip_end == ':') { + // Extract the port + long lport = strtol(ip_end + 1, NULL, 10); // NOLINT + if (lport <= 0 || lport > 0xffff) { + // Invalid port, treat as named pipe or unix socket + server_type = kServerTypePipe; + } else { + port = (int) lport; + } + } + + if (server_type == kServerTypeTcp) { + // Try to parse ip address + if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { + // Invalid address, treat as named pipe or unix socket + server_type = kServerTypePipe; + } + } + + int result; + + if (server_type == kServerTypeTcp) { + // Listen on tcp address/port + uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); + server->socket.tcp.handle.data = server; + result = uv_tcp_bind(&server->socket.tcp.handle, + (const struct sockaddr *)&server->socket.tcp.addr, + 0); + if (result == 0) { + result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, + MAX_CONNECTIONS, + connection_cb); + if (result) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } + } + } else { + // Listen on named pipe or unix socket + xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); + uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); + server->socket.pipe.handle.data = server; + result = uv_pipe_bind(&server->socket.pipe.handle, + server->socket.pipe.addr); + if (result == 0) { + result = uv_listen((uv_stream_t *)&server->socket.pipe.handle, + MAX_CONNECTIONS, + connection_cb); + + if (result) { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + } + } + + assert(result <= 0); // libuv should have returned -errno or zero. + if (result < 0) { + if (result == -EACCES) { + // Libuv converts ENOENT to EACCES for Windows compatibility, but if + // the parent directory does not exist, ENOENT would be more accurate. + *path_tail((char_u *) addr) = NUL; + if (!os_file_exists((char_u *) addr)) { + result = -ENOENT; + } + } + EMSG2("Failed to start server: %s", uv_strerror(result)); + free(server); + return result; + } + + server->type = server_type; + + // Add the server to the hash table + pmap_put(cstr_t)(servers, addr, server); + + return 0; +} + +/// Stops listening on the address specified by `endpoint`. +/// +/// @param endpoint Address of the server. +void server_stop(char *endpoint) +{ + Server *server; + char addr[ADDRESS_MAX_SIZE]; + + // Trim to `ADDRESS_MAX_SIZE` + xstrlcpy(addr, endpoint, sizeof(addr)); + + if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { + EMSG2("Not listening on %s", addr); + return; + } + + if (server->type == kServerTypeTcp) { + uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); + } else { + uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + } + + pmap_del(cstr_t)(servers, addr); +} + +static void connection_cb(uv_stream_t *server, int status) +{ + int result; + uv_stream_t *client; + Server *srv = server->data; + + if (status < 0) { + abort(); + } + + if (srv->type == kServerTypeTcp) { + client = xmalloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); + } else { + client = xmalloc(sizeof(uv_pipe_t)); + uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); + } + + result = uv_accept(server, client); + + if (result) { + EMSG2("Failed to accept connection: %s", uv_strerror(result)); + uv_close((uv_handle_t *)client, free_client); + return; + } + + channel_from_stream(client); +} + +static void free_client(uv_handle_t *handle) +{ + free(handle); +} + +static void free_server(uv_handle_t *handle) +{ + free(handle->data); +} diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h new file mode 100644 index 0000000000..f1a6703938 --- /dev/null +++ b/src/nvim/msgpack_rpc/server.h @@ -0,0 +1,7 @@ +#ifndef NVIM_MSGPACK_RPC_SERVER_H +#define NVIM_MSGPACK_RPC_SERVER_H + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/server.h.generated.h" +#endif +#endif // NVIM_MSGPACK_RPC_SERVER_H diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c deleted file mode 100644 index 959fbc6e73..0000000000 --- a/src/nvim/os/channel.c +++ /dev/null @@ -1,598 +0,0 @@ -#include -#include -#include - -#include -#include - -#include "nvim/api/private/helpers.h" -#include "nvim/api/vim.h" -#include "nvim/os/channel.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/ascii.h" -#include "nvim/memory.h" -#include "nvim/os_unix.h" -#include "nvim/message.h" -#include "nvim/term.h" -#include "nvim/map.h" -#include "nvim/log.h" -#include "nvim/misc1.h" -#include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff - -typedef struct { - uint64_t request_id; - bool errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - PMap(cstr_t) *subscribed_events; - bool is_job, enabled; - msgpack_unpacker *unpacker; - union { - Job *job; - struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; -} Channel; - -static uint64_t next_id = 1; -static PMap(uint64_t) *channels = NULL; -static PMap(cstr_t) *event_strings = NULL; -static msgpack_sbuffer out_buffer; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.c.generated.h" -#endif - -/// Initializes the module -void channel_init(void) -{ - channels = pmap_new(uint64_t)(); - event_strings = pmap_new(cstr_t)(); - msgpack_sbuffer_init(&out_buffer); - - if (embedded_mode) { - channel_from_stdio(); - } -} - -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a job and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. -/// -/// @param argv The argument vector for the process -/// @return The channel id -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(); - channel->is_job = true; - - int status; - channel->data.job = job_start(argv, - channel, - job_out, - job_err, - NULL, - 0, - &status); - - if (status <= 0) { - close_channel(channel); - return 0; - } - - return channel->id; -} - -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection -/// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) -{ - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; -} - -bool channel_exists(uint64_t id) -{ - Channel *channel; - return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; -} - -/// Sends event/arguments to channel -/// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments -/// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) -{ - Channel *channel = NULL; - - if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_free_array(args); - return false; - } - send_event(channel, name, args); - } else { - broadcast_event(name, args); - } - - return true; -} - -/// Sends a method call to a channel -/// -/// @param id The channel id -/// @param method_name The method name, an arbitrary string -/// @param args Array with method arguments -/// @param[out] error True if the return value is an error -/// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) -{ - Channel *channel = NULL; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); - api_free_array(args); - return NIL; - } - - if (kv_size(channel->call_stack) > 20) { - // 20 stack depth is more than anyone should ever need for RPC calls - api_set_error(err, - Exception, - _("Channel %" PRIu64 " crossed maximum stack depth"), - channel->id); - api_free_array(args); - return NIL; - } - - uint64_t request_id = channel->next_request_id++; - // Send the msgpack-rpc request - send_request(channel, request_id, method_name, args); - - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - - // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; - kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); - - do { - event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return - - if (frame.errored) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); - return NIL; - } - - return frame.result; -} - -/// Subscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_subscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - char *event_string = pmap_get(cstr_t)(event_strings, event); - - if (!event_string) { - event_string = xstrdup(event); - pmap_put(cstr_t)(event_strings, event_string, event_string); - } - - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); -} - -/// Unsubscribes to event broadcasts -/// -/// @param id The channel id -/// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - abort(); - } - - unsubscribe(channel, event); -} - -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { - return false; - } - - channel_kill(channel); - channel->enabled = false; - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -static void channel_from_stdio(void) -{ - Channel *channel = register_channel(); - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(rstream, job_data(job), eof); -} - -static void job_err(RStream *rstream, void *data, bool eof) -{ - size_t count; - char buf[256]; - Channel *channel = job_data(data); - - while ((count = rstream_pending(rstream))) { - size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); - } -} - -static void parse_msgpack(RStream *rstream, void *data, bool eof) -{ - Channel *channel = data; - channel->rpc_call_level++; - - if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); - goto end; - } - - uint32_t count = rstream_pending(rstream); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", - count, - rstream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); - - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); - } else { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Channel %" PRIu64 " returned a response that doesn't have " - " a matching id for the current RPC call. Ensure the client " - " is properly synchronized", - channel->id); - call_set_error(channel, buf); - } - msgpack_unpacked_destroy(&unpacked); - // Bail out from this event loop iteration - goto end; - } - - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } - } - - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - OUT_STR(e_outofmem); - out_char('\n'); - preserve_exit(); - } - - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - send_error(channel, 0, "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); - } - -end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); - } -} - -static bool channel_write(Channel *channel, WBuffer *buffer) -{ - bool success; - - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); - } - - if (!success) { - // If the write failed for any reason, close the channel - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed due to a failed write", - channel->id); - call_set_error(channel, buf); - } - - return success; -} - -static void send_error(Channel *channel, uint64_t id, char *err) -{ - Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); -} - -static void send_request(Channel *channel, - uint64_t id, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); -} - -static void send_event(Channel *channel, - char *name, - Array args) -{ - String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); -} - -static void broadcast_event(char *name, Array args) -{ - kvec_t(Channel *) subscribed; - kv_init(subscribed); - Channel *channel; - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { - kv_push(Channel *, subscribed, channel); - } - }); - - if (!kv_size(subscribed)) { - api_free_array(args); - goto end; - } - - String method = {.size = strlen(name), .data = name}; - WBuffer *buffer = serialize_request(0, - method, - args, - &out_buffer, - kv_size(subscribed)); - - for (size_t i = 0; i < kv_size(subscribed); i++) { - channel_write(kv_A(subscribed, i), buffer); - } - -end: - kv_destroy(subscribed); -} - -static void unsubscribe(Channel *channel, char *event) -{ - char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); - - map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { - return; - } - }); - - // Since the string is no longer used by other channels, release it's memory - pmap_del(cstr_t)(event_strings, event_string); - free(event_string); -} - -static void close_channel(Channel *channel) -{ - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); - } else { - // When the stdin channel closes, it's time to go - mch_exit(0); - } - } -} - -static void close_cb(uv_handle_t *handle) -{ - free(handle->data); - free(handle); -} - -static Channel *register_channel(void) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; -} - -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint64_t response_id = obj->via.array.ptr[1].via.u64; - // Must be equal to the frame at the stack's bottom - return response_id == kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1)->request_id; -} - -static void call_stack_pop(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg) -{ - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); - frame->errored = true; - frame->result = STRING_OBJ(cstr_to_string(msg)); - } - - channel->enabled = false; -} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h deleted file mode 100644 index bb409bfde9..0000000000 --- a/src/nvim/os/channel.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef NVIM_OS_CHANNEL_H -#define NVIM_OS_CHANNEL_H - -#include -#include - -#include "nvim/api/private/defs.h" -#include "nvim/vim.h" - -#define METHOD_MAXLEN 512 - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.h.generated.h" -#endif -#endif // NVIM_OS_CHANNEL_H diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index a460b2db96..f20d43c6a4 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -7,8 +7,10 @@ #include "nvim/os/event.h" #include "nvim/os/input.h" -#include "nvim/os/channel.h" -#include "nvim/os/server.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/os/provider.h" #include "nvim/os/signal.h" #include "nvim/os/rstream.h" @@ -41,6 +43,9 @@ static EventSource *immediate_sources = NULL; void event_init(void) { + // early msgpack-rpc initialization + msgpack_rpc_init_method_table(); + msgpack_rpc_helpers_init(); // Initialize the event queues deferred_events = kl_init(Event); immediate_events = kl_init(Event); @@ -52,9 +57,8 @@ void event_init(void) signal_init(); // Jobs job_init(); - // Channels + // finish mspgack-rpc initialization channel_init(); - // Servers server_init(); // Providers provider_init(); diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c deleted file mode 100644 index 55bc006ad1..0000000000 --- a/src/nvim/os/msgpack_rpc.c +++ /dev/null @@ -1,188 +0,0 @@ -#include -#include -#include - -#include - -#include "nvim/vim.h" -#include "nvim/log.h" -#include "nvim/memory.h" -#include "nvim/os/wstream.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/api/private/helpers.h" -#include "nvim/func_attr.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.c.generated.h" -#endif - -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param channel_id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -WBuffer *msgpack_rpc_call(uint64_t channel_id, - msgpack_object *req, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2) - FUNC_ATTR_NONNULL_ARG(3) -{ - uint64_t response_id; - Error error = ERROR_INIT; - msgpack_rpc_validate(&response_id, req, &error); - - if (error.set) { - return serialize_response(response_id, &error, NIL, sbuffer); - } - - // dispatch the call - Object rv = msgpack_rpc_dispatch(channel_id, req, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - - if (error.set) { - ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", - error.msg, - response_id); - return serialize_response(response_id, &error, NIL, sbuffer); - } - - DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", - response_id); - return serialize_response(response_id, &error, rv, sbuffer); -} - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_bin(res, len); - msgpack_pack_bin_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -/// Handler executed when an invalid method name is passed -Object msgpack_rpc_handle_missing_method(uint64_t channel_id, - msgpack_object *req, - Error *error) -{ - snprintf(error->msg, sizeof(error->msg), "Invalid method name"); - error->set = true; - return NIL; -} - -/// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) - FUNC_ATTR_NONNULL_ARG(4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); - - if (request_id) { - msgpack_pack_uint64(&pac, request_id); - } - - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -/// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2, 4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); - - if (err->set) { - // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); - // Nil result - msgpack_pack_nil(&pac); - } else { - // Nil error - msgpack_pack_nil(&pac); - // Return value - msgpack_rpc_from_object(arg, &pac); - } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -static void msgpack_rpc_validate(uint64_t *response_id, - msgpack_object *req, - Error *err) -{ - // response id not known yet - - *response_id = 0; - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Request is not an array")); - } - - if (req->via.array.size != 4) { - api_set_error(err, Validation, _("Request array size should be 4")); - } - - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Id must be a positive integer")); - } - - // Set the response id, which is the same as the request - *response_id = req->via.array.ptr[1].via.u64; - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); - } - - if (req->via.array.ptr[0].via.u64 != 0) { - api_set_error(err, Validation, _("Message type must be 0")); - } - - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN - && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { - api_set_error(err, Validation, _("Method must be a string")); - } - - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Paremeters must be an array")); - } -} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h deleted file mode 100644 index 3476d791ea..0000000000 --- a/src/nvim/os/msgpack_rpc.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_H -#define NVIM_OS_MSGPACK_RPC_H - -#include - -#include - -#include "nvim/func_attr.h" -#include "nvim/api/private/defs.h" -#include "nvim/os/wstream.h" - -typedef enum { - kUnpackResultOk, /// Successfully parsed a document - kUnpackResultFail, /// Got unexpected input - kUnpackResultNeedMore /// Need more data -} UnpackResult; - -/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores -/// functions of this type. -typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, - msgpack_object *req, - Error *error); - - -/// Initializes the msgpack-rpc method table -void msgpack_rpc_init(void); - -void msgpack_rpc_init_function_metadata(Dictionary *metadata); - -/// Dispatches to the actual API function after basic payload validation by -/// `msgpack_rpc_call`. It is responsible for validating/converting arguments -/// to C types, and converting the return value back to msgpack types. -/// The implementation is generated at compile time with metadata extracted -/// from the api/*.h headers, -/// -/// @param channel_id The channel id -/// @param method_id The method id -/// @param req The parsed request object -/// @param error Pointer to error structure -/// @return Some object -Object msgpack_rpc_dispatch(uint64_t channel_id, - msgpack_object *req, - Error *error) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c deleted file mode 100644 index b14de8245c..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.c +++ /dev/null @@ -1,289 +0,0 @@ -#include -#include - -#include - -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.c.generated.h" -#endif - -static msgpack_zone zone; -static msgpack_sbuffer sbuffer; - -#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ - FUNC_ATTR_NONNULL_ALL \ - { \ - if (obj->type != MSGPACK_OBJECT_EXT \ - || obj->via.ext.type != kObjectType##t) { \ - return false; \ - } \ - \ - msgpack_object data; \ - msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \ - obj->via.ext.size, \ - NULL, \ - &zone, \ - &data); \ - \ - if (ret != MSGPACK_UNPACK_SUCCESS) { \ - return false; \ - } \ - \ - *arg = data.via.u64; \ - return true; \ - } \ - \ - void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \ - FUNC_ATTR_NONNULL_ARG(2) \ - { \ - msgpack_packer pac; \ - msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ - msgpack_pack_uint64(&pac, o); \ - msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ - msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ - msgpack_sbuffer_clear(&sbuffer); \ - } - -void msgpack_rpc_helpers_init(void) -{ - msgpack_zone_init(&zone, 0xfff); - msgpack_sbuffer_init(&sbuffer); -} - -HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer) -HANDLE_TYPE_CONVERSION_IMPL(Window, window) -HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage) - -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.boolean; - return obj->type == MSGPACK_OBJECT_BOOLEAN; -} - -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.u64 <= INT64_MAX) { - *arg = (int64_t)obj->via.u64; - return true; - } - - *arg = obj->via.i64; - return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; -} - -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) - FUNC_ATTR_NONNULL_ALL -{ - *arg = obj->via.dec; - return obj->type == MSGPACK_OBJECT_DOUBLE; -} - -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) { - arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size); - arg->size = obj->via.bin.size; - } else { - return false; - } - - return true; -} - -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) - FUNC_ATTR_NONNULL_ALL -{ - switch (obj->type) { - case MSGPACK_OBJECT_NIL: - arg->type = kObjectTypeNil; - return true; - - case MSGPACK_OBJECT_BOOLEAN: - arg->type = kObjectTypeBoolean; - return msgpack_rpc_to_boolean(obj, &arg->data.boolean); - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - arg->type = kObjectTypeInteger; - return msgpack_rpc_to_integer(obj, &arg->data.integer); - - case MSGPACK_OBJECT_DOUBLE: - arg->type = kObjectTypeFloat; - return msgpack_rpc_to_float(obj, &arg->data.floating); - - case MSGPACK_OBJECT_BIN: - case MSGPACK_OBJECT_STR: - arg->type = kObjectTypeString; - return msgpack_rpc_to_string(obj, &arg->data.string); - - case MSGPACK_OBJECT_ARRAY: - arg->type = kObjectTypeArray; - return msgpack_rpc_to_array(obj, &arg->data.array); - - case MSGPACK_OBJECT_MAP: - arg->type = kObjectTypeDictionary; - return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); - - case MSGPACK_OBJECT_EXT: - switch (obj->via.ext.type) { - case kObjectTypeBuffer: - return msgpack_rpc_to_buffer(obj, &arg->data.buffer); - case kObjectTypeWindow: - return msgpack_rpc_to_window(obj, &arg->data.window); - case kObjectTypeTabpage: - return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); - } - default: - return false; - } -} - -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_ARRAY) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.array.size, sizeof(Object)); - - for (uint32_t i = 0; i < obj->via.array.size; i++) { - if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { - return false; - } - } - - return true; -} - -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) - FUNC_ATTR_NONNULL_ALL -{ - if (obj->type != MSGPACK_OBJECT_MAP) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - - - for (uint32_t i = 0; i < obj->via.map.size; i++) { - if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, - &arg->items[i].key)) { - return false; - } - - if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, - &arg->items[i].value)) { - return false; - } - } - - return true; -} - -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - if (result) { - msgpack_pack_true(res); - } else { - msgpack_pack_false(res); - } -} - -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_int64(res, result); -} - -void msgpack_rpc_from_float(Float result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_double(res, result); -} - -void msgpack_rpc_from_string(String result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_bin(res, result.size); - msgpack_pack_bin_body(res, result.data, result.size); -} - -void msgpack_rpc_from_object(Object result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - switch (result.type) { - case kObjectTypeNil: - msgpack_pack_nil(res); - break; - - case kObjectTypeBoolean: - msgpack_rpc_from_boolean(result.data.boolean, res); - break; - - case kObjectTypeInteger: - msgpack_rpc_from_integer(result.data.integer, res); - break; - - case kObjectTypeFloat: - msgpack_rpc_from_float(result.data.floating, res); - break; - - case kObjectTypeString: - msgpack_rpc_from_string(result.data.string, res); - break; - - case kObjectTypeArray: - msgpack_rpc_from_array(result.data.array, res); - break; - - case kObjectTypeBuffer: - msgpack_rpc_from_buffer(result.data.buffer, res); - break; - - case kObjectTypeWindow: - msgpack_rpc_from_window(result.data.window, res); - break; - - case kObjectTypeTabpage: - msgpack_rpc_from_tabpage(result.data.tabpage, res); - break; - - case kObjectTypeDictionary: - msgpack_rpc_from_dictionary(result.data.dictionary, res); - break; - } -} - -void msgpack_rpc_from_array(Array result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_array(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_object(result.items[i], res); - } -} - -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) -{ - msgpack_pack_map(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_string(result.items[i].key, res); - msgpack_rpc_from_object(result.items[i].value, res); - } -} diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h deleted file mode 100644 index aede6b1587..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H -#define NVIM_OS_MSGPACK_RPC_HELPERS_H - -#include -#include - -#include - -#include "nvim/api/private/defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H - diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c index d4fffaa053..414c8841fa 100644 --- a/src/nvim/os/provider.c +++ b/src/nvim/os/provider.c @@ -8,7 +8,7 @@ #include "nvim/api/vim.h" #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/shell.h" #include "nvim/os/os.h" #include "nvim/log.h" diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c deleted file mode 100644 index 9f7f5b34da..0000000000 --- a/src/nvim/os/server.c +++ /dev/null @@ -1,273 +0,0 @@ -#include -#include -#include -#include - -#include - -#include "nvim/os/channel.h" -#include "nvim/os/server.h" -#include "nvim/os/os.h" -#include "nvim/ascii.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/message.h" -#include "nvim/tempfile.h" -#include "nvim/map.h" -#include "nvim/path.h" - -#define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NEOVIM_DEFAULT_TCP_PORT 7450 -#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" - -typedef enum { - kServerTypeTcp, - kServerTypePipe -} ServerType; - -typedef struct { - // Type of the union below - ServerType type; - - // This is either a tcp server or unix socket(named pipe on windows) - union { - struct { - uv_tcp_t handle; - struct sockaddr_in addr; - } tcp; - struct { - uv_pipe_t handle; - char addr[ADDRESS_MAX_SIZE]; - } pipe; - } socket; -} Server; - -static PMap(cstr_t) *servers = NULL; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.c.generated.h" -#endif - -/// Initializes the module -bool server_init(void) -{ - servers = pmap_new(cstr_t)(); - - if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { - char *listen_address = (char *)vim_tempname(); - os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); - free(listen_address); - } - - return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; -} - -/// Teardown the server module -void server_teardown(void) -{ - if (!servers) { - return; - } - - Server *server; - - map_foreach_value(servers, server, { - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - }); -} - -/// Starts listening on arbitrary tcp/unix addresses specified by -/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will -/// be determined by parsing `endpoint`: If it's a valid tcp address in the -/// 'ip[:port]' format, then it will be tcp socket. The port is optional -/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will -/// be a unix socket or named pipe. -/// -/// @param endpoint Address of the server. Either a 'ip[:port]' string or an -/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or -/// named pipe. -/// @returns zero if successful, one on a regular error, and negative errno -/// on failure to bind or connect. -int server_start(const char *endpoint) - FUNC_ATTR_NONNULL_ALL -{ - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { - // TODO(aktau): since this is not what the user wanted, perhaps we - // should return an error here - EMSG2("Address was too long, truncated to %s", addr); - } - - // Check if the server already exists - if (pmap_has(cstr_t)(servers, addr)) { - EMSG2("Already listening on %s", addr); - return 1; - } - - ServerType server_type = kServerTypeTcp; - Server *server = xmalloc(sizeof(Server)); - char ip[16], *ip_end = strrchr(addr, ':'); - - if (!ip_end) { - ip_end = strchr(addr, NUL); - } - - uint32_t addr_len = ip_end - addr; - - if (addr_len > sizeof(ip) - 1) { - // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) - addr_len = sizeof(ip) - 1; - } - - // Extract the address part - xstrlcpy(ip, addr, addr_len + 1); - - int port = NEOVIM_DEFAULT_TCP_PORT; - - if (*ip_end == ':') { - // Extract the port - long lport = strtol(ip_end + 1, NULL, 10); // NOLINT - if (lport <= 0 || lport > 0xffff) { - // Invalid port, treat as named pipe or unix socket - server_type = kServerTypePipe; - } else { - port = (int) lport; - } - } - - if (server_type == kServerTypeTcp) { - // Try to parse ip address - if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { - // Invalid address, treat as named pipe or unix socket - server_type = kServerTypePipe; - } - } - - int result; - - if (server_type == kServerTypeTcp) { - // Listen on tcp address/port - uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); - server->socket.tcp.handle.data = server; - result = uv_tcp_bind(&server->socket.tcp.handle, - (const struct sockaddr *)&server->socket.tcp.addr, - 0); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, - MAX_CONNECTIONS, - connection_cb); - if (result) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } - } - } else { - // Listen on named pipe or unix socket - xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); - uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); - server->socket.pipe.handle.data = server; - result = uv_pipe_bind(&server->socket.pipe.handle, - server->socket.pipe.addr); - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.pipe.handle, - MAX_CONNECTIONS, - connection_cb); - - if (result) { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - } - } - - assert(result <= 0); // libuv should have returned -errno or zero. - if (result < 0) { - if (result == -EACCES) { - // Libuv converts ENOENT to EACCES for Windows compatibility, but if - // the parent directory does not exist, ENOENT would be more accurate. - *path_tail((char_u *) addr) = NUL; - if (!os_file_exists((char_u *) addr)) { - result = -ENOENT; - } - } - EMSG2("Failed to start server: %s", uv_strerror(result)); - free(server); - return result; - } - - server->type = server_type; - - // Add the server to the hash table - pmap_put(cstr_t)(servers, addr, server); - - return 0; -} - -/// Stops listening on the address specified by `endpoint`. -/// -/// @param endpoint Address of the server. -void server_stop(char *endpoint) -{ - Server *server; - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - xstrlcpy(addr, endpoint, sizeof(addr)); - - if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { - EMSG2("Not listening on %s", addr); - return; - } - - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - - pmap_del(cstr_t)(servers, addr); -} - -static void connection_cb(uv_stream_t *server, int status) -{ - int result; - uv_stream_t *client; - Server *srv = server->data; - - if (status < 0) { - abort(); - } - - if (srv->type == kServerTypeTcp) { - client = xmalloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); - } else { - client = xmalloc(sizeof(uv_pipe_t)); - uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); - } - - result = uv_accept(server, client); - - if (result) { - EMSG2("Failed to accept connection: %s", uv_strerror(result)); - uv_close((uv_handle_t *)client, free_client); - return; - } - - channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ - free(handle); -} - -static void free_server(uv_handle_t *handle) -{ - free(handle->data); -} diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h deleted file mode 100644 index 43592a91e4..0000000000 --- a/src/nvim/os/server.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_H -#define NVIM_OS_SERVER_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.h.generated.h" -#endif -#endif // NVIM_OS_SERVER_H diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h deleted file mode 100644 index 08cdf55428..0000000000 --- a/src/nvim/os/server_defs.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_DEFS_H -#define NVIM_OS_SERVER_DEFS_H - -typedef struct server Server; - -#endif // NVIM_OS_SERVER_DEFS_H - diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c index 52f57f8262..0ad15bc433 100644 --- a/src/nvim/os_unix.c +++ b/src/nvim/os_unix.c @@ -54,8 +54,8 @@ #include "nvim/os/shell.h" #include "nvim/os/signal.h" #include "nvim/os/job.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" #if defined(HAVE_SYS_IOCTL_H) # include @@ -166,8 +166,6 @@ void mch_init(void) mac_conv_init(); #endif - msgpack_rpc_init(); - msgpack_rpc_helpers_init(); event_init(); } -- cgit