diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 155 |
1 files changed, 129 insertions, 26 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 94605c37e9..3325b294dd 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,6 +31,11 @@ #define CHANNEL_BUFFER_SIZE 0xffff +#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL +#define log_client_msg(...) +#define log_server_msg(...) +#endif + typedef struct { uint64_t request_id; bool returned, errored; @@ -225,6 +230,10 @@ Object channel_send_call(uint64_t id, return NIL; } + if (channel->closed && !kv_size(channel->call_stack)) { + free_channel(channel); + } + return frame.result; } @@ -328,7 +337,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - goto end; + close_channel(channel); } size_t count = rstream_pending(rstream); @@ -348,7 +357,10 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Deserialize everything we can. while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + bool is_response = is_rpc_response(&unpacked.data); + log_client_msg(channel->id, !is_response, unpacked.data); + + if (kv_size(channel->call_stack) && is_response) { if (is_valid_rpc_response(&unpacked.data, channel)) { complete_call(&unpacked.data, channel); } else { @@ -363,7 +375,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -386,14 +398,6 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - if (eof && !channel->is_job && !kv_size(channel->call_stack)) { - // The free_channel call is deferred for jobs because it's possible that - // job_stderr will called after this. For non-job channels, this is the - // last callback so it must be freed now. - free_channel(channel); - } } static void handle_request(Channel *channel, msgpack_object *request) @@ -406,7 +410,11 @@ static void handle_request(Channel *channel, msgpack_object *request) if (error.set) { // Validation failed, send response with error channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); + serialize_response(channel->id, + request_id, + &error, + NIL, + &out_buffer)); return; } @@ -459,14 +467,13 @@ static void call_request_handler(Channel *channel, // send the response msgpack_packer response; msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); - - if (error.set) { - channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); - } - - channel_write(channel, - serialize_response(request_id, &error, result, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + request_id, + &error, + result, + &out_buffer)); + // All arguments were freed already, but we still need to free the array + free(args.items); } static bool channel_write(Channel *channel, WBuffer *buffer) @@ -497,7 +504,11 @@ static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + id, + &e, + NIL, + &out_buffer)); } static void send_request(Channel *channel, @@ -506,7 +517,12 @@ static void send_request(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + id, + method, + args, + &out_buffer, + 1)); } static void send_event(Channel *channel, @@ -514,7 +530,12 @@ static void send_event(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + 0, + method, + args, + &out_buffer, + 1)); } static void broadcast_event(char *name, Array args) @@ -536,6 +557,7 @@ static void broadcast_event(char *name, Array args) String method = {.size = strlen(name), .data = name}; WBuffer *buffer = serialize_request(0, + 0, method, args, &out_buffer, @@ -569,6 +591,10 @@ static void unsubscribe(Channel *channel, char *event) /// free_channel later. static void close_channel(Channel *channel) { + if (channel->closed) { + return; + } + channel->closed = true; if (channel->is_job) { if (channel->data.job) { @@ -577,10 +603,10 @@ static void close_channel(Channel *channel) } else { rstream_free(channel->data.streams.read); wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; + if (handle) { + uv_close(handle, close_cb); } else { - // When the stdin channel closes, it's time to go mch_exit(0); } } @@ -663,3 +689,80 @@ static void call_set_error(Channel *channel, char *msg) close_channel(channel); } + +static WBuffer *serialize_request(uint64_t channel_id, + uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_request(request_id, method, args, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + msgpack_sbuffer_clear(sbuffer); + api_free_array(args); + return rv; +} + +static WBuffer *serialize_response(uint64_t channel_id, + uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_response(response_id, err, arg, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + msgpack_sbuffer_clear(sbuffer); + api_free_object(arg); + return rv; +} + +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#define REQ "[response] " +#define RES "[response] " +#define NOT "[notification] " + +static void log_server_msg(uint64_t channel_id, + msgpack_sbuffer *packed) +{ + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); + uint64_t type = unpacked.data.via.array.ptr[0].via.u64; + DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id); + FILE *f = open_log_file(); + fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); + log_msg_close(f, unpacked.data); + msgpack_unpacked_destroy(&unpacked); +} + +static void log_client_msg(uint64_t channel_id, + bool is_request, + msgpack_object msg) +{ + DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id); + FILE *f = open_log_file(); + fprintf(f, is_request ? REQ : RES); + log_msg_close(f, msg); +} + +static void log_msg_close(FILE *f, msgpack_object msg) +{ + msgpack_object_print(f, msg); + fputc('\n', f); + fflush(f); + fclose(f); +} +#endif |