diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 73 |
1 files changed, 25 insertions, 48 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index c3378783f2..8b5f212d66 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -19,6 +19,7 @@ #include "nvim/main.h" #include "nvim/ascii.h" #include "nvim/memory.h" +#include "nvim/eval.h" #include "nvim/os_unix.h" #include "nvim/message.h" #include "nvim/map.h" @@ -55,12 +56,7 @@ typedef struct { msgpack_unpacker *unpacker; union { Stream stream; - struct { - LibuvProcess uvproc; - Stream in; - Stream out; - Stream err; - } process; + Process *proc; struct { Stream in; Stream out; @@ -79,7 +75,6 @@ typedef struct { uint64_t request_id; } RequestEvent; -static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -112,33 +107,20 @@ void channel_teardown(void) } /// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. +/// stdin/stdout. stderr is handled by the job infrastructure. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_process(char **argv) -{ - Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = libuv_process_init(&main_loop, channel); - Process *proc = &channel->data.process.uvproc.process; - proc->argv = argv; - proc->in = &channel->data.process.in; - proc->out = &channel->data.process.out; - proc->err = &channel->data.process.err; - proc->cb = process_exit; - if (!process_spawn(proc)) { - loop_poll_events(&main_loop, 0); - decref(channel); - return 0; - } - +uint64_t channel_from_process(Process *proc, uint64_t id) +{ + Channel *channel = register_channel(kChannelTypeProc, id, proc->events); incref(channel); // process channels are only closed by the exit_cb + channel->data.proc = proc; + wstream_init(proc->in, 0); rstream_init(proc->out, 0); rstream_start(proc->out, parse_msgpack, channel); - rstream_init(proc->err, 0); - rstream_start(proc->err, forward_stderr, channel); return channel->id; } @@ -148,7 +130,7 @@ uint64_t channel_from_process(char **argv) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; @@ -314,7 +296,7 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio); + Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); @@ -323,20 +305,12 @@ void channel_from_stdio(void) wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, - void *data, bool eof) +void channel_process_exit(uint64_t id, int status) { - while (rbuffer_size(rbuf)) { - char buf[256]; - size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); - buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); - } -} + Channel *channel = pmap_get(uint64_t)(channels, id); -static void process_exit(Process *proc, int status, void *data) -{ - decref(data); + channel->closed = true; + decref(channel); } static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, @@ -511,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) success = wstream_write(&channel->data.stream, buffer); break; case kChannelTypeProc: - success = wstream_write(&channel->data.process.in, buffer); + success = wstream_write(channel->data.proc->in, buffer); break; case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); @@ -639,9 +613,10 @@ static void close_channel(Channel *channel) stream_close(&channel->data.stream, NULL, NULL); break; case kChannelTypeProc: - if (!channel->data.process.uvproc.process.closed) { - process_stop(&channel->data.process.uvproc.process); - } + // Only close the rpc channel part, + // there could be an error message on the stderr stream + process_close_in(channel->data.proc); + process_close_out(channel->data.proc); break; case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL, NULL); @@ -679,7 +654,9 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); - queue_free(channel->events); + if (channel->type != kChannelTypeProc) { + queue_free(channel->events); + } xfree(channel); } @@ -688,15 +665,15 @@ static void close_cb(Stream *stream, void *data) decref(data); } -static Channel *register_channel(ChannelType type) +static Channel *register_channel(ChannelType type, uint64_t id, Queue *events) { Channel *rv = xmalloc(sizeof(Channel)); - rv->events = queue_new_child(main_loop.events); + rv->events = events ? events : queue_new_child(main_loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = next_id++; + rv->id = id > 0 ? id : next_chan_id++; rv->pending_requests = 0; rv->subscribed_events = pmap_new(cstr_t)(); rv->next_request_id = 1; |