aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r--src/nvim/msgpack_rpc/channel.c597
-rw-r--r--src/nvim/msgpack_rpc/channel.h15
-rw-r--r--src/nvim/msgpack_rpc/defs.h34
-rw-r--r--src/nvim/msgpack_rpc/helpers.c463
-rw-r--r--src/nvim/msgpack_rpc/helpers.h17
-rw-r--r--src/nvim/msgpack_rpc/server.c273
-rw-r--r--src/nvim/msgpack_rpc/server.h7
7 files changed, 1406 insertions, 0 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
new file mode 100644
index 0000000000..dcd7e41737
--- /dev/null
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -0,0 +1,597 @@
+#include <stdbool.h>
+#include <string.h>
+#include <inttypes.h>
+
+#include <uv.h>
+#include <msgpack.h>
+
+#include "nvim/api/private/helpers.h"
+#include "nvim/api/vim.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/os/event.h"
+#include "nvim/os/rstream.h"
+#include "nvim/os/rstream_defs.h"
+#include "nvim/os/wstream.h"
+#include "nvim/os/wstream_defs.h"
+#include "nvim/os/job.h"
+#include "nvim/os/job_defs.h"
+#include "nvim/msgpack_rpc/helpers.h"
+#include "nvim/vim.h"
+#include "nvim/ascii.h"
+#include "nvim/memory.h"
+#include "nvim/os_unix.h"
+#include "nvim/message.h"
+#include "nvim/term.h"
+#include "nvim/map.h"
+#include "nvim/log.h"
+#include "nvim/misc1.h"
+#include "nvim/lib/kvec.h"
+
+#define CHANNEL_BUFFER_SIZE 0xffff
+
+typedef struct {
+ uint64_t request_id;
+ bool errored;
+ Object result;
+} ChannelCallFrame;
+
+typedef struct {
+ uint64_t id;
+ PMap(cstr_t) *subscribed_events;
+ bool is_job, enabled;
+ msgpack_unpacker *unpacker;
+ union {
+ Job *job;
+ struct {
+ RStream *read;
+ WStream *write;
+ uv_stream_t *uv;
+ } streams;
+ } data;
+ uint64_t next_request_id;
+ kvec_t(ChannelCallFrame *) call_stack;
+ size_t rpc_call_level;
+} Channel;
+
+static uint64_t next_id = 1;
+static PMap(uint64_t) *channels = NULL;
+static PMap(cstr_t) *event_strings = NULL;
+static msgpack_sbuffer out_buffer;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/channel.c.generated.h"
+#endif
+
+/// Initializes the module
+void channel_init(void)
+{
+ channels = pmap_new(uint64_t)();
+ event_strings = pmap_new(cstr_t)();
+ msgpack_sbuffer_init(&out_buffer);
+
+ if (embedded_mode) {
+ channel_from_stdio();
+ }
+}
+
+/// Teardown the module
+void channel_teardown(void)
+{
+ if (!channels) {
+ return;
+ }
+
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ close_channel(channel);
+ });
+}
+
+/// Creates an API channel by starting a job and connecting to its
+/// stdin/stdout. stderr is forwarded to the editor error stream.
+///
+/// @param argv The argument vector for the process
+/// @return The channel id
+uint64_t channel_from_job(char **argv)
+{
+ Channel *channel = register_channel();
+ channel->is_job = true;
+
+ int status;
+ channel->data.job = job_start(argv,
+ channel,
+ job_out,
+ job_err,
+ NULL,
+ 0,
+ &status);
+
+ if (status <= 0) {
+ close_channel(channel);
+ return 0;
+ }
+
+ return channel->id;
+}
+
+/// Creates an API channel from a libuv stream representing a tcp or
+/// pipe/socket client connection
+///
+/// @param stream The established connection
+void channel_from_stream(uv_stream_t *stream)
+{
+ Channel *channel = register_channel();
+ stream->data = NULL;
+ channel->is_job = false;
+ // read stream
+ channel->data.streams.read = rstream_new(parse_msgpack,
+ rbuffer_new(CHANNEL_BUFFER_SIZE),
+ channel,
+ NULL);
+ rstream_set_stream(channel->data.streams.read, stream);
+ rstream_start(channel->data.streams.read);
+ // write stream
+ channel->data.streams.write = wstream_new(0);
+ wstream_set_stream(channel->data.streams.write, stream);
+ channel->data.streams.uv = stream;
+}
+
+bool channel_exists(uint64_t id)
+{
+ Channel *channel;
+ return (channel = pmap_get(uint64_t)(channels, id)) != NULL
+ && channel->enabled;
+}
+
+/// Sends event/arguments to channel
+///
+/// @param id The channel id. If 0, the event will be sent to all
+/// channels that have subscribed to the event type
+/// @param name The event name, an arbitrary string
+/// @param args Array with event arguments
+/// @return True if the event was sent successfully, false otherwise.
+bool channel_send_event(uint64_t id, char *name, Array args)
+{
+ Channel *channel = NULL;
+
+ if (id > 0) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ api_free_array(args);
+ return false;
+ }
+ send_event(channel, name, args);
+ } else {
+ broadcast_event(name, args);
+ }
+
+ return true;
+}
+
+/// Sends a method call to a channel
+///
+/// @param id The channel id
+/// @param method_name The method name, an arbitrary string
+/// @param args Array with method arguments
+/// @param[out] error True if the return value is an error
+/// @return Whatever the remote method returned
+Object channel_send_call(uint64_t id,
+ char *method_name,
+ Array args,
+ Error *err)
+{
+ Channel *channel = NULL;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
+ api_free_array(args);
+ return NIL;
+ }
+
+ if (kv_size(channel->call_stack) > 20) {
+ // 20 stack depth is more than anyone should ever need for RPC calls
+ api_set_error(err,
+ Exception,
+ _("Channel %" PRIu64 " crossed maximum stack depth"),
+ channel->id);
+ api_free_array(args);
+ return NIL;
+ }
+
+ uint64_t request_id = channel->next_request_id++;
+ // Send the msgpack-rpc request
+ send_request(channel, request_id, method_name, args);
+
+ EventSource channel_source = channel->is_job
+ ? job_event_source(channel->data.job)
+ : rstream_event_source(channel->data.streams.read);
+ EventSource sources[] = {channel_source, NULL};
+
+ // Push the frame
+ ChannelCallFrame frame = {request_id, false, NIL};
+ kv_push(ChannelCallFrame *, channel->call_stack, &frame);
+ size_t size = kv_size(channel->call_stack);
+
+ do {
+ event_poll(-1, sources);
+ } while (
+ // Continue running if ...
+ channel->enabled && // the channel is still enabled
+ kv_size(channel->call_stack) >= size); // the call didn't return
+
+ if (frame.errored) {
+ api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ return NIL;
+ }
+
+ return frame.result;
+}
+
+/// Subscribes to event broadcasts
+///
+/// @param id The channel id
+/// @param event The event type string
+void channel_subscribe(uint64_t id, char *event)
+{
+ Channel *channel;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ abort();
+ }
+
+ char *event_string = pmap_get(cstr_t)(event_strings, event);
+
+ if (!event_string) {
+ event_string = xstrdup(event);
+ pmap_put(cstr_t)(event_strings, event_string, event_string);
+ }
+
+ pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
+}
+
+/// Unsubscribes to event broadcasts
+///
+/// @param id The channel id
+/// @param event The event type string
+void channel_unsubscribe(uint64_t id, char *event)
+{
+ Channel *channel;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ abort();
+ }
+
+ unsubscribe(channel, event);
+}
+
+/// Closes a channel
+///
+/// @param id The channel id
+/// @return true if successful, false otherwise
+bool channel_close(uint64_t id)
+{
+ Channel *channel;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ return false;
+ }
+
+ channel_kill(channel);
+ channel->enabled = false;
+ return true;
+}
+
+/// Creates an API channel from stdin/stdout. This is used when embedding
+/// Neovim
+static void channel_from_stdio(void)
+{
+ Channel *channel = register_channel();
+ channel->is_job = false;
+ // read stream
+ channel->data.streams.read = rstream_new(parse_msgpack,
+ rbuffer_new(CHANNEL_BUFFER_SIZE),
+ channel,
+ NULL);
+ rstream_set_file(channel->data.streams.read, 0);
+ rstream_start(channel->data.streams.read);
+ // write stream
+ channel->data.streams.write = wstream_new(0);
+ wstream_set_file(channel->data.streams.write, 1);
+ channel->data.streams.uv = NULL;
+}
+
+static void job_out(RStream *rstream, void *data, bool eof)
+{
+ Job *job = data;
+ parse_msgpack(rstream, job_data(job), eof);
+}
+
+static void job_err(RStream *rstream, void *data, bool eof)
+{
+ size_t count;
+ char buf[256];
+ Channel *channel = job_data(data);
+
+ while ((count = rstream_pending(rstream))) {
+ size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
+ buf[read] = NUL;
+ ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
+ }
+}
+
+static void parse_msgpack(RStream *rstream, void *data, bool eof)
+{
+ Channel *channel = data;
+ channel->rpc_call_level++;
+
+ if (eof) {
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed by the client",
+ channel->id);
+ call_set_error(channel, buf);
+ goto end;
+ }
+
+ uint32_t count = rstream_pending(rstream);
+ DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ count,
+ rstream);
+
+ // Feed the unpacker with data
+ msgpack_unpacker_reserve_buffer(channel->unpacker, count);
+ rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+
+ msgpack_unpacked unpacked;
+ msgpack_unpacked_init(&unpacked);
+ msgpack_unpack_return result;
+
+ // Deserialize everything we can.
+ while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
+ MSGPACK_UNPACK_SUCCESS) {
+ if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
+ if (is_valid_rpc_response(&unpacked.data, channel)) {
+ call_stack_pop(&unpacked.data, channel);
+ } else {
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Channel %" PRIu64 " returned a response that doesn't have "
+ " a matching id for the current RPC call. Ensure the client "
+ " is properly synchronized",
+ channel->id);
+ call_set_error(channel, buf);
+ }
+ msgpack_unpacked_destroy(&unpacked);
+ // Bail out from this event loop iteration
+ goto end;
+ }
+
+ // Perform the call
+ WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
+ // write the response
+ if (!channel_write(channel, resp)) {
+ goto end;
+ }
+ }
+
+ if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
+ OUT_STR(e_outofmem);
+ out_char('\n');
+ preserve_exit();
+ }
+
+ if (result == MSGPACK_UNPACK_PARSE_ERROR) {
+ // See src/msgpack/unpack_template.h in msgpack source tree for
+ // causes for this error(search for 'goto _failed')
+ //
+ // A not so uncommon cause for this might be deserializing objects with
+ // a high nesting level: msgpack will break when it's internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ send_error(channel, 0, "Invalid msgpack payload. "
+ "This error can also happen when deserializing "
+ "an object with high level of nesting");
+ }
+
+end:
+ channel->rpc_call_level--;
+ if (!channel->enabled && !kv_size(channel->call_stack)) {
+ // Now it's safe to destroy the channel
+ close_channel(channel);
+ }
+}
+
+static bool channel_write(Channel *channel, WBuffer *buffer)
+{
+ bool success;
+
+ if (channel->is_job) {
+ success = job_write(channel->data.job, buffer);
+ } else {
+ success = wstream_write(channel->data.streams.write, buffer);
+ }
+
+ if (!success) {
+ // If the write failed for any reason, close the channel
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed due to a failed write",
+ channel->id);
+ call_set_error(channel, buf);
+ }
+
+ return success;
+}
+
+static void send_error(Channel *channel, uint64_t id, char *err)
+{
+ Error e = ERROR_INIT;
+ api_set_error(&e, Exception, "%s", err);
+ channel_write(channel, serialize_response(id, &e, NIL, &out_buffer));
+}
+
+static void send_request(Channel *channel,
+ uint64_t id,
+ char *name,
+ Array args)
+{
+ String method = {.size = strlen(name), .data = name};
+ channel_write(channel, serialize_request(id, method, args, &out_buffer, 1));
+}
+
+static void send_event(Channel *channel,
+ char *name,
+ Array args)
+{
+ String method = {.size = strlen(name), .data = name};
+ channel_write(channel, serialize_request(0, method, args, &out_buffer, 1));
+}
+
+static void broadcast_event(char *name, Array args)
+{
+ kvec_t(Channel *) subscribed;
+ kv_init(subscribed);
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
+ kv_push(Channel *, subscribed, channel);
+ }
+ });
+
+ if (!kv_size(subscribed)) {
+ api_free_array(args);
+ goto end;
+ }
+
+ String method = {.size = strlen(name), .data = name};
+ WBuffer *buffer = serialize_request(0,
+ method,
+ args,
+ &out_buffer,
+ kv_size(subscribed));
+
+ for (size_t i = 0; i < kv_size(subscribed); i++) {
+ channel_write(kv_A(subscribed, i), buffer);
+ }
+
+end:
+ kv_destroy(subscribed);
+}
+
+static void unsubscribe(Channel *channel, char *event)
+{
+ char *event_string = pmap_get(cstr_t)(event_strings, event);
+ pmap_del(cstr_t)(channel->subscribed_events, event_string);
+
+ map_foreach_value(channels, channel, {
+ if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
+ return;
+ }
+ });
+
+ // Since the string is no longer used by other channels, release it's memory
+ pmap_del(cstr_t)(event_strings, event_string);
+ free(event_string);
+}
+
+static void close_channel(Channel *channel)
+{
+ pmap_del(uint64_t)(channels, channel->id);
+ msgpack_unpacker_free(channel->unpacker);
+
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
+ channel_kill(channel);
+
+ free(channel);
+}
+
+static void channel_kill(Channel *channel)
+{
+ if (channel->is_job) {
+ if (channel->data.job) {
+ job_stop(channel->data.job);
+ }
+ } else {
+ rstream_free(channel->data.streams.read);
+ wstream_free(channel->data.streams.write);
+ if (channel->data.streams.uv) {
+ uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+ } else {
+ // When the stdin channel closes, it's time to go
+ mch_exit(0);
+ }
+ }
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ free(handle->data);
+ free(handle);
+}
+
+static Channel *register_channel(void)
+{
+ Channel *rv = xmalloc(sizeof(Channel));
+ rv->enabled = true;
+ rv->rpc_call_level = 0;
+ rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ rv->id = next_id++;
+ rv->subscribed_events = pmap_new(cstr_t)();
+ rv->next_request_id = 1;
+ kv_init(rv->call_stack);
+ pmap_put(uint64_t)(channels, rv->id, rv);
+ return rv;
+}
+
+static bool is_rpc_response(msgpack_object *obj)
+{
+ return obj->type == MSGPACK_OBJECT_ARRAY
+ && obj->via.array.size == 4
+ && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.array.ptr[0].via.u64 == 1
+ && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
+}
+
+static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
+{
+ uint64_t response_id = obj->via.array.ptr[1].via.u64;
+ // Must be equal to the frame at the stack's bottom
+ return response_id == kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1)->request_id;
+}
+
+static void call_stack_pop(msgpack_object *obj, Channel *channel)
+{
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
+
+ if (frame->errored) {
+ msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
+ } else {
+ msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
+ }
+}
+
+static void call_set_error(Channel *channel, char *msg)
+{
+ for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ frame->errored = true;
+ frame->result = STRING_OBJ(cstr_to_string(msg));
+ }
+
+ channel->enabled = false;
+}
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
new file mode 100644
index 0000000000..df742fe368
--- /dev/null
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -0,0 +1,15 @@
+#ifndef NVIM_MSGPACK_RPC_CHANNEL_H
+#define NVIM_MSGPACK_RPC_CHANNEL_H
+
+#include <stdbool.h>
+#include <uv.h>
+
+#include "nvim/api/private/defs.h"
+#include "nvim/vim.h"
+
+#define METHOD_MAXLEN 512
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/channel.h.generated.h"
+#endif
+#endif // NVIM_MSGPACK_RPC_CHANNEL_H
diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h
new file mode 100644
index 0000000000..5eec4ced54
--- /dev/null
+++ b/src/nvim/msgpack_rpc/defs.h
@@ -0,0 +1,34 @@
+#ifndef NVIM_MSGPACK_RPC_DEFS_H
+#define NVIM_MSGPACK_RPC_DEFS_H
+
+#include <msgpack.h>
+
+
+/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
+/// functions of this type.
+typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
+ msgpack_object *req,
+ Error *error);
+
+/// Initializes the msgpack-rpc method table
+void msgpack_rpc_init_method_table(void);
+
+void msgpack_rpc_init_function_metadata(Dictionary *metadata);
+
+/// Dispatches to the actual API function after basic payload validation by
+/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
+/// to C types, and converting the return value back to msgpack types.
+/// The implementation is generated at compile time with metadata extracted
+/// from the api/*.h headers,
+///
+/// @param channel_id The channel id
+/// @param method_id The method id
+/// @param req The parsed request object
+/// @param error Pointer to error structure
+/// @return Some object
+Object msgpack_rpc_dispatch(uint64_t channel_id,
+ msgpack_object *req,
+ Error *error)
+ FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
+
+#endif // NVIM_MSGPACK_RPC_DEFS_H
diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c
new file mode 100644
index 0000000000..4b96e4985e
--- /dev/null
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -0,0 +1,463 @@
+#include <stdint.h>
+#include <stdbool.h>
+#include <inttypes.h>
+
+#include <msgpack.h>
+
+#include "nvim/api/private/helpers.h"
+#include "nvim/msgpack_rpc/helpers.h"
+#include "nvim/msgpack_rpc/defs.h"
+#include "nvim/vim.h"
+#include "nvim/log.h"
+#include "nvim/memory.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/helpers.c.generated.h"
+#endif
+
+static msgpack_zone zone;
+static msgpack_sbuffer sbuffer;
+
+#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \
+ bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
+ FUNC_ATTR_NONNULL_ALL \
+ { \
+ if (obj->type != MSGPACK_OBJECT_EXT \
+ || obj->via.ext.type != kObjectType##t) { \
+ return false; \
+ } \
+ \
+ msgpack_object data; \
+ msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \
+ obj->via.ext.size, \
+ NULL, \
+ &zone, \
+ &data); \
+ \
+ if (ret != MSGPACK_UNPACK_SUCCESS) { \
+ return false; \
+ } \
+ \
+ *arg = data.via.u64; \
+ return true; \
+ } \
+ \
+ void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \
+ FUNC_ATTR_NONNULL_ARG(2) \
+ { \
+ msgpack_packer pac; \
+ msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \
+ msgpack_pack_uint64(&pac, o); \
+ msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \
+ msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \
+ msgpack_sbuffer_clear(&sbuffer); \
+ }
+
+void msgpack_rpc_helpers_init(void)
+{
+ msgpack_zone_init(&zone, 0xfff);
+ msgpack_sbuffer_init(&sbuffer);
+}
+
+HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer)
+HANDLE_TYPE_CONVERSION_IMPL(Window, window)
+HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage)
+
+bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ *arg = obj->via.boolean;
+ return obj->type == MSGPACK_OBJECT_BOOLEAN;
+}
+
+bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.u64 <= INT64_MAX) {
+ *arg = (int64_t)obj->via.u64;
+ return true;
+ }
+
+ *arg = obj->via.i64;
+ return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
+}
+
+bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ *arg = obj->via.dec;
+ return obj->type == MSGPACK_OBJECT_DOUBLE;
+}
+
+bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) {
+ arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size);
+ arg->size = obj->via.bin.size;
+ } else {
+ return false;
+ }
+
+ return true;
+}
+
+bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ switch (obj->type) {
+ case MSGPACK_OBJECT_NIL:
+ arg->type = kObjectTypeNil;
+ return true;
+
+ case MSGPACK_OBJECT_BOOLEAN:
+ arg->type = kObjectTypeBoolean;
+ return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
+
+ case MSGPACK_OBJECT_POSITIVE_INTEGER:
+ case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+ arg->type = kObjectTypeInteger;
+ return msgpack_rpc_to_integer(obj, &arg->data.integer);
+
+ case MSGPACK_OBJECT_DOUBLE:
+ arg->type = kObjectTypeFloat;
+ return msgpack_rpc_to_float(obj, &arg->data.floating);
+
+ case MSGPACK_OBJECT_BIN:
+ case MSGPACK_OBJECT_STR:
+ arg->type = kObjectTypeString;
+ return msgpack_rpc_to_string(obj, &arg->data.string);
+
+ case MSGPACK_OBJECT_ARRAY:
+ arg->type = kObjectTypeArray;
+ return msgpack_rpc_to_array(obj, &arg->data.array);
+
+ case MSGPACK_OBJECT_MAP:
+ arg->type = kObjectTypeDictionary;
+ return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
+
+ case MSGPACK_OBJECT_EXT:
+ switch (obj->via.ext.type) {
+ case kObjectTypeBuffer:
+ return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
+ case kObjectTypeWindow:
+ return msgpack_rpc_to_window(obj, &arg->data.window);
+ case kObjectTypeTabpage:
+ return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
+ }
+ default:
+ return false;
+ }
+}
+
+bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (obj->type != MSGPACK_OBJECT_ARRAY) {
+ return false;
+ }
+
+ arg->size = obj->via.array.size;
+ arg->items = xcalloc(obj->via.array.size, sizeof(Object));
+
+ for (uint32_t i = 0; i < obj->via.array.size; i++) {
+ if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (obj->type != MSGPACK_OBJECT_MAP) {
+ return false;
+ }
+
+ arg->size = obj->via.array.size;
+ arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
+
+
+ for (uint32_t i = 0; i < obj->via.map.size; i++) {
+ if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
+ &arg->items[i].key)) {
+ return false;
+ }
+
+ if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
+ &arg->items[i].value)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ if (result) {
+ msgpack_pack_true(res);
+ } else {
+ msgpack_pack_false(res);
+ }
+}
+
+void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ msgpack_pack_int64(res, result);
+}
+
+void msgpack_rpc_from_float(Float result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ msgpack_pack_double(res, result);
+}
+
+void msgpack_rpc_from_string(String result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ msgpack_pack_bin(res, result.size);
+ msgpack_pack_bin_body(res, result.data, result.size);
+}
+
+void msgpack_rpc_from_object(Object result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ switch (result.type) {
+ case kObjectTypeNil:
+ msgpack_pack_nil(res);
+ break;
+
+ case kObjectTypeBoolean:
+ msgpack_rpc_from_boolean(result.data.boolean, res);
+ break;
+
+ case kObjectTypeInteger:
+ msgpack_rpc_from_integer(result.data.integer, res);
+ break;
+
+ case kObjectTypeFloat:
+ msgpack_rpc_from_float(result.data.floating, res);
+ break;
+
+ case kObjectTypeString:
+ msgpack_rpc_from_string(result.data.string, res);
+ break;
+
+ case kObjectTypeArray:
+ msgpack_rpc_from_array(result.data.array, res);
+ break;
+
+ case kObjectTypeBuffer:
+ msgpack_rpc_from_buffer(result.data.buffer, res);
+ break;
+
+ case kObjectTypeWindow:
+ msgpack_rpc_from_window(result.data.window, res);
+ break;
+
+ case kObjectTypeTabpage:
+ msgpack_rpc_from_tabpage(result.data.tabpage, res);
+ break;
+
+ case kObjectTypeDictionary:
+ msgpack_rpc_from_dictionary(result.data.dictionary, res);
+ break;
+ }
+}
+
+void msgpack_rpc_from_array(Array result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ msgpack_pack_array(res, result.size);
+
+ for (size_t i = 0; i < result.size; i++) {
+ msgpack_rpc_from_object(result.items[i], res);
+ }
+}
+
+void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2)
+{
+ msgpack_pack_map(res, result.size);
+
+ for (size_t i = 0; i < result.size; i++) {
+ msgpack_rpc_from_string(result.items[i].key, res);
+ msgpack_rpc_from_object(result.items[i].value, res);
+ }
+}
+
+/// Validates the basic structure of the msgpack-rpc call and fills `res`
+/// with the basic response structure.
+///
+/// @param channel_id The channel id
+/// @param req The parsed request object
+/// @param res A packer that contains the response
+WBuffer *msgpack_rpc_call(uint64_t channel_id,
+ msgpack_object *req,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(2)
+ FUNC_ATTR_NONNULL_ARG(3)
+{
+ uint64_t response_id;
+ Error error = ERROR_INIT;
+ msgpack_rpc_validate(&response_id, req, &error);
+
+ if (error.set) {
+ return serialize_response(response_id, &error, NIL, sbuffer);
+ }
+
+ // dispatch the call
+ Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
+ // send the response
+ msgpack_packer response;
+ msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
+
+ if (error.set) {
+ ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
+ error.msg,
+ response_id);
+ return serialize_response(response_id, &error, NIL, sbuffer);
+ }
+
+ DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
+ response_id);
+ return serialize_response(response_id, &error, rv, sbuffer);
+}
+
+/// Finishes the msgpack-rpc call with an error message.
+///
+/// @param msg The error message
+/// @param res A packer that contains the response
+void msgpack_rpc_error(char *msg, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ALL
+{
+ size_t len = strlen(msg);
+
+ // error message
+ msgpack_pack_bin(res, len);
+ msgpack_pack_bin_body(res, msg, len);
+ // Nil result
+ msgpack_pack_nil(res);
+}
+
+/// Handler executed when an invalid method name is passed
+Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
+ msgpack_object *req,
+ Error *error)
+{
+ snprintf(error->msg, sizeof(error->msg), "Invalid method name");
+ error->set = true;
+ return NIL;
+}
+
+/// Serializes a msgpack-rpc request or notification(id == 0)
+WBuffer *serialize_request(uint64_t request_id,
+ String method,
+ Array args,
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
+ FUNC_ATTR_NONNULL_ARG(4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, request_id ? 4 : 3);
+ msgpack_pack_int(&pac, request_id ? 0 : 2);
+
+ if (request_id) {
+ msgpack_pack_uint64(&pac, request_id);
+ }
+
+ msgpack_pack_bin(&pac, method.size);
+ msgpack_pack_bin_body(&pac, method.data, method.size);
+ msgpack_rpc_from_array(args, &pac);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ refcount,
+ free);
+ api_free_array(args);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+/// Serializes a msgpack-rpc response
+WBuffer *serialize_response(uint64_t response_id,
+ Error *err,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(2, 4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, 4);
+ msgpack_pack_int(&pac, 1);
+ msgpack_pack_uint64(&pac, response_id);
+
+ if (err->set) {
+ // error represented by a [type, message] array
+ msgpack_pack_array(&pac, 2);
+ msgpack_rpc_from_integer(err->type, &pac);
+ msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
+ // Nil result
+ msgpack_pack_nil(&pac);
+ } else {
+ // Nil error
+ msgpack_pack_nil(&pac);
+ // Return value
+ msgpack_rpc_from_object(arg, &pac);
+ }
+
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ 1, // responses only go though 1 channel
+ free);
+ api_free_object(arg);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+void msgpack_rpc_validate(uint64_t *response_id,
+ msgpack_object *req,
+ Error *err)
+{
+ // response id not known yet
+
+ *response_id = 0;
+ // Validate the basic structure of the msgpack-rpc payload
+ if (req->type != MSGPACK_OBJECT_ARRAY) {
+ api_set_error(err, Validation, _("Request is not an array"));
+ }
+
+ if (req->via.array.size != 4) {
+ api_set_error(err, Validation, _("Request array size should be 4"));
+ }
+
+ if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ api_set_error(err, Validation, _("Id must be a positive integer"));
+ }
+
+ // Set the response id, which is the same as the request
+ *response_id = req->via.array.ptr[1].via.u64;
+
+ if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ api_set_error(err, Validation, _("Message type must be an integer"));
+ }
+
+ if (req->via.array.ptr[0].via.u64 != 0) {
+ api_set_error(err, Validation, _("Message type must be 0"));
+ }
+
+ if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
+ && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
+ api_set_error(err, Validation, _("Method must be a string"));
+ }
+
+ if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
+ api_set_error(err, Validation, _("Paremeters must be an array"));
+ }
+}
diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h
new file mode 100644
index 0000000000..bf161d54e0
--- /dev/null
+++ b/src/nvim/msgpack_rpc/helpers.h
@@ -0,0 +1,17 @@
+#ifndef NVIM_MSGPACK_RPC_HELPERS_H
+#define NVIM_MSGPACK_RPC_HELPERS_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <msgpack.h>
+
+#include "nvim/os/wstream.h"
+#include "nvim/api/private/defs.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/helpers.h.generated.h"
+#endif
+
+#endif // NVIM_MSGPACK_RPC_HELPERS_H
+
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
new file mode 100644
index 0000000000..33e01fe562
--- /dev/null
+++ b/src/nvim/msgpack_rpc/server.c
@@ -0,0 +1,273 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
+#include "nvim/os/os.h"
+#include "nvim/ascii.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+#include "nvim/message.h"
+#include "nvim/tempfile.h"
+#include "nvim/map.h"
+#include "nvim/path.h"
+
+#define MAX_CONNECTIONS 32
+#define ADDRESS_MAX_SIZE 256
+#define NEOVIM_DEFAULT_TCP_PORT 7450
+#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS"
+
+typedef enum {
+ kServerTypeTcp,
+ kServerTypePipe
+} ServerType;
+
+typedef struct {
+ // Type of the union below
+ ServerType type;
+
+ // This is either a tcp server or unix socket(named pipe on windows)
+ union {
+ struct {
+ uv_tcp_t handle;
+ struct sockaddr_in addr;
+ } tcp;
+ struct {
+ uv_pipe_t handle;
+ char addr[ADDRESS_MAX_SIZE];
+ } pipe;
+ } socket;
+} Server;
+
+static PMap(cstr_t) *servers = NULL;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/server.c.generated.h"
+#endif
+
+/// Initializes the module
+bool server_init(void)
+{
+ servers = pmap_new(cstr_t)();
+
+ if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) {
+ char *listen_address = (char *)vim_tempname();
+ os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1);
+ free(listen_address);
+ }
+
+ return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0;
+}
+
+/// Teardown the server module
+void server_teardown(void)
+{
+ if (!servers) {
+ return;
+ }
+
+ Server *server;
+
+ map_foreach_value(servers, server, {
+ if (server->type == kServerTypeTcp) {
+ uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
+ } else {
+ uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
+ }
+ });
+}
+
+/// Starts listening on arbitrary tcp/unix addresses specified by
+/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will
+/// be determined by parsing `endpoint`: If it's a valid tcp address in the
+/// 'ip[:port]' format, then it will be tcp socket. The port is optional
+/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will
+/// be a unix socket or named pipe.
+///
+/// @param endpoint Address of the server. Either a 'ip[:port]' string or an
+/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or
+/// named pipe.
+/// @returns zero if successful, one on a regular error, and negative errno
+/// on failure to bind or connect.
+int server_start(const char *endpoint)
+ FUNC_ATTR_NONNULL_ALL
+{
+ char addr[ADDRESS_MAX_SIZE];
+
+ // Trim to `ADDRESS_MAX_SIZE`
+ if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
+ // TODO(aktau): since this is not what the user wanted, perhaps we
+ // should return an error here
+ EMSG2("Address was too long, truncated to %s", addr);
+ }
+
+ // Check if the server already exists
+ if (pmap_has(cstr_t)(servers, addr)) {
+ EMSG2("Already listening on %s", addr);
+ return 1;
+ }
+
+ ServerType server_type = kServerTypeTcp;
+ Server *server = xmalloc(sizeof(Server));
+ char ip[16], *ip_end = strrchr(addr, ':');
+
+ if (!ip_end) {
+ ip_end = strchr(addr, NUL);
+ }
+
+ uint32_t addr_len = ip_end - addr;
+
+ if (addr_len > sizeof(ip) - 1) {
+ // Maximum length of an IP address buffer is 15(eg: 255.255.255.255)
+ addr_len = sizeof(ip) - 1;
+ }
+
+ // Extract the address part
+ xstrlcpy(ip, addr, addr_len + 1);
+
+ int port = NEOVIM_DEFAULT_TCP_PORT;
+
+ if (*ip_end == ':') {
+ // Extract the port
+ long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
+ if (lport <= 0 || lport > 0xffff) {
+ // Invalid port, treat as named pipe or unix socket
+ server_type = kServerTypePipe;
+ } else {
+ port = (int) lport;
+ }
+ }
+
+ if (server_type == kServerTypeTcp) {
+ // Try to parse ip address
+ if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) {
+ // Invalid address, treat as named pipe or unix socket
+ server_type = kServerTypePipe;
+ }
+ }
+
+ int result;
+
+ if (server_type == kServerTypeTcp) {
+ // Listen on tcp address/port
+ uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
+ server->socket.tcp.handle.data = server;
+ result = uv_tcp_bind(&server->socket.tcp.handle,
+ (const struct sockaddr *)&server->socket.tcp.addr,
+ 0);
+ if (result == 0) {
+ result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
+ MAX_CONNECTIONS,
+ connection_cb);
+ if (result) {
+ uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
+ }
+ }
+ } else {
+ // Listen on named pipe or unix socket
+ xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
+ uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
+ server->socket.pipe.handle.data = server;
+ result = uv_pipe_bind(&server->socket.pipe.handle,
+ server->socket.pipe.addr);
+ if (result == 0) {
+ result = uv_listen((uv_stream_t *)&server->socket.pipe.handle,
+ MAX_CONNECTIONS,
+ connection_cb);
+
+ if (result) {
+ uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
+ }
+ }
+ }
+
+ assert(result <= 0); // libuv should have returned -errno or zero.
+ if (result < 0) {
+ if (result == -EACCES) {
+ // Libuv converts ENOENT to EACCES for Windows compatibility, but if
+ // the parent directory does not exist, ENOENT would be more accurate.
+ *path_tail((char_u *) addr) = NUL;
+ if (!os_file_exists((char_u *) addr)) {
+ result = -ENOENT;
+ }
+ }
+ EMSG2("Failed to start server: %s", uv_strerror(result));
+ free(server);
+ return result;
+ }
+
+ server->type = server_type;
+
+ // Add the server to the hash table
+ pmap_put(cstr_t)(servers, addr, server);
+
+ return 0;
+}
+
+/// Stops listening on the address specified by `endpoint`.
+///
+/// @param endpoint Address of the server.
+void server_stop(char *endpoint)
+{
+ Server *server;
+ char addr[ADDRESS_MAX_SIZE];
+
+ // Trim to `ADDRESS_MAX_SIZE`
+ xstrlcpy(addr, endpoint, sizeof(addr));
+
+ if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) {
+ EMSG2("Not listening on %s", addr);
+ return;
+ }
+
+ if (server->type == kServerTypeTcp) {
+ uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
+ } else {
+ uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
+ }
+
+ pmap_del(cstr_t)(servers, addr);
+}
+
+static void connection_cb(uv_stream_t *server, int status)
+{
+ int result;
+ uv_stream_t *client;
+ Server *srv = server->data;
+
+ if (status < 0) {
+ abort();
+ }
+
+ if (srv->type == kServerTypeTcp) {
+ client = xmalloc(sizeof(uv_tcp_t));
+ uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client);
+ } else {
+ client = xmalloc(sizeof(uv_pipe_t));
+ uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0);
+ }
+
+ result = uv_accept(server, client);
+
+ if (result) {
+ EMSG2("Failed to accept connection: %s", uv_strerror(result));
+ uv_close((uv_handle_t *)client, free_client);
+ return;
+ }
+
+ channel_from_stream(client);
+}
+
+static void free_client(uv_handle_t *handle)
+{
+ free(handle);
+}
+
+static void free_server(uv_handle_t *handle)
+{
+ free(handle->data);
+}
diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h
new file mode 100644
index 0000000000..f1a6703938
--- /dev/null
+++ b/src/nvim/msgpack_rpc/server.h
@@ -0,0 +1,7 @@
+#ifndef NVIM_MSGPACK_RPC_SERVER_H
+#define NVIM_MSGPACK_RPC_SERVER_H
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/server.h.generated.h"
+#endif
+#endif // NVIM_MSGPACK_RPC_SERVER_H