aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
authorBjörn Linse <bjorn.linse@gmail.com>2016-08-20 12:58:37 +0200
committerGitHub <noreply@github.com>2016-08-20 12:58:37 +0200
commit71b3e20d0fba20d4957c1949352bde1744a0a947 (patch)
tree7844b2d185b27a0303183e90792a5ef807933e88 /src/nvim/msgpack_rpc/channel.c
parent1b825a9ada4d89059645bc7a458e1e8d931c6161 (diff)
parent2d60a15e25f487eda1ac00a9e6cdf9a6564fb416 (diff)
downloadrneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.tar.gz
rneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.tar.bz2
rneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.zip
Merge pull request #4723 from bfredl/rpcstderr
allow stderr handler for rpc jobs and use it to display python/ruby startup error
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c92
1 files changed, 34 insertions, 58 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5b249ee1c7..8b5f212d66 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -19,6 +19,7 @@
#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
+#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
@@ -55,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker;
union {
Stream stream;
- struct {
- LibuvProcess uvproc;
- Stream in;
- Stream out;
- Stream err;
- } process;
+ Process *proc;
struct {
Stream in;
Stream out;
@@ -79,7 +75,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
-static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -112,33 +107,20 @@ void channel_teardown(void)
}
/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
+/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_process(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
- Process *proc = &channel->data.process.uvproc.process;
- proc->argv = argv;
- proc->in = &channel->data.process.in;
- proc->out = &channel->data.process.out;
- proc->err = &channel->data.process.err;
- proc->cb = process_exit;
- if (!process_spawn(proc)) {
- loop_poll_events(&main_loop, 0);
- decref(channel);
- return 0;
- }
-
+uint64_t channel_from_process(Process *proc, uint64_t id)
+{
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
incref(channel); // process channels are only closed by the exit_cb
+ channel->data.proc = proc;
+
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack);
- rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr);
+ rstream_start(proc->out, parse_msgpack, channel);
return channel->id;
}
@@ -148,14 +130,14 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket);
- socket_watcher_accept(watcher, &channel->data.stream, channel);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack);
+ rstream_start(&channel->data.stream, parse_msgpack, channel);
}
/// Sends event/arguments to channel
@@ -314,30 +296,21 @@ bool channel_close(uint64_t id)
/// Neovim
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
- rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
- channel);
- rstream_start(&channel->data.std.in, parse_msgpack);
+ rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.std.in, parse_msgpack, channel);
// write stream
- wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
+ wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
-static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
- void *data, bool eof)
+void channel_process_exit(uint64_t id, int status)
{
- while (rbuffer_size(rbuf)) {
- char buf[256];
- size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
- }
-}
+ Channel *channel = pmap_get(uint64_t)(channels, id);
-static void process_exit(Process *proc, int status, void *data)
-{
- decref(data);
+ channel->closed = true;
+ decref(channel);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@@ -512,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
- success = wstream_write(&channel->data.process.in, buffer);
+ success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -637,16 +610,17 @@ static void close_channel(Channel *channel)
switch (channel->type) {
case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL);
+ stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
- if (!channel->data.process.uvproc.process.closed) {
- process_stop(&channel->data.process.uvproc.process);
- }
+ // Only close the rpc channel part,
+ // there could be an error message on the stderr stream
+ process_close_in(channel->data.proc);
+ process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL);
- stream_close(&channel->data.std.out, NULL);
+ stream_close(&channel->data.std.in, NULL, NULL);
+ stream_close(&channel->data.std.out, NULL, NULL);
queue_put(main_loop.fast_events, exit_event, 1, channel);
return;
default:
@@ -680,7 +654,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
- queue_free(channel->events);
+ if (channel->type != kChannelTypeProc) {
+ queue_free(channel->events);
+ }
xfree(channel);
}
@@ -689,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
-static Channel *register_channel(ChannelType type)
+static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->events = queue_new_child(main_loop.events);
+ rv->events = events ? events : queue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
+ rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;