aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c171
1 files changed, 91 insertions, 80 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 34ff7c6374..98636263b9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -7,8 +7,8 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
+#include "nvim/api/ui.h"
#include "nvim/msgpack_rpc/channel.h"
-#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -16,8 +16,10 @@
#include "nvim/event/socket.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
+#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
+#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
@@ -54,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker;
union {
Stream stream;
- struct {
- LibuvProcess uvproc;
- Stream in;
- Stream out;
- Stream err;
- } process;
+ Process *proc;
struct {
Stream in;
Stream out;
@@ -68,7 +65,7 @@ typedef struct {
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
- Queue *events;
+ MultiQueue *events;
} Channel;
typedef struct {
@@ -78,7 +75,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
-static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -111,33 +107,20 @@ void channel_teardown(void)
}
/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
+/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_process(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = libuv_process_init(&loop, channel);
- Process *proc = &channel->data.process.uvproc.process;
- proc->argv = argv;
- proc->in = &channel->data.process.in;
- proc->out = &channel->data.process.out;
- proc->err = &channel->data.process.err;
- proc->cb = process_exit;
- if (!process_spawn(proc)) {
- loop_poll_events(&loop, 0);
- decref(channel);
- return 0;
- }
-
+uint64_t channel_from_process(Process *proc, uint64_t id)
+{
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
incref(channel); // process channels are only closed by the exit_cb
+ channel->data.proc = proc;
+
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack);
- rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr);
+ rstream_start(proc->out, parse_msgpack, channel);
return channel->id;
}
@@ -147,14 +130,14 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket);
- socket_watcher_accept(watcher, &channel->data.stream, channel);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack);
+ rstream_start(&channel->data.stream, parse_msgpack, channel);
}
/// Sends event/arguments to channel
@@ -179,7 +162,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
// Pending request, queue the notification for later sending.
String method = cstr_as_string(name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
- kv_push(WBuffer *, channel->delayed_notifications, buffer);
+ kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
@@ -217,10 +200,10 @@ Object channel_send_call(uint64_t id,
send_request(channel, request_id, method_name, args);
// Push the frame
- ChannelCallFrame frame = {request_id, false, false, NIL};
- kv_push(ChannelCallFrame *, channel->call_stack, &frame);
+ ChannelCallFrame frame = { request_id, false, false, NIL };
+ kv_push(channel->call_stack, &frame);
channel->pending_requests++;
- LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
+ LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -313,30 +296,21 @@ bool channel_close(uint64_t id)
/// Neovim
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
- rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
- channel);
- rstream_start(&channel->data.std.in, parse_msgpack);
+ rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.std.in, parse_msgpack, channel);
// write stream
- wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
+ wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
-static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
- void *data, bool eof)
+void channel_process_exit(uint64_t id, int status)
{
- while (rbuffer_size(rbuf)) {
- char buf[256];
- size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
- }
-}
+ Channel *channel = pmap_get(uint64_t)(channels, id);
-static void process_exit(Process *proc, int status, void *data)
-{
- decref(data);
+ channel->closed = true;
+ decref(channel);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@@ -466,7 +440,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
if (handler.async) {
on_request_event((void **)&event_data);
} else {
- queue_put(channel->events, on_request_event, 1, event_data);
+ multiqueue_put(channel->events, on_request_event, 1, event_data);
}
}
@@ -478,7 +452,7 @@ static void on_request_event(void **argv)
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
- Object result = handler.fn(channel->id, request_id, args, &error);
+ Object result = handler.fn(channel->id, args, &error);
if (request_id != NO_RESPONSE) {
// send the response
msgpack_packer response;
@@ -491,8 +465,7 @@ static void on_request_event(void **argv)
} else {
api_free_object(result);
}
- // All arguments were freed already, but we still need to free the array
- xfree(args.items);
+ api_free_array(args);
decref(channel);
xfree(e);
}
@@ -511,7 +484,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
- success = wstream_write(&channel->data.process.in, buffer);
+ success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -574,13 +547,12 @@ static void send_event(Channel *channel,
static void broadcast_event(char *name, Array args)
{
- kvec_t(Channel *) subscribed;
- kv_init(subscribed);
+ kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
Channel *channel;
map_foreach_value(channels, channel, {
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
- kv_push(Channel *, subscribed, channel);
+ kv_push(subscribed, channel);
}
});
@@ -600,7 +572,7 @@ static void broadcast_event(char *name, Array args)
for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
if (channel->pending_requests) {
- kv_push(WBuffer *, channel->delayed_notifications, buffer);
+ kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
@@ -637,17 +609,18 @@ static void close_channel(Channel *channel)
switch (channel->type) {
case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL);
+ stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
- if (!channel->data.process.uvproc.process.closed) {
- process_stop(&channel->data.process.uvproc.process);
- }
+ // Only close the rpc channel part,
+ // there could be an error message on the stderr stream
+ process_close_in(channel->data.proc);
+ process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL);
- stream_close(&channel->data.std.out, NULL);
- queue_put(loop.fast_events, exit_event, 1, channel);
+ stream_close(&channel->data.std.in, NULL, NULL);
+ stream_close(&channel->data.std.out, NULL, NULL);
+ multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
default:
abort();
@@ -680,7 +653,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
- queue_free(channel->events);
+ if (channel->type != kChannelTypeProc) {
+ multiqueue_free(channel->events);
+ }
xfree(channel);
}
@@ -689,15 +664,16 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
-static Channel *register_channel(ChannelType type)
+static Channel *register_channel(ChannelType type, uint64_t id,
+ MultiQueue *events)
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->events = queue_new_child(loop.events);
+ rv->events = events ? events : multiqueue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
+ rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
@@ -816,20 +792,55 @@ static void decref(Channel *channel)
#define REQ "[request] "
#define RES "[response] "
#define NOT "[notification] "
+#define ERR "[error] "
+
+// Cannot define array with negative offsets, so this one is needed to be added
+// to MSGPACK_UNPACK_\* values.
+#define MUR_OFF 2
+
+static const char *const msgpack_error_messages[] = {
+ [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
+ [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string",
+ [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error",
+ [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
+};
static void log_server_msg(uint64_t channel_id,
msgpack_sbuffer *packed)
{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
- msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
- uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
- log_lock();
- FILE *f = open_log_file();
- fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
- log_msg_close(f, unpacked.data);
- msgpack_unpacked_destroy(&unpacked);
+ const msgpack_unpack_return result =
+ msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
+ switch (result) {
+ case MSGPACK_UNPACK_SUCCESS: {
+ uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
+ log_msg_close(f, unpacked.data);
+ msgpack_unpacked_destroy(&unpacked);
+ break;
+ }
+ case MSGPACK_UNPACK_EXTRA_BYTES:
+ case MSGPACK_UNPACK_CONTINUE:
+ case MSGPACK_UNPACK_PARSE_ERROR:
+ case MSGPACK_UNPACK_NOMEM_ERROR: {
+ log_lock();
+ FILE *f = open_log_file();
+ fprintf(f, ERR);
+ log_msg_close(f, (msgpack_object) {
+ .type = MSGPACK_OBJECT_STR,
+ .via.str = {
+ .ptr = (char *)msgpack_error_messages[result + MUR_OFF],
+ .size = (uint32_t)strlen(
+ msgpack_error_messages[result + MUR_OFF]),
+ },
+ });
+ break;
+ }
+ }
}
static void log_client_msg(uint64_t channel_id,