aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c168
1 files changed, 72 insertions, 96 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index a2d8859c68..a1a1f0f8c0 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,44 +1,43 @@
// This is an open source non-commercial project. Dear PVS-Studio, please check
// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+#include <inttypes.h>
+#include <msgpack.h>
#include <stdbool.h>
#include <string.h>
-#include <inttypes.h>
-
#include <uv.h>
-#include <msgpack.h>
#include "nvim/api/private/helpers.h"
-#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
+#include "nvim/api/vim.h"
+#include "nvim/ascii.h"
#include "nvim/channel.h"
-#include "nvim/msgpack_rpc/channel.h"
-#include "nvim/event/loop.h"
+#include "nvim/eval.h"
#include "nvim/event/libuv_process.h"
+#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
-#include "nvim/event/wstream.h"
#include "nvim/event/socket.h"
-#include "nvim/msgpack_rpc/helpers.h"
-#include "nvim/vim.h"
+#include "nvim/event/wstream.h"
+#include "nvim/lib/kvec.h"
+#include "nvim/log.h"
#include "nvim/main.h"
-#include "nvim/ascii.h"
+#include "nvim/map.h"
#include "nvim/memory.h"
-#include "nvim/eval.h"
-#include "nvim/os_unix.h"
#include "nvim/message.h"
-#include "nvim/map.h"
-#include "nvim/log.h"
#include "nvim/misc1.h"
-#include "nvim/lib/kvec.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/os/input.h"
+#include "nvim/os_unix.h"
#include "nvim/ui.h"
+#include "nvim/vim.h"
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
-#define log_client_msg(...)
-#define log_server_msg(...)
+# define log_client_msg(...)
+# define log_server_msg(...)
#endif
-static PMap(cstr_t) *event_strings = NULL;
+static PMap(cstr_t) event_strings = MAP_INIT;
static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -48,7 +47,6 @@ static msgpack_sbuffer out_buffer;
void rpc_init(void)
{
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
- event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
}
@@ -60,7 +58,6 @@ void rpc_start(Channel *channel)
RpcState *rpc = &channel->rpc;
rpc->closed = false;
rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rpc->subscribed_events = pmap_new(cstr_t)();
rpc->next_request_id = 1;
rpc->info = (Dictionary)ARRAY_DICT_INIT;
kv_init(rpc->call_stack);
@@ -104,7 +101,7 @@ bool rpc_send_event(uint64_t id, const char *name, Array args)
if (channel) {
send_event(channel, name, args);
- } else {
+ } else {
broadcast_event(name, args);
}
@@ -118,10 +115,7 @@ bool rpc_send_event(uint64_t id, const char *name, Array args)
/// @param args Array with method arguments
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
-Object rpc_send_call(uint64_t id,
- const char *method_name,
- Array args,
- Error *err)
+Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *err)
{
Channel *channel = NULL;
@@ -183,11 +177,11 @@ void rpc_subscribe(uint64_t id, char *event)
abort();
}
- char *event_string = pmap_get(cstr_t)(event_strings, event);
+ char *event_string = pmap_get(cstr_t)(&event_strings, event);
if (!event_string) {
event_string = xstrdup(event);
- pmap_put(cstr_t)(event_strings, event_string, event_string);
+ pmap_put(cstr_t)(&event_strings, event_string, event_string);
}
pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
@@ -208,8 +202,7 @@ void rpc_unsubscribe(uint64_t id, char *event)
unsubscribe(channel, event);
}
-static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
- void *data, bool eof)
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
{
Channel *channel = data;
channel_incref(channel);
@@ -465,10 +458,7 @@ static void send_error(Channel *chan, MessageType type, uint32_t id, char *err)
api_clear_error(&e);
}
-static void send_request(Channel *channel,
- uint32_t id,
- const char *name,
- Array args)
+static void send_request(Channel *channel, uint32_t id, const char *name, Array args)
{
const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
@@ -479,9 +469,7 @@ static void send_request(Channel *channel,
1));
}
-static void send_event(Channel *channel,
- const char *name,
- Array args)
+static void send_event(Channel *channel, const char *name, Array args)
{
const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
@@ -497,7 +485,7 @@ static void broadcast_event(const char *name, Array args)
kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
Channel *channel;
- map_foreach_value(channels, channel, {
+ map_foreach_value(&channels, channel, {
if (channel->is_rpc
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
@@ -528,15 +516,15 @@ end:
static void unsubscribe(Channel *channel, char *event)
{
- char *event_string = pmap_get(cstr_t)(event_strings, event);
+ char *event_string = pmap_get(cstr_t)(&event_strings, event);
if (!event_string) {
- WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
- channel->id, event);
- return;
+ WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
+ channel->id, event);
+ return;
}
pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
- map_foreach_value(channels, channel, {
+ map_foreach_value(&channels, channel, {
if (channel->is_rpc
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
return;
@@ -544,7 +532,7 @@ static void unsubscribe(Channel *channel, char *event)
});
// Since the string is no longer used by other channels, release it's memory
- pmap_del(cstr_t)(event_strings, event_string);
+ pmap_del(cstr_t)(&event_strings, event_string);
xfree(event_string);
}
@@ -583,7 +571,7 @@ void rpc_free(Channel *channel)
unsubscribe(channel, event_string);
});
- pmap_free(cstr_t)(channel->rpc.subscribed_events);
+ pmap_destroy(cstr_t)(channel->rpc.subscribed_events);
kv_destroy(channel->rpc.call_stack);
api_free_dictionary(channel->rpc.info);
}
@@ -591,10 +579,10 @@ void rpc_free(Channel *channel)
static bool is_rpc_response(msgpack_object *obj)
{
return obj->type == MSGPACK_OBJECT_ARRAY
- && obj->via.array.size == 4
- && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
- && obj->via.array.ptr[0].via.u64 == 1
- && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
+ && obj->via.array.size == 4
+ && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.array.ptr[0].via.u64 == 1
+ && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
}
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
@@ -636,12 +624,8 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
channel_close(channel->id, kChannelPartRpc, NULL);
}
-static WBuffer *serialize_request(uint64_t channel_id,
- uint32_t request_id,
- const String method,
- Array args,
- msgpack_sbuffer *sbuffer,
- size_t refcount)
+static WBuffer *serialize_request(uint64_t channel_id, uint32_t request_id, const String method,
+ Array args, msgpack_sbuffer *sbuffer, size_t refcount)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
@@ -656,12 +640,8 @@ static WBuffer *serialize_request(uint64_t channel_id,
return rv;
}
-static WBuffer *serialize_response(uint64_t channel_id,
- MessageType type,
- uint32_t response_id,
- Error *err,
- Object arg,
- msgpack_sbuffer *sbuffer)
+static WBuffer *serialize_response(uint64_t channel_id, MessageType type, uint32_t response_id,
+ Error *err, Object arg, msgpack_sbuffer *sbuffer)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
@@ -719,14 +699,14 @@ const char *rpc_client_name(Channel *chan)
}
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
-#define REQ "[request] "
-#define RES "[response] "
-#define NOT "[notify] "
-#define ERR "[error] "
+# define REQ "[request] "
+# define RES "[response] "
+# define NOT "[notify] "
+# define ERR "[error] "
// Cannot define array with negative offsets, so this one is needed to be added
// to MSGPACK_UNPACK_\* values.
-#define MUR_OFF 2
+# define MUR_OFF 2
static const char *const msgpack_error_messages[] = {
[MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
@@ -735,47 +715,43 @@ static const char *const msgpack_error_messages[] = {
[MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
};
-static void log_server_msg(uint64_t channel_id,
- msgpack_sbuffer *packed)
+static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed)
{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
const msgpack_unpack_return result =
- msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
+ msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
switch (result) {
- case MSGPACK_UNPACK_SUCCESS: {
- uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
- log_msg_close(f, unpacked.data);
- msgpack_unpacked_destroy(&unpacked);
- break;
- }
- case MSGPACK_UNPACK_EXTRA_BYTES:
- case MSGPACK_UNPACK_CONTINUE:
- case MSGPACK_UNPACK_PARSE_ERROR:
- case MSGPACK_UNPACK_NOMEM_ERROR: {
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, ERR);
- log_msg_close(f, (msgpack_object) {
- .type = MSGPACK_OBJECT_STR,
- .via.str = {
- .ptr = (char *)msgpack_error_messages[result + MUR_OFF],
- .size = (uint32_t)strlen(
- msgpack_error_messages[result + MUR_OFF]),
- },
+ case MSGPACK_UNPACK_SUCCESS: {
+ uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
+ log_msg_close(f, unpacked.data);
+ msgpack_unpacked_destroy(&unpacked);
+ break;
+ }
+ case MSGPACK_UNPACK_EXTRA_BYTES:
+ case MSGPACK_UNPACK_CONTINUE:
+ case MSGPACK_UNPACK_PARSE_ERROR:
+ case MSGPACK_UNPACK_NOMEM_ERROR: {
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, ERR);
+ log_msg_close(f, (msgpack_object) {
+ .type = MSGPACK_OBJECT_STR,
+ .via.str = {
+ .ptr = (char *)msgpack_error_messages[result + MUR_OFF],
+ .size = (uint32_t)strlen(msgpack_error_messages[result + MUR_OFF]),
+ },
});
- break;
- }
+ break;
+ }
}
}
-static void log_client_msg(uint64_t channel_id,
- bool is_request,
- msgpack_object msg)
+static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg)
{
DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
log_lock();