diff options
| author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:32:07 -0300 |
|---|---|---|
| committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:32:07 -0300 |
| commit | aa9cb48bf08af14068178619414590254b263882 (patch) | |
| tree | b555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/msgpack_rpc | |
| parent | 9d8d2b7fa83fd69d1d616728c505a41acf8fedbb (diff) | |
| download | rneovim-aa9cb48bf08af14068178619414590254b263882.tar.gz rneovim-aa9cb48bf08af14068178619414590254b263882.tar.bz2 rneovim-aa9cb48bf08af14068178619414590254b263882.zip | |
job: Replace by a better process abstraction layer
- New libuv/pty process abstraction with simplified API and no globals.
- Remove nvim/os/job*. Jobs are now a concept that apply only to programs
spawned by vimscript job* functions.
- Refactor shell.c/channel.c to use the new module, which brings a number of
advantages:
- Simplified API, less code
- No slots in the user job table are used
- Not possible to acidentally receive data from vimscript
- Implement job table in eval.c, which is now a hash table with unilimited job
slots and unique job ids.
Diffstat (limited to 'src/nvim/msgpack_rpc')
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 76 |
1 files changed, 37 insertions, 39 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 577965e5ba..861614f147 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -10,11 +10,10 @@ #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/remote_ui.h" #include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" #include "nvim/event/rstream.h" #include "nvim/event/wstream.h" #include "nvim/event/socket.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" #include "nvim/msgpack_rpc/helpers.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -35,7 +34,7 @@ typedef enum { kChannelTypeSocket, - kChannelTypeJob, + kChannelTypeProc, kChannelTypeStdio } ChannelType; @@ -54,9 +53,14 @@ typedef struct { ChannelType type; msgpack_unpacker *unpacker; union { - Job *job; Stream stream; struct { + UvProcess uvproc; + Stream in; + Stream out; + Stream err; + } process; + struct { Stream in; Stream out; } std; @@ -110,34 +114,35 @@ void channel_teardown(void) }); } -/// Creates an API channel by starting a job and connecting to its +/// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(kChannelTypeJob); - incref(channel); // job channels are only closed by the exit_cb - - int status; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = channel; - opts.stdout_cb = job_out; - opts.stderr_cb = job_err; - opts.exit_cb = job_exit; - channel->data.job = job_start(opts, &status); - - if (status <= 0) { - if (status == 0) { // Two decrefs needed if status == 0. - decref(channel); // Only one needed if status < 0, - } // because exit_cb will do the second one. +uint64_t channel_from_process(char **argv) +{ + Channel *channel = register_channel(kChannelTypeProc); + channel->data.process.uvproc = uv_process_init(channel); + Process *proc = &channel->data.process.uvproc.process; + proc->argv = argv; + proc->in = &channel->data.process.in; + proc->out = &channel->data.process.out; + proc->err = &channel->data.process.err; + proc->cb = process_exit; + if (!process_spawn(&loop, proc)) { + loop_poll_events(&loop, 0); decref(channel); return 0; } + incref(channel); // process channels are only closed by the exit_cb + wstream_init(proc->in, 0); + rstream_init(proc->out, 0); + rstream_start(proc->out, parse_msgpack); + rstream_init(proc->err, 0); + rstream_start(proc->err, forward_stderr); + return channel->id; } @@ -319,24 +324,17 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(stream, buf, job_data(job), eof); -} - -static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", - ((Channel *)job_data(data))->id, buf); + ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); } } -static void job_exit(Job *job, int status, void *data) +static void process_exit(Process *proc, int status, void *data) { decref(data); } @@ -511,8 +509,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer) case kChannelTypeSocket: success = wstream_write(&channel->data.stream, buffer); break; - case kChannelTypeJob: - success = job_write(channel->data.job, buffer); + case kChannelTypeProc: + success = wstream_write(&channel->data.process.in, buffer); break; case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); @@ -627,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/job and free the channel resources. +/// Close the channel streams/process and free the channel resources. static void close_channel(Channel *channel) { if (channel->closed) { @@ -640,9 +638,9 @@ static void close_channel(Channel *channel) case kChannelTypeSocket: stream_close(&channel->data.stream, close_cb); break; - case kChannelTypeJob: - if (channel->data.job) { - job_stop(channel->data.job); + case kChannelTypeProc: + if (!channel->data.process.uvproc.process.closed) { + process_stop(&channel->data.process.uvproc.process); } break; case kChannelTypeStdio: |