diff options
Diffstat (limited to 'src/nvim/eval.c')
| -rw-r--r-- | src/nvim/eval.c | 820 | 
1 files changed, 250 insertions, 570 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index f414e771d7..577aa67c60 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -24,6 +24,7 @@  #endif  #include "nvim/eval.h"  #include "nvim/buffer.h" +#include "nvim/channel.h"  #include "nvim/charset.h"  #include "nvim/cursor.h"  #include "nvim/diff.h" @@ -365,6 +366,7 @@ static struct vimvar {    VV(VV_DYING,          "dying",            VAR_NUMBER, VV_RO),    VV(VV_EXCEPTION,      "exception",        VAR_STRING, VV_RO),    VV(VV_THROWPOINT,     "throwpoint",       VAR_STRING, VV_RO), +  VV(VV_STDERR,         "stderr",           VAR_NUMBER, VV_RO),    VV(VV_REG,            "register",         VAR_STRING, VV_RO),    VV(VV_CMDBANG,        "cmdbang",          VAR_NUMBER, VV_RO),    VV(VV_INSERTMODE,     "insertmode",       VAR_STRING, VV_RO), @@ -437,31 +439,6 @@ static ScopeDictDictItem vimvars_var;  #define vimvarht  vimvardict.dv_hashtab  typedef struct { -  union { -    LibuvProcess uv; -    PtyProcess pty; -  } proc; -  Stream in, out, err;  // Initialized in common_job_start(). -  Terminal *term; -  bool stopped; -  bool exited; -  bool rpc; -  int refcount; -  Callback on_stdout, on_stderr, on_exit; -  varnumber_T *status_ptr; -  uint64_t id; -  MultiQueue *events; -} TerminalJobData; - -typedef struct { -  TerminalJobData *data; -  Callback *callback; -  const char *type; -  list_T *received; -  int status; -} JobEvent; - -typedef struct {    TimeWatcher tw;    int timer_id;    int repeat_count; @@ -513,7 +490,6 @@ typedef enum {  #define FNE_INCL_BR     1       /* find_name_end(): include [] in name */  #define FNE_CHECK_START 2       /* find_name_end(): check name starts with                                     valid character */ -static PMap(uint64_t) *jobs = NULL;  static uint64_t last_timer_id = 0;  static PMap(uint64_t) *timers = NULL; @@ -556,7 +532,6 @@ void eval_init(void)  {    vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; -  jobs = pmap_new(uint64_t)();    timers = pmap_new(uint64_t)();    struct vimvar   *p; @@ -612,6 +587,7 @@ void eval_init(void)    v_event->dv_lock = VAR_FIXED;    set_vim_var_dict(VV_EVENT, v_event);    set_vim_var_list(VV_ERRORS, tv_list_alloc()); +  set_vim_var_nr(VV_STDERR,   CHAN_STDERR);    set_vim_var_nr(VV_SEARCHFORWARD, 1L);    set_vim_var_nr(VV_HLSEARCH, 1L);    set_vim_var_nr(VV_COUNT1, 1); @@ -5139,12 +5115,12 @@ bool garbage_collect(bool testing)    // named functions (matters for closures)    ABORTING(set_ref_in_functions(copyID)); -  // Jobs +  // Channels    { -    TerminalJobData *data; -    map_foreach_value(jobs, data, { -      set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); -      set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); +    Channel *data; +    map_foreach_value(channels, data, { +      set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); +      set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);        set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);      })    } @@ -7348,6 +7324,76 @@ static void f_changenr(typval_T *argvars, typval_T *rettv, FunPtr fptr)    rettv->vval.v_number = curbuf->b_u_seq_cur;  } +// "chanclose(id[, stream])" function +static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ +  rettv->v_type = VAR_NUMBER; +  rettv->vval.v_number = 0; + +  if (check_restricted() || check_secure()) { +    return; +  } + +  if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING +        && argvars[1].v_type != VAR_UNKNOWN)) { +    EMSG(_(e_invarg)); +    return; +  } + +  ChannelPart part = kChannelPartAll; +  if (argvars[1].v_type == VAR_STRING) { +    char *stream = (char *)argvars[1].vval.v_string; +    if (!strcmp(stream, "stdin")) { +      part = kChannelPartStdin; +    } else if (!strcmp(stream, "stdout")) { +      part = kChannelPartStdout; +    } else if (!strcmp(stream, "stderr")) { +      part = kChannelPartStderr; +    } else if (!strcmp(stream, "rpc")) { +      part = kChannelPartRpc; +    } else { +      EMSG2(_("Invalid channel stream \"%s\""), stream); +      return; +    } +  } +  const char *error; +  rettv->vval.v_number = channel_close(argvars[0].vval.v_number, part, &error); +  if (!rettv->vval.v_number) { +    EMSG(error); +  } +} + +// "chansend(id, data)" function +static void f_chansend(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ +  rettv->v_type = VAR_NUMBER; +  rettv->vval.v_number = 0; + +  if (check_restricted() || check_secure()) { +    return; +  } + +  if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) { +    // First argument is the channel id and second is the data to write +    EMSG(_(e_invarg)); +    return; +  } + +  ptrdiff_t input_len = 0; +  char *input = save_tv_as_string(&argvars[1], &input_len, false); +  if (!input) { +    // Either the error has been handled by save_tv_as_string(), +    // or there is no input to send. +    return; +  } +  uint64_t id = argvars[0].vval.v_number; +  const char *error = NULL; +  rettv->vval.v_number = channel_send(id, input, input_len, &error); +  if (error) { +    EMSG(error); +  } +} +  /*   * "char2nr(string)" function   */ @@ -11417,68 +11463,6 @@ static void f_items(typval_T *argvars, typval_T *rettv, FunPtr fptr)    dict_list(argvars, rettv, 2);  } -// "jobclose(id[, stream])" function -static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) -{ -  rettv->v_type = VAR_NUMBER; -  rettv->vval.v_number = 0; - -  if (check_restricted() || check_secure()) { -    return; -  } - -  if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING -        && argvars[1].v_type != VAR_UNKNOWN)) { -    EMSG(_(e_invarg)); -    return; -  } - -  TerminalJobData *data = find_job(argvars[0].vval.v_number); -  if (!data) { -    EMSG(_(e_invjob)); -    return; -  } - -  Process *proc = (Process *)&data->proc; - -  if (argvars[1].v_type == VAR_STRING) { -    char *stream = (char *)argvars[1].vval.v_string; -    if (!strcmp(stream, "stdin")) { -      if (data->rpc) { -        EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); -      } else { -        process_close_in(proc); -      } -    } else if (!strcmp(stream, "stdout")) { -      if (data->rpc) { -        EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); -      } else { -        process_close_out(proc); -      } -    } else if (!strcmp(stream, "stderr")) { -      process_close_err(proc); -    } else if (!strcmp(stream, "rpc")) { -      if (data->rpc) { -        channel_close(data->id); -      } else { -        EMSG(_("Invalid job stream: Not an rpc job")); -      } -    } else { -      EMSG2(_("Invalid job stream \"%s\""), stream); -    } -  } else { -    if (data->rpc) { -      channel_close(data->id); -      process_close_err(proc); -    } else { -      process_close_streams(proc); -      if (proc->type == kProcessTypePty) { -        pty_process_close_master(&data->proc.pty); -      } -    } -  } -} -  // "jobpid(id)" function  static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)  { @@ -11494,61 +11478,15 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)      return;    } -  TerminalJobData *data = find_job(argvars[0].vval.v_number); +  Channel *data = find_job(argvars[0].vval.v_number, true);    if (!data) { -    EMSG(_(e_invjob));      return;    } -  Process *proc = (Process *)&data->proc; +  Process *proc = (Process *)&data->stream.proc;    rettv->vval.v_number = proc->pid;  } -// "jobsend()" function -static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) -{ -  rettv->v_type = VAR_NUMBER; -  rettv->vval.v_number = 0; - -  if (check_restricted() || check_secure()) { -    return; -  } - -  if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) { -    // First argument is the job id and second is the string or list to write -    // to the job's stdin -    EMSG(_(e_invarg)); -    return; -  } - -  TerminalJobData *data = find_job(argvars[0].vval.v_number); -  if (!data) { -    EMSG(_(e_invjob)); -    return; -  } - -  if (((Process *)&data->proc)->in->closed) { -    EMSG(_("Can't send data to the job: stdin is closed")); -    return; -  } - -  if (data->rpc) { -    EMSG(_("Can't send raw data to rpc channel")); -    return; -  } - -  ptrdiff_t input_len = 0; -  char *input = save_tv_as_string(&argvars[1], &input_len, false); -  if (!input) { -    // Either the error has been handled by save_tv_as_string(), or there is no -    // input to send. -    return; -  } - -  WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); -  rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); -} -  // "jobresize(job, width, height)" function  static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr)  { @@ -11567,19 +11505,18 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr)    } -  TerminalJobData *data = find_job(argvars[0].vval.v_number); +  Channel *data = find_job(argvars[0].vval.v_number, true);    if (!data) { -    EMSG(_(e_invjob));      return;    } -  if (data->proc.uv.process.type != kProcessTypePty) { -    EMSG(_(e_jobnotpty)); +  if (data->stream.proc.type != kProcessTypePty) { +    EMSG(_(e_channotpty));      return;    } -  pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, -      argvars[2].vval.v_number); +  pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, +                     argvars[2].vval.v_number);    rettv->vval.v_number = 1;  } @@ -11664,8 +11601,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)    bool detach = false;    bool rpc = false;    bool pty = false; -  Callback on_stdout = CALLBACK_NONE; -  Callback on_stderr = CALLBACK_NONE; +  CallbackReader on_stdout = CALLBACK_READER_INIT, +                 on_stderr = CALLBACK_READER_INIT;    Callback on_exit = CALLBACK_NONE;    char *cwd = NULL;    if (argvars[1].v_type == VAR_DICT) { @@ -11697,32 +11634,21 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)      }    } -  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, -                                          pty, rpc, detach, cwd); -  Process *proc = (Process *)&data->proc; +  uint16_t width = 0, height = 0; +  char *term_name = NULL;    if (pty) { -    uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); -    if (width > 0) { -      data->proc.pty.width = width; -    } -    uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); -    if (height > 0) { -      data->proc.pty.height = height; -    } -    char *term = tv_dict_get_string(job_opts, "TERM", true); -    if (term) { -      data->proc.pty.term_name = term; -    } +    width = (uint16_t)tv_dict_get_number(job_opts, "width"); +    height = (uint16_t)tv_dict_get_number(job_opts, "height"); +    term_name = tv_dict_get_string(job_opts, "TERM", true);    } -  if (!rpc && on_stdout.type == kCallbackNone) { -    proc->out = NULL; +  Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, pty, +                                    rpc, detach, cwd, width, height, term_name, +                                    &rettv->vval.v_number); +  if (chan) { +    channel_create_event(chan, NULL);    } -  if (on_stderr.type == kCallbackNone) { -    proc->err = NULL; -  } -  common_job_start(data, rettv);  }  // "jobstop()" function @@ -11742,14 +11668,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)    } -  TerminalJobData *data = find_job(argvars[0].vval.v_number); +  Channel *data = find_job(argvars[0].vval.v_number, true);    if (!data) { -    EMSG(_(e_invjob));      return;    } -  process_stop((Process *)&data->proc); -  data->stopped = true; +  process_stop((Process *)&data->stream.proc);    rettv->vval.v_number = 1;  } @@ -11769,28 +11693,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)      return;    } +    list_T *args = argvars[0].vval.v_list; -  list_T *rv = tv_list_alloc(); +  Channel **jobs = xcalloc(args->lv_len, sizeof(*jobs));    ui_busy_start();    MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop);    // For each item in the input list append an integer to the output list. -3    // is used to represent an invalid job id, -2 is for a interrupted job and    // -1 for jobs that were skipped or timed out. -  for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    TerminalJobData *data = NULL; + +  int i = 0; +  for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next, i++) { +    Channel *chan = NULL;      if (arg->li_tv.v_type != VAR_NUMBER -        || !(data = find_job(arg->li_tv.vval.v_number))) { -      tv_list_append_number(rv, -3); +        || !(chan = find_job(arg->li_tv.vval.v_number, false))) { +      jobs[i] = NULL;      } else { -      // append the list item and set the status pointer so we'll collect the -      // status code when the job exits -      tv_list_append_number(rv, -1); -      data->status_ptr = &rv->lv_last->li_tv.vval.v_number; -      // Process any pending events for the job because we'll temporarily -      // replace the parent queue -      multiqueue_process_events(data->events); -      multiqueue_replace_parent(data->events, waiting_jobs); +      jobs[i] = chan; +      channel_incref(chan); +      if (chan->stream.proc.status < 0) { +        // Process any pending events for the job because we'll temporarily +        // replace the parent queue +        multiqueue_process_events(chan->events); +        multiqueue_replace_parent(chan->events, waiting_jobs); +      }      }    } @@ -11801,24 +11728,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)      before = os_hrtime();    } -  for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    TerminalJobData *data = NULL; +  for (i = 0; i < args->lv_len; i++) {      if (remaining == 0) {        // timed out        break;      } -    if (arg->li_tv.v_type != VAR_NUMBER -        || !(data = find_job(arg->li_tv.vval.v_number))) { + +    // if the job already exited, but wasn't freed yet +    if (jobs[i] == NULL || jobs[i]->stream.proc.status >= 0) {        continue;      } -    int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); + +    int status = process_wait(&jobs[i]->stream.proc, remaining, +                              waiting_jobs);      if (status < 0) {        // interrupted or timed out, skip remaining jobs. -      if (status == -2) { -        // set the status so the user can distinguish between interrupted and -        // skipped/timeout jobs. -        *data->status_ptr = -2; -      }        break;      }      if (remaining > 0) { @@ -11831,30 +11755,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)      }    } -  for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    TerminalJobData *data = NULL; -    if (arg->li_tv.v_type != VAR_NUMBER -        || !(data = find_job(arg->li_tv.vval.v_number))) { -      continue; -    } -    // remove the status pointer because the list may be freed before the -    // job exits -    data->status_ptr = NULL; -  } +  list_T *rv = tv_list_alloc();    // restore the parent queue for any jobs still alive -  for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    TerminalJobData *data = NULL; -    if (arg->li_tv.v_type != VAR_NUMBER -        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { +  for (i = 0; i < args->lv_len; i++) { +    if (jobs[i] == NULL) { +      tv_list_append_number(rv, -3);        continue;      }      // restore the parent queue for the job -    multiqueue_process_events(data->events); -    multiqueue_replace_parent(data->events, main_loop.events); +    multiqueue_process_events(jobs[i]->events); +    multiqueue_replace_parent(jobs[i]->events, main_loop.events); + +    tv_list_append_number(rv, jobs[i]->stream.proc.status); +    channel_decref(jobs[i]);    }    multiqueue_free(waiting_jobs); +  xfree(jobs);    ui_busy_stop();    rv->lv_refcount++;    rettv->v_type = VAR_LIST; @@ -13803,9 +13721,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr)      ADD(args, vim_to_object(tv));    } -  if (!channel_send_event((uint64_t)argvars[0].vval.v_number, -                          tv_get_string(&argvars[1]), -                          args)) { +  if (!rpc_send_event((uint64_t)argvars[0].vval.v_number, +                      tv_get_string(&argvars[1]), args)) {      EMSG2(_(e_invarg2), "Channel doesn't exist");      return;    } @@ -13870,10 +13787,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr)    Error err = ERROR_INIT; -  Object result = channel_send_call((uint64_t)argvars[0].vval.v_number, -                                    tv_get_string(&argvars[1]), -                                    args, -                                    &err); +  Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number, +                                tv_get_string(&argvars[1]), args, &err);    if (l_provider_call_nesting) {      current_SID = save_current_SID; @@ -13954,10 +13869,13 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)    // The last item of argv must be NULL    argv[i] = NULL; -  TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, -                                          CALLBACK_NONE, false, true, false, -                                          NULL); -  common_job_start(data, rettv); +  Channel *chan = channel_job_start(argv, CALLBACK_READER_INIT, +                                    CALLBACK_READER_INIT, CALLBACK_NONE, +                                    false, true, false, NULL, 0, 0, NULL, +                                    &rettv->vval.v_number); +  if (chan) { +    channel_create_event(chan, NULL); +  }  }  // "rpcstop()" function @@ -13977,10 +13895,16 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)    }    // if called with a job, stop it, else closes the channel -  if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { +  uint64_t id = argvars[0].vval.v_number; +  if (find_job(id, false)) {      f_jobstop(argvars, rettv, NULL);    } else { -    rettv->vval.v_number = channel_close(argvars[0].vval.v_number); +    const char *error; +    rettv->vval.v_number = channel_close(argvars[0].vval.v_number, +                                         kChannelPartRpc, &error); +    if (!rettv->vval.v_number) { +      EMSG(error); +    }    }  } @@ -15157,19 +15081,22 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr)    }    bool rpc = false; +  CallbackReader on_data = CALLBACK_READER_INIT;    if (argvars[2].v_type == VAR_DICT) {      dict_T *opts = argvars[2].vval.v_dict;      rpc = tv_dict_get_number(opts, "rpc") != 0; -  } -  if (!rpc) { -    EMSG2(_(e_invarg2), "rpc option must be true"); -    return; +    if (!tv_dict_get_callback(opts, S_LEN("on_data"), &on_data.cb)) { +      return; +    } +    on_data.buffered = tv_dict_get_number(opts, "data_buffered"); +    if (on_data.buffered && on_data.cb.type == kCallbackNone) { +      on_data.self = opts; +    }    }    const char *error = NULL; -  eval_format_source_name_line((char *)IObuff, sizeof(IObuff)); -  uint64_t id = channel_connect(tcp, address, 50, (char *)IObuff, &error); +  uint64_t id = channel_connect(tcp, address, rpc, on_data, 50, &error);    if (error) {      EMSG2(_("connection failed: %s"), error); @@ -15549,6 +15476,39 @@ static void f_sort(typval_T *argvars, typval_T *rettv, FunPtr fptr)    do_sort_uniq(argvars, rettv, true);  } +/// "stdioopen()" function +static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) +{ +  if (argvars[0].v_type != VAR_DICT) { +    EMSG(_(e_invarg)); +    return; +  } + + +  bool rpc = false; +  CallbackReader on_stdin = CALLBACK_READER_INIT; +  dict_T *opts = argvars[0].vval.v_dict; +  rpc = tv_dict_get_number(opts, "rpc") != 0; + +  if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) { +    return; +  } +  on_stdin.buffered = tv_dict_get_number(opts, "stdin_buffered"); +  if (on_stdin.buffered && on_stdin.cb.type == kCallbackNone) { +    on_stdin.self = opts; +  } + +  const char *error; +  uint64_t id = channel_from_stdio(rpc, on_stdin, &error); +  if (!id) { +    EMSG2(e_stdiochan2, error); +  } + + +  rettv->vval.v_number = (varnumber_T)id; +  rettv->v_type = VAR_NUMBER; +} +  /// "uniq({list})" function  static void f_uniq(typval_T *argvars, typval_T *rettv, FunPtr fptr)  { @@ -16665,8 +16625,9 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)      return;    } -  Callback on_stdout = CALLBACK_NONE, on_stderr = CALLBACK_NONE, -           on_exit = CALLBACK_NONE; +  CallbackReader on_stdout = CALLBACK_READER_INIT, +                 on_stderr = CALLBACK_READER_INIT; +  Callback on_exit = CALLBACK_NONE;    dict_T *job_opts = NULL;    const char *cwd = ".";    if (argvars[1].v_type == VAR_DICT) { @@ -16690,23 +16651,16 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)    }    uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); -  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, -                                          true, false, false, cwd); -  data->proc.pty.width = term_width; -  data->proc.pty.height = curwin->w_height; -  data->proc.pty.term_name = xstrdup("xterm-256color"); -  if (!common_job_start(data, rettv)) { +  Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, +                                    true, false, false, cwd, +                                    term_width, curwin->w_height, +                                    xstrdup("xterm-256color"), +                                    &rettv->vval.v_number); +  if (rettv->vval.v_number <= 0) {      return;    } -  TerminalOptions topts; -  topts.data = data; -  topts.width = term_width; -  topts.height = curwin->w_height; -  topts.write_cb = term_write; -  topts.resize_cb = term_resize; -  topts.close_cb = term_close; -  int pid = data->proc.pty.process.pid; +  int pid = chan->stream.pty.process.pid;    char buf[1024];    // format the title with the pid to conform with the term:// URI @@ -16717,18 +16671,16 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)    (void)setfname(curbuf, (char_u *)buf, NULL, true);    // Save the job id and pid in b:terminal_job_{id,pid}    Error err = ERROR_INIT; +  // deprecated: use 'channel' buffer option    dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_id"), -               INTEGER_OBJ(rettv->vval.v_number), false, false, &err); +               INTEGER_OBJ(chan->id), false, false, &err);    api_clear_error(&err);    dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_pid"),                 INTEGER_OBJ(pid), false, false, &err);    api_clear_error(&err); -  Terminal *term = terminal_open(topts); -  data->term = term; -  data->refcount++; - -  return; +  channel_terminal_open(chan); +  channel_create_event(chan, NULL);  }  // "test_garbagecollect_now()" function @@ -16761,30 +16713,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg)    return true;  } -/// Unref/free callback -void callback_free(Callback *const callback) -  FUNC_ATTR_NONNULL_ALL -{ -  switch (callback->type) { -    case kCallbackFuncref: { -      func_unref(callback->data.funcref); -      xfree(callback->data.funcref); -      break; -    } -    case kCallbackPartial: { -      partial_unref(callback->data.partial); -      break; -    } -    case kCallbackNone: { -      break; -    } -    default: { -      abort(); -    } -  } -  callback->type = kCallbackNone; -} -  bool callback_call(Callback *const callback, const int argcount_in,                     typval_T *const argvars_in, typval_T *const rettv)    FUNC_ATTR_NONNULL_ALL @@ -16839,6 +16767,23 @@ static bool set_ref_in_callback(Callback *callback, int copyID,    return false;  } +static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID, +                                       ht_stack_T **ht_stack, +                                       list_stack_T **list_stack) +{ +  if (set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack)) { +    return true; +  } + +  if (reader->self) { +    typval_T tv; +    tv.v_type = VAR_DICT; +    tv.vval.v_dict = reader->self; +    return set_ref_in_item(&tv, copyID, ht_stack, list_stack); +  } +  return false; +} +  static void add_timer_info(typval_T *rettv, timer_T *timer)  {    list_T *list = rettv->vval.v_list; @@ -22403,318 +22348,54 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub,    return ret;  } -static inline TerminalJobData *common_job_init(char **argv, -                                               Callback on_stdout, -                                               Callback on_stderr, -                                               Callback on_exit, -                                               bool pty, -                                               bool rpc, -                                               bool detach, -                                               const char *cwd) -{ -  TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); -  data->stopped = false; -  data->on_stdout = on_stdout; -  data->on_stderr = on_stderr; -  data->on_exit = on_exit; -  data->events = multiqueue_new_child(main_loop.events); -  data->rpc = rpc; -  if (pty) { -    data->proc.pty = pty_process_init(&main_loop, data); -  } else { -    data->proc.uv = libuv_process_init(&main_loop, data); -  } -  Process *proc = (Process *)&data->proc; -  proc->argv = argv; -  proc->in = &data->in; -  proc->out = &data->out; -  if (!pty) { -    proc->err = &data->err; -  } -  proc->cb = eval_job_process_exit_cb; -  proc->events = data->events; -  proc->detach = detach; -  proc->cwd = cwd; -  return data; -} -  /// common code for getting job callbacks for jobstart, termopen and rpcstart  ///  /// @return true/false on success/failure. -static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, -                                        Callback *on_stderr, Callback *on_exit) +static inline bool common_job_callbacks(dict_T *vopts, +                                        CallbackReader *on_stdout, +                                        CallbackReader *on_stderr, +                                        Callback *on_exit)  { -  if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), on_stdout) -      &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), on_stderr) +  if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), &on_stdout->cb) +      &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), &on_stderr->cb)        && tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) { +    on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered"); +    on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered"); +    if (on_stdout->buffered && on_stdout->cb.type == kCallbackNone) { +      on_stdout->self = vopts; +    } +    if (on_stderr->buffered && on_stderr->cb.type == kCallbackNone) { +      on_stderr->self = vopts; +    }      vopts->dv_refcount++;      return true;    } -  callback_free(on_stdout); -  callback_free(on_stderr); +  callback_reader_free(on_stdout); +  callback_reader_free(on_stderr);    callback_free(on_exit);    return false;  } -static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) -{ -  Process *proc = (Process *)&data->proc; -  if (proc->type == kProcessTypePty && proc->detach) { -    EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); -    xfree(data->proc.pty.term_name); -    shell_free_argv(proc->argv); -    free_term_job_data_event((void **)&data); -    return false; -  } - -  data->id = next_chan_id++; -  pmap_put(uint64_t)(jobs, data->id, data); - -  data->refcount++; -  char *cmd = xstrdup(proc->argv[0]); -  int status = process_spawn(proc); -  if (status) { -    EMSG3(_(e_jobspawn), os_strerror(status), cmd); -    xfree(cmd); -    if (proc->type == kProcessTypePty) { -      xfree(data->proc.pty.term_name); -    } -    rettv->vval.v_number = proc->status; -    term_job_data_decref(data); -    return false; -  } -  xfree(cmd); - -  if (data->rpc) { -    eval_format_source_name_line((char *)IObuff, sizeof(IObuff)); -    // RPC channel takes over the in/out streams. -    channel_from_process(proc, data->id, (char *)IObuff); -  } else { -    wstream_init(proc->in, 0); -    if (proc->out) { -      rstream_init(proc->out, 0); -      rstream_start(proc->out, on_job_stdout, data); -    } -  } - -  if (proc->err) { -    rstream_init(proc->err, 0); -    rstream_start(proc->err, on_job_stderr, data); -  } -  rettv->vval.v_number = data->id; -  return true; -} - -static inline void free_term_job_data_event(void **argv) +static Channel *find_job(uint64_t id, bool show_error)  { -  TerminalJobData *data = argv[0]; -  callback_free(&data->on_stdout); -  callback_free(&data->on_stderr); -  callback_free(&data->on_exit); - -  multiqueue_free(data->events); -  pmap_del(uint64_t)(jobs, data->id); -  xfree(data); -} - -static inline void free_term_job_data(TerminalJobData *data) -{ -  // data->queue may still be used after this function returns(process_wait), so -  // only free in the next event loop iteration -  multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); -} - -// vimscript job callbacks must be executed on Nvim main loop -static inline void process_job_event(TerminalJobData *data, Callback *callback, -                                     const char *type, char *buf, size_t count, -                                     int status) -{ -  JobEvent event_data; -  event_data.received = NULL; -  if (buf) { -    event_data.received = tv_list_alloc(); -    char *ptr = buf; -    size_t remaining = count; -    size_t off = 0; - -    while (off < remaining) { -      // append the line -      if (ptr[off] == NL) { -        tv_list_append_string(event_data.received, ptr, off); -        size_t skip = off + 1; -        ptr += skip; -        remaining -= skip; -        off = 0; -        continue; -      } -      if (ptr[off] == NUL) { -        // Translate NUL to NL -        ptr[off] = NL; +  Channel *data = find_channel(id); +  if (!data || data->streamtype != kChannelStreamProc +      || process_is_stopped(&data->stream.proc)) { +    if (show_error) { +      if (data && data->streamtype != kChannelStreamProc) { +        EMSG(_(e_invchanjob)); +      } else { +        EMSG(_(e_invchan));        } -      off++;      } -    tv_list_append_string(event_data.received, ptr, off); -  } else { -    event_data.status = status; -  } -  event_data.data = data; -  event_data.callback = callback; -  event_data.type = type; -  on_job_event(&event_data); -} - -static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, -    void *job, bool eof) -{ -  TerminalJobData *data = job; -  on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); -} - -static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, -    void *job, bool eof) -{ -  TerminalJobData *data = job; -  on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); -} - -static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, -                          size_t count, bool eof, Callback *callback, -                          const char *type) -{ -  if (eof) { -    return; -  } - -  // stub variable, to keep reading consistent with the order of events, only -  // consider the count parameter. -  size_t r; -  char *ptr = rbuffer_read_ptr(buf, &r); - -  // The order here matters, the terminal must receive the data first because -  // process_job_event will modify the read buffer(convert NULs into NLs) -  if (data->term) { -    terminal_receive(data->term, ptr, count); -  } - -  rbuffer_consumed(buf, count); -  if (callback->type != kCallbackNone) { -    process_job_event(data, callback, type, ptr, count, 0); -  } -} - -static void eval_job_process_exit_cb(Process *proc, int status, void *d) -{ -  TerminalJobData *data = d; -  if (data->term && !data->exited) { -    data->exited = true; -    char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; -    snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); -    terminal_close(data->term, msg); -  } -  if (data->rpc) { -    channel_process_exit(data->id, status); -  } - -  if (data->status_ptr) { -    *data->status_ptr = status; -  } - -  process_job_event(data, &data->on_exit, "exit", NULL, 0, status); - -  term_job_data_decref(data); -} - -static void term_write(char *buf, size_t size, void *d) -{ -  TerminalJobData *job = d; -  if (job->in.closed) { -    // If the backing stream was closed abruptly, there may be write events -    // ahead of the terminal close event. Just ignore the writes. -    ILOG("write failed: stream is closed"); -    return; -  } -  WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); -  wstream_write(&job->in, wbuf); -} - -static void term_resize(uint16_t width, uint16_t height, void *d) -{ -  TerminalJobData *data = d; -  pty_process_resize(&data->proc.pty, width, height); -} - -static inline void term_delayed_free(void **argv) -{ -  TerminalJobData *j = argv[0]; -  if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { -    multiqueue_put(j->events, term_delayed_free, 1, j); -    return; -  } - -  terminal_destroy(j->term); -  term_job_data_decref(j); -} - -static void term_close(void *d) -{ -  TerminalJobData *data = d; -  if (!data->exited) { -    data->exited = true; -    process_stop((Process *)&data->proc); -  } -  multiqueue_put(data->events, term_delayed_free, 1, data); -} - -static void term_job_data_decref(TerminalJobData *data) -{ -  if (!(--data->refcount)) { -    free_term_job_data(data); -  } -} - -static void on_job_event(JobEvent *ev) -{ -  if (!ev->callback) { -    return; -  } - -  typval_T argv[4]; - -  argv[0].v_type = VAR_NUMBER; -  argv[0].v_lock = 0; -  argv[0].vval.v_number = ev->data->id; - -  if (ev->received) { -    argv[1].v_type = VAR_LIST; -    argv[1].v_lock = 0; -    argv[1].vval.v_list = ev->received; -    argv[1].vval.v_list->lv_refcount++; -  } else { -    argv[1].v_type = VAR_NUMBER; -    argv[1].v_lock = 0; -    argv[1].vval.v_number = ev->status; -  } - -  argv[2].v_type = VAR_STRING; -  argv[2].v_lock = 0; -  argv[2].vval.v_string = (uint8_t *)ev->type; - -  typval_T rettv = TV_INITIAL_VALUE; -  callback_call(ev->callback, 3, argv, &rettv); -  tv_clear(&rettv); -} - -static TerminalJobData *find_job(uint64_t id) -{ -  TerminalJobData *data = pmap_get(uint64_t)(jobs, id); -  if (!data || data->stopped) {      return NULL;    }    return data;  } +  static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv)  {    if (check_restricted() || check_secure()) { @@ -22855,4 +22536,3 @@ void ex_checkhealth(exarg_T *eap)    xfree(buf);  } -  | 
