aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/eval.c74
-rw-r--r--src/os/event.c6
-rw-r--r--src/os/event_defs.h10
-rw-r--r--src/os/job.c103
-rw-r--r--src/os/job.h31
-rw-r--r--src/os/job_defs.h8
6 files changed, 150 insertions, 82 deletions
diff --git a/src/eval.c b/src/eval.c
index 18d7597a24..8cd9f30f0a 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -405,10 +405,11 @@ static struct vimvar {
static dictitem_T vimvars_var; /* variable used for v: */
#define vimvarht vimvardict.dv_hashtab
-static void apply_job_autocmds(int id,
- void *data,
- RStream *target,
- bool from_stdout);
+static void on_job_stdout(RStream *rstream, void *data, bool eof);
+static void on_job_stderr(RStream *rstream, void *data, bool eof);
+static void on_job_exit(Job *job, void *data);
+static void on_job_data(RStream *rstream, void *data, bool eof, char *type);
+static void apply_job_autocmds(Job *job, char *name, char *type, char *str);
static void prepare_vimvar(int idx, typval_T *save_tv);
static void restore_vimvar(int idx, typval_T *save_tv);
static int ex_let_vars(char_u *arg, typval_T *tv, int copy,
@@ -11073,7 +11074,9 @@ static void f_job_start(typval_T *argvars, typval_T *rettv)
rettv->vval.v_number = job_start(argv,
xstrdup((char *)argvars[0].vval.v_string),
- apply_job_autocmds);
+ on_job_stdout,
+ on_job_stderr,
+ on_job_exit);
if (rettv->vval.v_number <= 0) {
if (rettv->vval.v_number == 0) {
@@ -19796,31 +19799,56 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
return ret;
}
-static void apply_job_autocmds(int id,
- void *data,
- RStream *target,
- bool from_stdout)
+static void on_job_stdout(RStream *rstream, void *data, bool eof)
{
- list_T *list;
- char *str;
- listitem_T *str_slot = listitem_alloc();
- uint32_t read_count = rstream_available(target);
+ if (!eof) {
+ on_job_data(rstream, data, eof, "stdout");
+ }
+}
+
+static void on_job_stderr(RStream *rstream, void *data, bool eof)
+{
+ if (!eof) {
+ on_job_data(rstream, data, eof, "stderr");
+ }
+}
+
+static void on_job_exit(Job *job, void *data)
+{
+ apply_job_autocmds(job, data, "exit", NULL);
+}
- // Prepare the list item containing the data read
- str = xmalloc(read_count + 1);
- rstream_read(target, str, read_count);
+static void on_job_data(RStream *rstream, void *data, bool eof, char *type)
+{
+ Job *job = data;
+ uint32_t read_count = rstream_available(rstream);
+ char *str = xmalloc(read_count + 1);
+
+ rstream_read(rstream, str, read_count);
str[read_count] = NUL;
- str_slot->li_tv.v_type = VAR_STRING;
- str_slot->li_tv.v_lock = 0;
- str_slot->li_tv.vval.v_string = (char_u *)str;
+ apply_job_autocmds(job, job_data(job), type, str);
+}
+
+static void apply_job_autocmds(Job *job, char *name, char *type, char *str)
+{
+ list_T *list;
+
// Create the list which will be set to v:job_data
list = list_alloc();
- list_append_number(list, id);
- list_append(list, str_slot);
- list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3);
+ list_append_number(list, job_id(job));
+ list_append_string(list, (uint8_t *)type, -1);
+
+ if (str) {
+ listitem_T *str_slot = listitem_alloc();
+ str_slot->li_tv.v_type = VAR_STRING;
+ str_slot->li_tv.v_lock = 0;
+ str_slot->li_tv.vval.v_string = (uint8_t *)str;
+ list_append(list, str_slot);
+ }
+
// Update v:job_data for the autocommands
set_vim_var_list(VV_JOB_DATA, list);
// Call JobActivity autocommands
- apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL);
+ apply_autocmds(EVENT_JOBACTIVITY, (uint8_t *)name, NULL, TRUE, NULL);
}
diff --git a/src/os/event.c b/src/os/event.c
index 4c5be1e16b..1a35cb3f12 100644
--- a/src/os/event.c
+++ b/src/os/event.c
@@ -110,12 +110,12 @@ void event_process()
case kEventSignal:
signal_handle(event);
break;
- case kEventJobActivity:
- job_handle(event);
- break;
case kEventRStreamData:
rstream_read_event(event);
break;
+ case kEventJobExit:
+ job_exit_event(event);
+ break;
default:
abort();
}
diff --git a/src/os/event_defs.h b/src/os/event_defs.h
index 5925a31718..428ae50f3e 100644
--- a/src/os/event_defs.h
+++ b/src/os/event_defs.h
@@ -6,8 +6,8 @@
typedef enum {
kEventSignal,
- kEventJobActivity,
- kEventRStreamData
+ kEventRStreamData,
+ kEventJobExit
} EventType;
typedef struct {
@@ -18,11 +18,7 @@ typedef struct {
RStream *ptr;
bool eof;
} rstream;
- struct {
- Job *ptr;
- RStream *target;
- bool from_stdout;
- } job;
+ Job *job;
} data;
} Event;
diff --git a/src/os/job.c b/src/os/job.c
index 5bd404e5be..f4cbbfb670 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -7,6 +7,8 @@
#include "os/job_defs.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
+#include "os/event.h"
+#include "os/event_defs.h"
#include "os/time.h"
#include "os/shell.h"
#include "vim.h"
@@ -22,12 +24,17 @@ struct job {
int id;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
+ // exit_cb may be called while there's still pending data from stdout/stderr.
+ // We use this reference count to ensure the JobExit event is only emitted
+ // when stdout/stderr are drained
+ int pending_refs;
// If the job was already stopped
bool stopped;
// Data associated with the job
void *data;
- // Callback for consuming data from the buffer
- job_read_cb read_cb;
+ // Callbacks
+ job_exit_cb exit_cb;
+ rstream_cb stdout_cb, stderr_cb;
// Readable streams
RStream *out, *err;
// Structures for process spawning/management used by libuv
@@ -40,7 +47,6 @@ struct job {
static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uv_prepare_t job_prepare;
-static void read_cb(RStream *rstream, void *data, bool eof);
// Some helpers shared in this module
static bool is_alive(Job *job);
static Job * find_job(int id);
@@ -48,9 +54,10 @@ static void free_job(Job *job);
// Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle);
-// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void write_cb(uv_write_t *req, int status);
+static void read_cb(RStream *rstream, void *data, bool eof);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
+static void emit_exit_event(Job *job);
void job_init()
{
@@ -105,7 +112,11 @@ void job_teardown()
}
}
-int job_start(char **argv, void *data, job_read_cb cb)
+int job_start(char **argv,
+ void *data,
+ rstream_cb stdout_cb,
+ rstream_cb stderr_cb,
+ job_exit_cb job_exit_cb)
{
int i;
Job *job;
@@ -125,8 +136,11 @@ int job_start(char **argv, void *data, job_read_cb cb)
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
+ job->pending_refs = 3;
job->data = data;
- job->read_cb = cb;
+ job->stdout_cb = stdout_cb;
+ job->stderr_cb = stderr_cb;
+ job->exit_cb = job_exit_cb;
job->stopped = false;
job->exit_timeout = EXIT_TIMEOUT;
job->proc_opts.file = argv[0];
@@ -159,8 +173,8 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -208,15 +222,27 @@ bool job_write(int id, char *data, uint32_t len)
return true;
}
-void job_handle(Event event)
+void job_exit_event(Event event)
{
- Job *job = event.data.job.ptr;
+ Job *job = event.data.job;
+
+ // Invoke the exit callback
+ job->exit_cb(job, job->data);
+
+ // Free the job resources
+ table[job->id - 1] = NULL;
+ shell_free_argv(job->proc_opts.args);
+ free_job(job);
+}
- // Invoke the job callback
- job->read_cb(job->id,
- job->data,
- event.data.job.target,
- event.data.job.from_stdout);
+int job_id(Job *job)
+{
+ return job->id;
+}
+
+void *job_data(Job *job)
+{
+ return job->data;
}
static bool is_alive(Job *job)
@@ -266,38 +292,43 @@ static void job_prepare_cb(uv_prepare_t *handle)
}
}
-/// Pushes a event object to the event queue, which will be handled later by
-/// `job_handle`
+static void write_cb(uv_write_t *req, int status)
+{
+ free(req->data);
+ free(req);
+}
+
+// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
static void read_cb(RStream *rstream, void *data, bool eof)
{
- Event event;
Job *job = data;
- if (eof) {
- uv_process_kill(&job->proc, SIGTERM);
- return;
+ if (rstream == job->out) {
+ job->stdout_cb(rstream, data, eof);
+ } else {
+ job->stderr_cb(rstream, data, eof);
}
- event.type = kEventJobActivity;
- event.data.job.ptr = job;
- event.data.job.target = rstream;
- event.data.job.from_stdout = rstream == job->out;
- event_push(event);
-}
-
-static void write_cb(uv_write_t *req, int status)
-{
- free(req->data);
- free(req);
+ if (eof && --job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
}
-/// Cleanup all the resources associated with the job
+// Emits a JobExit event if both rstreams are closed
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
Job *job = proc->data;
- table[job->id - 1] = NULL;
- shell_free_argv(job->proc_opts.args);
- free_job(job);
+ if (--job->pending_refs == 0) {
+ emit_exit_event(job);
+ }
+}
+
+static void emit_exit_event(Job *job)
+{
+ Event event;
+ event.type = kEventJobExit;
+ event.data.job = job;
+ event_push(event);
}
diff --git a/src/os/job.h b/src/os/job.h
index c0f6734467..37e34700dc 100644
--- a/src/os/job.h
+++ b/src/os/job.h
@@ -11,7 +11,7 @@
#include <stdbool.h>
#include "os/event_defs.h"
-#include "os/event.h"
+#include "os/rstream_defs.h"
/// Initializes job control resources
void job_init(void);
@@ -24,12 +24,19 @@ void job_teardown(void);
/// @param argv Argument vector for the process. The first item is the
/// executable to run.
/// @param data Caller data that will be associated with the job
-/// @param cb Callback that will be invoked everytime data is available in
-/// the job's stdout/stderr
+/// @param stdout_cb Callback that will be invoked when data is available
+/// on stdout
+/// @param stderr_cb Callback that will be invoked when data is available
+/// on stderr
+/// @param exit_cb Callback that will be invoked when the job exits
/// @return The job id if the job started successfully. If the the first item /
/// of `argv`(the program) could not be executed, -1 will be returned.
// 0 will be returned if the job table is full.
-int job_start(char **argv, void *data, job_read_cb cb);
+int job_start(char **argv,
+ void *data,
+ rstream_cb stdout_cb,
+ rstream_cb stderr_cb,
+ job_exit_cb exit_cb);
/// Terminates a job. This is a non-blocking operation, but if the job exists
/// it's guaranteed to succeed(SIGKILL will eventually be sent)
@@ -49,10 +56,22 @@ bool job_stop(int id);
/// id is invalid(probably because it has already stopped)
bool job_write(int id, char *data, uint32_t len);
-/// Runs the read callback associated with the job/event
+/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
-void job_handle(Event event);
+void job_exit_event(Event event);
+
+/// Get the job id
+///
+/// @param job A pointer to the job
+/// @return The job id
+int job_id(Job *job);
+
+/// Get data associated with a job
+///
+/// @param job A pointer to the job
+/// @return The job data
+void *job_data(Job *job);
#endif // NEOVIM_OS_JOB_H
diff --git a/src/os/job_defs.h b/src/os/job_defs.h
index ada33d757e..e98b639154 100644
--- a/src/os/job_defs.h
+++ b/src/os/job_defs.h
@@ -9,13 +9,7 @@ typedef struct job Job;
///
/// @param id The job id
/// @param data Some data associated with the job by the caller
-/// @param target The `RStream` instance containing data to be read
-/// @param from_stdout This is true if data was read from the job's stdout,
-/// false if it came from stderr.
-typedef void (*job_read_cb)(int id,
- void *data,
- RStream *target,
- bool from_stdout);
+typedef void (*job_exit_cb)(Job *job, void *data);
#endif // NEOVIM_OS_JOB_DEFS_H