diff options
Diffstat (limited to 'src/nvim/eval.c')
-rw-r--r-- | src/nvim/eval.c | 173 |
1 files changed, 101 insertions, 72 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index aa9e656913..c7c67cfca4 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -468,6 +468,7 @@ typedef struct { dict_T *self; int *status_ptr; uint64_t id; + Queue *events; } TerminalJobData; /// Structure representing current VimL to messagepack conversion state @@ -493,6 +494,13 @@ typedef struct { /// Stack used to convert VimL values to messagepack. typedef kvec_t(MPConvStackVal) MPConvStack; +typedef struct { + TerminalJobData *data; + ufunc_T *callback; + const char *type; + list_T *received; + int status; +} JobEvent; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "eval.c.generated.h" @@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack; #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -// Memory pool for reusing JobEvent structures -typedef struct { - TerminalJobData *data; - ufunc_T *callback; - const char *type; - list_T *received; - int status; -} JobEvent; -static int disable_job_defer = 0; static uint64_t current_job_id = 1; static PMap(uint64_t) *jobs = NULL; @@ -10778,7 +10777,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10819,7 +10818,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10860,7 +10859,7 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -11007,8 +11006,8 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); - if (!data || data->stopped) { + TerminalJobData *data = find_job(argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } @@ -11038,28 +11037,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); - // disable breakchecks, which could result in job callbacks being executed - // at unexpected places - disable_breakcheck++; - // disable job event deferring so the callbacks are processed while waiting. - if (!disable_job_defer++) { - // process any pending job events in the deferred queue, but only do this if - // deferred is not disabled(at the top-level `jobwait()` call) - loop_process_event(&loop); - } + Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { list_append_number(rv, -3); } else { // append the list item and set the status pointer so we'll collect the // status code when the job exits list_append_number(rv, -1); data->status_ptr = &rv->lv_last->li_tv.vval.v_number; + // Process any pending events for the job because we'll temporarily + // replace the parent queue + queue_process_events(data->events); + queue_replace_parent(data->events, waiting_jobs); } } @@ -11077,10 +11072,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } - int status = process_wait((Process *)&data->proc, remaining); + int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11100,23 +11095,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) } } - // poll to ensure any pending callbacks from the last job are invoked - loop_poll_events(&loop, 0); - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } // remove the status pointer because the list may be freed before the // job exits data->status_ptr = NULL; } - disable_job_defer--; - disable_breakcheck--; - ui_busy_stop(); + // restore the parent queue for any jobs still alive + for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { + TerminalJobData *data = NULL; + if (arg->li_tv.v_type != VAR_NUMBER + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + continue; + } + // restore the parent queue for the job + queue_process_events(data->events); + queue_replace_parent(data->events, loop.events); + } + + queue_free(waiting_jobs); + ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; rettv->vval.v_list = rv; @@ -21053,17 +21056,21 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; + data->events = queue_new_child(loop.events); if (pty) { - data->proc.pty = pty_process_init(data); + data->proc.pty = pty_process_init(&loop, data); } else { - data->proc.uv = uv_process_init(data); + data->proc.uv = uv_process_init(&loop, data); } Process *proc = (Process *)&data->proc; proc->argv = argv; proc->in = &data->in; proc->out = &data->out; - proc->err = &data->err; + if (!pty) { + proc->err = &data->err; + } proc->cb = on_process_exit; + proc->events = data->events; return data; } @@ -21094,8 +21101,12 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) { data->refcount++; Process *proc = (Process *)&data->proc; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { EMSG(_(e_jobexe)); + if (proc->type == kProcessTypePty) { + xfree(data->proc.pty.term_name); + free_term_job_data(data); + } return false; } @@ -21114,7 +21125,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) return true; } -static inline void free_term_job_data(TerminalJobData *data) { +static inline void free_term_job_data_event(void **argv) +{ + TerminalJobData *data = argv[0]; if (data->on_stdout) { user_func_unref(data->on_stdout); } @@ -21129,17 +21142,25 @@ static inline void free_term_job_data(TerminalJobData *data) { data->self->internal_refcount--; dict_unref(data->self); } + queue_free(data->events); xfree(data); } +static inline void free_term_job_data(TerminalJobData *data) +{ + // data->queue may still be used after this function returns(process_wait), so + // only free in the next event loop iteration + queue_put(loop.fast_events, free_term_job_data_event, 1, data); +} + // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +static inline void process_job_event(TerminalJobData *data, ufunc_T *callback, const char *type, char *buf, size_t count, int status) { - JobEvent *event_data = xmalloc(sizeof(JobEvent)); - event_data->received = NULL; + JobEvent event_data; + event_data.received = NULL; if (buf) { - event_data->received = list_alloc(); + event_data.received = list_alloc(); char *ptr = buf; size_t remaining = count; size_t off = 0; @@ -21147,7 +21168,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, while (off < remaining) { // append the line if (ptr[off] == NL) { - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); size_t skip = off + 1; ptr += skip; remaining -= skip; @@ -21160,51 +21181,53 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, } off++; } - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); } else { - event_data->status = status; + event_data.status = status; } - event_data->data = data; - event_data->callback = callback; - event_data->type = type; - loop_push_event(&loop, (Event) { - .handler = on_job_event, - .data = event_data - }, !disable_job_defer); + event_data.data = data; + event_data.callback = callback; + event_data.type = type; + on_job_event(&event_data); } -static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stdout, "stdout"); + on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout"); } -static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stderr, "stderr"); + on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr"); } static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, - bool eof, ufunc_T *callback, const char *type) + size_t count, bool eof, ufunc_T *callback, const char *type) { if (eof) { return; } - RBUFFER_UNTIL_EMPTY(buf, ptr, len) { - // The order here matters, the terminal must receive the data first because - // push_job_event will modify the read buffer(convert NULs into NLs) - if (data->term) { - terminal_receive(data->term, ptr, len); - } + // stub variable, to keep reading consistent with the order of events, only + // consider the count parameter. + size_t r; + char *ptr = rbuffer_read_ptr(buf, &r); - if (callback) { - push_job_event(data, callback, type, ptr, len, 0); - } + // The order here matters, the terminal must receive the data first because + // process_job_event will modify the read buffer(convert NULs into NLs) + if (data->term) { + terminal_receive(data->term, ptr, count); + } - rbuffer_consumed(buf, len); + if (callback) { + process_job_event(data, callback, type, ptr, count, 0); } + + rbuffer_consumed(buf, count); } static void on_process_exit(Process *proc, int status, void *d) @@ -21220,7 +21243,7 @@ static void on_process_exit(Process *proc, int status, void *d) *data->status_ptr = status; } - push_job_event(data, data->on_exit, "exit", NULL, 0, status); + process_job_event(data, data->on_exit, "exit", NULL, 0, status); } static void term_write(char *buf, size_t size, void *d) @@ -21254,10 +21277,8 @@ static void term_job_data_decref(TerminalJobData *data) } } -static void on_job_event(Event event) +static void on_job_event(JobEvent *ev) { - JobEvent *ev = event.data; - if (!ev->callback) { goto end; } @@ -21302,7 +21323,15 @@ end: pmap_del(uint64_t)(jobs, ev->data->id); term_job_data_decref(ev->data); } - xfree(ev); +} + +static TerminalJobData *find_job(uint64_t id) +{ + TerminalJobData *data = pmap_get(uint64_t)(jobs, id); + if (!data || data->stopped) { + return NULL; + } + return data; } static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) |