diff options
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 155 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 66 |
2 files changed, 152 insertions, 69 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 94605c37e9..3325b294dd 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,6 +31,11 @@ #define CHANNEL_BUFFER_SIZE 0xffff +#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL +#define log_client_msg(...) +#define log_server_msg(...) +#endif + typedef struct { uint64_t request_id; bool returned, errored; @@ -225,6 +230,10 @@ Object channel_send_call(uint64_t id, return NIL; } + if (channel->closed && !kv_size(channel->call_stack)) { + free_channel(channel); + } + return frame.result; } @@ -328,7 +337,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - goto end; + close_channel(channel); } size_t count = rstream_pending(rstream); @@ -348,7 +357,10 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Deserialize everything we can. while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + bool is_response = is_rpc_response(&unpacked.data); + log_client_msg(channel->id, !is_response, unpacked.data); + + if (kv_size(channel->call_stack) && is_response) { if (is_valid_rpc_response(&unpacked.data, channel)) { complete_call(&unpacked.data, channel); } else { @@ -363,7 +375,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -386,14 +398,6 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - if (eof && !channel->is_job && !kv_size(channel->call_stack)) { - // The free_channel call is deferred for jobs because it's possible that - // job_stderr will called after this. For non-job channels, this is the - // last callback so it must be freed now. - free_channel(channel); - } } static void handle_request(Channel *channel, msgpack_object *request) @@ -406,7 +410,11 @@ static void handle_request(Channel *channel, msgpack_object *request) if (error.set) { // Validation failed, send response with error channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); + serialize_response(channel->id, + request_id, + &error, + NIL, + &out_buffer)); return; } @@ -459,14 +467,13 @@ static void call_request_handler(Channel *channel, // send the response msgpack_packer response; msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); - - if (error.set) { - channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); - } - - channel_write(channel, - serialize_response(request_id, &error, result, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + request_id, + &error, + result, + &out_buffer)); + // All arguments were freed already, but we still need to free the array + free(args.items); } static bool channel_write(Channel *channel, WBuffer *buffer) @@ -497,7 +504,11 @@ static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + id, + &e, + NIL, + &out_buffer)); } static void send_request(Channel *channel, @@ -506,7 +517,12 @@ static void send_request(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + id, + method, + args, + &out_buffer, + 1)); } static void send_event(Channel *channel, @@ -514,7 +530,12 @@ static void send_event(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + 0, + method, + args, + &out_buffer, + 1)); } static void broadcast_event(char *name, Array args) @@ -536,6 +557,7 @@ static void broadcast_event(char *name, Array args) String method = {.size = strlen(name), .data = name}; WBuffer *buffer = serialize_request(0, + 0, method, args, &out_buffer, @@ -569,6 +591,10 @@ static void unsubscribe(Channel *channel, char *event) /// free_channel later. static void close_channel(Channel *channel) { + if (channel->closed) { + return; + } + channel->closed = true; if (channel->is_job) { if (channel->data.job) { @@ -577,10 +603,10 @@ static void close_channel(Channel *channel) } else { rstream_free(channel->data.streams.read); wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; + if (handle) { + uv_close(handle, close_cb); } else { - // When the stdin channel closes, it's time to go mch_exit(0); } } @@ -663,3 +689,80 @@ static void call_set_error(Channel *channel, char *msg) close_channel(channel); } + +static WBuffer *serialize_request(uint64_t channel_id, + uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_request(request_id, method, args, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + msgpack_sbuffer_clear(sbuffer); + api_free_array(args); + return rv; +} + +static WBuffer *serialize_response(uint64_t channel_id, + uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_response(response_id, err, arg, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + msgpack_sbuffer_clear(sbuffer); + api_free_object(arg); + return rv; +} + +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#define REQ "[response] " +#define RES "[response] " +#define NOT "[notification] " + +static void log_server_msg(uint64_t channel_id, + msgpack_sbuffer *packed) +{ + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); + uint64_t type = unpacked.data.via.array.ptr[0].via.u64; + DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id); + FILE *f = open_log_file(); + fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); + log_msg_close(f, unpacked.data); + msgpack_unpacked_destroy(&unpacked); +} + +static void log_client_msg(uint64_t channel_id, + bool is_request, + msgpack_object msg) +{ + DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id); + FILE *f = open_log_file(); + fprintf(f, is_request ? REQ : RES); + log_msg_close(f, msg); +} + +static void log_msg_close(FILE *f, msgpack_object msg) +{ + msgpack_object_print(f, msg); + fputc('\n', f); + fflush(f); + fclose(f); +} +#endif diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 6be221b912..4414aadb15 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -323,68 +323,48 @@ Object msgpack_rpc_handle_missing_method(uint64_t channel_id, } /// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) +void msgpack_rpc_serialize_request(uint64_t request_id, + String method, + Array args, + msgpack_packer *pac) FUNC_ATTR_NONNULL_ARG(4) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); + msgpack_pack_array(pac, request_id ? 4 : 3); + msgpack_pack_int(pac, request_id ? 0 : 2); if (request_id) { - msgpack_pack_uint64(&pac, request_id); + msgpack_pack_uint64(pac, request_id); } - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; + msgpack_pack_bin(pac, method.size); + msgpack_pack_bin_body(pac, method.data, method.size); + msgpack_rpc_from_array(args, pac); } /// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) +void msgpack_rpc_serialize_response(uint64_t response_id, + Error *err, + Object arg, + msgpack_packer *pac) FUNC_ATTR_NONNULL_ARG(2, 4) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); + msgpack_pack_array(pac, 4); + msgpack_pack_int(pac, 1); + msgpack_pack_uint64(pac, response_id); if (err->set) { // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); + msgpack_pack_array(pac, 2); + msgpack_rpc_from_integer(err->type, pac); + msgpack_rpc_from_string(cstr_as_string(err->msg), pac); // Nil result - msgpack_pack_nil(&pac); + msgpack_pack_nil(pac); } else { // Nil error - msgpack_pack_nil(&pac); + msgpack_pack_nil(pac); // Return value - msgpack_rpc_from_object(arg, &pac); + msgpack_rpc_from_object(arg, pac); } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; } void msgpack_rpc_validate(uint64_t *response_id, |