aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c99
1 files changed, 93 insertions, 6 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 91c26ca21e..6ddda10c5f 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -5,6 +5,8 @@
#include <uv.h>
#include <msgpack.h>
+#include "nvim/lib/klist.h"
+
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/msgpack_rpc/channel.h"
@@ -52,6 +54,17 @@ typedef struct {
kvec_t(ChannelCallFrame *) call_stack;
} Channel;
+typedef struct {
+ Channel *channel;
+ MsgpackRpcRequestHandler handler;
+ Array args;
+ uint64_t request_id;
+} RequestEvent;
+
+#define RequestEventFreer(x)
+KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer)
+kmempool_t(RequestEventPool) *request_event_pool = NULL;
+
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
@@ -64,6 +77,7 @@ static msgpack_sbuffer out_buffer;
/// Initializes the module
void channel_init(void)
{
+ request_event_pool = kmp_init(RequestEventPool);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@@ -352,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
- // Perform the call
- WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
- // write the response
- if (!channel_write(channel, resp)) {
- goto end;
- }
+ handle_request(channel, &unpacked.data);
}
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
@@ -387,6 +396,84 @@ end:
}
}
+static void handle_request(Channel *channel, msgpack_object *request)
+ FUNC_ATTR_NONNULL_ALL
+{
+ uint64_t request_id;
+ Error error = ERROR_INIT;
+ msgpack_rpc_validate(&request_id, request, &error);
+
+ if (error.set) {
+ // Validation failed, send response with error
+ channel_write(channel,
+ serialize_response(request_id, &error, NIL, &out_buffer));
+ return;
+ }
+
+ // Retrieve the request handler
+ MsgpackRpcRequestHandler handler;
+ msgpack_object method = request->via.array.ptr[2];
+
+ if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
+ handler = msgpack_rpc_get_handler_for(method.via.bin.ptr,
+ method.via.bin.size);
+ } else {
+ handler.fn = msgpack_rpc_handle_missing_method;
+ handler.defer = false;
+ }
+
+ Array args;
+ msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
+
+ if (kv_size(channel->call_stack) || !handler.defer) {
+ call_request_handler(channel, handler, args, request_id);
+ return;
+ }
+
+ // Defer calling the request handler.
+ RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
+ event_data->channel = channel;
+ event_data->handler = handler;
+ event_data->args = args;
+ event_data->request_id = request_id;
+ event_push((Event) {
+ .handler = on_request_event,
+ .data = event_data
+ });
+}
+
+static void on_request_event(Event event)
+{
+ RequestEvent *e = event.data;
+ call_request_handler(e->channel, e->handler, e->args, e->request_id);
+ kmp_free(RequestEventPool, request_event_pool, e);
+}
+
+static void call_request_handler(Channel *channel,
+ MsgpackRpcRequestHandler handler,
+ Array args,
+ uint64_t request_id)
+{
+ Error error = ERROR_INIT;
+ Object result = handler.fn(channel->id, request_id, args, &error);
+ // send the response
+ msgpack_packer response;
+ msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
+
+ if (error.set) {
+ ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
+ error.msg,
+ request_id);
+ channel_write(channel,
+ serialize_response(request_id, &error, NIL, &out_buffer));
+ }
+
+ DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
+ request_id);
+ channel_write(channel,
+ serialize_response(request_id, &error, result, &out_buffer));
+}
+
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;