aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c247
1 files changed, 104 insertions, 143 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index ee7359c476..a8fde5a652 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,7 +1,6 @@
#include <assert.h>
#include <inttypes.h>
#include <msgpack/object.h>
-#include <msgpack/pack.h>
#include <msgpack/sbuffer.h>
#include <msgpack/unpack.h>
#include <stdbool.h>
@@ -29,7 +28,7 @@
#include "nvim/message.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/channel_defs.h"
-#include "nvim/msgpack_rpc/helpers.h"
+#include "nvim/msgpack_rpc/packer.h"
#include "nvim/msgpack_rpc/unpacker.h"
#include "nvim/os/input.h"
#include "nvim/rbuffer.h"
@@ -44,73 +43,31 @@
# define NOT "[notify] "
# define ERR "[error] "
-// Cannot define array with negative offsets, so this one is needed to be added
-// to MSGPACK_UNPACK_\* values.
-# define MUR_OFF 2
+# define SEND "->"
+# define RECV "<-"
-static const char *const msgpack_error_messages[] = {
- [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
- [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string",
- [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error",
- [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
-};
-
-static void log_close(FILE *f)
+static void log_request(char *dir, uint64_t channel_id, uint32_t req_id, const char *name)
{
- fputc('\n', f);
- fflush(f);
- fclose(f);
- log_unlock();
+ DLOGN("RPC %s %" PRIu64 ": %s id=%u: %s\n", dir, channel_id, REQ, req_id, name);
}
-static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
+static void log_response(char *dir, uint64_t channel_id, char *kind, uint32_t req_id)
{
- msgpack_unpacked unpacked;
- msgpack_unpacked_init(&unpacked);
- DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
- const msgpack_unpack_return result =
- msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
- switch (result) {
- case MSGPACK_UNPACK_SUCCESS: {
- uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
- msgpack_object_print(f, unpacked.data);
- log_close(f);
- msgpack_unpacked_destroy(&unpacked);
- break;
- }
- case MSGPACK_UNPACK_EXTRA_BYTES:
- case MSGPACK_UNPACK_CONTINUE:
- case MSGPACK_UNPACK_PARSE_ERROR:
- case MSGPACK_UNPACK_NOMEM_ERROR: {
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, ERR);
- fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]);
- log_close(f);
- break;
- }
- }
+ DLOGN("RPC %s %" PRIu64 ": %s id=%u\n", dir, channel_id, kind, req_id);
}
-static void log_client_msg(uint64_t channel_id, bool is_request, const char *name)
+static void log_notify(char *dir, uint64_t channel_id, const char *name)
{
- DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, "%s: %s", is_request ? REQ : RES, name);
- log_close(f);
+ DLOGN("RPC %s %" PRIu64 ": %s %s\n", dir, channel_id, NOT, name);
}
#else
-# define log_client_msg(...)
-# define log_server_msg(...)
+# define log_request(...)
+# define log_response(...)
+# define log_notify(...)
#endif
static Set(cstr_t) event_strings = SET_INIT;
-static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/channel.c.generated.h"
@@ -119,7 +76,6 @@ static msgpack_sbuffer out_buffer;
void rpc_init(void)
{
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
- msgpack_sbuffer_init(&out_buffer);
}
void rpc_start(Channel *channel)
@@ -169,8 +125,9 @@ bool rpc_send_event(uint64_t id, const char *name, Array args)
return false;
}
+ log_notify(SEND, channel ? channel->id : 0, name);
if (channel) {
- send_event(channel, name, args);
+ serialize_request(&channel, 1, 0, name, args);
} else {
broadcast_event(name, args);
}
@@ -199,7 +156,9 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
RpcState *rpc = &channel->rpc;
uint32_t request_id = rpc->next_request_id++;
// Send the msgpack-rpc request
- send_request(channel, request_id, method_name, args);
+ serialize_request(&channel, 1, request_id, method_name, args);
+
+ log_request(SEND, channel->id, request_id, method_name);
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL, NULL };
@@ -361,8 +320,13 @@ static void parse_msgpack(Channel *channel)
frame->result = p->result;
}
frame->result_mem = arena_finish(&p->arena);
+ log_response(RECV, channel->id, frame->errored ? ERR : RES, p->request_id);
} else {
- log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name);
+ if (p->type == kMessageTypeNotification) {
+ log_notify(RECV, channel->id, p->handler.name);
+ } else {
+ log_request(RECV, channel->id, p->request_id, p->handler.name);
+ }
Object res = p->result;
if (p->result.type != kObjectTypeArray) {
@@ -442,15 +406,7 @@ static void request_event(void **argv)
Object result = handler.fn(channel->id, e->args, &e->used_mem, &error);
if (e->type == kMessageTypeRequest || ERROR_SET(&error)) {
// Send the response.
- msgpack_packer response;
- msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
- channel_write(channel, serialize_response(channel->id,
- e->handler,
- e->type,
- e->request_id,
- &error,
- &result,
- &out_buffer));
+ serialize_response(channel, e->handler, e->type, e->request_id, &error, &result);
}
if (handler.ret_alloc) {
api_free_object(result);
@@ -533,41 +489,14 @@ static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageT
{
Error e = ERROR_INIT;
api_set_error(&e, kErrorTypeException, "%s", err);
- channel_write(chan, serialize_response(chan->id,
- handler,
- type,
- id,
- &e,
- &NIL,
- &out_buffer));
+ serialize_response(chan, handler, type, id, &e, &NIL);
api_clear_error(&e);
}
-static void send_request(Channel *channel, uint32_t id, const char *name, Array args)
-{
- const String method = cstr_as_string(name);
- channel_write(channel, serialize_request(channel->id,
- id,
- method,
- args,
- &out_buffer,
- 1));
-}
-
-static void send_event(Channel *channel, const char *name, Array args)
-{
- const String method = cstr_as_string(name);
- channel_write(channel, serialize_request(channel->id,
- 0,
- method,
- args,
- &out_buffer,
- 1));
-}
-
static void broadcast_event(const char *name, Array args)
{
- kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
+ kvec_withinit_t(Channel *, 4) subscribed = KV_INITIAL_VALUE;
+ kvi_init(subscribed);
Channel *channel;
map_foreach_value(&channels, channel, {
@@ -577,25 +506,11 @@ static void broadcast_event(const char *name, Array args)
}
});
- if (!kv_size(subscribed)) {
- goto end;
- }
-
- const String method = cstr_as_string(name);
- WBuffer *buffer = serialize_request(0,
- 0,
- method,
- args,
- &out_buffer,
- kv_size(subscribed));
-
- for (size_t i = 0; i < kv_size(subscribed); i++) {
- Channel *c = kv_A(subscribed, i);
- channel_write(c, buffer);
+ if (kv_size(subscribed)) {
+ serialize_request(subscribed.items, kv_size(subscribed), 0, name, args);
}
-end:
- kv_destroy(subscribed);
+ kvi_destroy(subscribed);
}
static void unsubscribe(Channel *channel, char *event)
@@ -653,27 +568,28 @@ static void chan_close_with_error(Channel *channel, char *msg, int loglevel)
channel_close(channel->id, kChannelPartRpc, NULL);
}
-static WBuffer *serialize_request(uint64_t channel_id, uint32_t request_id, const String method,
- Array args, msgpack_sbuffer *sbuffer, size_t refcount)
+static void serialize_request(Channel **chans, size_t nchans, uint32_t request_id,
+ const char *method, Array args)
{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_rpc_serialize_request(request_id, method, args, &pac);
- log_server_msg(channel_id, sbuffer);
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- refcount,
- xfree);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
+ PackerBuffer packer;
+ packer_buffer_init_channels(chans, nchans, &packer);
+
+ mpack_array(&packer.ptr, request_id ? 4 : 3);
+ mpack_w(&packer.ptr, request_id ? 0 : 2);
+
+ if (request_id) {
+ mpack_uint(&packer.ptr, request_id);
+ }
+
+ mpack_str(cstr_as_string(method), &packer);
+ mpack_object_array(args, &packer);
+
+ packer_buffer_finish_channels(&packer);
}
-static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler handler,
- MessageType type, uint32_t response_id, Error *err, Object *arg,
- msgpack_sbuffer *sbuffer)
+void serialize_response(Channel *channel, MsgpackRpcRequestHandler handler, MessageType type,
+ uint32_t response_id, Error *err, Object *arg)
{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
if (ERROR_SET(err) && type == kMessageTypeNotification) {
if (handler.fn == handle_nvim_paste) {
// TODO(bfredl): this is pretty much ad-hoc. maybe TUI and UI:s should be
@@ -685,19 +601,65 @@ static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler
MAXSIZE_TEMP_ARRAY(args, 2);
ADD_C(args, INTEGER_OBJ(err->type));
ADD_C(args, CSTR_AS_OBJ(err->msg));
- msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"),
- args, &pac);
+ serialize_request(&channel, 1, 0, "nvim_error_event", args);
}
+ return;
+ }
+
+ PackerBuffer packer;
+ packer_buffer_init_channels(&channel, 1, &packer);
+
+ mpack_array(&packer.ptr, 4);
+ mpack_w(&packer.ptr, 1);
+ mpack_uint(&packer.ptr, response_id);
+
+ if (ERROR_SET(err)) {
+ // error represented by a [type, message] array
+ mpack_array(&packer.ptr, 2);
+ mpack_integer(&packer.ptr, err->type);
+ mpack_str(cstr_as_string(err->msg), &packer);
+ // Nil result
+ mpack_nil(&packer.ptr);
} else {
- msgpack_rpc_serialize_response(response_id, err, arg, &pac);
+ // Nil error
+ mpack_nil(&packer.ptr);
+ // Return value
+ mpack_object(arg, &packer);
}
- log_server_msg(channel_id, sbuffer);
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- 1, // responses only go though 1 channel
- xfree);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
+
+ packer_buffer_finish_channels(&packer);
+
+ log_response(SEND, channel->id, ERROR_SET(err) ? ERR : RES, response_id);
+}
+
+static void packer_buffer_init_channels(Channel **chans, size_t nchans, PackerBuffer *packer)
+{
+ packer->startptr = alloc_block();
+ packer->ptr = packer->startptr;
+ packer->endptr = packer->startptr + ARENA_BLOCK_SIZE;
+ packer->packer_flush = channel_flush_callback;
+ packer->anydata = chans;
+ packer->anylen = nchans;
+}
+
+static void packer_buffer_finish_channels(PackerBuffer *packer)
+{
+ size_t len = (size_t)(packer->ptr - packer->startptr);
+ if (len > 0) {
+ WBuffer *buf = wstream_new_buffer(packer->startptr, len, packer->anylen, free_block);
+ Channel **chans = packer->anydata;
+ for (size_t i = 0; i < packer->anylen; i++) {
+ channel_write(chans[i], buf);
+ }
+ } else {
+ free_block(packer->startptr);
+ }
+}
+
+static void channel_flush_callback(PackerBuffer *packer)
+{
+ packer_buffer_finish_channels(packer);
+ packer_buffer_init_channels(packer->anydata, packer->anylen, packer);
}
void rpc_set_client_info(uint64_t id, Dictionary info)
@@ -762,7 +724,6 @@ void rpc_free_all_mem(void)
});
set_destroy(cstr_t, &event_strings);
- msgpack_sbuffer_destroy(&out_buffer);
multiqueue_free(ch_before_blocking_events);
}
#endif