diff options
Diffstat (limited to 'src/os/job.c')
-rw-r--r-- | src/os/job.c | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/src/os/job.c b/src/os/job.c new file mode 100644 index 0000000000..f8d9d6d576 --- /dev/null +++ b/src/os/job.c @@ -0,0 +1,345 @@ +#include <stdint.h> +#include <stdbool.h> + +#include <uv.h> + +#include "os/job_defs.h" +#include "os/job.h" +#include "os/time.h" +#include "os/shell.h" +#include "vim.h" +#include "memory.h" +#include "term.h" + +#define EXIT_TIMEOUT 25 +#define MAX_RUNNING_JOBS 100 +#define JOB_BUFFER_SIZE 1024 + +/// Possible lock states of the job buffer +typedef enum { + kBufferLockNone = 0, ///< No data was read + kBufferLockStdout, ///< Data read from stdout + kBufferLockStderr ///< Data read from stderr +} BufferLock; + +struct _Job { + // Job id the index in the job table plus one. + int id; + // Number of polls after a SIGTERM that will trigger a SIGKILL + int exit_timeout; + // If the job was already stopped + bool stopped; + // Data associated with the job + void *data; + // Buffer for reading from stdout or stderr + char buffer[JOB_BUFFER_SIZE]; + // Size of the data from the last read + uint32_t length; + // Buffer lock state + BufferLock lock; + // Callback for consuming data from the buffer + job_read_cb read_cb; + // Structures for process spawning/management used by libuv + uv_process_t proc; + uv_process_options_t proc_opts; + uv_stdio_container_t stdio[3]; + uv_pipe_t proc_stdin, proc_stdout, proc_stderr; +}; + +static Job *table[MAX_RUNNING_JOBS] = {NULL}; +static uv_prepare_t job_prepare; + +// Some helpers shared in this module +static bool is_alive(Job *job); +static Job * find_job(int id); +static void free_job(Job *job); + +// Callbacks for libuv +static void job_prepare_cb(uv_prepare_t *handle, int status); +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf); +static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf); +static void write_cb(uv_write_t *req, int status); +static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); + +void job_init() +{ + uv_disable_stdio_inheritance(); + uv_prepare_init(uv_default_loop(), &job_prepare); + uv_prepare_start(&job_prepare, job_prepare_cb); +} + +void job_teardown() +{ + // 20 tries will give processes about 1 sec to exit cleanly + uint32_t remaining_tries = 20; + bool all_dead = true; + int i; + Job *job; + + // Politely ask each job to terminate + for (i = 0; i < MAX_RUNNING_JOBS; i++) { + if ((job = table[i]) != NULL) { + all_dead = false; + uv_process_kill(&job->proc, SIGTERM); + } + } + + if (all_dead) { + return; + } + + os_delay(10, 0); + // Right now any exited process are zombies waiting for us to acknowledge + // their status with `wait` or handling SIGCHLD. libuv does that + // automatically (and then calls `exit_cb`) but we have to give it a chance + // by running the loop one more time + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + + // Prepare to start shooting + for (i = 0; i < MAX_RUNNING_JOBS; i++) { + if ((job = table[i]) == NULL) { + continue; + } + + // Still alive + while (is_alive(job) && remaining_tries--) { + // Since this is the first time we're checking, wait 300ms so + // every job has a chance to exit normally + os_delay(50, 0); + // Acknowledge child exits + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + + if (is_alive(job)) { + uv_process_kill(&job->proc, SIGKILL); + } + } +} + +int job_start(char **argv, void *data, job_read_cb cb) +{ + int i; + Job *job; + + // Search for a free slot in the table + for (i = 0; i < MAX_RUNNING_JOBS; i++) { + if (table[i] == NULL) { + break; + } + } + + if (i == MAX_RUNNING_JOBS) { + // No free slots + return 0; + } + + job = xmalloc(sizeof(Job)); + // Initialize + job->id = i + 1; + job->data = data; + job->read_cb = cb; + job->stopped = false; + job->exit_timeout = EXIT_TIMEOUT; + job->proc_opts.file = argv[0]; + job->proc_opts.args = argv; + job->proc_opts.stdio = job->stdio; + job->proc_opts.stdio_count = 3; + job->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; + job->proc_opts.exit_cb = exit_cb; + job->proc_opts.cwd = NULL; + job->proc_opts.env = NULL; + + // Initialize the job std{in,out,err} + uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); + job->proc_stdin.data = job; + job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; + + uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); + job->proc_stdout.data = job; + job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; + + uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); + job->proc_stderr.data = job; + job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; + job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; + + // Spawn the job + if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { + free_job(job); + return -1; + } + + // Start the readable streams + uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); + uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); + // Give the callback a reference to the job + job->proc.data = job; + // Save the job to the table + table[i] = job; + + return job->id; +} + +bool job_stop(int id) +{ + Job *job = find_job(id); + + if (job == NULL || job->stopped) { + return false; + } + + uv_read_stop((uv_stream_t *)&job->proc_stdout); + uv_read_stop((uv_stream_t *)&job->proc_stderr); + job->stopped = true; + + return true; +} + +bool job_write(int id, char *data, uint32_t len) +{ + uv_buf_t uvbuf; + uv_write_t *req; + Job *job = find_job(id); + + if (job == NULL || job->stopped) { + free(data); + return false; + } + + req = xmalloc(sizeof(uv_write_t)); + req->data = data; + uvbuf.base = data; + uvbuf.len = len; + uv_write(req, (uv_stream_t *)&job->proc_stdin, &uvbuf, 1, write_cb); + + return true; +} + +void job_handle(Event event) +{ + Job *job = event.data.job; + + // Invoke the job callback + job->read_cb(job->id, + job->data, + job->buffer, + job->length, + job->lock == kBufferLockStdout); + + shell_resized(); + // restart reading + job->lock = kBufferLockNone; + uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); + uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); +} + +static bool is_alive(Job *job) +{ + return uv_process_kill(&job->proc, 0) == 0; +} + +static Job * find_job(int id) +{ + if (id <= 0 || id > MAX_RUNNING_JOBS) { + return NULL; + } + + return table[id - 1]; +} + +static void free_job(Job *job) +{ + uv_close((uv_handle_t *)&job->proc_stdout, NULL); + uv_close((uv_handle_t *)&job->proc_stdin, NULL); + uv_close((uv_handle_t *)&job->proc_stderr, NULL); + uv_close((uv_handle_t *)&job->proc, NULL); + free(job); +} + +/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those +/// that didn't die from SIGTERM after a while(exit_timeout is 0). +static void job_prepare_cb(uv_prepare_t *handle, int status) +{ + Job *job; + int i; + + for (i = 0; i < MAX_RUNNING_JOBS; i++) { + if ((job = table[i]) == NULL || !job->stopped) { + continue; + } + + if ((job->exit_timeout--) == EXIT_TIMEOUT) { + // Job was just stopped, close all stdio handles and send SIGTERM + uv_process_kill(&job->proc, SIGTERM); + } else if (job->exit_timeout == 0) { + // We've waited long enough, send SIGKILL + uv_process_kill(&job->proc, SIGKILL); + } + } +} + +/// Puts the job into a 'reading state' which 'locks' the job buffer +/// until the data is consumed +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ + Job *job = (Job *)handle->data; + + if (job->lock != kBufferLockNone) { + // Already reserved the buffer for reading from stdout or stderr. + buf->len = 0; + return; + } + + buf->base = job->buffer; + buf->len = JOB_BUFFER_SIZE; + // Avoid `alloc_cb`, `alloc_cb` sequences on windows and also mark which + // stream we are reading from + job->lock = + (handle == (uv_handle_t *)&job->proc_stdout) ? + kBufferLockStdout : + kBufferLockStderr; +} + +/// Pushes a event object to the event queue, which will be handled later by +/// `job_handle` +static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) +{ + Event event; + Job *job = (Job *)stream->data; + // pause reading on both streams + uv_read_stop((uv_stream_t *)&job->proc_stdout); + uv_read_stop((uv_stream_t *)&job->proc_stderr); + + if (cnt <= 0) { + if (cnt != UV_ENOBUFS) { + // Assume it's EOF and exit the job. Doesn't harm sending a SIGTERM + // at this point + uv_process_kill(&job->proc, SIGTERM); + } + return; + } + + job->length = cnt; + event.type = kEventJobActivity; + event.data.job = job; + event_push(event); +} + +static void write_cb(uv_write_t *req, int status) +{ + free(req->data); + free(req); +} + +/// Cleanup all the resources associated with the job +static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) +{ + Job *job = proc->data; + + table[job->id - 1] = NULL; + shell_free_argv(job->proc_opts.args); + free_job(job); +} + |