aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r--src/nvim/os/job.c182
1 files changed, 120 insertions, 62 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index f8ad6874c9..8c744b0479 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -13,10 +13,14 @@
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/shell.h"
+#include "nvim/os/time.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
-#define EXIT_TIMEOUT 25
+// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit
+// before we send SIGNAL to it
+#define TERM_TIMEOUT 1000000000
+#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF
@@ -39,14 +43,14 @@ struct job {
// Job id the index in the job table plus one.
int id;
// Exit status code of the job process
- int64_t status;
- // Number of polls after a SIGTERM that will trigger a SIGKILL
- int exit_timeout;
+ int status;
// Number of references to the job. The job resources will only be freed by
// close_cb when this is 0
int refcount;
- // If the job was already stopped
- bool stopped;
+ // Time when job_stop was called for the job.
+ uint64_t stopped_time;
+ // If SIGTERM was already sent to the job(only send one before SIGKILL)
+ bool term_sent;
// Data associated with the job
void *data;
// Callbacks
@@ -64,8 +68,8 @@ struct job {
};
static Job *table[MAX_RUNNING_JOBS] = {NULL};
-static uint32_t job_count = 0;
-static uv_prepare_t job_prepare;
+size_t stop_requests = 0;
+static uv_timer_t job_stop_timer;
// Some helpers shared in this module
@@ -78,7 +82,7 @@ static uv_prepare_t job_prepare;
void job_init(void)
{
uv_disable_stdio_inheritance();
- uv_prepare_init(uv_default_loop(), &job_prepare);
+ uv_timer_init(uv_default_loop(), &job_stop_timer);
}
/// Releases job control resources and terminates running jobs
@@ -136,10 +140,12 @@ void job_teardown(void)
/// @param argv Argument vector for the process. The first item is the
/// executable to run.
/// @param data Caller data that will be associated with the job
+/// @param writable If true the job stdin will be available for writing with
+/// job_write, otherwise it will be redirected to /dev/null
/// @param stdout_cb Callback that will be invoked when data is available
-/// on stdout
+/// on stdout. If NULL stdout will be redirected to /dev/null.
/// @param stderr_cb Callback that will be invoked when data is available
-/// on stderr
+/// on stderr. If NULL stderr will be redirected to /dev/null.
/// @param job_exit_cb Callback that will be invoked when the job exits
/// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] status The job id if the job started successfully, 0 if the job
@@ -147,6 +153,7 @@ void job_teardown(void)
/// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(char **argv,
void *data,
+ bool writable,
rstream_cb stdout_cb,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb,
@@ -174,13 +181,13 @@ Job *job_start(char **argv,
job->id = i + 1;
*status = job->id;
job->status = -1;
- job->refcount = 4;
+ job->refcount = 1;
job->data = data;
job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb;
job->exit_cb = job_exit_cb;
- job->stopped = false;
- job->exit_timeout = EXIT_TIMEOUT;
+ job->stopped_time = 0;
+ job->term_sent = false;
job->proc_opts.file = argv[0];
job->proc_opts.args = argv;
job->proc_opts.stdio = job->stdio;
@@ -193,49 +200,78 @@ Job *job_start(char **argv,
job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL;
+ job->in = NULL;
+ job->out = NULL;
+ job->err = NULL;
// Initialize the job std{in,out,err}
- uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
- job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
- job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
+ job->stdio[0].flags = UV_IGNORE;
+ job->stdio[1].flags = UV_IGNORE;
+ job->stdio[2].flags = UV_IGNORE;
+
+ if (writable) {
+ uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
+ job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
+ job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
+ handle_set_job((uv_handle_t *)&job->proc_stdin, job);
+ job->refcount++;
+ }
- uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
- job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
+ if (stdout_cb) {
+ uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
+ job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
+ handle_set_job((uv_handle_t *)&job->proc_stdout, job);
+ job->refcount++;
+ }
- uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
- job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
+ if (stderr_cb) {
+ uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
+ job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
+ job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
+ handle_set_job((uv_handle_t *)&job->proc_stderr, job);
+ job->refcount++;
+ }
- // Give all handles a reference to the job
handle_set_job((uv_handle_t *)&job->proc, job);
- handle_set_job((uv_handle_t *)&job->proc_stdin, job);
- handle_set_job((uv_handle_t *)&job->proc_stdout, job);
- handle_set_job((uv_handle_t *)&job->proc_stderr, job);
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
+ if (writable) {
+ uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
+ }
+ if (stdout_cb) {
+ uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
+ }
+ if (stderr_cb) {
+ uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
+ }
+ uv_close((uv_handle_t *)&job->proc, close_cb);
+ event_poll(0);
+ // Manually invoke the close_cb to free the job resources
*status = -1;
return NULL;
}
- job->in = wstream_new(maxmem);
- wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
+ if (writable) {
+ job->in = wstream_new(maxmem);
+ wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
+ }
+
// Start the readable streams
- job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
- rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
- rstream_start(job->out);
- rstream_start(job->err);
- // Save the job to the table
- table[i] = job;
+ if (stdout_cb) {
+ job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
+ rstream_start(job->out);
+ }
- // Start polling job status if this is the first
- if (job_count == 0) {
- uv_prepare_start(&job_prepare, job_prepare_cb);
+ if (stderr_cb) {
+ job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
+ rstream_start(job->err);
}
- job_count++;
+ // Save the job to the table
+ table[i] = job;
return job;
}
@@ -249,7 +285,7 @@ Job *job_find(int id)
Job *job;
if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
- || job->stopped) {
+ || job->stopped_time) {
return NULL;
}
@@ -262,7 +298,22 @@ Job *job_find(int id)
/// @param job The Job instance
void job_stop(Job *job)
{
- job->stopped = true;
+ if (job->stopped_time) {
+ return;
+ }
+
+ job->stopped_time = os_hrtime();
+ // Close the standard streams of the job
+ close_job_in(job);
+ close_job_out(job);
+ close_job_err(job);
+
+ if (!stop_requests++) {
+ // When there's at least one stop request pending, start a timer that
+ // will periodically check if a signal should be send to a to the job
+ DLOG("Starting job kill timer");
+ uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100);
+ }
}
/// job_wait - synchronously wait for a job to finish
@@ -276,6 +327,9 @@ void job_stop(Job *job)
/// is possible on some OS.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{
+ // The default status is -1, which represents a timeout
+ int status = -1;
+
// Increase refcount to stop the job from being freed before we have a
// chance to get the status.
job->refcount++;
@@ -291,15 +345,16 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
event_poll(0);
}
- if (!--job->refcount) {
- int status = (int) job->status;
- // Manually invoke close_cb to free the job resources
+ if (job->refcount == 1) {
+ // Job exited, collect status and manually invoke close_cb to free the job
+ // resources
+ status = job->status;
close_cb((uv_handle_t *)&job->proc);
- return status;
+ } else {
+ job->refcount--;
}
- // return -1 for a timeout
- return -1;
+ return status;
}
/// Close the pipe used to write to the job.
@@ -372,10 +427,10 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data);
}
- // Stop polling job status if this was the last
- job_count--;
- if (job_count == 0) {
- uv_prepare_stop(&job_prepare);
+ if (!--stop_requests) {
+ // Stop the timer if no more stop requests are pending
+ DLOG("Stopping job kill timer");
+ uv_timer_stop(&job_stop_timer);
}
}
@@ -386,21 +441,24 @@ static bool is_alive(Job *job)
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
-static void job_prepare_cb(uv_prepare_t *handle)
+static void job_stop_timer_cb(uv_timer_t *handle)
{
Job *job;
- int i;
+ uint64_t now = os_hrtime();
- for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL || !job->stopped) {
+ for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) {
+ if ((job = table[i]) == NULL || !job->stopped_time) {
continue;
}
- if ((job->exit_timeout--) == EXIT_TIMEOUT) {
- // Job was just stopped, close all stdio handles and send SIGTERM
+ uint64_t elapsed = now - job->stopped_time;
+
+ if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
+ ILOG("Sending SIGTERM to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGTERM);
- } else if (job->exit_timeout == 0) {
- // We've waited long enough, send SIGKILL
+ job->term_sent = true;
+ } else if (elapsed >= KILL_TIMEOUT) {
+ ILOG("Sending SIGKILL to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGKILL);
}
}
@@ -429,7 +487,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
Job *job = handle_get_job((uv_handle_t *)proc);
- job->status = status;
+ job->status = (int)status;
uv_close((uv_handle_t *)&job->proc, close_cb);
}