diff options
33 files changed, 765 insertions, 991 deletions
| diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 4173acefd9..ae6b99c336 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -83,8 +83,7 @@  #include "nvim/window.h"  #include "nvim/os/os.h"  #include "nvim/os/job.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" +#include "nvim/event/rstream.h"  #include "nvim/os/time.h"  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/server.h" @@ -20353,19 +20352,19 @@ static inline void push_job_event(Job *job, ufunc_T *callback,    }, !disable_job_defer);  } -static void on_job_stdout(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)  {    TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stdout, "stdout"); +  on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");  } -static void on_job_stderr(RStream *rstream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)  {    TerminalJobData *data = job_data(job); -  on_job_output(rstream, job, buf, eof, data->on_stderr, "stderr"); +  on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");  } -static void on_job_output(RStream *rstream, Job *job, RBuffer *buf, bool eof, +static void on_job_output(Stream *stream, Job *job, RBuffer *buf, bool eof,      ufunc_T *callback, const char *type)  {    if (eof) { diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c new file mode 100644 index 0000000000..78c044347f --- /dev/null +++ b/src/nvim/event/rstream.c @@ -0,0 +1,167 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/rstream.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/memory.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/event/loop.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.c.generated.h" +#endif + +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) +  FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(loop, stream, fd, NULL, data); +  rstream_init(stream, bufsize); +} + +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) +  FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(NULL, stream, -1, uvstream, data); +  rstream_init(stream, bufsize); +} + +void rstream_init(Stream *stream, size_t bufsize) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  stream->buffer = rbuffer_new(bufsize); +  stream->buffer->data = stream; +  stream->buffer->full_cb = on_rbuffer_full; +  stream->buffer->nonfull_cb = on_rbuffer_nonfull; +} + + +/// Starts watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_start(Stream *stream, stream_read_cb cb) +{ +  stream->read_cb = cb; +  if (stream->uvstream) { +    uv_read_start(stream->uvstream, alloc_cb, read_cb); +  } else { +    uv_idle_start(&stream->uv.idle, fread_idle_cb); +  } +} + +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(Stream *stream) +{ +  if (stream->uvstream) { +    uv_read_stop(stream->uvstream); +  } else { +    uv_idle_stop(&stream->uv.idle); +  } +} + +static void on_rbuffer_full(RBuffer *buf, void *data) +{ +  rstream_stop(data); +} + +static void on_rbuffer_nonfull(RBuffer *buf, void *data) +{ +  Stream *stream = data; +  assert(stream->read_cb); +  rstream_start(stream, stream->read_cb); +} + +// Callbacks used by libuv + +// Called by libuv to allocate memory for reading. +static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ +  Stream *stream = handle->data; +  buf->base = rbuffer_write_ptr(stream->buffer, &buf->len); +} + +// Callback invoked by libuv after it copies the data into the buffer provided +// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a +// 0-length buffer. +static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) +{ +  Stream *stream = uvstream->data; + +  if (cnt <= 0) { +    if (cnt != UV_ENOBUFS +        // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: +        // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. +        // +        // We don't need to do anything with the RBuffer because the next call +        // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` +        // won't be called) +        && cnt != 0) { +      DLOG("Closing Stream(%p) because of %s(%zd)", stream, +           uv_strerror((int)cnt), cnt); +      // Read error or EOF, either way stop the stream and invoke the callback +      // with eof == true +      uv_read_stop(uvstream); +      stream->read_cb(stream, stream->buffer, stream->data, true); +    } +    return; +  } + +  // at this point we're sure that cnt is positive, no error occurred +  size_t nread = (size_t)cnt; +  // Data was already written, so all we need is to update 'wpos' to reflect +  // the space actually used in the buffer. +  rbuffer_produced(stream->buffer, nread); +  stream->read_cb(stream, stream->buffer, stream->data, false); +} + +// Called by the by the 'idle' handle to emulate a reading event +static void fread_idle_cb(uv_idle_t *handle) +{ +  uv_fs_t req; +  Stream *stream = handle->data; + +  stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &stream->uvbuf.len); + +  // the offset argument to uv_fs_read is int64_t, could someone really try +  // to read more than 9 quintillion (9e18) bytes? +  // upcast is meant to avoid tautological condition warning on 32 bits +  uintmax_t fpos_intmax = stream->fpos; +  if (fpos_intmax > INT64_MAX) { +    ELOG("stream offset overflow"); +    preserve_exit(); +  } + +  // Synchronous read +  uv_fs_read( +      handle->loop, +      &req, +      stream->fd, +      &stream->uvbuf, +      1, +      (int64_t) stream->fpos, +      NULL); + +  uv_fs_req_cleanup(&req); + +  if (req.result <= 0) { +    uv_idle_stop(&stream->uv.idle); +    stream->read_cb(stream, stream->buffer, stream->data, true); +    return; +  } + +  // no errors (req.result (ssize_t) is positive), it's safe to cast. +  size_t nread = (size_t) req.result; +  rbuffer_produced(stream->buffer, nread); +  stream->fpos += nread; +} diff --git a/src/nvim/event/rstream.h b/src/nvim/event/rstream.h new file mode 100644 index 0000000000..f30ad79ee5 --- /dev/null +++ b/src/nvim/event/rstream.h @@ -0,0 +1,16 @@ +#ifndef NVIM_EVENT_RSTREAM_H +#define NVIM_EVENT_RSTREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/rstream.h.generated.h" +#endif +#endif  // NVIM_EVENT_RSTREAM_H diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c new file mode 100644 index 0000000000..959b532146 --- /dev/null +++ b/src/nvim/event/stream.c @@ -0,0 +1,108 @@ +#include <assert.h> +#include <stdio.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/rbuffer.h" +#include "nvim/event/stream.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.c.generated.h" +#endif + +/// Sets the stream associated with `fd` to "blocking" mode. +/// +/// @return `0` on success, or `-errno` on failure. +int stream_set_blocking(int fd, bool blocking) +{ +  // Private loop to avoid conflict with existing watcher(s): +  //    uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. +  uv_loop_t loop; +  uv_pipe_t stream; +  uv_loop_init(&loop); +  uv_pipe_init(&loop, &stream, 0); +  uv_pipe_open(&stream, fd); +  int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); +  uv_close((uv_handle_t *)&stream, NULL); +  uv_run(&loop, UV_RUN_NOWAIT);  // not necessary, but couldn't hurt. +  uv_loop_close(&loop); +  return retval; +} + +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, +    void *data) +{ +  stream->uvstream = uvstream; + +  if (fd >= 0) { +    uv_handle_type type = uv_guess_handle(fd); +    stream->fd = fd; + +    if (type == UV_FILE) { +      // Non-blocking file reads are simulated with an idle handle that reads in +      // chunks of the ring buffer size, giving time for other events to be +      // processed between reads. +      uv_idle_init(&loop->uv, &stream->uv.idle); +      stream->uv.idle.data = stream; +    } else { +      assert(type == UV_NAMED_PIPE || type == UV_TTY); +      uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); +      uv_pipe_open(&stream->uv.pipe, fd); +      stream->uvstream = (uv_stream_t *)&stream->uv.pipe; +    } +  } + +  if (stream->uvstream) { +    stream->uvstream->data = stream; +  } + +  stream->data = data; +  stream->internal_data = NULL; +  stream->fpos = 0; +  stream->curmem = 0; +  stream->maxmem = 0; +  stream->pending_reqs = 0; +  stream->read_cb = NULL; +  stream->write_cb = NULL; +  stream->close_cb = NULL; +  stream->internal_close_cb = NULL; +  stream->closed = false; +  stream->buffer = NULL; +} + +void stream_close(Stream *stream, stream_close_cb on_stream_close) +{ +  assert(!stream->closed); + +  if (stream->buffer) { +    rbuffer_free(stream->buffer); +  } + +  stream->closed = true; +  stream->close_cb = on_stream_close; + +  if (!stream->pending_reqs) { +    stream_close_handle(stream); +  } +} + +void stream_close_handle(Stream *stream) +{ +  if (stream->uvstream) { +    uv_close((uv_handle_t *)stream->uvstream, close_cb); +  } else { +    uv_close((uv_handle_t *)&stream->uv.idle, close_cb); +  } +} + +static void close_cb(uv_handle_t *handle) +{ +  Stream *stream = handle->data; +  if (stream->close_cb) { +    stream->close_cb(stream, stream->data); +  } +  if (stream->internal_close_cb) { +    stream->internal_close_cb(stream, stream->internal_data); +  } +} diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h new file mode 100644 index 0000000000..37410b2036 --- /dev/null +++ b/src/nvim/event/stream.h @@ -0,0 +1,55 @@ +#ifndef NVIM_EVENT_STREAM_H +#define NVIM_EVENT_STREAM_H + +#include <stdbool.h> +#include <stddef.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/rbuffer.h" + +typedef struct stream Stream; +/// Type of function called when the Stream buffer is filled with data +/// +/// @param stream The Stream instance +/// @param rbuffer The associated RBuffer instance +/// @param data User-defined data +/// @param eof If the stream reached EOF. +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, +    bool eof); + +/// Type of function called when the Stream has information about a write +/// request. +/// +/// @param wstream The Stream instance +/// @param data User-defined data +/// @param status 0 on success, anything else indicates failure +typedef void (*stream_write_cb)(Stream *stream, void *data, int status); +typedef void (*stream_close_cb)(Stream *stream, void *data); + +struct stream { +  union { +    uv_pipe_t pipe; +    uv_tcp_t tcp; +    uv_idle_t idle; +  } uv; +  uv_stream_t *uvstream; +  uv_buf_t uvbuf; +  RBuffer *buffer; +  uv_file fd; +  stream_read_cb read_cb; +  stream_write_cb write_cb; +  stream_close_cb close_cb, internal_close_cb; +  size_t fpos; +  size_t curmem; +  size_t maxmem; +  size_t pending_reqs; +  void *data, *internal_data; +  bool closed; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/stream.h.generated.h" +#endif +#endif  // NVIM_EVENT_STREAM_H diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c new file mode 100644 index 0000000000..5fcb724fe3 --- /dev/null +++ b/src/nvim/event/wstream.c @@ -0,0 +1,162 @@ +#include <assert.h> +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/wstream.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + +typedef struct { +  Stream *stream; +  WBuffer *buffer; +  uv_write_t uv_req; +} WRequest; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.c.generated.h" +#endif + +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(loop, stream, fd, NULL, data); +  wstream_init(stream, maxmem); +} + +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, +    void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) +{ +  stream_init(NULL, stream, -1, uvstream, data); +  wstream_init(stream, maxmem); +} + +void wstream_init(Stream *stream, size_t maxmem) +{ +  stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM; +} + +/// Sets a callback that will be called on completion of a write request, +/// indicating failure/success. +/// +/// This affects all requests currently in-flight as well. Overwrites any +/// possible earlier callback. +/// +/// @note This callback will not fire if the write request couldn't even be +///       queued properly (i.e.: when `wstream_write() returns an error`). +/// +/// @param stream The `Stream` instance +/// @param cb The callback +void wstream_set_write_cb(Stream *stream, stream_write_cb cb) +  FUNC_ATTR_NONNULL_ALL +{ +  stream->write_cb = cb; +} + +/// Queues data for writing to the backing file descriptor of a `Stream` +/// instance. This will fail if the write would cause the Stream use more +/// memory than specified by `maxmem`. +/// +/// @param stream The `Stream` instance +/// @param buffer The buffer which contains data to be written +/// @return false if the write failed +bool wstream_write(Stream *stream, WBuffer *buffer) +  FUNC_ATTR_NONNULL_ALL +{ +  assert(stream->maxmem); +  // This should not be called after a stream was freed +  assert(!stream->closed); + +  if (stream->curmem > stream->maxmem) { +    goto err; +  } + +  stream->curmem += buffer->size; + +  WRequest *data = xmalloc(sizeof(WRequest)); +  data->stream = stream; +  data->buffer = buffer; +  data->uv_req.data = data; + +  uv_buf_t uvbuf; +  uvbuf.base = buffer->data; +  uvbuf.len = buffer->size; + +  if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) { +    xfree(data); +    goto err; +  } + +  stream->pending_reqs++; +  return true; + +err: +  wstream_release_wbuffer(buffer); +  return false; +} + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across Stream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param refcount The number of references for the WBuffer. This will be used +///        by Stream instances to decide when a WBuffer should be freed. +/// @param cb Pointer to function that will be responsible for freeing +///        the buffer data(passing 'free' will work as expected). +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, +                            size_t size, +                            size_t refcount, +                            wbuffer_data_finalizer cb) +{ +  WBuffer *rv = xmalloc(sizeof(WBuffer)); +  rv->size = size; +  rv->refcount = refcount; +  rv->cb = cb; +  rv->data = data; + +  return rv; +} + +static void write_cb(uv_write_t *req, int status) +{ +  WRequest *data = req->data; + +  data->stream->curmem -= data->buffer->size; + +  wstream_release_wbuffer(data->buffer); + +  if (data->stream->write_cb) { +    data->stream->write_cb(data->stream, data->stream->data, status); +  } + +  data->stream->pending_reqs--; + +  if (data->stream->closed && data->stream->pending_reqs == 0) { +    // Last pending write, free the stream; +    stream_close_handle(data->stream); +  } + +  xfree(data); +} + +void wstream_release_wbuffer(WBuffer *buffer) +{ +  if (!--buffer->refcount) { +    if (buffer->cb) { +      buffer->cb(buffer->data); +    } + +    xfree(buffer); +  } +} diff --git a/src/nvim/event/wstream.h b/src/nvim/event/wstream.h new file mode 100644 index 0000000000..9008de0d97 --- /dev/null +++ b/src/nvim/event/wstream.h @@ -0,0 +1,24 @@ +#ifndef NVIM_EVENT_WSTREAM_H +#define NVIM_EVENT_WSTREAM_H + +#include <stdint.h> +#include <stdbool.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/stream.h" + +typedef struct wbuffer WBuffer; +typedef void (*wbuffer_data_finalizer)(void *data); + +struct wbuffer { +  size_t size, refcount; +  char *data; +  wbuffer_data_finalizer cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/wstream.h.generated.h" +#endif +#endif  // NVIM_EVENT_WSTREAM_H diff --git a/src/nvim/ex_docmd.c b/src/nvim/ex_docmd.c index e53b2e47c5..3b6e05fd8a 100644 --- a/src/nvim/ex_docmd.c +++ b/src/nvim/ex_docmd.c @@ -73,8 +73,8 @@  #include "nvim/os/time.h"  #include "nvim/ex_cmds_defs.h"  #include "nvim/mouse.h" -#include "nvim/os/rstream.h" -#include "nvim/os/wstream.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  static int quitmore = 0;  static int ex_pressedreturn = FALSE; diff --git a/src/nvim/if_cscope.c b/src/nvim/if_cscope.c index 99926ecf16..d6c5cf4fd5 100644 --- a/src/nvim/if_cscope.c +++ b/src/nvim/if_cscope.c @@ -31,6 +31,7 @@  #include "nvim/window.h"  #include "nvim/os/os.h"  #include "nvim/os/input.h" +#include "nvim/event/stream.h"  #include <sys/types.h>  #include <sys/stat.h> diff --git a/src/nvim/misc1.c b/src/nvim/misc1.c index 0737caec5d..4f23b3da63 100644 --- a/src/nvim/misc1.c +++ b/src/nvim/misc1.c @@ -61,6 +61,7 @@  #include "nvim/os/shell.h"  #include "nvim/os/input.h"  #include "nvim/os/time.h" +#include "nvim/event/stream.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "misc1.c.generated.h" diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 3932fa1f36..05badc72d4 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -10,10 +10,8 @@  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/remote_ui.h"  #include "nvim/event/loop.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #include "nvim/os/job.h"  #include "nvim/os/job_defs.h"  #include "nvim/msgpack_rpc/helpers.h" @@ -34,6 +32,12 @@  #define log_server_msg(...)  #endif +typedef enum { +  kChannelTypeSocket, +  kChannelTypeJob, +  kChannelTypeStdio +} ChannelType; +  typedef struct {    uint64_t request_id;    bool returned, errored; @@ -45,15 +49,16 @@ typedef struct {    size_t refcount;    size_t pending_requests;    PMap(cstr_t) *subscribed_events; -  bool is_job, closed; +  bool closed; +  ChannelType type;    msgpack_unpacker *unpacker;    union {      Job *job; +    Stream stream;      struct { -      RStream *read; -      WStream *write; -      uv_stream_t *uv; -    } streams; +      Stream in; +      Stream out; +    } std;    } data;    uint64_t next_request_id;    kvec_t(ChannelCallFrame *) call_stack; @@ -112,8 +117,7 @@ void channel_teardown(void)  ///         0, on error.  uint64_t channel_from_job(char **argv)  { -  Channel *channel = register_channel(); -  channel->is_job = true; +  Channel *channel = register_channel(kChannelTypeJob);    incref(channel);  // job channels are only closed by the exit_cb    int status; @@ -140,21 +144,15 @@ uint64_t channel_from_job(char **argv)  /// pipe/socket client connection  ///  /// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) +void channel_from_stream(uv_stream_t *uvstream)  { -  Channel *channel = register_channel(); -  stream->data = NULL; -  channel->is_job = false; -  // read stream -  channel->data.streams.read = rstream_new(parse_msgpack, -                                           rbuffer_new(CHANNEL_BUFFER_SIZE), -                                           channel); -  rstream_set_stream(channel->data.streams.read, stream); -  rstream_start(channel->data.streams.read); +  Channel *channel = register_channel(kChannelTypeSocket); +  stream_init(NULL, &channel->data.stream, -1, uvstream, channel);    // write stream -  channel->data.streams.write = wstream_new(0); -  wstream_set_stream(channel->data.streams.write, stream); -  channel->data.streams.uv = stream; +  wstream_init(&channel->data.stream, 0); +  // read stream +  rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); +  rstream_start(&channel->data.stream, parse_msgpack);  }  /// Sends event/arguments to channel @@ -313,28 +311,23 @@ bool channel_close(uint64_t id)  /// Neovim  static void channel_from_stdio(void)  { -  Channel *channel = register_channel(); +  Channel *channel = register_channel(kChannelTypeStdio);    incref(channel);  // stdio channels are only closed on exit -  channel->is_job = false;    // read stream -  channel->data.streams.read = rstream_new(parse_msgpack, -                                           rbuffer_new(CHANNEL_BUFFER_SIZE), -                                           channel); -  rstream_set_file(channel->data.streams.read, 0); -  rstream_start(channel->data.streams.read); +  rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, +      channel); +  rstream_start(&channel->data.std.in, parse_msgpack);    // write stream -  channel->data.streams.write = wstream_new(0); -  wstream_set_file(channel->data.streams.write, 1); -  channel->data.streams.uv = NULL; +  wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);  } -static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)  {    Job *job = data; -  parse_msgpack(rstream, buf, job_data(job), eof); +  parse_msgpack(stream, buf, job_data(job), eof);  } -static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)  {    while (rbuffer_size(rbuf)) {      char buf[256]; @@ -350,7 +343,7 @@ static void job_exit(Job *job, int status, void *data)    decref(data);  } -static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)  {    Channel *channel = data;    incref(channel); @@ -362,9 +355,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)    }    size_t count = rbuffer_size(rbuf); -  DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", +  DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",         count, -       rstream); +       stream);    // Feed the unpacker with data    msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -516,10 +509,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)      return false;    } -  if (channel->is_job) { -    success = job_write(channel->data.job, buffer); -  } else { -    success = wstream_write(channel->data.streams.write, buffer); +  switch (channel->type) { +    case kChannelTypeSocket: +      success = wstream_write(&channel->data.stream, buffer); +      break; +    case kChannelTypeJob: +      success = job_write(channel->data.job, buffer); +      break; +    case kChannelTypeStdio: +      success = wstream_write(&channel->data.std.out, buffer); +      break; +    default: +      abort();    }    if (!success) { @@ -637,20 +638,23 @@ static void close_channel(Channel *channel)    channel->closed = true; -  if (channel->is_job) { -    if (channel->data.job) { -      job_stop(channel->data.job); -    } -  } else { -    rstream_free(channel->data.streams.read); -    wstream_free(channel->data.streams.write); -    uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; -    if (handle) { -      uv_close(handle, close_cb); -    } else { +  switch (channel->type) { +    case kChannelTypeSocket: +      stream_close(&channel->data.stream, close_cb); +      break; +    case kChannelTypeJob: +      if (channel->data.job) { +        job_stop(channel->data.job); +      } +      break; +    case kChannelTypeStdio: +      stream_close(&channel->data.std.in, NULL); +      stream_close(&channel->data.std.out, NULL);        loop_push_event(&loop,            (Event) { .handler = on_stdio_close, .data = channel }, false); -    } +      break; +    default: +      abort();    }    decref(channel); @@ -683,15 +687,15 @@ static void free_channel(Channel *channel)    xfree(channel);  } -static void close_cb(uv_handle_t *handle) +static void close_cb(Stream *stream, void *data)  { -  xfree(handle->data); -  xfree(handle); +  xfree(data);  } -static Channel *register_channel(void) +static Channel *register_channel(ChannelType type)  {    Channel *rv = xmalloc(sizeof(Channel)); +  rv->type = type;    rv->refcount = 1;    rv->closed = false;    rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index bf161d54e0..7d9f114140 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -6,7 +6,7 @@  #include <msgpack.h> -#include "nvim/os/wstream.h" +#include "nvim/event/wstream.h"  #include "nvim/api/private/defs.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 1449c56d59..b0e0f57e60 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -7,8 +7,7 @@  #include "nvim/api/private/defs.h"  #include "nvim/os/input.h"  #include "nvim/event/loop.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/rstream.h" +#include "nvim/event/rstream.h"  #include "nvim/ascii.h"  #include "nvim/vim.h"  #include "nvim/ui.h" @@ -30,8 +29,8 @@ typedef enum {    kInputEof  } InbufPollResult; -static RStream *read_stream = NULL; -static RBuffer *read_buffer = NULL, *input_buffer = NULL; +static Stream read_stream = {.closed = true}; +static RBuffer *input_buffer = NULL;  static bool input_eof = false;  static int global_fd = 0; @@ -54,26 +53,23 @@ int input_global_fd(void)  void input_start(int fd)  { -  if (read_stream) { +  if (!read_stream.closed) {      return;    }    global_fd = fd; -  read_buffer = rbuffer_new(READ_BUFFER_SIZE); -  read_stream = rstream_new(read_cb, read_buffer, NULL); -  rstream_set_file(read_stream, fd); -  rstream_start(read_stream); +  rstream_init_fd(&loop, &read_stream, fd, READ_BUFFER_SIZE, NULL); +  rstream_start(&read_stream, read_cb);  }  void input_stop(void)  { -  if (!read_stream) { +  if (read_stream.closed) {      return;    } -  rstream_stop(read_stream); -  rstream_free(read_stream); -  read_stream = NULL; +  rstream_stop(&read_stream); +  stream_close(&read_stream, NULL);  }  // Low level input function @@ -309,16 +305,16 @@ static InbufPollResult inbuf_poll(int ms)    return input_eof ? kInputEof : kInputNone;  } -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool at_eof) +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof)  {    if (at_eof) {      input_eof = true;    } -  assert(rbuffer_space(input_buffer) >= rbuffer_size(read_buffer)); -  RBUFFER_UNTIL_EMPTY(read_buffer, ptr, len) { +  assert(rbuffer_space(input_buffer) >= rbuffer_size(buf)); +  RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      (void)rbuffer_write(input_buffer, ptr, len); -    rbuffer_consumed(read_buffer, len); +    rbuffer_consumed(buf, len);    }  } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 7e90994fb3..71419cefca 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -6,15 +6,12 @@  #include "nvim/event/loop.h"  #include "nvim/event/time.h"  #include "nvim/event/signal.h" -#include "nvim/os/uv_helpers.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #include "nvim/os/job.h"  #include "nvim/os/job_defs.h"  #include "nvim/os/job_private.h"  #include "nvim/os/pty_process.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h"  #include "nvim/os/time.h"  #include "nvim/vim.h"  #include "nvim/memory.h" @@ -29,20 +26,16 @@  #define KILL_TIMEOUT (TERM_TIMEOUT * 2)  #define JOB_BUFFER_SIZE 0xFFFF -#define close_job_stream(job, stream, type)                                \ +#define close_job_stream(job, stream)                                      \    do {                                                                     \ -    if (job->stream) {                                                     \ -      type##stream_free(job->stream);                                      \ -      job->stream = NULL;                                                  \ -      if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) {          \ -        uv_close((uv_handle_t *)job->proc_std##stream, close_cb);          \ -      }                                                                    \ +    if (!job->stream.closed) {                                             \ +      stream_close(&job->stream, on_##stream_close);                       \      }                                                                      \    } while (0) -#define close_job_in(job) close_job_stream(job, in, w) -#define close_job_out(job) close_job_stream(job, out, r) -#define close_job_err(job) close_job_stream(job, err, r) +#define close_job_in(job) close_job_stream(job, in) +#define close_job_out(job) close_job_stream(job, out) +#define close_job_err(job) close_job_stream(job, err)  Job *table[MAX_RUNNING_JOBS] = {NULL};  size_t stop_requests = 0; @@ -118,63 +111,45 @@ Job *job_start(JobOptions opts, int *status)    job->refcount = 1;    job->stopped_time = 0;    job->term_sent = false; -  job->in = NULL; -  job->out = NULL; -  job->err = NULL;    job->opts = opts;    job->closed = false; - -  process_init(job); - -  if (opts.writable) { -    handle_set_job((uv_handle_t *)job->proc_stdin, job); -    job->refcount++; -  } - -  if (opts.stdout_cb) { -    handle_set_job((uv_handle_t *)job->proc_stdout, job); -    job->refcount++; -  } - -  if (opts.stderr_cb) { -    handle_set_job((uv_handle_t *)job->proc_stderr, job); -    job->refcount++; -  } +  job->in.closed = true; +  job->out.closed = true; +  job->err.closed = true;    // Spawn the job    if (!process_spawn(job)) { -    if (opts.writable) { -      uv_close((uv_handle_t *)job->proc_stdin, close_cb); +    if (job->opts.writable) { +      uv_close((uv_handle_t *)job->proc_stdin, NULL);      } -    if (opts.stdout_cb) { -      uv_close((uv_handle_t *)job->proc_stdout, close_cb); +    if (job->opts.stdout_cb) { +      uv_close((uv_handle_t *)job->proc_stdout, NULL);      } -    if (opts.stderr_cb) { -      uv_close((uv_handle_t *)job->proc_stderr, close_cb); +    if (job->opts.stderr_cb) { +      uv_close((uv_handle_t *)job->proc_stderr, NULL);      }      process_close(job);      loop_poll_events(&loop, 0); -    // Manually invoke the close_cb to free the job resources      *status = -1;      return NULL;    }    if (opts.writable) { -    job->in = wstream_new(opts.maxmem); -    wstream_set_stream(job->in, job->proc_stdin); +    job->refcount++; +    wstream_init_stream(&job->in, job->proc_stdin, opts.maxmem, job);    }    // Start the readable streams    if (opts.stdout_cb) { -    job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -    rstream_set_stream(job->out, job->proc_stdout); -    rstream_start(job->out); +    job->refcount++; +    rstream_init_stream(&job->out, job->proc_stdout, JOB_BUFFER_SIZE, job); +    rstream_start(&job->out, read_cb);    }    if (opts.stderr_cb) { -    job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); -    rstream_set_stream(job->err, job->proc_stderr); -    rstream_start(job->err); +    job->refcount++; +    rstream_init_stream(&job->err, job->proc_stderr, JOB_BUFFER_SIZE, job); +    rstream_start(&job->err, read_cb);    }    // Save the job to the table    table[i] = job; @@ -217,7 +192,7 @@ void job_stop(Job *job)      // Close the job's stdin. If the job doesn't close its own stdout/stderr,      // they will be closed when the job exits(possibly due to being terminated      // after a timeout) -    close_job_in(job); +    job_close_in(job);    }    if (!stop_requests++) { @@ -315,9 +290,9 @@ void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL  /// @param job The job instance  /// @param cb The function that will be called on write completion or  ///        failure. It will be called with the job as the `data` argument. -void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL +void job_write_cb(Job *job, stream_write_cb cb) FUNC_ATTR_NONNULL_ALL  { -  wstream_set_write_cb(job->in, cb, job); +  wstream_set_write_cb(&job->in, cb);  }  /// Writes data to the job's stdin. This is a non-blocking operation, it @@ -329,7 +304,7 @@ void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL  ///         to the job stream failed (possibly because the OS buffer is full)  bool job_write(Job *job, WBuffer *buffer)  { -  return wstream_write(job->in, buffer); +  return wstream_write(&job->in, buffer);  }  /// Get the job id @@ -405,26 +380,26 @@ static void job_stop_timer_cb(TimeWatcher *watcher, void *data)  }  // Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    Job *job = data; -  if (rstream == job->out) { -    job->opts.stdout_cb(rstream, buf, data, eof); +  if (stream == &job->out) { +    job->opts.stdout_cb(stream, buf, data, eof);      if (eof) {        close_job_out(job);      }    } else { -    job->opts.stderr_cb(rstream, buf, data, eof); +    job->opts.stderr_cb(stream, buf, data, eof);      if (eof) {        close_job_err(job);      }    }  } -static void close_cb(uv_handle_t *handle) +static void on_stream_close(Stream *stream, void *data)  { -  job_decref(handle_get_job(handle)); +  job_decref(data);  }  static void job_exited(Event event) diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h index ed102666d0..2f8bf79a31 100644 --- a/src/nvim/os/job.h +++ b/src/nvim/os/job.h @@ -10,10 +10,9 @@  #include <stdint.h>  #include <stdbool.h> -#include "nvim/os/rstream_defs.h"  #include "nvim/os/job_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "os/job.h.generated.h" diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h index 7fee900ac0..ea7a326404 100644 --- a/src/nvim/os/job_defs.h +++ b/src/nvim/os/job_defs.h @@ -2,8 +2,9 @@  #define NVIM_OS_JOB_DEFS_H  #include <uv.h> -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream_defs.h" + +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #define MAX_RUNNING_JOBS 100  typedef struct job Job; @@ -29,10 +30,10 @@ typedef struct {    bool writable;    // Callback that will be invoked when data is available on stdout. If NULL    // stdout will be redirected to /dev/null. -  rstream_cb stdout_cb; +  stream_read_cb stdout_cb;    // Callback that will be invoked when data is available on stderr. If NULL    // stderr will be redirected to /dev/null. -  rstream_cb  stderr_cb; +  stream_read_cb stderr_cb;    // Callback that will be invoked when the job has exited and will not send    // data    job_exit_cb exit_cb; diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h index b90f2d0171..6bdb24e6cd 100644 --- a/src/nvim/os/job_private.h +++ b/src/nvim/os/job_private.h @@ -6,8 +6,8 @@  #include <uv.h>  #include "nvim/event/time.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream_defs.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h"  #include "nvim/os/pipe_process.h"  #include "nvim/os/pty_process.h"  #include "nvim/os/shell.h" @@ -28,14 +28,15 @@ struct job {    uint64_t stopped_time;    // If SIGTERM was already sent to the job(only send one before SIGKILL)    bool term_sent; -  // Readable streams(std{out,err}) -  RStream *out, *err; -  // Writable stream(stdin) -  WStream *in; +  // stdio streams(std{in,out,err}) +  Stream in, out, err;    // Libuv streams representing stdin/stdout/stderr    uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr;    // Extra data set by the process spawner -  void *process; +  union { +    UvProcess uv; +    PtyProcess pty; +  } process;    // If process_close has been called on this job    bool closed;    // Startup options @@ -51,15 +52,6 @@ static inline bool process_spawn(Job *job)    return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job);  } -static inline void process_init(Job *job) -{ -  if (job->opts.pty) { -    pty_process_init(job); -  } else { -    pipe_process_init(job); -  } -} -  static inline void process_close(Job *job)  {    if (job->closed) { @@ -73,15 +65,6 @@ static inline void process_close(Job *job)    }  } -static inline void process_destroy(Job *job) -{ -  if (job->opts.pty) { -    pty_process_destroy(job); -  } else { -    pipe_process_destroy(job); -  } -} -  static inline void job_exit_callback(Job *job)  {    // Free the slot now, 'exit_cb' may want to start another job to replace @@ -106,11 +89,10 @@ static inline void job_decref(Job *job)      // Invoke the exit_cb      job_exit_callback(job);      // Free all memory allocated for the job -    xfree(job->proc_stdin->data); -    xfree(job->proc_stdout->data); -    xfree(job->proc_stderr->data);      shell_free_argv(job->opts.argv); -    process_destroy(job); +    if (job->opts.pty) { +      xfree(job->opts.term_name); +    }      xfree(job);    }  } diff --git a/src/nvim/os/os.h b/src/nvim/os/os.h index 3dd099890c..69bd1ff4fd 100644 --- a/src/nvim/os/os.h +++ b/src/nvim/os/os.h @@ -12,7 +12,6 @@  # include "os/mem.h.generated.h"  # include "os/env.h.generated.h"  # include "os/users.h.generated.h" -# include "os/stream.h.generated.h"  #endif  #endif  // NVIM_OS_OS_H diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c index 9980cf7c56..1160015c34 100644 --- a/src/nvim/os/pipe_process.c +++ b/src/nvim/os/pipe_process.c @@ -3,7 +3,6 @@  #include <uv.h> -#include "nvim/os/uv_helpers.h"  #include "nvim/os/job.h"  #include "nvim/os/job_defs.h"  #include "nvim/os/job_private.h" @@ -16,17 +15,9 @@  # include "os/pipe_process.c.generated.h"  #endif -typedef struct { -  // Structures for process spawning/management used by libuv -  uv_process_t proc; -  uv_process_options_t proc_opts; -  uv_stdio_container_t stdio[3]; -  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -} UvProcess; - -void pipe_process_init(Job *job) +bool pipe_process_spawn(Job *job)  { -  UvProcess *pipeproc = xmalloc(sizeof(UvProcess)); +  UvProcess *pipeproc = &job->process.uv;    pipeproc->proc_opts.file = job->opts.argv[0];    pipeproc->proc_opts.args = job->opts.argv;    pipeproc->proc_opts.stdio = pipeproc->stdio; @@ -45,7 +36,7 @@ void pipe_process_init(Job *job)    pipeproc->stdio[1].flags = UV_IGNORE;    pipeproc->stdio[2].flags = UV_IGNORE; -  handle_set_job((uv_handle_t *)&pipeproc->proc, job); +  pipeproc->proc.data = job;    if (job->opts.writable) {      uv_pipe_init(&loop.uv, &pipeproc->proc_stdin, 0); @@ -68,20 +59,6 @@ void pipe_process_init(Job *job)    job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin;    job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout;    job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr; -  job->process = pipeproc; -} - -void pipe_process_destroy(Job *job) -{ -  UvProcess *pipeproc = job->process; -  xfree(pipeproc->proc.data); -  xfree(pipeproc); -  job->process = NULL; -} - -bool pipe_process_spawn(Job *job) -{ -  UvProcess *pipeproc = job->process;    if (uv_spawn(&loop.uv, &pipeproc->proc, &pipeproc->proc_opts) != 0) {      return false; @@ -93,20 +70,19 @@ bool pipe_process_spawn(Job *job)  void pipe_process_close(Job *job)  { -  UvProcess *pipeproc = job->process; -  uv_close((uv_handle_t *)&pipeproc->proc, close_cb); +  uv_close((uv_handle_t *)&job->process.uv.proc, close_cb);  }  static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)  { -  Job *job = handle_get_job((uv_handle_t *)proc); +  Job *job = proc->data;    job->status = (int)status;    pipe_process_close(job);  }  static void close_cb(uv_handle_t *handle)  { -  Job *job = handle_get_job(handle); +  Job *job = handle->data;    job_close_streams(job);    job_decref(job);  } diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h index 17a4255ddc..65e5cfa78f 100644 --- a/src/nvim/os/pipe_process.h +++ b/src/nvim/os/pipe_process.h @@ -1,6 +1,16 @@  #ifndef NVIM_OS_PIPE_PROCESS_H  #define NVIM_OS_PIPE_PROCESS_H +#include <uv.h> + +typedef struct { +  // Structures for process spawning/management used by libuv +  uv_process_t proc; +  uv_process_options_t proc_opts; +  uv_stdio_container_t stdio[3]; +  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; +} UvProcess; +  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "os/pipe_process.h.generated.h"  #endif diff --git a/src/nvim/os/pty_process.c b/src/nvim/os/pty_process.c index ca74ebfddd..a8fff2a61d 100644 --- a/src/nvim/os/pty_process.c +++ b/src/nvim/os/pty_process.c @@ -33,15 +33,12 @@  # include "os/pty_process.c.generated.h"  #endif -typedef struct { -  struct winsize winsize; -  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; -  int tty_fd; -} PtyProcess; +static const unsigned int KILL_RETRIES = 5; +static const unsigned int KILL_TIMEOUT = 2;  // seconds -void pty_process_init(Job *job) FUNC_ATTR_NONNULL_ALL +bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL  { -  PtyProcess *ptyproc = xmalloc(sizeof(PtyProcess)); +  PtyProcess *ptyproc = &job->process.pty;    ptyproc->tty_fd = -1;    if (job->opts.writable) { @@ -62,41 +59,8 @@ void pty_process_init(Job *job) FUNC_ATTR_NONNULL_ALL    job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin;    job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout;    job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr; -  job->process = ptyproc; -} - -void pty_process_destroy(Job *job) FUNC_ATTR_NONNULL_ALL -{ -  xfree(job->opts.term_name); -  xfree(job->process); -  job->process = NULL; -} - -static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe) -  FUNC_ATTR_NONNULL_ALL -{ -  int fd_dup = dup(fd); -  if (fd_dup < 0) { -    ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); -    return false; -  } -  int uv_result = uv_pipe_open(pipe, fd_dup); -  if (uv_result) { -    ELOG("Failed to set pipe to descriptor %d: %s", -         fd_dup, uv_strerror(uv_result)); -    close(fd_dup); -    return false; -  } -  return true; -} - -static const unsigned int KILL_RETRIES = 5; -static const unsigned int KILL_TIMEOUT = 2;  // seconds -bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL -{    int master; -  PtyProcess *ptyproc = job->process;    ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0};    struct termios termios;    init_termios(&termios); @@ -158,6 +122,24 @@ error:    return false;  } +static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe) +  FUNC_ATTR_NONNULL_ALL +{ +  int fd_dup = dup(fd); +  if (fd_dup < 0) { +    ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno)); +    return false; +  } +  int uv_result = uv_pipe_open(pipe, fd_dup); +  if (uv_result) { +    ELOG("Failed to set pipe to descriptor %d: %s", +         fd_dup, uv_strerror(uv_result)); +    close(fd_dup); +    return false; +  } +  return true; +} +  void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL  {    pty_process_close_master(job); @@ -167,7 +149,7 @@ void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL  void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL  { -  PtyProcess *ptyproc = job->process; +  PtyProcess *ptyproc = &job->process.pty;    if (ptyproc->tty_fd >= 0) {      close(ptyproc->tty_fd);      ptyproc->tty_fd = -1; @@ -177,7 +159,7 @@ void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL  void pty_process_resize(Job *job, uint16_t width, uint16_t height)    FUNC_ATTR_NONNULL_ALL  { -  PtyProcess *ptyproc = job->process; +  PtyProcess *ptyproc = &job->process.pty;    ptyproc->winsize = (struct winsize){height, width, 0, 0};    ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);  } diff --git a/src/nvim/os/pty_process.h b/src/nvim/os/pty_process.h index 62fcd1671f..b5a2eba8b3 100644 --- a/src/nvim/os/pty_process.h +++ b/src/nvim/os/pty_process.h @@ -1,6 +1,16 @@  #ifndef NVIM_OS_PTY_PROCESS_H  #define NVIM_OS_PTY_PROCESS_H +#include <sys/ioctl.h> + +#include <uv.h> + +typedef struct { +  struct winsize winsize; +  uv_pipe_t proc_stdin, proc_stdout, proc_stderr; +  int tty_fd; +} PtyProcess; +  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "os/pty_process.h.generated.h"  #endif diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c deleted file mode 100644 index dd91c2777e..0000000000 --- a/src/nvim/os/rstream.c +++ /dev/null @@ -1,253 +0,0 @@ -#include <assert.h> -#include <stdint.h> -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/rstream.h" -#include "nvim/ascii.h" -#include "nvim/vim.h" -#include "nvim/memory.h" -#include "nvim/log.h" -#include "nvim/misc1.h" - -struct rstream { -  void *data; -  uv_buf_t uvbuf; -  size_t fpos; -  RBuffer *buffer; -  uv_stream_t *stream; -  uv_idle_t *fread_idle; -  uv_handle_type file_type; -  uv_file fd; -  rstream_cb cb; -  bool free_handle; -}; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/rstream.c.generated.h" -#endif - -/// Creates a new RStream instance. A RStream encapsulates all the boilerplate -/// necessary for reading from a libuv stream. -/// -/// @param cb A function that will be called whenever some data is available -///        for reading with `rstream_read` -/// @param buffer RBuffer instance to associate with the RStream -/// @param data Some state to associate with the `RStream` instance -/// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) -{ -  RStream *rv = xmalloc(sizeof(RStream)); -  buffer->data = rv; -  buffer->full_cb = on_rbuffer_full; -  buffer->nonfull_cb = on_rbuffer_nonfull; -  rv->buffer = buffer; -  rv->fpos = 0; -  rv->data = data; -  rv->cb = cb; -  rv->stream = NULL; -  rv->fread_idle = NULL; -  rv->free_handle = false; -  rv->file_type = UV_UNKNOWN_HANDLE; - -  return rv; -} - -static void on_rbuffer_full(RBuffer *buf, void *data) -{ -  rstream_stop(data); -} - -static void on_rbuffer_nonfull(RBuffer *buf, void *data) -{ -  rstream_start(data); -} - -/// Frees all memory allocated for a RStream instance -/// -/// @param rstream The `RStream` instance -void rstream_free(RStream *rstream) -{ -  if (rstream->free_handle) { -    if (rstream->fread_idle != NULL) { -      uv_close((uv_handle_t *)rstream->fread_idle, close_cb); -    } else { -      uv_close((uv_handle_t *)rstream->stream, close_cb); -    } -  } - -  rbuffer_free(rstream->buffer); -  xfree(rstream); -} - -/// Sets the underlying `uv_stream_t` instance -/// -/// @param rstream The `RStream` instance -/// @param stream The new `uv_stream_t` instance -void rstream_set_stream(RStream *rstream, uv_stream_t *stream) -{ -  handle_set_rstream((uv_handle_t *)stream, rstream); -  rstream->stream = stream; -} - -/// Sets the underlying file descriptor that will be read from. Only pipes -/// and regular files are supported for now. -/// -/// @param rstream The `RStream` instance -/// @param file The file descriptor -void rstream_set_file(RStream *rstream, uv_file file) -{ -  rstream->file_type = uv_guess_handle(file); - -  if (rstream->free_handle) { -    // If this is the second time we're calling this function, free the -    // previously allocated memory -    if (rstream->fread_idle != NULL) { -      uv_close((uv_handle_t *)rstream->fread_idle, close_cb); -      rstream->fread_idle = NULL; -    } else { -      uv_close((uv_handle_t *)rstream->stream, close_cb); -      rstream->stream = NULL; -    } -  } - -  if (rstream->file_type == UV_FILE) { -    // Non-blocking file reads are simulated with an idle handle that reads -    // in chunks of rstream->buffer_size, giving time for other events to -    // be processed between reads. -    rstream->fread_idle = xmalloc(sizeof(uv_idle_t)); -    uv_idle_init(&loop.uv, rstream->fread_idle); -    rstream->fread_idle->data = NULL; -    handle_set_rstream((uv_handle_t *)rstream->fread_idle, rstream); -  } else { -    // Only pipes are supported for now -    assert(rstream->file_type == UV_NAMED_PIPE -        || rstream->file_type == UV_TTY); -    rstream->stream = xmalloc(sizeof(uv_pipe_t)); -    uv_pipe_init(&loop.uv, (uv_pipe_t *)rstream->stream, 0); -    uv_pipe_open((uv_pipe_t *)rstream->stream, file); -    rstream->stream->data = NULL; -    handle_set_rstream((uv_handle_t *)rstream->stream, rstream); -  } - -  rstream->fd = file; -  rstream->free_handle = true; -} - -/// Starts watching for events from a `RStream` instance. -/// -/// @param rstream The `RStream` instance -void rstream_start(RStream *rstream) -{ -  if (rstream->file_type == UV_FILE) { -    uv_idle_start(rstream->fread_idle, fread_idle_cb); -  } else { -    uv_read_start(rstream->stream, alloc_cb, read_cb); -  } -} - -/// Stops watching for events from a `RStream` instance. -/// -/// @param rstream The `RStream` instance -void rstream_stop(RStream *rstream) -{ -  if (rstream->file_type == UV_FILE) { -    uv_idle_stop(rstream->fread_idle); -  } else { -    uv_read_stop(rstream->stream); -  } -} - -// Callbacks used by libuv - -// Called by libuv to allocate memory for reading. -static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) -{ -  RStream *rstream = handle_get_rstream(handle); -  buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len); -} - -// Callback invoked by libuv after it copies the data into the buffer provided -// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a -// 0-length buffer. -static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) -{ -  RStream *rstream = handle_get_rstream((uv_handle_t *)stream); - -  if (cnt <= 0) { -    if (cnt != UV_ENOBUFS -        // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: -        // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. -        // -        // We don't need to do anything with the RBuffer because the next call -        // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` -        // won't be called) -        && cnt != 0) { -      DLOG("Closing RStream(%p) because of %s(%zd)", rstream, -           uv_strerror((int)cnt), cnt); -      // Read error or EOF, either way stop the stream and invoke the callback -      // with eof == true -      uv_read_stop(stream); -      rstream->cb(rstream, rstream->buffer, rstream->data, true); -    } -    return; -  } - -  // at this point we're sure that cnt is positive, no error occurred -  size_t nread = (size_t)cnt; -  // Data was already written, so all we need is to update 'wpos' to reflect -  // the space actually used in the buffer. -  rbuffer_produced(rstream->buffer, nread); -  rstream->cb(rstream, rstream->buffer, rstream->data, false); -} - -// Called by the by the 'idle' handle to emulate a reading event -static void fread_idle_cb(uv_idle_t *handle) -{ -  uv_fs_t req; -  RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - -  rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len); - -  // the offset argument to uv_fs_read is int64_t, could someone really try -  // to read more than 9 quintillion (9e18) bytes? -  // upcast is meant to avoid tautological condition warning on 32 bits -  uintmax_t fpos_intmax = rstream->fpos; -  if (fpos_intmax > INT64_MAX) { -    ELOG("stream offset overflow"); -    preserve_exit(); -  } - -  // Synchronous read -  uv_fs_read( -      &loop.uv, -      &req, -      rstream->fd, -      &rstream->uvbuf, -      1, -      (int64_t) rstream->fpos, -      NULL); - -  uv_fs_req_cleanup(&req); - -  if (req.result <= 0) { -    uv_idle_stop(rstream->fread_idle); -    rstream->cb(rstream, rstream->buffer, rstream->data, true); -    return; -  } - -  // no errors (req.result (ssize_t) is positive), it's safe to cast. -  size_t nread = (size_t) req.result; -  rbuffer_produced(rstream->buffer, nread); -  rstream->fpos += nread; -} - -static void close_cb(uv_handle_t *handle) -{ -  xfree(handle->data); -  xfree(handle); -} diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h deleted file mode 100644 index 7da77b33fa..0000000000 --- a/src/nvim/os/rstream.h +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef NVIM_OS_RSTREAM_H -#define NVIM_OS_RSTREAM_H - -#include <stdbool.h> -#include <stdint.h> -#include <uv.h> -#include "nvim/os/rstream_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/rstream.h.generated.h" -#endif -#endif  // NVIM_OS_RSTREAM_H diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h deleted file mode 100644 index 45dced0b62..0000000000 --- a/src/nvim/os/rstream_defs.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef NVIM_OS_RSTREAM_DEFS_H -#define NVIM_OS_RSTREAM_DEFS_H - -#include <stdbool.h> - -#include "nvim/rbuffer.h" - -typedef struct rstream RStream; - -/// Type of function called when the RStream receives data -/// -/// @param rstream The RStream instance -/// @param rbuffer The associated RBuffer instance -/// @param data State associated with the RStream instance -/// @param eof If the stream reached EOF. -typedef void (*rstream_cb)(RStream *rstream, RBuffer *buf, void *data, -    bool eof); - -#endif  // NVIM_OS_RSTREAM_DEFS_H - diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 9c0d5fca67..04ac9f1c03 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -10,7 +10,7 @@  #include "nvim/log.h"  #include "nvim/event/loop.h"  #include "nvim/os/job.h" -#include "nvim/os/rstream.h" +#include "nvim/event/rstream.h"  #include "nvim/os/shell.h"  #include "nvim/os/signal.h"  #include "nvim/types.h" @@ -189,7 +189,7 @@ static int do_os_system(char **argv,  {    // the output buffer    DynamicBuffer buf = DYNAMIC_BUFFER_INIT; -  rstream_cb data_cb = system_data_cb; +  stream_read_cb data_cb = system_data_cb;    if (nread) {      *nread = 0;    } @@ -283,7 +283,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)    buf->data = xrealloc(buf->data, buf->cap);  } -static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    Job *job = data;    DynamicBuffer *dbuf = job_data(job); @@ -294,7 +294,7 @@ static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)    dbuf->len += nread;  } -static void out_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    RBUFFER_UNTIL_EMPTY(buf, ptr, len) {      size_t written = write_output(ptr, len, false, @@ -470,7 +470,7 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,    return (size_t)(output - start);  } -static void shell_write_cb(WStream *wstream, void *data, int status) +static void shell_write_cb(Stream *stream, void *data, int status)  {    Job *job = data;    job_close_in(job); diff --git a/src/nvim/os/stream.c b/src/nvim/os/stream.c deleted file mode 100644 index 0c448872c3..0000000000 --- a/src/nvim/os/stream.c +++ /dev/null @@ -1,30 +0,0 @@ -// Functions for working with stdio streams (as opposed to RStream/WStream). - -#include <stdio.h> -#include <stdbool.h> - -#include <uv.h> - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/stream.c.generated.h" -#endif - -/// Sets the stream associated with `fd` to "blocking" mode. -/// -/// @return `0` on success, or `-errno` on failure. -int stream_set_blocking(int fd, bool blocking) -{ -  // Private loop to avoid conflict with existing watcher(s): -  //    uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. -  uv_loop_t loop; -  uv_pipe_t stream; -  uv_loop_init(&loop); -  uv_pipe_init(&loop, &stream, 0); -  uv_pipe_open(&stream, fd); -  int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); -  uv_close((uv_handle_t *)&stream, NULL); -  uv_run(&loop, UV_RUN_NOWAIT);  // not necessary, but couldn't hurt. -  uv_loop_close(&loop); -  return retval; -} - diff --git a/src/nvim/os/uv_helpers.c b/src/nvim/os/uv_helpers.c deleted file mode 100644 index 89687bdac7..0000000000 --- a/src/nvim/os/uv_helpers.c +++ /dev/null @@ -1,98 +0,0 @@ -#include <assert.h> -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -/// Common structure that will always be assigned to the `data` field of -/// libuv handles. It has fields for many types of pointers, and allow a single -/// handle to contain data from many sources -typedef struct { -  WStream *wstream; -  RStream *rstream; -  Job *job; -} HandleData; - - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/uv_helpers.c.generated.h" -#endif - -/// Gets the RStream instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the RStream pointer -RStream *handle_get_rstream(uv_handle_t *handle) -{ -  RStream *rv = init(handle)->rstream; -  assert(rv != NULL); -  return rv; -} - -/// Associates a RStream instance with a libuv handle -/// -/// @param handle libuv handle -/// @param rstream the RStream pointer -void handle_set_rstream(uv_handle_t *handle, RStream *rstream) -{ -  init(handle)->rstream = rstream; -} - -/// Gets the WStream instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the WStream pointer -WStream *handle_get_wstream(uv_handle_t *handle) -{ -  WStream *rv = init(handle)->wstream; -  assert(rv != NULL); -  return rv; -} - -/// Associates a WStream instance with a libuv handle -/// -/// @param handle libuv handle -/// @param wstream the WStream pointer -void handle_set_wstream(uv_handle_t *handle, WStream *wstream) -{ -  HandleData *data = init(handle); -  data->wstream = wstream; -} - -/// Gets the Job instance associated with a libuv handle -/// -/// @param handle libuv handle -/// @return the Job pointer -Job *handle_get_job(uv_handle_t *handle) -{ -  Job *rv = init(handle)->job; -  assert(rv != NULL); -  return rv; -} - -/// Associates a Job instance with a libuv handle -/// -/// @param handle libuv handle -/// @param job the Job pointer -void handle_set_job(uv_handle_t *handle, Job *job) -{ -  init(handle)->job = job; -} - -static HandleData *init(uv_handle_t *handle) -{ -  HandleData *rv; - -  if (handle->data == NULL) { -    rv = xmalloc(sizeof(HandleData)); -    rv->rstream = NULL; -    rv->wstream = NULL; -    rv->job = NULL; -    handle->data = rv; -  } else { -    rv = handle->data; -  } - -  return rv; -} diff --git a/src/nvim/os/uv_helpers.h b/src/nvim/os/uv_helpers.h deleted file mode 100644 index b49656bcb8..0000000000 --- a/src/nvim/os/uv_helpers.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NVIM_OS_UV_HELPERS_H -#define NVIM_OS_UV_HELPERS_H - -#include <uv.h> - -#include "nvim/os/wstream_defs.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/job_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/uv_helpers.h.generated.h" -#endif -#endif  // NVIM_OS_UV_HELPERS_H diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c deleted file mode 100644 index 7f5191947a..0000000000 --- a/src/nvim/os/wstream.c +++ /dev/null @@ -1,243 +0,0 @@ -#include <assert.h> -#include <stdint.h> -#include <stdbool.h> -#include <stdlib.h> - -#include <uv.h> - -#include "nvim/os/uv_helpers.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/vim.h" -#include "nvim/memory.h" - -#define DEFAULT_MAXMEM 1024 * 1024 * 10 - -struct wstream { -  uv_stream_t *stream; -  // Memory currently used by pending buffers -  size_t curmem; -  // Maximum memory used by this instance -  size_t maxmem; -  // Number of pending requests -  size_t pending_reqs; -  bool freed, free_handle; -  // (optional) Write callback and data -  wstream_cb cb; -  void *data; -}; - -struct wbuffer { -  size_t size, refcount; -  char *data; -  wbuffer_data_finalizer cb; -}; - -typedef struct { -  WStream *wstream; -  WBuffer *buffer; -  uv_write_t uv_req; -} WRequest; - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/wstream.c.generated.h" -#endif - -/// Creates a new WStream instance. A WStream encapsulates all the boilerplate -/// necessary for writing to a libuv stream. -/// -/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0, -///        a default value of 10mb will be used. -/// @return The newly-allocated `WStream` instance -WStream * wstream_new(size_t maxmem) -{ -  if (!maxmem) { -    maxmem = DEFAULT_MAXMEM; -  } - -  WStream *rv = xmalloc(sizeof(WStream)); -  rv->maxmem = maxmem; -  rv->stream = NULL; -  rv->curmem = 0; -  rv->pending_reqs = 0; -  rv->freed = false; -  rv->free_handle = false; -  rv->cb = NULL; - -  return rv; -} - -/// Frees all memory allocated for a WStream instance -/// -/// @param wstream The `WStream` instance -void wstream_free(WStream *wstream) { -  if (!wstream->pending_reqs) { -    if (wstream->free_handle) { -      uv_close((uv_handle_t *)wstream->stream, close_cb); -    } else { -      handle_set_wstream((uv_handle_t *)wstream->stream, NULL); -      xfree(wstream); -    } -  } else { -    wstream->freed = true; -  } -} - -/// Sets the underlying `uv_stream_t` instance -/// -/// @param wstream The `WStream` instance -/// @param stream The new `uv_stream_t` instance -void wstream_set_stream(WStream *wstream, uv_stream_t *stream) -{ -  handle_set_wstream((uv_handle_t *)stream, wstream); -  wstream->stream = stream; -} - -/// Sets the underlying file descriptor that will be written to. Only pipes -/// are supported for now. -/// -/// @param wstream The `WStream` instance -/// @param file The file descriptor -void wstream_set_file(WStream *wstream, uv_file file) -{ -  assert(uv_guess_handle(file) == UV_NAMED_PIPE || -         uv_guess_handle(file) == UV_TTY); -  wstream->stream = xmalloc(sizeof(uv_pipe_t)); -  uv_pipe_init(&loop.uv, (uv_pipe_t *)wstream->stream, 0); -  uv_pipe_open((uv_pipe_t *)wstream->stream, file); -  wstream->stream->data = NULL; -  handle_set_wstream((uv_handle_t *)wstream->stream, wstream); -  wstream->free_handle = true; -} - -/// Sets a callback that will be called on completion of a write request, -/// indicating failure/success. -/// -/// This affects all requests currently in-flight as well. Overwrites any -/// possible earlier callback. -/// -/// @note This callback will not fire if the write request couldn't even be -///       queued properly (i.e.: when `wstream_write() returns an error`). -/// -/// @param wstream The `WStream` instance -/// @param cb The callback -/// @param data User-provided data that will be passed to `cb` -void wstream_set_write_cb(WStream *wstream, wstream_cb cb, void *data) -  FUNC_ATTR_NONNULL_ARG(1) -{ -  wstream->cb = cb; -  wstream->data = data; -} - -/// Queues data for writing to the backing file descriptor of a `WStream` -/// instance. This will fail if the write would cause the WStream use more -/// memory than specified by `maxmem`. -/// -/// @param wstream The `WStream` instance -/// @param buffer The buffer which contains data to be written -/// @return false if the write failed -bool wstream_write(WStream *wstream, WBuffer *buffer) -{ -  // This should not be called after a wstream was freed -  assert(!wstream->freed); - -  if (wstream->curmem > wstream->maxmem) { -    goto err; -  } - -  wstream->curmem += buffer->size; - -  WRequest *data = xmalloc(sizeof(WRequest)); -  data->wstream = wstream; -  data->buffer = buffer; -  data->uv_req.data = data; - -  uv_buf_t uvbuf; -  uvbuf.base = buffer->data; -  uvbuf.len = buffer->size; - -  if (uv_write(&data->uv_req, wstream->stream, &uvbuf, 1, write_cb)) { -    xfree(data); -    goto err; -  } - -  wstream->pending_reqs++; -  return true; - -err: -  wstream_release_wbuffer(buffer); -  return false; -} - -/// Creates a WBuffer object for holding output data. Instances of this -/// object can be reused across WStream instances, and the memory is freed -/// automatically when no longer needed(it tracks the number of references -/// internally) -/// -/// @param data Data stored by the WBuffer -/// @param size The size of the data array -/// @param refcount The number of references for the WBuffer. This will be used -///        by WStream instances to decide when a WBuffer should be freed. -/// @param cb Pointer to function that will be responsible for freeing -///        the buffer data(passing 'free' will work as expected). -/// @return The allocated WBuffer instance -WBuffer *wstream_new_buffer(char *data, -                            size_t size, -                            size_t refcount, -                            wbuffer_data_finalizer cb) -{ -  WBuffer *rv = xmalloc(sizeof(WBuffer)); -  rv->size = size; -  rv->refcount = refcount; -  rv->cb = cb; -  rv->data = data; - -  return rv; -} - -static void write_cb(uv_write_t *req, int status) -{ -  WRequest *data = req->data; - -  data->wstream->curmem -= data->buffer->size; - -  wstream_release_wbuffer(data->buffer); - -  if (data->wstream->cb) { -    data->wstream->cb(data->wstream, -                      data->wstream->data, -                      status); -  } - -  data->wstream->pending_reqs--; - -  if (data->wstream->freed && data->wstream->pending_reqs == 0) { -    // Last pending write, free the wstream; -    if (data->wstream->free_handle) { -      uv_close((uv_handle_t *)data->wstream->stream, close_cb); -    } else { -      xfree(data->wstream); -    } -  } - -  xfree(data); -} - -void wstream_release_wbuffer(WBuffer *buffer) -{ -  if (!--buffer->refcount) { -    if (buffer->cb) { -      buffer->cb(buffer->data); -    } - -    xfree(buffer); -  } -} - -static void close_cb(uv_handle_t *handle) -{ -  xfree(handle_get_wstream(handle)); -  xfree(handle->data); -  xfree(handle); -} - diff --git a/src/nvim/os/wstream.h b/src/nvim/os/wstream.h deleted file mode 100644 index d0e9bef93a..0000000000 --- a/src/nvim/os/wstream.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NVIM_OS_WSTREAM_H -#define NVIM_OS_WSTREAM_H - -#include <stdint.h> -#include <stdbool.h> -#include <uv.h> - -#include "nvim/os/wstream_defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/wstream.h.generated.h" -#endif -#endif  // NVIM_OS_WSTREAM_H diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h deleted file mode 100644 index cfa0bf0b60..0000000000 --- a/src/nvim/os/wstream_defs.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef NVIM_OS_WSTREAM_DEFS_H -#define NVIM_OS_WSTREAM_DEFS_H - -typedef struct wbuffer WBuffer; -typedef struct wstream WStream; -typedef void (*wbuffer_data_finalizer)(void *data); - -/// Type of function called when the WStream has information about a write -/// request. -/// -/// @param wstream The `WStream` instance -/// @param data User-defined data -/// @param status 0 on success, anything else indicates failure -typedef void (*wstream_cb)(WStream *wstream, -                           void *data, -                           int status); - -#endif  // NVIM_OS_WSTREAM_DEFS_H - diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index 331a0a89e0..0a84a3688b 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -4,7 +4,7 @@  #include "nvim/misc2.h"  #include "nvim/os/os.h"  #include "nvim/os/input.h" -#include "nvim/os/rstream.h" +#include "nvim/event/rstream.h"  #include "nvim/event/time.h"  #define PASTETOGGLE_KEY "<f37>" @@ -14,8 +14,7 @@ struct term_input {    bool paste_enabled;    TermKey *tk;    TimeWatcher timer_handle; -  RBuffer *read_buffer; -  RStream *read_stream; +  Stream read_stream;  };  static void forward_simple_utf8(TermKeyKey *key) @@ -162,12 +161,12 @@ static void timer_cb(TimeWatcher *watcher, void *data)  static bool handle_bracketed_paste(TermInput *input)  { -  if (rbuffer_size(input->read_buffer) > 5 && -      (!rbuffer_cmp(input->read_buffer, "\x1b[200~", 6) || -       !rbuffer_cmp(input->read_buffer, "\x1b[201~", 6))) { -    bool enable = *rbuffer_get(input->read_buffer, 4) == '0'; +  if (rbuffer_size(input->read_stream.buffer) > 5 && +      (!rbuffer_cmp(input->read_stream.buffer, "\x1b[200~", 6) || +       !rbuffer_cmp(input->read_stream.buffer, "\x1b[201~", 6))) { +    bool enable = *rbuffer_get(input->read_stream.buffer, 4) == '0';      // Advance past the sequence -    rbuffer_consumed(input->read_buffer, 6); +    rbuffer_consumed(input->read_stream.buffer, 6);      if (input->paste_enabled == enable) {        return true;      } @@ -194,19 +193,22 @@ static bool handle_bracketed_paste(TermInput *input)  static bool handle_forced_escape(TermInput *input)  { -  if (rbuffer_size(input->read_buffer) > 1 -      && !rbuffer_cmp(input->read_buffer, "\x1b\x00", 2)) { +  if (rbuffer_size(input->read_stream.buffer) > 1 +      && !rbuffer_cmp(input->read_stream.buffer, "\x1b\x00", 2)) {      // skip the ESC and NUL and push one <esc> to the input buffer      size_t rcnt; -    termkey_push_bytes(input->tk, rbuffer_read_ptr(input->read_buffer, &rcnt), 1); -    rbuffer_consumed(input->read_buffer, 2); +    termkey_push_bytes(input->tk, rbuffer_read_ptr(input->read_stream.buffer, +          &rcnt), 1); +    rbuffer_consumed(input->read_stream.buffer, 2);      tk_getkeys(input, true);      return true;    }    return false;  } -static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void restart_reading(Event event); + +static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)  {    TermInput *input = data; @@ -223,8 +225,9 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)        //        // ls *.md | xargs nvim        input->in_fd = 2; -      rstream_set_file(input->read_stream, input->in_fd); -      rstream_start(input->read_stream); +      stream_close(&input->read_stream, NULL); +      loop_push_event(&loop, +          (Event) { .data = input, .handler = restart_reading }, false);      } else {        input_done();      } @@ -240,7 +243,7 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)      // so the `handle_bracketed_paste`/`handle_forced_escape` calls above work      // as expected.      size_t count = 0; -    RBUFFER_EACH(input->read_buffer, c, i) { +    RBUFFER_EACH(input->read_stream.buffer, c, i) {        count = i + 1;        if (c == '\x1b' && count > 1) {          count--; @@ -248,13 +251,13 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)        }      } -    RBUFFER_UNTIL_EMPTY(input->read_buffer, ptr, len) { +    RBUFFER_UNTIL_EMPTY(input->read_stream.buffer, ptr, len) {        size_t consumed = termkey_push_bytes(input->tk, ptr, MIN(count, len));        // termkey_push_bytes can return (size_t)-1, so it is possible that -      // `consumed > input->read_buffer->size`, but since tk_getkeys is called -      // soon, it shouldn't happen -      assert(consumed <= input->read_buffer->size); -      rbuffer_consumed(input->read_buffer, consumed); +      // `consumed > input->read_stream.buffer->size`, but since tk_getkeys is +      // called soon, it shouldn't happen +      assert(consumed <= input->read_stream.buffer->size); +      rbuffer_consumed(input->read_stream.buffer, consumed);        // Need to process the keys now since there's no guarantee "count" will        // fit into libtermkey's input buffer.        tk_getkeys(input, false); @@ -262,11 +265,18 @@ static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)          break;        }      } -  } while (rbuffer_size(input->read_buffer)); +  } while (rbuffer_size(input->read_stream.buffer));    // Make sure the next input escape sequence fits into the ring buffer    // without wrap around, otherwise it could be misinterpreted. -  rbuffer_reset(input->read_buffer); +  rbuffer_reset(input->read_stream.buffer); +} + +static void restart_reading(Event event) +{ +  TermInput *input = event.data; +  rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input); +  rstream_start(&input->read_stream, read_cb);  }  static TermInput *term_input_new(void) @@ -283,10 +293,8 @@ static TermInput *term_input_new(void)    int curflags = termkey_get_canonflags(rv->tk);    termkey_set_canonflags(rv->tk, curflags | TERMKEY_CANON_DELBS);    // setup input handle -  rv->read_buffer = rbuffer_new(0xfff); -  rv->read_stream = rstream_new(read_cb, rv->read_buffer, rv); -  rstream_set_file(rv->read_stream, rv->in_fd); -  rstream_start(rv->read_stream); +  rstream_init_fd(&loop, &rv->read_stream, rv->in_fd, 0xfff, rv); +  rstream_start(&rv->read_stream, read_cb);    // initialize a timer handle for handling ESC with libtermkey    time_watcher_init(&loop, &rv->timer_handle, rv);    // Set the pastetoggle option to a special key that will be sent when @@ -301,8 +309,8 @@ static void term_input_destroy(TermInput *input)  {    time_watcher_stop(&input->timer_handle);    time_watcher_close(&input->timer_handle, NULL); -  rstream_stop(input->read_stream); -  rstream_free(input->read_stream); +  rstream_stop(&input->read_stream); +  stream_close(&input->read_stream, NULL);    termkey_destroy(input->tk);    // Run once to remove references to input/timer handles    loop_poll_events(&loop, 0); | 
