diff options
Diffstat (limited to 'src')
65 files changed, 2135 insertions, 2372 deletions
| diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index ad5bac1898..4448859b64 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -30,15 +30,17 @@ file(MAKE_DIRECTORY ${GENERATED_DIR}/api)  file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private)  file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc)  file(MAKE_DIRECTORY ${GENERATED_DIR}/tui) +file(MAKE_DIRECTORY ${GENERATED_DIR}/event)  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR})  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os)  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api)  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private)  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc)  file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/tui) +file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/event)  file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c -  tui/*.c) +  tui/*.c event/*.c)  file(GLOB_RECURSE NEOVIM_HEADERS *.h)  file(GLOB UNIT_TEST_FIXTURES ${PROJECT_SOURCE_DIR}/test/unit/fixtures/*.c) diff --git a/src/nvim/edit.c b/src/nvim/edit.c index dd82b06158..390d62210b 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -59,7 +59,7 @@  #include "nvim/terminal.h"  #include "nvim/undo.h"  #include "nvim/window.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/os/input.h"  #include "nvim/os/time.h" @@ -601,15 +601,15 @@ edit (       * Get a character for Insert mode.  Ignore K_IGNORE.       */      lastc = c;                          /* remember previous char for CTRL-D */ -    event_enable_deferred(); +    loop_enable_deferred_events(&loop);      do {        c = safe_vgetc();      } while (c == K_IGNORE); -    event_disable_deferred(); +    loop_disable_deferred_events(&loop);      if (c == K_EVENT) {        c = lastc; -      event_process(); +      loop_process_event(&loop);        continue;      } diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 7da5cfb731..b43ab238cd 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -55,6 +55,7 @@  #include "nvim/misc1.h"  #include "nvim/misc2.h"  #include "nvim/keymap.h" +#include "nvim/map.h"  #include "nvim/file_search.h"  #include "nvim/garray.h"  #include "nvim/move.h" @@ -82,17 +83,18 @@  #include "nvim/version.h"  #include "nvim/window.h"  #include "nvim/os/os.h" -#include "nvim/os/job.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #include "nvim/os/time.h"  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/server.h"  #include "nvim/api/private/helpers.h"  #include "nvim/api/vim.h"  #include "nvim/os/dl.h" -#include "nvim/os/event.h"  #include "nvim/os/input.h" +#include "nvim/event/loop.h"  #define DICT_MAXNEST 100        /* maximum nesting of lists and dicts */ @@ -443,14 +445,19 @@ static dictitem_T vimvars_var;                  /* variable used for v: */  #define vimvarht  vimvardict.dv_hashtab  typedef struct { -  Job *job; +  union { +    UvProcess uv; +    PtyProcess pty; +  } proc; +  Stream in, out, err;    Terminal *term; +  bool stopped;    bool exited; -  bool stdin_closed;    int refcount;    ufunc_T *on_stdout, *on_stderr, *on_exit;    dict_T *self;    int *status_ptr; +  uint64_t id;  } TerminalJobData; @@ -463,7 +470,6 @@ typedef struct {                                     valid character */  // Memory pool for reusing JobEvent structures  typedef struct { -  int job_id;    TerminalJobData *data;    ufunc_T *callback;    const char *type; @@ -471,12 +477,15 @@ typedef struct {    int status;  } JobEvent;  static int disable_job_defer = 0; +static uint64_t current_job_id = 1; +static PMap(uint64_t) *jobs = NULL;   /*   * Initialize the global and v: variables.   */  void eval_init(void)  { +  jobs = pmap_new(uint64_t)();    int i;    struct vimvar   *p; @@ -10693,29 +10702,27 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); - -  if (!is_user_job(job)) { -    // Invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } +  Process *proc = (Process *)&data->proc; +    if (argvars[1].v_type == VAR_STRING) {      char *stream = (char *)argvars[1].vval.v_string;      if (!strcmp(stream, "stdin")) { -      job_close_in(job); -      ((TerminalJobData *)job_data(job))->stdin_closed = true; +      process_close_in(proc);      } else if (!strcmp(stream, "stdout")) { -      job_close_out(job); +      process_close_out(proc);      } else if (!strcmp(stream, "stderr")) { -      job_close_err(job); +      process_close_err(proc);      } else {        EMSG2(_("Invalid job stream \"%s\""), stream);      }    } else { -    ((TerminalJobData *)job_data(job))->stdin_closed = true; -    job_close_streams(job); +    process_close_streams(proc);    }  } @@ -10736,15 +10743,13 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); - -  if (!is_user_job(job)) { -    // Invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } -  if (((TerminalJobData *)job_data(job))->stdin_closed) { +  if (((Process *)&data->proc)->in->closed) {      EMSG(_("Can't send data to the job: stdin is closed"));      return;    } @@ -10758,7 +10763,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)    }    WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); -  rettv->vval.v_number = job_write(job, buf); +  rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf);  }  // "jobresize(job, width, height)" function @@ -10778,19 +10783,20 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); -  if (!is_user_job(job)) { -    // Probably an invalid job id +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data) {      EMSG(_(e_invjob));      return;    } -  if (!job_resize(job, argvars[1].vval.v_number, argvars[2].vval.v_number)) { +  if (data->proc.uv.process.type != kProcessTypePty) {      EMSG(_(e_jobnotpty));      return;    } +  pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, +      argvars[2].vval.v_number);    rettv->vval.v_number = 1;  } @@ -10879,37 +10885,33 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)      }    } -  JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, -      job_opts); +  bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0; +  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, +      job_opts, pty); +  Process *proc = (Process *)&data->proc; -  if (!job_opts) { -    goto start; -  } - -  opts.pty = get_dict_number(job_opts, (uint8_t *)"pty"); -  if (opts.pty) { +  if (pty) {      uint16_t width = get_dict_number(job_opts, (uint8_t *)"width");      if (width > 0) { -      opts.width = width; +      data->proc.pty.width = width;      }      uint16_t height = get_dict_number(job_opts, (uint8_t *)"height");      if (height > 0) { -      opts.height = height; +      data->proc.pty.height = height;      }      char *term = (char *)get_dict_string(job_opts, (uint8_t *)"TERM", true);      if (term) { -      opts.term_name = term; +      data->proc.pty.term_name = term;      }    } -start:    if (!on_stdout) { -    opts.stdout_cb = NULL; +    proc->out = NULL;    }    if (!on_stderr) { -    opts.stderr_cb = NULL; +    proc->err = NULL;    } -  common_job_start(opts, rettv); +  common_job_start(data, rettv);  }  // "jobstop()" function @@ -10928,14 +10930,15 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)      return;    } -  Job *job = job_find(argvars[0].vval.v_number); -  if (!is_user_job(job)) { +  TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); +  if (!data || data->stopped) {      EMSG(_(e_invjob));      return;    } -  job_stop(job); +  process_stop((Process *)&data->proc); +  data->stopped = true;    rettv->vval.v_number = 1;  } @@ -10966,19 +10969,17 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    if (!disable_job_defer++) {      // process any pending job events in the deferred queue, but only do this if      // deferred is not disabled(at the top-level `jobwait()` call) -    event_process(); +    loop_process_event(&loop);    }    // For each item in the input list append an integer to the output list. -3    // is used to represent an invalid job id, -2 is for a interrupted job and    // -1 for jobs that were skipped or timed out.    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        list_append_number(rv, -3);      } else { -      TerminalJobData *data = job_data(job);        // append the list item and set the status pointer so we'll collect the        // status code when the job exits        list_append_number(rv, -1); @@ -10994,18 +10995,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    }    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (remaining == 0) {        // timed out        break;      }      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        continue;      } -    TerminalJobData *data = job_data(job); -    int status = job_wait(job, remaining); +    int status = process_wait((Process *)&data->proc, remaining);      if (status < 0) {        // interrupted or timed out, skip remaining jobs.        if (status == -2) { @@ -11026,16 +11025,14 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)    }    // poll to ensure any pending callbacks from the last job are invoked -  event_poll(0); +  loop_poll_events(&loop, 0);    for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { -    Job *job = NULL; +    TerminalJobData *data = NULL;      if (arg->li_tv.v_type != VAR_NUMBER -        || !(job = job_find(arg->li_tv.vval.v_number)) -        || !is_user_job(job)) { +        || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {        continue;      } -    TerminalJobData *data = job_data(job);      // remove the status pointer because the list may be freed before the      // job exits      data->status_ptr = NULL; @@ -12952,7 +12949,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)    // The last item of argv must be NULL    argv[i] = NULL; -  uint64_t channel_id = channel_from_job(argv); +  uint64_t channel_id = channel_from_process(argv);    if (!channel_id) {      EMSG(_(e_api_spawn_failed)); @@ -15226,19 +15223,15 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)      }    } -  JobOptions opts = common_job_options(argv, on_stdout, on_stderr, on_exit, -      job_opts); -  opts.pty = true; -  opts.width = curwin->w_width; -  opts.height = curwin->w_height; -  opts.term_name = xstrdup("xterm-256color"); -  Job *job = common_job_start(opts, rettv); -  if (!job) { -    shell_free_argv(argv); +  TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, +      job_opts, true); +  data->proc.pty.width = curwin->w_width; +  data->proc.pty.height = curwin->w_height; +  data->proc.pty.term_name = xstrdup("xterm-256color"); +  if (!common_job_start(data, rettv)) {      return;    } -  TerminalJobData *data = opts.data; -  TerminalOptions topts = TERMINAL_OPTIONS_INIT; +  TerminalOptions topts;    topts.data = data;    topts.width = curwin->w_width;    topts.height = curwin->w_height; @@ -15251,7 +15244,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)        && os_isdir(argvars[1].vval.v_string)) {      cwd = (char *)argvars[1].vval.v_string;    } -  int pid = job_pid(job); +  int pid = data->proc.pty.process.pid;    // Get the desired name of the buffer.    char *name = job_opts ? @@ -20223,21 +20216,27 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)    return ret;  } -static inline JobOptions common_job_options(char **argv, ufunc_T *on_stdout, -    ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self) +static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, +    ufunc_T *on_stderr, ufunc_T *on_exit, dict_T *self, bool pty)  {    TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); +  data->stopped = false;    data->on_stdout = on_stdout;    data->on_stderr = on_stderr;    data->on_exit = on_exit;    data->self = self; -  JobOptions opts = JOB_OPTIONS_INIT; -  opts.argv = argv; -  opts.data = data; -  opts.stdout_cb = on_job_stdout; -  opts.stderr_cb = on_job_stderr; -  opts.exit_cb = on_job_exit; -  return opts; +  if (pty) { +    data->proc.pty = pty_process_init(data); +  } else { +    data->proc.uv = uv_process_init(data); +  } +  Process *proc = (Process *)&data->proc; +  proc->argv = argv; +  proc->in = &data->in; +  proc->out = &data->out; +  proc->err = &data->err; +  proc->cb = on_process_exit; +  return data;  }  /// Return true/false on success/failure. @@ -20263,24 +20262,28 @@ static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,    return false;  } -static inline Job *common_job_start(JobOptions opts, typval_T *rettv) +static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)  { -  TerminalJobData *data = opts.data;    data->refcount++; -  Job *job = job_start(opts, &rettv->vval.v_number); +  Process *proc = (Process *)&data->proc; +  if (!process_spawn(&loop, proc)) { +    EMSG(_(e_jobexe)); +    return false; +  } -  if (rettv->vval.v_number <= 0) { -    if (rettv->vval.v_number == 0) { -      EMSG(_(e_jobtblfull)); -      xfree(opts.term_name); -      free_term_job_data(data); -    } else { -      EMSG(_(e_jobexe)); -    } -    return NULL; +  data->id = current_job_id++; +  wstream_init(proc->in, 0); +  if (proc->out) { +    rstream_init(proc->out, 0); +    rstream_start(proc->out, on_job_stdout); +  } +  if (proc->err) { +    rstream_init(proc->err, 0); +    rstream_start(proc->err, on_job_stderr);    } -  data->job = job; -  return job; +  pmap_put(uint64_t)(jobs, data->id, data); +  rettv->vval.v_number = data->id; +  return true;  }  static inline void free_term_job_data(TerminalJobData *data) { @@ -20301,25 +20304,15 @@ static inline void free_term_job_data(TerminalJobData *data) {    xfree(data);  } -static inline bool is_user_job(Job *job) -{ -  if (!job) { -    return false; -  } - -  JobOptions *opts = job_opts(job); -  return opts->exit_cb == on_job_exit; -} -  // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(Job *job, ufunc_T *callback, -    const char *type, char *data, size_t count, int status) +static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +    const char *type, char *buf, size_t count, int status)  {    JobEvent *event_data = xmalloc(sizeof(JobEvent));    event_data->received = NULL; -  if (data) { +  if (buf) {      event_data->received = list_alloc(); -    char *ptr = data; +    char *ptr = buf;      size_t remaining = count;      size_t off = 0; @@ -20343,36 +20336,34 @@ static inline void push_job_event(Job *job, ufunc_T *callback,    } else {      event_data->status = status;    } -  event_data->job_id = job_id(job); -  event_data->data = job_data(job); +  event_data->data = data;    event_data->callback = callback;    event_data->type = type; -  event_push((Event) { +  loop_push_event(&loop, (Event) {      .handler = on_job_event,      .data = event_data    }, !disable_job_defer);  } -static void on_job_stdout(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)  { -  TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stdout, "stdout"); +  TerminalJobData *data = job; +  on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");  } -static void on_job_stderr(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)  { -  TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stderr, "stderr"); +  TerminalJobData *data = job; +  on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");  } -static void on_job_output(RStream *rstream, Job *job, RBuffer *buf, bool eof, -    ufunc_T *callback, const char *type) +static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, +    bool eof, ufunc_T *callback, const char *type)  {    if (eof) {      return;    } -  TerminalJobData *data = job_data(job);    RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      // The order here matters, the terminal must receive the data first because      // push_job_event will modify the read buffer(convert NULs into NLs) @@ -20381,17 +20372,16 @@ static void on_job_output(RStream *rstream, Job *job, RBuffer *buf, bool eof,      }      if (callback) { -      push_job_event(job, callback, type, ptr, len, 0); +      push_job_event(data, callback, type, ptr, len, 0);      }      rbuffer_consumed(buf, len);    }  } -static void on_job_exit(Job *job, int status, void *d) +static void on_process_exit(Process *proc, int status, void *d)  {    TerminalJobData *data = d; -    if (data->term && !data->exited) {      data->exited = true;      terminal_close(data->term, @@ -20402,19 +20392,20 @@ static void on_job_exit(Job *job, int status, void *d)      *data->status_ptr = status;    } -  push_job_event(job, data->on_exit, "exit", NULL, 0, status); +  push_job_event(data, data->on_exit, "exit", NULL, 0, status);  } -static void term_write(char *buf, size_t size, void *data) +static void term_write(char *buf, size_t size, void *d)  { -  Job *job = ((TerminalJobData *)data)->job; +  TerminalJobData *data = d;    WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); -  job_write(job, wbuf); +  wstream_write(&data->in, wbuf);  } -static void term_resize(uint16_t width, uint16_t height, void *data) +static void term_resize(uint16_t width, uint16_t height, void *d)  { -  job_resize(((TerminalJobData *)data)->job, width, height); +  TerminalJobData *data = d; +  pty_process_resize(&data->proc.pty, width, height);  }  static void term_close(void *d) @@ -20422,7 +20413,7 @@ static void term_close(void *d)    TerminalJobData *data = d;    if (!data->exited) {      data->exited = true; -    job_stop(data->job); +    process_stop((Process *)&data->proc);    }    terminal_destroy(data->term);    term_job_data_decref(d); @@ -20449,7 +20440,7 @@ static void on_job_event(Event event)    if (argc > 0) {      argv[0].v_type = VAR_NUMBER;      argv[0].v_lock = 0; -    argv[0].vval.v_number = ev->job_id; +    argv[0].vval.v_number = ev->data->id;    }    if (argc > 1) { @@ -20480,6 +20471,7 @@ static void on_job_event(Event event)  end:    if (!ev->received) {      // exit event, safe to free job data now +    pmap_del(uint64_t)(jobs, ev->data->id);      term_job_data_decref(ev->data);    }    xfree(ev); diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c new file mode 100644 index 0000000000..d90565002e --- /dev/null +++ b/src/nvim/event/loop.c @@ -0,0 +1,144 @@ +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/process.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/loop.c.generated.h" +#endif + + +void loop_init(Loop *loop, void *data) +{ +  uv_loop_init(&loop->uv); +  loop->uv.data = loop; +  loop->deferred_events = kl_init(Event); +  loop->immediate_events = kl_init(Event); +  loop->children = kl_init(WatcherPtr); +  loop->children_stop_requests = 0; +  uv_signal_init(&loop->uv, &loop->children_watcher); +  uv_timer_init(&loop->uv, &loop->children_kill_timer); +} + +void loop_poll_events(Loop *loop, int ms) +{ +  static int recursive = 0; + +  if (recursive++) { +    abort();  // Should not re-enter uv_run +  } + +  bool wait = true; +  uv_timer_t timer; + +  if (ms > 0) { +    uv_timer_init(&loop->uv, &timer); +    // Use a repeating timeout of ms milliseconds to make sure +    // we do not block indefinitely for I/O. +    uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); +  } else if (ms == 0) { +    // For ms == 0, we need to do a non-blocking event poll by +    // setting the run mode to UV_RUN_NOWAIT. +    wait = false; +  } + +  if (wait) { +    loop_run_once(loop); +  } else { +    loop_run_nowait(loop); +  } + +  if (ms > 0) { +    // Ensure the timer handle is closed and run the event loop +    // once more to let libuv perform it's cleanup +    uv_timer_stop(&timer); +    uv_close((uv_handle_t *)&timer, NULL); +    loop_run_nowait(loop); +  } + +  recursive--;  // Can re-enter uv_run now +  process_events_from(loop->immediate_events); +} + +bool loop_has_deferred_events(Loop *loop) +{ +  return loop->deferred_events_allowed && !kl_empty(loop->deferred_events); +} + +void loop_enable_deferred_events(Loop *loop) +{ +  ++loop->deferred_events_allowed; +} + +void loop_disable_deferred_events(Loop *loop) +{ +  --loop->deferred_events_allowed; +} + +// Queue an event +void loop_push_event(Loop *loop, Event event, bool deferred) +{ +  // Sometimes libuv will run pending callbacks(timer for example) before +  // blocking for a poll. If this happens and the callback pushes a event to one +  // of the queues, the event would only be processed after the poll +  // returns(user hits a key for example). To avoid this scenario, we call +  // uv_stop when a event is enqueued. +  loop_stop(loop); +  kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, +      event); +} + +void loop_process_event(Loop *loop) +{ +  process_events_from(loop->deferred_events); +} + + +void loop_run(Loop *loop) +{ +  uv_run(&loop->uv, UV_RUN_DEFAULT); +} + +void loop_run_once(Loop *loop) +{ +  uv_run(&loop->uv, UV_RUN_ONCE); +} + +void loop_run_nowait(Loop *loop) +{ +  uv_run(&loop->uv, UV_RUN_NOWAIT); +} + +void loop_stop(Loop *loop) +{ +  uv_stop(&loop->uv); +} + +void loop_close(Loop *loop) +{ +  uv_close((uv_handle_t *)&loop->children_watcher, NULL); +  uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); +  do { +    uv_run(&loop->uv, UV_RUN_DEFAULT); +  } while (uv_loop_close(&loop->uv)); +} + +void loop_process_all_events(Loop *loop) +{ +  process_events_from(loop->immediate_events); +  process_events_from(loop->deferred_events); +} + +static void process_events_from(klist_t(Event) *queue) +{ +  while (!kl_empty(queue)) { +    Event event = kl_shift(Event, queue); +    event.handler(event); +  } +} + +static void timer_cb(uv_timer_t *handle) +{ +} diff --git a/src/nvim/os/event.h b/src/nvim/event/loop.h index db02b38c7f..5eb4d32ca8 100644 --- a/src/nvim/os/event.h +++ b/src/nvim/event/loop.h @@ -1,20 +1,44 @@ -#ifndef NVIM_OS_EVENT_H -#define NVIM_OS_EVENT_H +#ifndef NVIM_EVENT_LOOP_H +#define NVIM_EVENT_LOOP_H  #include <stdint.h> -#include <stdbool.h> -#include "nvim/os/event_defs.h" -#include "nvim/os/job_defs.h" +#include <uv.h> + +#include "nvim/lib/klist.h"  #include "nvim/os/time.h" +typedef struct event Event; +typedef void (*event_handler)(Event event); + +struct event { +  void *data; +  event_handler handler; +}; + +typedef void * WatcherPtr; + +#define _noop(x) +KLIST_INIT(WatcherPtr, WatcherPtr, _noop) +KLIST_INIT(Event, Event, _noop) + +typedef struct loop { +  uv_loop_t uv; +  klist_t(Event) *deferred_events, *immediate_events; +  int deferred_events_allowed; +  klist_t(WatcherPtr) *children; +  uv_signal_t children_watcher; +  uv_timer_t children_kill_timer; +  size_t children_stop_requests; +} Loop; +  // Poll for events until a condition or timeout -#define event_poll_until(timeout, condition)                                 \ +#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition)                     \    do {                                                                       \      int remaining = timeout;                                                 \      uint64_t before = (remaining > 0) ? os_hrtime() : 0;                     \      while (!(condition)) {                                                   \ -      event_poll(remaining);                                                 \ +      loop_poll_events(loop, remaining);                                     \        if (remaining == 0) {                                                  \          break;                                                               \        } else if (remaining > 0) {                                            \ @@ -29,7 +53,7 @@    } while (0)  #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/event.h.generated.h" +# include "event/loop.h.generated.h"  #endif -#endif  // NVIM_OS_EVENT_H +#endif  // NVIM_EVENT_LOOP_H diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c new file mode 100644 index 0000000000..2b1f1ae096 --- /dev/null +++ b/src/nvim/event/process.c @@ -0,0 +1,325 @@ +#include <assert.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/os/shell.h" +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/pty_process.h" +#include "nvim/globals.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.c.generated.h" +#endif + +// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a process has to cleanly +// exit before we send SIGNAL to it +#define TERM_TIMEOUT 1000000000 +#define KILL_TIMEOUT (TERM_TIMEOUT * 2) + +#define CLOSE_PROC_STREAM(proc, stream)                          \ +  do {                                                              \ +    if (proc->stream && !proc->stream->closed) {                    \ +      stream_close(proc->stream, NULL);                             \ +    }                                                               \ +  } while (0) + + +bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  proc->loop = loop; +  if (proc->in) { +    uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); +  } + +  if (proc->out) { +    uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); +  } + +  if (proc->err) { +    uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); +  } + +  bool success; +  switch (proc->type) { +    case kProcessTypeUv: +      success = uv_process_spawn((UvProcess *)proc); +      break; +    case kProcessTypePty: +      success = pty_process_spawn((PtyProcess *)proc); +      break; +    default: +      abort(); +  } + +  if (!success) { +    if (proc->in) { +      uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); +    } +    if (proc->out) { +      uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); +    } +    if (proc->err) { +      uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); +    } +    process_close(proc); +    shell_free_argv(proc->argv); +    proc->status = -1; +    return false; +  } + +  void *data = proc->data; + +  if (proc->in) { +    stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); +    proc->in->internal_data = proc; +    proc->in->internal_close_cb = on_process_stream_close; +    proc->refcount++; +  } + +  if (proc->out) { +    stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); +    proc->out->internal_data = proc; +    proc->out->internal_close_cb = on_process_stream_close; +    proc->refcount++; +  } + +  if (proc->err) { +    stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); +    proc->err->internal_data = proc; +    proc->err->internal_close_cb = on_process_stream_close; +    proc->refcount++; +  } + +  proc->internal_exit_cb = on_process_exit; +  proc->internal_close_cb = decref; +  proc->refcount++; +  kl_push(WatcherPtr, loop->children, proc); +  return true; +} + +void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL +{ +  kl_iter(WatcherPtr, loop->children, current) { +    Process *proc = (*current)->data; +    uv_kill(proc->pid, SIGTERM); +    proc->term_sent = true; +    process_stop(proc); +  } + +  // Wait until all children exit +  LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); +  pty_process_teardown(loop); +} + +// Wrappers around `stream_close` that protect against double-closing. +void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  process_close_in(proc); +  process_close_out(proc); +  process_close_err(proc); +} + +void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  CLOSE_PROC_STREAM(proc, in); +} + +void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  CLOSE_PROC_STREAM(proc, out); +} + +void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  CLOSE_PROC_STREAM(proc, err); +} + +/// Synchronously wait for a process to finish +/// +/// @param process The Process instance +/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for +///        waiting until the process quits. +/// @return returns the status code of the exited process. -1 if the process is +///         still running and the `timeout` has expired. Note that this is +///         indistinguishable from the process returning -1 by itself. Which +///         is possible on some OS. Returns -2 if an user has interruped the +///         wait. +int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +{ +  // The default status is -1, which represents a timeout +  int status = -1; +  bool interrupted = false; + +  // Increase refcount to stop the exit callback from being called(and possibly +  // being freed) before we have a chance to get the status. +  proc->refcount++; +  LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, +      // Until... +      got_int ||             // interrupted by the user +      proc->refcount == 1);  // job exited + +  // we'll assume that a user frantically hitting interrupt doesn't like +  // the current job. Signal that it has to be killed. +  if (got_int) { +    interrupted = true; +    got_int = false; +    process_stop(proc); +    if (ms == -1) { +      // We can only return, if all streams/handles are closed and the job + +      // exited. +      LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); +    } else { +      loop_poll_events(proc->loop, 0); +    } +  } + +  if (proc->refcount == 1) { +    // Job exited, collect status and manually invoke close_cb to free the job +    // resources +    status = interrupted ? -2 : proc->status; +    decref(proc); +  } else { +    proc->refcount--; +  } + +  return status; +} + +/// Ask a process to terminate and eventually kill if it doesn't respond +void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL +{ +  if (proc->stopped_time) { +    return; +  } + +  proc->stopped_time = os_hrtime(); +  switch (proc->type) { +    case kProcessTypeUv: +      // Close the process's stdin. If the process doesn't close its own +      // stdout/stderr, they will be closed when it exits(possibly due to being +      // terminated after a timeout) +      process_close_in(proc); +      break; +    case kProcessTypePty: +      // close all streams for pty processes to send SIGHUP to the process +      process_close_streams(proc); +      pty_process_close_master((PtyProcess *)proc); +      break; +    default: +      abort(); +  } + +  Loop *loop = proc->loop; +  if (!loop->children_stop_requests++) { +    // When there's at least one stop request pending, start a timer that +    // will periodically check if a signal should be send to a to the job +    DLOG("Starting job kill timer"); +    uv_timer_start(&loop->children_kill_timer, children_kill_cb, 100, 100); +  } +} + +/// Iterates the process list sending SIGTERM to stopped processes and SIGKILL +/// to those that didn't die from SIGTERM after a while(exit_timeout is 0). +static void children_kill_cb(uv_timer_t *handle) +{ +  Loop *loop = handle->loop->data; +  uint64_t now = os_hrtime(); + +  kl_iter(WatcherPtr, loop->children, current) { +    Process *proc = (*current)->data; +    if (!proc->stopped_time) { +      continue; +    } +    uint64_t elapsed = now - proc->stopped_time; + +    if (!proc->term_sent && elapsed >= TERM_TIMEOUT) { +      ILOG("Sending SIGTERM to pid %d", proc->pid); +      uv_kill(proc->pid, SIGTERM); +      proc->term_sent = true; +    } else if (elapsed >= KILL_TIMEOUT) { +      ILOG("Sending SIGKILL to pid %d", proc->pid); +      uv_kill(proc->pid, SIGKILL); +    } +  } +} + +static void decref(Process *proc) +{ +  if (--proc->refcount != 0) { +    return; +  } + +  Loop *loop = proc->loop; +  kliter_t(WatcherPtr) **node = NULL; +  kl_iter(WatcherPtr, loop->children, current) { +    if ((*current)->data == proc) { +      node = current; +      break; +    } +  } + +  assert(node); +  kl_shift_at(WatcherPtr, loop->children, node); +  shell_free_argv(proc->argv); +  if (proc->type == kProcessTypePty) { +    xfree(((PtyProcess *)proc)->term_name); +  } +  if (proc->cb) { +    proc->cb(proc, proc->status, proc->data); +  } +} + +static void process_close(Process *proc) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  assert(!proc->closed); +  proc->closed = true; +  switch (proc->type) { +    case kProcessTypeUv: +      uv_process_close((UvProcess *)proc); +      break; +    case kProcessTypePty: +      pty_process_close((PtyProcess *)proc); +      break; +    default: +      abort(); +  } +} + +static void on_process_exit(Process *proc) +{ +  if (exiting) { +    on_process_exit_event((Event) {.data = proc}); +  } else { +    loop_push_event(proc->loop, +        (Event) {.handler = on_process_exit_event, .data = proc}, false); +  } + +  Loop *loop = proc->loop; +  if (loop->children_stop_requests && !--loop->children_stop_requests) { +    // Stop the timer if no more stop requests are pending +    DLOG("Stopping process kill timer"); +    uv_timer_stop(&loop->children_kill_timer); +  } +} + +static void on_process_exit_event(Event event) +{ +  Process *proc = event.data; +  process_close_streams(proc); +  process_close(proc); +} + +static void on_process_stream_close(Stream *stream, void *data) +{ +  Process *proc = data; +  decref(proc); +} + diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h new file mode 100644 index 0000000000..5c84a7d1d0 --- /dev/null +++ b/src/nvim/event/process.h @@ -0,0 +1,56 @@ +#ifndef NVIM_EVENT_PROCESS_H +#define NVIM_EVENT_PROCESS_H + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +typedef enum { +  kProcessTypeUv, +  kProcessTypePty +} ProcessType; + +typedef struct process Process; +typedef void (*process_exit_cb)(Process *proc, int status, void *data); +typedef void (*internal_process_cb)(Process *proc); + +struct process { +  ProcessType type; +  Loop *loop; +  void *data; +  int pid, status, refcount; +  // set to the hrtime of when process_stop was called for the process. +  uint64_t stopped_time; +  char **argv; +  Stream *in, *out, *err; +  process_exit_cb cb; +  internal_process_cb internal_exit_cb, internal_close_cb; +  bool closed, term_sent; +}; + +static inline Process process_init(ProcessType type, void *data) +{ +  return (Process) { +    .type = type, +    .data = data, +    .loop = NULL, +    .pid = 0, +    .status = 0, +    .refcount = 0, +    .stopped_time = 0, +    .argv = NULL, +    .in = NULL, +    .out = NULL, +    .err = NULL, +    .cb = NULL, +    .closed = false, +    .term_sent = false, +    .internal_close_cb = NULL, +    .internal_exit_cb = NULL +  }; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/process.h.generated.h" +#endif +#endif  // NVIM_EVENT_PROCESS_H diff --git a/src/nvim/os/pty_process.c b/src/nvim/event/pty_process.c index ff0bcfb6de..1e24d7c919 100644 --- a/src/nvim/os/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -20,92 +20,39 @@  #include <uv.h> -#include "nvim/func_attr.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pty_process.h" -#include "nvim/memory.h" +#include "nvim/lib/klist.h" + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/pty_process.h" +#include "nvim/log.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pty_process.c.generated.h" +# include "event/pty_process.c.generated.h"  #endif -typedef struct { -  struct winsize winsize; -  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -  int tty_fd; -} PtyProcess; - -void pty_process_init(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  PtyProcess *ptyproc = xmalloc(sizeof(PtyProcess)); -  ptyproc->tty_fd = -1; - -  if (job->opts.writable) { -    uv_pipe_init(uv_default_loop(), &ptyproc->proc_stdin, 0); -    ptyproc->proc_stdin.data = NULL; -  } - -  if (job->opts.stdout_cb) { -    uv_pipe_init(uv_default_loop(), &ptyproc->proc_stdout, 0); -    ptyproc->proc_stdout.data = NULL; -  } - -  if (job->opts.stderr_cb) { -    uv_pipe_init(uv_default_loop(), &ptyproc->proc_stderr, 0); -    ptyproc->proc_stderr.data = NULL; -  } - -  job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin; -  job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout; -  job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr; -  job->process = ptyproc; -} - -void pty_process_destroy(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  xfree(job->opts.term_name); -  xfree(job->process); -  job->process = NULL; -} - -static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe) -  FUNC_ATTR_NONNULL_ALL -{ -  int fd_dup = dup(fd); -  if (fd_dup < 0) { -    ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); -    return false; -  } -  int uv_result = uv_pipe_open(pipe, fd_dup); -  if (uv_result) { -    ELOG("Failed to set pipe to descriptor %d: %s", -         fd_dup, uv_strerror(uv_result)); -    close(fd_dup); -    return false; -  } -  return true; -} -  static const unsigned int KILL_RETRIES = 5;  static const unsigned int KILL_TIMEOUT = 2;  // seconds -bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL +bool pty_process_spawn(PtyProcess *ptyproc) +  FUNC_ATTR_NONNULL_ALL  { -  int master; -  PtyProcess *ptyproc = job->process; -  ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0}; +  Process *proc = (Process *)ptyproc; +  uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); +  ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};    struct termios termios;    init_termios(&termios);    uv_disable_stdio_inheritance(); - +  int master;    int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);    if (pid < 0) { +    ELOG("forkpty failed: %s", strerror(errno));      return false;    } else if (pid == 0) { -    init_child(job); +    init_child(ptyproc);      abort();    } @@ -120,23 +67,18 @@ bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL      goto error;    } -  if (job->opts.writable -      && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdin)) { +  if (proc->in && !set_duplicating_descriptor(master, &proc->in->uv.pipe)) {      goto error;    } - -  if (job->opts.stdout_cb -      && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdout)) { +  if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {      goto error;    } - -  if (job->opts.stderr_cb -      && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stderr)) { +  if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {      goto error;    }    ptyproc->tty_fd = master; -  job->pid = pid; +  proc->pid = pid;    return true;  error: @@ -151,36 +93,44 @@ error:    }    if (child != pid) {      kill(pid, SIGKILL); +    waitpid(pid, NULL, 0);    }    return false;  } -void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL +void pty_process_resize(PtyProcess *ptyproc, uint16_t width, +    uint16_t height) +  FUNC_ATTR_NONNULL_ALL +{ +  ptyproc->winsize = (struct winsize){height, width, 0, 0}; +  ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); +} + +void pty_process_close(PtyProcess *ptyproc) +  FUNC_ATTR_NONNULL_ALL  { -  pty_process_close_master(job); -  job_close_streams(job); -  job_decref(job); +  pty_process_close_master(ptyproc); +  Process *proc = (Process *)ptyproc; +  if (proc->internal_close_cb) { +    proc->internal_close_cb(proc); +  }  } -void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL +void pty_process_close_master(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL  { -  PtyProcess *ptyproc = job->process;    if (ptyproc->tty_fd >= 0) {      close(ptyproc->tty_fd);      ptyproc->tty_fd = -1;    }  } -void pty_process_resize(Job *job, uint16_t width, uint16_t height) -  FUNC_ATTR_NONNULL_ALL +void pty_process_teardown(Loop *loop)  { -  PtyProcess *ptyproc = job->process; -  ptyproc->winsize = (struct winsize){height, width, 0, 0}; -  ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize); +  uv_signal_stop(&loop->children_watcher);  } -static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL +static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL  {    unsetenv("COLUMNS");    unsetenv("LINES"); @@ -195,8 +145,8 @@ static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL    signal(SIGTERM, SIG_DFL);    signal(SIGALRM, SIG_DFL); -  setenv("TERM", job->opts.term_name ? job->opts.term_name : "ansi", 1); -  execvp(job->opts.argv[0], job->opts.argv); +  setenv("TERM", ptyproc->term_name ? ptyproc->term_name : "ansi", 1); +  execvp(ptyproc->process.argv[0], ptyproc->process.argv);    fprintf(stderr, "execvp failed: %s\n", strerror(errno));  } @@ -255,3 +205,50 @@ static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL    termios->c_cc[VMIN]     = 1;    termios->c_cc[VTIME]    = 0;  } + +static bool set_duplicating_descriptor(int fd, uv_pipe_t *pipe) +  FUNC_ATTR_NONNULL_ALL +{ +  int fd_dup = dup(fd); +  if (fd_dup < 0) { +    ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); +    return false; +  } +  int uv_result = uv_pipe_open(pipe, fd_dup); +  if (uv_result) { +    ELOG("Failed to set pipe to descriptor %d: %s", +         fd_dup, uv_strerror(uv_result)); +    close(fd_dup); +    return false; +  } +  return true; +} + +static void chld_handler(uv_signal_t *handle, int signum) +{ +  int stat = 0; +  int pid; + +  do { +    pid = waitpid(-1, &stat, WNOHANG); +  } while (pid < 0 && errno == EINTR); + +  if (pid <= 0) { +    return; +  } + +  Loop *loop = handle->loop->data; + +  kl_iter(WatcherPtr, loop->children, current) { +    Process *proc = (*current)->data; +    if (proc->pid == pid) { +      if (WIFEXITED(stat)) { +        proc->status = WEXITSTATUS(stat); +      } else if (WIFSIGNALED(stat)) { +        proc->status = WTERMSIG(stat); +      } +      proc->internal_exit_cb(proc); +      break; +    } +  } +} diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h new file mode 100644 index 0000000000..a12b5489c5 --- /dev/null +++ b/src/nvim/event/pty_process.h @@ -0,0 +1,30 @@ +#ifndef NVIM_EVENT_PTY_PROCESS_H +#define NVIM_EVENT_PTY_PROCESS_H + +#include <sys/ioctl.h> + +#include "nvim/event/process.h" + +typedef struct pty_process { +  Process process; +  char *term_name; +  uint16_t width, height; +  struct winsize winsize; +  int tty_fd; +} PtyProcess; + +static inline PtyProcess pty_process_init(void *data) +{ +  PtyProcess rv; +  rv.process = process_init(kProcessTypePty, data); +  rv.term_name = NULL; +  rv.width = 80; +  rv.height = 24; +  rv.tty_fd = -1; +  return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/pty_process.h.generated.h" +#endif +#endif  // NVIM_EVENT_PTY_PROCESS_H diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c new file mode 100644 index 0000000000..78c044347f --- /dev/null +++ b/src/nvim/event/rstream.c @@ -0,0 +1,167 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/rstream.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/event/loop.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.c.generated.h" +#endif + +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) +  FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(loop, stream, fd, NULL, data); +  rstream_init(stream, bufsize); +} + +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) +  FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(NULL, stream, -1, uvstream, data); +  rstream_init(stream, bufsize); +} + +void rstream_init(Stream *stream, size_t bufsize) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  stream->buffer = rbuffer_new(bufsize); +  stream->buffer->data = stream; +  stream->buffer->full_cb = on_rbuffer_full; +  stream->buffer->nonfull_cb = on_rbuffer_nonfull; +} + + +/// Starts watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_start(Stream *stream, stream_read_cb cb) +{ +  stream->read_cb = cb; +  if (stream->uvstream) { +    uv_read_start(stream->uvstream, alloc_cb, read_cb); +  } else { +    uv_idle_start(&stream->uv.idle, fread_idle_cb); +  } +} + +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(Stream *stream) +{ +  if (stream->uvstream) { +    uv_read_stop(stream->uvstream); +  } else { +    uv_idle_stop(&stream->uv.idle); +  } +} + +static void on_rbuffer_full(RBuffer *buf, void *data) +{ +  rstream_stop(data); +} + +static void on_rbuffer_nonfull(RBuffer *buf, void *data) +{ +  Stream *stream = data; +  assert(stream->read_cb); +  rstream_start(stream, stream->read_cb); +} + +// Callbacks used by libuv + +// Called by libuv to allocate memory for reading. +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ +  Stream *stream = handle->data; +  buf->base = rbuffer_write_ptr(stream->buffer, &buf->len); +} + +// Callback invoked by libuv after it copies the data into the buffer provided +// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a +// 0-length buffer. +static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) +{ +  Stream *stream = uvstream->data; + +  if (cnt <= 0) { +    if (cnt != UV_ENOBUFS +        // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: +        // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. +        // +        // We don't need to do anything with the RBuffer because the next call +        // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` +        // won't be called) +        && cnt != 0) { +      DLOG("Closing Stream(%p) because of %s(%zd)", stream, +           uv_strerror((int)cnt), cnt); +      // Read error or EOF, either way stop the stream and invoke the callback +      // with eof == true +      uv_read_stop(uvstream); +      stream->read_cb(stream, stream->buffer, stream->data, true); +    } +    return; +  } + +  // at this point we're sure that cnt is positive, no error occurred +  size_t nread = (size_t)cnt; +  // Data was already written, so all we need is to update 'wpos' to reflect +  // the space actually used in the buffer. +  rbuffer_produced(stream->buffer, nread); +  stream->read_cb(stream, stream->buffer, stream->data, false); +} + +// Called by the by the 'idle' handle to emulate a reading event +static void fread_idle_cb(uv_idle_t *handle) +{ +  uv_fs_t req; +  Stream *stream = handle->data; + +  stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len); + +  // the offset argument to uv_fs_read is int64_t, could someone really try +  // to read more than 9 quintillion (9e18) bytes? +  // upcast is meant to avoid tautological condition warning on 32 bits +  uintmax_t fpos_intmax = stream->fpos; +  if (fpos_intmax > INT64_MAX) { +    ELOG("stream offset overflow"); +    preserve_exit(); +  } + +  // Synchronous read +  uv_fs_read( +      handle->loop, +      &req, +      stream->fd, +      &stream->uvbuf, +      1, +      (int64_t) stream->fpos, +      NULL); + +  uv_fs_req_cleanup(&req); + +  if (req.result <= 0) { +    uv_idle_stop(&stream->uv.idle); +    stream->read_cb(stream, stream->buffer, stream->data, true); +    return; +  } + +  // no errors (req.result (ssize_t) is positive), it's safe to cast. +  size_t nread = (size_t) req.result; +  rbuffer_produced(stream->buffer, nread); +  stream->fpos += nread; +} diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h new file mode 100644 index 0000000000..f30ad79ee5 --- /dev/null +++ b/src/nvim/event/rstream.h @@ -0,0 +1,16 @@ +#ifndef NVIM_EVENT_RSTREAM_H +#define NVIM_EVENT_RSTREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.h.generated.h" +#endif +#endif  // NVIM_EVENT_RSTREAM_H diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c new file mode 100644 index 0000000000..63133b4f57 --- /dev/null +++ b/src/nvim/event/signal.c @@ -0,0 +1,52 @@ +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/signal.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/signal.c.generated.h" +#endif + + +void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  uv_signal_init(&loop->uv, &watcher->uv); +  watcher->uv.data = watcher; +  watcher->data = data; +  watcher->cb = NULL; +} + +void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) +  FUNC_ATTR_NONNULL_ALL +{ +  watcher->cb = cb; +  uv_signal_start(&watcher->uv, signal_watcher_cb, signum); +} + +void signal_watcher_stop(SignalWatcher *watcher) +  FUNC_ATTR_NONNULL_ALL +{ +  uv_signal_stop(&watcher->uv); +} + +void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  watcher->close_cb = cb; +  uv_close((uv_handle_t *)&watcher->uv, close_cb); +} + +static void signal_watcher_cb(uv_signal_t *handle, int signum) +{ +  SignalWatcher *watcher = handle->data; +  watcher->cb(watcher, signum, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ +  SignalWatcher *watcher = handle->data; +  if (watcher->close_cb) { +    watcher->close_cb(watcher, watcher->data); +  } +} diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h new file mode 100644 index 0000000000..c269fa9d95 --- /dev/null +++ b/src/nvim/event/signal.h @@ -0,0 +1,22 @@ +#ifndef NVIM_EVENT_SIGNAL_H +#define NVIM_EVENT_SIGNAL_H + +#include <uv.h> + +#include "nvim/event/loop.h" + +typedef struct signal_watcher SignalWatcher; +typedef void (*signal_cb)(SignalWatcher *watcher, int signum, void *data); +typedef void (*signal_close_cb)(SignalWatcher *watcher, void *data); + +struct signal_watcher { +  uv_signal_t uv; +  void *data; +  signal_cb cb; +  signal_close_cb close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/signal.h.generated.h" +#endif +#endif  // NVIM_EVENT_SIGNAL_H diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c new file mode 100644 index 0000000000..bdc632abf0 --- /dev/null +++ b/src/nvim/event/socket.c @@ -0,0 +1,157 @@ +#include <assert.h> +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/socket.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/os/os.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/strings.h" +#include "nvim/path.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.c.generated.h" +#endif + +#define NVIM_DEFAULT_TCP_PORT 7450 + +void socket_watcher_init(Loop *loop, SocketWatcher *watcher, +    const char *endpoint, void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) +{ +  // Trim to `ADDRESS_MAX_SIZE` +  if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr)) +      >= sizeof(watcher->addr)) { +    // TODO(aktau): since this is not what the user wanted, perhaps we +    // should return an error here +    WLOG("Address was too long, truncated to %s", watcher->addr); +  } + +  bool tcp = true; +  char ip[16], *ip_end = xstrchrnul(watcher->addr, ':'); + +  // (ip_end - addr) is always > 0, so convert to size_t +  size_t addr_len = (size_t)(ip_end - watcher->addr); + +  if (addr_len > sizeof(ip) - 1) { +    // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) +    addr_len = sizeof(ip) - 1; +  } + +  // Extract the address part +  xstrlcpy(ip, watcher->addr, addr_len + 1); +  int port = NVIM_DEFAULT_TCP_PORT; + +  if (*ip_end == ':') { +    // Extract the port +    long lport = strtol(ip_end + 1, NULL, 10); // NOLINT +    if (lport <= 0 || lport > 0xffff) { +      // Invalid port, treat as named pipe or unix socket +      tcp = false; +    } else { +      port = (int) lport; +    } +  } + +  if (tcp) { +    // Try to parse ip address +    if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) { +      // Invalid address, treat as named pipe or unix socket +      tcp = false; +    } +  } + +  if (tcp) { +    uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle); +    watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle; +  } else { +    uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0); +    watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle; +  } + +  watcher->stream->data = watcher; +  watcher->cb = NULL; +  watcher->close_cb = NULL; +} + +int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) +  FUNC_ATTR_NONNULL_ALL +{ +  watcher->cb = cb; +  int result; + +  if (watcher->stream->type == UV_TCP) { +    result = uv_tcp_bind(&watcher->uv.tcp.handle, +                         (const struct sockaddr *)&watcher->uv.tcp.addr, 0); +  } else { +    result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr); +  } + +  if (result == 0) { +    result = uv_listen(watcher->stream, backlog, connection_cb); +  } + +  assert(result <= 0);  // libuv should have returned -errno or zero. +  if (result < 0) { +    if (result == -EACCES) { +      // Libuv converts ENOENT to EACCES for Windows compatibility, but if +      // the parent directory does not exist, ENOENT would be more accurate. +      *path_tail((char_u *)watcher->addr) = NUL; +      if (!os_file_exists((char_u *)watcher->addr)) { +        result = -ENOENT; +      } +    } +    return result; +  } + +  return 0; +} + +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +{ +  uv_stream_t *client; + +  if (watcher->stream->type == UV_TCP) { +    client = (uv_stream_t *)&stream->uv.tcp; +    uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); +  } else { +    client = (uv_stream_t *)&stream->uv.pipe; +    uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); +  } + +  int result = uv_accept(watcher->stream, client); + +  if (result) { +    uv_close((uv_handle_t *)client, NULL); +    return result; +  } + +  stream_init(NULL, stream, -1, client, data); +  return 0; +} + +void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  watcher->close_cb = cb; +  uv_close((uv_handle_t *)watcher->stream, close_cb); +} + +static void connection_cb(uv_stream_t *handle, int status) +{ +  SocketWatcher *watcher = handle->data; +  watcher->cb(watcher, status, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ +  SocketWatcher *watcher = handle->data; +  if (watcher->close_cb) { +    watcher->close_cb(watcher, watcher->data); +  } +} diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h new file mode 100644 index 0000000000..17fd39f33b --- /dev/null +++ b/src/nvim/event/socket.h @@ -0,0 +1,38 @@ +#ifndef NVIM_EVENT_SOCKET_H +#define NVIM_EVENT_SOCKET_H + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +#define ADDRESS_MAX_SIZE 256 + +typedef struct socket_watcher SocketWatcher; +typedef void (*socket_cb)(SocketWatcher *watcher, int result, void *data); +typedef void (*socket_close_cb)(SocketWatcher *watcher, void *data); + +struct socket_watcher { +  // Pipe/socket path, or TCP address string +  char addr[ADDRESS_MAX_SIZE]; +  // TCP server or unix socket (named pipe on Windows) +  union { +    struct { +      uv_tcp_t handle; +      struct sockaddr_in addr; +    } tcp; +    struct { +      uv_pipe_t handle; +    } pipe; +  } uv; +  uv_stream_t *stream; +  void *data; +  socket_cb cb; +  socket_close_cb close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.h.generated.h" +#endif +#endif  // NVIM_EVENT_SOCKET_H diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c new file mode 100644 index 0000000000..959b532146 --- /dev/null +++ b/src/nvim/event/stream.c @@ -0,0 +1,108 @@ +#include <assert.h> +#include <stdio.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/rbuffer.h" +#include "nvim/event/stream.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.c.generated.h" +#endif + +/// Sets the stream associated with `fd` to "blocking" mode. +/// +/// @return `0` on success, or `-errno` on failure. +int stream_set_blocking(int fd, bool blocking) +{ +  // Private loop to avoid conflict with existing watcher(s): +  //    uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. +  uv_loop_t loop; +  uv_pipe_t stream; +  uv_loop_init(&loop); +  uv_pipe_init(&loop, &stream, 0); +  uv_pipe_open(&stream, fd); +  int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); +  uv_close((uv_handle_t *)&stream, NULL); +  uv_run(&loop, UV_RUN_NOWAIT);  // not necessary, but couldn't hurt. +  uv_loop_close(&loop); +  return retval; +} + +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, +    void *data) +{ +  stream->uvstream = uvstream; + +  if (fd >= 0) { +    uv_handle_type type = uv_guess_handle(fd); +    stream->fd = fd; + +    if (type == UV_FILE) { +      // Non-blocking file reads are simulated with an idle handle that reads in +      // chunks of the ring buffer size, giving time for other events to be +      // processed between reads. +      uv_idle_init(&loop->uv, &stream->uv.idle); +      stream->uv.idle.data = stream; +    } else { +      assert(type == UV_NAMED_PIPE || type == UV_TTY); +      uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); +      uv_pipe_open(&stream->uv.pipe, fd); +      stream->uvstream = (uv_stream_t *)&stream->uv.pipe; +    } +  } + +  if (stream->uvstream) { +    stream->uvstream->data = stream; +  } + +  stream->data = data; +  stream->internal_data = NULL; +  stream->fpos = 0; +  stream->curmem = 0; +  stream->maxmem = 0; +  stream->pending_reqs = 0; +  stream->read_cb = NULL; +  stream->write_cb = NULL; +  stream->close_cb = NULL; +  stream->internal_close_cb = NULL; +  stream->closed = false; +  stream->buffer = NULL; +} + +void stream_close(Stream *stream, stream_close_cb on_stream_close) +{ +  assert(!stream->closed); + +  if (stream->buffer) { +    rbuffer_free(stream->buffer); +  } + +  stream->closed = true; +  stream->close_cb = on_stream_close; + +  if (!stream->pending_reqs) { +    stream_close_handle(stream); +  } +} + +void stream_close_handle(Stream *stream) +{ +  if (stream->uvstream) { +    uv_close((uv_handle_t *)stream->uvstream, close_cb); +  } else { +    uv_close((uv_handle_t *)&stream->uv.idle, close_cb); +  } +} + +static void close_cb(uv_handle_t *handle) +{ +  Stream *stream = handle->data; +  if (stream->close_cb) { +    stream->close_cb(stream, stream->data); +  } +  if (stream->internal_close_cb) { +    stream->internal_close_cb(stream, stream->internal_data); +  } +} diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h new file mode 100644 index 0000000000..37410b2036 --- /dev/null +++ b/src/nvim/event/stream.h @@ -0,0 +1,55 @@ +#ifndef NVIM_EVENT_STREAM_H +#define NVIM_EVENT_STREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/rbuffer.h" + +typedef struct stream Stream; +/// Type of function called when the Stream buffer is filled with data +/// +/// @param stream The Stream instance +/// @param rbuffer The associated RBuffer instance +/// @param data User-defined data +/// @param eof If the stream reached EOF. +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, +    bool eof); + +/// Type of function called when the Stream has information about a write +/// request. +/// +/// @param wstream The Stream instance +/// @param data User-defined data +/// @param status 0 on success, anything else indicates failure +typedef void (*stream_write_cb)(Stream *stream, void *data, int status); +typedef void (*stream_close_cb)(Stream *stream, void *data); + +struct stream { +  union { +    uv_pipe_t pipe; +    uv_tcp_t tcp; +    uv_idle_t idle; +  } uv; +  uv_stream_t *uvstream; +  uv_buf_t uvbuf; +  RBuffer *buffer; +  uv_file fd; +  stream_read_cb read_cb; +  stream_write_cb write_cb; +  stream_close_cb close_cb, internal_close_cb; +  size_t fpos; +  size_t curmem; +  size_t maxmem; +  size_t pending_reqs; +  void *data, *internal_data; +  bool closed; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.h.generated.h" +#endif +#endif  // NVIM_EVENT_STREAM_H diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c new file mode 100644 index 0000000000..ce33cdfc10 --- /dev/null +++ b/src/nvim/event/time.c @@ -0,0 +1,55 @@ +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/time.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/time.c.generated.h" +#endif + + +void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  uv_timer_init(&loop->uv, &watcher->uv); +  watcher->uv.data = watcher; +  watcher->data = data; +} + +void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, +    uint64_t repeat) +  FUNC_ATTR_NONNULL_ALL +{ +  watcher->cb = cb; +  uv_timer_start(&watcher->uv, time_watcher_cb, timeout, repeat); +} + +void time_watcher_stop(TimeWatcher *watcher) +  FUNC_ATTR_NONNULL_ALL +{ +  uv_timer_stop(&watcher->uv); +} + +void time_watcher_close(TimeWatcher *watcher, time_cb cb) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  watcher->close_cb = cb; +  uv_close((uv_handle_t *)&watcher->uv, close_cb); +} + +static void time_watcher_cb(uv_timer_t *handle) +  FUNC_ATTR_NONNULL_ALL +{ +  TimeWatcher *watcher = handle->data; +  watcher->cb(watcher, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ +  TimeWatcher *watcher = handle->data; +  if (watcher->close_cb) { +    watcher->close_cb(watcher, watcher->data); +  } +} diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h new file mode 100644 index 0000000000..ee50e53d11 --- /dev/null +++ b/src/nvim/event/time.h @@ -0,0 +1,20 @@ +#ifndef NVIM_EVENT_TIME_H +#define NVIM_EVENT_TIME_H + +#include <uv.h> + +#include "nvim/event/loop.h" + +typedef struct time_watcher TimeWatcher; +typedef void (*time_cb)(TimeWatcher *watcher, void *data); + +struct time_watcher { +  uv_timer_t uv; +  void *data; +  time_cb cb, close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/time.h.generated.h" +#endif +#endif  // NVIM_EVENT_TIME_H diff --git a/src/nvim/event/uv_process.c b/src/nvim/event/uv_process.c new file mode 100644 index 0000000000..21c2fd1790 --- /dev/null +++ b/src/nvim/event/uv_process.c @@ -0,0 +1,77 @@ +#include <assert.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/process.h" +#include "nvim/event/uv_process.h" +#include "nvim/log.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.c.generated.h" +#endif + +bool uv_process_spawn(UvProcess *uvproc) +  FUNC_ATTR_NONNULL_ALL +{ +  Process *proc = (Process *)uvproc; +  uvproc->uvopts.file = proc->argv[0]; +  uvproc->uvopts.args = proc->argv; +  uvproc->uvopts.flags = UV_PROCESS_WINDOWS_HIDE; +  uvproc->uvopts.exit_cb = exit_cb; +  uvproc->uvopts.cwd = NULL; +  uvproc->uvopts.env = NULL; +  uvproc->uvopts.stdio = uvproc->uvstdio; +  uvproc->uvopts.stdio_count = 3; +  uvproc->uvstdio[0].flags = UV_IGNORE; +  uvproc->uvstdio[1].flags = UV_IGNORE; +  uvproc->uvstdio[2].flags = UV_IGNORE; +  uvproc->uv.data = proc; + +  if (proc->in) { +    uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; +    uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe; +  } + +  if (proc->out) { +    uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; +    uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe; +  } + +  if (proc->err) { +    uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; +    uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe; +  } + +  int status; +  if ((status = uv_spawn(&proc->loop->uv, &uvproc->uv, &uvproc->uvopts))) { +    ELOG("uv_spawn failed: %s", uv_strerror(status)); +    return false; +  } + +  proc->pid = uvproc->uv.pid; +  return true; +} + +void uv_process_close(UvProcess *uvproc) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  uv_close((uv_handle_t *)&uvproc->uv, close_cb); +} + +static void close_cb(uv_handle_t *handle) +{ +  Process *proc = handle->data; +  if (proc->internal_close_cb) { +    proc->internal_close_cb(proc); +  } +} + +static void exit_cb(uv_process_t *handle, int64_t status, int term_signal) +{ +  Process *proc = handle->data; +  proc->status = (int)status; +  proc->internal_exit_cb(proc); +} diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h new file mode 100644 index 0000000000..a17f1446b3 --- /dev/null +++ b/src/nvim/event/uv_process.h @@ -0,0 +1,25 @@ +#ifndef NVIM_EVENT_UV_PROCESS_H +#define NVIM_EVENT_UV_PROCESS_H + +#include <uv.h> + +#include "nvim/event/process.h" + +typedef struct uv_process { +  Process process; +  uv_process_t uv; +  uv_process_options_t uvopts; +  uv_stdio_container_t uvstdio[3]; +} UvProcess; + +static inline UvProcess uv_process_init(void *data) +{ +  UvProcess rv; +  rv.process = process_init(kProcessTypeUv, data); +  return rv; +} + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/uv_process.h.generated.h" +#endif +#endif  // NVIM_EVENT_UV_PROCESS_H diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c new file mode 100644 index 0000000000..5fcb724fe3 --- /dev/null +++ b/src/nvim/event/wstream.c @@ -0,0 +1,162 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/wstream.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + +typedef struct { +  Stream *stream; +  WBuffer *buffer; +  uv_write_t uv_req; +} WRequest; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.c.generated.h" +#endif + +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(loop, stream, fd, NULL, data); +  wstream_init(stream, maxmem); +} + +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(NULL, stream, -1, uvstream, data); +  wstream_init(stream, maxmem); +} + +void wstream_init(Stream *stream, size_t maxmem) +{ +  stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM; +} + +/// Sets a callback that will be called on completion of a write request, +/// indicating failure/success. +/// +/// This affects all requests currently in-flight as well. Overwrites any +/// possible earlier callback. +/// +/// @note This callback will not fire if the write request couldn't even be +///       queued properly (i.e.: when `wstream_write() returns an error`). +/// +/// @param stream The `Stream` instance +/// @param cb The callback +void wstream_set_write_cb(Stream *stream, stream_write_cb cb) +  FUNC_ATTR_NONNULL_ALL +{ +  stream->write_cb = cb; +} + +/// Queues data for writing to the backing file descriptor of a `Stream` +/// instance. This will fail if the write would cause the Stream use more +/// memory than specified by `maxmem`. +/// +/// @param stream The `Stream` instance +/// @param buffer The buffer which contains data to be written +/// @return false if the write failed +bool wstream_write(Stream *stream, WBuffer *buffer) +  FUNC_ATTR_NONNULL_ALL +{ +  assert(stream->maxmem); +  // This should not be called after a stream was freed +  assert(!stream->closed); + +  if (stream->curmem > stream->maxmem) { +    goto err; +  } + +  stream->curmem += buffer->size; + +  WRequest *data = xmalloc(sizeof(WRequest)); +  data->stream = stream; +  data->buffer = buffer; +  data->uv_req.data = data; + +  uv_buf_t uvbuf; +  uvbuf.base = buffer->data; +  uvbuf.len = buffer->size; + +  if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { +    xfree(data); +    goto err; +  } + +  stream->pending_reqs++; +  return true; + +err: +  wstream_release_wbuffer(buffer); +  return false; +} + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across Stream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param refcount The number of references for the WBuffer. This will be used +///        by Stream instances to decide when a WBuffer should be freed. +/// @param cb Pointer to function that will be responsible for freeing +///        the buffer data(passing 'free' will work as expected). +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, +                            size_t size, +                            size_t refcount, +                            wbuffer_data_finalizer cb) +{ +  WBuffer *rv = xmalloc(sizeof(WBuffer)); +  rv->size = size; +  rv->refcount = refcount; +  rv->cb = cb; +  rv->data = data; + +  return rv; +} + +static void write_cb(uv_write_t *req, int status) +{ +  WRequest *data = req->data; + +  data->stream->curmem -= data->buffer->size; + +  wstream_release_wbuffer(data->buffer); + +  if (data->stream->write_cb) { +    data->stream->write_cb(data->stream, data->stream->data, status); +  } + +  data->stream->pending_reqs--; + +  if (data->stream->closed && data->stream->pending_reqs == 0) { +    // Last pending write, free the stream; +    stream_close_handle(data->stream); +  } + +  xfree(data); +} + +void wstream_release_wbuffer(WBuffer *buffer) +{ +  if (!--buffer->refcount) { +    if (buffer->cb) { +      buffer->cb(buffer->data); +    } + +    xfree(buffer); +  } +} diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h new file mode 100644 index 0000000000..9008de0d97 --- /dev/null +++ b/src/nvim/event/wstream.h @@ -0,0 +1,24 @@ +#ifndef NVIM_EVENT_WSTREAM_H +#define NVIM_EVENT_WSTREAM_H + +#include <stdint.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + +typedef struct wbuffer WBuffer; +typedef void (*wbuffer_data_finalizer)(void *data); + +struct wbuffer { +  size_t size, refcount; +  char *data; +  wbuffer_data_finalizer cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.h.generated.h" +#endif +#endif  // NVIM_EVENT_WSTREAM_H diff --git a/src/nvim/ex_docmd.c b/src/nvim/ex_docmd.c index e53b2e47c5..3b6e05fd8a 100644 --- a/src/nvim/ex_docmd.c +++ b/src/nvim/ex_docmd.c @@ -73,8 +73,8 @@  #include "nvim/os/time.h"  #include "nvim/ex_cmds_defs.h"  #include "nvim/mouse.h" -#include "nvim/os/rstream.h" -#include "nvim/os/wstream.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  static int quitmore = 0;  static int ex_pressedreturn = FALSE; diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index 3af035a6e3..785db1dbd1 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -63,7 +63,7 @@  #include "nvim/window.h"  #include "nvim/ui.h"  #include "nvim/os/os.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  /*   * Variables shared between getcmdline(), redrawcmdline() and others. @@ -298,14 +298,14 @@ getcmdline (      /* Get a character.  Ignore K_IGNORE, it should not do anything, such       * as stop completion. */ -    event_enable_deferred(); +    loop_enable_deferred_events(&loop);      do {        c = safe_vgetc();      } while (c == K_IGNORE); -    event_disable_deferred(); +    loop_disable_deferred_events(&loop);      if (c == K_EVENT) { -      event_process(); +      loop_process_event(&loop);        continue;      } diff --git a/src/nvim/getchar.c b/src/nvim/getchar.c index 864aa6a622..bbeef376b0 100644 --- a/src/nvim/getchar.c +++ b/src/nvim/getchar.c @@ -49,7 +49,7 @@  #include "nvim/strings.h"  #include "nvim/ui.h"  #include "nvim/undo.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/os/input.h"  #include "nvim/os/os.h" diff --git a/src/nvim/globals.h b/src/nvim/globals.h index 1d93900a94..c7164ef1f1 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -28,6 +28,7 @@  #include "nvim/menu.h"  #include "nvim/syntax_defs.h"  #include "nvim/types.h" +#include "nvim/event/loop.h"  /*   * definition of global variables @@ -1216,6 +1217,7 @@ EXTERN char *ignoredp;  // If a msgpack-rpc channel should be started over stdin/stdout  EXTERN bool embedded_mode INIT(= false); +EXTERN Loop loop;  /// Used to track the status of external functions.  /// Currently only used for iconv(). diff --git a/src/nvim/if_cscope.c b/src/nvim/if_cscope.c index 99926ecf16..d6c5cf4fd5 100644 --- a/src/nvim/if_cscope.c +++ b/src/nvim/if_cscope.c @@ -31,6 +31,7 @@  #include "nvim/window.h"  #include "nvim/os/os.h"  #include "nvim/os/input.h" +#include "nvim/event/stream.h"  #include <sys/types.h>  #include <sys/stat.h> diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h index 10d6846133..1280a927e8 100644 --- a/src/nvim/lib/klist.h +++ b/src/nvim/lib/klist.h @@ -136,6 +136,6 @@  // `break` statement is executed before the next iteration.  #define kl_iter(name, kl, p) kl_iter_at(name, kl, p, NULL)  #define kl_iter_at(name, kl, p, h) \ -  for (kl1_##name *p = h ? h : kl->head; p != kl->tail; p = p->next) +  for (kl1_##name **p = h ? h : &kl->head; *p != kl->tail; p = &(*p)->next)  #endif diff --git a/src/nvim/main.c b/src/nvim/main.c index 50c16c51d6..e2ae63e134 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -61,9 +61,13 @@  #include "nvim/os/input.h"  #include "nvim/os/os.h"  #include "nvim/os/time.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/os/signal.h" +#include "nvim/event/process.h" +#include "nvim/msgpack_rpc/defs.h"  #include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/msgpack_rpc/channel.h"  #include "nvim/api/private/defs.h"  #include "nvim/api/private/helpers.h"  #include "nvim/api/private/handle.h" @@ -133,6 +137,41 @@ static const char *err_extra_cmd =    N_("Too many \"+command\", \"-c command\" or \"--cmd command\" arguments"); +void event_init(void) +{ +  loop_init(&loop, NULL); +  // early msgpack-rpc initialization +  msgpack_rpc_init_method_table(); +  msgpack_rpc_helpers_init(); +  // Initialize input events +  input_init(); +  // Timer to wake the event loop if a timeout argument is passed to +  // `event_poll` +  // Signals +  signal_init(); +  // finish mspgack-rpc initialization +  channel_init(); +  server_init(); +  terminal_init(); +} + +void event_teardown(void) +{ +  if (!loop.deferred_events) { +    return; +  } + +  loop_process_all_events(&loop); +  input_stop(); +  channel_teardown(); +  process_teardown(&loop); +  server_teardown(); +  signal_teardown(); +  terminal_teardown(); + +  loop_close(&loop); +} +  /// Performs early initialization.  ///  /// Needed for unit tests. Must be called after `time_init()`. diff --git a/src/nvim/misc1.c b/src/nvim/misc1.c index 0737caec5d..4f23b3da63 100644 --- a/src/nvim/misc1.c +++ b/src/nvim/misc1.c @@ -61,6 +61,7 @@  #include "nvim/os/shell.h"  #include "nvim/os/input.h"  #include "nvim/os/time.h" +#include "nvim/event/stream.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "misc1.c.generated.h" diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 2a81b4f160..861614f147 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -9,13 +9,11 @@  #include "nvim/api/vim.h"  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/remote_ui.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" +#include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/socket.h"  #include "nvim/msgpack_rpc/helpers.h"  #include "nvim/vim.h"  #include "nvim/ascii.h" @@ -34,6 +32,12 @@  #define log_server_msg(...)  #endif +typedef enum { +  kChannelTypeSocket, +  kChannelTypeProc, +  kChannelTypeStdio +} ChannelType; +  typedef struct {    uint64_t request_id;    bool returned, errored; @@ -45,15 +49,21 @@ typedef struct {    size_t refcount;    size_t pending_requests;    PMap(cstr_t) *subscribed_events; -  bool is_job, closed; +  bool closed; +  ChannelType type;    msgpack_unpacker *unpacker;    union { -    Job *job; +    Stream stream; +    struct { +      UvProcess uvproc; +      Stream in; +      Stream out; +      Stream err; +    } process;      struct { -      RStream *read; -      WStream *write; -      uv_stream_t *uv; -    } streams; +      Stream in; +      Stream out; +    } std;    } data;    uint64_t next_request_id;    kvec_t(ChannelCallFrame *) call_stack; @@ -104,57 +114,48 @@ void channel_teardown(void)    });  } -/// Creates an API channel by starting a job and connecting to its +/// Creates an API channel by starting a process and connecting to its  /// stdin/stdout. stderr is forwarded to the editor error stream.  ///  /// @param argv The argument vector for the process. [consumed]  /// @return The channel id (> 0), on success.  ///         0, on error. -uint64_t channel_from_job(char **argv) -{ -  Channel *channel = register_channel(); -  channel->is_job = true; -  incref(channel);  // job channels are only closed by the exit_cb - -  int status; -  JobOptions opts = JOB_OPTIONS_INIT; -  opts.argv = argv; -  opts.data = channel; -  opts.stdout_cb = job_out; -  opts.stderr_cb = job_err; -  opts.exit_cb = job_exit; -  channel->data.job = job_start(opts, &status); - -  if (status <= 0) { -    if (status == 0) {  // Two decrefs needed if status == 0. -      decref(channel);  // Only one needed if status < 0, -    }                   // because exit_cb will do the second one. +uint64_t channel_from_process(char **argv) +{ +  Channel *channel = register_channel(kChannelTypeProc); +  channel->data.process.uvproc = uv_process_init(channel); +  Process *proc = &channel->data.process.uvproc.process; +  proc->argv = argv; +  proc->in = &channel->data.process.in; +  proc->out = &channel->data.process.out; +  proc->err = &channel->data.process.err; +  proc->cb = process_exit; +  if (!process_spawn(&loop, proc)) { +    loop_poll_events(&loop, 0);      decref(channel);      return 0;    } +  incref(channel);  // process channels are only closed by the exit_cb +  wstream_init(proc->in, 0); +  rstream_init(proc->out, 0); +  rstream_start(proc->out, parse_msgpack); +  rstream_init(proc->err, 0); +  rstream_start(proc->err, forward_stderr); +    return channel->id;  } -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection +/// Creates an API channel from a tcp/pipe socket connection  /// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) +/// @param watcher The SocketWatcher ready to accept the connection +void channel_from_connection(SocketWatcher *watcher)  { -  Channel *channel = register_channel(); -  stream->data = NULL; -  channel->is_job = false; -  // read stream -  channel->data.streams.read = rstream_new(parse_msgpack, -                                           rbuffer_new(CHANNEL_BUFFER_SIZE), -                                           channel); -  rstream_set_stream(channel->data.streams.read, stream); -  rstream_start(channel->data.streams.read); -  // write stream -  channel->data.streams.write = wstream_new(0); -  wstream_set_stream(channel->data.streams.write, stream); -  channel->data.streams.uv = stream; +  Channel *channel = register_channel(kChannelTypeSocket); +  socket_watcher_accept(watcher, &channel->data.stream, channel); +  wstream_init(&channel->data.stream, 0); +  rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); +  rstream_start(&channel->data.stream, parse_msgpack);  }  /// Sends event/arguments to channel @@ -220,7 +221,7 @@ Object channel_send_call(uint64_t id,    ChannelCallFrame frame = {request_id, false, false, NIL};    kv_push(ChannelCallFrame *, channel->call_stack, &frame);    channel->pending_requests++; -  event_poll_until(-1, frame.returned); +  LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);    (void)kv_pop(channel->call_stack);    channel->pending_requests--; @@ -313,44 +314,32 @@ bool channel_close(uint64_t id)  /// Neovim  static void channel_from_stdio(void)  { -  Channel *channel = register_channel(); +  Channel *channel = register_channel(kChannelTypeStdio);    incref(channel);  // stdio channels are only closed on exit -  channel->is_job = false;    // read stream -  channel->data.streams.read = rstream_new(parse_msgpack, -                                           rbuffer_new(CHANNEL_BUFFER_SIZE), -                                           channel); -  rstream_set_file(channel->data.streams.read, 0); -  rstream_start(channel->data.streams.read); +  rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, +      channel); +  rstream_start(&channel->data.std.in, parse_msgpack);    // write stream -  channel->data.streams.write = wstream_new(0); -  wstream_set_file(channel->data.streams.write, 1); -  channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof) -{ -  Job *job = data; -  parse_msgpack(rstream, buf, job_data(job), eof); +  wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);  } -static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)  {    while (rbuffer_size(rbuf)) {      char buf[256];      size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);      buf[read] = NUL; -    ELOG("Channel %" PRIu64 " stderr: %s", -         ((Channel *)job_data(data))->id, buf); +    ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);    }  } -static void job_exit(Job *job, int status, void *data) +static void process_exit(Process *proc, int status, void *data)  {    decref(data);  } -static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)  {    Channel *channel = data;    incref(channel); @@ -362,9 +351,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)    }    size_t count = rbuffer_size(rbuf); -  DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", +  DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",         count, -       rstream); +       stream);    // Feed the unpacker with data    msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -474,7 +463,7 @@ static void handle_request(Channel *channel, msgpack_object *request)    event_data->args = args;    event_data->request_id = request_id;    incref(channel); -  event_push((Event) { +  loop_push_event(&loop, (Event) {      .handler = on_request_event,      .data = event_data    }, defer); @@ -516,10 +505,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)      return false;    } -  if (channel->is_job) { -    success = job_write(channel->data.job, buffer); -  } else { -    success = wstream_write(channel->data.streams.write, buffer); +  switch (channel->type) { +    case kChannelTypeSocket: +      success = wstream_write(&channel->data.stream, buffer); +      break; +    case kChannelTypeProc: +      success = wstream_write(&channel->data.process.in, buffer); +      break; +    case kChannelTypeStdio: +      success = wstream_write(&channel->data.std.out, buffer); +      break; +    default: +      abort();    }    if (!success) { @@ -628,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event)    xfree(event_string);  } -/// Close the channel streams/job and free the channel resources. +/// Close the channel streams/process and free the channel resources.  static void close_channel(Channel *channel)  {    if (channel->closed) { @@ -637,19 +634,23 @@ static void close_channel(Channel *channel)    channel->closed = true; -  if (channel->is_job) { -    if (channel->data.job) { -      job_stop(channel->data.job); -    } -  } else { -    rstream_free(channel->data.streams.read); -    wstream_free(channel->data.streams.write); -    uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; -    if (handle) { -      uv_close(handle, close_cb); -    } else { -      event_push((Event) { .handler = on_stdio_close, .data = channel }, false); -    } +  switch (channel->type) { +    case kChannelTypeSocket: +      stream_close(&channel->data.stream, close_cb); +      break; +    case kChannelTypeProc: +      if (!channel->data.process.uvproc.process.closed) { +        process_stop(&channel->data.process.uvproc.process); +      } +      break; +    case kChannelTypeStdio: +      stream_close(&channel->data.std.in, NULL); +      stream_close(&channel->data.std.out, NULL); +      loop_push_event(&loop, +          (Event) { .handler = on_stdio_close, .data = channel }, false); +      break; +    default: +      abort();    }    decref(channel); @@ -682,15 +683,15 @@ static void free_channel(Channel *channel)    xfree(channel);  } -static void close_cb(uv_handle_t *handle) +static void close_cb(Stream *stream, void *data)  { -  xfree(handle->data); -  xfree(handle); +  xfree(data);  } -static Channel *register_channel(void) +static Channel *register_channel(ChannelType type)  {    Channel *rv = xmalloc(sizeof(Channel)); +  rv->type = type;    rv->refcount = 1;    rv->closed = false;    rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index df742fe368..104547a7b8 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -5,6 +5,7 @@  #include <uv.h>  #include "nvim/api/private/defs.h" +#include "nvim/event/socket.h"  #include "nvim/vim.h"  #define METHOD_MAXLEN 512 diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index bf161d54e0..7d9f114140 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -6,7 +6,7 @@  #include <msgpack.h> -#include "nvim/os/wstream.h" +#include "nvim/event/wstream.h"  #include "nvim/api/private/defs.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index 388b5a04cf..8dffea35ca 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -3,11 +3,10 @@  #include <string.h>  #include <stdint.h> -#include <uv.h> -  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/server.h"  #include "nvim/os/os.h" +#include "nvim/event/socket.h"  #include "nvim/ascii.h"  #include "nvim/eval.h"  #include "nvim/garray.h" @@ -19,35 +18,9 @@  #include "nvim/strings.h"  #define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NVIM_DEFAULT_TCP_PORT 7450  #define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" -typedef enum { -  kServerTypeTcp, -  kServerTypePipe -} ServerType; - -typedef struct { -  // Pipe/socket path, or TCP address string -  char addr[ADDRESS_MAX_SIZE]; - -  // Type of the union below -  ServerType type; - -  // TCP server or unix socket (named pipe on Windows) -  union { -    struct { -      uv_tcp_t handle; -      struct sockaddr_in addr; -    } tcp; -    struct { -      uv_pipe_t handle; -    } pipe; -  } socket; -} Server; - -static garray_T servers = GA_EMPTY_INIT_VALUE; +static garray_T watchers = GA_EMPTY_INIT_VALUE;  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "msgpack_rpc/server.c.generated.h" @@ -56,7 +29,7 @@ static garray_T servers = GA_EMPTY_INIT_VALUE;  /// Initializes the module  bool server_init(void)  { -  ga_init(&servers, sizeof(Server *), 1); +  ga_init(&watchers, sizeof(SocketWatcher *), 1);    bool must_free = false;    const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); @@ -72,18 +45,10 @@ bool server_init(void)    return ok;  } -/// Retrieve the file handle from a server. -static uv_handle_t *server_handle(Server *server) -{ -  return server->type == kServerTypeTcp -    ? (uv_handle_t *)&server->socket.tcp.handle -    : (uv_handle_t *) &server->socket.pipe.handle; -} -  /// Teardown a single server -static void server_close_cb(Server **server) +static void close_socket_watcher(SocketWatcher **watcher)  { -  uv_close(server_handle(*server), free_server); +  socket_watcher_close(*watcher, free_server);  }  /// Set v:servername to the first server in the server list, or unset it if no @@ -91,7 +56,7 @@ static void server_close_cb(Server **server)  static void set_vservername(garray_T *srvs)  {    char *default_server = (srvs->ga_len > 0) -    ? ((Server **)srvs->ga_data)[0]->addr +    ? ((SocketWatcher **)srvs->ga_data)[0]->addr      : NULL;    set_vim_var_string(VV_SEND_SERVER, (char_u *)default_server, -1);  } @@ -99,7 +64,7 @@ static void set_vservername(garray_T *srvs)  /// Teardown the server module  void server_teardown(void)  { -  GA_DEEP_CLEAR(&servers, Server *, server_close_cb); +  GA_DEEP_CLEAR(&watchers, SocketWatcher *, close_socket_watcher);  }  /// Starts listening for API calls on the TCP address or pipe path `endpoint`. @@ -116,120 +81,38 @@ void server_teardown(void)  int server_start(const char *endpoint)    FUNC_ATTR_NONNULL_ALL  { -  char addr[ADDRESS_MAX_SIZE]; - -  // Trim to `ADDRESS_MAX_SIZE` -  if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { -    // TODO(aktau): since this is not what the user wanted, perhaps we -    // should return an error here -    WLOG("Address was too long, truncated to %s", addr); -  } - -  // Check if the server already exists -  for (int i = 0; i < servers.ga_len; i++) { -    if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) { -      ELOG("Already listening on %s", addr); +  SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); +  socket_watcher_init(&loop, watcher, endpoint, NULL); + +  // Check if a watcher for the endpoint already exists +  for (int i = 0; i < watchers.ga_len; i++) { +    if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { +      ELOG("Already listening on %s", watcher->addr); +      socket_watcher_close(watcher, free_server);        return 1;      }    } -  ServerType server_type = kServerTypeTcp; -  Server *server = xmalloc(sizeof(Server)); -  char ip[16], *ip_end = strrchr(addr, ':'); - -  if (!ip_end) { -    ip_end = strchr(addr, NUL); -  } - -  // (ip_end - addr) is always > 0, so convert to size_t -  size_t addr_len = (size_t)(ip_end - addr); - -  if (addr_len > sizeof(ip) - 1) { -    // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) -    addr_len = sizeof(ip) - 1; -  } - -  // Extract the address part -  xstrlcpy(ip, addr, addr_len + 1); - -  int port = NVIM_DEFAULT_TCP_PORT; - -  if (*ip_end == ':') { -    // Extract the port -    long lport = strtol(ip_end + 1, NULL, 10); // NOLINT -    if (lport <= 0 || lport > 0xffff) { -      // Invalid port, treat as named pipe or unix socket -      server_type = kServerTypePipe; -    } else { -      port = (int) lport; -    } -  } - -  if (server_type == kServerTypeTcp) { -    // Try to parse ip address -    if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { -      // Invalid address, treat as named pipe or unix socket -      server_type = kServerTypePipe; -    } -  } - -  int result; -  uv_stream_t *stream = NULL; - -  xstrlcpy(server->addr, addr, sizeof(server->addr)); - -  if (server_type == kServerTypeTcp) { -    // Listen on tcp address/port -    uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); -    result = uv_tcp_bind(&server->socket.tcp.handle, -                         (const struct sockaddr *)&server->socket.tcp.addr, -                         0); -    stream = (uv_stream_t *)&server->socket.tcp.handle; -  } else { -    // Listen on named pipe or unix socket -    uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); -    result = uv_pipe_bind(&server->socket.pipe.handle, server->addr); -    stream = (uv_stream_t *)&server->socket.pipe.handle; -  } - -  stream->data = server; - -  if (result == 0) { -    result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, -                       MAX_CONNECTIONS, -                       connection_cb); -  } - -  assert(result <= 0);  // libuv should have returned -errno or zero. +  int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);    if (result < 0) { -    if (result == -EACCES) { -      // Libuv converts ENOENT to EACCES for Windows compatibility, but if -      // the parent directory does not exist, ENOENT would be more accurate. -      *path_tail((char_u *) addr) = NUL; -      if (!os_file_exists((char_u *) addr)) { -        result = -ENOENT; -      } -    } -    uv_close((uv_handle_t *)stream, free_server);      ELOG("Failed to start server: %s", uv_strerror(result)); +    socket_watcher_close(watcher, free_server);      return result;    }    // Update $NVIM_LISTEN_ADDRESS, if not set.    const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);    if (listen_address == NULL) { -    os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1); +    os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1);    } -  server->type = server_type; - -  // Add the server to the list. -  ga_grow(&servers, 1); -  ((Server **)servers.ga_data)[servers.ga_len++] = server; +  // Add the watcher to the list. +  ga_grow(&watchers, 1); +  ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher;    // Update v:servername, if not set.    if (STRLEN(get_vim_var_str(VV_SEND_SERVER)) == 0) { -    set_vservername(&servers); +    set_vservername(&watchers);    }    return 0; @@ -240,21 +123,21 @@ int server_start(const char *endpoint)  /// @param endpoint Address of the server.  void server_stop(char *endpoint)  { -  Server *server; +  SocketWatcher *watcher;    char addr[ADDRESS_MAX_SIZE];    // Trim to `ADDRESS_MAX_SIZE`    xstrlcpy(addr, endpoint, sizeof(addr));    int i = 0;  // Index of the server whose address equals addr. -  for (; i < servers.ga_len; i++) { -    server = ((Server **)servers.ga_data)[i]; -    if (strcmp(addr, server->addr) == 0) { +  for (; i < watchers.ga_len; i++) { +    watcher = ((SocketWatcher **)watchers.ga_data)[i]; +    if (strcmp(addr, watcher->addr) == 0) {        break;      }    } -  if (i >= servers.ga_len) { +  if (i >= watchers.ga_len) {      ELOG("Not listening on %s", addr);      return;    } @@ -265,18 +148,18 @@ void server_stop(char *endpoint)      os_unsetenv(LISTEN_ADDRESS_ENV_VAR);    } -  uv_close(server_handle(server), free_server); +  socket_watcher_close(watcher, free_server);    // Remove this server from the list by swapping it with the last item. -  if (i != servers.ga_len - 1) { -    ((Server **)servers.ga_data)[i] = -      ((Server **)servers.ga_data)[servers.ga_len - 1]; +  if (i != watchers.ga_len - 1) { +    ((SocketWatcher **)watchers.ga_data)[i] = +      ((SocketWatcher **)watchers.ga_data)[watchers.ga_len - 1];    } -  servers.ga_len--; +  watchers.ga_len--;    // If v:servername is the stopped address, re-initialize it.    if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) { -    set_vservername(&servers); +    set_vservername(&watchers);    }  } @@ -285,52 +168,28 @@ void server_stop(char *endpoint)  char **server_address_list(size_t *size)    FUNC_ATTR_NONNULL_ALL  { -  if ((*size = (size_t) servers.ga_len) == 0) { +  if ((*size = (size_t)watchers.ga_len) == 0) {      return NULL;    } -  char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char *)); -  for (int i = 0; i < servers.ga_len; i++) { -    addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr); +  char **addrs = xcalloc((size_t)watchers.ga_len, sizeof(const char *)); +  for (int i = 0; i < watchers.ga_len; i++) { +    addrs[i] = xstrdup(((SocketWatcher **)watchers.ga_data)[i]->addr);    }    return addrs;  } -static void connection_cb(uv_stream_t *server, int status) +static void connection_cb(SocketWatcher *watcher, int result, void *data)  { -  int result; -  uv_stream_t *client; -  Server *srv = server->data; - -  if (status < 0) { -    abort(); -  } - -  if (srv->type == kServerTypeTcp) { -    client = xmalloc(sizeof(uv_tcp_t)); -    uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); -  } else { -    client = xmalloc(sizeof(uv_pipe_t)); -    uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); -  } - -  result = uv_accept(server, client); -    if (result) {      ELOG("Failed to accept connection: %s", uv_strerror(result)); -    uv_close((uv_handle_t *)client, free_client);      return;    } -  channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ -  xfree(handle); +  channel_from_connection(watcher);  } -static void free_server(uv_handle_t *handle) +static void free_server(SocketWatcher *watcher, void *data)  { -  xfree(handle->data); +  xfree(watcher);  } diff --git a/src/nvim/normal.c b/src/nvim/normal.c index 92734e404a..b66bc31b74 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -61,7 +61,7 @@  #include "nvim/mouse.h"  #include "nvim/undo.h"  #include "nvim/window.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/os/time.h"  /* @@ -487,12 +487,12 @@ normal_cmd (    /*     * Get the command character from the user.     */ -  event_enable_deferred(); +  loop_enable_deferred_events(&loop);    c = safe_vgetc(); -  event_disable_deferred(); +  loop_disable_deferred_events(&loop);    if (c == K_EVENT) { -    event_process(); +    loop_process_event(&loop);      return;    } diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c deleted file mode 100644 index 56874b495d..0000000000 --- a/src/nvim/os/event.c +++ /dev/null @@ -1,177 +0,0 @@ -#include <assert.h> -#include <stdint.h> -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/event.h" -#include "nvim/os/input.h" -#include "nvim/msgpack_rpc/defs.h" -#include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h" -#include "nvim/msgpack_rpc/helpers.h" -#include "nvim/os/signal.h" -#include "nvim/os/rstream.h" -#include "nvim/os/wstream.h" -#include "nvim/os/job.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/misc2.h" -#include "nvim/ui.h" -#include "nvim/screen.h" -#include "nvim/terminal.h" - -#include "nvim/lib/klist.h" - -// event will be cleaned up after it gets processed -#define _destroy_event(x)  // do nothing -KLIST_INIT(Event, Event, _destroy_event) - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/event.c.generated.h" -#endif -// deferred_events:  Events that should be processed as the K_EVENT special key -// immediate_events: Events that should be processed after exiting libuv event -//                   loop(to avoid recursion), but before returning from -//                   `event_poll` -static klist_t(Event) *deferred_events = NULL, *immediate_events = NULL; -static int deferred_events_allowed = 0; - -void event_init(void) -{ -  // Initialize the event queues -  deferred_events = kl_init(Event); -  immediate_events = kl_init(Event); -  // early msgpack-rpc initialization -  msgpack_rpc_init_method_table(); -  msgpack_rpc_helpers_init(); -  // Initialize input events -  input_init(); -  // Timer to wake the event loop if a timeout argument is passed to -  // `event_poll` -  // Signals -  signal_init(); -  // Jobs -  job_init(); -  // finish mspgack-rpc initialization -  channel_init(); -  server_init(); -  terminal_init(); -} - -void event_teardown(void) -{ -  if (!deferred_events) { -    // Not initialized(possibly a --version invocation) -    return; -  } - -  process_events_from(immediate_events); -  process_events_from(deferred_events); -  input_stop(); -  channel_teardown(); -  job_teardown(); -  server_teardown(); -  signal_teardown(); -  terminal_teardown(); - -  // this last `uv_run` will return after all handles are stopped, it will -  // also take care of finishing any uv_close calls made by other *_teardown -  // functions. -  do { -    uv_run(uv_default_loop(), UV_RUN_DEFAULT); -  } while (uv_loop_close(uv_default_loop())); -} - -// Wait for some event -void event_poll(int ms) -{ -  static int recursive = 0; - -  if (recursive++) { -    abort();  // Should not re-enter uv_run -  } - -  uv_run_mode run_mode = UV_RUN_ONCE; -  uv_timer_t timer; - -  if (ms > 0) { -    uv_timer_init(uv_default_loop(), &timer); -    // Use a repeating timeout of ms milliseconds to make sure -    // we do not block indefinitely for I/O. -    uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); -  } else if (ms == 0) { -    // For ms == 0, we need to do a non-blocking event poll by -    // setting the run mode to UV_RUN_NOWAIT. -    run_mode = UV_RUN_NOWAIT; -  } - -  loop(run_mode); - -  if (ms > 0) { -    // Ensure the timer handle is closed and run the event loop -    // once more to let libuv perform it's cleanup -    uv_timer_stop(&timer); -    uv_close((uv_handle_t *)&timer, NULL); -    loop(UV_RUN_NOWAIT); -  } - -  recursive--;  // Can re-enter uv_run now - -  // In case this is run before event_init, don't process any events. -  if (immediate_events) { -    process_events_from(immediate_events); -  } -} - -bool event_has_deferred(void) -{ -  return deferred_events_allowed && !kl_empty(deferred_events); -} - -void event_enable_deferred(void) -{ -  ++deferred_events_allowed; -} - -void event_disable_deferred(void) -{ -  --deferred_events_allowed; -} - -// Queue an event -void event_push(Event event, bool deferred) -{ -  // Sometimes libuv will run pending callbacks(timer for example) before -  // blocking for a poll. If this happens and the callback pushes a event to one -  // of the queues, the event would only be processed after the poll -  // returns(user hits a key for example). To avoid this scenario, we call -  // uv_stop when a event is enqueued. -  uv_stop(uv_default_loop()); -  kl_push(Event, deferred ? deferred_events : immediate_events, event); -} - -void event_process(void) -{ -  process_events_from(deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ -  while (!kl_empty(queue)) { -    Event event = kl_shift(Event, queue); -    event.handler(event); -  } -} - -static void timer_cb(uv_timer_t *handle) -{ -} - -static void loop(uv_run_mode run_mode) -{ -  DLOG("Enter event loop"); -  uv_run(uv_default_loop(), run_mode); -  DLOG("Exit event loop"); -} diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h deleted file mode 100644 index 2dd9403d9f..0000000000 --- a/src/nvim/os/event_defs.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef NVIM_OS_EVENT_DEFS_H -#define NVIM_OS_EVENT_DEFS_H - -#include <stdbool.h> - -#include "nvim/os/job_defs.h" -#include "nvim/os/rstream_defs.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { -  void *data; -  event_handler handler; -}; - -#endif  // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 726335bd9a..b0e0f57e60 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -6,9 +6,8 @@  #include "nvim/api/private/defs.h"  #include "nvim/os/input.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/rstream.h" +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h"  #include "nvim/ascii.h"  #include "nvim/vim.h"  #include "nvim/ui.h" @@ -30,8 +29,8 @@ typedef enum {    kInputEof  } InbufPollResult; -static RStream *read_stream = NULL; -static RBuffer *read_buffer = NULL, *input_buffer = NULL; +static Stream read_stream = {.closed = true}; +static RBuffer *input_buffer = NULL;  static bool input_eof = false;  static int global_fd = 0; @@ -54,26 +53,23 @@ int input_global_fd(void)  void input_start(int fd)  { -  if (read_stream) { +  if (!read_stream.closed) {      return;    }    global_fd = fd; -  read_buffer = rbuffer_new(READ_BUFFER_SIZE); -  read_stream = rstream_new(read_cb, read_buffer, NULL); -  rstream_set_file(read_stream, fd); -  rstream_start(read_stream); +  rstream_init_fd(&loop, &read_stream, fd, READ_BUFFER_SIZE, NULL); +  rstream_start(&read_stream, read_cb);  }  void input_stop(void)  { -  if (!read_stream) { +  if (read_stream.closed) {      return;    } -  rstream_stop(read_stream); -  rstream_free(read_stream); -  read_stream = NULL; +  rstream_stop(&read_stream); +  stream_close(&read_stream, NULL);  }  // Low level input function @@ -115,7 +111,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)    }    // If there are deferred events, return the keys directly -  if (event_has_deferred()) { +  if (loop_has_deferred_events(&loop)) {      return push_event_key(buf, maxlen);    } @@ -136,7 +132,7 @@ bool os_char_avail(void)  void os_breakcheck(void)  {    if (!disable_breakcheck && !got_int) { -    event_poll(0); +    loop_poll_events(&loop, 0);    }  } @@ -285,7 +281,7 @@ static bool input_poll(int ms)      prof_inchar_enter();    } -  event_poll_until(ms, input_ready() || input_eof); +  LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof);    if (do_profiling == PROF_YES && ms) {      prof_inchar_exit(); @@ -309,16 +305,16 @@ static InbufPollResult inbuf_poll(int ms)    return input_eof ? kInputEof : kInputNone;  } -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool at_eof) +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof)  {    if (at_eof) {      input_eof = true;    } -  assert(rbuffer_space(input_buffer) >= rbuffer_size(read_buffer)); -  RBUFFER_UNTIL_EMPTY(read_buffer, ptr, len) { +  assert(rbuffer_space(input_buffer) >= rbuffer_size(buf)); +  RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      (void)rbuffer_write(input_buffer, ptr, len); -    rbuffer_consumed(read_buffer, len); +    rbuffer_consumed(buf, len);    }  } @@ -362,7 +358,7 @@ static bool input_ready(void)  {    return typebuf_was_filled ||                 // API call filled typeahead           rbuffer_size(input_buffer) ||         // Input buffer filled -         event_has_deferred();                 // Events must be processed +         loop_has_deferred_events(&loop);      // Events must be processed  }  // Exit because of an input read error. diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c deleted file mode 100644 index f9bde21361..0000000000 --- a/src/nvim/os/job.c +++ /dev/null @@ -1,467 +0,0 @@ -#include <stdint.h> -#include <stdbool.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pty_process.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/event.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/time.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#ifdef HAVE_SYS_WAIT_H -# include <sys/wait.h> -#endif - -// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit -// before we send SIGNAL to it -#define TERM_TIMEOUT 1000000000 -#define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define JOB_BUFFER_SIZE 0xFFFF - -#define close_job_stream(job, stream, type)                                \ -  do {                                                                     \ -    if (job->stream) {                                                     \ -      type##stream_free(job->stream);                                      \ -      job->stream = NULL;                                                  \ -      if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) {          \ -        uv_close((uv_handle_t *)job->proc_std##stream, close_cb);          \ -      }                                                                    \ -    }                                                                      \ -  } while (0) - -#define close_job_in(job) close_job_stream(job, in, w) -#define close_job_out(job) close_job_stream(job, out, r) -#define close_job_err(job) close_job_stream(job, err, r) - -Job *table[MAX_RUNNING_JOBS] = {NULL}; -size_t stop_requests = 0; -uv_timer_t job_stop_timer; -uv_signal_t schld; - -// Some helpers shared in this module - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/job.c.generated.h" -#endif -// Callbacks for libuv - -/// Initializes job control resources -void job_init(void) -{ -  uv_disable_stdio_inheritance(); -  uv_timer_init(uv_default_loop(), &job_stop_timer); -  uv_signal_init(uv_default_loop(), &schld); -  uv_signal_start(&schld, chld_handler, SIGCHLD); -} - -/// Releases job control resources and terminates running jobs -void job_teardown(void) -{ -  // Stop all jobs -  for (int i = 0; i < MAX_RUNNING_JOBS; i++) { -    Job *job; -    if ((job = table[i]) != NULL) { -      uv_kill(job->pid, SIGTERM); -      job->term_sent = true; -      job_stop(job); -    } -  } - -  // Wait until all jobs are closed -  event_poll_until(-1, !stop_requests); -  uv_signal_stop(&schld); -  uv_close((uv_handle_t *)&schld, NULL); -  // Close the timer -  uv_close((uv_handle_t *)&job_stop_timer, NULL); -} - -/// Tries to start a new job. -/// -/// @param[out] status The job id if the job started successfully, 0 if the job -///             table is full, -1 if the program could not be executed. -/// @return The job pointer if the job started successfully, NULL otherwise -Job *job_start(JobOptions opts, int *status) -{ -  int i; -  Job *job; - -  // Search for a free slot in the table -  for (i = 0; i < MAX_RUNNING_JOBS; i++) { -    if (table[i] == NULL) { -      break; -    } -  } - -  if (i == MAX_RUNNING_JOBS) { -    // No free slots -    shell_free_argv(opts.argv); -    *status = 0; -    return NULL; -  } - -  job = xmalloc(sizeof(Job)); -  // Initialize -  job->id = i + 1; -  *status = job->id; -  job->status = -1; -  job->refcount = 1; -  job->stopped_time = 0; -  job->term_sent = false; -  job->in = NULL; -  job->out = NULL; -  job->err = NULL; -  job->opts = opts; -  job->closed = false; - -  process_init(job); - -  if (opts.writable) { -    handle_set_job((uv_handle_t *)job->proc_stdin, job); -    job->refcount++; -  } - -  if (opts.stdout_cb) { -    handle_set_job((uv_handle_t *)job->proc_stdout, job); -    job->refcount++; -  } - -  if (opts.stderr_cb) { -    handle_set_job((uv_handle_t *)job->proc_stderr, job); -    job->refcount++; -  } - -  // Spawn the job -  if (!process_spawn(job)) { -    if (opts.writable) { -      uv_close((uv_handle_t *)job->proc_stdin, close_cb); -    } -    if (opts.stdout_cb) { -      uv_close((uv_handle_t *)job->proc_stdout, close_cb); -    } -    if (opts.stderr_cb) { -      uv_close((uv_handle_t *)job->proc_stderr, close_cb); -    } -    process_close(job); -    event_poll(0); -    // Manually invoke the close_cb to free the job resources -    *status = -1; -    return NULL; -  } - -  if (opts.writable) { -    job->in = wstream_new(opts.maxmem); -    wstream_set_stream(job->in, job->proc_stdin); -  } - -  // Start the readable streams -  if (opts.stdout_cb) { -    job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -    rstream_set_stream(job->out, job->proc_stdout); -    rstream_start(job->out); -  } - -  if (opts.stderr_cb) { -    job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -    rstream_set_stream(job->err, job->proc_stderr); -    rstream_start(job->err); -  } -  // Save the job to the table -  table[i] = job; - -  return job; -} - -/// Finds a job instance by id -/// -/// @param id The job id -/// @return the Job instance -Job *job_find(int id) -{ -  Job *job; - -  if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1]) -      || job->stopped_time) { -    return NULL; -  } - -  return job; -} - -/// Terminates a job. This is a non-blocking operation, but if the job exists -/// it's guaranteed to succeed(SIGKILL will eventually be sent) -/// -/// @param job The Job instance -void job_stop(Job *job) -{ -  if (job->stopped_time) { -    return; -  } - -  job->stopped_time = os_hrtime(); -  if (job->opts.pty) { -    // close all streams for pty jobs to send SIGHUP to the process -    job_close_streams(job); -    pty_process_close_master(job); -  } else { -    // Close the job's stdin. If the job doesn't close its own stdout/stderr, -    // they will be closed when the job exits(possibly due to being terminated -    // after a timeout) -    close_job_in(job); -  } - -  if (!stop_requests++) { -    // When there's at least one stop request pending, start a timer that -    // will periodically check if a signal should be send to a to the job -    DLOG("Starting job kill timer"); -    uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100); -  } -} - -/// job_wait - synchronously wait for a job to finish -/// -/// @param job The job instance -/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for -///        waiting until the job quits. -/// @return returns the status code of the exited job. -1 if the job is -///         still running and the `timeout` has expired. Note that this is -///         indistinguishable from the process returning -1 by itself. Which -///         is possible on some OS. Returns -2 if the job was interrupted. -int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL -{ -  // The default status is -1, which represents a timeout -  int status = -1; -  bool interrupted = false; - -  // Increase refcount to stop the job from being freed before we have a -  // chance to get the status. -  job->refcount++; -  event_poll_until(ms, -      // Until... -      got_int ||                // interrupted by the user -      job->refcount == 1);  // job exited - -  // we'll assume that a user frantically hitting interrupt doesn't like -  // the current job. Signal that it has to be killed. -  if (got_int) { -    interrupted = true; -    got_int = false; -    job_stop(job); -    if (ms == -1) { -      // We can only return, if all streams/handles are closed and the job -      // exited. -      event_poll_until(-1, job->refcount == 1); -    } else { -      event_poll(0); -    } -  } - -  if (job->refcount == 1) { -    // Job exited, collect status and manually invoke close_cb to free the job -    // resources -    status = interrupted ? -2 : job->status; -    job_close_streams(job); -    job_decref(job); -  } else { -    job->refcount--; -  } - -  return status; -} - -/// Close the pipe used to write to the job. -/// -/// This can be used for example to indicate to the job process that no more -/// input is coming, and that it should shut down cleanly. -/// -/// It has no effect when the input pipe doesn't exist or was already -/// closed. -/// -/// @param job The job instance -void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  close_job_in(job); -} - -// Close the job stdout stream. -void job_close_out(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  close_job_out(job); -} - -// Close the job stderr stream. -void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  close_job_out(job); -} - -/// All writes that complete after calling this function will be reported -/// to `cb`. -/// -/// Use this function to be notified about the status of an in-flight write. -/// -/// @see {wstream_set_write_cb} -/// -/// @param job The job instance -/// @param cb The function that will be called on write completion or -///        failure. It will be called with the job as the `data` argument. -void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL -{ -  wstream_set_write_cb(job->in, cb, job); -} - -/// Writes data to the job's stdin. This is a non-blocking operation, it -/// returns when the write request was sent. -/// -/// @param job The Job instance -/// @param buffer The buffer which contains the data to be written -/// @return true if the write request was successfully sent, false if writing -///         to the job stream failed (possibly because the OS buffer is full) -bool job_write(Job *job, WBuffer *buffer) -{ -  return wstream_write(job->in, buffer); -} - -/// Get the job id -/// -/// @param job A pointer to the job -/// @return The job id -int job_id(Job *job) -{ -  return job->id; -} - -// Get the job pid -int job_pid(Job *job) -{ -  return job->pid; -} - -/// Get data associated with a job -/// -/// @param job A pointer to the job -/// @return The job data -void *job_data(Job *job) -{ -  return job->opts.data; -} - -/// Resize the window for a pty job -bool job_resize(Job *job, uint16_t width, uint16_t height) -{ -  if (!job->opts.pty) { -    return false; -  } -  pty_process_resize(job, width, height); -  return true; -} - -void job_close_streams(Job *job) -{ -  close_job_in(job); -  close_job_out(job); -  close_job_err(job); -} - -JobOptions *job_opts(Job *job) -{ -  return &job->opts; -} - -/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those -/// that didn't die from SIGTERM after a while(exit_timeout is 0). -static void job_stop_timer_cb(uv_timer_t *handle) -{ -  Job *job; -  uint64_t now = os_hrtime(); - -  for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) { -    if ((job = table[i]) == NULL || !job->stopped_time) { -      continue; -    } - -    uint64_t elapsed = now - job->stopped_time; - -    if (!job->term_sent && elapsed >= TERM_TIMEOUT) { -      ILOG("Sending SIGTERM to job(id: %d)", job->id); -      uv_kill(job->pid, SIGTERM); -      job->term_sent = true; -    } else if (elapsed >= KILL_TIMEOUT) { -      ILOG("Sending SIGKILL to job(id: %d)", job->id); -      uv_kill(job->pid, SIGKILL); -      process_close(job); -    } -  } -} - -// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) -{ -  Job *job = data; - -  if (rstream == job->out) { -    job->opts.stdout_cb(rstream, buf, data, eof); -    if (eof) { -      close_job_out(job); -    } -  } else { -    job->opts.stderr_cb(rstream, buf, data, eof); -    if (eof) { -      close_job_err(job); -    } -  } -} - -static void close_cb(uv_handle_t *handle) -{ -  job_decref(handle_get_job(handle)); -} - -static void job_exited(Event event) -{ -  Job *job = event.data; -  process_close(job); -} - -static void chld_handler(uv_signal_t *handle, int signum) -{ -  int stat = 0; -  int pid; - -  do { -    pid = waitpid(-1, &stat, WNOHANG); -  } while (pid < 0 && errno == EINTR); - -  if (pid <= 0) { -    return; -  } - -  Job *job = NULL; -  // find the job corresponding to the exited pid -  for (int i = 0; i < MAX_RUNNING_JOBS; i++) { -    if ((job = table[i]) != NULL && job->pid == pid) { -      if (WIFEXITED(stat)) { -        job->status = WEXITSTATUS(stat); -      } else if (WIFSIGNALED(stat)) { -        job->status = WTERMSIG(stat); -      } -      if (exiting) { -        // don't enqueue more events when exiting -        process_close(job); -      } else { -        event_push((Event) {.handler = job_exited, .data = job}, false); -      } -      break; -    } -  } -} - diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h deleted file mode 100644 index e0ca615626..0000000000 --- a/src/nvim/os/job.h +++ /dev/null @@ -1,21 +0,0 @@ -// Job is a short name we use to refer to child processes that run in parallel -// with the editor, probably executing long-running tasks and sending updates -// asynchronously. Communication happens through anonymous pipes connected to -// the job's std{in,out,err}. They are more like bash/zsh co-processes than the -// usual shell background job. The name 'Job' was chosen because it applies to -// the concept while being significantly shorter. -#ifndef NVIM_OS_JOB_H -#define NVIM_OS_JOB_H - -#include <stdint.h> -#include <stdbool.h> - -#include "nvim/os/rstream_defs.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/job.h.generated.h" -#endif -#endif  // NVIM_OS_JOB_H diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h deleted file mode 100644 index 7fee900ac0..0000000000 --- a/src/nvim/os/job_defs.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef NVIM_OS_JOB_DEFS_H -#define NVIM_OS_JOB_DEFS_H - -#include <uv.h> -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream_defs.h" - -#define MAX_RUNNING_JOBS 100 -typedef struct job Job; - -/// Function called when the job reads data -/// -/// @param id The job id -/// @param data Some data associated with the job by the caller -typedef void (*job_exit_cb)(Job *job, int status, void *data); - -// Job startup options -// job_exit_cb Callback that will be invoked when the job exits -// maxmem Maximum amount of memory used by the job WStream -typedef struct { -  // Argument vector for the process. The first item is the -  // executable to run. -  // [consumed] -  char **argv; -  // Caller data that will be associated with the job -  void *data; -  // If true the job stdin will be available for writing with job_write, -  // otherwise it will be redirected to /dev/null -  bool writable; -  // Callback that will be invoked when data is available on stdout. If NULL -  // stdout will be redirected to /dev/null. -  rstream_cb stdout_cb; -  // Callback that will be invoked when data is available on stderr. If NULL -  // stderr will be redirected to /dev/null. -  rstream_cb  stderr_cb; -  // Callback that will be invoked when the job has exited and will not send -  // data -  job_exit_cb exit_cb; -  // Maximum memory used by the job's WStream -  size_t maxmem; -  // Connect the job to a pseudo terminal -  bool pty; -  // Initial window dimensions if the job is connected to a pseudo terminal -  uint16_t width, height; -  // Value for the $TERM environment variable. A default value of "ansi" is -  // assumed if NULL -  char *term_name; -} JobOptions; - -#define JOB_OPTIONS_INIT ((JobOptions) {                     \ -    .argv = NULL,                                            \ -    .data = NULL,                                            \ -    .writable = true,                                        \ -    .stdout_cb = NULL,                                       \ -    .stderr_cb = NULL,                                       \ -    .exit_cb = NULL,                                         \ -    .maxmem = 0,                                             \ -    .pty = false,                                            \ -    .width = 80,                                             \ -    .height = 24,                                            \ -    .term_name = NULL                                        \ -    }) -#endif  // NVIM_OS_JOB_DEFS_H diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h deleted file mode 100644 index 983106d918..0000000000 --- a/src/nvim/os/job_private.h +++ /dev/null @@ -1,118 +0,0 @@ -#ifndef NVIM_OS_JOB_PRIVATE_H -#define NVIM_OS_JOB_PRIVATE_H - -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/pipe_process.h" -#include "nvim/os/pty_process.h" -#include "nvim/os/shell.h" -#include "nvim/log.h" -#include "nvim/memory.h" - -struct job { -  // Job id the index in the job table plus one. -  int id; -  // Process id -  int pid; -  // Exit status code of the job process -  int status; -  // Number of references to the job. The job resources will only be freed by -  // close_cb when this is 0 -  int refcount; -  // Time when job_stop was called for the job. -  uint64_t stopped_time; -  // If SIGTERM was already sent to the job(only send one before SIGKILL) -  bool term_sent; -  // Readable streams(std{out,err}) -  RStream *out, *err; -  // Writable stream(stdin) -  WStream *in; -  // Libuv streams representing stdin/stdout/stderr -  uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr; -  // Extra data set by the process spawner -  void *process; -  // If process_close has been called on this job -  bool closed; -  // Startup options -  JobOptions opts; -}; - -extern Job *table[]; -extern size_t stop_requests; -extern uv_timer_t job_stop_timer; - -static inline bool process_spawn(Job *job) -{ -  return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job); -} - -static inline void process_init(Job *job) -{ -  if (job->opts.pty) { -    pty_process_init(job); -  } else { -    pipe_process_init(job); -  } -} - -static inline void process_close(Job *job) -{ -  if (job->closed) { -    return; -  } -  job->closed = true; -  if (job->opts.pty) { -    pty_process_close(job); -  } else { -    pipe_process_close(job); -  } -} - -static inline void process_destroy(Job *job) -{ -  if (job->opts.pty) { -    pty_process_destroy(job); -  } else { -    pipe_process_destroy(job); -  } -} - -static inline void job_exit_callback(Job *job) -{ -  // Free the slot now, 'exit_cb' may want to start another job to replace -  // this one -  table[job->id - 1] = NULL; - -  if (job->opts.exit_cb) { -    // Invoke the exit callback -    job->opts.exit_cb(job, job->status, job->opts.data); -  } - -  if (stop_requests && !--stop_requests) { -    // Stop the timer if no more stop requests are pending -    DLOG("Stopping job kill timer"); -    uv_timer_stop(&job_stop_timer); -  } -} - -static inline void job_decref(Job *job) -{ -  if (--job->refcount == 0) { -    // Invoke the exit_cb -    job_exit_callback(job); -    // Free all memory allocated for the job -    xfree(job->proc_stdin->data); -    xfree(job->proc_stdout->data); -    xfree(job->proc_stderr->data); -    shell_free_argv(job->opts.argv); -    process_destroy(job); -    xfree(job); -  } -} - - -#endif  // NVIM_OS_JOB_PRIVATE_H diff --git a/src/nvim/os/os.h b/src/nvim/os/os.h index 3dd099890c..69bd1ff4fd 100644 --- a/src/nvim/os/os.h +++ b/src/nvim/os/os.h @@ -12,7 +12,6 @@  # include "os/mem.h.generated.h"  # include "os/env.h.generated.h"  # include "os/users.h.generated.h" -# include "os/stream.h.generated.h"  #endif  #endif  // NVIM_OS_OS_H diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c deleted file mode 100644 index 2ac305e967..0000000000 --- a/src/nvim/os/pipe_process.c +++ /dev/null @@ -1,110 +0,0 @@ -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" -#include "nvim/os/job_private.h" -#include "nvim/os/pipe_process.h" -#include "nvim/memory.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pipe_process.c.generated.h" -#endif - -typedef struct { -  // Structures for process spawning/management used by libuv -  uv_process_t proc; -  uv_process_options_t proc_opts; -  uv_stdio_container_t stdio[3]; -  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -} UvProcess; - -void pipe_process_init(Job *job) -{ -  UvProcess *pipeproc = xmalloc(sizeof(UvProcess)); -  pipeproc->proc_opts.file = job->opts.argv[0]; -  pipeproc->proc_opts.args = job->opts.argv; -  pipeproc->proc_opts.stdio = pipeproc->stdio; -  pipeproc->proc_opts.stdio_count = 3; -  pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; -  pipeproc->proc_opts.exit_cb = exit_cb; -  pipeproc->proc_opts.cwd = NULL; -  pipeproc->proc_opts.env = NULL; -  pipeproc->proc.data = NULL; -  pipeproc->proc_stdin.data = NULL; -  pipeproc->proc_stdout.data = NULL; -  pipeproc->proc_stderr.data = NULL; - -  // Initialize the job std{in,out,err} -  pipeproc->stdio[0].flags = UV_IGNORE; -  pipeproc->stdio[1].flags = UV_IGNORE; -  pipeproc->stdio[2].flags = UV_IGNORE; - -  handle_set_job((uv_handle_t *)&pipeproc->proc, job); - -  if (job->opts.writable) { -    uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdin, 0); -    pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; -    pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin; -  } - -  if (job->opts.stdout_cb) { -    uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdout, 0); -    pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -    pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout; -  } - -  if (job->opts.stderr_cb) { -    uv_pipe_init(uv_default_loop(), &pipeproc->proc_stderr, 0); -    pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -    pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr; -  } - -  job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin; -  job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout; -  job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr; -  job->process = pipeproc; -} - -void pipe_process_destroy(Job *job) -{ -  UvProcess *pipeproc = job->process; -  xfree(pipeproc->proc.data); -  xfree(pipeproc); -  job->process = NULL; -} - -bool pipe_process_spawn(Job *job) -{ -  UvProcess *pipeproc = job->process; - -  if (uv_spawn(uv_default_loop(), &pipeproc->proc, &pipeproc->proc_opts) != 0) { -    return false; -  } - -  job->pid = pipeproc->proc.pid; -  return true; -} - -void pipe_process_close(Job *job) -{ -  UvProcess *pipeproc = job->process; -  uv_close((uv_handle_t *)&pipeproc->proc, close_cb); -} - -static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) -{ -  Job *job = handle_get_job((uv_handle_t *)proc); -  job->status = (int)status; -  pipe_process_close(job); -} - -static void close_cb(uv_handle_t *handle) -{ -  Job *job = handle_get_job(handle); -  job_close_streams(job); -  job_decref(job); -} diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h deleted file mode 100644 index 17a4255ddc..0000000000 --- a/src/nvim/os/pipe_process.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_PIPE_PROCESS_H -#define NVIM_OS_PIPE_PROCESS_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pipe_process.h.generated.h" -#endif -#endif  // NVIM_OS_PIPE_PROCESS_H diff --git a/src/nvim/os/pty_process.h b/src/nvim/os/pty_process.h deleted file mode 100644 index 62fcd1671f..0000000000 --- a/src/nvim/os/pty_process.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_PTY_PROCESS_H -#define NVIM_OS_PTY_PROCESS_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/pty_process.h.generated.h" -#endif -#endif  // NVIM_OS_PTY_PROCESS_H diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c deleted file mode 100644 index af84288f0f..0000000000 --- a/src/nvim/os/rstream.c +++ /dev/null @@ -1,253 +0,0 @@ -#include <assert.h> -#include <stdint.h> -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/rstream.h" -#include "nvim/ascii.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/log.h" -#include "nvim/misc1.h" - -struct rstream { -  void *data; -  uv_buf_t uvbuf; -  size_t fpos; -  RBuffer *buffer; -  uv_stream_t *stream; -  uv_idle_t *fread_idle; -  uv_handle_type file_type; -  uv_file fd; -  rstream_cb cb; -  bool free_handle; -}; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/rstream.c.generated.h" -#endif - -/// Creates a new RStream instance. A RStream encapsulates all the boilerplate -/// necessary for reading from a libuv stream. -/// -/// @param cb A function that will be called whenever some data is available -///        for reading with `rstream_read` -/// @param buffer RBuffer instance to associate with the RStream -/// @param data Some state to associate with the `RStream` instance -/// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) -{ -  RStream *rv = xmalloc(sizeof(RStream)); -  buffer->data = rv; -  buffer->full_cb = on_rbuffer_full; -  buffer->nonfull_cb = on_rbuffer_nonfull; -  rv->buffer = buffer; -  rv->fpos = 0; -  rv->data = data; -  rv->cb = cb; -  rv->stream = NULL; -  rv->fread_idle = NULL; -  rv->free_handle = false; -  rv->file_type = UV_UNKNOWN_HANDLE; - -  return rv; -} - -static void on_rbuffer_full(RBuffer *buf, void *data) -{ -  rstream_stop(data); -} - -static void on_rbuffer_nonfull(RBuffer *buf, void *data) -{ -  rstream_start(data); -} - -/// Frees all memory allocated for a RStream instance -/// -/// @param rstream The `RStream` instance -void rstream_free(RStream *rstream) -{ -  if (rstream->free_handle) { -    if (rstream->fread_idle != NULL) { -      uv_close((uv_handle_t *)rstream->fread_idle, close_cb); -    } else { -      uv_close((uv_handle_t *)rstream->stream, close_cb); -    } -  } - -  rbuffer_free(rstream->buffer); -  xfree(rstream); -} - -/// Sets the underlying `uv_stream_t` instance -/// -/// @param rstream The `RStream` instance -/// @param stream The new `uv_stream_t` instance -void rstream_set_stream(RStream *rstream, uv_stream_t *stream) -{ -  handle_set_rstream((uv_handle_t *)stream, rstream); -  rstream->stream = stream; -} - -/// Sets the underlying file descriptor that will be read from. Only pipes -/// and regular files are supported for now. -/// -/// @param rstream The `RStream` instance -/// @param file The file descriptor -void rstream_set_file(RStream *rstream, uv_file file) -{ -  rstream->file_type = uv_guess_handle(file); - -  if (rstream->free_handle) { -    // If this is the second time we're calling this function, free the -    // previously allocated memory -    if (rstream->fread_idle != NULL) { -      uv_close((uv_handle_t *)rstream->fread_idle, close_cb); -      rstream->fread_idle = NULL; -    } else { -      uv_close((uv_handle_t *)rstream->stream, close_cb); -      rstream->stream = NULL; -    } -  } - -  if (rstream->file_type == UV_FILE) { -    // Non-blocking file reads are simulated with an idle handle that reads -    // in chunks of rstream->buffer_size, giving time for other events to -    // be processed between reads. -    rstream->fread_idle = xmalloc(sizeof(uv_idle_t)); -    uv_idle_init(uv_default_loop(), rstream->fread_idle); -    rstream->fread_idle->data = NULL; -    handle_set_rstream((uv_handle_t *)rstream->fread_idle, rstream); -  } else { -    // Only pipes are supported for now -    assert(rstream->file_type == UV_NAMED_PIPE -        || rstream->file_type == UV_TTY); -    rstream->stream = xmalloc(sizeof(uv_pipe_t)); -    uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0); -    uv_pipe_open((uv_pipe_t *)rstream->stream, file); -    rstream->stream->data = NULL; -    handle_set_rstream((uv_handle_t *)rstream->stream, rstream); -  } - -  rstream->fd = file; -  rstream->free_handle = true; -} - -/// Starts watching for events from a `RStream` instance. -/// -/// @param rstream The `RStream` instance -void rstream_start(RStream *rstream) -{ -  if (rstream->file_type == UV_FILE) { -    uv_idle_start(rstream->fread_idle, fread_idle_cb); -  } else { -    uv_read_start(rstream->stream, alloc_cb, read_cb); -  } -} - -/// Stops watching for events from a `RStream` instance. -/// -/// @param rstream The `RStream` instance -void rstream_stop(RStream *rstream) -{ -  if (rstream->file_type == UV_FILE) { -    uv_idle_stop(rstream->fread_idle); -  } else { -    uv_read_stop(rstream->stream); -  } -} - -// Callbacks used by libuv - -// Called by libuv to allocate memory for reading. -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) -{ -  RStream *rstream = handle_get_rstream(handle); -  buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len); -} - -// Callback invoked by libuv after it copies the data into the buffer provided -// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a -// 0-length buffer. -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) -{ -  RStream *rstream = handle_get_rstream((uv_handle_t *)stream); - -  if (cnt <= 0) { -    if (cnt != UV_ENOBUFS -        // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: -        // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. -        // -        // We don't need to do anything with the RBuffer because the next call -        // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` -        // won't be called) -        && cnt != 0) { -      DLOG("Closing RStream(%p) because of %s(%zd)", rstream, -           uv_strerror((int)cnt), cnt); -      // Read error or EOF, either way stop the stream and invoke the callback -      // with eof == true -      uv_read_stop(stream); -      rstream->cb(rstream, rstream->buffer, rstream->data, true); -    } -    return; -  } - -  // at this point we're sure that cnt is positive, no error occurred -  size_t nread = (size_t)cnt; -  // Data was already written, so all we need is to update 'wpos' to reflect -  // the space actually used in the buffer. -  rbuffer_produced(rstream->buffer, nread); -  rstream->cb(rstream, rstream->buffer, rstream->data, false); -} - -// Called by the by the 'idle' handle to emulate a reading event -static void fread_idle_cb(uv_idle_t *handle) -{ -  uv_fs_t req; -  RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - -  rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len); - -  // the offset argument to uv_fs_read is int64_t, could someone really try -  // to read more than 9 quintillion (9e18) bytes? -  // upcast is meant to avoid tautological condition warning on 32 bits -  uintmax_t fpos_intmax = rstream->fpos; -  if (fpos_intmax > INT64_MAX) { -    ELOG("stream offset overflow"); -    preserve_exit(); -  } - -  // Synchronous read -  uv_fs_read( -      uv_default_loop(), -      &req, -      rstream->fd, -      &rstream->uvbuf, -      1, -      (int64_t) rstream->fpos, -      NULL); - -  uv_fs_req_cleanup(&req); - -  if (req.result <= 0) { -    uv_idle_stop(rstream->fread_idle); -    rstream->cb(rstream, rstream->buffer, rstream->data, true); -    return; -  } - -  // no errors (req.result (ssize_t) is positive), it's safe to cast. -  size_t nread = (size_t) req.result; -  rbuffer_produced(rstream->buffer, nread); -  rstream->fpos += nread; -} - -static void close_cb(uv_handle_t *handle) -{ -  xfree(handle->data); -  xfree(handle); -} diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h deleted file mode 100644 index 3e24724573..0000000000 --- a/src/nvim/os/rstream.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NVIM_OS_RSTREAM_H -#define NVIM_OS_RSTREAM_H - -#include <stdbool.h> -#include <stdint.h> -#include <uv.h> -#include "nvim/os/event_defs.h" -#include "nvim/os/rstream_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/rstream.h.generated.h" -#endif -#endif  // NVIM_OS_RSTREAM_H diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h deleted file mode 100644 index 45dced0b62..0000000000 --- a/src/nvim/os/rstream_defs.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef NVIM_OS_RSTREAM_DEFS_H -#define NVIM_OS_RSTREAM_DEFS_H - -#include <stdbool.h> - -#include "nvim/rbuffer.h" - -typedef struct rstream RStream; - -/// Type of function called when the RStream receives data -/// -/// @param rstream The RStream instance -/// @param rbuffer The associated RBuffer instance -/// @param data State associated with the RStream instance -/// @param eof If the stream reached EOF. -typedef void (*rstream_cb)(RStream *rstream, RBuffer *buf, void *data, -    bool eof); - -#endif  // NVIM_OS_RSTREAM_DEFS_H - diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 48174533a6..e0d67d4951 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -8,9 +8,9 @@  #include "nvim/ascii.h"  #include "nvim/lib/kvec.h"  #include "nvim/log.h" -#include "nvim/os/event.h" -#include "nvim/os/job.h" -#include "nvim/os/rstream.h" +#include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/rstream.h"  #include "nvim/os/shell.h"  #include "nvim/os/signal.h"  #include "nvim/types.h" @@ -189,7 +189,7 @@ static int do_os_system(char **argv,  {    // the output buffer    DynamicBuffer buf = DYNAMIC_BUFFER_INIT; -  rstream_cb data_cb = system_data_cb; +  stream_read_cb data_cb = system_data_cb;    if (nread) {      *nread = 0;    } @@ -204,17 +204,15 @@ static int do_os_system(char **argv,    char prog[MAXPATHL];    xstrlcpy(prog, argv[0], MAXPATHL); -  int status; -  JobOptions opts = JOB_OPTIONS_INIT; -  opts.argv = argv; -  opts.data = &buf; -  opts.writable = input != NULL; -  opts.stdout_cb = data_cb; -  opts.stderr_cb = data_cb; -  opts.exit_cb = NULL; -  Job *job = job_start(opts, &status); - -  if (status <= 0) { +  Stream in, out, err; +  UvProcess uvproc = uv_process_init(&buf); +  Process *proc = &uvproc.process; +  proc->argv = argv; +  proc->in = input != NULL ? &in : NULL; +  proc->out = &out; +  proc->err = &err; +  if (!process_spawn(&loop, proc)) { +    loop_poll_events(&loop, 0);      // Failed, probably due to `sh` not being executable      if (!silent) {        MSG_PUTS(_("\nCannot execute ")); @@ -224,28 +222,32 @@ static int do_os_system(char **argv,      return -1;    } +  if (input != NULL) { +    wstream_init(proc->in, 0); +  } +  rstream_init(proc->out, 0); +  rstream_start(proc->out, data_cb); +  rstream_init(proc->err, 0); +  rstream_start(proc->err, data_cb); +    // write the input, if any    if (input) {      WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); -    if (!job_write(job, input_buffer)) { -      // couldn't write, stop the job and tell the user about it -      job_stop(job); +    if (!wstream_write(&in, input_buffer)) { +      // couldn't write, stop the process and tell the user about it +      process_stop(proc);        return -1;      }      // close the input stream after everything is written -    job_write_cb(job, shell_write_cb); -  } else { -    // close the input stream, let the process know that no more input is -    // coming -    job_close_in(job); +    wstream_set_write_cb(&in, shell_write_cb);    }    // invoke busy_start here so event_poll_until wont change the busy state for    // the UI    ui_busy_start();    ui_flush(); -  status = job_wait(job, -1); +  int status = process_wait(proc, -1);    ui_busy_stop();    // prepare the out parameters if requested @@ -283,10 +285,9 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)    buf->data = xrealloc(buf->data, buf->cap);  } -static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  { -  Job *job = data; -  DynamicBuffer *dbuf = job_data(job); +  DynamicBuffer *dbuf = data;    size_t nread = buf->size;    dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1); @@ -294,7 +295,7 @@ static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)    dbuf->len += nread;  } -static void out_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      size_t written = write_output(ptr, len, false, @@ -470,8 +471,7 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,    return (size_t)(output - start);  } -static void shell_write_cb(WStream *wstream, void *data, int status) +static void shell_write_cb(Stream *stream, void *data, int status)  { -  Job *job = data; -  job_close_in(job); +  stream_close(stream, NULL);  } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index f824543003..6de3435c4c 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -11,12 +11,13 @@  #include "nvim/memory.h"  #include "nvim/misc1.h"  #include "nvim/misc2.h" +#include "nvim/event/signal.h"  #include "nvim/os/signal.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h" -static uv_signal_t spipe, shup, squit, sterm; +static SignalWatcher spipe, shup, squit, sterm;  #ifdef SIGPWR -static uv_signal_t spwr; +static SignalWatcher spwr;  #endif  static bool rejecting_deadly; @@ -27,40 +28,40 @@ static bool rejecting_deadly;  void signal_init(void)  { -  uv_signal_init(uv_default_loop(), &spipe); -  uv_signal_init(uv_default_loop(), &shup); -  uv_signal_init(uv_default_loop(), &squit); -  uv_signal_init(uv_default_loop(), &sterm); -  uv_signal_start(&spipe, signal_cb, SIGPIPE); -  uv_signal_start(&shup, signal_cb, SIGHUP); -  uv_signal_start(&squit, signal_cb, SIGQUIT); -  uv_signal_start(&sterm, signal_cb, SIGTERM); +  signal_watcher_init(&loop, &spipe, NULL); +  signal_watcher_init(&loop, &shup, NULL); +  signal_watcher_init(&loop, &squit, NULL); +  signal_watcher_init(&loop, &sterm, NULL); +  signal_watcher_start(&spipe, on_signal, SIGPIPE); +  signal_watcher_start(&shup, on_signal, SIGHUP); +  signal_watcher_start(&squit, on_signal, SIGQUIT); +  signal_watcher_start(&sterm, on_signal, SIGTERM);  #ifdef SIGPWR -  uv_signal_init(uv_default_loop(), &spwr); -  uv_signal_start(&spwr, signal_cb, SIGPWR); +  signal_watcher_init(&loop, &spwr, NULL); +  signal_watcher_start(&spwr, on_signal, SIGPWR);  #endif  }  void signal_teardown(void)  {    signal_stop(); -  uv_close((uv_handle_t *)&spipe, NULL); -  uv_close((uv_handle_t *)&shup, NULL); -  uv_close((uv_handle_t *)&squit, NULL); -  uv_close((uv_handle_t *)&sterm, NULL); +  signal_watcher_close(&spipe, NULL); +  signal_watcher_close(&shup, NULL); +  signal_watcher_close(&squit, NULL); +  signal_watcher_close(&sterm, NULL);  #ifdef SIGPWR -  uv_close((uv_handle_t *)&spwr, NULL); +  signal_watcher_close(&spwr, NULL);  #endif  }  void signal_stop(void)  { -  uv_signal_stop(&spipe); -  uv_signal_stop(&shup); -  uv_signal_stop(&squit); -  uv_signal_stop(&sterm); +  signal_watcher_stop(&spipe); +  signal_watcher_stop(&shup); +  signal_watcher_stop(&squit); +  signal_watcher_stop(&sterm);  #ifdef SIGPWR -  uv_signal_stop(&spwr); +  signal_watcher_stop(&spwr);  #endif  } @@ -111,10 +112,10 @@ static void deadly_signal(int signum)    preserve_exit();  } -static void signal_cb(uv_signal_t *handle, int signum) +static void on_signal(SignalWatcher *handle, int signum, void *data)  {    assert(signum >= 0); -  event_push((Event) { +  loop_push_event(&loop, (Event) {      .handler = on_signal_event,      .data = (void *)(uintptr_t)signum    }, false); diff --git a/src/nvim/os/signal.h b/src/nvim/os/signal.h index 927437b2db..5d8cc6f661 100644 --- a/src/nvim/os/signal.h +++ b/src/nvim/os/signal.h @@ -1,8 +1,6 @@  #ifndef NVIM_OS_SIGNAL_H  #define NVIM_OS_SIGNAL_H -#include "nvim/os/event_defs.h" -  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "os/signal.h.generated.h"  #endif diff --git a/src/nvim/os/stream.c b/src/nvim/os/stream.c deleted file mode 100644 index 0c448872c3..0000000000 --- a/src/nvim/os/stream.c +++ /dev/null @@ -1,30 +0,0 @@ -// Functions for working with stdio streams (as opposed to RStream/WStream). - -#include <stdio.h> -#include <stdbool.h> - -#include <uv.h> - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/stream.c.generated.h" -#endif - -/// Sets the stream associated with `fd` to "blocking" mode. -/// -/// @return `0` on success, or `-errno` on failure. -int stream_set_blocking(int fd, bool blocking) -{ -  // Private loop to avoid conflict with existing watcher(s): -  //    uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. -  uv_loop_t loop; -  uv_pipe_t stream; -  uv_loop_init(&loop); -  uv_pipe_init(&loop, &stream, 0); -  uv_pipe_open(&stream, fd); -  int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); -  uv_close((uv_handle_t *)&stream, NULL); -  uv_run(&loop, UV_RUN_NOWAIT);  // not necessary, but couldn't hurt. -  uv_loop_close(&loop); -  return retval; -} - diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index 590dfba797..6b5d4359db 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -7,7 +7,7 @@  #include <uv.h>  #include "nvim/os/time.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/vim.h"  static uv_mutex_t delay_mutex; @@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput)      if (milliseconds > INT_MAX) {        milliseconds = INT_MAX;      } -    event_poll_until((int)milliseconds, got_int); +    LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int);    } else {      os_microdelay(milliseconds * 1000);    } diff --git a/src/nvim/os/uv_helpers.c b/src/nvim/os/uv_helpers.c deleted file mode 100644 index 89687bdac7..0000000000 --- a/src/nvim/os/uv_helpers.c +++ /dev/null @@ -1,98 +0,0 @@ -#include <assert.h> -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -/// Common structure that will always be assigned to the `data` field of -/// libuv handles. It has fields for many types of pointers, and allow a single -/// handle to contain data from many sources -typedef struct { -  WStream *wstream; -  RStream *rstream; -  Job *job; -} HandleData; - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/uv_helpers.c.generated.h" -#endif - -/// Gets the RStream instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the RStream pointer -RStream *handle_get_rstream(uv_handle_t *handle) -{ -  RStream *rv = init(handle)->rstream; -  assert(rv != NULL); -  return rv; -} - -/// Associates a RStream instance with a libuv handle -/// -/// @param handle libuv handle -/// @param rstream the RStream pointer -void handle_set_rstream(uv_handle_t *handle, RStream *rstream) -{ -  init(handle)->rstream = rstream; -} - -/// Gets the WStream instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the WStream pointer -WStream *handle_get_wstream(uv_handle_t *handle) -{ -  WStream *rv = init(handle)->wstream; -  assert(rv != NULL); -  return rv; -} - -/// Associates a WStream instance with a libuv handle -/// -/// @param handle libuv handle -/// @param wstream the WStream pointer -void handle_set_wstream(uv_handle_t *handle, WStream *wstream) -{ -  HandleData *data = init(handle); -  data->wstream = wstream; -} - -/// Gets the Job instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the Job pointer -Job *handle_get_job(uv_handle_t *handle) -{ -  Job *rv = init(handle)->job; -  assert(rv != NULL); -  return rv; -} - -/// Associates a Job instance with a libuv handle -/// -/// @param handle libuv handle -/// @param job the Job pointer -void handle_set_job(uv_handle_t *handle, Job *job) -{ -  init(handle)->job = job; -} - -static HandleData *init(uv_handle_t *handle) -{ -  HandleData *rv; - -  if (handle->data == NULL) { -    rv = xmalloc(sizeof(HandleData)); -    rv->rstream = NULL; -    rv->wstream = NULL; -    rv->job = NULL; -    handle->data = rv; -  } else { -    rv = handle->data; -  } - -  return rv; -} diff --git a/src/nvim/os/uv_helpers.h b/src/nvim/os/uv_helpers.h deleted file mode 100644 index b49656bcb8..0000000000 --- a/src/nvim/os/uv_helpers.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NVIM_OS_UV_HELPERS_H -#define NVIM_OS_UV_HELPERS_H - -#include <uv.h> - -#include "nvim/os/wstream_defs.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/job_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/uv_helpers.h.generated.h" -#endif -#endif  // NVIM_OS_UV_HELPERS_H diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c deleted file mode 100644 index 73896c381d..0000000000 --- a/src/nvim/os/wstream.c +++ /dev/null @@ -1,243 +0,0 @@ -#include <assert.h> -#include <stdint.h> -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#define DEFAULT_MAXMEM 1024 * 1024 * 10 - -struct wstream { -  uv_stream_t *stream; -  // Memory currently used by pending buffers -  size_t curmem; -  // Maximum memory used by this instance -  size_t maxmem; -  // Number of pending requests -  size_t pending_reqs; -  bool freed, free_handle; -  // (optional) Write callback and data -  wstream_cb cb; -  void *data; -}; - -struct wbuffer { -  size_t size, refcount; -  char *data; -  wbuffer_data_finalizer cb; -}; - -typedef struct { -  WStream *wstream; -  WBuffer *buffer; -  uv_write_t uv_req; -} WRequest; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/wstream.c.generated.h" -#endif - -/// Creates a new WStream instance. A WStream encapsulates all the boilerplate -/// necessary for writing to a libuv stream. -/// -/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0, -///        a default value of 10mb will be used. -/// @return The newly-allocated `WStream` instance -WStream * wstream_new(size_t maxmem) -{ -  if (!maxmem) { -    maxmem = DEFAULT_MAXMEM; -  } - -  WStream *rv = xmalloc(sizeof(WStream)); -  rv->maxmem = maxmem; -  rv->stream = NULL; -  rv->curmem = 0; -  rv->pending_reqs = 0; -  rv->freed = false; -  rv->free_handle = false; -  rv->cb = NULL; - -  return rv; -} - -/// Frees all memory allocated for a WStream instance -/// -/// @param wstream The `WStream` instance -void wstream_free(WStream *wstream) { -  if (!wstream->pending_reqs) { -    if (wstream->free_handle) { -      uv_close((uv_handle_t *)wstream->stream, close_cb); -    } else { -      handle_set_wstream((uv_handle_t *)wstream->stream, NULL); -      xfree(wstream); -    } -  } else { -    wstream->freed = true; -  } -} - -/// Sets the underlying `uv_stream_t` instance -/// -/// @param wstream The `WStream` instance -/// @param stream The new `uv_stream_t` instance -void wstream_set_stream(WStream *wstream, uv_stream_t *stream) -{ -  handle_set_wstream((uv_handle_t *)stream, wstream); -  wstream->stream = stream; -} - -/// Sets the underlying file descriptor that will be written to. Only pipes -/// are supported for now. -/// -/// @param wstream The `WStream` instance -/// @param file The file descriptor -void wstream_set_file(WStream *wstream, uv_file file) -{ -  assert(uv_guess_handle(file) == UV_NAMED_PIPE || -         uv_guess_handle(file) == UV_TTY); -  wstream->stream = xmalloc(sizeof(uv_pipe_t)); -  uv_pipe_init(uv_default_loop(), (uv_pipe_t *)wstream->stream, 0); -  uv_pipe_open((uv_pipe_t *)wstream->stream, file); -  wstream->stream->data = NULL; -  handle_set_wstream((uv_handle_t *)wstream->stream, wstream); -  wstream->free_handle = true; -} - -/// Sets a callback that will be called on completion of a write request, -/// indicating failure/success. -/// -/// This affects all requests currently in-flight as well. Overwrites any -/// possible earlier callback. -/// -/// @note This callback will not fire if the write request couldn't even be -///       queued properly (i.e.: when `wstream_write() returns an error`). -/// -/// @param wstream The `WStream` instance -/// @param cb The callback -/// @param data User-provided data that will be passed to `cb` -void wstream_set_write_cb(WStream *wstream, wstream_cb cb, void *data) -  FUNC_ATTR_NONNULL_ARG(1) -{ -  wstream->cb = cb; -  wstream->data = data; -} - -/// Queues data for writing to the backing file descriptor of a `WStream` -/// instance. This will fail if the write would cause the WStream use more -/// memory than specified by `maxmem`. -/// -/// @param wstream The `WStream` instance -/// @param buffer The buffer which contains data to be written -/// @return false if the write failed -bool wstream_write(WStream *wstream, WBuffer *buffer) -{ -  // This should not be called after a wstream was freed -  assert(!wstream->freed); - -  if (wstream->curmem > wstream->maxmem) { -    goto err; -  } - -  wstream->curmem += buffer->size; - -  WRequest *data = xmalloc(sizeof(WRequest)); -  data->wstream = wstream; -  data->buffer = buffer; -  data->uv_req.data = data; - -  uv_buf_t uvbuf; -  uvbuf.base = buffer->data; -  uvbuf.len = buffer->size; - -  if (uv_write(&data->uv_req, wstream->stream, &uvbuf, 1, write_cb)) { -    xfree(data); -    goto err; -  } - -  wstream->pending_reqs++; -  return true; - -err: -  wstream_release_wbuffer(buffer); -  return false; -} - -/// Creates a WBuffer object for holding output data. Instances of this -/// object can be reused across WStream instances, and the memory is freed -/// automatically when no longer needed(it tracks the number of references -/// internally) -/// -/// @param data Data stored by the WBuffer -/// @param size The size of the data array -/// @param refcount The number of references for the WBuffer. This will be used -///        by WStream instances to decide when a WBuffer should be freed. -/// @param cb Pointer to function that will be responsible for freeing -///        the buffer data(passing 'free' will work as expected). -/// @return The allocated WBuffer instance -WBuffer *wstream_new_buffer(char *data, -                            size_t size, -                            size_t refcount, -                            wbuffer_data_finalizer cb) -{ -  WBuffer *rv = xmalloc(sizeof(WBuffer)); -  rv->size = size; -  rv->refcount = refcount; -  rv->cb = cb; -  rv->data = data; - -  return rv; -} - -static void write_cb(uv_write_t *req, int status) -{ -  WRequest *data = req->data; - -  data->wstream->curmem -= data->buffer->size; - -  wstream_release_wbuffer(data->buffer); - -  if (data->wstream->cb) { -    data->wstream->cb(data->wstream, -                      data->wstream->data, -                      status); -  } - -  data->wstream->pending_reqs--; - -  if (data->wstream->freed && data->wstream->pending_reqs == 0) { -    // Last pending write, free the wstream; -    if (data->wstream->free_handle) { -      uv_close((uv_handle_t *)data->wstream->stream, close_cb); -    } else { -      xfree(data->wstream); -    } -  } - -  xfree(data); -} - -void wstream_release_wbuffer(WBuffer *buffer) -{ -  if (!--buffer->refcount) { -    if (buffer->cb) { -      buffer->cb(buffer->data); -    } - -    xfree(buffer); -  } -} - -static void close_cb(uv_handle_t *handle) -{ -  xfree(handle_get_wstream(handle)); -  xfree(handle->data); -  xfree(handle); -} - diff --git a/src/nvim/os/wstream.h b/src/nvim/os/wstream.h deleted file mode 100644 index d0e9bef93a..0000000000 --- a/src/nvim/os/wstream.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NVIM_OS_WSTREAM_H -#define NVIM_OS_WSTREAM_H - -#include <stdint.h> -#include <stdbool.h> -#include <uv.h> - -#include "nvim/os/wstream_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/wstream.h.generated.h" -#endif -#endif  // NVIM_OS_WSTREAM_H diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h deleted file mode 100644 index cfa0bf0b60..0000000000 --- a/src/nvim/os/wstream_defs.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef NVIM_OS_WSTREAM_DEFS_H -#define NVIM_OS_WSTREAM_DEFS_H - -typedef struct wbuffer WBuffer; -typedef struct wstream WStream; -typedef void (*wbuffer_data_finalizer)(void *data); - -/// Type of function called when the WStream has information about a write -/// request. -/// -/// @param wstream The `WStream` instance -/// @param data User-defined data -/// @param status 0 on success, anything else indicates failure -typedef void (*wstream_cb)(WStream *wstream, -                           void *data, -                           int status); - -#endif  // NVIM_OS_WSTREAM_DEFS_H - diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c index ccd0073db1..122b3a171d 100644 --- a/src/nvim/os_unix.c +++ b/src/nvim/os_unix.c @@ -47,13 +47,10 @@  #include "nvim/types.h"  #include "nvim/os/os.h"  #include "nvim/os/time.h" -#include "nvim/os/event.h"  #include "nvim/os/input.h"  #include "nvim/os/shell.h"  #include "nvim/os/signal.h" -#include "nvim/os/job.h"  #include "nvim/msgpack_rpc/helpers.h" -#include "nvim/msgpack_rpc/defs.h"  #ifdef HAVE_STROPTS_H  # include <stropts.h> diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index 9ce050ed7a..47fef692db 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -67,7 +67,8 @@  #include "nvim/ex_cmds.h"  #include "nvim/window.h"  #include "nvim/fileio.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h" +#include "nvim/event/time.h"  #include "nvim/api/private/helpers.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -80,7 +81,7 @@  // of data.  #define REFRESH_DELAY 10 -static uv_timer_t refresh_timer; +static TimeWatcher refresh_timer;  static bool refresh_pending = false;  typedef struct { @@ -150,7 +151,7 @@ static VTermColor default_vt_bg_rgb;  void terminal_init(void)  {    invalidated_terminals = pmap_new(ptr_t)(); -  uv_timer_init(uv_default_loop(), &refresh_timer); +  time_watcher_init(&loop, &refresh_timer, NULL);    // initialize a rgb->color index map for cterm attributes(VTermScreenCell    // only has RGB information and we need color indexes for terminal UIs) @@ -175,8 +176,8 @@ void terminal_init(void)  void terminal_teardown(void)  { -  uv_timer_stop(&refresh_timer); -  uv_close((uv_handle_t *)&refresh_timer, NULL); +  time_watcher_stop(&refresh_timer); +  time_watcher_close(&refresh_timer, NULL);    pmap_free(ptr_t)(invalidated_terminals);    map_free(int, int)(color_indexes);  } @@ -353,13 +354,13 @@ void terminal_enter(bool process_deferred)    while (term->buf == curbuf) {      if (process_deferred) { -      event_enable_deferred(); +      loop_enable_deferred_events(&loop);      }      c = safe_vgetc();      if (process_deferred) { -      event_disable_deferred(); +      loop_disable_deferred_events(&loop);      }      switch (c) { @@ -380,7 +381,7 @@ void terminal_enter(bool process_deferred)          break;        case K_EVENT: -        event_process(); +        loop_process_event(&loop);          break;        case Ctrl_N: @@ -877,16 +878,16 @@ static void invalidate_terminal(Terminal *term, int start_row, int end_row)    pmap_put(ptr_t)(invalidated_terminals, term, NULL);    if (!refresh_pending) { -    uv_timer_start(&refresh_timer, refresh_timer_cb, REFRESH_DELAY, 0); +    time_watcher_start(&refresh_timer, refresh_timer_cb, REFRESH_DELAY, 0);      refresh_pending = true;    }  }  // libuv timer callback. This will enqueue on_refresh to be processed as an  // event. -static void refresh_timer_cb(uv_timer_t *handle) +static void refresh_timer_cb(TimeWatcher *watcher, void *data)  { -  event_push((Event) {.handler = on_refresh}, false); +  loop_push_event(&loop, (Event) {.handler = on_refresh}, false);    refresh_pending = false;  } diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index efdcf0a41e..0a84a3688b 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -4,7 +4,8 @@  #include "nvim/misc2.h"  #include "nvim/os/os.h"  #include "nvim/os/input.h" -#include "nvim/os/rstream.h" +#include "nvim/event/rstream.h" +#include "nvim/event/time.h"  #define PASTETOGGLE_KEY "<f37>" @@ -12,9 +13,8 @@ struct term_input {    int in_fd;    bool paste_enabled;    TermKey *tk; -  uv_timer_t timer_handle; -  RBuffer *read_buffer; -  RStream *read_stream; +  TimeWatcher timer_handle; +  Stream read_stream;  };  static void forward_simple_utf8(TermKeyKey *key) @@ -107,7 +107,7 @@ static TermKeyResult tk_getkey(TermKey *tk, TermKeyKey *key, bool force)    return force ? termkey_getkey_force(tk, key) : termkey_getkey(tk, key);  } -static void timer_cb(uv_timer_t *handle); +static void timer_cb(TimeWatcher *watcher, void *data);  static int get_key_code_timeout(void)  { @@ -147,27 +147,26 @@ static void tk_getkeys(TermInput *input, bool force)    if (ms > 0) {      // Stop the current timer if already running -    uv_timer_stop(&input->timer_handle); -    uv_timer_start(&input->timer_handle, timer_cb, (uint32_t)ms, 0); +    time_watcher_stop(&input->timer_handle); +    time_watcher_start(&input->timer_handle, timer_cb, (uint32_t)ms, 0);    } else {      tk_getkeys(input, true);    }  } - -static void timer_cb(uv_timer_t *handle) +static void timer_cb(TimeWatcher *watcher, void *data)  { -  tk_getkeys(handle->data, true); +  tk_getkeys(data, true);  }  static bool handle_bracketed_paste(TermInput *input)  { -  if (rbuffer_size(input->read_buffer) > 5 && -      (!rbuffer_cmp(input->read_buffer, "\x1b[200~", 6) || -       !rbuffer_cmp(input->read_buffer, "\x1b[201~", 6))) { -    bool enable = *rbuffer_get(input->read_buffer, 4) == '0'; +  if (rbuffer_size(input->read_stream.buffer) > 5 && +      (!rbuffer_cmp(input->read_stream.buffer, "\x1b[200~", 6) || +       !rbuffer_cmp(input->read_stream.buffer, "\x1b[201~", 6))) { +    bool enable = *rbuffer_get(input->read_stream.buffer, 4) == '0';      // Advance past the sequence -    rbuffer_consumed(input->read_buffer, 6); +    rbuffer_consumed(input->read_stream.buffer, 6);      if (input->paste_enabled == enable) {        return true;      } @@ -194,19 +193,22 @@ static bool handle_bracketed_paste(TermInput *input)  static bool handle_forced_escape(TermInput *input)  { -  if (rbuffer_size(input->read_buffer) > 1 -      && !rbuffer_cmp(input->read_buffer, "\x1b\x00", 2)) { +  if (rbuffer_size(input->read_stream.buffer) > 1 +      && !rbuffer_cmp(input->read_stream.buffer, "\x1b\x00", 2)) {      // skip the ESC and NUL and push one <esc> to the input buffer      size_t rcnt; -    termkey_push_bytes(input->tk, rbuffer_read_ptr(input->read_buffer, &rcnt), 1); -    rbuffer_consumed(input->read_buffer, 2); +    termkey_push_bytes(input->tk, rbuffer_read_ptr(input->read_stream.buffer, +          &rcnt), 1); +    rbuffer_consumed(input->read_stream.buffer, 2);      tk_getkeys(input, true);      return true;    }    return false;  } -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void restart_reading(Event event); + +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    TermInput *input = data; @@ -223,8 +225,9 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)        //        // ls *.md | xargs nvim        input->in_fd = 2; -      rstream_set_file(input->read_stream, input->in_fd); -      rstream_start(input->read_stream); +      stream_close(&input->read_stream, NULL); +      loop_push_event(&loop, +          (Event) { .data = input, .handler = restart_reading }, false);      } else {        input_done();      } @@ -240,7 +243,7 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)      // so the `handle_bracketed_paste`/`handle_forced_escape` calls above work      // as expected.      size_t count = 0; -    RBUFFER_EACH(input->read_buffer, c, i) { +    RBUFFER_EACH(input->read_stream.buffer, c, i) {        count = i + 1;        if (c == '\x1b' && count > 1) {          count--; @@ -248,13 +251,13 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)        }      } -    RBUFFER_UNTIL_EMPTY(input->read_buffer, ptr, len) { +    RBUFFER_UNTIL_EMPTY(input->read_stream.buffer, ptr, len) {        size_t consumed = termkey_push_bytes(input->tk, ptr, MIN(count, len));        // termkey_push_bytes can return (size_t)-1, so it is possible that -      // `consumed > input->read_buffer->size`, but since tk_getkeys is called -      // soon, it shouldn't happen -      assert(consumed <= input->read_buffer->size); -      rbuffer_consumed(input->read_buffer, consumed); +      // `consumed > input->read_stream.buffer->size`, but since tk_getkeys is +      // called soon, it shouldn't happen +      assert(consumed <= input->read_stream.buffer->size); +      rbuffer_consumed(input->read_stream.buffer, consumed);        // Need to process the keys now since there's no guarantee "count" will        // fit into libtermkey's input buffer.        tk_getkeys(input, false); @@ -262,11 +265,18 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)          break;        }      } -  } while (rbuffer_size(input->read_buffer)); +  } while (rbuffer_size(input->read_stream.buffer));    // Make sure the next input escape sequence fits into the ring buffer    // without wrap around, otherwise it could be misinterpreted. -  rbuffer_reset(input->read_buffer); +  rbuffer_reset(input->read_stream.buffer); +} + +static void restart_reading(Event event) +{ +  TermInput *input = event.data; +  rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input); +  rstream_start(&input->read_stream, read_cb);  }  static TermInput *term_input_new(void) @@ -283,13 +293,10 @@ static TermInput *term_input_new(void)    int curflags = termkey_get_canonflags(rv->tk);    termkey_set_canonflags(rv->tk, curflags | TERMKEY_CANON_DELBS);    // setup input handle -  rv->read_buffer = rbuffer_new(0xfff); -  rv->read_stream = rstream_new(read_cb, rv->read_buffer, rv); -  rstream_set_file(rv->read_stream, rv->in_fd); -  rstream_start(rv->read_stream); +  rstream_init_fd(&loop, &rv->read_stream, rv->in_fd, 0xfff, rv); +  rstream_start(&rv->read_stream, read_cb);    // initialize a timer handle for handling ESC with libtermkey -  uv_timer_init(uv_default_loop(), &rv->timer_handle); -  rv->timer_handle.data = rv; +  time_watcher_init(&loop, &rv->timer_handle, rv);    // Set the pastetoggle option to a special key that will be sent when    // \e[20{0,1}~/ are received    Error err = ERROR_INIT; @@ -300,12 +307,13 @@ static TermInput *term_input_new(void)  static void term_input_destroy(TermInput *input)  { -  uv_timer_stop(&input->timer_handle); -  rstream_stop(input->read_stream); -  rstream_free(input->read_stream); -  uv_close((uv_handle_t *)&input->timer_handle, NULL); +  time_watcher_stop(&input->timer_handle); +  time_watcher_close(&input->timer_handle, NULL); +  rstream_stop(&input->read_stream); +  stream_close(&input->read_stream, NULL);    termkey_destroy(input->tk); -  event_poll(0);  // Run once to remove references to input/timer handles +  // Run once to remove references to input/timer handles +  loop_poll_events(&loop, 0);    xfree(input);  } diff --git a/src/nvim/tui/tui.c b/src/nvim/tui/tui.c index fe29dbd961..a12ee880d6 100644 --- a/src/nvim/tui/tui.c +++ b/src/nvim/tui/tui.c @@ -13,7 +13,8 @@  #include "nvim/memory.h"  #include "nvim/api/vim.h"  #include "nvim/api/private/helpers.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h" +#include "nvim/event/signal.h"  #include "nvim/tui/tui.h"  #include "nvim/strings.h" @@ -43,7 +44,7 @@ typedef struct {    uv_loop_t *write_loop;    unibi_term *ut;    uv_tty_t output_handle; -  uv_signal_t winch_handle; +  SignalWatcher winch_handle;    Rect scroll_region;    kvec_t(Rect) invalid_regions;    int row, col; @@ -132,9 +133,8 @@ UI *tui_start(void)    update_size(ui);    // listen for SIGWINCH -  uv_signal_init(uv_default_loop(), &data->winch_handle); -  uv_signal_start(&data->winch_handle, sigwinch_cb, SIGWINCH); -  data->winch_handle.data = ui; +  signal_watcher_init(&loop, &data->winch_handle, ui); +  signal_watcher_start(&data->winch_handle, sigwinch_cb, SIGWINCH);    ui->stop = tui_stop;    ui->rgb = os_getenv("NVIM_TUI_ENABLE_TRUE_COLOR") != NULL; @@ -172,8 +172,8 @@ static void tui_stop(UI *ui)    TUIData *data = ui->data;    // Destroy common stuff    kv_destroy(data->invalid_regions); -  uv_signal_stop(&data->winch_handle); -  uv_close((uv_handle_t *)&data->winch_handle, NULL); +  signal_watcher_stop(&data->winch_handle); +  signal_watcher_close(&data->winch_handle, NULL);    // Destroy input stuff    term_input_destroy(data->input);    // Destroy output stuff @@ -207,12 +207,12 @@ static void try_resize(Event ev)    ui_refresh();  } -static void sigwinch_cb(uv_signal_t *handle, int signum) +static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data)  {    // Queue the event because resizing can result in recursive event_poll calls    // FIXME(blueyed): TUI does not resize properly when not deferred. Why? #2322 -  event_push((Event) { -    .data = handle->data, +  loop_push_event(&loop, (Event) { +    .data = data,      .handler = try_resize    }, true);  } diff --git a/src/nvim/ui.c b/src/nvim/ui.c index 088055777a..dc2bc0898c 100644 --- a/src/nvim/ui.c +++ b/src/nvim/ui.c @@ -23,7 +23,7 @@  #include "nvim/normal.h"  #include "nvim/option.h"  #include "nvim/os_unix.h" -#include "nvim/os/event.h" +#include "nvim/event/loop.h"  #include "nvim/os/time.h"  #include "nvim/os/input.h"  #include "nvim/os/signal.h" @@ -216,7 +216,7 @@ void ui_detach(UI *ui)    ui_count--;    // schedule a refresh -  event_push((Event) { .handler = refresh }, false); +  loop_push_event(&loop, (Event) { .handler = refresh }, false);  }  void ui_clear(void) | 
