aboutsummaryrefslogtreecommitdiff
path: root/src/os/job.c
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-04-17 11:59:50 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-04-18 16:11:59 -0300
commit9acb9607134a461fc342f29a098b83b1bad7134d (patch)
tree155980a8550ca2e4243e4612129a2cdd3fad499a /src/os/job.c
parent350144f5113e111fea0d5b33589d6d478280f298 (diff)
downloadrneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.gz
rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.tar.bz2
rneovim-9acb9607134a461fc342f29a098b83b1bad7134d.zip
Refactor job control to use RStream events
Instead of a single 'job read' callback, job control consumers need to provide callbacks for "stdout read", "stderr read" and "exit". For vimscript, the JobActivity autocommand is still used to handle every job event, for example: ```vim :let srv1_id = jobstart('netcat-server-1', 'nc', ['-l', '9991']) :let srv2_id = jobstart('netcat-server-2', 'nc', ['-l', '9991']) function JobEvent() " v:job_data[0] = the job id " v:job_data[1] = the event type, one of "stdout", "stderr" or "exit" " v:job_data[2] = data read from stdout or stderr if v:job_data[1] == 'stdout' let str = 'Message from job '.v:job_data[0].': '.v:job_data[2] elseif v:job_data[1] == 'stderr' let str = 'Error message from job '.v:job_data[0].': '.v:job_data[2] else " Exit let str = 'Job '.v:job_data[0].' exited' endif call append(line('$'), str) endfunction au JobActivity netcat-server-* call JobEvent() ``` And to see messages from 'job 1', run in another terminal: ```sh bash -c "while true; do echo 123; sleep 1; done" | nc localhost 9991 ```
Diffstat (limited to 'src/os/job.c')
-rw-r--r--src/os/job.c103
1 files changed, 67 insertions, 36 deletions
diff --git a/src/os/job.c b/src/os/job.c
index 5bd404e5be..f4cbbfb670 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -7,6 +7,8 @@
#include "os/job_defs.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
+#include "os/event.h"
+#include "os/event_defs.h"
#include "os/time.h"
#include "os/shell.h"
#include "vim.h"
@@ -22,12 +24,17 @@ struct job {
int id;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
+ // exit_cb may be called while there's still pending data from stdout/stderr.
+ // We use this reference count to ensure the JobExit event is only emitted
+ // when stdout/stderr are drained
+ int pending_refs;
// If the job was already stopped
bool stopped;
// Data associated with the job
void *data;
- // Callback for consuming data from the buffer
- job_read_cb read_cb;
+ // Callbacks
+ job_exit_cb exit_cb;
+ rstream_cb stdout_cb, stderr_cb;
// Readable streams
RStream *out, *err;
// Structures for process spawning/management used by libuv
@@ -40,7 +47,6 @@ struct job {
static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uv_prepare_t job_prepare;
-static void read_cb(RStream *rstream, void *data, bool eof);
// Some helpers shared in this module
static bool is_alive(Job *job);
static Job * find_job(int id);
@@ -48,9 +54,10 @@ static void free_job(Job *job);
// Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle);
-// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void write_cb(uv_write_t *req, int status);
+static void read_cb(RStream *rstream, void *data, bool eof);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
+static void emit_exit_event(Job *job);
void job_init()
{
@@ -105,7 +112,11 @@ void job_teardown()
}
}
-int job_start(char **argv, void *data, job_read_cb cb)
+int job_start(char **argv,
+ void *data,
+ rstream_cb stdout_cb,
+ rstream_cb stderr_cb,
+ job_exit_cb job_exit_cb)
{
int i;
Job *job;
@@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb)
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
+ job->pending_refs = 3;
job->data = data;
- job->read_cb = cb;
+ job->stdout_cb = stdout_cb;
+ job->stderr_cb = stderr_cb;
+ job->exit_cb = job_exit_cb;
job->stopped = false;
job->exit_timeout = EXIT_TIMEOUT;
job->proc_opts.file = argv[0];
@@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len)
return true;
}
-void job_handle(Event event)
+void job_exit_event(Event event)
{
- Job *job = event.data.job.ptr;
+ Job *job = event.data.job;
+
+ // Invoke the exit callback
+ job->exit_cb(job, job->data);
+
+ // Free the job resources
+ table[job->id - 1] = NULL;
+ shell_free_argv(job->proc_opts.args);
+ free_job(job);
+}
- // Invoke the job callback
- job->read_cb(job->id,
- job->data,
- event.data.job.target,
- event.data.job.from_stdout);
+int job_id(Job *job)
+{
+ return job->id;
+}
+
+void *job_data(Job *job)
+{
+ return job->data;
}
static bool is_alive(Job *job)
@@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle)
}
}
-/// Pushes a event object to the event queue, which will be handled later by
-/// `job_handle`
+static void write_cb(uv_write_t *req, int status)
+{
+ free(req->data);
+ free(req);
+}
+
+// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
static void read_cb(RStream *rstream, void *data, bool eof)
{
- Event event;
Job *job = data;
- if (eof) {
- uv_process_kill(&job->proc, SIGTERM);
- return;
+ if (rstream == job->out) {
+ job->stdout_cb(rstream, data, eof);
+ } else {
+ job->stderr_cb(rstream, data, eof);
}
- event.type = kEventJobActivity;
- event.data.job.ptr = job;
- event.data.job.target = rstream;
- event.data.job.from_stdout = rstream == job->out;
- event_push(event);
-}
-
-static void write_cb(uv_write_t *req, int status)
-{
- free(req->data);
- free(req);
+ if (eof && --job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
}
-/// Cleanup all the resources associated with the job
+// Emits a JobExit event if both rstreams are closed
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
Job *job = proc->data;
- table[job->id - 1] = NULL;
- shell_free_argv(job->proc_opts.args);
- free_job(job);
+ if (--job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
+}
+
+static void emit_exit_event(Job *job)
+{
+ Event event;
+ event.type = kEventJobExit;
+ event.data.job = job;
+ event_push(event);
}