aboutsummaryrefslogtreecommitdiff
path: root/src/os/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/os/job.c')
-rw-r--r--src/os/job.c103
1 files changed, 67 insertions, 36 deletions
diff --git a/src/os/job.c b/src/os/job.c
index 5bd404e5be..f4cbbfb670 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -7,6 +7,8 @@
#include "os/job_defs.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
+#include "os/event.h"
+#include "os/event_defs.h"
#include "os/time.h"
#include "os/shell.h"
#include "vim.h"
@@ -22,12 +24,17 @@ struct job {
int id;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
+ // exit_cb may be called while there's still pending data from stdout/stderr.
+ // We use this reference count to ensure the JobExit event is only emitted
+ // when stdout/stderr are drained
+ int pending_refs;
// If the job was already stopped
bool stopped;
// Data associated with the job
void *data;
- // Callback for consuming data from the buffer
- job_read_cb read_cb;
+ // Callbacks
+ job_exit_cb exit_cb;
+ rstream_cb stdout_cb, stderr_cb;
// Readable streams
RStream *out, *err;
// Structures for process spawning/management used by libuv
@@ -40,7 +47,6 @@ struct job {
static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uv_prepare_t job_prepare;
-static void read_cb(RStream *rstream, void *data, bool eof);
// Some helpers shared in this module
static bool is_alive(Job *job);
static Job * find_job(int id);
@@ -48,9 +54,10 @@ static void free_job(Job *job);
// Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle);
-// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void write_cb(uv_write_t *req, int status);
+static void read_cb(RStream *rstream, void *data, bool eof);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
+static void emit_exit_event(Job *job);
void job_init()
{
@@ -105,7 +112,11 @@ void job_teardown()
}
}
-int job_start(char **argv, void *data, job_read_cb cb)
+int job_start(char **argv,
+ void *data,
+ rstream_cb stdout_cb,
+ rstream_cb stderr_cb,
+ job_exit_cb job_exit_cb)
{
int i;
Job *job;
@@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb)
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
+ job->pending_refs = 3;
job->data = data;
- job->read_cb = cb;
+ job->stdout_cb = stdout_cb;
+ job->stderr_cb = stderr_cb;
+ job->exit_cb = job_exit_cb;
job->stopped = false;
job->exit_timeout = EXIT_TIMEOUT;
job->proc_opts.file = argv[0];
@@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len)
return true;
}
-void job_handle(Event event)
+void job_exit_event(Event event)
{
- Job *job = event.data.job.ptr;
+ Job *job = event.data.job;
+
+ // Invoke the exit callback
+ job->exit_cb(job, job->data);
+
+ // Free the job resources
+ table[job->id - 1] = NULL;
+ shell_free_argv(job->proc_opts.args);
+ free_job(job);
+}
- // Invoke the job callback
- job->read_cb(job->id,
- job->data,
- event.data.job.target,
- event.data.job.from_stdout);
+int job_id(Job *job)
+{
+ return job->id;
+}
+
+void *job_data(Job *job)
+{
+ return job->data;
}
static bool is_alive(Job *job)
@@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle)
}
}
-/// Pushes a event object to the event queue, which will be handled later by
-/// `job_handle`
+static void write_cb(uv_write_t *req, int status)
+{
+ free(req->data);
+ free(req);
+}
+
+// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
static void read_cb(RStream *rstream, void *data, bool eof)
{
- Event event;
Job *job = data;
- if (eof) {
- uv_process_kill(&job->proc, SIGTERM);
- return;
+ if (rstream == job->out) {
+ job->stdout_cb(rstream, data, eof);
+ } else {
+ job->stderr_cb(rstream, data, eof);
}
- event.type = kEventJobActivity;
- event.data.job.ptr = job;
- event.data.job.target = rstream;
- event.data.job.from_stdout = rstream == job->out;
- event_push(event);
-}
-
-static void write_cb(uv_write_t *req, int status)
-{
- free(req->data);
- free(req);
+ if (eof && --job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
}
-/// Cleanup all the resources associated with the job
+// Emits a JobExit event if both rstreams are closed
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
Job *job = proc->data;
- table[job->id - 1] = NULL;
- shell_free_argv(job->proc_opts.args);
- free_job(job);
+ if (--job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
+}
+
+static void emit_exit_event(Job *job)
+{
+ Event event;
+ event.type = kEventJobExit;
+ event.data.job = job;
+ event_push(event);
}