diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 447 |
1 files changed, 110 insertions, 337 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 68ac35bc4e..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -11,8 +11,8 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -29,60 +29,14 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" -#include "nvim/path.h" #include "nvim/lib/kvec.h" #include "nvim/os/input.h" -#define CHANNEL_BUFFER_SIZE 0xffff - #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio, - kChannelTypeInternal -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - size_t pending_requests; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - kvec_t(WBuffer *) delayed_notifications; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -90,121 +44,64 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { ch_before_blocking_events = multiqueue_new_child(main_loop.events); - channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel_incref(channel); + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, receive_msgpack, channel); + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - return channel->id; + rstream_start(out, receive_msgpack, channel); + } } -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) -{ - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); -} -uint64_t channel_connect(bool tcp, const char *address, - int timeout, const char **error) +static Channel *find_rpc_channel(uint64_t id) { - if (!tcp) { - char *path = fix_fname(address); - if (server_owns_pipe_address(path)) { - // avoid deadlock - xfree(path); - return channel_create_internal(); - } - xfree(path); - } - - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - if (!socket_connect(&main_loop, &channel->data.stream, - tcp, address, timeout, error)) { - decref(channel); - return 0; + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; } - - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); - return channel->id; + return chan; } -/// Sends event/arguments to channel +/// Publishes an event to a channel. /// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments +/// @param id Channel id. 0 means "broadcast to all subscribed channels" +/// @param name Event name (application-defined) +/// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, const char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } if (channel) { - if (channel->pending_requests) { - // Pending request, queue the notification for later sending. - const String method = cstr_as_string((char *)name); - WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); - kv_push(channel->delayed_notifications, buffer); - } else { - send_event(channel, name, args); - } + send_event(channel, name, args); } else { - // TODO(tarruda): Implement event broadcasting in vimscript broadcast_event(name, args); } @@ -218,31 +115,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - const char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); - channel->pending_requests++; + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); - channel->pending_requests--; + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { @@ -267,11 +163,7 @@ Object channel_send_call(uint64_t id, api_free_object(frame.result); } - if (!channel->pending_requests) { - send_delayed_notifications(channel); - } - - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -280,11 +172,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -295,97 +187,52 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - return false; - } - - close_channel(channel); - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, receive_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); -} - -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -uint64_t channel_create_internal(void) -{ - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); - incref(channel); // internal channel lives until process exit - return channel->id; -} - -void channel_process_exit(uint64_t id, int status) -{ - Channel *channel = pmap_get(uint64_t)(channels, id); - - channel->closed = true; - decref(channel); -} - static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, WARNING_LOG_LEVEL); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); parse_msgpack(channel); end: - decref(channel); + channel_decref(channel); } static void parse_msgpack(Channel *channel) @@ -395,8 +242,8 @@ static void parse_msgpack(Channel *channel) msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -422,7 +269,7 @@ static void parse_msgpack(Channel *channel) if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -431,8 +278,8 @@ static void parse_msgpack(Channel *channel) // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); @@ -487,7 +334,7 @@ static void handle_request(Channel *channel, msgpack_object *request) evdata->handler = handler; evdata->args = args; evdata->request_id = request_id; - incref(channel); + channel_incref(channel); if (handler.async) { bool is_get_mode = handler.fn == handle_nvim_get_mode; @@ -525,44 +372,37 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); api_clear_error(&error); } static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success = false; + bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; - case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; - case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); - break; - case kChannelTypeInternal: - incref(channel); - CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); - success = true; - break; + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); call_set_error(channel, buf, ERROR_LOG_LEVEL); } @@ -575,14 +415,14 @@ static void internal_read_event(void **argv) Channel *channel = argv[0]; WBuffer *buffer = argv[1]; - msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->unpacker), + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); parse_msgpack(channel); - decref(channel); + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -631,7 +471,8 @@ static void broadcast_event(const char *name, Array args) Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -651,11 +492,7 @@ static void broadcast_event(const char *name, Array args) for (size_t i = 0; i < kv_size(subscribed); i++) { Channel *channel = kv_A(subscribed, i); - if (channel->pending_requests) { - kv_push(channel->delayed_notifications, buffer); - } else { - channel_write(channel, buffer); - } + channel_write(channel, buffer); } end: @@ -665,10 +502,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -678,90 +516,43 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; + channel_decref(channel); - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); - break; - case kChannelTypeProc: - // Only close the rpc channel part, - // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); - break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); - multiqueue_put(main_loop.fast_events, exit_event, 1, channel); - return; - case kChannelTypeInternal: - // nothing to free. - break; + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); - if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - kv_destroy(channel->delayed_notifications); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); -} - -static void close_cb(Stream *stream, void *data) -{ - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->pending_requests = 0; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - kv_init(rv->delayed_notifications); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static bool is_rpc_response(msgpack_object *obj) @@ -776,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -798,14 +592,15 @@ static void complete_call(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, @@ -847,28 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void send_delayed_notifications(Channel* channel) -{ - for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { - WBuffer *buffer = kv_A(channel->delayed_notifications, i); - channel_write(channel, buffer); - } - - kv_size(channel->delayed_notifications) = 0; -} - -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " |