aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
authorJosh Rahm <joshuarahm@gmail.com>2023-11-30 20:35:25 +0000
committerJosh Rahm <joshuarahm@gmail.com>2023-11-30 20:35:25 +0000
commit1b7b916b7631ddf73c38e3a0070d64e4636cb2f3 (patch)
treecd08258054db80bb9a11b1061bb091c70b76926a /src/nvim/msgpack_rpc/channel.c
parenteaa89c11d0f8aefbb512de769c6c82f61a8baca3 (diff)
parent4a8bf24ac690004aedf5540fa440e788459e5e34 (diff)
downloadrneovim-1b7b916b7631ddf73c38e3a0070d64e4636cb2f3.tar.gz
rneovim-1b7b916b7631ddf73c38e3a0070d64e4636cb2f3.tar.bz2
rneovim-1b7b916b7631ddf73c38e3a0070d64e4636cb2f3.zip
Merge remote-tracking branch 'upstream/master' into aucmd_textputpostaucmd_textputpost
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c263
1 files changed, 148 insertions, 115 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index d60e18590f..0fb1ebf931 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,6 +1,3 @@
-// This is an open source non-commercial project. Dear PVS-Studio, please check
-// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
-
#include <assert.h>
#include <inttypes.h>
#include <msgpack/object.h>
@@ -10,7 +7,6 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
-#include <uv.h>
#include "klib/kvec.h"
#include "nvim/api/private/defs.h"
@@ -24,9 +20,11 @@
#include "nvim/event/rstream.h"
#include "nvim/event/stream.h"
#include "nvim/event/wstream.h"
+#include "nvim/func_attr.h"
+#include "nvim/globals.h"
#include "nvim/log.h"
#include "nvim/main.h"
-#include "nvim/map.h"
+#include "nvim/map_defs.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/msgpack_rpc/channel.h"
@@ -35,16 +33,82 @@
#include "nvim/msgpack_rpc/unpacker.h"
#include "nvim/os/input.h"
#include "nvim/rbuffer.h"
-#include "nvim/types.h"
+#include "nvim/types_defs.h"
#include "nvim/ui.h"
#include "nvim/ui_client.h"
-#if MIN_LOG_LEVEL > LOGLVL_DBG
+#ifdef NVIM_LOG_DEBUG
+# define REQ "[request] "
+# define RES "[response] "
+# define NOT "[notify] "
+# define ERR "[error] "
+
+// Cannot define array with negative offsets, so this one is needed to be added
+// to MSGPACK_UNPACK_\* values.
+# define MUR_OFF 2
+
+static const char *const msgpack_error_messages[] = {
+ [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
+ [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string",
+ [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error",
+ [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
+};
+
+static void log_close(FILE *f)
+{
+ fputc('\n', f);
+ fflush(f);
+ fclose(f);
+ log_unlock();
+}
+
+static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
+{
+ msgpack_unpacked unpacked;
+ msgpack_unpacked_init(&unpacked);
+ DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
+ const msgpack_unpack_return result =
+ msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
+ switch (result) {
+ case MSGPACK_UNPACK_SUCCESS: {
+ uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
+ msgpack_object_print(f, unpacked.data);
+ log_close(f);
+ msgpack_unpacked_destroy(&unpacked);
+ break;
+ }
+ case MSGPACK_UNPACK_EXTRA_BYTES:
+ case MSGPACK_UNPACK_CONTINUE:
+ case MSGPACK_UNPACK_PARSE_ERROR:
+ case MSGPACK_UNPACK_NOMEM_ERROR: {
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, ERR);
+ fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]);
+ log_close(f);
+ break;
+ }
+ }
+}
+
+static void log_client_msg(uint64_t channel_id, bool is_request, const char *name)
+{
+ DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, "%s: %s", is_request ? REQ : RES, name);
+ log_close(f);
+}
+
+#else
# define log_client_msg(...)
# define log_server_msg(...)
#endif
-static PMap(cstr_t) event_strings = MAP_INIT;
+static Set(cstr_t) event_strings = SET_INIT;
static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -71,7 +135,7 @@ void rpc_start(Channel *channel)
if (channel->streamtype != kChannelStreamInternal) {
Stream *out = channel_outstream(channel);
-#if MIN_LOG_LEVEL <= LOGLVL_DBG
+#ifdef NVIM_LOG_DEBUG
Stream *in = channel_instream(channel);
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
(void *)in, (void *)out);
@@ -141,9 +205,15 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL, NULL };
kv_push(rpc->call_stack, &frame);
- LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
+ LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned || rpc->closed);
(void)kv_pop(rpc->call_stack);
+ if (rpc->closed) {
+ api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
+ channel_decref(channel);
+ return NIL;
+ }
+
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
api_set_error(err, kErrorTypeException, "%s",
@@ -188,14 +258,12 @@ void rpc_subscribe(uint64_t id, char *event)
abort();
}
- char *event_string = pmap_get(cstr_t)(&event_strings, event);
-
- if (!event_string) {
- event_string = xstrdup(event);
- pmap_put(cstr_t)(&event_strings, event_string, event_string);
+ const char **key_alloc = NULL;
+ if (set_put_ref(cstr_t, &event_strings, event, &key_alloc)) {
+ *key_alloc = xstrdup(event);
}
- pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
+ set_put(cstr_t, channel->rpc.subscribed_events, *key_alloc);
}
/// Unsubscribes to event broadcasts
@@ -242,26 +310,43 @@ end:
channel_decref(channel);
}
+static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id)
+{
+ for (size_t i = 0; i < kv_size(rpc->call_stack); i++) {
+ ChannelCallFrame *frame = kv_Z(rpc->call_stack, i);
+ if (frame->request_id == request_id) {
+ return frame;
+ }
+ }
+ return NULL;
+}
+
static void parse_msgpack(Channel *channel)
{
Unpacker *p = channel->rpc.unpacker;
while (unpacker_advance(p)) {
if (p->type == kMessageTypeRedrawEvent) {
- if (p->grid_line_event) {
- ui_client_event_raw_line(p->grid_line_event);
- } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) {
- p->ui_handler.fn(p->result.data.array);
+ // When exiting, ui_client_stop() has already been called, so don't handle UI events.
+ if (ui_client_channel_id && !exiting) {
+ if (p->grid_line_event) {
+ ui_client_event_raw_line(p->grid_line_event);
+ } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) {
+ p->ui_handler.fn(p->result.data.array);
+ }
}
arena_mem_free(arena_finish(&p->arena));
} else if (p->type == kMessageTypeResponse) {
- ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
- if (p->request_id != frame->request_id) {
+ ChannelCallFrame *frame = channel->rpc.client_type == kClientTypeMsgpackRpc
+ ? find_call_frame(&channel->rpc, p->request_id)
+ : kv_last(channel->rpc.call_stack);
+ if (frame == NULL || p->request_id != frame->request_id) {
char buf[256];
snprintf(buf, sizeof(buf),
- "ch %" PRIu64 " returned a response with an unknown request "
- "id. Ensure the client is properly synchronized",
- channel->id);
+ "ch %" PRIu64 " (type=%" PRIu32 ") returned a response with an unknown request "
+ "id %" PRIu32 ". Ensure the client is properly synchronized",
+ channel->id, (unsigned)channel->rpc.client_type, p->request_id);
chan_close_with_error(channel, buf, LOGLVL_ERR);
+ return;
}
frame->returned = true;
frame->errored = (p->error.type != kObjectTypeNil);
@@ -486,7 +571,7 @@ static void broadcast_event(const char *name, Array args)
map_foreach_value(&channels, channel, {
if (channel->is_rpc
- && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
+ && set_has(cstr_t, channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
}
});
@@ -514,24 +599,12 @@ end:
static void unsubscribe(Channel *channel, char *event)
{
- char *event_string = pmap_get(cstr_t)(&event_strings, event);
- if (!event_string) {
+ if (!set_has(cstr_t, &event_strings, event)) {
WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
channel->id, event);
return;
}
- pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
-
- map_foreach_value(&channels, channel, {
- if (channel->is_rpc
- && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
- return;
- }
- });
-
- // Since the string is no longer used by other channels, release it's memory
- pmap_del(cstr_t)(&event_strings, event_string);
- xfree(event_string);
+ set_del(cstr_t, channel->rpc.subscribed_events, event);
}
/// Mark rpc state as closed, and release its reference to the channel.
@@ -547,6 +620,10 @@ void rpc_close(Channel *channel)
if (channel->streamtype == kChannelStreamStdio
|| (channel->id == ui_client_channel_id && channel->streamtype != kChannelStreamProc)) {
+ if (channel->streamtype == kChannelStreamStdio) {
+ // Avoid hanging when there are no other UIs and a prompt is triggered on exit.
+ remote_ui_disconnect(channel->id);
+ }
exit_from_channel(0);
}
}
@@ -557,13 +634,7 @@ void rpc_free(Channel *channel)
unpacker_teardown(channel->rpc.unpacker);
xfree(channel->rpc.unpacker);
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->rpc.subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_destroy(cstr_t)(channel->rpc.subscribed_events);
+ set_destroy(cstr_t, channel->rpc.subscribed_events);
kv_destroy(channel->rpc.call_stack);
api_free_dictionary(channel->rpc.info);
}
@@ -575,7 +646,7 @@ static void chan_close_with_error(Channel *channel, char *msg, int loglevel)
ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
frame->returned = true;
frame->errored = true;
- frame->result = STRING_OBJ(cstr_to_string(msg));
+ frame->result = CSTR_TO_OBJ(msg);
}
channel_close(channel->id, kChannelPartRpc, NULL);
@@ -612,7 +683,7 @@ static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler
} else {
Array args = ARRAY_DICT_INIT;
ADD(args, INTEGER_OBJ(err->type));
- ADD(args, STRING_OBJ(cstr_to_string(err->msg)));
+ ADD(args, CSTR_TO_OBJ(err->msg));
msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"),
args, &pac);
api_free_array(args);
@@ -638,6 +709,25 @@ void rpc_set_client_info(uint64_t id, Dictionary info)
api_free_dictionary(chan->rpc.info);
chan->rpc.info = info;
+
+ // Parse "type" on "info" and set "client_type"
+ const char *type = get_client_info(chan, "type");
+ if (type == NULL || strequal(type, "remote")) {
+ chan->rpc.client_type = kClientTypeRemote;
+ } else if (strequal(type, "msgpack-rpc")) {
+ chan->rpc.client_type = kClientTypeMsgpackRpc;
+ } else if (strequal(type, "ui")) {
+ chan->rpc.client_type = kClientTypeUi;
+ } else if (strequal(type, "embedder")) {
+ chan->rpc.client_type = kClientTypeEmbedder;
+ } else if (strequal(type, "host")) {
+ chan->rpc.client_type = kClientTypeHost;
+ } else if (strequal(type, "plugin")) {
+ chan->rpc.client_type = kClientTypePlugin;
+ } else {
+ chan->rpc.client_type = kClientTypeUnknown;
+ }
+
channel_info_changed(chan, false);
}
@@ -646,14 +736,15 @@ Dictionary rpc_client_info(Channel *chan)
return copy_dictionary(chan->rpc.info, NULL);
}
-const char *rpc_client_name(Channel *chan)
+const char *get_client_info(Channel *chan, const char *key)
+ FUNC_ATTR_NONNULL_ALL
{
if (!chan->is_rpc) {
return NULL;
}
Dictionary info = chan->rpc.info;
for (size_t i = 0; i < info.size; i++) {
- if (strequal("name", info.items[i].key.data)
+ if (strequal(key, info.items[i].key.data)
&& info.items[i].value.type == kObjectTypeString) {
return info.items[i].value.data.string.data;
}
@@ -662,69 +753,11 @@ const char *rpc_client_name(Channel *chan)
return NULL;
}
-#if MIN_LOG_LEVEL <= LOGLVL_DBG
-# define REQ "[request] "
-# define RES "[response] "
-# define NOT "[notify] "
-# define ERR "[error] "
-
-// Cannot define array with negative offsets, so this one is needed to be added
-// to MSGPACK_UNPACK_\* values.
-# define MUR_OFF 2
-
-static const char *const msgpack_error_messages[] = {
- [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
- [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string",
- [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error",
- [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
-};
-
-static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
-{
- msgpack_unpacked unpacked;
- msgpack_unpacked_init(&unpacked);
- DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
- const msgpack_unpack_return result =
- msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
- switch (result) {
- case MSGPACK_UNPACK_SUCCESS: {
- uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
- msgpack_object_print(f, unpacked.data);
- log_close(f);
- msgpack_unpacked_destroy(&unpacked);
- break;
- }
- case MSGPACK_UNPACK_EXTRA_BYTES:
- case MSGPACK_UNPACK_CONTINUE:
- case MSGPACK_UNPACK_PARSE_ERROR:
- case MSGPACK_UNPACK_NOMEM_ERROR: {
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, ERR);
- fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]);
- log_close(f);
- break;
- }
- }
-}
-
-static void log_client_msg(uint64_t channel_id, bool is_request, const char *name)
+void rpc_free_all_mem(void)
{
- DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, "%s: %s", is_request ? REQ : RES, name);
- log_close(f);
-}
-
-static void log_close(FILE *f)
-{
- fputc('\n', f);
- fflush(f);
- fclose(f);
- log_unlock();
+ cstr_t key;
+ set_foreach(&event_strings, key, {
+ xfree((void *)key);
+ });
+ set_destroy(cstr_t, &event_strings);
}
-#endif