diff options
Diffstat (limited to 'src/nvim/os/job.c')
| -rw-r--r-- | src/nvim/os/job.c | 204 |
1 files changed, 71 insertions, 133 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 2ca1023290..f8ad6874c9 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -12,17 +12,29 @@ #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/time.h" #include "nvim/os/shell.h" -#include "nvim/os/signal.h" #include "nvim/vim.h" #include "nvim/memory.h" -#include "nvim/term.h" #define EXIT_TIMEOUT 25 #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 0xFFFF +#define close_job_stream(job, stream, type) \ + do { \ + if (job->stream) { \ + type##stream_free(job->stream); \ + job->stream = NULL; \ + if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \ + uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \ + } \ + } \ + } while (0) + +#define close_job_in(job) close_job_stream(job, in, w) +#define close_job_out(job) close_job_stream(job, out, r) +#define close_job_err(job) close_job_stream(job, err, r) + struct job { // Job id the index in the job table plus one. int id; @@ -30,13 +42,9 @@ struct job { int64_t status; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; - // exit_cb may be called while there's still pending data from stdout/stderr. - // We use this reference count to ensure the JobExit event is only emitted - // when stdout/stderr are drained - int pending_refs; - // Same as above, but for freeing the job memory which contains - // libuv handles. Only after all are closed the job can be safely freed. - int pending_closes; + // Number of references to the job. The job resources will only be freed by + // close_cb when this is 0 + int refcount; // If the job was already stopped bool stopped; // Data associated with the job @@ -99,25 +107,28 @@ void job_teardown(void) // their status with `wait` or handling SIGCHLD. libuv does that // automatically (and then calls `exit_cb`) but we have to give it a chance // by running the loop one more time - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL) { - continue; - } + job = table[i]; // Still alive - while (is_alive(job) && remaining_tries--) { + while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); + // It's possible that the event_poll call removed the job from the table, + // reset 'job' so the next iteration won't run in that case. + job = table[i]; } - if (is_alive(job)) { + if (job && is_alive(job)) { uv_process_kill(&job->proc, SIGKILL); } } + // Last run to ensure all children were removed + event_poll(0); } /// Tries to start a new job. @@ -163,8 +174,7 @@ Job *job_start(char **argv, job->id = i + 1; *status = job->id; job->status = -1; - job->pending_refs = 3; - job->pending_closes = 4; + job->refcount = 4; job->data = data; job->stdout_cb = stdout_cb; job->stderr_cb = stderr_cb; @@ -205,7 +215,6 @@ Job *job_start(char **argv, // Spawn the job if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { - free_job(job); *status = -1; return NULL; } @@ -213,14 +222,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); - job->err = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -273,51 +276,30 @@ void job_stop(Job *job) /// is possible on some OS. int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL { - // switch to cooked so `got_int` will be set if the user interrupts - int old_mode = cur_tmode; - settmode(TMODE_COOK); - - EventSource sources[] = {job_event_source(job), signal_event_source(), NULL}; - - // keep track of the elapsed time if ms > 0 - uint64_t before = (ms > 0) ? os_hrtime() : 0; - - while (1) { - // check if the job has exited (and the status is available). - if (job->pending_refs == 0) { - break; - } - - event_poll(ms, sources); - - // we'll assume that a user frantically hitting interrupt doesn't like - // the current job. Signal that it has to be killed. - if (got_int) { - job_stop(job); - } - - if (ms == 0) { - break; - } - - // check if the poll timed out, if not, decrease the ms to wait for the - // next run - if (ms > 0) { - uint64_t now = os_hrtime(); - ms -= (int) ((now - before) / 1000000); - before = now; - - // if the time elapsed is greater than the `ms` wait time, break - if (ms <= 0) { - break; - } - } + // Increase refcount to stop the job from being freed before we have a + // chance to get the status. + job->refcount++; + event_poll_until(ms, + // Until... + got_int || // interrupted by the user + job->refcount == 1); // job exited + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + job_stop(job); + event_poll(0); } - settmode(old_mode); + if (!--job->refcount) { + int status = (int) job->status; + // Manually invoke close_cb to free the job resources + close_cb((uv_handle_t *)&job->proc); + return status; + } - // return -1 for a timeout, the job status otherwise - return (job->pending_refs) ? -1 : (int) job->status; + // return -1 for a timeout + return -1; } /// Close the pipe used to write to the job. @@ -331,15 +313,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL /// @param job The job instance void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL { - if (!job->in) { - return; - } - - // let other functions in the job module know that the in pipe is no more - wstream_free(job->in); - job->in = NULL; - - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); + close_job_in(job); } /// All writes that complete after calling this function will be reported @@ -369,14 +343,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Runs the read callback associated with the job exit event -/// -/// @param event Object containing data necessary to invoke the callback -void job_exit_event(Event event) -{ - job_exit_callback(event.data.job); -} - /// Get the job id /// /// @param job A pointer to the job @@ -395,11 +361,6 @@ void *job_data(Job *job) return job->data; } -EventSource job_event_source(Job *job) -{ - return job; -} - static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -411,9 +372,6 @@ static void job_exit_callback(Job *job) job->exit_cb(job, job->data); } - // Free the job resources - free_job(job); - // Stop polling job status if this was the last job_count--; if (job_count == 0) { @@ -426,16 +384,6 @@ static bool is_alive(Job *job) return uv_process_kill(&job->proc, 0) == 0; } -static void free_job(Job *job) -{ - uv_close((uv_handle_t *)&job->proc_stdout, close_cb); - if (job->in) { - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); - } - uv_close((uv_handle_t *)&job->proc_stderr, close_cb); - uv_close((uv_handle_t *)&job->proc, close_cb); -} - /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// that didn't die from SIGTERM after a while(exit_timeout is 0). static void job_prepare_cb(uv_prepare_t *handle) @@ -465,12 +413,14 @@ static void read_cb(RStream *rstream, void *data, bool eof) if (rstream == job->out) { job->stdout_cb(rstream, data, eof); + if (eof) { + close_job_out(job); + } } else { job->stderr_cb(rstream, data, eof); - } - - if (eof && --job->pending_refs == 0) { - emit_exit_event(job); + if (eof) { + close_job_err(job); + } } } @@ -480,41 +430,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) Job *job = handle_get_job((uv_handle_t *)proc); job->status = status; - if (--job->pending_refs == 0) { - emit_exit_event(job); - } -} - -static void emit_exit_event(Job *job) -{ - Event event = { - .source = job_event_source(job), - .type = kEventJobExit, - .data.job = job - }; - event_push(event); + uv_close((uv_handle_t *)&job->proc, close_cb); } static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); - if (--job->pending_closes == 0) { - // Only free the job memory after all the associated handles are properly - // closed by libuv - rstream_free(job->out); - rstream_free(job->err); - if (job->in) { - wstream_free(job->in); - } + if (handle == (uv_handle_t *)&job->proc) { + // Make sure all streams are properly closed to trigger callback invocation + // when job->proc is closed + close_job_in(job); + close_job_out(job); + close_job_err(job); + } - // Free data memory of process and pipe handles, that was allocated - // by handle_set_job in job_start. + if (--job->refcount == 0) { + // Invoke the exit_cb + job_exit_callback(job); + // Free all memory allocated for the job free(job->proc.data); free(job->proc_stdin.data); free(job->proc_stdout.data); free(job->proc_stderr.data); - shell_free_argv(job->proc_opts.args); free(job); } |