aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:32:07 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:32:07 -0300
commitaa9cb48bf08af14068178619414590254b263882 (patch)
treeb555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/msgpack_rpc
parent9d8d2b7fa83fd69d1d616728c505a41acf8fedbb (diff)
downloadrneovim-aa9cb48bf08af14068178619414590254b263882.tar.gz
rneovim-aa9cb48bf08af14068178619414590254b263882.tar.bz2
rneovim-aa9cb48bf08af14068178619414590254b263882.zip
job: Replace by a better process abstraction layer
- New libuv/pty process abstraction with simplified API and no globals. - Remove nvim/os/job*. Jobs are now a concept that apply only to programs spawned by vimscript job* functions. - Refactor shell.c/channel.c to use the new module, which brings a number of advantages: - Simplified API, less code - No slots in the user job table are used - Not possible to acidentally receive data from vimscript - Implement job table in eval.c, which is now a hash table with unilimited job slots and unique job ids.
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r--src/nvim/msgpack_rpc/channel.c76
1 files changed, 37 insertions, 39 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 577965e5ba..861614f147 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -10,11 +10,10 @@
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
+#include "nvim/event/uv_process.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/event/socket.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@@ -35,7 +34,7 @@
typedef enum {
kChannelTypeSocket,
- kChannelTypeJob,
+ kChannelTypeProc,
kChannelTypeStdio
} ChannelType;
@@ -54,9 +53,14 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
- Job *job;
Stream stream;
struct {
+ UvProcess uvproc;
+ Stream in;
+ Stream out;
+ Stream err;
+ } process;
+ struct {
Stream in;
Stream out;
} std;
@@ -110,34 +114,35 @@ void channel_teardown(void)
});
}
-/// Creates an API channel by starting a job and connecting to its
+/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_job(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeJob);
- incref(channel); // job channels are only closed by the exit_cb
-
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = channel;
- opts.stdout_cb = job_out;
- opts.stderr_cb = job_err;
- opts.exit_cb = job_exit;
- channel->data.job = job_start(opts, &status);
-
- if (status <= 0) {
- if (status == 0) { // Two decrefs needed if status == 0.
- decref(channel); // Only one needed if status < 0,
- } // because exit_cb will do the second one.
+uint64_t channel_from_process(char **argv)
+{
+ Channel *channel = register_channel(kChannelTypeProc);
+ channel->data.process.uvproc = uv_process_init(channel);
+ Process *proc = &channel->data.process.uvproc.process;
+ proc->argv = argv;
+ proc->in = &channel->data.process.in;
+ proc->out = &channel->data.process.out;
+ proc->err = &channel->data.process.err;
+ proc->cb = process_exit;
+ if (!process_spawn(&loop, proc)) {
+ loop_poll_events(&loop, 0);
decref(channel);
return 0;
}
+ incref(channel); // process channels are only closed by the exit_cb
+ wstream_init(proc->in, 0);
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, parse_msgpack);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, forward_stderr);
+
return channel->id;
}
@@ -319,24 +324,17 @@ static void channel_from_stdio(void)
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)
-{
- Job *job = data;
- parse_msgpack(stream, buf, job_data(job), eof);
-}
-
-static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)
+static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s",
- ((Channel *)job_data(data))->id, buf);
+ ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
-static void job_exit(Job *job, int status, void *data)
+static void process_exit(Process *proc, int status, void *data)
{
decref(data);
}
@@ -511,8 +509,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
case kChannelTypeSocket:
success = wstream_write(&channel->data.stream, buffer);
break;
- case kChannelTypeJob:
- success = job_write(channel->data.job, buffer);
+ case kChannelTypeProc:
+ success = wstream_write(&channel->data.process.in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -627,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/job and free the channel resources.
+/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
{
if (channel->closed) {
@@ -640,9 +638,9 @@ static void close_channel(Channel *channel)
case kChannelTypeSocket:
stream_close(&channel->data.stream, close_cb);
break;
- case kChannelTypeJob:
- if (channel->data.job) {
- job_stop(channel->data.job);
+ case kChannelTypeProc:
+ if (!channel->data.process.uvproc.process.closed) {
+ process_stop(&channel->data.process.uvproc.process);
}
break;
case kChannelTypeStdio: