diff options
author | bfredl <bjorn.linse@gmail.com> | 2022-05-23 19:53:19 +0200 |
---|---|---|
committer | bfredl <bjorn.linse@gmail.com> | 2022-06-02 16:05:24 +0200 |
commit | d5f047bee04a42f40425c34061c84b39af846e1f (patch) | |
tree | d81bb803389467604e2ad319b54b211415594ce6 | |
parent | d93ba03c717bee05fe6d239fd7faefe6e9698c85 (diff) | |
download | rneovim-d5f047bee04a42f40425c34061c84b39af846e1f.tar.gz rneovim-d5f047bee04a42f40425c34061c84b39af846e1f.tar.bz2 rneovim-d5f047bee04a42f40425c34061c84b39af846e1f.zip |
refactor(api): use a unpacker based on libmpack instead of msgpack-c
Currently this is more or less a straight off reimplementation,
but this allow further optimizations down the line, especially
for avoiding memory allocations of rpc objects.
Current score for "make functionaltest; make oldtest" on a -DEXITFREE build:
is 117 055 352 xfree(ptr != NULL) calls (that's NUMBERWANG!).
-rw-r--r-- | runtime/doc/api.txt | 6 | ||||
-rw-r--r-- | src/mpack/mpack_core.c | 10 | ||||
-rw-r--r-- | src/mpack/mpack_core.h | 2 | ||||
-rw-r--r-- | src/nvim/api/vim.c | 7 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 218 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel_defs.h | 3 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/unpacker.c | 301 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/unpacker.h | 40 | ||||
-rw-r--r-- | src/nvim/rbuffer.c | 17 |
9 files changed, 452 insertions, 152 deletions
diff --git a/runtime/doc/api.txt b/runtime/doc/api.txt index 4dbb3a9a30..60d7528602 100644 --- a/runtime/doc/api.txt +++ b/runtime/doc/api.txt @@ -638,6 +638,12 @@ nvim__stats() *nvim__stats()* Return: ~ Map of various internal stats. +nvim__unpack({str}) *nvim__unpack()* + TODO: Documentation + + Attributes: ~ + |api-fast| + nvim_call_atomic({calls}) *nvim_call_atomic()* Calls many API methods atomically. diff --git a/src/mpack/mpack_core.c b/src/mpack/mpack_core.c index f8ca63b7a3..4ee67a032a 100644 --- a/src/mpack/mpack_core.c +++ b/src/mpack/mpack_core.c @@ -12,8 +12,6 @@ # define MIN(X, Y) ((X) < (Y) ? (X) : (Y)) #endif -static int mpack_rtoken(const char **buf, size_t *buflen, - mpack_token_t *tok); static int mpack_rpending(const char **b, size_t *nl, mpack_tokbuf_t *tb); static int mpack_rvalue(mpack_token_type_t t, mpack_uint32_t l, const char **b, size_t *bl, mpack_token_t *tok); @@ -52,7 +50,10 @@ MPACK_API int mpack_read(mpack_tokbuf_t *tokbuf, const char **buf, int status; size_t initial_ppos, ptrlen, advanced; const char *ptr, *ptr_save; - assert(*buf && *buflen); + assert(*buf); + if (*buflen == 0) { + return MPACK_EOF; + } if (tokbuf->passthrough) { /* pass data from str/bin/ext directly as a MPACK_TOKEN_CHUNK, adjusting @@ -170,8 +171,7 @@ MPACK_API int mpack_write(mpack_tokbuf_t *tokbuf, char **buf, size_t *buflen, return MPACK_OK; } -static int mpack_rtoken(const char **buf, size_t *buflen, - mpack_token_t *tok) +int mpack_rtoken(const char **buf, size_t *buflen, mpack_token_t *tok) { unsigned char t = ADVANCE(buf, buflen); if (t < 0x80) { diff --git a/src/mpack/mpack_core.h b/src/mpack/mpack_core.h index 9edd13c41e..1d601bc82d 100644 --- a/src/mpack/mpack_core.h +++ b/src/mpack/mpack_core.h @@ -83,5 +83,7 @@ MPACK_API int mpack_read(mpack_tokbuf_t *tb, const char **b, size_t *bl, mpack_token_t *tok) FUNUSED FNONULL; MPACK_API int mpack_write(mpack_tokbuf_t *tb, char **b, size_t *bl, const mpack_token_t *tok) FUNUSED FNONULL; +int mpack_rtoken(const char **buf, size_t *buflen, + mpack_token_t *tok); #endif /* MPACK_CORE_H */ diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index 228d114376..2320ae62af 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -45,6 +45,7 @@ #include "nvim/move.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/unpacker.h" #include "nvim/ops.h" #include "nvim/option.h" #include "nvim/os/input.h" @@ -2188,6 +2189,12 @@ void nvim__screenshot(String path) ui_call_screenshot(path); } +Object nvim__unpack(String str, Error *err) + FUNC_API_FAST +{ + return unpack(str.data, str.size, err); +} + /// Deletes an uppercase/file named mark. See |mark-motions|. /// /// @note fails with error if a lowercase or buffer local named mark is used. diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 287310cc34..79a9e1082d 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -55,7 +55,8 @@ void rpc_start(Channel *channel) channel->is_rpc = true; RpcState *rpc = &channel->rpc; rpc->closed = false; - rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker); + unpacker_init(rpc->unpacker); rpc->next_request_id = 1; rpc->info = (Dictionary)ARRAY_DICT_INIT; kv_init(rpc->call_stack); @@ -209,20 +210,20 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, LOGLVL_INF); + chan_close_with_error(channel, buf, LOGLVL_INF); goto end; } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", - channel->id, count, (void *)stream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + channel->id, rbuffer_size(rbuf), (void *)stream); + Unpacker *p = channel->rpc.unpacker; + size_t size = 0; + p->read_ptr = rbuffer_read_ptr(rbuf, &size); + p->read_size = size; parse_msgpack(channel); + size_t consumed = size - p->read_size; + rbuffer_consumed_compact(rbuf, consumed); end: channel_decref(channel); @@ -230,111 +231,70 @@ end: static void parse_msgpack(Channel *channel) { - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - bool is_response = is_rpc_response(&unpacked.data); - log_client_msg(channel->id, !is_response, unpacked.data); - - if (is_response) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - complete_call(&unpacked.data, channel); - } else { + Unpacker *p = channel->rpc.unpacker; + while (unpacker_advance(p)) { + if (p->type == kMessageTypeResponse) { + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + if (p->request_id != frame->request_id) { char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf, LOGLVL_ERR); + chan_close_with_error(channel, buf, LOGLVL_ERR); + } + frame->returned = true; + frame->errored = (p->error.type != kObjectTypeNil); + + if (frame->errored) { + frame->result = p->error; + // TODO(bfredl): p->result should not even be decoded + api_free_object(p->result); + } else { + frame->result = p->result; } - msgpack_unpacked_destroy(&unpacked); } else { - handle_request(channel, &unpacked.data); - } - } + log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name); - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - mch_errmsg(e_outofmem); - mch_errmsg("\n"); - channel_decref(channel); - preserve_exit(); + Object res = p->result; + if (p->result.type != kObjectTypeArray) { + chan_close_with_error(channel, "msgpack-rpc request args has to be an array", LOGLVL_ERR); + api_free_object(p->result); + return; + } + Array arg = res.data.array; + handle_request(channel, p, arg); + } } - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when its internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) - send_error(channel, kMessageTypeRequest, 0, - "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); + if (unpacker_closed(p)) { + chan_close_with_error(channel, p->unpack_error.msg, LOGLVL_ERR); + api_clear_error(&p->unpack_error); } } /// Handles requests and notifications received on the channel. -static void handle_request(Channel *channel, msgpack_object *request) +static void handle_request(Channel *channel, Unpacker *p, Array args) FUNC_ATTR_NONNULL_ALL { - uint32_t request_id; - Error error = ERROR_INIT; - MessageType type = msgpack_rpc_validate(&request_id, request, &error); - - if (ERROR_SET(&error)) { - // Validation failed, send response with error - if (channel_write(channel, - serialize_response(channel->id, - type, - request_id, - &error, - NIL, - &out_buffer))) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 " sent an invalid message, closed.", - channel->id); - call_set_error(channel, buf, LOGLVL_ERR); - } - api_clear_error(&error); - return; - } - assert(type == kMessageTypeRequest || type == kMessageTypeNotification); - - MsgpackRpcRequestHandler handler; - msgpack_object *method = msgpack_rpc_method(request); - handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, - method->via.bin.size, - &error); + assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification); - // check method arguments - Array args = ARRAY_DICT_INIT; - if (!ERROR_SET(&error) - && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { - api_set_error(&error, kErrorTypeException, "Invalid method arguments"); - } - - if (ERROR_SET(&error)) { - send_error(channel, type, request_id, error.msg); - api_clear_error(&error); + if (!p->handler.fn) { + send_error(channel, p->type, p->request_id, p->unpack_error.msg); + api_clear_error(&p->unpack_error); api_free_array(args); return; } RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); - evdata->type = type; + evdata->type = p->type; evdata->channel = channel; - evdata->handler = handler; + evdata->handler = p->handler; evdata->args = args; - evdata->request_id = request_id; + evdata->request_id = p->request_id; channel_incref(channel); - if (handler.fast) { - bool is_get_mode = handler.fn == handle_nvim_get_mode; + if (p->handler.fast) { + bool is_get_mode = p->handler.fn == handle_nvim_get_mode; if (is_get_mode && !input_blocking()) { // Defer the event to a special queue used by os/input.c. #6247 @@ -344,7 +304,7 @@ static void handle_request(Channel *channel, msgpack_object *request) request_event((void **)&evdata); } } else { - bool is_resize = handler.fn == handle_nvim_ui_try_resize; + bool is_resize = p->handler.fn == handle_nvim_ui_try_resize; if (is_resize) { Event ev = event_create_oneshot(event_create(request_event, 1, evdata), 2); @@ -352,7 +312,7 @@ static void handle_request(Channel *channel, msgpack_object *request) multiqueue_put_event(resize_events, ev); } else { multiqueue_put(channel->events, request_event, 1, evdata); - DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr); + DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name); } } } @@ -418,7 +378,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "ch %" PRIu64 ": stream write failed. " "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf, LOGLVL_ERR); + chan_close_with_error(channel, buf, LOGLVL_ERR); } return success; @@ -428,14 +388,19 @@ static void internal_read_event(void **argv) { Channel *channel = argv[0]; WBuffer *buffer = argv[1]; + Unpacker *p = channel->rpc.unpacker; - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), - buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); - + p->read_ptr = buffer->data; + p->read_size = buffer->size; parse_msgpack(channel); + if (p->read_size) { + // This should not happen, as WBuffer is one single serialized message. + if (!channel->rpc.closed) { + chan_close_with_error(channel, "internal channel: internal error", LOGLVL_ERR); + } + } + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -558,7 +523,7 @@ static void exit_event(void **argv) void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - msgpack_unpacker_free(channel->rpc.unpacker); + xfree(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; @@ -571,41 +536,7 @@ void rpc_free(Channel *channel) api_free_dictionary(channel->rpc.info); } -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64; - if (kv_size(channel->rpc.call_stack) == 0) { - return false; - } - - // Must be equal to the frame at the stack's bottom - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - return response_id == frame->request_id; -} - -static void complete_call(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - frame->returned = true; - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg, int loglevel) +static void chan_close_with_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { @@ -723,7 +654,8 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); - log_msg_close(f, unpacked.data); + msgpack_object_print(f, unpacked.data); + log_close(f); msgpack_unpacked_destroy(&unpacked); break; } @@ -734,30 +666,24 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, ERR); - log_msg_close(f, (msgpack_object) { - .type = MSGPACK_OBJECT_STR, - .via.str = { - .ptr = (char *)msgpack_error_messages[result + MUR_OFF], - .size = (uint32_t)strlen(msgpack_error_messages[result + MUR_OFF]), - }, - }); + fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); + log_close(f); break; } } } -static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg) +static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) { DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); log_lock(); FILE *f = open_log_file(); - fprintf(f, is_request ? REQ : RES); - log_msg_close(f, msg); + fprintf(f, "%s: %s", is_request ? REQ : RES, name); + log_close(f); } -static void log_msg_close(FILE *f, msgpack_object msg) +static void log_close(FILE *f) { - msgpack_object_print(f, msg); fputc('\n', f); fflush(f); fclose(f); diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h index 65f1e6b6d6..4dc3c7f22d 100644 --- a/src/nvim/msgpack_rpc/channel_defs.h +++ b/src/nvim/msgpack_rpc/channel_defs.h @@ -9,6 +9,7 @@ #include "nvim/api/private/dispatch.h" #include "nvim/event/process.h" #include "nvim/event/socket.h" +#include "nvim/msgpack_rpc/unpacker.h" #include "nvim/vim.h" typedef struct Channel Channel; @@ -30,7 +31,7 @@ typedef struct { typedef struct { PMap(cstr_t) subscribed_events[1]; bool closed; - msgpack_unpacker *unpacker; + Unpacker *unpacker; uint32_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; Dictionary info; diff --git a/src/nvim/msgpack_rpc/unpacker.c b/src/nvim/msgpack_rpc/unpacker.c new file mode 100644 index 0000000000..9db8f314bf --- /dev/null +++ b/src/nvim/msgpack_rpc/unpacker.c @@ -0,0 +1,301 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + +#include "nvim/api/private/helpers.h" +#include "nvim/log.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/unpacker.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/unpacker.c.generated.h" +#endif + +Object unpack(const char *data, size_t size, Error *err) +{ + Unpacker unpacker; + mpack_parser_init(&unpacker.parser, 0); + unpacker.parser.data.p = &unpacker; + + int result = mpack_parse(&unpacker.parser, &data, &size, + api_parse_enter, api_parse_exit); + + if (result == MPACK_NOMEM) { + api_set_error(err, kErrorTypeException, "object was too deep to unpack"); + } else if (result == MPACK_EOF) { + api_set_error(err, kErrorTypeException, "incomplete msgpack string"); + } else if (result == MPACK_ERROR) { + api_set_error(err, kErrorTypeException, "invalid msgpack string"); + } else if (result == MPACK_OK && size) { + api_set_error(err, kErrorTypeException, "trailing data in msgpack string"); + } + + return unpacker.result; +} + +static void api_parse_enter(mpack_parser_t *parser, mpack_node_t *node) +{ + Unpacker *unpacker = parser->data.p; + Object *result = NULL; + String *key_location = NULL; + + mpack_node_t *parent = MPACK_PARENT_NODE(node); + if (parent) { + switch (parent->tok.type) { + case MPACK_TOKEN_ARRAY: { + Object *obj = parent->data[0].p; + result = &kv_A(obj->data.array, parent->pos); + break; + } + case MPACK_TOKEN_MAP: { + Object *obj = parent->data[0].p; + KeyValuePair *kv = &kv_A(obj->data.dictionary, parent->pos); + if (!parent->key_visited) { + key_location = &kv->key; + } else { + result = &kv->value; + } + break; + } + + default: + break; + } + } else { + result = &unpacker->result; + } + + switch (node->tok.type) { + case MPACK_TOKEN_NIL: + *result = NIL; + break; + case MPACK_TOKEN_BOOLEAN: + *result = BOOL(mpack_unpack_boolean(node->tok)); + break; + case MPACK_TOKEN_SINT: + *result = INTEGER_OBJ(mpack_unpack_sint(node->tok)); + break; + case MPACK_TOKEN_UINT: + *result = INTEGER_OBJ((Integer)mpack_unpack_uint(node->tok)); + break; + case MPACK_TOKEN_FLOAT: + *result = FLOAT_OBJ(mpack_unpack_float(node->tok)); + break; + case MPACK_TOKEN_BIN: + case MPACK_TOKEN_STR: { + String str = { .data = xmallocz(node->tok.length), .size = node->tok.length }; + + if (key_location) { + *key_location = str; + } else { + *result = STRING_OBJ(str); + } + + node->data[0].p = str.data; + break; + } + case MPACK_TOKEN_EXT: + // handled in chunk; but save result location + node->data[0].p = result; + break; + + case MPACK_TOKEN_CHUNK: + if (parent->tok.type == MPACK_TOKEN_STR || parent->tok.type == MPACK_TOKEN_BIN) { + char *data = parent->data[0].p; + memcpy(data + parent->pos, + node->tok.data.chunk_ptr, node->tok.length); + } else { + Object *res = parent->data[0].p; + + size_t endlen = parent->pos + node->tok.length; + if (endlen > MAX_EXT_LEN) { + *res = NIL; + break; + } + memcpy(unpacker->ext_buf + parent->pos, + node->tok.data.chunk_ptr, node->tok.length); + if (parent->pos + node->tok.length < parent->tok.length) { + break; // EOF, let's get back to it later + } + const char *buf = unpacker->ext_buf; + size_t size = parent->tok.length; + mpack_token_t ext_tok; + int status = mpack_rtoken(&buf, &size, &ext_tok); + if (status || ext_tok.type != MPACK_TOKEN_UINT) { + // TODO(bfredl): once we fixed memory management, we can set + // p->unpack_error and a flag like p->interrupted + *res = NIL; + break; + } + int ext_type = parent->tok.data.ext_type; + if (0 <= ext_type && ext_type <= EXT_OBJECT_TYPE_MAX) { + res->type = (ObjectType)(ext_type + EXT_OBJECT_TYPE_SHIFT); + res->data.integer = (int64_t)mpack_unpack_uint(ext_tok); + } else { + *res = NIL; + break; + } + } + break; + + case MPACK_TOKEN_ARRAY: { + Array arr = KV_INITIAL_VALUE; + kv_resize(arr, node->tok.length); + kv_size(arr) = node->tok.length; + *result = ARRAY_OBJ(arr); + node->data[0].p = result; + break; + } + case MPACK_TOKEN_MAP: { + Dictionary dict = KV_INITIAL_VALUE; + kv_resize(dict, node->tok.length); + kv_size(dict) = node->tok.length; + *result = DICTIONARY_OBJ(dict); + node->data[0].p = result; + break; + } + default: + abort(); + } +} + +static void api_parse_exit(mpack_parser_t *parser, mpack_node_t *node) +{} + +void unpacker_init(Unpacker *p) +{ + mpack_parser_init(&p->parser, 0); + p->parser.data.p = p; + mpack_tokbuf_init(&p->reader); + p->unpack_error = (Error)ERROR_INIT; +} + +bool unpacker_parse_header(Unpacker *p) +{ + mpack_token_t tok; + int result; + + const char *data = p->read_ptr; + size_t size = p->read_size; + + assert(!ERROR_SET(&p->unpack_error)); + +#define NEXT(tok) \ + result = mpack_read(&p->reader, &data, &size, &tok); \ + if (result) { goto error; } + + NEXT(tok); + if (tok.type != MPACK_TOKEN_ARRAY || tok.length < 3 || tok.length > 4) { + goto error; + } + size_t array_length = tok.length; + + NEXT(tok); + if (tok.type != MPACK_TOKEN_UINT) { + goto error; + } + uint32_t type = (uint32_t)mpack_unpack_uint(tok); + if ((array_length == 3) ? type != 2 : (type >= 2)) { + goto error; + } + p->type = (MessageType)type; + p->request_id = 0; + + if (p->type != kMessageTypeNotification) { + NEXT(tok); + if (tok.type != MPACK_TOKEN_UINT) { + goto error; + } + p->request_id = (uint32_t)mpack_unpack_uint(tok); + } + + if (p->type != kMessageTypeResponse) { + NEXT(tok); + if ((tok.type != MPACK_TOKEN_STR && tok.type != MPACK_TOKEN_BIN) + || tok.length > 100) { + goto error; + } + p->method_name_len = tok.length; + + if (p->method_name_len > 0) { + NEXT(tok); + assert(tok.type == MPACK_TOKEN_CHUNK); + } + if (tok.length < p->method_name_len) { + result = MPACK_EOF; + goto error; + } + // if this fails, p->handler.fn will be NULL + p->handler = msgpack_rpc_get_handler_for(tok.length ? tok.data.chunk_ptr : "", + tok.length, &p->unpack_error); + } + + p->read_ptr = data; + p->read_size = size; + return true; +#undef NEXT + +error: + if (result == MPACK_EOF) { + // recover later by retrying from scratch + // when more data is available. + mpack_tokbuf_init(&p->reader); + } else { + api_set_error(&p->unpack_error, kErrorTypeValidation, "failed to decode msgpack"); + p->state = -1; + } + return false; +} + +// BASIC BITCH STATE MACHINE +// +// With some basic assumptions, we can parse the overall structure of msgpack-rpc +// messages with a hand-rolled FSM of just 3 states (<x> = p->state): +// +// <0>[0, request_id, method_name, <2>args] +// <0>[1, request_id, <1>err, <2>result] +// <0>[2, method_name, <2>args] +// +// The assumption here is that the header of the message, which we define as the +// initial array head, the kind integer, request_id and/or method name (when needed), +// is relatively small, just ~10 bytes + the method name. Thus we can simply refuse +// to advance the stream beyond the header until it can be parsed in its entirety. +// +// Of course, later on, we want to specialize state 2 into sub-states depending +// on the specific method. "nvim_exec_lua" should just decode direct into lua +// objects, and "redraw/grid_line" should use a hand-rolled decoder to avoid +// a blizzard of small objects for each screen cell. + +bool unpacker_advance(Unpacker *p) +{ + assert(p->state >= 0); + if (p->state == 0) { + if (!unpacker_parse_header(p)) { + return false; + } + p->state = p->type == kMessageTypeResponse ? 1 : 2; + } + + int result; + +rerun: + result = mpack_parse(&p->parser, &p->read_ptr, &p->read_size, + api_parse_enter, api_parse_exit); + + if (result == MPACK_EOF) { + return false; + } else if (result != MPACK_OK) { + api_set_error(&p->unpack_error, kErrorTypeValidation, "failed to parse msgpack"); + p->state = -1; + return false; + } + + if (p->state == 1) { + p->error = p->result; + p->state = 2; + goto rerun; + } else { + assert(p->state == 2); + p->state = 0; + } + return true; +} diff --git a/src/nvim/msgpack_rpc/unpacker.h b/src/nvim/msgpack_rpc/unpacker.h new file mode 100644 index 0000000000..bbd6b1ef4f --- /dev/null +++ b/src/nvim/msgpack_rpc/unpacker.h @@ -0,0 +1,40 @@ +#ifndef NVIM_MSGPACK_RPC_UNPACKER_H +#define NVIM_MSGPACK_RPC_UNPACKER_H + +#include <inttypes.h> +#include <stdbool.h> +#include <string.h> + +#include "mpack/mpack_core.h" +#include "mpack/object.h" +#include "nvim/api/private/dispatch.h" +#include "nvim/api/private/helpers.h" + +typedef struct { + mpack_parser_t parser; + mpack_tokbuf_t reader; + + const char *read_ptr; + size_t read_size; + +#define MAX_EXT_LEN 9 // byte + 8-byte integer + char ext_buf[MAX_EXT_LEN]; + + int state; + MessageType type; + uint32_t request_id; + size_t method_name_len; + MsgpackRpcRequestHandler handler; + Object error; // error return + Object result; // arg list or result + Error unpack_error; +} Unpacker; + +// unrecovareble error. unpack_error should be set! +#define unpacker_closed(p) ((p)->state < 0) + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/unpacker.h.generated.h" +#endif + +#endif // NVIM_MSGPACK_RPC_UNPACKER_H diff --git a/src/nvim/rbuffer.c b/src/nvim/rbuffer.c index d280e08c03..6407ac172e 100644 --- a/src/nvim/rbuffer.c +++ b/src/nvim/rbuffer.c @@ -154,6 +154,23 @@ void rbuffer_consumed(RBuffer *buf, size_t count) } } +/// Use instead of rbuffer_consumed to use rbuffer in a linear, non-cyclic fashion. +/// +/// This is generally usefull if we can guarantee to parse all input +/// except some small incomplete token, like when parsing msgpack. +void rbuffer_consumed_compact(RBuffer *buf, size_t count) + FUNC_ATTR_NONNULL_ALL +{ + assert(buf->read_ptr <= buf->write_ptr); + rbuffer_consumed(buf, count); + if (buf->read_ptr > buf->start_ptr) { + assert((size_t)(buf->read_ptr - buf->write_ptr) == buf->size); + memmove(buf->start_ptr, buf->read_ptr, buf->size); + buf->read_ptr = buf->start_ptr; + buf->write_ptr = buf->read_ptr + buf->size; + } +} + // Higher level functions for copying from/to RBuffer instances and data // pointers size_t rbuffer_write(RBuffer *buf, const char *src, size_t src_size) |