aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c605
1 files changed, 235 insertions, 370 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5b249ee1c7..76fbe407c2 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdbool.h>
#include <string.h>
#include <inttypes.h>
@@ -8,6 +11,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
+#include "nvim/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
@@ -19,68 +23,20 @@
#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
+#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
#include "nvim/lib/kvec.h"
-
-#define CHANNEL_BUFFER_SIZE 0xffff
+#include "nvim/os/input.h"
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
#define log_client_msg(...)
#define log_server_msg(...)
#endif
-typedef enum {
- kChannelTypeSocket,
- kChannelTypeProc,
- kChannelTypeStdio
-} ChannelType;
-
-typedef struct {
- uint64_t request_id;
- bool returned, errored;
- Object result;
-} ChannelCallFrame;
-
-typedef struct {
- uint64_t id;
- size_t refcount;
- size_t pending_requests;
- PMap(cstr_t) *subscribed_events;
- bool closed;
- ChannelType type;
- msgpack_unpacker *unpacker;
- union {
- Stream stream;
- struct {
- LibuvProcess uvproc;
- Stream in;
- Stream out;
- Stream err;
- } process;
- struct {
- Stream in;
- Stream out;
- } std;
- } data;
- uint64_t next_request_id;
- kvec_t(ChannelCallFrame *) call_stack;
- kvec_t(WBuffer *) delayed_notifications;
- Queue *events;
-} Channel;
-
-typedef struct {
- Channel *channel;
- MsgpackRpcRequestHandler handler;
- Array args;
- uint64_t request_id;
-} RequestEvent;
-
-static uint64_t next_id = 1;
-static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -88,104 +44,65 @@ static msgpack_sbuffer out_buffer;
# include "msgpack_rpc/channel.c.generated.h"
#endif
-/// Initializes the module
-void channel_init(void)
+void rpc_init(void)
{
- channels = pmap_new(uint64_t)();
+ ch_before_blocking_events = multiqueue_new_child(main_loop.events);
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
- remote_ui_init();
}
-/// Teardown the module
-void channel_teardown(void)
+
+void rpc_start(Channel *channel)
{
- if (!channels) {
- return;
- }
+ channel_incref(channel);
+ channel->is_rpc = true;
+ RpcState *rpc = &channel->rpc;
+ rpc->closed = false;
+ rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ rpc->subscribed_events = pmap_new(cstr_t)();
+ rpc->next_request_id = 1;
+ rpc->info = (Dictionary)ARRAY_DICT_INIT;
+ kv_init(rpc->call_stack);
- Channel *channel;
+ if (channel->streamtype != kChannelStreamInternal) {
+ Stream *out = channel_outstream(channel);
+#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
+ Stream *in = channel_instream(channel);
+ DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out);
+#endif
- map_foreach_value(channels, channel, {
- close_channel(channel);
- });
+ rstream_start(out, receive_msgpack, channel);
+ }
}
-/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
-///
-/// @param argv The argument vector for the process. [consumed]
-/// @return The channel id (> 0), on success.
-/// 0, on error.
-uint64_t channel_from_process(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
- Process *proc = &channel->data.process.uvproc.process;
- proc->argv = argv;
- proc->in = &channel->data.process.in;
- proc->out = &channel->data.process.out;
- proc->err = &channel->data.process.err;
- proc->cb = process_exit;
- if (!process_spawn(proc)) {
- loop_poll_events(&main_loop, 0);
- decref(channel);
- return 0;
- }
-
- incref(channel); // process channels are only closed by the exit_cb
- wstream_init(proc->in, 0);
- rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack);
- rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr);
-
- return channel->id;
-}
-
-/// Creates an API channel from a tcp/pipe socket connection
-///
-/// @param watcher The SocketWatcher ready to accept the connection
-void channel_from_connection(SocketWatcher *watcher)
+
+static Channel *find_rpc_channel(uint64_t id)
{
- Channel *channel = register_channel(kChannelTypeSocket);
- socket_watcher_accept(watcher, &channel->data.stream, channel);
- incref(channel); // close channel only after the stream is closed
- channel->data.stream.internal_close_cb = close_cb;
- channel->data.stream.internal_data = channel;
- wstream_init(&channel->data.stream, 0);
- rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack);
+ Channel *chan = find_channel(id);
+ if (!chan || !chan->is_rpc || chan->rpc.closed) {
+ return NULL;
+ }
+ return chan;
}
-/// Sends event/arguments to channel
+/// Publishes an event to a channel.
///
-/// @param id The channel id. If 0, the event will be sent to all
-/// channels that have subscribed to the event type
-/// @param name The event name, an arbitrary string
-/// @param args Array with event arguments
+/// @param id Channel id. 0 means "broadcast to all subscribed channels"
+/// @param name Event name (application-defined)
+/// @param args Array of event arguments
/// @return True if the event was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *name, Array args)
+bool rpc_send_event(uint64_t id, const char *name, Array args)
{
Channel *channel = NULL;
- if (id && (!(channel = pmap_get(uint64_t)(channels, id))
- || channel->closed)) {
+ if (id && (!(channel = find_rpc_channel(id)))) {
api_free_array(args);
return false;
}
if (channel) {
- if (channel->pending_requests) {
- // Pending request, queue the notification for later sending.
- String method = cstr_as_string(name);
- WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
- kv_push(channel->delayed_notifications, buffer);
- } else {
- send_event(channel, name, args);
- }
+ send_event(channel, name, args);
} else {
- // TODO(tarruda): Implement event broadcasting in vimscript
broadcast_event(name, args);
}
@@ -199,35 +116,35 @@ bool channel_send_event(uint64_t id, char *name, Array args)
/// @param args Array with method arguments
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
-Object channel_send_call(uint64_t id,
- char *method_name,
- Array args,
- Error *err)
+Object rpc_send_call(uint64_t id,
+ const char *method_name,
+ Array args,
+ Error *err)
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
- api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
+ if (!(channel = find_rpc_channel(id))) {
+ api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
api_free_array(args);
return NIL;
}
- incref(channel);
- uint64_t request_id = channel->next_request_id++;
+ channel_incref(channel);
+ RpcState *rpc = &channel->rpc;
+ uint64_t request_id = rpc->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
- kv_push(channel->call_stack, &frame);
- channel->pending_requests++;
+ kv_push(rpc->call_stack, &frame);
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
- (void)kv_pop(channel->call_stack);
- channel->pending_requests--;
+ (void)kv_pop(rpc->call_stack);
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
- api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ api_set_error(err, kErrorTypeException, "%s",
+ frame.result.data.string.data);
} else if (frame.result.type == kObjectTypeArray) {
// Should be an error in the form [type, message]
Array array = frame.result.data.array;
@@ -235,24 +152,19 @@ Object channel_send_call(uint64_t id,
&& (array.items[0].data.integer == kErrorTypeException
|| array.items[0].data.integer == kErrorTypeValidation)
&& array.items[1].type == kObjectTypeString) {
- err->type = (ErrorType) array.items[0].data.integer;
- xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg));
- err->set = true;
+ api_set_error(err, (ErrorType)array.items[0].data.integer, "%s",
+ array.items[1].data.string.data);
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
api_free_object(frame.result);
}
- if (!channel->pending_requests) {
- send_delayed_notifications(channel);
- }
-
- decref(channel);
+ channel_decref(channel);
return frame.errored ? NIL : frame.result;
}
@@ -261,11 +173,11 @@ Object channel_send_call(uint64_t id,
///
/// @param id The channel id
/// @param event The event type string
-void channel_subscribe(uint64_t id, char *event)
+void rpc_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
+ if (!(channel = find_rpc_channel(id))) {
abort();
}
@@ -276,99 +188,63 @@ void channel_subscribe(uint64_t id, char *event)
pmap_put(cstr_t)(event_strings, event_string, event_string);
}
- pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
+ pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
}
/// Unsubscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
-void channel_unsubscribe(uint64_t id, char *event)
+void rpc_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
+ if (!(channel = find_rpc_channel(id))) {
abort();
}
unsubscribe(channel, event);
}
-/// Closes a channel
-///
-/// @param id The channel id
-/// @return true if successful, false otherwise
-bool channel_close(uint64_t id)
-{
- Channel *channel;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
- return false;
- }
-
- close_channel(channel);
- return true;
-}
-
-/// Creates an API channel from stdin/stdout. This is used when embedding
-/// Neovim
-void channel_from_stdio(void)
-{
- Channel *channel = register_channel(kChannelTypeStdio);
- incref(channel); // stdio channels are only closed on exit
- // read stream
- rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
- channel);
- rstream_start(&channel->data.std.in, parse_msgpack);
- // write stream
- wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
-}
-
-static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
- void *data, bool eof)
-{
- while (rbuffer_size(rbuf)) {
- char buf[256];
- size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
- }
-}
-
-static void process_exit(Process *proc, int status, void *data)
-{
- decref(data);
-}
-
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
- bool eof)
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
+ void *data, bool eof)
{
Channel *channel = data;
- incref(channel);
+ channel_incref(channel);
if (eof) {
- close_channel(channel);
- call_set_error(channel, "Channel was closed by the client");
+ channel_close(channel->id, kChannelPartRpc, NULL);
+ char buf[256];
+ snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
+ channel->id);
+ call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
size_t count = rbuffer_size(rbuf);
- DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
- count,
- stream);
+ DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
+ channel->id, count, stream);
// Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->unpacker, count);
- rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
- msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
+ rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
+ msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
+
+ parse_msgpack(channel);
+
+end:
+ channel_decref(channel);
+}
+static void parse_msgpack(Channel *channel)
+{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return result;
// Deserialize everything we can.
- while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
- MSGPACK_UNPACK_SUCCESS) {
+ while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
+ MSGPACK_UNPACK_SUCCESS) {
bool is_response = is_rpc_response(&unpacked.data);
log_client_msg(channel->id, !is_response, unpacked.data);
@@ -377,17 +253,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
complete_call(&unpacked.data, channel);
} else {
char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Channel %" PRIu64 " returned a response that doesn't have "
- "a matching request id. Ensure the client is properly "
- "synchronized",
+ snprintf(buf, sizeof(buf),
+ "ch %" PRIu64 " returned a response with an unknown request "
+ "id. Ensure the client is properly synchronized",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -396,7 +270,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
mch_errmsg(e_outofmem);
mch_errmsg("\n");
- decref(channel);
+ channel_decref(channel);
preserve_exit();
}
@@ -405,15 +279,12 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
// causes for this error(search for 'goto _failed')
//
// A not so uncommon cause for this might be deserializing objects with
- // a high nesting level: msgpack will break when it's internal parse stack
- // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ // a high nesting level: msgpack will break when its internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- decref(channel);
}
static void handle_request(Channel *channel, msgpack_object *request)
@@ -423,7 +294,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
- if (error.set) {
+ if (ERROR_SET(&error)) {
// Validation failed, send response with error
if (channel_write(channel,
serialize_response(channel->id,
@@ -433,41 +304,53 @@ static void handle_request(Channel *channel, msgpack_object *request)
&out_buffer))) {
char buf[256];
snprintf(buf, sizeof(buf),
- "Channel %" PRIu64 " sent an invalid message, closed.",
+ "ch %" PRIu64 " sent an invalid message, closed.",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
+ api_clear_error(&error);
return;
}
- // Retrieve the request handler
MsgpackRpcRequestHandler handler;
msgpack_object *method = msgpack_rpc_method(request);
+ handler = msgpack_rpc_get_handler_for(method->via.bin.ptr,
+ method->via.bin.size,
+ &error);
- if (method) {
- handler = msgpack_rpc_get_handler_for(method->via.bin.ptr,
- method->via.bin.size);
- } else {
- handler.fn = msgpack_rpc_handle_missing_method;
- handler.async = true;
+ // check method arguments
+ Array args = ARRAY_DICT_INIT;
+ if (!ERROR_SET(&error)
+ && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
+ api_set_error(&error, kErrorTypeException, "Invalid method arguments");
}
- Array args = ARRAY_DICT_INIT;
- if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
- handler.fn = msgpack_rpc_handle_invalid_arguments;
- handler.async = true;
+ if (ERROR_SET(&error)) {
+ send_error(channel, request_id, error.msg);
+ api_clear_error(&error);
+ api_free_array(args);
+ return;
}
- RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
- event_data->channel = channel;
- event_data->handler = handler;
- event_data->args = args;
- event_data->request_id = request_id;
- incref(channel);
+ RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
+ evdata->channel = channel;
+ evdata->handler = handler;
+ evdata->args = args;
+ evdata->request_id = request_id;
+ channel_incref(channel);
if (handler.async) {
- on_request_event((void **)&event_data);
+ bool is_get_mode = handler.fn == handle_nvim_get_mode;
+
+ if (is_get_mode && !input_blocking()) {
+ // Defer the event to a special queue used by os/input.c. #6247
+ multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata);
+ } else {
+ // Invoke immediately.
+ on_request_event((void **)&evdata);
+ }
} else {
- queue_put(channel->events, on_request_event, 1, event_data);
+ multiqueue_put(channel->events, on_request_event, 1, evdata);
+ DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr);
}
}
@@ -479,7 +362,7 @@ static void on_request_event(void **argv)
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
- Object result = handler.fn(channel->id, request_id, args, &error);
+ Object result = handler.fn(channel->id, args, &error);
if (request_id != NO_RESPONSE) {
// send the response
msgpack_packer response;
@@ -492,66 +375,79 @@ static void on_request_event(void **argv)
} else {
api_free_object(result);
}
- // All arguments were freed already, but we still need to free the array
- xfree(args.items);
- decref(channel);
+ api_free_array(args);
+ channel_decref(channel);
xfree(e);
+ api_clear_error(&error);
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;
- if (channel->closed) {
+ if (channel->rpc.closed) {
wstream_release_wbuffer(buffer);
return false;
}
- switch (channel->type) {
- case kChannelTypeSocket:
- success = wstream_write(&channel->data.stream, buffer);
- break;
- case kChannelTypeProc:
- success = wstream_write(&channel->data.process.in, buffer);
- break;
- case kChannelTypeStdio:
- success = wstream_write(&channel->data.std.out, buffer);
- break;
- default:
- abort();
+ if (channel->streamtype == kChannelStreamInternal) {
+ channel_incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
+ } else {
+ Stream *in = channel_instream(channel);
+ success = wstream_write(in, buffer);
}
+
if (!success) {
// If the write failed for any reason, close the channel
char buf[256];
snprintf(buf,
sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed due to a failed write",
+ "ch %" PRIu64 ": stream write failed. "
+ "RPC canceled; closing channel",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
return success;
}
+static void internal_read_event(void **argv)
+{
+ Channel *channel = argv[0];
+ WBuffer *buffer = argv[1];
+
+ msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
+ buffer->data, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
+
+ parse_msgpack(channel);
+
+ channel_decref(channel);
+ wstream_release_wbuffer(buffer);
+}
+
static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
- api_set_error(&e, Exception, "%s", err);
+ api_set_error(&e, kErrorTypeException, "%s", err);
channel_write(channel, serialize_response(channel->id,
id,
&e,
NIL,
&out_buffer));
+ api_clear_error(&e);
}
static void send_request(Channel *channel,
uint64_t id,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
id,
method,
@@ -561,10 +457,10 @@ static void send_request(Channel *channel,
}
static void send_event(Channel *channel,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
0,
method,
@@ -573,13 +469,14 @@ static void send_event(Channel *channel,
1));
}
-static void broadcast_event(char *name, Array args)
+static void broadcast_event(const char *name, Array args)
{
kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
Channel *channel;
map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
+ if (channel->is_rpc
+ && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
}
});
@@ -589,7 +486,7 @@ static void broadcast_event(char *name, Array args)
goto end;
}
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(0,
0,
method,
@@ -598,12 +495,8 @@ static void broadcast_event(char *name, Array args)
kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
- Channel *channel = kv_A(subscribed, i);
- if (channel->pending_requests) {
- kv_push(channel->delayed_notifications, buffer);
- } else {
- channel_write(channel, buffer);
- }
+ Channel *c = kv_A(subscribed, i);
+ channel_write(c, buffer);
}
end:
@@ -613,10 +506,16 @@ end:
static void unsubscribe(Channel *channel, char *event)
{
char *event_string = pmap_get(cstr_t)(event_strings, event);
- pmap_del(cstr_t)(channel->subscribed_events, event_string);
+ if (!event_string) {
+ WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
+ channel->id, event);
+ return;
+ }
+ pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
+ if (channel->is_rpc
+ && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
return;
}
});
@@ -626,85 +525,44 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/process and free the channel resources.
-static void close_channel(Channel *channel)
+
+/// Mark rpc state as closed, and release its reference to the channel.
+/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
+void rpc_close(Channel *channel)
{
- if (channel->closed) {
+ if (channel->rpc.closed) {
return;
}
- channel->closed = true;
+ channel->rpc.closed = true;
+ channel_decref(channel);
- switch (channel->type) {
- case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL);
- break;
- case kChannelTypeProc:
- if (!channel->data.process.uvproc.process.closed) {
- process_stop(&channel->data.process.uvproc.process);
- }
- break;
- case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL);
- stream_close(&channel->data.std.out, NULL);
- queue_put(main_loop.fast_events, exit_event, 1, channel);
- return;
- default:
- abort();
+ if (channel->streamtype == kChannelStreamStdio) {
+ multiqueue_put(main_loop.fast_events, exit_event, 0);
}
-
- decref(channel);
}
static void exit_event(void **argv)
{
- decref(argv[0]);
-
if (!exiting) {
mch_exit(0);
}
}
-static void free_channel(Channel *channel)
+void rpc_free(Channel *channel)
{
remote_ui_disconnect(channel->id);
- pmap_del(uint64_t)(channels, channel->id);
- msgpack_unpacker_free(channel->unpacker);
+ msgpack_unpacker_free(channel->rpc.unpacker);
// Unsubscribe from all events
char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
+ map_foreach_value(channel->rpc.subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
- kv_destroy(channel->delayed_notifications);
- queue_free(channel->events);
- xfree(channel);
-}
-
-static void close_cb(Stream *stream, void *data)
-{
- decref(data);
-}
-
-static Channel *register_channel(ChannelType type)
-{
- Channel *rv = xmalloc(sizeof(Channel));
- rv->events = queue_new_child(main_loop.events);
- rv->type = type;
- rv->refcount = 1;
- rv->closed = false;
- rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
- rv->pending_requests = 0;
- rv->subscribed_events = pmap_new(cstr_t)();
- rv->next_request_id = 1;
- kv_init(rv->call_stack);
- kv_init(rv->delayed_notifications);
- pmap_put(uint64_t)(channels, rv->id, rv);
- return rv;
+ pmap_free(cstr_t)(channel->rpc.subscribed_events);
+ kv_destroy(channel->rpc.call_stack);
+ api_free_dictionary(channel->rpc.info);
}
static bool is_rpc_response(msgpack_object *obj)
@@ -719,15 +577,18 @@ static bool is_rpc_response(msgpack_object *obj)
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
{
uint64_t response_id = obj->via.array.ptr[1].via.u64;
+ if (kv_size(channel->rpc.call_stack) == 0) {
+ return false;
+ }
+
// Must be equal to the frame at the stack's bottom
- return kv_size(channel->call_stack) && response_id
- == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id;
+ ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
+ return response_id == frame->request_id;
}
static void complete_call(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1);
+ ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
@@ -738,22 +599,23 @@ static void complete_call(msgpack_object *obj, Channel *channel)
}
}
-static void call_set_error(Channel *channel, char *msg)
+static void call_set_error(Channel *channel, char *msg, int loglevel)
{
- ELOG("msgpack-rpc: %s", msg);
- for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
- ChannelCallFrame *frame = kv_A(channel->call_stack, i);
+ LOG(loglevel, "RPC: %s", msg);
+ for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
+ ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
frame->returned = true;
frame->errored = true;
+ api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,
uint64_t request_id,
- String method,
+ const String method,
Array args,
msgpack_sbuffer *sbuffer,
size_t refcount)
@@ -779,7 +641,16 @@ static WBuffer *serialize_response(uint64_t channel_id,
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_rpc_serialize_response(response_id, err, arg, &pac);
+ if (ERROR_SET(err) && response_id == NO_RESPONSE) {
+ Array args = ARRAY_DICT_INIT;
+ ADD(args, INTEGER_OBJ(err->type));
+ ADD(args, STRING_OBJ(cstr_to_string(err->msg)));
+ msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"),
+ args, &pac);
+ api_free_array(args);
+ } else {
+ msgpack_rpc_serialize_response(response_id, err, arg, &pac);
+ }
log_server_msg(channel_id, sbuffer);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
@@ -790,33 +661,28 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}
-static void send_delayed_notifications(Channel* channel)
+void rpc_set_client_info(uint64_t id, Dictionary info)
{
- for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
- WBuffer *buffer = kv_A(channel->delayed_notifications, i);
- channel_write(channel, buffer);
+ Channel *chan = find_rpc_channel(id);
+ if (!chan) {
+ abort();
}
- kv_size(channel->delayed_notifications) = 0;
-}
-
-static void incref(Channel *channel)
-{
- channel->refcount++;
+ api_free_dictionary(chan->rpc.info);
+ chan->rpc.info = info;
+ channel_info_changed(chan, false);
}
-static void decref(Channel *channel)
+Dictionary rpc_client_info(Channel *chan)
{
- if (!(--channel->refcount)) {
- free_channel(channel);
- }
+ return copy_dictionary(chan->rpc.info);
}
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
-#define REQ "[request] "
-#define RES "[response] "
-#define NOT "[notification] "
-#define ERR "[error] "
+#define REQ "[request] "
+#define RES "[response] "
+#define NOT "[notify] "
+#define ERR "[error] "
// Cannot define array with negative offsets, so this one is needed to be added
// to MSGPACK_UNPACK_\* values.
@@ -834,7 +700,7 @@ static void log_server_msg(uint64_t channel_id,
{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
- DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
+ DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
const msgpack_unpack_return result =
msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
switch (result) {
@@ -871,7 +737,7 @@ static void log_client_msg(uint64_t channel_id,
bool is_request,
msgpack_object msg)
{
- DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id);
+ DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
log_lock();
FILE *f = open_log_file();
fprintf(f, is_request ? REQ : RES);
@@ -887,4 +753,3 @@ static void log_msg_close(FILE *f, msgpack_object msg)
log_unlock();
}
#endif
-