diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 605 |
1 files changed, 235 insertions, 370 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5b249ee1c7..76fbe407c2 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -1,3 +1,6 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + #include <stdbool.h> #include <string.h> #include <inttypes.h> @@ -8,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" @@ -19,68 +23,20 @@ #include "nvim/main.h" #include "nvim/ascii.h" #include "nvim/memory.h" +#include "nvim/eval.h" #include "nvim/os_unix.h" #include "nvim/message.h" #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" #include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff +#include "nvim/os/input.h" #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - size_t pending_requests; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; - struct { - LibuvProcess uvproc; - Stream in; - Stream out; - Stream err; - } process; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - kvec_t(WBuffer *) delayed_notifications; - Queue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static uint64_t next_id = 1; -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -88,104 +44,65 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { - channels = pmap_new(uint64_t)(); + ch_before_blocking_events = multiqueue_new_child(main_loop.events); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) + +void rpc_start(Channel *channel) { - if (!channels) { - return; - } + channel_incref(channel); + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + rpc->info = (Dictionary)ARRAY_DICT_INIT; + kv_init(rpc->call_stack); - Channel *channel; + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - map_foreach_value(channels, channel, { - close_channel(channel); - }); + rstream_start(out, receive_msgpack, channel); + } } -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(char **argv) -{ - Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = libuv_process_init(&main_loop, channel); - Process *proc = &channel->data.process.uvproc.process; - proc->argv = argv; - proc->in = &channel->data.process.in; - proc->out = &channel->data.process.out; - proc->err = &channel->data.process.err; - proc->cb = process_exit; - if (!process_spawn(proc)) { - loop_poll_events(&main_loop, 0); - decref(channel); - return 0; - } - - incref(channel); // process channels are only closed by the exit_cb - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack); - rstream_init(proc->err, 0); - rstream_start(proc->err, forward_stderr); - - return channel->id; -} - -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) + +static Channel *find_rpc_channel(uint64_t id) { - Channel *channel = register_channel(kChannelTypeSocket); - socket_watcher_accept(watcher, &channel->data.stream, channel); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack); + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; } -/// Sends event/arguments to channel +/// Publishes an event to a channel. /// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments +/// @param id Channel id. 0 means "broadcast to all subscribed channels" +/// @param name Event name (application-defined) +/// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } if (channel) { - if (channel->pending_requests) { - // Pending request, queue the notification for later sending. - String method = cstr_as_string(name); - WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); - kv_push(channel->delayed_notifications, buffer); - } else { - send_event(channel, name, args); - } + send_event(channel, name, args); } else { - // TODO(tarruda): Implement event broadcasting in vimscript broadcast_event(name, args); } @@ -199,35 +116,35 @@ bool channel_send_event(uint64_t id, char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + if (!(channel = find_rpc_channel(id))) { + api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); - channel->pending_requests++; + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); - channel->pending_requests--; + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); + api_set_error(err, kErrorTypeException, "%s", + frame.result.data.string.data); } else if (frame.result.type == kObjectTypeArray) { // Should be an error in the form [type, message] Array array = frame.result.data.array; @@ -235,24 +152,19 @@ Object channel_send_call(uint64_t id, && (array.items[0].data.integer == kErrorTypeException || array.items[0].data.integer == kErrorTypeValidation) && array.items[1].type == kObjectTypeString) { - err->type = (ErrorType) array.items[0].data.integer; - xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg)); - err->set = true; + api_set_error(err, (ErrorType)array.items[0].data.integer, "%s", + array.items[1].data.string.data); } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } api_free_object(frame.result); } - if (!channel->pending_requests) { - send_delayed_notifications(channel); - } - - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -261,11 +173,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -276,99 +188,63 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - return false; - } - - close_channel(channel); - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = register_channel(kChannelTypeStdio); - incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, - channel); - rstream_start(&channel->data.std.in, parse_msgpack); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL); -} - -static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, - void *data, bool eof) -{ - while (rbuffer_size(rbuf)) { - char buf[256]; - size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); - } -} - -static void process_exit(Process *proc, int status, void *data) -{ - decref(data); -} - -static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, - bool eof) +static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, + void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { - close_channel(channel); - call_set_error(channel, "Channel was closed by the client"); + channel_close(channel->id, kChannelPartRpc, NULL); + char buf[256]; + snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", + channel->id); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)", - count, - stream); + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + + parse_msgpack(channel); + +end: + channel_decref(channel); +} +static void parse_msgpack(Channel *channel) +{ msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -377,17 +253,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, complete_call(&unpacked.data, channel); } else { char buf[256]; - snprintf(buf, - sizeof(buf), - "Channel %" PRIu64 " returned a response that doesn't have " - "a matching request id. Ensure the client is properly " - "synchronized", + snprintf(buf, sizeof(buf), + "ch %" PRIu64 " returned a response with an unknown request " + "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -396,7 +270,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -405,15 +279,12 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - decref(channel); } static void handle_request(Channel *channel, msgpack_object *request) @@ -423,7 +294,7 @@ static void handle_request(Channel *channel, msgpack_object *request) Error error = ERROR_INIT; msgpack_rpc_validate(&request_id, request, &error); - if (error.set) { + if (ERROR_SET(&error)) { // Validation failed, send response with error if (channel_write(channel, serialize_response(channel->id, @@ -433,41 +304,53 @@ static void handle_request(Channel *channel, msgpack_object *request) &out_buffer))) { char buf[256]; snprintf(buf, sizeof(buf), - "Channel %" PRIu64 " sent an invalid message, closed.", + "ch %" PRIu64 " sent an invalid message, closed.", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } + api_clear_error(&error); return; } - // Retrieve the request handler MsgpackRpcRequestHandler handler; msgpack_object *method = msgpack_rpc_method(request); + handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, + method->via.bin.size, + &error); - if (method) { - handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, - method->via.bin.size); - } else { - handler.fn = msgpack_rpc_handle_missing_method; - handler.async = true; + // check method arguments + Array args = ARRAY_DICT_INIT; + if (!ERROR_SET(&error) + && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { + api_set_error(&error, kErrorTypeException, "Invalid method arguments"); } - Array args = ARRAY_DICT_INIT; - if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { - handler.fn = msgpack_rpc_handle_invalid_arguments; - handler.async = true; + if (ERROR_SET(&error)) { + send_error(channel, request_id, error.msg); + api_clear_error(&error); + api_free_array(args); + return; } - RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); - event_data->channel = channel; - event_data->handler = handler; - event_data->args = args; - event_data->request_id = request_id; - incref(channel); + RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); + evdata->channel = channel; + evdata->handler = handler; + evdata->args = args; + evdata->request_id = request_id; + channel_incref(channel); if (handler.async) { - on_request_event((void **)&event_data); + bool is_get_mode = handler.fn == handle_nvim_get_mode; + + if (is_get_mode && !input_blocking()) { + // Defer the event to a special queue used by os/input.c. #6247 + multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata); + } else { + // Invoke immediately. + on_request_event((void **)&evdata); + } } else { - queue_put(channel->events, on_request_event, 1, event_data); + multiqueue_put(channel->events, on_request_event, 1, evdata); + DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr); } } @@ -479,7 +362,7 @@ static void on_request_event(void **argv) Array args = e->args; uint64_t request_id = e->request_id; Error error = ERROR_INIT; - Object result = handler.fn(channel->id, request_id, args, &error); + Object result = handler.fn(channel->id, args, &error); if (request_id != NO_RESPONSE) { // send the response msgpack_packer response; @@ -492,66 +375,79 @@ static void on_request_event(void **argv) } else { api_free_object(result); } - // All arguments were freed already, but we still need to free the array - xfree(args.items); - decref(channel); + api_free_array(args); + channel_decref(channel); xfree(e); + api_clear_error(&error); } static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; - case kChannelTypeProc: - success = wstream_write(&channel->data.process.in, buffer); - break; - case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); - break; - default: - abort(); + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } return success; } +static void internal_read_event(void **argv) +{ + Channel *channel = argv[0]; + WBuffer *buffer = argv[1]; + + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), + buffer->data, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); + + parse_msgpack(channel); + + channel_decref(channel); + wstream_release_wbuffer(buffer); +} + static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); + api_set_error(&e, kErrorTypeException, "%s", err); channel_write(channel, serialize_response(channel->id, id, &e, NIL, &out_buffer)); + api_clear_error(&e); } static void send_request(Channel *channel, uint64_t id, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, id, method, @@ -561,10 +457,10 @@ static void send_request(Channel *channel, } static void send_event(Channel *channel, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, 0, method, @@ -573,13 +469,14 @@ static void send_event(Channel *channel, 1)); } -static void broadcast_event(char *name, Array args) +static void broadcast_event(const char *name, Array args) { kvec_t(Channel *) subscribed = KV_INITIAL_VALUE; Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -589,7 +486,7 @@ static void broadcast_event(char *name, Array args) goto end; } - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); WBuffer *buffer = serialize_request(0, 0, method, @@ -598,12 +495,8 @@ static void broadcast_event(char *name, Array args) kv_size(subscribed)); for (size_t i = 0; i < kv_size(subscribed); i++) { - Channel *channel = kv_A(subscribed, i); - if (channel->pending_requests) { - kv_push(channel->delayed_notifications, buffer); - } else { - channel_write(channel, buffer); - } + Channel *c = kv_A(subscribed, i); + channel_write(c, buffer); } end: @@ -613,10 +506,16 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + if (!event_string) { + WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'", + channel->id, event); + return; + } + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -626,85 +525,44 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; + channel_decref(channel); - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL); - break; - case kChannelTypeProc: - if (!channel->data.process.uvproc.process.closed) { - process_stop(&channel->data.process.uvproc.process); - } - break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL); - stream_close(&channel->data.std.out, NULL); - queue_put(main_loop.fast_events, exit_event, 1, channel); - return; - default: - abort(); + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); - if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - kv_destroy(channel->delayed_notifications); - queue_free(channel->events); - xfree(channel); -} - -static void close_cb(Stream *stream, void *data) -{ - decref(data); -} - -static Channel *register_channel(ChannelType type) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = queue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; - rv->pending_requests = 0; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - kv_init(rv->delayed_notifications); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); + api_free_dictionary(channel->rpc.info); } static bool is_rpc_response(msgpack_object *obj) @@ -719,15 +577,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -738,22 +599,23 @@ static void complete_call(msgpack_object *obj, Channel *channel) } } -static void call_set_error(Channel *channel, char *msg) +static void call_set_error(Channel *channel, char *msg, int loglevel) { - ELOG("msgpack-rpc: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + LOG(loglevel, "RPC: %s", msg); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, uint64_t request_id, - String method, + const String method, Array args, msgpack_sbuffer *sbuffer, size_t refcount) @@ -779,7 +641,16 @@ static WBuffer *serialize_response(uint64_t channel_id, { msgpack_packer pac; msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_rpc_serialize_response(response_id, err, arg, &pac); + if (ERROR_SET(err) && response_id == NO_RESPONSE) { + Array args = ARRAY_DICT_INIT; + ADD(args, INTEGER_OBJ(err->type)); + ADD(args, STRING_OBJ(cstr_to_string(err->msg))); + msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"), + args, &pac); + api_free_array(args); + } else { + msgpack_rpc_serialize_response(response_id, err, arg, &pac); + } log_server_msg(channel_id, sbuffer); WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, @@ -790,33 +661,28 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void send_delayed_notifications(Channel* channel) +void rpc_set_client_info(uint64_t id, Dictionary info) { - for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { - WBuffer *buffer = kv_A(channel->delayed_notifications, i); - channel_write(channel, buffer); + Channel *chan = find_rpc_channel(id); + if (!chan) { + abort(); } - kv_size(channel->delayed_notifications) = 0; -} - -static void incref(Channel *channel) -{ - channel->refcount++; + api_free_dictionary(chan->rpc.info); + chan->rpc.info = info; + channel_info_changed(chan, false); } -static void decref(Channel *channel) +Dictionary rpc_client_info(Channel *chan) { - if (!(--channel->refcount)) { - free_channel(channel); - } + return copy_dictionary(chan->rpc.info); } #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL -#define REQ "[request] " -#define RES "[response] " -#define NOT "[notification] " -#define ERR "[error] " +#define REQ "[request] " +#define RES "[response] " +#define NOT "[notify] " +#define ERR "[error] " // Cannot define array with negative offsets, so this one is needed to be added // to MSGPACK_UNPACK_\* values. @@ -834,7 +700,7 @@ static void log_server_msg(uint64_t channel_id, { msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); - DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id); + DLOGN("RPC ->ch %" PRIu64 ": ", channel_id); const msgpack_unpack_return result = msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); switch (result) { @@ -871,7 +737,7 @@ static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg) { - DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id); + DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); log_lock(); FILE *f = open_log_file(); fprintf(f, is_request ? REQ : RES); @@ -887,4 +753,3 @@ static void log_msg_close(FILE *f, msgpack_object msg) log_unlock(); } #endif - |