aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
authorbfredl <bjorn.linse@gmail.com>2022-05-23 19:53:19 +0200
committerbfredl <bjorn.linse@gmail.com>2022-06-02 16:05:24 +0200
commitd5f047bee04a42f40425c34061c84b39af846e1f (patch)
treed81bb803389467604e2ad319b54b211415594ce6 /src/nvim/msgpack_rpc/channel.c
parentd93ba03c717bee05fe6d239fd7faefe6e9698c85 (diff)
downloadrneovim-d5f047bee04a42f40425c34061c84b39af846e1f.tar.gz
rneovim-d5f047bee04a42f40425c34061c84b39af846e1f.tar.bz2
rneovim-d5f047bee04a42f40425c34061c84b39af846e1f.zip
refactor(api): use a unpacker based on libmpack instead of msgpack-c
Currently this is more or less a straight off reimplementation, but this allow further optimizations down the line, especially for avoiding memory allocations of rpc objects. Current score for "make functionaltest; make oldtest" on a -DEXITFREE build: is 117 055 352 xfree(ptr != NULL) calls (that's NUMBERWANG!).
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c218
1 files changed, 72 insertions, 146 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 287310cc34..79a9e1082d 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -55,7 +55,8 @@ void rpc_start(Channel *channel)
channel->is_rpc = true;
RpcState *rpc = &channel->rpc;
rpc->closed = false;
- rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker);
+ unpacker_init(rpc->unpacker);
rpc->next_request_id = 1;
rpc->info = (Dictionary)ARRAY_DICT_INIT;
kv_init(rpc->call_stack);
@@ -209,20 +210,20 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
- call_set_error(channel, buf, LOGLVL_INF);
+ chan_close_with_error(channel, buf, LOGLVL_INF);
goto end;
}
- size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
- channel->id, count, (void *)stream);
-
- // Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
- rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
- msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
+ channel->id, rbuffer_size(rbuf), (void *)stream);
+ Unpacker *p = channel->rpc.unpacker;
+ size_t size = 0;
+ p->read_ptr = rbuffer_read_ptr(rbuf, &size);
+ p->read_size = size;
parse_msgpack(channel);
+ size_t consumed = size - p->read_size;
+ rbuffer_consumed_compact(rbuf, consumed);
end:
channel_decref(channel);
@@ -230,111 +231,70 @@ end:
static void parse_msgpack(Channel *channel)
{
- msgpack_unpacked unpacked;
- msgpack_unpacked_init(&unpacked);
- msgpack_unpack_return result;
-
- // Deserialize everything we can.
- while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
- MSGPACK_UNPACK_SUCCESS) {
- bool is_response = is_rpc_response(&unpacked.data);
- log_client_msg(channel->id, !is_response, unpacked.data);
-
- if (is_response) {
- if (is_valid_rpc_response(&unpacked.data, channel)) {
- complete_call(&unpacked.data, channel);
- } else {
+ Unpacker *p = channel->rpc.unpacker;
+ while (unpacker_advance(p)) {
+ if (p->type == kMessageTypeResponse) {
+ ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
+ if (p->request_id != frame->request_id) {
char buf[256];
snprintf(buf, sizeof(buf),
"ch %" PRIu64 " returned a response with an unknown request "
"id. Ensure the client is properly synchronized",
channel->id);
- call_set_error(channel, buf, LOGLVL_ERR);
+ chan_close_with_error(channel, buf, LOGLVL_ERR);
+ }
+ frame->returned = true;
+ frame->errored = (p->error.type != kObjectTypeNil);
+
+ if (frame->errored) {
+ frame->result = p->error;
+ // TODO(bfredl): p->result should not even be decoded
+ api_free_object(p->result);
+ } else {
+ frame->result = p->result;
}
- msgpack_unpacked_destroy(&unpacked);
} else {
- handle_request(channel, &unpacked.data);
- }
- }
+ log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name);
- if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
- mch_errmsg(e_outofmem);
- mch_errmsg("\n");
- channel_decref(channel);
- preserve_exit();
+ Object res = p->result;
+ if (p->result.type != kObjectTypeArray) {
+ chan_close_with_error(channel, "msgpack-rpc request args has to be an array", LOGLVL_ERR);
+ api_free_object(p->result);
+ return;
+ }
+ Array arg = res.data.array;
+ handle_request(channel, p, arg);
+ }
}
- if (result == MSGPACK_UNPACK_PARSE_ERROR) {
- // See src/msgpack/unpack_template.h in msgpack source tree for
- // causes for this error(search for 'goto _failed')
- //
- // A not so uncommon cause for this might be deserializing objects with
- // a high nesting level: msgpack will break when its internal parse stack
- // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
- send_error(channel, kMessageTypeRequest, 0,
- "Invalid msgpack payload. "
- "This error can also happen when deserializing "
- "an object with high level of nesting");
+ if (unpacker_closed(p)) {
+ chan_close_with_error(channel, p->unpack_error.msg, LOGLVL_ERR);
+ api_clear_error(&p->unpack_error);
}
}
/// Handles requests and notifications received on the channel.
-static void handle_request(Channel *channel, msgpack_object *request)
+static void handle_request(Channel *channel, Unpacker *p, Array args)
FUNC_ATTR_NONNULL_ALL
{
- uint32_t request_id;
- Error error = ERROR_INIT;
- MessageType type = msgpack_rpc_validate(&request_id, request, &error);
-
- if (ERROR_SET(&error)) {
- // Validation failed, send response with error
- if (channel_write(channel,
- serialize_response(channel->id,
- type,
- request_id,
- &error,
- NIL,
- &out_buffer))) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 " sent an invalid message, closed.",
- channel->id);
- call_set_error(channel, buf, LOGLVL_ERR);
- }
- api_clear_error(&error);
- return;
- }
- assert(type == kMessageTypeRequest || type == kMessageTypeNotification);
-
- MsgpackRpcRequestHandler handler;
- msgpack_object *method = msgpack_rpc_method(request);
- handler = msgpack_rpc_get_handler_for(method->via.bin.ptr,
- method->via.bin.size,
- &error);
+ assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification);
- // check method arguments
- Array args = ARRAY_DICT_INIT;
- if (!ERROR_SET(&error)
- && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
- api_set_error(&error, kErrorTypeException, "Invalid method arguments");
- }
-
- if (ERROR_SET(&error)) {
- send_error(channel, type, request_id, error.msg);
- api_clear_error(&error);
+ if (!p->handler.fn) {
+ send_error(channel, p->type, p->request_id, p->unpack_error.msg);
+ api_clear_error(&p->unpack_error);
api_free_array(args);
return;
}
RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
- evdata->type = type;
+ evdata->type = p->type;
evdata->channel = channel;
- evdata->handler = handler;
+ evdata->handler = p->handler;
evdata->args = args;
- evdata->request_id = request_id;
+ evdata->request_id = p->request_id;
channel_incref(channel);
- if (handler.fast) {
- bool is_get_mode = handler.fn == handle_nvim_get_mode;
+ if (p->handler.fast) {
+ bool is_get_mode = p->handler.fn == handle_nvim_get_mode;
if (is_get_mode && !input_blocking()) {
// Defer the event to a special queue used by os/input.c. #6247
@@ -344,7 +304,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
request_event((void **)&evdata);
}
} else {
- bool is_resize = handler.fn == handle_nvim_ui_try_resize;
+ bool is_resize = p->handler.fn == handle_nvim_ui_try_resize;
if (is_resize) {
Event ev = event_create_oneshot(event_create(request_event, 1, evdata),
2);
@@ -352,7 +312,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
multiqueue_put_event(resize_events, ev);
} else {
multiqueue_put(channel->events, request_event, 1, evdata);
- DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr);
+ DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name);
}
}
}
@@ -418,7 +378,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"ch %" PRIu64 ": stream write failed. "
"RPC canceled; closing channel",
channel->id);
- call_set_error(channel, buf, LOGLVL_ERR);
+ chan_close_with_error(channel, buf, LOGLVL_ERR);
}
return success;
@@ -428,14 +388,19 @@ static void internal_read_event(void **argv)
{
Channel *channel = argv[0];
WBuffer *buffer = argv[1];
+ Unpacker *p = channel->rpc.unpacker;
- msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
- memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
- buffer->data, buffer->size);
- msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
-
+ p->read_ptr = buffer->data;
+ p->read_size = buffer->size;
parse_msgpack(channel);
+ if (p->read_size) {
+ // This should not happen, as WBuffer is one single serialized message.
+ if (!channel->rpc.closed) {
+ chan_close_with_error(channel, "internal channel: internal error", LOGLVL_ERR);
+ }
+ }
+
channel_decref(channel);
wstream_release_wbuffer(buffer);
}
@@ -558,7 +523,7 @@ static void exit_event(void **argv)
void rpc_free(Channel *channel)
{
remote_ui_disconnect(channel->id);
- msgpack_unpacker_free(channel->rpc.unpacker);
+ xfree(channel->rpc.unpacker);
// Unsubscribe from all events
char *event_string;
@@ -571,41 +536,7 @@ void rpc_free(Channel *channel)
api_free_dictionary(channel->rpc.info);
}
-static bool is_rpc_response(msgpack_object *obj)
-{
- return obj->type == MSGPACK_OBJECT_ARRAY
- && obj->via.array.size == 4
- && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
- && obj->via.array.ptr[0].via.u64 == 1
- && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
-}
-
-static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
-{
- uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64;
- if (kv_size(channel->rpc.call_stack) == 0) {
- return false;
- }
-
- // Must be equal to the frame at the stack's bottom
- ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
- return response_id == frame->request_id;
-}
-
-static void complete_call(msgpack_object *obj, Channel *channel)
-{
- ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
- frame->returned = true;
- frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
-
- if (frame->errored) {
- msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
- } else {
- msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
- }
-}
-
-static void call_set_error(Channel *channel, char *msg, int loglevel)
+static void chan_close_with_error(Channel *channel, char *msg, int loglevel)
{
LOG(loglevel, "RPC: %s", msg);
for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
@@ -723,7 +654,8 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
log_lock();
FILE *f = open_log_file();
fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
- log_msg_close(f, unpacked.data);
+ msgpack_object_print(f, unpacked.data);
+ log_close(f);
msgpack_unpacked_destroy(&unpacked);
break;
}
@@ -734,30 +666,24 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
log_lock();
FILE *f = open_log_file();
fprintf(f, ERR);
- log_msg_close(f, (msgpack_object) {
- .type = MSGPACK_OBJECT_STR,
- .via.str = {
- .ptr = (char *)msgpack_error_messages[result + MUR_OFF],
- .size = (uint32_t)strlen(msgpack_error_messages[result + MUR_OFF]),
- },
- });
+ fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]);
+ log_close(f);
break;
}
}
}
-static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg)
+static void log_client_msg(uint64_t channel_id, bool is_request, const char *name)
{
DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
log_lock();
FILE *f = open_log_file();
- fprintf(f, is_request ? REQ : RES);
- log_msg_close(f, msg);
+ fprintf(f, "%s: %s", is_request ? REQ : RES, name);
+ log_close(f);
}
-static void log_msg_close(FILE *f, msgpack_object msg)
+static void log_close(FILE *f)
{
- msgpack_object_print(f, msg);
fputc('\n', f);
fflush(f);
fclose(f);