aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/job.c')
-rw-r--r--src/nvim/os/job.c204
1 files changed, 71 insertions, 133 deletions
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 2ca1023290..f8ad6874c9 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -12,17 +12,29 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
-#include "nvim/os/time.h"
#include "nvim/os/shell.h"
-#include "nvim/os/signal.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
-#include "nvim/term.h"
#define EXIT_TIMEOUT 25
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF
+#define close_job_stream(job, stream, type) \
+ do { \
+ if (job->stream) { \
+ type##stream_free(job->stream); \
+ job->stream = NULL; \
+ if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \
+ uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \
+ } \
+ } \
+ } while (0)
+
+#define close_job_in(job) close_job_stream(job, in, w)
+#define close_job_out(job) close_job_stream(job, out, r)
+#define close_job_err(job) close_job_stream(job, err, r)
+
struct job {
// Job id the index in the job table plus one.
int id;
@@ -30,13 +42,9 @@ struct job {
int64_t status;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
- // exit_cb may be called while there's still pending data from stdout/stderr.
- // We use this reference count to ensure the JobExit event is only emitted
- // when stdout/stderr are drained
- int pending_refs;
- // Same as above, but for freeing the job memory which contains
- // libuv handles. Only after all are closed the job can be safely freed.
- int pending_closes;
+ // Number of references to the job. The job resources will only be freed by
+ // close_cb when this is 0
+ int refcount;
// If the job was already stopped
bool stopped;
// Data associated with the job
@@ -99,25 +107,28 @@ void job_teardown(void)
// their status with `wait` or handling SIGCHLD. libuv does that
// automatically (and then calls `exit_cb`) but we have to give it a chance
// by running the loop one more time
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL) {
- continue;
- }
+ job = table[i];
// Still alive
- while (is_alive(job) && remaining_tries--) {
+ while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
+ // It's possible that the event_poll call removed the job from the table,
+ // reset 'job' so the next iteration won't run in that case.
+ job = table[i];
}
- if (is_alive(job)) {
+ if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
+ // Last run to ensure all children were removed
+ event_poll(0);
}
/// Tries to start a new job.
@@ -163,8 +174,7 @@ Job *job_start(char **argv,
job->id = i + 1;
*status = job->id;
job->status = -1;
- job->pending_refs = 3;
- job->pending_closes = 4;
+ job->refcount = 4;
job->data = data;
job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb;
@@ -205,7 +215,6 @@ Job *job_start(char **argv,
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
- free_job(job);
*status = -1;
return NULL;
}
@@ -213,14 +222,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
- job->err = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
+ job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -273,51 +276,30 @@ void job_stop(Job *job)
/// is possible on some OS.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{
- // switch to cooked so `got_int` will be set if the user interrupts
- int old_mode = cur_tmode;
- settmode(TMODE_COOK);
-
- EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
-
- // keep track of the elapsed time if ms > 0
- uint64_t before = (ms > 0) ? os_hrtime() : 0;
-
- while (1) {
- // check if the job has exited (and the status is available).
- if (job->pending_refs == 0) {
- break;
- }
-
- event_poll(ms, sources);
-
- // we'll assume that a user frantically hitting interrupt doesn't like
- // the current job. Signal that it has to be killed.
- if (got_int) {
- job_stop(job);
- }
-
- if (ms == 0) {
- break;
- }
-
- // check if the poll timed out, if not, decrease the ms to wait for the
- // next run
- if (ms > 0) {
- uint64_t now = os_hrtime();
- ms -= (int) ((now - before) / 1000000);
- before = now;
-
- // if the time elapsed is greater than the `ms` wait time, break
- if (ms <= 0) {
- break;
- }
- }
+ // Increase refcount to stop the job from being freed before we have a
+ // chance to get the status.
+ job->refcount++;
+ event_poll_until(ms,
+ // Until...
+ got_int || // interrupted by the user
+ job->refcount == 1); // job exited
+
+ // we'll assume that a user frantically hitting interrupt doesn't like
+ // the current job. Signal that it has to be killed.
+ if (got_int) {
+ job_stop(job);
+ event_poll(0);
}
- settmode(old_mode);
+ if (!--job->refcount) {
+ int status = (int) job->status;
+ // Manually invoke close_cb to free the job resources
+ close_cb((uv_handle_t *)&job->proc);
+ return status;
+ }
- // return -1 for a timeout, the job status otherwise
- return (job->pending_refs) ? -1 : (int) job->status;
+ // return -1 for a timeout
+ return -1;
}
/// Close the pipe used to write to the job.
@@ -331,15 +313,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
/// @param job The job instance
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
{
- if (!job->in) {
- return;
- }
-
- // let other functions in the job module know that the in pipe is no more
- wstream_free(job->in);
- job->in = NULL;
-
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
+ close_job_in(job);
}
/// All writes that complete after calling this function will be reported
@@ -369,14 +343,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
-/// Runs the read callback associated with the job exit event
-///
-/// @param event Object containing data necessary to invoke the callback
-void job_exit_event(Event event)
-{
- job_exit_callback(event.data.job);
-}
-
/// Get the job id
///
/// @param job A pointer to the job
@@ -395,11 +361,6 @@ void *job_data(Job *job)
return job->data;
}
-EventSource job_event_source(Job *job)
-{
- return job;
-}
-
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@@ -411,9 +372,6 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data);
}
- // Free the job resources
- free_job(job);
-
// Stop polling job status if this was the last
job_count--;
if (job_count == 0) {
@@ -426,16 +384,6 @@ static bool is_alive(Job *job)
return uv_process_kill(&job->proc, 0) == 0;
}
-static void free_job(Job *job)
-{
- uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
- if (job->in) {
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
- }
- uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
- uv_close((uv_handle_t *)&job->proc, close_cb);
-}
-
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_prepare_cb(uv_prepare_t *handle)
@@ -465,12 +413,14 @@ static void read_cb(RStream *rstream, void *data, bool eof)
if (rstream == job->out) {
job->stdout_cb(rstream, data, eof);
+ if (eof) {
+ close_job_out(job);
+ }
} else {
job->stderr_cb(rstream, data, eof);
- }
-
- if (eof && --job->pending_refs == 0) {
- emit_exit_event(job);
+ if (eof) {
+ close_job_err(job);
+ }
}
}
@@ -480,41 +430,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
Job *job = handle_get_job((uv_handle_t *)proc);
job->status = status;
- if (--job->pending_refs == 0) {
- emit_exit_event(job);
- }
-}
-
-static void emit_exit_event(Job *job)
-{
- Event event = {
- .source = job_event_source(job),
- .type = kEventJobExit,
- .data.job = job
- };
- event_push(event);
+ uv_close((uv_handle_t *)&job->proc, close_cb);
}
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);
- if (--job->pending_closes == 0) {
- // Only free the job memory after all the associated handles are properly
- // closed by libuv
- rstream_free(job->out);
- rstream_free(job->err);
- if (job->in) {
- wstream_free(job->in);
- }
+ if (handle == (uv_handle_t *)&job->proc) {
+ // Make sure all streams are properly closed to trigger callback invocation
+ // when job->proc is closed
+ close_job_in(job);
+ close_job_out(job);
+ close_job_err(job);
+ }
- // Free data memory of process and pipe handles, that was allocated
- // by handle_set_job in job_start.
+ if (--job->refcount == 0) {
+ // Invoke the exit_cb
+ job_exit_callback(job);
+ // Free all memory allocated for the job
free(job->proc.data);
free(job->proc_stdin.data);
free(job->proc_stdout.data);
free(job->proc_stderr.data);
-
shell_free_argv(job->proc_opts.args);
free(job);
}