diff options
| -rw-r--r-- | src/nvim/eval.c | 1 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 1 | ||||
| -rw-r--r-- | src/nvim/os/job.c | 182 | ||||
| -rw-r--r-- | src/nvim/os/rstream.c | 5 | ||||
| -rw-r--r-- | src/nvim/os/shell.c | 423 | 
5 files changed, 267 insertions, 345 deletions
| diff --git a/src/nvim/eval.c b/src/nvim/eval.c index d0af4b8249..6b6f008a44 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -10675,6 +10675,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)    job_start(argv,              xstrdup((char *)argvars[0].vval.v_string), +            true,              on_job_stdout,              on_job_stderr,              on_job_exit, diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 43bed54b2c..8d74921562 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -119,6 +119,7 @@ uint64_t channel_from_job(char **argv)    int status;    channel->data.job = job_start(argv,                                  channel, +                                true,                                  job_out,                                  job_err,                                  job_exit, diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index f8ad6874c9..8c744b0479 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -13,10 +13,14 @@  #include "nvim/os/event.h"  #include "nvim/os/event_defs.h"  #include "nvim/os/shell.h" +#include "nvim/os/time.h"  #include "nvim/vim.h"  #include "nvim/memory.h" -#define EXIT_TIMEOUT 25 +// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit +// before we send SIGNAL to it +#define TERM_TIMEOUT 1000000000 +#define KILL_TIMEOUT (TERM_TIMEOUT * 2)  #define MAX_RUNNING_JOBS 100  #define JOB_BUFFER_SIZE 0xFFFF @@ -39,14 +43,14 @@ struct job {    // Job id the index in the job table plus one.    int id;    // Exit status code of the job process -  int64_t status; -  // Number of polls after a SIGTERM that will trigger a SIGKILL -  int exit_timeout; +  int status;    // Number of references to the job. The job resources will only be freed by    // close_cb when this is 0    int refcount; -  // If the job was already stopped -  bool stopped; +  // Time when job_stop was called for the job. +  uint64_t stopped_time; +  // If SIGTERM was already sent to the job(only send one before SIGKILL) +  bool term_sent;    // Data associated with the job    void *data;    // Callbacks @@ -64,8 +68,8 @@ struct job {  };  static Job *table[MAX_RUNNING_JOBS] = {NULL}; -static uint32_t job_count = 0; -static uv_prepare_t job_prepare; +size_t stop_requests = 0; +static uv_timer_t job_stop_timer;  // Some helpers shared in this module @@ -78,7 +82,7 @@ static uv_prepare_t job_prepare;  void job_init(void)  {    uv_disable_stdio_inheritance(); -  uv_prepare_init(uv_default_loop(), &job_prepare); +  uv_timer_init(uv_default_loop(), &job_stop_timer);  }  /// Releases job control resources and terminates running jobs @@ -136,10 +140,12 @@ void job_teardown(void)  /// @param argv Argument vector for the process. The first item is the  ///        executable to run.  /// @param data Caller data that will be associated with the job +/// @param writable If true the job stdin will be available for writing with +///                 job_write, otherwise it will be redirected to /dev/null  /// @param stdout_cb Callback that will be invoked when data is available -///        on stdout +///        on stdout. If NULL stdout will be redirected to /dev/null.  /// @param stderr_cb Callback that will be invoked when data is available -///        on stderr +///        on stderr. If NULL stderr will be redirected to /dev/null.  /// @param job_exit_cb Callback that will be invoked when the job exits  /// @param maxmem Maximum amount of memory used by the job WStream  /// @param[out] status The job id if the job started successfully, 0 if the job @@ -147,6 +153,7 @@ void job_teardown(void)  /// @return The job pointer if the job started successfully, NULL otherwise  Job *job_start(char **argv,                 void *data, +               bool writable,                 rstream_cb stdout_cb,                 rstream_cb stderr_cb,                 job_exit_cb job_exit_cb, @@ -174,13 +181,13 @@ Job *job_start(char **argv,    job->id = i + 1;    *status = job->id;    job->status = -1; -  job->refcount = 4; +  job->refcount = 1;    job->data = data;    job->stdout_cb = stdout_cb;    job->stderr_cb = stderr_cb;    job->exit_cb = job_exit_cb; -  job->stopped = false; -  job->exit_timeout = EXIT_TIMEOUT; +  job->stopped_time = 0; +  job->term_sent = false;    job->proc_opts.file = argv[0];    job->proc_opts.args = argv;    job->proc_opts.stdio = job->stdio; @@ -193,49 +200,78 @@ Job *job_start(char **argv,    job->proc_stdin.data = NULL;    job->proc_stdout.data = NULL;    job->proc_stderr.data = NULL; +  job->in = NULL; +  job->out = NULL; +  job->err = NULL;    // Initialize the job std{in,out,err} -  uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); -  job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; -  job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; +  job->stdio[0].flags = UV_IGNORE; +  job->stdio[1].flags = UV_IGNORE; +  job->stdio[2].flags = UV_IGNORE; + +  if (writable) { +    uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); +    job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; +    job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; +    handle_set_job((uv_handle_t *)&job->proc_stdin, job); +    job->refcount++; +  } -  uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); -  job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -  job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; +  if (stdout_cb) { +    uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); +    job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; +    job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; +    handle_set_job((uv_handle_t *)&job->proc_stdout, job); +    job->refcount++; +  } -  uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); -  job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -  job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; +  if (stderr_cb) { +    uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); +    job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; +    job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; +    handle_set_job((uv_handle_t *)&job->proc_stderr, job); +    job->refcount++; +  } -  // Give all handles a reference to the job    handle_set_job((uv_handle_t *)&job->proc, job); -  handle_set_job((uv_handle_t *)&job->proc_stdin, job); -  handle_set_job((uv_handle_t *)&job->proc_stdout, job); -  handle_set_job((uv_handle_t *)&job->proc_stderr, job);    // Spawn the job    if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { +    if (writable) { +      uv_close((uv_handle_t *)&job->proc_stdin, close_cb); +    } +    if (stdout_cb) { +      uv_close((uv_handle_t *)&job->proc_stdout, close_cb); +    } +    if (stderr_cb) { +      uv_close((uv_handle_t *)&job->proc_stderr, close_cb); +    } +    uv_close((uv_handle_t *)&job->proc, close_cb); +    event_poll(0); +    // Manually invoke the close_cb to free the job resources      *status = -1;      return NULL;    } -  job->in = wstream_new(maxmem); -  wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); +  if (writable) { +    job->in = wstream_new(maxmem); +    wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); +  } +    // Start the readable streams -  job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -  job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -  rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); -  rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); -  rstream_start(job->out); -  rstream_start(job->err); -  // Save the job to the table -  table[i] = job; +  if (stdout_cb) { +    job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); +    rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); +    rstream_start(job->out); +  } -  // Start polling job status if this is the first -  if (job_count == 0) { -    uv_prepare_start(&job_prepare, job_prepare_cb); +  if (stderr_cb) { +    job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); +    rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); +    rstream_start(job->err);    } -  job_count++; +  // Save the job to the table +  table[i] = job;    return job;  } @@ -249,7 +285,7 @@ Job *job_find(int id)    Job *job;    if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1]) -      || job->stopped) { +      || job->stopped_time) {      return NULL;    } @@ -262,7 +298,22 @@ Job *job_find(int id)  /// @param job The Job instance  void job_stop(Job *job)  { -  job->stopped = true; +  if (job->stopped_time) { +    return; +  } + +  job->stopped_time = os_hrtime(); +  // Close the standard streams of the job +  close_job_in(job); +  close_job_out(job); +  close_job_err(job); + +  if (!stop_requests++) { +    // When there's at least one stop request pending, start a timer that +    // will periodically check if a signal should be send to a to the job +    DLOG("Starting job kill timer"); +    uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100); +  }  }  /// job_wait - synchronously wait for a job to finish @@ -276,6 +327,9 @@ void job_stop(Job *job)  ///         is possible on some OS.  int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL  { +  // The default status is -1, which represents a timeout +  int status = -1; +    // Increase refcount to stop the job from being freed before we have a    // chance to get the status.    job->refcount++; @@ -291,15 +345,16 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL      event_poll(0);    } -  if (!--job->refcount) { -    int status = (int) job->status; -    // Manually invoke close_cb to free the job resources +  if (job->refcount == 1) { +    // Job exited, collect status and manually invoke close_cb to free the job +    // resources +    status = job->status;      close_cb((uv_handle_t *)&job->proc); -    return status; +  } else { +    job->refcount--;    } -  // return -1 for a timeout -  return  -1; +  return status;  }  /// Close the pipe used to write to the job. @@ -372,10 +427,10 @@ static void job_exit_callback(Job *job)      job->exit_cb(job, job->data);    } -  // Stop polling job status if this was the last -  job_count--; -  if (job_count == 0) { -    uv_prepare_stop(&job_prepare); +  if (!--stop_requests) { +    // Stop the timer if no more stop requests are pending +    DLOG("Stopping job kill timer"); +    uv_timer_stop(&job_stop_timer);    }  } @@ -386,21 +441,24 @@ static bool is_alive(Job *job)  /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those  /// that didn't die from SIGTERM after a while(exit_timeout is 0). -static void job_prepare_cb(uv_prepare_t *handle) +static void job_stop_timer_cb(uv_timer_t *handle)  {    Job *job; -  int i; +  uint64_t now = os_hrtime(); -  for (i = 0; i < MAX_RUNNING_JOBS; i++) { -    if ((job = table[i]) == NULL || !job->stopped) { +  for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) { +    if ((job = table[i]) == NULL || !job->stopped_time) {        continue;      } -    if ((job->exit_timeout--) == EXIT_TIMEOUT) { -      // Job was just stopped, close all stdio handles and send SIGTERM +    uint64_t elapsed = now - job->stopped_time; + +    if (!job->term_sent && elapsed >= TERM_TIMEOUT) { +      ILOG("Sending SIGTERM to job(id: %d)", job->id);        uv_process_kill(&job->proc, SIGTERM); -    } else if (job->exit_timeout == 0) { -      // We've waited long enough, send SIGKILL +      job->term_sent = true; +    } else if (elapsed >= KILL_TIMEOUT) { +      ILOG("Sending SIGKILL to job(id: %d)", job->id);        uv_process_kill(&job->proc, SIGKILL);      }    } @@ -429,7 +487,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)  {    Job *job = handle_get_job((uv_handle_t *)proc); -  job->status = status; +  job->status = (int)status;    uv_close((uv_handle_t *)&job->proc, close_cb);  } diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index beff404fd0..f16226cdd1 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -309,6 +309,11 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)    return rbuffer_read(rstream->buffer, buffer, count);  } +RBuffer *rstream_buffer(RStream *rstream) +{ +  return rstream->buffer; +} +  // Callbacks used by libuv  // Called by libuv to allocate memory for reading. diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index d5464f7975..7449ac637c 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -24,24 +24,14 @@  #include "nvim/option_defs.h"  #include "nvim/charset.h"  #include "nvim/strings.h" +#include "nvim/ui.h" -#define BUFFER_LENGTH 1024 - -typedef struct { -  bool reading; -  int old_state, old_mode, exit_status, exited; -  char *wbuffer; -  char rbuffer[BUFFER_LENGTH]; -  uv_buf_t bufs[2]; -  uv_stream_t *shell_stdin; -  garray_T ga; -} ProcessData; +#define DYNAMIC_BUFFER_INIT {NULL, 0, 0}  typedef struct {    char *data; -  size_t cap; -  size_t len; -} dyn_buffer_t; +  size_t cap, len; +} DynamicBuffer;  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "os/shell.c.generated.h" @@ -109,140 +99,70 @@ void shell_free_argv(char **argv)  /// @param cmd The command to be executed. If NULL it will run an interactive  ///        shell  /// @param opts Various options that control how the shell will work -/// @param extra_shell_arg Extra argument to be passed to the shell -int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg) +/// @param extra_arg Extra argument to be passed to the shell +int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_arg)  { -  uv_stdio_container_t proc_stdio[3]; -  uv_process_options_t proc_opts; -  uv_process_t proc; -  uv_pipe_t proc_stdin, proc_stdout; -  uv_write_t write_req; -  int expected_exits = 1; -  ProcessData pdata = { -    .reading = false, -    .exited = 0, -    .old_mode = cur_tmode, -    .old_state = State, -    .shell_stdin = (uv_stream_t *)&proc_stdin, -    .wbuffer = NULL, -  }; - +  DynamicBuffer input = DYNAMIC_BUFFER_INIT; +  char *output = NULL, **output_ptr = NULL; +  int current_state = State, old_mode = cur_tmode; +  bool forward_output = true;    out_flush(); +    if (opts & kShellOptCooked) { -    // set to normal mode      settmode(TMODE_COOK);    }    // While the child is running, ignore terminating signals    signal_reject_deadly(); -  // Create argv for `uv_spawn` -  // TODO(tarruda): we can use a static buffer for small argument vectors. 1024 -  // bytes should be enough for most of the commands and if more is necessary -  // we can allocate a another buffer -  proc_opts.args = shell_build_argv(cmd, extra_shell_arg); -  proc_opts.file = proc_opts.args[0]; -  proc_opts.exit_cb = exit_cb; -  // Initialize libuv structures -  proc_opts.stdio = proc_stdio; -  proc_opts.stdio_count = 3; -  // Hide window on Windows :) -  proc_opts.flags = UV_PROCESS_WINDOWS_HIDE; -  proc_opts.cwd = NULL; -  proc_opts.env = NULL; - -  // The default is to inherit all standard file descriptors(this will change -  // when the UI is moved to an external process) -  proc_stdio[0].flags = UV_INHERIT_FD; -  proc_stdio[0].data.fd = 0; -  proc_stdio[1].flags = UV_INHERIT_FD; -  proc_stdio[1].data.fd = 1; -  proc_stdio[2].flags = UV_INHERIT_FD; -  proc_stdio[2].data.fd = 2; -    if (opts & (kShellOptHideMess | kShellOptExpand)) { -    // Ignore the shell stdio(redirects to /dev/null on unixes) -    proc_stdio[0].flags = UV_IGNORE; -    proc_stdio[1].flags = UV_IGNORE; -    proc_stdio[2].flags = UV_IGNORE; +    forward_output = false;    } else {      State = EXTERNCMD;      if (opts & kShellOptWrite) { -      // Write from the current buffer into the process stdin -      uv_pipe_init(uv_default_loop(), &proc_stdin, 0); -      write_req.data = &pdata; -      proc_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; -      proc_stdio[0].data.stream = (uv_stream_t *)&proc_stdin; +      read_input(&input);      }      if (opts & kShellOptRead) { -      // Read from the process stdout into the current buffer -      uv_pipe_init(uv_default_loop(), &proc_stdout, 0); -      proc_stdout.data = &pdata; -      proc_stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; -      proc_stdio[1].data.stream = (uv_stream_t *)&proc_stdout; -      ga_init(&pdata.ga, 1, BUFFER_LENGTH); +      output_ptr = &output; +      forward_output = false;      }    } -  if (uv_spawn(uv_default_loop(), &proc, &proc_opts)) { -    // Failed, probably due to `sh` not being executable -    if (!emsg_silent) { -      MSG_PUTS(_("\nCannot execute shell ")); -      msg_outtrans(p_sh); -      msg_putchar('\n'); -    } - -    return proc_cleanup_exit(&pdata, &proc_opts, opts); +  size_t nread; +  int status = shell((const char *)cmd, +                     (const char *)extra_arg, +                     input.data, +                     input.len, +                     output_ptr, +                     &nread, +                     emsg_silent, +                     forward_output); + +  if (input.data) { +    free(input.data);    } -  // Assign the flag address after `proc` is initialized by `uv_spawn` -  proc.data = &pdata; - -  if (opts & kShellOptWrite) { -    // Queue everything for writing to the shell stdin -    write_selection(&write_req); -    expected_exits++; -  } - -  if (opts & kShellOptRead) { -    // Start the read stream for the shell stdout -    uv_read_start((uv_stream_t *)&proc_stdout, alloc_cb, read_cb); -    expected_exits++; -  } - -  // Keep running the loop until all three handles are completely closed -  while (pdata.exited < expected_exits) { -    event_poll(0); - -    if (got_int) { -      // Forward SIGINT to the shell -      // TODO(tarruda): for now this is only needed if the terminal is in raw -      // mode, but when the UI is externalized we'll also need it, so leave it -      // here -      uv_process_kill(&proc, SIGINT); -      got_int = false; -    } +  if (output) { +    write_output(output, nread); +    free(output);    } -  if (opts & kShellOptRead) { -    if (!GA_EMPTY(&pdata.ga)) { -      // If there's an unfinished line in the growable array, append it now. -      append_ga_line(&pdata.ga); -      // remember that the NL was missing -      curbuf->b_no_eol_lnum = curwin->w_cursor.lnum; -    } else { -      curbuf->b_no_eol_lnum = 0; -    } -    ga_clear(&pdata.ga); +  if (!emsg_silent && status != 0 && !(opts & kShellOptSilent)) { +    MSG_PUTS(_("\nshell returned ")); +    msg_outnum(status); +    msg_putchar('\n');    } -  if (opts & kShellOptWrite) { -    free(pdata.wbuffer); +  if (old_mode == TMODE_RAW) { +    // restore mode +    settmode(TMODE_RAW);    } +  State = current_state; +  signal_accept_deadly(); -  return proc_cleanup_exit(&pdata, &proc_opts, opts); +  return status;  }  /// os_system - synchronously execute a command in the shell @@ -258,8 +178,8 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)  /// @param len The length of the input buffer (not used if `input` == NULL)  /// @param[out] output A pointer to to a location where the output will be  ///                    allocated and stored. Will point to NULL if the shell -///                    command did not output anything. NOTE: it's not -///                    allowed to pass NULL yet +///                    command did not output anything. If NULL is passed, +///                    the shell output will be ignored.  /// @param[out] nread the number of bytes in the returned buffer (if the  ///             returned buffer is not NULL)  /// @return the return code of the process, -1 if the process couldn't be @@ -268,26 +188,50 @@ int os_system(const char *cmd,                const char *input,                size_t len,                char **output, -              size_t *nread) FUNC_ATTR_NONNULL_ARG(1, 4) +              size_t *nread) FUNC_ATTR_NONNULL_ARG(1) +{ +  return shell(cmd, NULL, input, len, output, nread, true, false); +} + +static int shell(const char *cmd, +                 const char *extra_arg, +                 const char *input, +                 size_t len, +                 char **output, +                 size_t *nread, +                 bool silent, +                 bool forward_output) FUNC_ATTR_NONNULL_ARG(1)  {    // the output buffer -  dyn_buffer_t buf; -  memset(&buf, 0, sizeof(buf)); +  DynamicBuffer buf = DYNAMIC_BUFFER_INIT; +  rstream_cb data_cb = system_data_cb; -  char **argv = shell_build_argv((char_u *) cmd, NULL); +  if (forward_output) { +    data_cb = out_data_cb; +  } else if (!output) { +    data_cb = NULL; +  } + +  char **argv = shell_build_argv((char_u *) cmd, (char_u *)extra_arg); -  int i; +  int status;    Job *job = job_start(argv,                         &buf, -                       system_data_cb, -                       system_data_cb, +                       input != NULL, +                       data_cb, +                       data_cb,                         NULL,                         0, -                       &i); +                       &status); -  if (i <= 0) { -    // couldn't even start the job -    ELOG("Couldn't start job, error code: '%d'", i); +  if (status <= 0) { +    // Failed, probably due to `sh` not being executable +    ELOG("Couldn't start job, command: '%s', error code: '%d'", cmd, status); +    if (!silent) { +      MSG_PUTS(_("\nCannot execute shell ")); +      msg_outtrans(p_sh); +      msg_putchar('\n'); +    }      return -1;    } @@ -300,34 +244,37 @@ int os_system(const char *cmd,        job_stop(job);        return -1;      } +    // close the input stream, let the process know that no more input is +    // coming +    job_close_in(job);    } -  // close the input stream, let the process know that no more input is coming -  job_close_in(job); -  int status = job_wait(job, -1); +  status = job_wait(job, -1);    // prepare the out parameters if requested -  if (buf.len == 0) { -    // no data received from the process, return NULL -    *output = NULL; -    free(buf.data); -  } else { -    // NUL-terminate to make the output directly usable as a C string -    buf.data[buf.len] = NUL; -    *output = buf.data; -  } +  if (output) { +    if (buf.len == 0) { +      // no data received from the process, return NULL +      *output = NULL; +      free(buf.data); +    } else { +      // NUL-terminate to make the output directly usable as a C string +      buf.data[buf.len] = NUL; +      *output = buf.data; +    } -  if (nread) { -    *nread = buf.len; +    if (nread) { +      *nread = buf.len; +    }    }    return status;  } -/// dyn_buf_ensure - ensures at least `desired` bytes in buffer +///  - ensures at least `desired` bytes in buffer  ///  /// TODO(aktau): fold with kvec/garray -static void dyn_buf_ensure(dyn_buffer_t *buf, size_t desired) +static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)  {    if (buf->cap >= desired) {      return; @@ -341,16 +288,24 @@ static void dyn_buf_ensure(dyn_buffer_t *buf, size_t desired)  static void system_data_cb(RStream *rstream, void *data, bool eof)  {    Job *job = data; -  dyn_buffer_t *buf = job_data(job); +  DynamicBuffer *buf = job_data(job);    size_t nread = rstream_pending(rstream); -  dyn_buf_ensure(buf, buf->len + nread + 1); +  dynamic_buffer_ensure(buf, buf->len + nread + 1);    rstream_read(rstream, buf->data + buf->len, nread);    buf->len += nread;  } +static void out_data_cb(RStream *rstream, void *data, bool eof) +{ +  RBuffer *rbuffer = rstream_buffer(rstream); +  size_t len = rbuffer_pending(rbuffer); +  ui_write((char_u *)rbuffer_read_ptr(rbuffer), (int)len); +  rbuffer_consumed(rbuffer, len); +} +  /// Parses a command string into a sequence of words, taking quotes into  /// consideration.  /// @@ -411,24 +366,11 @@ static size_t word_length(const char_u *str)  /// event loop starts. If we don't(by writing in chunks returned by `ml_get`)  /// the buffer being modified might get modified by reading from the process  /// before we finish writing. -/// Queues selected range for writing to the child process stdin. -/// -/// @param req The structure containing information to peform the write -static void write_selection(uv_write_t *req) +static void read_input(DynamicBuffer *buf)  { -  ProcessData *pdata = (ProcessData *)req->data; -  // TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and -  // only after filled we should start allocating memory(skip unnecessary -  // allocations for small writes) -  size_t buflen = BUFFER_LENGTH; -  pdata->wbuffer = (char *)xmalloc(buflen); -  uv_buf_t uvbuf; +  size_t written = 0, l = 0, len = 0;    linenr_T lnum = curbuf->b_op_start.lnum; -  size_t off = 0; -  size_t written = 0; -  char_u      *lp = ml_get(lnum); -  size_t l; -  size_t len; +  char_u *lp = ml_get(lnum);    for (;;) {      l = strlen((char *)lp + written); @@ -437,26 +379,17 @@ static void write_selection(uv_write_t *req)      } else if (lp[written] == NL) {        // NL -> NUL translation        len = 1; -      if (off + len >= buflen) { -        // Resize the buffer -        buflen *= 2; -        pdata->wbuffer = xrealloc(pdata->wbuffer, buflen); -      } -      pdata->wbuffer[off++] = NUL; +      dynamic_buffer_ensure(buf, buf->len + len); +      buf->data[buf->len++] = NUL;      } else {        char_u  *s = vim_strchr(lp + written, NL);        len = s == NULL ? l : (size_t)(s - (lp + written)); -      while (off + len >= buflen) { -        // Resize the buffer -        buflen *= 2; -        pdata->wbuffer = xrealloc(pdata->wbuffer, buflen); -      } -      memcpy(pdata->wbuffer + off, lp + written, len); -      off += len; +      dynamic_buffer_ensure(buf, buf->len + len); +      memcpy(buf->data + buf->len, lp + written, len); +      buf->len += len;      }      if (len == l) { -      // Finished a line, add a NL, unless this line -      // should not have one. +      // Finished a line, add a NL, unless this line should not have one.        // FIXME need to make this more readable        if (lnum != curbuf->b_op_end.lnum            || !curbuf->b_p_bin @@ -464,12 +397,8 @@ static void write_selection(uv_write_t *req)              && (lnum !=                curbuf->b_ml.ml_line_count                || curbuf->b_p_eol))) { -        if (off + 1 >= buflen) { -          // Resize the buffer -          buflen *= 2; -          pdata->wbuffer = xrealloc(pdata->wbuffer, buflen); -        } -        pdata->wbuffer[off++] = NL; +        dynamic_buffer_ensure(buf, buf->len + 1); +        buf->data[buf->len++] = NL;        }        ++lnum;        if (lnum > curbuf->b_op_end.lnum) { @@ -481,112 +410,40 @@ static void write_selection(uv_write_t *req)        written += len;      }    } - -  uvbuf.base = pdata->wbuffer; -  uvbuf.len = off; - -  uv_write(req, pdata->shell_stdin, &uvbuf, 1, write_cb); -} - -// "Allocates" a buffer for reading from the shell stdout. -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) -{ -  ProcessData *pdata = (ProcessData *)handle->data; - -  if (pdata->reading) { -    buf->len = 0; -    return; -  } - -  buf->base = pdata->rbuffer; -  buf->len = BUFFER_LENGTH; -  // Avoid `alloc_cb`, `alloc_cb` sequences on windows -  pdata->reading = true;  } -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) +static void write_output(char *output, size_t remaining)  { -  // TODO(tarruda): avoid using a growable array for this, refactor the -  // algorithm to call `ml_append` directly(skip unnecessary copies/resizes) -  int i; -  ProcessData *pdata = (ProcessData *)stream->data; - -  if (cnt <= 0) { -    if (cnt != UV_ENOBUFS) { -      uv_read_stop(stream); -      uv_close((uv_handle_t *)stream, NULL); -      pdata->exited++; -    } +  if (!output) {      return;    } -  for (i = 0; i < cnt; ++i) { -    if (pdata->rbuffer[i] == NL) { +  size_t off = 0; +  while (off < remaining) { +    if (output[off] == NL) {        // Insert the line -      append_ga_line(&pdata->ga); -    } else if (pdata->rbuffer[i] == NUL) { -      // Translate NUL to NL -      ga_append(&pdata->ga, NL); -    } else { -      // buffer data into the grow array -      ga_append(&pdata->ga, pdata->rbuffer[i]); +      output[off] = NUL; +      ml_append(curwin->w_cursor.lnum++, (char_u *)output, 0, false); +      size_t skip = off + 1; +      output += skip; +      remaining -= skip; +      off = 0; +      continue;      } -  } - -  windgoto(msg_row, msg_col); -  cursor_on(); -  out_flush(); -  pdata->reading = false; -} - -static void write_cb(uv_write_t *req, int status) -{ -  ProcessData *pdata = (ProcessData *)req->data; -  uv_close((uv_handle_t *)pdata->shell_stdin, NULL); -  pdata->exited++; -} - -/// Cleanup memory and restore state modified by `os_call_shell`. -/// -/// @param data State shared by all functions collaborating with -///        `os_call_shell`. -/// @param opts Process spawning options, containing some allocated memory -/// @param shellopts Options passed to `os_call_shell`. Used for deciding -///        if/which messages are displayed. -static int proc_cleanup_exit(ProcessData *proc_data, -                             uv_process_options_t *proc_opts, -                             int shellopts) -{ -  if (proc_data->exited) { -    if (!emsg_silent && proc_data->exit_status != 0 && -        !(shellopts & kShellOptSilent)) { -      MSG_PUTS(_("\nshell returned ")); -      msg_outnum((int64_t)proc_data->exit_status); -      msg_putchar('\n'); +    if (output[off] == NUL) { +      // Translate NUL to NL +      output[off] = NL;      } +    off++;    } -  State = proc_data->old_state; - -  if (proc_data->old_mode == TMODE_RAW) { -    // restore mode -    settmode(TMODE_RAW); +  if (remaining) { +    // append unfinished line +    ml_append(curwin->w_cursor.lnum++, (char_u *)output, 0, false); +    // remember that the NL was missing +    curbuf->b_no_eol_lnum = curwin->w_cursor.lnum; +  } else { +    curbuf->b_no_eol_lnum = 0;    } - -  signal_accept_deadly(); - -  // Release argv memory -  shell_free_argv(proc_opts->args); - -  return proc_data->exit_status; -} - -static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) -{ -  ProcessData *data = (ProcessData *)proc->data; -  data->exited++; -  assert(status <= INT_MAX); -  data->exit_status = (int)status; -  uv_close((uv_handle_t *)proc, NULL);  } | 
