diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 274 |
1 files changed, 113 insertions, 161 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 299651ee97..de01443313 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -26,12 +26,13 @@ #include "nvim/message.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/unpacker.h" #include "nvim/os/input.h" #include "nvim/os_unix.h" #include "nvim/ui.h" #include "nvim/vim.h" -#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL > LOGLVL_DBG # define log_client_msg(...) # define log_server_msg(...) #endif @@ -49,21 +50,21 @@ void rpc_init(void) msgpack_sbuffer_init(&out_buffer); } - void rpc_start(Channel *channel) { channel_incref(channel); channel->is_rpc = true; RpcState *rpc = &channel->rpc; rpc->closed = false; - rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker); + unpacker_init(rpc->unpacker); rpc->next_request_id = 1; rpc->info = (Dictionary)ARRAY_DICT_INIT; kv_init(rpc->call_stack); if (channel->streamtype != kChannelStreamInternal) { Stream *out = channel_outstream(channel); -#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL <= LOGLVL_DBG Stream *in = channel_instream(channel); DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, (void *)in, (void *)out); @@ -73,7 +74,6 @@ void rpc_start(Channel *channel) } } - static Channel *find_rpc_channel(uint64_t id) { Channel *chan = find_channel(id); @@ -114,7 +114,8 @@ bool rpc_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *err) +Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem *result_mem, + Error *err) { Channel *channel = NULL; @@ -131,7 +132,7 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *er send_request(channel, request_id, method_name, args); // Push the frame - ChannelCallFrame frame = { request_id, false, false, NIL }; + ChannelCallFrame frame = { request_id, false, false, NIL, NULL }; kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); (void)kv_pop(rpc->call_stack); @@ -156,11 +157,15 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *er api_set_error(err, kErrorTypeException, "%s", "unknown error"); } - api_free_object(frame.result); + // frame.result was allocated in an arena + arena_mem_free(frame.result_mem, &rpc->unpacker->reuse_blk); + frame.result_mem = NULL; } channel_decref(channel); + *result_mem = frame.result_mem; + return frame.errored ? NIL : frame.result; } @@ -211,20 +216,20 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, INFO_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_INF); goto end; } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", - channel->id, count, (void *)stream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + channel->id, rbuffer_size(rbuf), (void *)stream); + Unpacker *p = channel->rpc.unpacker; + size_t size = 0; + p->read_ptr = rbuffer_read_ptr(rbuf, &size); + p->read_size = size; parse_msgpack(channel); + size_t consumed = size - p->read_size; + rbuffer_consumed_compact(rbuf, consumed); end: channel_decref(channel); @@ -232,111 +237,71 @@ end: static void parse_msgpack(Channel *channel) { - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - bool is_response = is_rpc_response(&unpacked.data); - log_client_msg(channel->id, !is_response, unpacked.data); - - if (is_response) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - complete_call(&unpacked.data, channel); - } else { + Unpacker *p = channel->rpc.unpacker; + while (unpacker_advance(p)) { + if (p->type == kMessageTypeResponse) { + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + if (p->request_id != frame->request_id) { char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_ERR); + } + frame->returned = true; + frame->errored = (p->error.type != kObjectTypeNil); + + if (frame->errored) { + frame->result = p->error; + // TODO(bfredl): p->result should not even be decoded + // api_free_object(p->result); + } else { + frame->result = p->result; } - msgpack_unpacked_destroy(&unpacked); + frame->result_mem = arena_finish(&p->arena); } else { - handle_request(channel, &unpacked.data); - } - } + log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name); - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - mch_errmsg(e_outofmem); - mch_errmsg("\n"); - channel_decref(channel); - preserve_exit(); + Object res = p->result; + if (p->result.type != kObjectTypeArray) { + chan_close_with_error(channel, "msgpack-rpc request args has to be an array", LOGLVL_ERR); + return; + } + Array arg = res.data.array; + handle_request(channel, p, arg); + } } - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when its internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) - send_error(channel, kMessageTypeRequest, 0, - "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); + if (unpacker_closed(p)) { + chan_close_with_error(channel, p->unpack_error.msg, LOGLVL_ERR); + api_clear_error(&p->unpack_error); } } /// Handles requests and notifications received on the channel. -static void handle_request(Channel *channel, msgpack_object *request) +static void handle_request(Channel *channel, Unpacker *p, Array args) FUNC_ATTR_NONNULL_ALL { - uint32_t request_id; - Error error = ERROR_INIT; - MessageType type = msgpack_rpc_validate(&request_id, request, &error); + assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification); - if (ERROR_SET(&error)) { - // Validation failed, send response with error - if (channel_write(channel, - serialize_response(channel->id, - type, - request_id, - &error, - NIL, - &out_buffer))) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 " sent an invalid message, closed.", - channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); - } - api_clear_error(&error); - return; - } - assert(type == kMessageTypeRequest || type == kMessageTypeNotification); - - MsgpackRpcRequestHandler handler; - msgpack_object *method = msgpack_rpc_method(request); - handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, - method->via.bin.size, - &error); - - // check method arguments - Array args = ARRAY_DICT_INIT; - if (!ERROR_SET(&error) - && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { - api_set_error(&error, kErrorTypeException, "Invalid method arguments"); - } - - if (ERROR_SET(&error)) { - send_error(channel, type, request_id, error.msg); - api_clear_error(&error); - api_free_array(args); + if (!p->handler.fn) { + send_error(channel, p->type, p->request_id, p->unpack_error.msg); + api_clear_error(&p->unpack_error); + arena_mem_free(arena_finish(&p->arena), &p->reuse_blk); return; } RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); - evdata->type = type; + evdata->type = p->type; evdata->channel = channel; - evdata->handler = handler; + evdata->handler = p->handler; evdata->args = args; - evdata->request_id = request_id; + evdata->used_mem = arena_finish(&p->arena); + evdata->request_id = p->request_id; channel_incref(channel); - if (handler.fast) { - bool is_get_mode = handler.fn == handle_nvim_get_mode; + if (p->handler.fast) { + bool is_get_mode = p->handler.fn == handle_nvim_get_mode; if (is_get_mode && !input_blocking()) { // Defer the event to a special queue used by os/input.c. #6247 @@ -346,7 +311,7 @@ static void handle_request(Channel *channel, msgpack_object *request) request_event((void **)&evdata); } } else { - bool is_resize = handler.fn == handle_nvim_ui_try_resize; + bool is_resize = p->handler.fn == handle_nvim_ui_try_resize; if (is_resize) { Event ev = event_create_oneshot(event_create(request_event, 1, evdata), 2); @@ -354,12 +319,11 @@ static void handle_request(Channel *channel, msgpack_object *request) multiqueue_put_event(resize_events, ev); } else { multiqueue_put(channel->events, request_event, 1, evdata); - DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr); + DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name); } } } - /// Handles a message, depending on the type: /// - Request: invokes method and writes the response (or error). /// - Notification: invokes method (emits `nvim_error_event` on error). @@ -389,12 +353,24 @@ static void request_event(void **argv) } free_ret: - api_free_array(e->args); + // e->args is allocated in an arena + arena_mem_free(e->used_mem, &channel->rpc.unpacker->reuse_blk); channel_decref(channel); xfree(e); api_clear_error(&error); } +bool rpc_write_raw(uint64_t id, WBuffer *buffer) +{ + Channel *channel = find_rpc_channel(id); + if (!channel) { + wstream_release_wbuffer(buffer); + return false; + } + + return channel_write(channel, buffer); +} + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; @@ -413,7 +389,6 @@ static bool channel_write(Channel *channel, WBuffer *buffer) success = wstream_write(in, buffer); } - if (!success) { // If the write failed for any reason, close the channel char buf[256]; @@ -422,7 +397,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "ch %" PRIu64 ": stream write failed. " "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_ERR); } return success; @@ -432,14 +407,19 @@ static void internal_read_event(void **argv) { Channel *channel = argv[0]; WBuffer *buffer = argv[1]; + Unpacker *p = channel->rpc.unpacker; - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), - buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); - + p->read_ptr = buffer->data; + p->read_size = buffer->size; parse_msgpack(channel); + if (p->read_size) { + // This should not happen, as WBuffer is one single serialized message. + if (!channel->rpc.closed) { + chan_close_with_error(channel, "internal channel: internal error", LOGLVL_ERR); + } + } + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -535,7 +515,6 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } - /// Mark rpc state as closed, and release its reference to the channel. /// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) void rpc_close(Channel *channel) @@ -547,13 +526,25 @@ void rpc_close(Channel *channel) channel->rpc.closed = true; channel_decref(channel); - if (channel->streamtype == kChannelStreamStdio) { + if (channel->streamtype == kChannelStreamStdio + || channel->id == ui_client_channel_id) { multiqueue_put(main_loop.fast_events, exit_event, 0); } } +static void exit_delay_cb(uv_timer_t *handle) +{ + uv_timer_stop(&main_loop.exit_delay_timer); + multiqueue_put(main_loop.fast_events, exit_event, 0); +} + static void exit_event(void **argv) { + if (exit_need_delay) { + uv_timer_start(&main_loop.exit_delay_timer, exit_delay_cb, 0, 0); + return; + } + if (!exiting) { os_exit(0); } @@ -562,7 +553,8 @@ static void exit_event(void **argv) void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - msgpack_unpacker_free(channel->rpc.unpacker); + unpacker_teardown(channel->rpc.unpacker); + xfree(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; @@ -575,48 +567,13 @@ void rpc_free(Channel *channel) api_free_dictionary(channel->rpc.info); } -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64; - if (kv_size(channel->rpc.call_stack) == 0) { - return false; - } - - // Must be equal to the frame at the stack's bottom - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - return response_id == frame->request_id; -} - -static void complete_call(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - frame->returned = true; - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg, int loglevel) +static void chan_close_with_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; - api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } @@ -697,7 +654,7 @@ const char *rpc_client_name(Channel *chan) return NULL; } -#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL <= LOGLVL_DBG # define REQ "[request] " # define RES "[response] " # define NOT "[notify] " @@ -727,7 +684,8 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); - log_msg_close(f, unpacked.data); + msgpack_object_print(f, unpacked.data); + log_close(f); msgpack_unpacked_destroy(&unpacked); break; } @@ -738,30 +696,24 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, ERR); - log_msg_close(f, (msgpack_object) { - .type = MSGPACK_OBJECT_STR, - .via.str = { - .ptr = (char *)msgpack_error_messages[result + MUR_OFF], - .size = (uint32_t)strlen(msgpack_error_messages[result + MUR_OFF]), - }, - }); + fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); + log_close(f); break; } } } -static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg) +static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) { DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); log_lock(); FILE *f = open_log_file(); - fprintf(f, is_request ? REQ : RES); - log_msg_close(f, msg); + fprintf(f, "%s: %s", is_request ? REQ : RES, name); + log_close(f); } -static void log_msg_close(FILE *f, msgpack_object msg) +static void log_close(FILE *f) { - msgpack_object_print(f, msg); fputc('\n', f); fflush(f); fclose(f); |