diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 195 |
1 files changed, 98 insertions, 97 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 2a81b4f160..861614f147 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -9,13 +9,11 @@ #include "nvim/api/vim.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/remote_ui.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" +#include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/socket.h" #include "nvim/msgpack_rpc/helpers.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -34,6 +32,12 @@ #define log_server_msg(...) #endif +typedef enum { + kChannelTypeSocket, + kChannelTypeProc, + kChannelTypeStdio +} ChannelType; + typedef struct { uint64_t request_id; bool returned, errored; @@ -45,15 +49,21 @@ typedef struct { size_t refcount; size_t pending_requests; PMap(cstr_t) *subscribed_events; - bool is_job, closed; + bool closed; + ChannelType type; msgpack_unpacker *unpacker; union { - Job *job; + Stream stream; + struct { + UvProcess uvproc; + Stream in; + Stream out; + Stream err; + } process; struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; + Stream in; + Stream out; + } std; } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; @@ -104,57 +114,48 @@ void channel_teardown(void) }); } -/// Creates an API channel by starting a job and connecting to its +/// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(); - channel->is_job = true; - incref(channel); // job channels are only closed by the exit_cb - - int status; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = channel; - opts.stdout_cb = job_out; - opts.stderr_cb = job_err; - opts.exit_cb = job_exit; - channel->data.job = job_start(opts, &status); - - if (status <= 0) { - if (status == 0) { // Two decrefs needed if status == 0. - decref(channel); // Only one needed if status < 0, - } // because exit_cb will do the second one. +uint64_t channel_from_process(char **argv) +{ + Channel *channel = register_channel(kChannelTypeProc); + channel->data.process.uvproc = uv_process_init(channel); + Process *proc = &channel->data.process.uvproc.process; + proc->argv = argv; + proc->in = &channel->data.process.in; + proc->out = &channel->data.process.out; + proc->err = &channel->data.process.err; + proc->cb = process_exit; + if (!process_spawn(&loop, proc)) { + loop_poll_events(&loop, 0); decref(channel); return 0; } + incref(channel); // process channels are only closed by the exit_cb + wstream_init(proc->in, 0); + rstream_init(proc->out, 0); + rstream_start(proc->out, parse_msgpack); + rstream_init(proc->err, 0); + rstream_start(proc->err, forward_stderr); + return channel->id; } -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection +/// Creates an API channel from a tcp/pipe socket connection /// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) +/// @param watcher The SocketWatcher ready to accept the connection +void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; + Channel *channel = register_channel(kChannelTypeSocket); + socket_watcher_accept(watcher, &channel->data.stream, channel); + wstream_init(&channel->data.stream, 0); + rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.stream, parse_msgpack); } /// Sends event/arguments to channel @@ -220,7 +221,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - event_poll_until(-1, frame.returned); + LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -313,44 +314,32 @@ bool channel_close(uint64_t id) /// Neovim static void channel_from_stdio(void) { - Channel *channel = register_channel(); + Channel *channel = register_channel(kChannelTypeStdio); incref(channel); // stdio channels are only closed on exit - channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); + rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, + channel); + rstream_start(&channel->data.std.in, parse_msgpack); // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(rstream, buf, job_data(job), eof); + wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", - ((Channel *)job_data(data))->id, buf); + ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); } } -static void job_exit(Job *job, int status, void *data) +static void process_exit(Process *proc, int status, void *data) { decref(data); } -static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) { Channel *channel = data; incref(channel); @@ -362,9 +351,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) } size_t count = rbuffer_size(rbuf); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)", count, - rstream); + stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -474,7 +463,7 @@ static void handle_request(Channel *channel, msgpack_object *request) event_data->args = args; event_data->request_id = request_id; incref(channel); - event_push((Event) { + loop_push_event(&loop, (Event) { .handler = on_request_event, .data = event_data }, defer); @@ -516,10 +505,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return false; } - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); + switch (channel->type) { + case kChannelTypeSocket: + success = wstream_write(&channel->data.stream, buffer); + break; + case kChannelTypeProc: + success = wstream_write(&channel->data.process.in, buffer); + break; + case kChannelTypeStdio: + success = wstream_write(&channel->data.std.out, buffer); + break; + default: + abort(); } if (!success) { @@ -628,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/job and free the channel resources. +/// Close the channel streams/process and free the channel resources. static void close_channel(Channel *channel) { if (channel->closed) { @@ -637,19 +634,23 @@ static void close_channel(Channel *channel) channel->closed = true; - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; - if (handle) { - uv_close(handle, close_cb); - } else { - event_push((Event) { .handler = on_stdio_close, .data = channel }, false); - } + switch (channel->type) { + case kChannelTypeSocket: + stream_close(&channel->data.stream, close_cb); + break; + case kChannelTypeProc: + if (!channel->data.process.uvproc.process.closed) { + process_stop(&channel->data.process.uvproc.process); + } + break; + case kChannelTypeStdio: + stream_close(&channel->data.std.in, NULL); + stream_close(&channel->data.std.out, NULL); + loop_push_event(&loop, + (Event) { .handler = on_stdio_close, .data = channel }, false); + break; + default: + abort(); } decref(channel); @@ -682,15 +683,15 @@ static void free_channel(Channel *channel) xfree(channel); } -static void close_cb(uv_handle_t *handle) +static void close_cb(Stream *stream, void *data) { - xfree(handle->data); - xfree(handle); + xfree(data); } -static Channel *register_channel(void) +static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->type = type; rv->refcount = 1; rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); |