diff options
author | Josh Rahm <joshuarahm@gmail.com> | 2023-11-30 20:35:25 +0000 |
---|---|---|
committer | Josh Rahm <joshuarahm@gmail.com> | 2023-11-30 20:35:25 +0000 |
commit | 1b7b916b7631ddf73c38e3a0070d64e4636cb2f3 (patch) | |
tree | cd08258054db80bb9a11b1061bb091c70b76926a /src/nvim/msgpack_rpc/channel.c | |
parent | eaa89c11d0f8aefbb512de769c6c82f61a8baca3 (diff) | |
parent | 4a8bf24ac690004aedf5540fa440e788459e5e34 (diff) | |
download | rneovim-aucmd_textputpost.tar.gz rneovim-aucmd_textputpost.tar.bz2 rneovim-aucmd_textputpost.zip |
Merge remote-tracking branch 'upstream/master' into aucmd_textputpostaucmd_textputpost
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 263 |
1 files changed, 148 insertions, 115 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index d60e18590f..0fb1ebf931 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -1,6 +1,3 @@ -// This is an open source non-commercial project. Dear PVS-Studio, please check -// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com - #include <assert.h> #include <inttypes.h> #include <msgpack/object.h> @@ -10,7 +7,6 @@ #include <stdbool.h> #include <stdio.h> #include <stdlib.h> -#include <uv.h> #include "klib/kvec.h" #include "nvim/api/private/defs.h" @@ -24,9 +20,11 @@ #include "nvim/event/rstream.h" #include "nvim/event/stream.h" #include "nvim/event/wstream.h" +#include "nvim/func_attr.h" +#include "nvim/globals.h" #include "nvim/log.h" #include "nvim/main.h" -#include "nvim/map.h" +#include "nvim/map_defs.h" #include "nvim/memory.h" #include "nvim/message.h" #include "nvim/msgpack_rpc/channel.h" @@ -35,16 +33,82 @@ #include "nvim/msgpack_rpc/unpacker.h" #include "nvim/os/input.h" #include "nvim/rbuffer.h" -#include "nvim/types.h" +#include "nvim/types_defs.h" #include "nvim/ui.h" #include "nvim/ui_client.h" -#if MIN_LOG_LEVEL > LOGLVL_DBG +#ifdef NVIM_LOG_DEBUG +# define REQ "[request] " +# define RES "[response] " +# define NOT "[notify] " +# define ERR "[error] " + +// Cannot define array with negative offsets, so this one is needed to be added +// to MSGPACK_UNPACK_\* values. +# define MUR_OFF 2 + +static const char *const msgpack_error_messages[] = { + [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found", + [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string", + [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error", + [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory", +}; + +static void log_close(FILE *f) +{ + fputc('\n', f); + fflush(f); + fclose(f); + log_unlock(); +} + +static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) +{ + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + DLOGN("RPC ->ch %" PRIu64 ": ", channel_id); + const msgpack_unpack_return result = + msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); + switch (result) { + case MSGPACK_UNPACK_SUCCESS: { + uint64_t type = unpacked.data.via.array.ptr[0].via.u64; + log_lock(); + FILE *f = open_log_file(); + fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); + msgpack_object_print(f, unpacked.data); + log_close(f); + msgpack_unpacked_destroy(&unpacked); + break; + } + case MSGPACK_UNPACK_EXTRA_BYTES: + case MSGPACK_UNPACK_CONTINUE: + case MSGPACK_UNPACK_PARSE_ERROR: + case MSGPACK_UNPACK_NOMEM_ERROR: { + log_lock(); + FILE *f = open_log_file(); + fprintf(f, ERR); + fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); + log_close(f); + break; + } + } +} + +static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) +{ + DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); + log_lock(); + FILE *f = open_log_file(); + fprintf(f, "%s: %s", is_request ? REQ : RES, name); + log_close(f); +} + +#else # define log_client_msg(...) # define log_server_msg(...) #endif -static PMap(cstr_t) event_strings = MAP_INIT; +static Set(cstr_t) event_strings = SET_INIT; static msgpack_sbuffer out_buffer; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -71,7 +135,7 @@ void rpc_start(Channel *channel) if (channel->streamtype != kChannelStreamInternal) { Stream *out = channel_outstream(channel); -#if MIN_LOG_LEVEL <= LOGLVL_DBG +#ifdef NVIM_LOG_DEBUG Stream *in = channel_instream(channel); DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, (void *)in, (void *)out); @@ -141,9 +205,15 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL, NULL }; kv_push(rpc->call_stack, &frame); - LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned || rpc->closed); (void)kv_pop(rpc->call_stack); + if (rpc->closed) { + api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); + channel_decref(channel); + return NIL; + } + if (frame.errored) { if (frame.result.type == kObjectTypeString) { api_set_error(err, kErrorTypeException, "%s", @@ -188,14 +258,12 @@ void rpc_subscribe(uint64_t id, char *event) abort(); } - char *event_string = pmap_get(cstr_t)(&event_strings, event); - - if (!event_string) { - event_string = xstrdup(event); - pmap_put(cstr_t)(&event_strings, event_string, event_string); + const char **key_alloc = NULL; + if (set_put_ref(cstr_t, &event_strings, event, &key_alloc)) { + *key_alloc = xstrdup(event); } - pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); + set_put(cstr_t, channel->rpc.subscribed_events, *key_alloc); } /// Unsubscribes to event broadcasts @@ -242,26 +310,43 @@ end: channel_decref(channel); } +static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id) +{ + for (size_t i = 0; i < kv_size(rpc->call_stack); i++) { + ChannelCallFrame *frame = kv_Z(rpc->call_stack, i); + if (frame->request_id == request_id) { + return frame; + } + } + return NULL; +} + static void parse_msgpack(Channel *channel) { Unpacker *p = channel->rpc.unpacker; while (unpacker_advance(p)) { if (p->type == kMessageTypeRedrawEvent) { - if (p->grid_line_event) { - ui_client_event_raw_line(p->grid_line_event); - } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) { - p->ui_handler.fn(p->result.data.array); + // When exiting, ui_client_stop() has already been called, so don't handle UI events. + if (ui_client_channel_id && !exiting) { + if (p->grid_line_event) { + ui_client_event_raw_line(p->grid_line_event); + } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) { + p->ui_handler.fn(p->result.data.array); + } } arena_mem_free(arena_finish(&p->arena)); } else if (p->type == kMessageTypeResponse) { - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - if (p->request_id != frame->request_id) { + ChannelCallFrame *frame = channel->rpc.client_type == kClientTypeMsgpackRpc + ? find_call_frame(&channel->rpc, p->request_id) + : kv_last(channel->rpc.call_stack); + if (frame == NULL || p->request_id != frame->request_id) { char buf[256]; snprintf(buf, sizeof(buf), - "ch %" PRIu64 " returned a response with an unknown request " - "id. Ensure the client is properly synchronized", - channel->id); + "ch %" PRIu64 " (type=%" PRIu32 ") returned a response with an unknown request " + "id %" PRIu32 ". Ensure the client is properly synchronized", + channel->id, (unsigned)channel->rpc.client_type, p->request_id); chan_close_with_error(channel, buf, LOGLVL_ERR); + return; } frame->returned = true; frame->errored = (p->error.type != kObjectTypeNil); @@ -486,7 +571,7 @@ static void broadcast_event(const char *name, Array args) map_foreach_value(&channels, channel, { if (channel->is_rpc - && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { + && set_has(cstr_t, channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -514,24 +599,12 @@ end: static void unsubscribe(Channel *channel, char *event) { - char *event_string = pmap_get(cstr_t)(&event_strings, event); - if (!event_string) { + if (!set_has(cstr_t, &event_strings, event)) { WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'", channel->id, event); return; } - pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); - - map_foreach_value(&channels, channel, { - if (channel->is_rpc - && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { - return; - } - }); - - // Since the string is no longer used by other channels, release it's memory - pmap_del(cstr_t)(&event_strings, event_string); - xfree(event_string); + set_del(cstr_t, channel->rpc.subscribed_events, event); } /// Mark rpc state as closed, and release its reference to the channel. @@ -547,6 +620,10 @@ void rpc_close(Channel *channel) if (channel->streamtype == kChannelStreamStdio || (channel->id == ui_client_channel_id && channel->streamtype != kChannelStreamProc)) { + if (channel->streamtype == kChannelStreamStdio) { + // Avoid hanging when there are no other UIs and a prompt is triggered on exit. + remote_ui_disconnect(channel->id); + } exit_from_channel(0); } } @@ -557,13 +634,7 @@ void rpc_free(Channel *channel) unpacker_teardown(channel->rpc.unpacker); xfree(channel->rpc.unpacker); - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->rpc.subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_destroy(cstr_t)(channel->rpc.subscribed_events); + set_destroy(cstr_t, channel->rpc.subscribed_events); kv_destroy(channel->rpc.call_stack); api_free_dictionary(channel->rpc.info); } @@ -575,7 +646,7 @@ static void chan_close_with_error(Channel *channel, char *msg, int loglevel) ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; - frame->result = STRING_OBJ(cstr_to_string(msg)); + frame->result = CSTR_TO_OBJ(msg); } channel_close(channel->id, kChannelPartRpc, NULL); @@ -612,7 +683,7 @@ static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler } else { Array args = ARRAY_DICT_INIT; ADD(args, INTEGER_OBJ(err->type)); - ADD(args, STRING_OBJ(cstr_to_string(err->msg))); + ADD(args, CSTR_TO_OBJ(err->msg)); msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"), args, &pac); api_free_array(args); @@ -638,6 +709,25 @@ void rpc_set_client_info(uint64_t id, Dictionary info) api_free_dictionary(chan->rpc.info); chan->rpc.info = info; + + // Parse "type" on "info" and set "client_type" + const char *type = get_client_info(chan, "type"); + if (type == NULL || strequal(type, "remote")) { + chan->rpc.client_type = kClientTypeRemote; + } else if (strequal(type, "msgpack-rpc")) { + chan->rpc.client_type = kClientTypeMsgpackRpc; + } else if (strequal(type, "ui")) { + chan->rpc.client_type = kClientTypeUi; + } else if (strequal(type, "embedder")) { + chan->rpc.client_type = kClientTypeEmbedder; + } else if (strequal(type, "host")) { + chan->rpc.client_type = kClientTypeHost; + } else if (strequal(type, "plugin")) { + chan->rpc.client_type = kClientTypePlugin; + } else { + chan->rpc.client_type = kClientTypeUnknown; + } + channel_info_changed(chan, false); } @@ -646,14 +736,15 @@ Dictionary rpc_client_info(Channel *chan) return copy_dictionary(chan->rpc.info, NULL); } -const char *rpc_client_name(Channel *chan) +const char *get_client_info(Channel *chan, const char *key) + FUNC_ATTR_NONNULL_ALL { if (!chan->is_rpc) { return NULL; } Dictionary info = chan->rpc.info; for (size_t i = 0; i < info.size; i++) { - if (strequal("name", info.items[i].key.data) + if (strequal(key, info.items[i].key.data) && info.items[i].value.type == kObjectTypeString) { return info.items[i].value.data.string.data; } @@ -662,69 +753,11 @@ const char *rpc_client_name(Channel *chan) return NULL; } -#if MIN_LOG_LEVEL <= LOGLVL_DBG -# define REQ "[request] " -# define RES "[response] " -# define NOT "[notify] " -# define ERR "[error] " - -// Cannot define array with negative offsets, so this one is needed to be added -// to MSGPACK_UNPACK_\* values. -# define MUR_OFF 2 - -static const char *const msgpack_error_messages[] = { - [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found", - [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string", - [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error", - [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory", -}; - -static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) -{ - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - DLOGN("RPC ->ch %" PRIu64 ": ", channel_id); - const msgpack_unpack_return result = - msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); - switch (result) { - case MSGPACK_UNPACK_SUCCESS: { - uint64_t type = unpacked.data.via.array.ptr[0].via.u64; - log_lock(); - FILE *f = open_log_file(); - fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); - msgpack_object_print(f, unpacked.data); - log_close(f); - msgpack_unpacked_destroy(&unpacked); - break; - } - case MSGPACK_UNPACK_EXTRA_BYTES: - case MSGPACK_UNPACK_CONTINUE: - case MSGPACK_UNPACK_PARSE_ERROR: - case MSGPACK_UNPACK_NOMEM_ERROR: { - log_lock(); - FILE *f = open_log_file(); - fprintf(f, ERR); - fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); - log_close(f); - break; - } - } -} - -static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) +void rpc_free_all_mem(void) { - DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); - log_lock(); - FILE *f = open_log_file(); - fprintf(f, "%s: %s", is_request ? REQ : RES, name); - log_close(f); -} - -static void log_close(FILE *f) -{ - fputc('\n', f); - fflush(f); - fclose(f); - log_unlock(); + cstr_t key; + set_foreach(&event_strings, key, { + xfree((void *)key); + }); + set_destroy(cstr_t, &event_strings); } -#endif |