diff options
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 501 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 7 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel_defs.h | 36 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 86 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.h | 7 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/server.c | 55 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/server.h | 2 |
7 files changed, 329 insertions, 365 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index de6167c7fc..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -1,3 +1,6 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + #include <stdbool.h> #include <string.h> #include <inttypes.h> @@ -8,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" @@ -26,56 +30,13 @@ #include "nvim/log.h" #include "nvim/misc1.h" #include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff +#include "nvim/os/input.h" #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - size_t pending_requests; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - kvec_t(WBuffer *) delayed_notifications; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -83,91 +44,64 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { - channels = pmap_new(uint64_t)(); + ch_before_blocking_events = multiqueue_new_child(main_loop.events); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel_incref(channel); + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack, channel); + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - return channel->id; + rstream_start(out, receive_msgpack, channel); + } } -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) + +static Channel *find_rpc_channel(uint64_t id) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack, channel); + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; } -/// Sends event/arguments to channel +/// Publishes an event to a channel. /// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments +/// @param id Channel id. 0 means "broadcast to all subscribed channels" +/// @param name Event name (application-defined) +/// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } if (channel) { - if (channel->pending_requests) { - // Pending request, queue the notification for later sending. - String method = cstr_as_string(name); - WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); - kv_push(channel->delayed_notifications, buffer); - } else { - send_event(channel, name, args); - } + send_event(channel, name, args); } else { - // TODO(tarruda): Implement event broadcasting in vimscript broadcast_event(name, args); } @@ -181,35 +115,35 @@ bool channel_send_event(uint64_t id, char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + if (!(channel = find_rpc_channel(id))) { + api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); - channel->pending_requests++; + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); - channel->pending_requests--; + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); + api_set_error(err, kErrorTypeException, "%s", + frame.result.data.string.data); } else if (frame.result.type == kObjectTypeArray) { // Should be an error in the form [type, message] Array array = frame.result.data.array; @@ -217,24 +151,19 @@ Object channel_send_call(uint64_t id, && (array.items[0].data.integer == kErrorTypeException || array.items[0].data.integer == kErrorTypeValidation) && array.items[1].type == kObjectTypeString) { - err->type = (ErrorType) array.items[0].data.integer; - xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg)); - err->set = true; + api_set_error(err, (ErrorType)array.items[0].data.integer, "%s", + array.items[1].data.string.data); } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } api_free_object(frame.result); } - if (!channel->pending_requests) { - send_delayed_notifications(channel); - } - - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -243,11 +172,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -258,91 +187,63 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - return false; - } - - close_channel(channel); - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, parse_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); -} - -void channel_process_exit(uint64_t id, int status) -{ - Channel *channel = pmap_get(uint64_t)(channels, id); - - channel->closed = true; - decref(channel); -} - -static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, - bool eof) +static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, + void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + parse_msgpack(channel); + +end: + channel_decref(channel); +} + +static void parse_msgpack(Channel *channel) +{ msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -355,11 +256,11 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -368,7 +269,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -377,17 +278,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - decref(channel); } + static void handle_request(Channel *channel, msgpack_object *request) FUNC_ATTR_NONNULL_ALL { @@ -395,7 +294,7 @@ static void handle_request(Channel *channel, msgpack_object *request) Error error = ERROR_INIT; msgpack_rpc_validate(&request_id, request, &error); - if (error.set) { + if (ERROR_SET(&error)) { // Validation failed, send response with error if (channel_write(channel, serialize_response(channel->id, @@ -407,11 +306,11 @@ static void handle_request(Channel *channel, msgpack_object *request) snprintf(buf, sizeof(buf), "ch %" PRIu64 " sent an invalid message, closed.", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } + api_clear_error(&error); return; } - // Retrieve the request handler MsgpackRpcRequestHandler handler; msgpack_object *method = msgpack_rpc_method(request); @@ -430,16 +329,24 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); - event_data->channel = channel; - event_data->handler = handler; - event_data->args = args; - event_data->request_id = request_id; - incref(channel); + RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); + evdata->channel = channel; + evdata->handler = handler; + evdata->args = args; + evdata->request_id = request_id; + channel_incref(channel); if (handler.async) { - on_request_event((void **)&event_data); + bool is_get_mode = handler.fn == handle_nvim_get_mode; + + if (is_get_mode && !input_blocking()) { + // Defer the event to a special queue used by os/input.c. #6247 + multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata); + } else { + // Invoke immediately. + on_request_event((void **)&evdata); + } } else { - multiqueue_put(channel->events, on_request_event, 1, event_data); + multiqueue_put(channel->events, on_request_event, 1, evdata); } } @@ -465,64 +372,78 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); + api_clear_error(&error); } static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; - case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; - case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); - break; - default: - abort(); + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } return success; } +static void internal_read_event(void **argv) +{ + Channel *channel = argv[0]; + WBuffer *buffer = argv[1]; + + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), + buffer->data, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); + + parse_msgpack(channel); + + channel_decref(channel); + wstream_release_wbuffer(buffer); +} + static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); + api_set_error(&e, kErrorTypeException, "%s", err); channel_write(channel, serialize_response(channel->id, id, &e, NIL, &out_buffer)); + api_clear_error(&e); } static void send_request(Channel *channel, uint64_t id, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, id, method, @@ -532,10 +453,10 @@ static void send_request(Channel *channel, } static void send_event(Channel *channel, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, 0, method, @@ -544,13 +465,14 @@ static void send_event(Channel *channel, 1)); } -static void broadcast_event(char *name, Array args) +static void broadcast_event(const char *name, Array args) { kvec_t(Channel *) subscribed = KV_INITIAL_VALUE; Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -560,7 +482,7 @@ static void broadcast_event(char *name, Array args) goto end; } - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); WBuffer *buffer = serialize_request(0, 0, method, @@ -570,11 +492,7 @@ static void broadcast_event(char *name, Array args) for (size_t i = 0; i < kv_size(subscribed); i++) { Channel *channel = kv_A(subscribed, i); - if (channel->pending_requests) { - kv_push(channel->delayed_notifications, buffer); - } else { - channel_write(channel, buffer); - } + channel_write(channel, buffer); } end: @@ -584,10 +502,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -597,89 +516,43 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; + channel_decref(channel); - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); - break; - case kChannelTypeProc: - // Only close the rpc channel part, - // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); - break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); - multiqueue_put(main_loop.fast_events, exit_event, 1, channel); - return; - default: - abort(); + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); - if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - kv_destroy(channel->delayed_notifications); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); -} - -static void close_cb(Stream *stream, void *data) -{ - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->pending_requests = 0; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - kv_init(rv->delayed_notifications); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static bool is_rpc_response(msgpack_object *obj) @@ -694,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -713,22 +589,23 @@ static void complete_call(msgpack_object *obj, Channel *channel) } } -static void call_set_error(Channel *channel, char *msg) +static void call_set_error(Channel *channel, char *msg, int loglevel) { - ELOG("RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + LOG(loglevel, "RPC: %s", msg); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, uint64_t request_id, - String method, + const String method, Array args, msgpack_sbuffer *sbuffer, size_t refcount) @@ -765,28 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void send_delayed_notifications(Channel* channel) -{ - for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { - WBuffer *buffer = kv_A(channel->delayed_notifications, i); - channel_write(channel, buffer); - } - - kv_size(channel->delayed_notifications) = 0; -} - -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index 0d92976d02..9ff5abdc5f 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -8,9 +8,16 @@ #include "nvim/event/socket.h" #include "nvim/event/process.h" #include "nvim/vim.h" +#include "nvim/channel.h" #define METHOD_MAXLEN 512 +/// HACK: os/input.c drains this queue immediately before blocking for input. +/// Events on this queue are async-safe, but they need the resolved state +/// of os_inchar(), so they are processed "just-in-time". +MultiQueue *ch_before_blocking_events; + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h new file mode 100644 index 0000000000..6d8362e8b7 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel_defs.h @@ -0,0 +1,36 @@ +#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H +#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H + +#include <stdbool.h> +#include <uv.h> +#include <msgpack.h> + +#include "nvim/api/private/defs.h" +#include "nvim/event/socket.h" +#include "nvim/event/process.h" +#include "nvim/vim.h" + +typedef struct Channel Channel; + +typedef struct { + uint64_t request_id; + bool returned, errored; + Object result; +} ChannelCallFrame; + +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +typedef struct { + PMap(cstr_t) *subscribed_events; + bool closed; + msgpack_unpacker *unpacker; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; +} RpcState; + +#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 5137b375f0..fecae11d45 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -1,3 +1,6 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + #include <stdint.h> #include <stdbool.h> #include <inttypes.h> @@ -21,12 +24,12 @@ static msgpack_zone zone; static msgpack_sbuffer sbuffer; #define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \ - Integer *const arg) \ - FUNC_ATTR_NONNULL_ALL \ + static bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \ + Integer *const arg) \ + FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT \ { \ if (obj->type != MSGPACK_OBJECT_EXT \ - || obj->via.ext.type != kObjectType##t) { \ + || obj->via.ext.type + EXT_OBJECT_TYPE_SHIFT != kObjectType##t) { \ return false; \ } \ \ @@ -45,13 +48,14 @@ static msgpack_sbuffer sbuffer; return true; \ } \ \ - void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \ + static void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \ FUNC_ATTR_NONNULL_ARG(2) \ { \ msgpack_packer pac; \ msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ msgpack_pack_int64(&pac, (handle_T)o); \ - msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ + msgpack_pack_ext(res, sbuffer.size, \ + kObjectType##t - EXT_OBJECT_TYPE_SHIFT); \ msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ msgpack_sbuffer_clear(&sbuffer); \ } @@ -73,7 +77,7 @@ typedef struct { size_t idx; } MPToAPIObjectStackItem; -/// Convert type used by msgpack parser to Neovim own API type +/// Convert type used by msgpack parser to Nvim API type. /// /// @param[in] obj Msgpack value to convert. /// @param[out] arg Location where result of conversion will be saved. @@ -84,7 +88,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) { bool ret = true; kvec_t(MPToAPIObjectStackItem) stack = KV_INITIAL_VALUE; - kv_push(stack, ((MPToAPIObjectStackItem) { obj, arg, false, 0 })); + kv_push(stack, ((MPToAPIObjectStackItem) { + .mobj = obj, + .aobj = arg, + .container = false, + .idx = 0, + })); while (ret && kv_size(stack)) { MPToAPIObjectStackItem cur = kv_last(stack); if (!cur.container) { @@ -114,10 +123,16 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) } break; } - case MSGPACK_OBJECT_FLOAT: { +#ifdef NVIM_MSGPACK_HAS_FLOAT32 + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: +#else + case MSGPACK_OBJECT_FLOAT: +#endif + { STATIC_ASSERT(sizeof(Float) == sizeof(cur.mobj->via.f64), "Msgpack floating-point size does not match API integer"); - *cur.aobj = FLOATING_OBJ(cur.mobj->via.f64); + *cur.aobj = FLOAT_OBJ(cur.mobj->via.f64); break; } #define STR_CASE(type, attr, obj, dest, conv) \ @@ -181,7 +196,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) case MSGPACK_OBJECT_BOOLEAN: case MSGPACK_OBJECT_POSITIVE_INTEGER: case MSGPACK_OBJECT_NEGATIVE_INTEGER: +#ifdef NVIM_MSGPACK_HAS_FLOAT32 + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: +#else case MSGPACK_OBJECT_FLOAT: +#endif case MSGPACK_OBJECT_EXT: case MSGPACK_OBJECT_MAP: case MSGPACK_OBJECT_ARRAY: { @@ -211,7 +231,7 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) break; } case MSGPACK_OBJECT_EXT: { - switch (cur.mobj->via.ext.type) { + switch ((ObjectType)(cur.mobj->via.ext.type + EXT_OBJECT_TYPE_SHIFT)) { case kObjectTypeBuffer: { cur.aobj->type = kObjectTypeBuffer; ret = msgpack_rpc_to_buffer(cur.mobj, &cur.aobj->data.integer); @@ -227,6 +247,15 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) ret = msgpack_rpc_to_tabpage(cur.mobj, &cur.aobj->data.integer); break; } + case kObjectTypeNil: + case kObjectTypeBoolean: + case kObjectTypeInteger: + case kObjectTypeFloat: + case kObjectTypeString: + case kObjectTypeArray: + case kObjectTypeDictionary: { + break; + } } break; } @@ -322,7 +351,7 @@ void msgpack_rpc_from_float(Float result, msgpack_packer *res) msgpack_pack_double(res, result); } -void msgpack_rpc_from_string(String result, msgpack_packer *res) +void msgpack_rpc_from_string(const String result, msgpack_packer *res) FUNC_ATTR_NONNULL_ARG(2) { msgpack_pack_str(res, result.size); @@ -337,7 +366,7 @@ typedef struct { size_t idx; } APIToMPObjectStackItem; -/// Convert type used by Neovim API to msgpack +/// Convert type used by Nvim API to msgpack type. /// /// @param[in] result Object to convert. /// @param[out] res Structure that defines where conversion results are saved. @@ -350,6 +379,9 @@ void msgpack_rpc_from_object(const Object result, msgpack_packer *const res) kv_push(stack, ((APIToMPObjectStackItem) { &result, false, 0 })); while (kv_size(stack)) { APIToMPObjectStackItem cur = kv_last(stack); + STATIC_ASSERT(kObjectTypeWindow == kObjectTypeBuffer + 1 + && kObjectTypeTabpage == kObjectTypeWindow + 1, + "Buffer, window and tabpage enum items are in order"); switch (cur.aobj->type) { case kObjectTypeNil: { msgpack_pack_nil(res); @@ -461,8 +493,7 @@ Object msgpack_rpc_handle_missing_method(uint64_t channel_id, Array args, Error *error) { - snprintf(error->msg, sizeof(error->msg), "Invalid method name"); - error->set = true; + api_set_error(error, kErrorTypeException, "Invalid method name"); return NIL; } @@ -471,14 +502,13 @@ Object msgpack_rpc_handle_invalid_arguments(uint64_t channel_id, Array args, Error *error) { - snprintf(error->msg, sizeof(error->msg), "Invalid method arguments"); - error->set = true; + api_set_error(error, kErrorTypeException, "Invalid method arguments"); return NIL; } /// Serializes a msgpack-rpc request or notification(id == 0) void msgpack_rpc_serialize_request(uint64_t request_id, - String method, + const String method, Array args, msgpack_packer *pac) FUNC_ATTR_NONNULL_ARG(4) @@ -505,7 +535,7 @@ void msgpack_rpc_serialize_response(uint64_t response_id, msgpack_pack_int(pac, 1); msgpack_pack_uint64(pac, response_id); - if (err->set) { + if (ERROR_SET(err)) { // error represented by a [type, message] array msgpack_pack_array(pac, 2); msgpack_rpc_from_integer(err->type, pac); @@ -558,49 +588,49 @@ void msgpack_rpc_validate(uint64_t *response_id, *response_id = NO_RESPONSE; // Validate the basic structure of the msgpack-rpc payload if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Message is not an array")); + api_set_error(err, kErrorTypeValidation, "Message is not an array"); return; } if (req->via.array.size == 0) { - api_set_error(err, Validation, _("Message is empty")); + api_set_error(err, kErrorTypeValidation, "Message is empty"); return; } if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); + api_set_error(err, kErrorTypeValidation, "Message type must be an integer"); return; } uint64_t type = req->via.array.ptr[0].via.u64; if (type != kMessageTypeRequest && type != kMessageTypeNotification) { - api_set_error(err, Validation, _("Unknown message type")); + api_set_error(err, kErrorTypeValidation, "Unknown message type"); return; } if ((type == kMessageTypeRequest && req->via.array.size != 4) || (type == kMessageTypeNotification && req->via.array.size != 3)) { - api_set_error(err, Validation, _("Request array size should be 4 (request) " - "or 3 (notification)")); + api_set_error(err, kErrorTypeValidation, + "Request array size must be 4 (request) or 3 (notification)"); return; } if (type == kMessageTypeRequest) { msgpack_object *id_obj = msgpack_rpc_msg_id(req); if (!id_obj) { - api_set_error(err, Validation, _("ID must be a positive integer")); + api_set_error(err, kErrorTypeValidation, "ID must be a positive integer"); return; } *response_id = id_obj->via.u64; } if (!msgpack_rpc_method(req)) { - api_set_error(err, Validation, _("Method must be a string")); + api_set_error(err, kErrorTypeValidation, "Method must be a string"); return; } if (!msgpack_rpc_args(req)) { - api_set_error(err, Validation, _("Parameters must be an array")); + api_set_error(err, kErrorTypeValidation, "Parameters must be an array"); return; } } diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index 7d9f114140..0e4cd1be6d 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -9,6 +9,13 @@ #include "nvim/event/wstream.h" #include "nvim/api/private/defs.h" +/// Value by which objects represented as EXT type are shifted +/// +/// Subtracted when packing, added when unpacking. Used to allow moving +/// buffer/window/tabpage block inside ObjectType enum. This block yet cannot be +/// split or reordered. +#define EXT_OBJECT_TYPE_SHIFT kObjectTypeBuffer + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/helpers.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index d7c2926a0f..9bf122f4db 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -1,3 +1,6 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + #include <assert.h> #include <stdlib.h> #include <string.h> @@ -94,39 +97,61 @@ char *server_address_new(void) #endif } -/// Starts listening for API calls on the TCP address or pipe path `endpoint`. +/// Check if this instance owns a pipe address. +/// The argument must already be resolved to an absolute path! +bool server_owns_pipe_address(const char *path) +{ + for (int i = 0; i < watchers.ga_len; i++) { + if (!strcmp(path, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { + return true; + } + } + return false; +} + +/// Starts listening for API calls. +/// /// The socket type is determined by parsing `endpoint`: If it's a valid IPv4 -/// address in 'ip[:port]' format, then it will be TCP socket. The port is -/// optional and if omitted defaults to NVIM_DEFAULT_TCP_PORT. Otherwise it -/// will be a unix socket or named pipe. +/// or IPv6 address in 'ip:[port]' format, then it will be a TCP socket. +/// Otherwise it will be a Unix socket or named pipe (Windows). +/// +/// If no port is given, a random one will be assigned. /// -/// @param endpoint Address of the server. Either a 'ip[:port]' string or an -/// arbitrary identifier (trimmed to 256 bytes) for the unix socket or -/// named pipe. +/// @param endpoint Address of the server. Either a 'ip:[port]' string or an +/// arbitrary identifier (trimmed to 256 bytes) for the Unix +/// socket or named pipe. /// @returns 0 on success, 1 on a regular error, and negative errno -/// on failure to bind or connect. +/// on failure to bind or listen. int server_start(const char *endpoint) { - if (endpoint == NULL) { - ELOG("Attempting to start server on NULL endpoint"); + if (endpoint == NULL || endpoint[0] == '\0') { + WLOG("Empty or NULL endpoint"); return 1; } SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); - socket_watcher_init(&main_loop, watcher, endpoint, NULL); + + int result = socket_watcher_init(&main_loop, watcher, endpoint); + if (result < 0) { + xfree(watcher); + return result; + } // Check if a watcher for the endpoint already exists for (int i = 0; i < watchers.ga_len; i++) { if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { ELOG("Already listening on %s", watcher->addr); + if (watcher->stream->type == UV_TCP) { + uv_freeaddrinfo(watcher->uv.tcp.addrinfo); + } socket_watcher_close(watcher, free_server); return 1; } } - int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb); + result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb); if (result < 0) { - ELOG("Failed to start server: %s", uv_strerror(result)); + WLOG("Failed to start server: %s", uv_strerror(result)); socket_watcher_close(watcher, free_server); return result; } @@ -155,6 +180,7 @@ int server_start(const char *endpoint) void server_stop(char *endpoint) { SocketWatcher *watcher; + bool watcher_found = false; char addr[ADDRESS_MAX_SIZE]; // Trim to `ADDRESS_MAX_SIZE` @@ -164,11 +190,12 @@ void server_stop(char *endpoint) for (; i < watchers.ga_len; i++) { watcher = ((SocketWatcher **)watchers.ga_data)[i]; if (strcmp(addr, watcher->addr) == 0) { + watcher_found = true; break; } } - if (i >= watchers.ga_len) { + if (!watcher_found) { ELOG("Not listening on %s", addr); return; } diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h index f1a6703938..5446e40e0b 100644 --- a/src/nvim/msgpack_rpc/server.h +++ b/src/nvim/msgpack_rpc/server.h @@ -1,6 +1,8 @@ #ifndef NVIM_MSGPACK_RPC_SERVER_H #define NVIM_MSGPACK_RPC_SERVER_H +#include <stdio.h> + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/server.h.generated.h" #endif |