diff options
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r-- | src/nvim/os/job.c | 195 |
1 files changed, 41 insertions, 154 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 33429b364f..94bb9067ed 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -6,13 +6,13 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/job.h" #include "nvim/os/job_defs.h" +#include "nvim/os/job_private.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/shell.h" #include "nvim/os/time.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -29,8 +29,8 @@ if (job->stream) { \ type##stream_free(job->stream); \ job->stream = NULL; \ - if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \ - uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \ + if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \ + uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \ } \ } \ } while (0) @@ -39,37 +39,9 @@ #define close_job_out(job) close_job_stream(job, out, r) #define close_job_err(job) close_job_stream(job, err, r) -struct job { - // Job id the index in the job table plus one. - int id; - // Exit status code of the job process - int status; - // Number of references to the job. The job resources will only be freed by - // close_cb when this is 0 - int refcount; - // Time when job_stop was called for the job. - uint64_t stopped_time; - // If SIGTERM was already sent to the job(only send one before SIGKILL) - bool term_sent; - // Data associated with the job - void *data; - // Callbacks - job_exit_cb exit_cb; - rstream_cb stdout_cb, stderr_cb; - // Readable streams(std{out,err}) - RStream *out, *err; - // Writable stream(stdin) - WStream *in; - // Structures for process spawning/management used by libuv - uv_process_t proc; - uv_process_options_t proc_opts; - uv_stdio_container_t stdio[3]; - uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -}; - -static Job *table[MAX_RUNNING_JOBS] = {NULL}; +Job *table[MAX_RUNNING_JOBS] = {NULL}; size_t stop_requests = 0; -static uv_timer_t job_stop_timer; +uv_timer_t job_stop_timer; // Some helpers shared in this module @@ -106,29 +78,10 @@ void job_teardown(void) /// Tries to start a new job. /// -/// @param argv Argument vector for the process. The first item is the -/// executable to run. -/// [consumed] -/// @param data Caller data that will be associated with the job -/// @param writable If true the job stdin will be available for writing with -/// job_write, otherwise it will be redirected to /dev/null -/// @param stdout_cb Callback that will be invoked when data is available -/// on stdout. If NULL stdout will be redirected to /dev/null. -/// @param stderr_cb Callback that will be invoked when data is available -/// on stderr. If NULL stderr will be redirected to /dev/null. -/// @param job_exit_cb Callback that will be invoked when the job exits -/// @param maxmem Maximum amount of memory used by the job WStream /// @param[out] status The job id if the job started successfully, 0 if the job /// table is full, -1 if the program could not be executed. /// @return The job pointer if the job started successfully, NULL otherwise -Job *job_start(char **argv, - void *data, - bool writable, - rstream_cb stdout_cb, - rstream_cb stderr_cb, - job_exit_cb job_exit_cb, - size_t maxmem, - int *status) +Job *job_start(JobOptions opts, int *status) { int i; Job *job; @@ -142,7 +95,7 @@ Job *job_start(char **argv, if (i == MAX_RUNNING_JOBS) { // No free slots - shell_free_argv(argv); + shell_free_argv(opts.argv); *status = 0; return NULL; } @@ -153,92 +106,64 @@ Job *job_start(char **argv, *status = job->id; job->status = -1; job->refcount = 1; - job->data = data; - job->stdout_cb = stdout_cb; - job->stderr_cb = stderr_cb; - job->exit_cb = job_exit_cb; job->stopped_time = 0; job->term_sent = false; - job->proc_opts.file = argv[0]; - job->proc_opts.args = argv; - job->proc_opts.stdio = job->stdio; - job->proc_opts.stdio_count = 3; - job->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; - job->proc_opts.exit_cb = exit_cb; - job->proc_opts.cwd = NULL; - job->proc_opts.env = NULL; - job->proc.data = NULL; - job->proc_stdin.data = NULL; - job->proc_stdout.data = NULL; - job->proc_stderr.data = NULL; job->in = NULL; job->out = NULL; job->err = NULL; + job->opts = opts; + job->closed = false; - // Initialize the job std{in,out,err} - job->stdio[0].flags = UV_IGNORE; - job->stdio[1].flags = UV_IGNORE; - job->stdio[2].flags = UV_IGNORE; + process_init(job); - if (writable) { - uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); - job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; - handle_set_job((uv_handle_t *)&job->proc_stdin, job); + if (opts.writable) { + handle_set_job((uv_handle_t *)job->proc_stdin, job); job->refcount++; } - if (stdout_cb) { - uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); - job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; - handle_set_job((uv_handle_t *)&job->proc_stdout, job); + if (opts.stdout_cb) { + handle_set_job((uv_handle_t *)job->proc_stdout, job); job->refcount++; } - if (stderr_cb) { - uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); - job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; - handle_set_job((uv_handle_t *)&job->proc_stderr, job); + if (opts.stderr_cb) { + handle_set_job((uv_handle_t *)job->proc_stderr, job); job->refcount++; } - handle_set_job((uv_handle_t *)&job->proc, job); - // Spawn the job - if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { - if (writable) { + if (!process_spawn(job)) { + if (opts.writable) { uv_close((uv_handle_t *)&job->proc_stdin, close_cb); } - if (stdout_cb) { + if (opts.stdout_cb) { uv_close((uv_handle_t *)&job->proc_stdout, close_cb); } - if (stderr_cb) { + if (opts.stderr_cb) { uv_close((uv_handle_t *)&job->proc_stderr, close_cb); } - uv_close((uv_handle_t *)&job->proc, close_cb); + process_close(job); event_poll(0); // Manually invoke the close_cb to free the job resources *status = -1; return NULL; } - if (writable) { - job->in = wstream_new(maxmem); - wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); + if (opts.writable) { + job->in = wstream_new(opts.maxmem); + wstream_set_stream(job->in, job->proc_stdin); } // Start the readable streams - if (stdout_cb) { + if (opts.stdout_cb) { job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); + rstream_set_stream(job->out, job->proc_stdout); rstream_start(job->out); } - if (stderr_cb) { + if (opts.stderr_cb) { job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); - rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); + rstream_set_stream(job->err, job->proc_stderr); rstream_start(job->err); } // Save the job to the table @@ -327,7 +252,8 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL // Job exited, collect status and manually invoke close_cb to free the job // resources status = job->status; - close_cb((uv_handle_t *)&job->proc); + job_close_streams(job); + job_decref(job); } else { job->refcount--; } @@ -391,25 +317,7 @@ int job_id(Job *job) /// @return The job data void *job_data(Job *job) { - return job->data; -} - -static void job_exit_callback(Job *job) -{ - // Free the slot now, 'exit_cb' may want to start another job to replace - // this one - table[job->id - 1] = NULL; - - if (job->exit_cb) { - // Invoke the exit callback - job->exit_cb(job, job->data); - } - - if (stop_requests && !--stop_requests) { - // Stop the timer if no more stop requests are pending - DLOG("Stopping job kill timer"); - uv_timer_stop(&job_stop_timer); - } + return job->opts.data; } /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those @@ -428,11 +336,12 @@ static void job_stop_timer_cb(uv_timer_t *handle) if (!job->term_sent && elapsed >= TERM_TIMEOUT) { ILOG("Sending SIGTERM to job(id: %d)", job->id); - uv_process_kill(&job->proc, SIGTERM); + uv_kill(job->pid, SIGTERM); job->term_sent = true; } else if (elapsed >= KILL_TIMEOUT) { ILOG("Sending SIGKILL to job(id: %d)", job->id); - uv_process_kill(&job->proc, SIGKILL); + uv_kill(job->pid, SIGKILL); + process_close(job); } } } @@ -443,48 +352,26 @@ static void read_cb(RStream *rstream, void *data, bool eof) Job *job = data; if (rstream == job->out) { - job->stdout_cb(rstream, data, eof); + job->opts.stdout_cb(rstream, data, eof); if (eof) { close_job_out(job); } } else { - job->stderr_cb(rstream, data, eof); + job->opts.stderr_cb(rstream, data, eof); if (eof) { close_job_err(job); } } } -// Emits a JobExit event if both rstreams are closed -static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) +void job_close_streams(Job *job) { - Job *job = handle_get_job((uv_handle_t *)proc); - - job->status = (int)status; - uv_close((uv_handle_t *)&job->proc, close_cb); + close_job_in(job); + close_job_out(job); + close_job_err(job); } static void close_cb(uv_handle_t *handle) { - Job *job = handle_get_job(handle); - - if (handle == (uv_handle_t *)&job->proc) { - // Make sure all streams are properly closed to trigger callback invocation - // when job->proc is closed - close_job_in(job); - close_job_out(job); - close_job_err(job); - } - - if (--job->refcount == 0) { - // Invoke the exit_cb - job_exit_callback(job); - // Free all memory allocated for the job - free(job->proc.data); - free(job->proc_stdin.data); - free(job->proc_stdout.data); - free(job->proc_stderr.data); - shell_free_argv(job->proc_opts.args); - free(job); - } + job_decref(handle_get_job(handle)); } |