diff options
| author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 | 
|---|---|---|
| committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 | 
| commit | 883b78d29864f39b8032468c4374766dad7d142f (patch) | |
| tree | b555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/eval.c | |
| parent | d88c93acf390ea9d5e8674283927cff60fb41e0d (diff) | |
| parent | aa9cb48bf08af14068178619414590254b263882 (diff) | |
| download | rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.gz rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.bz2 rneovim-883b78d29864f39b8032468c4374766dad7d142f.zip | |
Merge PR #2980 'Refactor event loop layer'
Helped-by: oni-link <knil.ino@gmail.com>
Reviewed-by: oni-link <knil.ino@gmail.com>
Reviewed-by: Scott Prager <splinterofchaos@gmail.com>
Diffstat (limited to 'src/nvim/eval.c')
| -rw-r--r-- | src/nvim/eval.c | 258 | 
1 files changed, 125 insertions, 133 deletions
| diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 7da5cfb731..b43ab238cd 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -55,6 +55,7 @@  #include "nvim/misc1.h"  #include "nvim/misc2.h"  #include "nvim/keymap.h" +#include "nvim/map.h"  #include "nvim/file_search.h"  #include "nvim/garray.h"  #include "nvim/move.h" @@ -82,17 +83,18 @@  #include "nvim/version.h"  #include "nvim/window.h"  #include "nvim/os/os.h" -#include "nvim/os/job.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #include "nvim/os/time.h"  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/server.h"  #include "nvim/api/private/helpers.h"  #include "nvim/api/vim.h"  #include "nvim/os/dl.h" -#include "nvim/os/event.h"  #include "nvim/os/input.h" +#include "nvim/event/loop.h"  #define DICT_MAXNEST 100        /* maximum nesting of lists and dicts */ @@ -443,14 +445,19 @@ static dictitem_T vimvars_var;                  /* variable used for v: */  #define vimvarht  vimvardict.dv_hashtab  typedef struct { -  Job *job; +  union { +    UvProcess uv; +    PtyProcess pty; +  } proc; +  Stream in, out, err;    Terminal *term; +  bool stopped;    bool exited; -  bool stdin_closed;    int refcount;    ufunc_T *on_stdout, *on_stderr, *on_exit;    dict_T *self;    int *status_ptr; +  uint64_t id;  } TerminalJobData; @@ -463,7 +470,6 @@ typedef struct {                                     valid character */  // Memory pool for reusing JobEvent structures  typedef struct { -  int job_id;    TerminalJobData *data;    ufunc_T *callback;    const char *type; @@ -471,12 +477,15 @@ typedef struct {    int status;  } JobEvent;  static int disable_job_defer = 0; +static uint64_t current_job_id = 1; +static PMap(uint64_t) *jobs = NULL;   /*   * Initialize the global and v: variables.   */  void eval_init(void)  { +  jobs = pmap_new(uint64_t)();    int i;    struct vimvar   *p; @@ -10693,29 +10702,27 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); - -  if (!is_user_job(job)) { -    // Invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } +  Process *proc = (Process *)&data->proc; +    if (argvars[1].v_type == VAR_STRING) {      char *stream = (char *)argvars[1].vval.v_string;      if (!strcmp(stream, "stdin")) { -      job_close_in(job); -      ((TerminalJobData *)job_data(job))->stdin_closed = true; +      process_close_in(proc);      } else if (!strcmp(stream, "stdout")) { -      job_close_out(job); +      process_close_out(proc);      } else if (!strcmp(stream, "stderr")) { -      job_close_err(job); +      process_close_err(proc);      } else {        EMSG2(_("Invalid job stream \"%s\""), stream);      }    } else { -    ((TerminalJobData *)job_data(job))->stdin_closed = true; -    job_close_streams(job); +    process_close_streams(proc);    }  } @@ -10736,15 +10743,13 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); - -  if (!is_user_job(job)) { -    // Invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } -  if (((TerminalJobData *)job_data(job))->stdin_closed) { +  if (((Process *)&data->proc)->in->closed) {      EMSG(_("Can't send data to the job: stdin is closed"));      return;    } @@ -10758,7 +10763,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)    }    WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); -  rettv->vval.v_number = job_write(job, buf); +  rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf);  }  // "jobresize(job, width, height)" function @@ -10778,19 +10783,20 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); -  if (!is_user_job(job)) { -    // Probably an invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } -  if (!job_resize(job, argvars[1].vval.v_number, argvars[2].vval.v_number)) { +  if (data->proc.uv.process.type != kProcessTypePty) {      EMSG(_(e_jobnotpty));      return;    } +  pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, +      argvars[2].vval.v_number);    rettv->vval.v_number = 1;  } @@ -10879,37 +10885,33 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)      }    } -  JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, -      job_opts); +  bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0; +  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, +      job_opts, pty); +  Process *proc = (Process *)&data->proc; -  if (!job_opts) { -    goto start; -  } - -  opts.pty = get_dict_number(job_opts, (uint8_t *)"pty"); -  if (opts.pty) { +  if (pty) {      uint16_t width = get_dict_number(job_opts, (uint8_t *)"width");      if (width > 0) { -      opts.width = width; +      data->proc.pty.width = width;      }      uint16_t height = get_dict_number(job_opts, (uint8_t *)"height");      if (height > 0) { -      opts.height = height; +      data->proc.pty.height = height;      }      char *term = (char *)get_dict_string(job_opts, (uint8_t *)"TERM", true);      if (term) { -      opts.term_name = term; +      data->proc.pty.term_name = term;      }    } -start:    if (!on_stdout) { -    opts.stdout_cb = NULL; +    proc->out = NULL;    }    if (!on_stderr) { -    opts.stderr_cb = NULL; +    proc->err = NULL;    } -  common_job_start(opts, rettv); +  common_job_start(data, rettv);  }  // "jobstop()" function @@ -10928,14 +10930,15 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); -  if (!is_user_job(job)) { +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data || data->stopped) {      EMSG(_(e_invjob));      return;    } -  job_stop(job); +  process_stop((Process *)&data->proc); +  data->stopped = true;    rettv->vval.v_number = 1;  } @@ -10966,19 +10969,17 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    if (!disable_job_defer++) {      // process any pending job events in the deferred queue, but only do this if      // deferred is not disabled(at the top-level `jobwait()` call) -    event_process(); +    loop_process_event(&loop);    }    // For each item in the input list append an integer to the output list. -3    // is used to represent an invalid job id, -2 is for a interrupted job and    // -1 for jobs that were skipped or timed out.    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        list_append_number(rv, -3);      } else { -      TerminalJobData *data = job_data(job);        // append the list item and set the status pointer so we'll collect the        // status code when the job exits        list_append_number(rv, -1); @@ -10994,18 +10995,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    }    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (remaining == 0) {        // timed out        break;      }      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        continue;      } -    TerminalJobData *data = job_data(job); -    int status = job_wait(job, remaining); +    int status = process_wait((Process *)&data->proc, remaining);      if (status < 0) {        // interrupted or timed out, skip remaining jobs.        if (status == -2) { @@ -11026,16 +11025,14 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    }    // poll to ensure any pending callbacks from the last job are invoked -  event_poll(0); +  loop_poll_events(&loop, 0);    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        continue;      } -    TerminalJobData *data = job_data(job);      // remove the status pointer because the list may be freed before the      // job exits      data->status_ptr = NULL; @@ -12952,7 +12949,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)    // The last item of argv must be NULL    argv[i] = NULL; -  uint64_t channel_id = channel_from_job(argv); +  uint64_t channel_id = channel_from_process(argv);    if (!channel_id) {      EMSG(_(e_api_spawn_failed)); @@ -15226,19 +15223,15 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)      }    } -  JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, -      job_opts); -  opts.pty = true; -  opts.width = curwin->w_width; -  opts.height = curwin->w_height; -  opts.term_name = xstrdup("xterm-256color"); -  Job *job = common_job_start(opts, rettv); -  if (!job) { -    shell_free_argv(argv); +  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, +      job_opts, true); +  data->proc.pty.width = curwin->w_width; +  data->proc.pty.height = curwin->w_height; +  data->proc.pty.term_name = xstrdup("xterm-256color"); +  if (!common_job_start(data, rettv)) {      return;    } -  TerminalJobData *data = opts.data; -  TerminalOptions topts = TERMINAL_OPTIONS_INIT; +  TerminalOptions topts;    topts.data = data;    topts.width = curwin->w_width;    topts.height = curwin->w_height; @@ -15251,7 +15244,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)        && os_isdir(argvars[1].vval.v_string)) {      cwd = (char *)argvars[1].vval.v_string;    } -  int pid = job_pid(job); +  int pid = data->proc.pty.process.pid;    // Get the desired name of the buffer.    char *name = job_opts ? @@ -20223,21 +20216,27 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)    return ret;  } -static inline JobOptions common_job_options(char **argv, ufunc_T *on_stdout, -    ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self) +static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, +    ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self, bool pty)  {    TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); +  data->stopped = false;    data->on_stdout = on_stdout;    data->on_stderr = on_stderr;    data->on_exit = on_exit;    data->self = self; -  JobOptions opts = JOB_OPTIONS_INIT; -  opts.argv = argv; -  opts.data = data; -  opts.stdout_cb = on_job_stdout; -  opts.stderr_cb = on_job_stderr; -  opts.exit_cb = on_job_exit; -  return opts; +  if (pty) { +    data->proc.pty = pty_process_init(data); +  } else { +    data->proc.uv = uv_process_init(data); +  } +  Process *proc = (Process *)&data->proc; +  proc->argv = argv; +  proc->in = &data->in; +  proc->out = &data->out; +  proc->err = &data->err; +  proc->cb = on_process_exit; +  return data;  }  /// Return true/false on success/failure. @@ -20263,24 +20262,28 @@ static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,    return false;  } -static inline Job *common_job_start(JobOptions opts, typval_T *rettv) +static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)  { -  TerminalJobData *data = opts.data;    data->refcount++; -  Job *job = job_start(opts, &rettv->vval.v_number); +  Process *proc = (Process *)&data->proc; +  if (!process_spawn(&loop, proc)) { +    EMSG(_(e_jobexe)); +    return false; +  } -  if (rettv->vval.v_number <= 0) { -    if (rettv->vval.v_number == 0) { -      EMSG(_(e_jobtblfull)); -      xfree(opts.term_name); -      free_term_job_data(data); -    } else { -      EMSG(_(e_jobexe)); -    } -    return NULL; +  data->id = current_job_id++; +  wstream_init(proc->in, 0); +  if (proc->out) { +    rstream_init(proc->out, 0); +    rstream_start(proc->out, on_job_stdout); +  } +  if (proc->err) { +    rstream_init(proc->err, 0); +    rstream_start(proc->err, on_job_stderr);    } -  data->job = job; -  return job; +  pmap_put(uint64_t)(jobs, data->id, data); +  rettv->vval.v_number = data->id; +  return true;  }  static inline void free_term_job_data(TerminalJobData *data) { @@ -20301,25 +20304,15 @@ static inline void free_term_job_data(TerminalJobData *data) {    xfree(data);  } -static inline bool is_user_job(Job *job) -{ -  if (!job) { -    return false; -  } - -  JobOptions *opts = job_opts(job); -  return opts->exit_cb == on_job_exit; -} -  // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(Job *job, ufunc_T *callback, -    const char *type, char *data, size_t count, int status) +static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +    const char *type, char *buf, size_t count, int status)  {    JobEvent *event_data = xmalloc(sizeof(JobEvent));    event_data->received = NULL; -  if (data) { +  if (buf) {      event_data->received = list_alloc(); -    char *ptr = data; +    char *ptr = buf;      size_t remaining = count;      size_t off = 0; @@ -20343,36 +20336,34 @@ static inline void push_job_event(Job *job, ufunc_T *callback,    } else {      event_data->status = status;    } -  event_data->job_id = job_id(job); -  event_data->data = job_data(job); +  event_data->data = data;    event_data->callback = callback;    event_data->type = type; -  event_push((Event) { +  loop_push_event(&loop, (Event) {      .handler = on_job_event,      .data = event_data    }, !disable_job_defer);  } -static void on_job_stdout(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)  { -  TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stdout, "stdout"); +  TerminalJobData *data = job; +  on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");  } -static void on_job_stderr(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)  { -  TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stderr, "stderr"); +  TerminalJobData *data = job; +  on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");  } -static void on_job_output(RStream *rstream, Job *job, RBuffer *buf, bool eof, -    ufunc_T *callback, const char *type) +static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, +    bool eof, ufunc_T *callback, const char *type)  {    if (eof) {      return;    } -  TerminalJobData *data = job_data(job);    RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      // The order here matters, the terminal must receive the data first because      // push_job_event will modify the read buffer(convert NULs into NLs) @@ -20381,17 +20372,16 @@ static void on_job_output(RStream *rstream, Job *job, RBuffer *buf, bool eof,      }      if (callback) { -      push_job_event(job, callback, type, ptr, len, 0); +      push_job_event(data, callback, type, ptr, len, 0);      }      rbuffer_consumed(buf, len);    }  } -static void on_job_exit(Job *job, int status, void *d) +static void on_process_exit(Process *proc, int status, void *d)  {    TerminalJobData *data = d; -    if (data->term && !data->exited) {      data->exited = true;      terminal_close(data->term, @@ -20402,19 +20392,20 @@ static void on_job_exit(Job *job, int status, void *d)      *data->status_ptr = status;    } -  push_job_event(job, data->on_exit, "exit", NULL, 0, status); +  push_job_event(data, data->on_exit, "exit", NULL, 0, status);  } -static void term_write(char *buf, size_t size, void *data) +static void term_write(char *buf, size_t size, void *d)  { -  Job *job = ((TerminalJobData *)data)->job; +  TerminalJobData *data = d;    WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); -  job_write(job, wbuf); +  wstream_write(&data->in, wbuf);  } -static void term_resize(uint16_t width, uint16_t height, void *data) +static void term_resize(uint16_t width, uint16_t height, void *d)  { -  job_resize(((TerminalJobData *)data)->job, width, height); +  TerminalJobData *data = d; +  pty_process_resize(&data->proc.pty, width, height);  }  static void term_close(void *d) @@ -20422,7 +20413,7 @@ static void term_close(void *d)    TerminalJobData *data = d;    if (!data->exited) {      data->exited = true; -    job_stop(data->job); +    process_stop((Process *)&data->proc);    }    terminal_destroy(data->term);    term_job_data_decref(d); @@ -20449,7 +20440,7 @@ static void on_job_event(Event event)    if (argc > 0) {      argv[0].v_type = VAR_NUMBER;      argv[0].v_lock = 0; -    argv[0].vval.v_number = ev->job_id; +    argv[0].vval.v_number = ev->data->id;    }    if (argc > 1) { @@ -20480,6 +20471,7 @@ static void on_job_event(Event event)  end:    if (!ev->received) {      // exit event, safe to free job data now +    pmap_del(uint64_t)(jobs, ev->data->id);      term_job_data_decref(ev->data);    }    xfree(ev); | 
