diff options
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r-- | src/nvim/os/job.c | 182 |
1 files changed, 120 insertions, 62 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index f8ad6874c9..8c744b0479 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -13,10 +13,14 @@ #include "nvim/os/event.h" #include "nvim/os/event_defs.h" #include "nvim/os/shell.h" +#include "nvim/os/time.h" #include "nvim/vim.h" #include "nvim/memory.h" -#define EXIT_TIMEOUT 25 +// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit +// before we send SIGNAL to it +#define TERM_TIMEOUT 1000000000 +#define KILL_TIMEOUT (TERM_TIMEOUT * 2) #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 0xFFFF @@ -39,14 +43,14 @@ struct job { // Job id the index in the job table plus one. int id; // Exit status code of the job process - int64_t status; - // Number of polls after a SIGTERM that will trigger a SIGKILL - int exit_timeout; + int status; // Number of references to the job. The job resources will only be freed by // close_cb when this is 0 int refcount; - // If the job was already stopped - bool stopped; + // Time when job_stop was called for the job. + uint64_t stopped_time; + // If SIGTERM was already sent to the job(only send one before SIGKILL) + bool term_sent; // Data associated with the job void *data; // Callbacks @@ -64,8 +68,8 @@ struct job { }; static Job *table[MAX_RUNNING_JOBS] = {NULL}; -static uint32_t job_count = 0; -static uv_prepare_t job_prepare; +size_t stop_requests = 0; +static uv_timer_t job_stop_timer; // Some helpers shared in this module @@ -78,7 +82,7 @@ static uv_prepare_t job_prepare; void job_init(void) { uv_disable_stdio_inheritance(); - uv_prepare_init(uv_default_loop(), &job_prepare); + uv_timer_init(uv_default_loop(), &job_stop_timer); } /// Releases job control resources and terminates running jobs @@ -136,10 +140,12 @@ void job_teardown(void) /// @param argv Argument vector for the process. The first item is the /// executable to run. /// @param data Caller data that will be associated with the job +/// @param writable If true the job stdin will be available for writing with +/// job_write, otherwise it will be redirected to /dev/null /// @param stdout_cb Callback that will be invoked when data is available -/// on stdout +/// on stdout. If NULL stdout will be redirected to /dev/null. /// @param stderr_cb Callback that will be invoked when data is available -/// on stderr +/// on stderr. If NULL stderr will be redirected to /dev/null. /// @param job_exit_cb Callback that will be invoked when the job exits /// @param maxmem Maximum amount of memory used by the job WStream /// @param[out] status The job id if the job started successfully, 0 if the job @@ -147,6 +153,7 @@ void job_teardown(void) /// @return The job pointer if the job started successfully, NULL otherwise Job *job_start(char **argv, void *data, + bool writable, rstream_cb stdout_cb, rstream_cb stderr_cb, job_exit_cb job_exit_cb, @@ -174,13 +181,13 @@ Job *job_start(char **argv, job->id = i + 1; *status = job->id; job->status = -1; - job->refcount = 4; + job->refcount = 1; job->data = data; job->stdout_cb = stdout_cb; job->stderr_cb = stderr_cb; job->exit_cb = job_exit_cb; - job->stopped = false; - job->exit_timeout = EXIT_TIMEOUT; + job->stopped_time = 0; + job->term_sent = false; job->proc_opts.file = argv[0]; job->proc_opts.args = argv; job->proc_opts.stdio = job->stdio; @@ -193,49 +200,78 @@ Job *job_start(char **argv, job->proc_stdin.data = NULL; job->proc_stdout.data = NULL; job->proc_stderr.data = NULL; + job->in = NULL; + job->out = NULL; + job->err = NULL; // Initialize the job std{in,out,err} - uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); - job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; + job->stdio[0].flags = UV_IGNORE; + job->stdio[1].flags = UV_IGNORE; + job->stdio[2].flags = UV_IGNORE; + + if (writable) { + uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); + job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; + handle_set_job((uv_handle_t *)&job->proc_stdin, job); + job->refcount++; + } - uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); - job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; + if (stdout_cb) { + uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); + job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; + handle_set_job((uv_handle_t *)&job->proc_stdout, job); + job->refcount++; + } - uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); - job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; + if (stderr_cb) { + uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); + job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; + handle_set_job((uv_handle_t *)&job->proc_stderr, job); + job->refcount++; + } - // Give all handles a reference to the job handle_set_job((uv_handle_t *)&job->proc, job); - handle_set_job((uv_handle_t *)&job->proc_stdin, job); - handle_set_job((uv_handle_t *)&job->proc_stdout, job); - handle_set_job((uv_handle_t *)&job->proc_stderr, job); // Spawn the job if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { + if (writable) { + uv_close((uv_handle_t *)&job->proc_stdin, close_cb); + } + if (stdout_cb) { + uv_close((uv_handle_t *)&job->proc_stdout, close_cb); + } + if (stderr_cb) { + uv_close((uv_handle_t *)&job->proc_stderr, close_cb); + } + uv_close((uv_handle_t *)&job->proc, close_cb); + event_poll(0); + // Manually invoke the close_cb to free the job resources *status = -1; return NULL; } - job->in = wstream_new(maxmem); - wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); + if (writable) { + job->in = wstream_new(maxmem); + wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); + } + // Start the readable streams - job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); - rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); - rstream_start(job->out); - rstream_start(job->err); - // Save the job to the table - table[i] = job; + if (stdout_cb) { + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); + rstream_start(job->out); + } - // Start polling job status if this is the first - if (job_count == 0) { - uv_prepare_start(&job_prepare, job_prepare_cb); + if (stderr_cb) { + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); + rstream_start(job->err); } - job_count++; + // Save the job to the table + table[i] = job; return job; } @@ -249,7 +285,7 @@ Job *job_find(int id) Job *job; if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1]) - || job->stopped) { + || job->stopped_time) { return NULL; } @@ -262,7 +298,22 @@ Job *job_find(int id) /// @param job The Job instance void job_stop(Job *job) { - job->stopped = true; + if (job->stopped_time) { + return; + } + + job->stopped_time = os_hrtime(); + // Close the standard streams of the job + close_job_in(job); + close_job_out(job); + close_job_err(job); + + if (!stop_requests++) { + // When there's at least one stop request pending, start a timer that + // will periodically check if a signal should be send to a to the job + DLOG("Starting job kill timer"); + uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100); + } } /// job_wait - synchronously wait for a job to finish @@ -276,6 +327,9 @@ void job_stop(Job *job) /// is possible on some OS. int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL { + // The default status is -1, which represents a timeout + int status = -1; + // Increase refcount to stop the job from being freed before we have a // chance to get the status. job->refcount++; @@ -291,15 +345,16 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL event_poll(0); } - if (!--job->refcount) { - int status = (int) job->status; - // Manually invoke close_cb to free the job resources + if (job->refcount == 1) { + // Job exited, collect status and manually invoke close_cb to free the job + // resources + status = job->status; close_cb((uv_handle_t *)&job->proc); - return status; + } else { + job->refcount--; } - // return -1 for a timeout - return -1; + return status; } /// Close the pipe used to write to the job. @@ -372,10 +427,10 @@ static void job_exit_callback(Job *job) job->exit_cb(job, job->data); } - // Stop polling job status if this was the last - job_count--; - if (job_count == 0) { - uv_prepare_stop(&job_prepare); + if (!--stop_requests) { + // Stop the timer if no more stop requests are pending + DLOG("Stopping job kill timer"); + uv_timer_stop(&job_stop_timer); } } @@ -386,21 +441,24 @@ static bool is_alive(Job *job) /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// that didn't die from SIGTERM after a while(exit_timeout is 0). -static void job_prepare_cb(uv_prepare_t *handle) +static void job_stop_timer_cb(uv_timer_t *handle) { Job *job; - int i; + uint64_t now = os_hrtime(); - for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL || !job->stopped) { + for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) { + if ((job = table[i]) == NULL || !job->stopped_time) { continue; } - if ((job->exit_timeout--) == EXIT_TIMEOUT) { - // Job was just stopped, close all stdio handles and send SIGTERM + uint64_t elapsed = now - job->stopped_time; + + if (!job->term_sent && elapsed >= TERM_TIMEOUT) { + ILOG("Sending SIGTERM to job(id: %d)", job->id); uv_process_kill(&job->proc, SIGTERM); - } else if (job->exit_timeout == 0) { - // We've waited long enough, send SIGKILL + job->term_sent = true; + } else if (elapsed >= KILL_TIMEOUT) { + ILOG("Sending SIGKILL to job(id: %d)", job->id); uv_process_kill(&job->proc, SIGKILL); } } @@ -429,7 +487,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { Job *job = handle_get_job((uv_handle_t *)proc); - job->status = status; + job->status = (int)status; uv_close((uv_handle_t *)&job->proc, close_cb); } |