From 3717e2157f2d45ce23dbe4ac03085fea2d956dc4 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 23 Jul 2017 18:04:14 +0200 Subject: Revert channel logging, rebased on new code below --- src/nvim/eval.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index f414e771d7..b8bae3e293 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -15168,8 +15168,7 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr) } const char *error = NULL; - eval_format_source_name_line((char *)IObuff, sizeof(IObuff)); - uint64_t id = channel_connect(tcp, address, 50, (char *)IObuff, &error); + uint64_t id = channel_connect(tcp, address, 50, &error); if (error) { EMSG2(_("connection failed: %s"), error); @@ -22488,9 +22487,8 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) if (data->rpc) { - eval_format_source_name_line((char *)IObuff, sizeof(IObuff)); - // RPC channel takes over the in/out streams. - channel_from_process(proc, data->id, (char *)IObuff); + // the rpc channel takes over the in and out streams + channel_from_process(proc, data->id); } else { wstream_init(proc->in, 0); if (proc->out) { @@ -22855,4 +22853,3 @@ void ex_checkhealth(exarg_T *eap) xfree(buf); } - -- cgit From 5215e3205a07b85e4e4cf1f8a8ca6be2b9556459 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 27 Aug 2017 11:59:33 +0200 Subject: channels: refactor --- src/nvim/eval.c | 314 ++++++++++++++++++++++---------------------------------- 1 file changed, 120 insertions(+), 194 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index b8bae3e293..013cfce78d 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -24,6 +24,7 @@ #endif #include "nvim/eval.h" #include "nvim/buffer.h" +#include "nvim/channel.h" #include "nvim/charset.h" #include "nvim/cursor.h" #include "nvim/diff.h" @@ -437,29 +438,12 @@ static ScopeDictDictItem vimvars_var; #define vimvarht vimvardict.dv_hashtab typedef struct { - union { - LibuvProcess uv; - PtyProcess pty; - } proc; - Stream in, out, err; // Initialized in common_job_start(). - Terminal *term; - bool stopped; - bool exited; - bool rpc; - int refcount; - Callback on_stdout, on_stderr, on_exit; - varnumber_T *status_ptr; - uint64_t id; - MultiQueue *events; -} TerminalJobData; - -typedef struct { - TerminalJobData *data; + Channel *data; Callback *callback; const char *type; list_T *received; int status; -} JobEvent; +} ChannelEvent; typedef struct { TimeWatcher tw; @@ -513,7 +497,6 @@ typedef enum { #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -static PMap(uint64_t) *jobs = NULL; static uint64_t last_timer_id = 0; static PMap(uint64_t) *timers = NULL; @@ -556,7 +539,6 @@ void eval_init(void) { vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; - jobs = pmap_new(uint64_t)(); timers = pmap_new(uint64_t)(); struct vimvar *p; @@ -5141,8 +5123,8 @@ bool garbage_collect(bool testing) // Jobs { - TerminalJobData *data; - map_foreach_value(jobs, data, { + Channel *data; + map_foreach_value(channels, data, { set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); @@ -11433,24 +11415,23 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; if (argvars[1].v_type == VAR_STRING) { char *stream = (char *)argvars[1].vval.v_string; if (!strcmp(stream, "stdin")) { - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); } else { process_close_in(proc); } } else if (!strcmp(stream, "stdout")) { - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); } else { process_close_out(proc); @@ -11458,7 +11439,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) } else if (!strcmp(stream, "stderr")) { process_close_err(proc); } else if (!strcmp(stream, "rpc")) { - if (data->rpc) { + if (data->is_rpc) { channel_close(data->id); } else { EMSG(_("Invalid job stream: Not an rpc job")); @@ -11467,13 +11448,13 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) EMSG2(_("Invalid job stream \"%s\""), stream); } } else { - if (data->rpc) { + if (data->is_rpc) { channel_close(data->id); process_close_err(proc); } else { process_close_streams(proc); if (proc->type == kProcessTypePty) { - pty_process_close_master(&data->proc.pty); + pty_process_close_master(&data->stream.pty); } } } @@ -11494,13 +11475,12 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; rettv->vval.v_number = proc->pid; } @@ -11521,18 +11501,19 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_channel(argvars[0].vval.v_number); if (!data) { - EMSG(_(e_invjob)); + EMSG(_(e_invchan)); return; } - if (((Process *)&data->proc)->in->closed) { + Stream *in = channel_instream(data); + if (in->closed) { EMSG(_("Can't send data to the job: stdin is closed")); return; } - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Can't send raw data to rpc channel")); return; } @@ -11546,7 +11527,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) } WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); - rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); + rettv->vval.v_number = wstream_write(in, buf); } // "jobresize(job, width, height)" function @@ -11567,19 +11548,17 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - if (data->proc.uv.process.type != kProcessTypePty) { - EMSG(_(e_jobnotpty)); + if (data->stream.proc.type != kProcessTypePty) { + EMSG(_(e_channotpty)); return; } - pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, - argvars[2].vval.v_number); + pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, argvars[2].vval.v_number); rettv->vval.v_number = 1; } @@ -11697,31 +11676,25 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } - TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, pty, rpc, detach, cwd); - Process *proc = (Process *)&data->proc; if (pty) { + PtyProcess *pty = &data->stream.pty; uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); if (width > 0) { - data->proc.pty.width = width; + pty->width = width; } uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); if (height > 0) { - data->proc.pty.height = height; + pty->height = height; } char *term = tv_dict_get_string(job_opts, "TERM", true); if (term) { - data->proc.pty.term_name = term; + pty->term_name = term; } } - if (!rpc && on_stdout.type == kCallbackNone) { - proc->out = NULL; - } - if (on_stderr.type == kCallbackNone) { - proc->err = NULL; - } common_job_start(data, rettv); } @@ -11742,14 +11715,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - process_stop((Process *)&data->proc); - data->stopped = true; + process_stop((Process *)&data->stream.proc); rettv->vval.v_number = 1; } @@ -11778,9 +11749,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { tv_list_append_number(rv, -3); } else { // append the list item and set the status pointer so we'll collect the @@ -11802,16 +11773,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) } for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (remaining == 0) { // timed out break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } - int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); + int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11832,9 +11803,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) } for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } // remove the status pointer because the list may be freed before the @@ -11844,9 +11815,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) // restore the parent queue for any jobs still alive for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } // restore the parent queue for the job @@ -13803,9 +13774,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr) ADD(args, vim_to_object(tv)); } - if (!channel_send_event((uint64_t)argvars[0].vval.v_number, - tv_get_string(&argvars[1]), - args)) { + if (!rpc_send_event((uint64_t)argvars[0].vval.v_number, + tv_get_string(&argvars[1]), args)) { EMSG2(_(e_invarg2), "Channel doesn't exist"); return; } @@ -13870,10 +13840,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr) Error err = ERROR_INIT; - Object result = channel_send_call((uint64_t)argvars[0].vval.v_number, - tv_get_string(&argvars[1]), - args, - &err); + Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number, + tv_get_string(&argvars[1]), args, &err); if (l_provider_call_nesting) { current_SID = save_current_SID; @@ -13954,7 +13922,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) // The last item of argv must be NULL argv[i] = NULL; - TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, + Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, CALLBACK_NONE, false, true, false, NULL); common_job_start(data, rettv); @@ -13977,10 +13945,11 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) } // if called with a job, stop it, else closes the channel - if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { + uint64_t id = argvars[0].vval.v_number; + if (find_job(id, false)) { // FIXME f_jobstop(argvars, rettv, NULL); } else { - rettv->vval.v_number = channel_close(argvars[0].vval.v_number); + rettv->vval.v_number = channel_close(id); } } @@ -16689,11 +16658,11 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) } uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); - TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, true, false, false, cwd); - data->proc.pty.width = term_width; - data->proc.pty.height = curwin->w_height; - data->proc.pty.term_name = xstrdup("xterm-256color"); + data->stream.pty.width = term_width; + data->stream.pty.height = curwin->w_height; + data->stream.pty.term_name = xstrdup("xterm-256color"); if (!common_job_start(data, rettv)) { return; } @@ -16705,7 +16674,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) topts.resize_cb = term_resize; topts.close_cb = term_close; - int pid = data->proc.pty.process.pid; + int pid = data->stream.pty.process.pid; char buf[1024]; // format the title with the pid to conform with the term:// URI @@ -16725,7 +16694,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) Terminal *term = terminal_open(topts); data->term = term; - data->refcount++; + channel_incref(data); return; } @@ -16760,30 +16729,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg) return true; } -/// Unref/free callback -void callback_free(Callback *const callback) - FUNC_ATTR_NONNULL_ALL -{ - switch (callback->type) { - case kCallbackFuncref: { - func_unref(callback->data.funcref); - xfree(callback->data.funcref); - break; - } - case kCallbackPartial: { - partial_unref(callback->data.partial); - break; - } - case kCallbackNone: { - break; - } - default: { - abort(); - } - } - callback->type = kCallbackNone; -} - bool callback_call(Callback *const callback, const int argcount_in, typval_T *const argvars_in, typval_T *const rettv) FUNC_ATTR_NONNULL_ALL @@ -22402,7 +22347,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, return ret; } -static inline TerminalJobData *common_job_init(char **argv, +static inline Channel *common_job_init(char **argv, Callback on_stdout, Callback on_stderr, Callback on_exit, @@ -22411,25 +22356,18 @@ static inline TerminalJobData *common_job_init(char **argv, bool detach, const char *cwd) { - TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); - data->stopped = false; + Channel *data = channel_alloc(kChannelStreamProc); data->on_stdout = on_stdout; data->on_stderr = on_stderr; data->on_exit = on_exit; - data->events = multiqueue_new_child(main_loop.events); - data->rpc = rpc; + data->is_rpc = rpc; if (pty) { - data->proc.pty = pty_process_init(&main_loop, data); + data->stream.pty = pty_process_init(&main_loop, data); } else { - data->proc.uv = libuv_process_init(&main_loop, data); + data->stream.uv = libuv_process_init(&main_loop, data); } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; proc->argv = argv; - proc->in = &data->in; - proc->out = &data->out; - if (!pty) { - proc->err = &data->err; - } proc->cb = eval_job_process_exit_cb; proc->events = data->events; proc->detach = detach; @@ -22456,80 +22394,66 @@ static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, return false; } -static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) +static inline bool common_job_start(Channel *data, typval_T *rettv) { - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; if (proc->type == kProcessTypePty && proc->detach) { EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); - xfree(data->proc.pty.term_name); + xfree(data->stream.pty.term_name); shell_free_argv(proc->argv); - free_term_job_data_event((void **)&data); + channel_decref(data); return false; } - data->id = next_chan_id++; - pmap_put(uint64_t)(jobs, data->id, data); - data->refcount++; char *cmd = xstrdup(proc->argv[0]); - int status = process_spawn(proc); + bool has_out, has_err; + if (proc->type == kProcessTypePty) { + has_out = true; + has_err = false; + } else { + has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; + has_err = data->on_stderr.type != kCallbackNone; + } + int status = process_spawn(proc, true, has_out, has_err); if (status) { EMSG3(_(e_jobspawn), os_strerror(status), cmd); xfree(cmd); if (proc->type == kProcessTypePty) { - xfree(data->proc.pty.term_name); + xfree(data->stream.pty.term_name); } rettv->vval.v_number = proc->status; - term_job_data_decref(data); + channel_decref(data); return false; } xfree(cmd); - if (data->rpc) { - // the rpc channel takes over the in and out streams - channel_from_process(proc, data->id); + if (data->is_rpc) { + // the rpc takes over the in and out streams + rpc_start(data); } else { - wstream_init(proc->in, 0); - if (proc->out) { - rstream_init(proc->out, 0); - rstream_start(proc->out, on_job_stdout, data); + wstream_init(&proc->in, 0); + if (has_out) { + rstream_init(&proc->out, 0); + rstream_start(&proc->out, on_job_stdout, data); } } - if (proc->err) { - rstream_init(proc->err, 0); - rstream_start(proc->err, on_job_stderr, data); + if (has_err) { + rstream_init(&proc->err, 0); + rstream_start(&proc->err, on_job_stderr, data); } rettv->vval.v_number = data->id; return true; } -static inline void free_term_job_data_event(void **argv) -{ - TerminalJobData *data = argv[0]; - callback_free(&data->on_stdout); - callback_free(&data->on_stderr); - callback_free(&data->on_exit); - - multiqueue_free(data->events); - pmap_del(uint64_t)(jobs, data->id); - xfree(data); -} - -static inline void free_term_job_data(TerminalJobData *data) -{ - // data->queue may still be used after this function returns(process_wait), so - // only free in the next event loop iteration - multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); -} - // vimscript job callbacks must be executed on Nvim main loop -static inline void process_job_event(TerminalJobData *data, Callback *callback, +static inline void process_job_event(Channel *data, Callback *callback, const char *type, char *buf, size_t count, int status) { - JobEvent event_data; + ChannelEvent event_data; event_data.received = NULL; if (buf) { event_data.received = tv_list_alloc(); @@ -22566,18 +22490,18 @@ static inline void process_job_event(TerminalJobData *data, Callback *callback, static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, void *job, bool eof) { - TerminalJobData *data = job; + Channel *data = job; on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); } static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *job, bool eof) { - TerminalJobData *data = job; + Channel *data = job; on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); } -static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, +static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, size_t count, bool eof, Callback *callback, const char *type) { @@ -22604,14 +22528,14 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, static void eval_job_process_exit_cb(Process *proc, int status, void *d) { - TerminalJobData *data = d; - if (data->term && !data->exited) { - data->exited = true; + Channel *data = d; + if (data->term && !data->stream.proc.exited) { + data->stream.proc.exited = true; char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); terminal_close(data->term, msg); } - if (data->rpc) { + if (data->is_rpc) { channel_process_exit(data->id, status); } @@ -22621,58 +22545,51 @@ static void eval_job_process_exit_cb(Process *proc, int status, void *d) process_job_event(data, &data->on_exit, "exit", NULL, 0, status); - term_job_data_decref(data); + channel_decref(data); } static void term_write(char *buf, size_t size, void *d) { - TerminalJobData *job = d; - if (job->in.closed) { + Channel *job = d; + if (job->stream.proc.in.closed) { // If the backing stream was closed abruptly, there may be write events // ahead of the terminal close event. Just ignore the writes. ILOG("write failed: stream is closed"); return; } WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); - wstream_write(&job->in, wbuf); + wstream_write(&job->stream.proc.in, wbuf); } static void term_resize(uint16_t width, uint16_t height, void *d) { - TerminalJobData *data = d; - pty_process_resize(&data->proc.pty, width, height); + Channel *data = d; + pty_process_resize(&data->stream.pty, width, height); } static inline void term_delayed_free(void **argv) { - TerminalJobData *j = argv[0]; - if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { + Channel *j = argv[0]; + if (j->stream.proc.in.pending_reqs || j->stream.proc.out.pending_reqs) { multiqueue_put(j->events, term_delayed_free, 1, j); return; } terminal_destroy(j->term); - term_job_data_decref(j); + channel_decref(j); } static void term_close(void *d) { - TerminalJobData *data = d; - if (!data->exited) { - data->exited = true; - process_stop((Process *)&data->proc); + Channel *data = d; + if (!data->stream.proc.exited) { + data->stream.proc.exited = true; + process_stop((Process *)&data->stream.proc); } multiqueue_put(data->events, term_delayed_free, 1, data); } -static void term_job_data_decref(TerminalJobData *data) -{ - if (!(--data->refcount)) { - free_term_job_data(data); - } -} - -static void on_job_event(JobEvent *ev) +static void on_job_event(ChannelEvent *ev) { if (!ev->callback) { return; @@ -22704,15 +22621,24 @@ static void on_job_event(JobEvent *ev) tv_clear(&rettv); } -static TerminalJobData *find_job(uint64_t id) +static Channel *find_job(uint64_t id, bool show_error) { - TerminalJobData *data = pmap_get(uint64_t)(jobs, id); - if (!data || data->stopped) { + Channel *data = find_channel(id); + if (!data || data->streamtype != kChannelStreamProc + || process_is_stopped(&data->stream.proc)) { + if (show_error) { + if (data && data->streamtype != kChannelStreamProc) { + EMSG(_(e_invchanjob)); + } else { + EMSG(_(e_invchan)); + } + } return NULL; } return data; } + static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) { if (check_restricted() || check_secure()) { -- cgit From 1ebc96fe10fbdbec22caa26d5d52a9f095da9687 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Mon, 5 Jun 2017 08:29:10 +0200 Subject: channels: allow bytes sockets and stdio, and buffered bytes output --- src/nvim/eval.c | 359 ++++++++++++++------------------------------------------ 1 file changed, 87 insertions(+), 272 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 013cfce78d..c2fd7ac19c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -437,14 +437,6 @@ static ScopeDictDictItem vimvars_var; /// v: hashtab #define vimvarht vimvardict.dv_hashtab -typedef struct { - Channel *data; - Callback *callback; - const char *type; - list_T *received; - int status; -} ChannelEvent; - typedef struct { TimeWatcher tw; int timer_id; @@ -5121,12 +5113,12 @@ bool garbage_collect(bool testing) // named functions (matters for closures) ABORTING(set_ref_in_functions(copyID)); - // Jobs + // Channels { Channel *data; map_foreach_value(channels, data, { - set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); - set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); + set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); + set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); }) } @@ -11643,8 +11635,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) bool detach = false; bool rpc = false; bool pty = false; - Callback on_stdout = CALLBACK_NONE; - Callback on_stderr = CALLBACK_NONE; + CallbackReader on_stdout = CALLBACK_READER_INIT, + on_stderr = CALLBACK_READER_INIT; Callback on_exit = CALLBACK_NONE; char *cwd = NULL; if (argvars[1].v_type == VAR_DICT) { @@ -11676,26 +11668,17 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } - Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - pty, rpc, detach, cwd); + uint16_t width = 0, height = 0; + char *term_name = NULL; if (pty) { - PtyProcess *pty = &data->stream.pty; - uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); - if (width > 0) { - pty->width = width; - } - uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); - if (height > 0) { - pty->height = height; - } - char *term = tv_dict_get_string(job_opts, "TERM", true); - if (term) { - pty->term_name = term; - } + width = (uint16_t)tv_dict_get_number(job_opts, "width"); + height = (uint16_t)tv_dict_get_number(job_opts, "height"); + term_name = tv_dict_get_string(job_opts, "TERM", true); } - common_job_start(data, rettv); + channel_job_start(argv, on_stdout, on_stderr, on_exit, pty, rpc, detach, + cwd, width, height, term_name, &rettv->vval.v_number); } // "jobstop()" function @@ -11782,7 +11765,8 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } - int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); + int status = process_wait((Process *)&data->stream.proc, remaining, + waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -13922,10 +13906,9 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) // The last item of argv must be NULL argv[i] = NULL; - Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, - CALLBACK_NONE, false, true, false, - NULL); - common_job_start(data, rettv); + channel_job_start(argv, CALLBACK_READER_INIT, CALLBACK_READER_INIT, + CALLBACK_NONE, false, true, false, NULL, 0, 0, NULL, + &rettv->vval.v_number); } // "rpcstop()" function @@ -13946,7 +13929,7 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) // if called with a job, stop it, else closes the channel uint64_t id = argvars[0].vval.v_number; - if (find_job(id, false)) { // FIXME + if (find_job(id, false)) { f_jobstop(argvars, rettv, NULL); } else { rettv->vval.v_number = channel_close(id); @@ -15126,18 +15109,19 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr) } bool rpc = false; + CallbackReader on_data = CALLBACK_READER_INIT; if (argvars[2].v_type == VAR_DICT) { dict_T *opts = argvars[2].vval.v_dict; rpc = tv_dict_get_number(opts, "rpc") != 0; - } - if (!rpc) { - EMSG2(_(e_invarg2), "rpc option must be true"); - return; + if (!tv_dict_get_callback(opts, S_LEN("on_data"), &on_data.cb)) { + return; + } + on_data.buffered = tv_dict_get_number(opts, "data_buffered"); } const char *error = NULL; - uint64_t id = channel_connect(tcp, address, 50, &error); + uint64_t id = channel_connect(tcp, address, rpc, on_data, 50, &error); if (error) { EMSG2(_("connection failed: %s"), error); @@ -15517,6 +15501,35 @@ static void f_sort(typval_T *argvars, typval_T *rettv, FunPtr fptr) do_sort_uniq(argvars, rettv, true); } +/// "stdioopen()" function +static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ + if (argvars[0].v_type != VAR_DICT) { + EMSG(_(e_invarg)); + return; + } + + + bool rpc = false; + CallbackReader on_stdin = CALLBACK_READER_INIT; + dict_T *opts = argvars[0].vval.v_dict; + rpc = tv_dict_get_number(opts, "rpc") != 0; + + if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) { + return; + } + + const char *error; + uint64_t id = channel_from_stdio(rpc, on_stdin, &error); + if (!id) { + EMSG2(e_stdiochan2, error); + } + + + rettv->vval.v_number = (varnumber_T)id; + rettv->v_type = VAR_NUMBER; +} + /// "uniq({list})" function static void f_uniq(typval_T *argvars, typval_T *rettv, FunPtr fptr) { @@ -16633,8 +16646,9 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - Callback on_stdout = CALLBACK_NONE, on_stderr = CALLBACK_NONE, - on_exit = CALLBACK_NONE; + CallbackReader on_stdout = CALLBACK_READER_INIT, + on_stderr = CALLBACK_READER_INIT; + Callback on_exit = CALLBACK_NONE; dict_T *job_opts = NULL; const char *cwd = "."; if (argvars[1].v_type == VAR_DICT) { @@ -16658,23 +16672,23 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) } uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); - Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, - true, false, false, cwd); - data->stream.pty.width = term_width; - data->stream.pty.height = curwin->w_height; - data->stream.pty.term_name = xstrdup("xterm-256color"); - if (!common_job_start(data, rettv)) { + Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, + true, false, false, cwd, + term_width, curwin->w_height, + xstrdup("xterm-256color"), + &rettv->vval.v_number); + if (rettv->vval.v_number <= 0) { return; } TerminalOptions topts; - topts.data = data; + topts.data = chan; topts.width = term_width; topts.height = curwin->w_height; topts.write_cb = term_write; topts.resize_cb = term_resize; topts.close_cb = term_close; - int pid = data->stream.pty.process.pid; + int pid = chan->stream.pty.process.pid; char buf[1024]; // format the title with the pid to conform with the term:// URI @@ -16685,16 +16699,19 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) (void)setfname(curbuf, (char_u *)buf, NULL, true); // Save the job id and pid in b:terminal_job_{id,pid} Error err = ERROR_INIT; + dict_set_var(curbuf->b_vars, cstr_as_string("terminal_channel_id"), + INTEGER_OBJ(chan->id), false, false, &err); + // deprecated name: dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_id"), - INTEGER_OBJ(rettv->vval.v_number), false, false, &err); + INTEGER_OBJ(chan->id), false, false, &err); api_clear_error(&err); dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_pid"), INTEGER_OBJ(pid), false, false, &err); api_clear_error(&err); Terminal *term = terminal_open(topts); - data->term = term; - channel_incref(data); + chan->term = term; + channel_incref(chan); return; } @@ -16783,6 +16800,13 @@ static bool set_ref_in_callback(Callback *callback, int copyID, return false; } +static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID, + ht_stack_T **ht_stack, + list_stack_T **list_stack) +{ + return set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack); +} + static void add_timer_info(typval_T *rettv, timer_T *timer) { list_T *list = rettv->vval.v_list; @@ -22347,206 +22371,29 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, return ret; } -static inline Channel *common_job_init(char **argv, - Callback on_stdout, - Callback on_stderr, - Callback on_exit, - bool pty, - bool rpc, - bool detach, - const char *cwd) -{ - Channel *data = channel_alloc(kChannelStreamProc); - data->on_stdout = on_stdout; - data->on_stderr = on_stderr; - data->on_exit = on_exit; - data->is_rpc = rpc; - if (pty) { - data->stream.pty = pty_process_init(&main_loop, data); - } else { - data->stream.uv = libuv_process_init(&main_loop, data); - } - Process *proc = (Process *)&data->stream.proc; - proc->argv = argv; - proc->cb = eval_job_process_exit_cb; - proc->events = data->events; - proc->detach = detach; - proc->cwd = cwd; - return data; -} - /// common code for getting job callbacks for jobstart, termopen and rpcstart /// /// @return true/false on success/failure. -static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, - Callback *on_stderr, Callback *on_exit) +static inline bool common_job_callbacks(dict_T *vopts, + CallbackReader *on_stdout, + CallbackReader *on_stderr, + Callback *on_exit) { - if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), on_stdout) - &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), on_stderr) + if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), &on_stdout->cb) + &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), &on_stderr->cb) && tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) { + on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered"); + on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered"); vopts->dv_refcount++; return true; } - callback_free(on_stdout); - callback_free(on_stderr); + callback_reader_free(on_stdout); + callback_reader_free(on_stderr); callback_free(on_exit); return false; } -static inline bool common_job_start(Channel *data, typval_T *rettv) -{ - Process *proc = (Process *)&data->stream.proc; - if (proc->type == kProcessTypePty && proc->detach) { - EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); - xfree(data->stream.pty.term_name); - shell_free_argv(proc->argv); - channel_decref(data); - return false; - } - - data->refcount++; - char *cmd = xstrdup(proc->argv[0]); - bool has_out, has_err; - if (proc->type == kProcessTypePty) { - has_out = true; - has_err = false; - } else { - has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; - has_err = data->on_stderr.type != kCallbackNone; - } - int status = process_spawn(proc, true, has_out, has_err); - if (status) { - EMSG3(_(e_jobspawn), os_strerror(status), cmd); - xfree(cmd); - if (proc->type == kProcessTypePty) { - xfree(data->stream.pty.term_name); - } - rettv->vval.v_number = proc->status; - channel_decref(data); - return false; - } - xfree(cmd); - - - if (data->is_rpc) { - // the rpc takes over the in and out streams - rpc_start(data); - } else { - wstream_init(&proc->in, 0); - if (has_out) { - rstream_init(&proc->out, 0); - rstream_start(&proc->out, on_job_stdout, data); - } - } - - if (has_err) { - rstream_init(&proc->err, 0); - rstream_start(&proc->err, on_job_stderr, data); - } - rettv->vval.v_number = data->id; - return true; -} - -// vimscript job callbacks must be executed on Nvim main loop -static inline void process_job_event(Channel *data, Callback *callback, - const char *type, char *buf, size_t count, - int status) -{ - ChannelEvent event_data; - event_data.received = NULL; - if (buf) { - event_data.received = tv_list_alloc(); - char *ptr = buf; - size_t remaining = count; - size_t off = 0; - - while (off < remaining) { - // append the line - if (ptr[off] == NL) { - tv_list_append_string(event_data.received, ptr, off); - size_t skip = off + 1; - ptr += skip; - remaining -= skip; - off = 0; - continue; - } - if (ptr[off] == NUL) { - // Translate NUL to NL - ptr[off] = NL; - } - off++; - } - tv_list_append_string(event_data.received, ptr, off); - } else { - event_data.status = status; - } - event_data.data = data; - event_data.callback = callback; - event_data.type = type; - on_job_event(&event_data); -} - -static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, - void *job, bool eof) -{ - Channel *data = job; - on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); -} - -static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, - void *job, bool eof) -{ - Channel *data = job; - on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); -} - -static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, - size_t count, bool eof, Callback *callback, - const char *type) -{ - if (eof) { - return; - } - - // stub variable, to keep reading consistent with the order of events, only - // consider the count parameter. - size_t r; - char *ptr = rbuffer_read_ptr(buf, &r); - - // The order here matters, the terminal must receive the data first because - // process_job_event will modify the read buffer(convert NULs into NLs) - if (data->term) { - terminal_receive(data->term, ptr, count); - } - - rbuffer_consumed(buf, count); - if (callback->type != kCallbackNone) { - process_job_event(data, callback, type, ptr, count, 0); - } -} - -static void eval_job_process_exit_cb(Process *proc, int status, void *d) -{ - Channel *data = d; - if (data->term && !data->stream.proc.exited) { - data->stream.proc.exited = true; - char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; - snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); - terminal_close(data->term, msg); - } - if (data->is_rpc) { - channel_process_exit(data->id, status); - } - - if (data->status_ptr) { - *data->status_ptr = status; - } - - process_job_event(data, &data->on_exit, "exit", NULL, 0, status); - - channel_decref(data); -} static void term_write(char *buf, size_t size, void *d) { @@ -22589,38 +22436,6 @@ static void term_close(void *d) multiqueue_put(data->events, term_delayed_free, 1, data); } -static void on_job_event(ChannelEvent *ev) -{ - if (!ev->callback) { - return; - } - - typval_T argv[4]; - - argv[0].v_type = VAR_NUMBER; - argv[0].v_lock = 0; - argv[0].vval.v_number = ev->data->id; - - if (ev->received) { - argv[1].v_type = VAR_LIST; - argv[1].v_lock = 0; - argv[1].vval.v_list = ev->received; - argv[1].vval.v_list->lv_refcount++; - } else { - argv[1].v_type = VAR_NUMBER; - argv[1].v_lock = 0; - argv[1].vval.v_number = ev->status; - } - - argv[2].v_type = VAR_STRING; - argv[2].v_lock = 0; - argv[2].vval.v_string = (uint8_t *)ev->type; - - typval_T rettv = TV_INITIAL_VALUE; - callback_call(ev->callback, 3, argv, &rettv); - tv_clear(&rettv); -} - static Channel *find_job(uint64_t id, bool show_error) { Channel *data = find_channel(id); -- cgit From 3e59c1e20d605315d299e17ac9a059ccedd7e9d5 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 16 Jul 2017 14:29:04 +0200 Subject: channels: move away term code from eval.c --- src/nvim/eval.c | 61 ++++----------------------------------------------------- 1 file changed, 4 insertions(+), 57 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index c2fd7ac19c..6899474577 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -11550,7 +11550,8 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, argvars[2].vval.v_number); + pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, + argvars[2].vval.v_number); rettv->vval.v_number = 1; } @@ -16680,13 +16681,6 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) if (rettv->vval.v_number <= 0) { return; } - TerminalOptions topts; - topts.data = chan; - topts.width = term_width; - topts.height = curwin->w_height; - topts.write_cb = term_write; - topts.resize_cb = term_resize; - topts.close_cb = term_close; int pid = chan->stream.pty.process.pid; @@ -16699,9 +16693,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) (void)setfname(curbuf, (char_u *)buf, NULL, true); // Save the job id and pid in b:terminal_job_{id,pid} Error err = ERROR_INIT; - dict_set_var(curbuf->b_vars, cstr_as_string("terminal_channel_id"), - INTEGER_OBJ(chan->id), false, false, &err); - // deprecated name: + // deprecated: use 'channel' buffer option dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_id"), INTEGER_OBJ(chan->id), false, false, &err); api_clear_error(&err); @@ -16709,11 +16701,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) INTEGER_OBJ(pid), false, false, &err); api_clear_error(&err); - Terminal *term = terminal_open(topts); - chan->term = term; - channel_incref(chan); - - return; + channel_terminal_open(chan); } // "test_garbagecollect_now()" function @@ -22395,47 +22383,6 @@ static inline bool common_job_callbacks(dict_T *vopts, } -static void term_write(char *buf, size_t size, void *d) -{ - Channel *job = d; - if (job->stream.proc.in.closed) { - // If the backing stream was closed abruptly, there may be write events - // ahead of the terminal close event. Just ignore the writes. - ILOG("write failed: stream is closed"); - return; - } - WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); - wstream_write(&job->stream.proc.in, wbuf); -} - -static void term_resize(uint16_t width, uint16_t height, void *d) -{ - Channel *data = d; - pty_process_resize(&data->stream.pty, width, height); -} - -static inline void term_delayed_free(void **argv) -{ - Channel *j = argv[0]; - if (j->stream.proc.in.pending_reqs || j->stream.proc.out.pending_reqs) { - multiqueue_put(j->events, term_delayed_free, 1, j); - return; - } - - terminal_destroy(j->term); - channel_decref(j); -} - -static void term_close(void *d) -{ - Channel *data = d; - if (!data->stream.proc.exited) { - data->stream.proc.exited = true; - process_stop((Process *)&data->stream.proc); - } - multiqueue_put(data->events, term_delayed_free, 1, data); -} - static Channel *find_job(uint64_t id, bool show_error) { Channel *data = find_channel(id); -- cgit From 90e5cc5484ceeb410ae2a2706e09ed475cade4a5 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Thu, 8 Jun 2017 17:15:53 +0200 Subject: channels: generalize jobclose() --- src/nvim/eval.c | 107 ++++++++++++++++++++++++-------------------------------- 1 file changed, 45 insertions(+), 62 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 6899474577..ba356f28b9 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -7322,6 +7322,45 @@ static void f_changenr(typval_T *argvars, typval_T *rettv, FunPtr fptr) rettv->vval.v_number = curbuf->b_u_seq_cur; } +// "chanclose(id[, stream])" function +static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ + rettv->v_type = VAR_NUMBER; + rettv->vval.v_number = 0; + + if (check_restricted() || check_secure()) { + return; + } + + if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING + && argvars[1].v_type != VAR_UNKNOWN)) { + EMSG(_(e_invarg)); + return; + } + + ChannelPart part = kChannelPartAll; + if (argvars[1].v_type == VAR_STRING) { + char *stream = (char *)argvars[1].vval.v_string; + if (!strcmp(stream, "stdin")) { + part = kChannelPartStdin; + } else if (!strcmp(stream, "stdout")) { + part = kChannelPartStdout; + } else if (!strcmp(stream, "stderr")) { + part = kChannelPartStderr; + } else if (!strcmp(stream, "rpc")) { + part = kChannelPartRpc; + } else { + EMSG2(_("Invalid channel stream \"%s\""), stream); + return; + } + } + const char *error; + rettv->vval.v_number = channel_close(argvars[0].vval.v_number, part, &error); + if (!rettv->vval.v_number) { + EMSG(error); + } +} + /* * "char2nr(string)" function */ @@ -11391,67 +11430,6 @@ static void f_items(typval_T *argvars, typval_T *rettv, FunPtr fptr) dict_list(argvars, rettv, 2); } -// "jobclose(id[, stream])" function -static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) -{ - rettv->v_type = VAR_NUMBER; - rettv->vval.v_number = 0; - - if (check_restricted() || check_secure()) { - return; - } - - if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING - && argvars[1].v_type != VAR_UNKNOWN)) { - EMSG(_(e_invarg)); - return; - } - - Channel *data = find_job(argvars[0].vval.v_number, true); - if (!data) { - return; - } - - Process *proc = (Process *)&data->stream.proc; - - if (argvars[1].v_type == VAR_STRING) { - char *stream = (char *)argvars[1].vval.v_string; - if (!strcmp(stream, "stdin")) { - if (data->is_rpc) { - EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); - } else { - process_close_in(proc); - } - } else if (!strcmp(stream, "stdout")) { - if (data->is_rpc) { - EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); - } else { - process_close_out(proc); - } - } else if (!strcmp(stream, "stderr")) { - process_close_err(proc); - } else if (!strcmp(stream, "rpc")) { - if (data->is_rpc) { - channel_close(data->id); - } else { - EMSG(_("Invalid job stream: Not an rpc job")); - } - } else { - EMSG2(_("Invalid job stream \"%s\""), stream); - } - } else { - if (data->is_rpc) { - channel_close(data->id); - process_close_err(proc); - } else { - process_close_streams(proc); - if (proc->type == kProcessTypePty) { - pty_process_close_master(&data->stream.pty); - } - } - } -} - // "jobpid(id)" function static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) { @@ -13933,7 +13911,12 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) if (find_job(id, false)) { f_jobstop(argvars, rettv, NULL); } else { - rettv->vval.v_number = channel_close(id); + const char *error; + rettv->vval.v_number = channel_close(argvars[0].vval.v_number, + kChannelPartRpc, &error); + if (!rettv->vval.v_number) { + EMSG(error); + } } } -- cgit From 5af47031773fc647de867444693d1598d0da458d Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Fri, 9 Jun 2017 08:40:24 +0200 Subject: channels: stderr channel --- src/nvim/eval.c | 79 ++++++++++++++++++++++++--------------------------------- 1 file changed, 33 insertions(+), 46 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index ba356f28b9..f92e2d8d65 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -366,6 +366,7 @@ static struct vimvar { VV(VV_DYING, "dying", VAR_NUMBER, VV_RO), VV(VV_EXCEPTION, "exception", VAR_STRING, VV_RO), VV(VV_THROWPOINT, "throwpoint", VAR_STRING, VV_RO), + VV(VV_STDERR, "stderr", VAR_NUMBER, VV_RO), VV(VV_REG, "register", VAR_STRING, VV_RO), VV(VV_CMDBANG, "cmdbang", VAR_NUMBER, VV_RO), VV(VV_INSERTMODE, "insertmode", VAR_STRING, VV_RO), @@ -586,6 +587,7 @@ void eval_init(void) v_event->dv_lock = VAR_FIXED; set_vim_var_dict(VV_EVENT, v_event); set_vim_var_list(VV_ERRORS, tv_list_alloc()); + set_vim_var_nr(VV_STDERR, CHAN_STDERR); set_vim_var_nr(VV_SEARCHFORWARD, 1L); set_vim_var_nr(VV_HLSEARCH, 1L); set_vim_var_nr(VV_COUNT1, 1); @@ -7361,6 +7363,37 @@ static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } +// "chansend(id, data)" function +static void f_chansend(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ + rettv->v_type = VAR_NUMBER; + rettv->vval.v_number = 0; + + if (check_restricted() || check_secure()) { + return; + } + + if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) { + // First argument is the channel id and second is the data to write + EMSG(_(e_invarg)); + return; + } + + ptrdiff_t input_len = 0; + char *input = save_tv_as_string(&argvars[1], &input_len, false); + if (!input) { + // Either the error has been handled by save_tv_as_string(), + // or there is no input to send. + return; + } + uint64_t id = argvars[0].vval.v_number; + const char *error = NULL; + rettv->vval.v_number = channel_send(id, input, input_len, &error); + if (error) { + EMSG(error); + } +} + /* * "char2nr(string)" function */ @@ -11454,52 +11487,6 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) rettv->vval.v_number = proc->pid; } -// "jobsend()" function -static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) -{ - rettv->v_type = VAR_NUMBER; - rettv->vval.v_number = 0; - - if (check_restricted() || check_secure()) { - return; - } - - if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) { - // First argument is the job id and second is the string or list to write - // to the job's stdin - EMSG(_(e_invarg)); - return; - } - - Channel *data = find_channel(argvars[0].vval.v_number); - if (!data) { - EMSG(_(e_invchan)); - return; - } - - Stream *in = channel_instream(data); - if (in->closed) { - EMSG(_("Can't send data to the job: stdin is closed")); - return; - } - - if (data->is_rpc) { - EMSG(_("Can't send raw data to rpc channel")); - return; - } - - ptrdiff_t input_len = 0; - char *input = save_tv_as_string(&argvars[1], &input_len, false); - if (!input) { - // Either the error has been handled by save_tv_as_string(), or there is no - // input to send. - return; - } - - WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); - rettv->vval.v_number = wstream_write(in, buf); -} - // "jobresize(job, width, height)" function static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) { -- cgit From 5517d2323ba359d5ed0cb9f0e9abdfc2a9871894 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 23 Jul 2017 19:23:02 +0200 Subject: channels: reimplement logging (as stub for proper event) --- src/nvim/eval.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index f92e2d8d65..40ee3545b6 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -11643,8 +11643,12 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) term_name = tv_dict_get_string(job_opts, "TERM", true); } - channel_job_start(argv, on_stdout, on_stderr, on_exit, pty, rpc, detach, - cwd, width, height, term_name, &rettv->vval.v_number); + Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, pty, + rpc, detach, cwd, width, height, term_name, + &rettv->vval.v_number); + if (chan) { + channel_create_event(chan, NULL); + } } // "jobstop()" function @@ -13872,9 +13876,13 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) // The last item of argv must be NULL argv[i] = NULL; - channel_job_start(argv, CALLBACK_READER_INIT, CALLBACK_READER_INIT, - CALLBACK_NONE, false, true, false, NULL, 0, 0, NULL, - &rettv->vval.v_number); + Channel *chan = channel_job_start(argv, CALLBACK_READER_INIT, + CALLBACK_READER_INIT, CALLBACK_NONE, + false, true, false, NULL, 0, 0, NULL, + &rettv->vval.v_number); + if (chan) { + channel_create_event(chan, NULL); + } } // "rpcstop()" function @@ -16672,6 +16680,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) api_clear_error(&err); channel_terminal_open(chan); + channel_create_event(chan, NULL); } // "test_garbagecollect_now()" function -- cgit From f629f8312d2a830ce7999a6612203977ec83daf8 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Tue, 25 Jul 2017 11:59:08 +0200 Subject: channels: refactor jobwait --- src/nvim/eval.c | 71 ++++++++++++++++++++++++++------------------------------- 1 file changed, 32 insertions(+), 39 deletions(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 40ee3545b6..5fa92cedbd 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -11693,28 +11693,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } + list_T *args = argvars[0].vval.v_list; - list_T *rv = tv_list_alloc(); + Channel **jobs = xcalloc(args->lv_len, sizeof(*jobs)); ui_busy_start(); MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Channel *data = NULL; + + int i = 0; + for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next, i++) { + Channel *chan = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number, false))) { - tv_list_append_number(rv, -3); + || !(chan = find_job(arg->li_tv.vval.v_number, false))) { + jobs[i] = NULL; } else { - // append the list item and set the status pointer so we'll collect the - // status code when the job exits - tv_list_append_number(rv, -1); - data->status_ptr = &rv->lv_last->li_tv.vval.v_number; - // Process any pending events for the job because we'll temporarily - // replace the parent queue - multiqueue_process_events(data->events); - multiqueue_replace_parent(data->events, waiting_jobs); + jobs[i] = chan; + channel_incref(chan); + if (chan->stream.proc.status < 0) { + // Process any pending events for the job because we'll temporarily + // replace the parent queue + multiqueue_process_events(chan->events); + multiqueue_replace_parent(chan->events, waiting_jobs); + } } } @@ -11725,25 +11728,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) before = os_hrtime(); } - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Channel *data = NULL; + for (i = 0; i < args->lv_len; i++) { if (remaining == 0) { // timed out break; } - if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number, false))) { + + // if the job already exited, but wasn't freed yet + if (jobs[i] == NULL || jobs[i]->stream.proc.status >= 0) { continue; } - int status = process_wait((Process *)&data->stream.proc, remaining, + + int status = process_wait(&jobs[i]->stream.proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. - if (status == -2) { - // set the status so the user can distinguish between interrupted and - // skipped/timeout jobs. - *data->status_ptr = -2; - } break; } if (remaining > 0) { @@ -11756,30 +11755,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Channel *data = NULL; - if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number, false))) { - continue; - } - // remove the status pointer because the list may be freed before the - // job exits - data->status_ptr = NULL; - } + list_T *rv = tv_list_alloc(); // restore the parent queue for any jobs still alive - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - Channel *data = NULL; - if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number, false))) { + for (i = 0; i < args->lv_len; i++) { + if (jobs[i] == NULL) { + tv_list_append_number(rv, -3); continue; } // restore the parent queue for the job - multiqueue_process_events(data->events); - multiqueue_replace_parent(data->events, main_loop.events); + multiqueue_process_events(jobs[i]->events); + multiqueue_replace_parent(jobs[i]->events, main_loop.events); + + tv_list_append_number(rv, jobs[i]->stream.proc.status); + channel_decref(jobs[i]); } multiqueue_free(waiting_jobs); + xfree(jobs); ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; -- cgit From a97cdff14df1bb788a4b659e0db94e2b2ba1f539 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 17 Sep 2017 16:23:39 +0200 Subject: channels: improvements to buffering --- src/nvim/eval.c | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'src/nvim/eval.c') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 5fa92cedbd..577aa67c60 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -15090,6 +15090,9 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } on_data.buffered = tv_dict_get_number(opts, "data_buffered"); + if (on_data.buffered && on_data.cb.type == kCallbackNone) { + on_data.self = opts; + } } const char *error = NULL; @@ -15490,6 +15493,10 @@ static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) { return; } + on_stdin.buffered = tv_dict_get_number(opts, "stdin_buffered"); + if (on_stdin.buffered && on_stdin.cb.type == kCallbackNone) { + on_stdin.self = opts; + } const char *error; uint64_t id = channel_from_stdio(rpc, on_stdin, &error); @@ -16764,7 +16771,17 @@ static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID, ht_stack_T **ht_stack, list_stack_T **list_stack) { - return set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack); + if (set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack)) { + return true; + } + + if (reader->self) { + typval_T tv; + tv.v_type = VAR_DICT; + tv.vval.v_dict = reader->self; + return set_ref_in_item(&tv, copyID, ht_stack, list_stack); + } + return false; } static void add_timer_info(typval_T *rettv, timer_T *timer) @@ -22344,6 +22361,12 @@ static inline bool common_job_callbacks(dict_T *vopts, && tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) { on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered"); on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered"); + if (on_stdout->buffered && on_stdout->cb.type == kCallbackNone) { + on_stdout->self = vopts; + } + if (on_stderr->buffered && on_stderr->cb.type == kCallbackNone) { + on_stderr->self = vopts; + } vopts->dv_refcount++; return true; } -- cgit