aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c314
1 files changed, 231 insertions, 83 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 98636263b9..02f3854f47 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdbool.h>
#include <string.h>
#include <inttypes.h>
@@ -9,6 +12,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -25,7 +29,9 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
+#include "nvim/path.h"
#include "nvim/lib/kvec.h"
+#include "nvim/os/input.h"
#define CHANNEL_BUFFER_SIZE 0xffff
@@ -37,7 +43,8 @@
typedef enum {
kChannelTypeSocket,
kChannelTypeProc,
- kChannelTypeStdio
+ kChannelTypeStdio,
+ kChannelTypeInternal
} ChannelType;
typedef struct {
@@ -55,7 +62,7 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
- Stream stream;
+ Stream stream; // bidirectional (socket)
Process *proc;
struct {
Stream in;
@@ -86,6 +93,7 @@ static msgpack_sbuffer out_buffer;
/// Initializes the module
void channel_init(void)
{
+ ch_before_blocking_events = multiqueue_new_child(main_loop.events);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@@ -109,18 +117,24 @@ void channel_teardown(void)
/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is handled by the job infrastructure.
///
-/// @param argv The argument vector for the process. [consumed]
-/// @return The channel id (> 0), on success.
-/// 0, on error.
-uint64_t channel_from_process(Process *proc, uint64_t id)
+/// @param proc process object
+/// @param id (optional) channel id
+/// @param source description of source function, rplugin name, TCP addr, etc
+///
+/// @return Channel id (> 0), on success. 0, on error.
+uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
{
- Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events,
+ source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack, channel);
+ rstream_start(proc->out, receive_msgpack, channel);
+
+ DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
+ proc->out);
return channel->id;
}
@@ -130,14 +144,48 @@ uint64_t channel_from_process(Process *proc, uint64_t id)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL,
+ watcher->addr);
socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack, channel);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+
+ DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
+ &channel->data.stream);
+}
+
+/// @param source description of source function, rplugin name, TCP addr, etc
+uint64_t channel_connect(bool tcp, const char *address, int timeout,
+ char *source, const char **error)
+{
+ if (!tcp) {
+ char *path = fix_fname(address);
+ if (server_owns_pipe_address(path)) {
+ // avoid deadlock
+ xfree(path);
+ return channel_create_internal();
+ }
+ xfree(path);
+ }
+
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source);
+ if (!socket_connect(&main_loop, &channel->data.stream,
+ tcp, address, timeout, error)) {
+ decref(channel);
+ return 0;
+ }
+
+ incref(channel); // close channel only after the stream is closed
+ channel->data.stream.internal_close_cb = close_cb;
+ channel->data.stream.internal_data = channel;
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+ return channel->id;
}
/// Sends event/arguments to channel
@@ -147,7 +195,7 @@ void channel_from_connection(SocketWatcher *watcher)
/// @param name The event name, an arbitrary string
/// @param args Array with event arguments
/// @return True if the event was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *name, Array args)
+bool channel_send_event(uint64_t id, const char *name, Array args)
{
Channel *channel = NULL;
@@ -160,7 +208,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
if (channel) {
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
- String method = cstr_as_string(name);
+ const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
@@ -182,14 +230,14 @@ bool channel_send_event(uint64_t id, char *name, Array args)
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
Object channel_send_call(uint64_t id,
- char *method_name,
+ const char *method_name,
Array args,
Error *err)
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
- api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
+ api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
api_free_array(args);
return NIL;
}
@@ -209,7 +257,8 @@ Object channel_send_call(uint64_t id,
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
- api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ api_set_error(err, kErrorTypeException, "%s",
+ frame.result.data.string.data);
} else if (frame.result.type == kObjectTypeArray) {
// Should be an error in the form [type, message]
Array array = frame.result.data.array;
@@ -217,14 +266,13 @@ Object channel_send_call(uint64_t id,
&& (array.items[0].data.integer == kErrorTypeException
|| array.items[0].data.integer == kErrorTypeValidation)
&& array.items[1].type == kObjectTypeString) {
- err->type = (ErrorType) array.items[0].data.integer;
- xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg));
- err->set = true;
+ api_set_error(err, (ErrorType)array.items[0].data.integer, "%s",
+ array.items[1].data.string.data);
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
api_free_object(frame.result);
@@ -292,17 +340,28 @@ bool channel_close(uint64_t id)
return true;
}
-/// Creates an API channel from stdin/stdout. This is used when embedding
-/// Neovim
+/// Creates an API channel from stdin/stdout. Used to embed Nvim.
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.std.in, parse_msgpack, channel);
+ rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
+
+ DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
+ &channel->data.std.in, &channel->data.std.out);
+}
+
+/// Creates a loopback channel. This is used to avoid deadlock
+/// when an instance connects to its own named pipe.
+uint64_t channel_create_internal(void)
+{
+ Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL);
+ incref(channel); // internal channel lives until process exit
+ return channel->id;
}
void channel_process_exit(uint64_t id, int status)
@@ -313,28 +372,50 @@ void channel_process_exit(uint64_t id, int status)
decref(channel);
}
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
- bool eof)
+// rstream.c:read_event() invokes this as stream->read_cb().
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
+ void *data, bool eof)
{
Channel *channel = data;
incref(channel);
if (eof) {
close_channel(channel);
- call_set_error(channel, "Channel was closed by the client");
+ char buf[256];
+ snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
+ channel->id);
+ call_set_error(channel, buf, WARN_LOG_LEVEL);
+ goto end;
+ }
+
+ if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
+ || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
+ char buf[256];
+ snprintf(buf, sizeof(buf),
+ "ch %" PRIu64 ": stream closed unexpectedly. "
+ "closing channel",
+ channel->id);
+ call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
size_t count = rbuffer_size(rbuf);
- DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
- count,
- stream);
+ DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
+ channel->id, count, stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ parse_msgpack(channel);
+
+end:
+ decref(channel);
+}
+
+static void parse_msgpack(Channel *channel)
+{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return result;
@@ -350,17 +431,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
complete_call(&unpacked.data, channel);
} else {
char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Channel %" PRIu64 " returned a response that doesn't have "
- "a matching request id. Ensure the client is properly "
- "synchronized",
+ snprintf(buf, sizeof(buf),
+ "ch %" PRIu64 " returned a response with an unknown request "
+ "id. Ensure the client is properly synchronized",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -378,17 +457,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
// causes for this error(search for 'goto _failed')
//
// A not so uncommon cause for this might be deserializing objects with
- // a high nesting level: msgpack will break when it's internal parse stack
- // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ // a high nesting level: msgpack will break when its internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- decref(channel);
}
+
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
@@ -396,7 +473,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
- if (error.set) {
+ if (ERROR_SET(&error)) {
// Validation failed, send response with error
if (channel_write(channel,
serialize_response(channel->id,
@@ -406,13 +483,13 @@ static void handle_request(Channel *channel, msgpack_object *request)
&out_buffer))) {
char buf[256];
snprintf(buf, sizeof(buf),
- "Channel %" PRIu64 " sent an invalid message, closed.",
+ "ch %" PRIu64 " sent an invalid message, closed.",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
+ api_clear_error(&error);
return;
}
-
// Retrieve the request handler
MsgpackRpcRequestHandler handler;
msgpack_object *method = msgpack_rpc_method(request);
@@ -431,16 +508,24 @@ static void handle_request(Channel *channel, msgpack_object *request)
handler.async = true;
}
- RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
- event_data->channel = channel;
- event_data->handler = handler;
- event_data->args = args;
- event_data->request_id = request_id;
+ RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
+ evdata->channel = channel;
+ evdata->handler = handler;
+ evdata->args = args;
+ evdata->request_id = request_id;
incref(channel);
if (handler.async) {
- on_request_event((void **)&event_data);
+ bool is_get_mode = handler.fn == handle_nvim_get_mode;
+
+ if (is_get_mode && !input_blocking()) {
+ // Defer the event to a special queue used by os/input.c. #6247
+ multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata);
+ } else {
+ // Invoke immediately.
+ on_request_event((void **)&evdata);
+ }
} else {
- multiqueue_put(channel->events, on_request_event, 1, event_data);
+ multiqueue_put(channel->events, on_request_event, 1, evdata);
}
}
@@ -468,11 +553,45 @@ static void on_request_event(void **argv)
api_free_array(args);
decref(channel);
xfree(e);
+ api_clear_error(&error);
}
+/// Returns the Stream that a Channel writes to.
+static Stream *chan_wstream(Channel *chan)
+{
+ switch (chan->type) {
+ case kChannelTypeSocket:
+ return &chan->data.stream;
+ case kChannelTypeProc:
+ return chan->data.proc->in;
+ case kChannelTypeStdio:
+ return &chan->data.std.out;
+ case kChannelTypeInternal:
+ return NULL;
+ }
+ abort();
+}
+
+/// Returns the Stream that a Channel reads from.
+static Stream *chan_rstream(Channel *chan)
+{
+ switch (chan->type) {
+ case kChannelTypeSocket:
+ return &chan->data.stream;
+ case kChannelTypeProc:
+ return chan->data.proc->out;
+ case kChannelTypeStdio:
+ return &chan->data.std.in;
+ case kChannelTypeInternal:
+ return NULL;
+ }
+ abort();
+}
+
+
static bool channel_write(Channel *channel, WBuffer *buffer)
{
- bool success;
+ bool success = false;
if (channel->closed) {
wstream_release_wbuffer(buffer);
@@ -481,16 +600,15 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
switch (channel->type) {
case kChannelTypeSocket:
- success = wstream_write(&channel->data.stream, buffer);
- break;
case kChannelTypeProc:
- success = wstream_write(channel->data.proc->in, buffer);
- break;
case kChannelTypeStdio:
- success = wstream_write(&channel->data.std.out, buffer);
+ success = wstream_write(chan_wstream(channel), buffer);
+ break;
+ case kChannelTypeInternal:
+ incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
break;
- default:
- abort();
}
if (!success) {
@@ -498,32 +616,49 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
char buf[256];
snprintf(buf,
sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed due to a failed write",
+ "ch %" PRIu64 ": stream write failed. "
+ "RPC canceled; closing channel",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
return success;
}
+static void internal_read_event(void **argv)
+{
+ Channel *channel = argv[0];
+ WBuffer *buffer = argv[1];
+
+ msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->unpacker),
+ buffer->data, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
+
+ parse_msgpack(channel);
+
+ decref(channel);
+ wstream_release_wbuffer(buffer);
+}
+
static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
- api_set_error(&e, Exception, "%s", err);
+ api_set_error(&e, kErrorTypeException, "%s", err);
channel_write(channel, serialize_response(channel->id,
id,
&e,
NIL,
&out_buffer));
+ api_clear_error(&e);
}
static void send_request(Channel *channel,
uint64_t id,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
id,
method,
@@ -533,10 +668,10 @@ static void send_request(Channel *channel,
}
static void send_event(Channel *channel,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
0,
method,
@@ -545,7 +680,7 @@ static void send_event(Channel *channel,
1));
}
-static void broadcast_event(char *name, Array args)
+static void broadcast_event(const char *name, Array args)
{
kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
Channel *channel;
@@ -561,7 +696,7 @@ static void broadcast_event(char *name, Array args)
goto end;
}
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(0,
0,
method,
@@ -622,8 +757,9 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.std.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
- default:
- abort();
+ case kChannelTypeInternal:
+ // nothing to free.
+ break;
}
decref(channel);
@@ -664,9 +800,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
- MultiQueue *events)
+ MultiQueue *events, char *source)
{
+ // Jobs and channels share the same id namespace.
+ assert(id == 0 || !pmap_get(uint64_t)(channels, id));
Channel *rv = xmalloc(sizeof(Channel));
rv->events = events ? events : multiqueue_new_child(main_loop.events);
rv->type = type;
@@ -680,6 +819,14 @@ static Channel *register_channel(ChannelType type, uint64_t id,
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);
+
+ ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
+ (type == kChannelTypeProc ? "proc"
+ : (type == kChannelTypeSocket ? "socket"
+ : (type == kChannelTypeStdio ? "stdio"
+ : (type == kChannelTypeInternal ? "internal" : "?")))),
+ (source ? source : "?"));
+
return rv;
}
@@ -714,13 +861,14 @@ static void complete_call(msgpack_object *obj, Channel *channel)
}
}
-static void call_set_error(Channel *channel, char *msg)
+static void call_set_error(Channel *channel, char *msg, int loglevel)
{
- ELOG("msgpack-rpc: %s", msg);
+ LOG(loglevel, "RPC: %s", msg);
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;
frame->errored = true;
+ api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
@@ -729,7 +877,7 @@ static void call_set_error(Channel *channel, char *msg)
static WBuffer *serialize_request(uint64_t channel_id,
uint64_t request_id,
- String method,
+ const String method,
Array args,
msgpack_sbuffer *sbuffer,
size_t refcount)
@@ -789,10 +937,10 @@ static void decref(Channel *channel)
}
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
-#define REQ "[request] "
-#define RES "[response] "
-#define NOT "[notification] "
-#define ERR "[error] "
+#define REQ "[request] "
+#define RES "[response] "
+#define NOT "[notify] "
+#define ERR "[error] "
// Cannot define array with negative offsets, so this one is needed to be added
// to MSGPACK_UNPACK_\* values.
@@ -810,7 +958,7 @@ static void log_server_msg(uint64_t channel_id,
{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
- DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
+ DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
const msgpack_unpack_return result =
msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
switch (result) {
@@ -847,7 +995,7 @@ static void log_client_msg(uint64_t channel_id,
bool is_request,
msgpack_object msg)
{
- DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id);
+ DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
log_lock();
FILE *f = open_log_file();
fprintf(f, is_request ? REQ : RES);