diff options
Diffstat (limited to 'src/nvim/eval.c')
| -rw-r--r-- | src/nvim/eval.c | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 7689ea1a27..4aba1ec1bb 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -405,7 +405,7 @@ typedef struct { LibuvProcess uv; PtyProcess pty; } proc; - Stream in, out, err; + Stream in, out, err; // Initialized in common_job_start(). Terminal *term; bool stopped; bool exited; @@ -415,7 +415,7 @@ typedef struct { dict_T *self; int *status_ptr; uint64_t id; - Queue *events; + MultiQueue *events; } TerminalJobData; typedef struct dict_watcher { @@ -11712,7 +11712,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) list_T *rv = list_alloc(); ui_busy_start(); - Queue *waiting_jobs = queue_new_parent(loop_on_put, &main_loop); + MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. @@ -11728,8 +11728,8 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) data->status_ptr = &rv->lv_last->li_tv.vval.v_number; // Process any pending events for the job because we'll temporarily // replace the parent queue - queue_process_events(data->events); - queue_replace_parent(data->events, waiting_jobs); + multiqueue_process_events(data->events); + multiqueue_replace_parent(data->events, waiting_jobs); } } @@ -11789,11 +11789,11 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) continue; } // restore the parent queue for the job - queue_process_events(data->events); - queue_replace_parent(data->events, main_loop.events); + multiqueue_process_events(data->events); + multiqueue_replace_parent(data->events, main_loop.events); } - queue_free(waiting_jobs); + multiqueue_free(waiting_jobs); ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; @@ -16420,7 +16420,7 @@ static void f_timer_start(typval_T *argvars, typval_T *rettv, FunPtr fptr) timer->callback = func; time_watcher_init(&main_loop, &timer->tw, timer); - timer->tw.events = queue_new_child(main_loop.events); + timer->tw.events = multiqueue_new_child(main_loop.events); // if main loop is blocked, don't queue up multiple events timer->tw.blockable = true; time_watcher_start(&timer->tw, timer_due_cb, timeout, @@ -16497,7 +16497,7 @@ static void timer_stop(timer_T *timer) static void timer_close_cb(TimeWatcher *tw, void *data) { timer_T *timer = (timer_T *)data; - queue_free(timer->tw.events); + multiqueue_free(timer->tw.events); user_func_unref(timer->callback); pmap_del(uint64_t)(timers, timer->timer_id); timer_decref(timer); @@ -21745,7 +21745,7 @@ static inline TerminalJobData *common_job_init(char **argv, data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; - data->events = queue_new_child(main_loop.events); + data->events = multiqueue_new_child(main_loop.events); data->rpc = rpc; if (pty) { data->proc.pty = pty_process_init(&main_loop, data); @@ -21759,7 +21759,7 @@ static inline TerminalJobData *common_job_init(char **argv, if (!pty) { proc->err = &data->err; } - proc->cb = on_process_exit; + proc->cb = eval_job_process_exit_cb; proc->events = data->events; proc->detach = detach; proc->cwd = cwd; @@ -21854,7 +21854,7 @@ static inline void free_term_job_data_event(void **argv) if (data->self) { dict_unref(data->self); } - queue_free(data->events); + multiqueue_free(data->events); pmap_del(uint64_t)(jobs, data->id); xfree(data); } @@ -21863,7 +21863,7 @@ static inline void free_term_job_data(TerminalJobData *data) { // data->queue may still be used after this function returns(process_wait), so // only free in the next event loop iteration - queue_put(main_loop.fast_events, free_term_job_data_event, 1, data); + multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); } // vimscript job callbacks must be executed on Nvim main loop @@ -21943,7 +21943,7 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, rbuffer_consumed(buf, count); } -static void on_process_exit(Process *proc, int status, void *d) +static void eval_job_process_exit_cb(Process *proc, int status, void *d) { TerminalJobData *data = d; if (data->term && !data->exited) { @@ -21967,9 +21967,15 @@ static void on_process_exit(Process *proc, int status, void *d) static void term_write(char *buf, size_t size, void *d) { - TerminalJobData *data = d; + TerminalJobData *job = d; + if (job->in.closed) { + // If the backing stream was closed abruptly, there may be write events + // ahead of the terminal close event. Just ignore the writes. + ILOG("write failed: stream is closed"); + return; + } WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); - wstream_write(&data->in, wbuf); + wstream_write(&job->in, wbuf); } static void term_resize(uint16_t width, uint16_t height, void *d) @@ -21982,7 +21988,7 @@ static inline void term_delayed_free(void **argv) { TerminalJobData *j = argv[0]; if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { - queue_put(j->events, term_delayed_free, 1, j); + multiqueue_put(j->events, term_delayed_free, 1, j); return; } @@ -21997,7 +22003,7 @@ static void term_close(void *d) data->exited = true; process_stop((Process *)&data->proc); } - queue_put(data->events, term_delayed_free, 1, data); + multiqueue_put(data->events, term_delayed_free, 1, data); } static void term_job_data_decref(TerminalJobData *data) |