aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c155
1 files changed, 129 insertions, 26 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 94605c37e9..3325b294dd 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -31,6 +31,11 @@
#define CHANNEL_BUFFER_SIZE 0xffff
+#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
+#define log_client_msg(...)
+#define log_server_msg(...)
+#endif
+
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -225,6 +230,10 @@ Object channel_send_call(uint64_t id,
return NIL;
}
+ if (channel->closed && !kv_size(channel->call_stack)) {
+ free_channel(channel);
+ }
+
return frame.result;
}
@@ -328,7 +337,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
Channel *channel = data;
if (eof) {
- goto end;
+ close_channel(channel);
}
size_t count = rstream_pending(rstream);
@@ -348,7 +357,10 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// Deserialize everything we can.
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
MSGPACK_UNPACK_SUCCESS) {
- if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
+ bool is_response = is_rpc_response(&unpacked.data);
+ log_client_msg(channel->id, !is_response, unpacked.data);
+
+ if (kv_size(channel->call_stack) && is_response) {
if (is_valid_rpc_response(&unpacked.data, channel)) {
complete_call(&unpacked.data, channel);
} else {
@@ -363,7 +375,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -386,14 +398,6 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- if (eof && !channel->is_job && !kv_size(channel->call_stack)) {
- // The free_channel call is deferred for jobs because it's possible that
- // job_stderr will called after this. For non-job channels, this is the
- // last callback so it must be freed now.
- free_channel(channel);
- }
}
static void handle_request(Channel *channel, msgpack_object *request)
@@ -406,7 +410,11 @@ static void handle_request(Channel *channel, msgpack_object *request)
if (error.set) {
// Validation failed, send response with error
channel_write(channel,
- serialize_response(request_id, &error, NIL, &out_buffer));
+ serialize_response(channel->id,
+ request_id,
+ &error,
+ NIL,
+ &out_buffer));
return;
}
@@ -459,14 +467,13 @@ static void call_request_handler(Channel *channel,
// send the response
msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
-
- if (error.set) {
- channel_write(channel,
- serialize_response(request_id, &error, NIL, &out_buffer));
- }
-
- channel_write(channel,
- serialize_response(request_id, &error, result, &out_buffer));
+ channel_write(channel, serialize_response(channel->id,
+ request_id,
+ &error,
+ result,
+ &out_buffer));
+ // All arguments were freed already, but we still need to free the array
+ free(args.items);
}
static bool channel_write(Channel *channel, WBuffer *buffer)
@@ -497,7 +504,11 @@ static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
api_set_error(&e, Exception, "%s", err);
- channel_write(channel, serialize_response(id, &e, NIL, &out_buffer));
+ channel_write(channel, serialize_response(channel->id,
+ id,
+ &e,
+ NIL,
+ &out_buffer));
}
static void send_request(Channel *channel,
@@ -506,7 +517,12 @@ static void send_request(Channel *channel,
Array args)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, args, &out_buffer, 1));
+ channel_write(channel, serialize_request(channel->id,
+ id,
+ method,
+ args,
+ &out_buffer,
+ 1));
}
static void send_event(Channel *channel,
@@ -514,7 +530,12 @@ static void send_event(Channel *channel,
Array args)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, args, &out_buffer, 1));
+ channel_write(channel, serialize_request(channel->id,
+ 0,
+ method,
+ args,
+ &out_buffer,
+ 1));
}
static void broadcast_event(char *name, Array args)
@@ -536,6 +557,7 @@ static void broadcast_event(char *name, Array args)
String method = {.size = strlen(name), .data = name};
WBuffer *buffer = serialize_request(0,
+ 0,
method,
args,
&out_buffer,
@@ -569,6 +591,10 @@ static void unsubscribe(Channel *channel, char *event)
/// free_channel later.
static void close_channel(Channel *channel)
{
+ if (channel->closed) {
+ return;
+ }
+
channel->closed = true;
if (channel->is_job) {
if (channel->data.job) {
@@ -577,10 +603,10 @@ static void close_channel(Channel *channel)
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
- if (channel->data.streams.uv) {
- uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+ uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
+ if (handle) {
+ uv_close(handle, close_cb);
} else {
- // When the stdin channel closes, it's time to go
mch_exit(0);
}
}
@@ -663,3 +689,80 @@ static void call_set_error(Channel *channel, char *msg)
close_channel(channel);
}
+
+static WBuffer *serialize_request(uint64_t channel_id,
+ uint64_t request_id,
+ String method,
+ Array args,
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_rpc_serialize_request(request_id, method, args, &pac);
+ log_server_msg(channel_id, sbuffer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ refcount,
+ free);
+ msgpack_sbuffer_clear(sbuffer);
+ api_free_array(args);
+ return rv;
+}
+
+static WBuffer *serialize_response(uint64_t channel_id,
+ uint64_t response_id,
+ Error *err,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_rpc_serialize_response(response_id, err, arg, &pac);
+ log_server_msg(channel_id, sbuffer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ 1, // responses only go though 1 channel
+ free);
+ msgpack_sbuffer_clear(sbuffer);
+ api_free_object(arg);
+ return rv;
+}
+
+#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
+#define REQ "[response] "
+#define RES "[response] "
+#define NOT "[notification] "
+
+static void log_server_msg(uint64_t channel_id,
+ msgpack_sbuffer *packed)
+{
+ msgpack_unpacked unpacked;
+ msgpack_unpacked_init(&unpacked);
+ msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
+ uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
+ DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
+ FILE *f = open_log_file();
+ fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
+ log_msg_close(f, unpacked.data);
+ msgpack_unpacked_destroy(&unpacked);
+}
+
+static void log_client_msg(uint64_t channel_id,
+ bool is_request,
+ msgpack_object msg)
+{
+ DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id);
+ FILE *f = open_log_file();
+ fprintf(f, is_request ? REQ : RES);
+ log_msg_close(f, msg);
+}
+
+static void log_msg_close(FILE *f, msgpack_object msg)
+{
+ msgpack_object_print(f, msg);
+ fputc('\n', f);
+ fflush(f);
+ fclose(f);
+}
+#endif