aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/eval.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/eval.c')
-rw-r--r--src/nvim/eval.c173
1 files changed, 101 insertions, 72 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index aa9e656913..c7c67cfca4 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -468,6 +468,7 @@ typedef struct {
dict_T *self;
int *status_ptr;
uint64_t id;
+ Queue *events;
} TerminalJobData;
/// Structure representing current VimL to messagepack conversion state
@@ -493,6 +494,13 @@ typedef struct {
/// Stack used to convert VimL values to messagepack.
typedef kvec_t(MPConvStackVal) MPConvStack;
+typedef struct {
+ TerminalJobData *data;
+ ufunc_T *callback;
+ const char *type;
+ list_T *received;
+ int status;
+} JobEvent;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "eval.c.generated.h"
@@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack;
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
-// Memory pool for reusing JobEvent structures
-typedef struct {
- TerminalJobData *data;
- ufunc_T *callback;
- const char *type;
- list_T *received;
- int status;
-} JobEvent;
-static int disable_job_defer = 0;
static uint64_t current_job_id = 1;
static PMap(uint64_t) *jobs = NULL;
@@ -10778,7 +10777,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
return;
}
- TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@@ -10819,7 +10818,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
- TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@@ -10860,7 +10859,7 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)
}
- TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
+ TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@@ -11007,8 +11006,8 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)
}
- TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
- if (!data || data->stopped) {
+ TerminalJobData *data = find_job(argvars[0].vval.v_number);
+ if (!data) {
EMSG(_(e_invjob));
return;
}
@@ -11038,28 +11037,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
list_T *rv = list_alloc();
ui_busy_start();
- // disable breakchecks, which could result in job callbacks being executed
- // at unexpected places
- disable_breakcheck++;
- // disable job event deferring so the callbacks are processed while waiting.
- if (!disable_job_defer++) {
- // process any pending job events in the deferred queue, but only do this if
- // deferred is not disabled(at the top-level `jobwait()` call)
- loop_process_event(&loop);
- }
+ Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop);
// For each item in the input list append an integer to the output list. -3
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
+ || !(data = find_job(arg->li_tv.vval.v_number))) {
list_append_number(rv, -3);
} else {
// append the list item and set the status pointer so we'll collect the
// status code when the job exits
list_append_number(rv, -1);
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
+ // Process any pending events for the job because we'll temporarily
+ // replace the parent queue
+ queue_process_events(data->events);
+ queue_replace_parent(data->events, waiting_jobs);
}
}
@@ -11077,10 +11072,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
break;
}
if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
+ || !(data = find_job(arg->li_tv.vval.v_number))) {
continue;
}
- int status = process_wait((Process *)&data->proc, remaining);
+ int status = process_wait((Process *)&data->proc, remaining, waiting_jobs);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
if (status == -2) {
@@ -11100,23 +11095,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
}
}
- // poll to ensure any pending callbacks from the last job are invoked
- loop_poll_events(&loop, 0);
-
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
+ || !(data = find_job(arg->li_tv.vval.v_number))) {
continue;
}
// remove the status pointer because the list may be freed before the
// job exits
data->status_ptr = NULL;
}
- disable_job_defer--;
- disable_breakcheck--;
- ui_busy_stop();
+ // restore the parent queue for any jobs still alive
+ for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
+ TerminalJobData *data = NULL;
+ if (arg->li_tv.v_type != VAR_NUMBER
+ || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
+ continue;
+ }
+ // restore the parent queue for the job
+ queue_process_events(data->events);
+ queue_replace_parent(data->events, loop.events);
+ }
+
+ queue_free(waiting_jobs);
+ ui_busy_stop();
rv->lv_refcount++;
rettv->v_type = VAR_LIST;
rettv->vval.v_list = rv;
@@ -21053,17 +21056,21 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout,
data->on_stderr = on_stderr;
data->on_exit = on_exit;
data->self = self;
+ data->events = queue_new_child(loop.events);
if (pty) {
- data->proc.pty = pty_process_init(data);
+ data->proc.pty = pty_process_init(&loop, data);
} else {
- data->proc.uv = uv_process_init(data);
+ data->proc.uv = uv_process_init(&loop, data);
}
Process *proc = (Process *)&data->proc;
proc->argv = argv;
proc->in = &data->in;
proc->out = &data->out;
- proc->err = &data->err;
+ if (!pty) {
+ proc->err = &data->err;
+ }
proc->cb = on_process_exit;
+ proc->events = data->events;
return data;
}
@@ -21094,8 +21101,12 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
{
data->refcount++;
Process *proc = (Process *)&data->proc;
- if (!process_spawn(&loop, proc)) {
+ if (!process_spawn(proc)) {
EMSG(_(e_jobexe));
+ if (proc->type == kProcessTypePty) {
+ xfree(data->proc.pty.term_name);
+ free_term_job_data(data);
+ }
return false;
}
@@ -21114,7 +21125,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
return true;
}
-static inline void free_term_job_data(TerminalJobData *data) {
+static inline void free_term_job_data_event(void **argv)
+{
+ TerminalJobData *data = argv[0];
if (data->on_stdout) {
user_func_unref(data->on_stdout);
}
@@ -21129,17 +21142,25 @@ static inline void free_term_job_data(TerminalJobData *data) {
data->self->internal_refcount--;
dict_unref(data->self);
}
+ queue_free(data->events);
xfree(data);
}
+static inline void free_term_job_data(TerminalJobData *data)
+{
+ // data->queue may still be used after this function returns(process_wait), so
+ // only free in the next event loop iteration
+ queue_put(loop.fast_events, free_term_job_data_event, 1, data);
+}
+
// vimscript job callbacks must be executed on Nvim main loop
-static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
+static inline void process_job_event(TerminalJobData *data, ufunc_T *callback,
const char *type, char *buf, size_t count, int status)
{
- JobEvent *event_data = xmalloc(sizeof(JobEvent));
- event_data->received = NULL;
+ JobEvent event_data;
+ event_data.received = NULL;
if (buf) {
- event_data->received = list_alloc();
+ event_data.received = list_alloc();
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
@@ -21147,7 +21168,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
while (off < remaining) {
// append the line
if (ptr[off] == NL) {
- list_append_string(event_data->received, (uint8_t *)ptr, off);
+ list_append_string(event_data.received, (uint8_t *)ptr, off);
size_t skip = off + 1;
ptr += skip;
remaining -= skip;
@@ -21160,51 +21181,53 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
}
off++;
}
- list_append_string(event_data->received, (uint8_t *)ptr, off);
+ list_append_string(event_data.received, (uint8_t *)ptr, off);
} else {
- event_data->status = status;
+ event_data.status = status;
}
- event_data->data = data;
- event_data->callback = callback;
- event_data->type = type;
- loop_push_event(&loop, (Event) {
- .handler = on_job_event,
- .data = event_data
- }, !disable_job_defer);
+ event_data.data = data;
+ event_data.callback = callback;
+ event_data.type = type;
+ on_job_event(&event_data);
}
-static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)
+static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
+ void *job, bool eof)
{
TerminalJobData *data = job;
- on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");
+ on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout");
}
-static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)
+static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
+ void *job, bool eof)
{
TerminalJobData *data = job;
- on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");
+ on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr");
}
static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
- bool eof, ufunc_T *callback, const char *type)
+ size_t count, bool eof, ufunc_T *callback, const char *type)
{
if (eof) {
return;
}
- RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
- // The order here matters, the terminal must receive the data first because
- // push_job_event will modify the read buffer(convert NULs into NLs)
- if (data->term) {
- terminal_receive(data->term, ptr, len);
- }
+ // stub variable, to keep reading consistent with the order of events, only
+ // consider the count parameter.
+ size_t r;
+ char *ptr = rbuffer_read_ptr(buf, &r);
- if (callback) {
- push_job_event(data, callback, type, ptr, len, 0);
- }
+ // The order here matters, the terminal must receive the data first because
+ // process_job_event will modify the read buffer(convert NULs into NLs)
+ if (data->term) {
+ terminal_receive(data->term, ptr, count);
+ }
- rbuffer_consumed(buf, len);
+ if (callback) {
+ process_job_event(data, callback, type, ptr, count, 0);
}
+
+ rbuffer_consumed(buf, count);
}
static void on_process_exit(Process *proc, int status, void *d)
@@ -21220,7 +21243,7 @@ static void on_process_exit(Process *proc, int status, void *d)
*data->status_ptr = status;
}
- push_job_event(data, data->on_exit, "exit", NULL, 0, status);
+ process_job_event(data, data->on_exit, "exit", NULL, 0, status);
}
static void term_write(char *buf, size_t size, void *d)
@@ -21254,10 +21277,8 @@ static void term_job_data_decref(TerminalJobData *data)
}
}
-static void on_job_event(Event event)
+static void on_job_event(JobEvent *ev)
{
- JobEvent *ev = event.data;
-
if (!ev->callback) {
goto end;
}
@@ -21302,7 +21323,15 @@ end:
pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
- xfree(ev);
+}
+
+static TerminalJobData *find_job(uint64_t id)
+{
+ TerminalJobData *data = pmap_get(uint64_t)(jobs, id);
+ if (!data || data->stopped) {
+ return NULL;
+ }
+ return data;
}
static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv)