diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/eval.c | 2 | ||||
-rw-r--r-- | src/nvim/os/channel.c | 8 | ||||
-rw-r--r-- | src/nvim/os/input.c | 9 | ||||
-rw-r--r-- | src/nvim/os/job.c | 10 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 237 | ||||
-rw-r--r-- | src/nvim/os/rstream_defs.h | 1 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 2 |
7 files changed, 185 insertions, 84 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 8b02a4187a..e3bd37a03f 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -19531,7 +19531,7 @@ static void on_job_exit(Job *job, void *data) static void on_job_data(RStream *rstream, void *data, bool eof, char *type) { Job *job = data; - uint32_t read_count = rstream_available(rstream); + uint32_t read_count = rstream_pending(rstream); char *str = xmalloc(read_count + 1); rstream_read(rstream, str, read_count); diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 5598b485ba..959fbc6e73 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -127,7 +127,7 @@ void channel_from_stream(uv_stream_t *stream) channel->is_job = false; // read stream channel->data.streams.read = rstream_new(parse_msgpack, - CHANNEL_BUFFER_SIZE, + rbuffer_new(CHANNEL_BUFFER_SIZE), channel, NULL); rstream_set_stream(channel->data.streams.read, stream); @@ -290,7 +290,7 @@ static void channel_from_stdio(void) channel->is_job = false; // read stream channel->data.streams.read = rstream_new(parse_msgpack, - CHANNEL_BUFFER_SIZE, + rbuffer_new(CHANNEL_BUFFER_SIZE), channel, NULL); rstream_set_file(channel->data.streams.read, 0); @@ -313,7 +313,7 @@ static void job_err(RStream *rstream, void *data, bool eof) char buf[256]; Channel *channel = job_data(data); - while ((count = rstream_available(rstream))) { + while ((count = rstream_pending(rstream))) { size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); buf[read] = NUL; ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); @@ -336,7 +336,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - uint32_t count = rstream_available(rstream); + uint32_t count = rstream_pending(rstream); DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", count, rstream); diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 53024d1389..511dfd7b07 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -39,7 +39,10 @@ void input_init(void) return; } - read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL); + read_stream = rstream_new(read_cb, + rbuffer_new(READ_BUFFER_SIZE), + NULL, + NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -167,7 +170,7 @@ static InbufPollResult inbuf_poll(int32_t ms) } if (input_poll(ms)) { - return eof && rstream_available(read_stream) == 0 ? + return eof && rstream_pending(read_stream) == 0 ? kInputEof : kInputAvail; } @@ -227,6 +230,6 @@ static int push_event_key(uint8_t *buf, int maxlen) // Check if there's pending input bool input_ready(void) { - return rstream_available(read_stream) > 0 || eof; + return rstream_pending(read_stream) > 0 || eof; } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 9fb2a49e50..2ca1023290 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -213,8 +213,14 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); + job->out = rstream_new(read_cb, + rbuffer_new(JOB_BUFFER_SIZE), + job, + job_event_source(job)); + job->err = rstream_new(read_cb, + rbuffer_new(JOB_BUFFER_SIZE), + job, + job_event_source(job)); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index b3a5196351..3e5a99acd1 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -16,16 +16,22 @@ #include "nvim/log.h" #include "nvim/misc1.h" +struct rbuffer { + char *data; + size_t capacity, rpos, wpos; + RStream *rstream; +}; + struct rstream { - uv_buf_t uvbuf; void *data; - char *buffer; + uv_buf_t uvbuf; + size_t fpos; + RBuffer *buffer; uv_stream_t *stream; uv_idle_t *fread_idle; uv_handle_type file_type; uv_file fd; rstream_cb cb; - size_t buffer_size, rpos, wpos, fpos; bool free_handle; EventSource source_override; }; @@ -34,27 +40,151 @@ struct rstream { # include "os/rstream.c.generated.h" #endif +/// Creates a new `RBuffer` instance. +RBuffer *rbuffer_new(size_t capacity) +{ + RBuffer *rv = xmalloc(sizeof(RBuffer)); + rv->data = xmalloc(capacity); + rv->capacity = capacity; + rv->rpos = rv->wpos = 0; + return rv; +} + +/// Advances `rbuffer` read pointers to consume data. If the associated +/// RStream had stopped because the buffer was full, this will restart it. +/// +/// This is called automatically by rbuffer_read, but when using `rbuffer_data` +/// directly, this needs to called after the data was consumed. +void rbuffer_consumed(RBuffer *rbuffer, size_t count) +{ + rbuffer->rpos += count; + if (count && rbuffer->wpos == rbuffer->capacity) { + // `wpos` is at the end of the buffer, so free some space by moving unread + // data... + rbuffer_relocate(rbuffer); + if (rbuffer->rstream) { + // restart the associated RStream + rstream_start(rbuffer->rstream); + } + } +} + +/// Advances `rbuffer` write pointers. If the internal buffer becomes full, +/// this will stop the associated RStream instance. +void rbuffer_produced(RBuffer *rbuffer, size_t count) +{ + rbuffer->wpos += count; + DLOG("Received %u bytes from RStream(address: %p, source: %p)", + (size_t)cnt, + rbuffer->rstream, + rstream_event_source(rbuffer->rstream)); + + rbuffer_relocate(rbuffer); + if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { + // The last read filled the buffer, stop reading for now + rstream_stop(rbuffer->rstream); + DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", + rstream, + rstream_event_source(rstream)); + } +} + +/// Reads data from a `RBuffer` instance into a raw buffer. +/// +/// @param rbuffer The `RBuffer` instance +/// @param buffer The buffer which will receive the data +/// @param count Number of bytes that `buffer` can accept +/// @return The number of bytes copied into `buffer` +size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count) +{ + size_t read_count = rbuffer_pending(rbuffer); + + if (count < read_count) { + read_count = count; + } + + if (read_count > 0) { + memcpy(buffer, rbuffer_data(rbuffer), read_count); + rbuffer_consumed(rbuffer, read_count); + } + + return read_count; +} + +/// Copies data to `rbuffer` read queue. +/// +/// @param rbuffer the `RBuffer` instance +/// @param buffer The buffer containing data to be copied +/// @param count Number of bytes that should be copied +/// @return The number of bytes actually copied +size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count) +{ + size_t write_count = rbuffer_available(rbuffer); + + if (count < write_count) { + write_count = count; + } + + if (write_count > 0) { + memcpy(rbuffer_data(rbuffer), buffer, write_count); + rbuffer_produced(rbuffer, write_count); + } + + return write_count; +} + +/// Returns a pointer to a raw buffer containing the first byte available for +/// reading. +char *rbuffer_data(RBuffer *rbuffer) +{ + return rbuffer->data + rbuffer->rpos; +} + +/// Returns the number of bytes ready for consumption in `rbuffer` +/// +/// @param rbuffer The `RBuffer` instance +/// @return The number of bytes ready for consumption +size_t rbuffer_pending(RBuffer *rbuffer) +{ + return rbuffer->wpos - rbuffer->rpos; +} + +/// Returns available space in `rbuffer` +/// +/// @param rbuffer The `RBuffer` instance +/// @return The space available in number of bytes +size_t rbuffer_available(RBuffer *rbuffer) +{ + return rbuffer->capacity - rbuffer->wpos; +} + +void rbuffer_free(RBuffer *rbuffer) +{ + free(rbuffer->data); + free(rbuffer); +} + /// Creates a new RStream instance. A RStream encapsulates all the boilerplate /// necessary for reading from a libuv stream. /// /// @param cb A function that will be called whenever some data is available /// for reading with `rstream_read` -/// @param buffer_size Size in bytes of the internal buffer. +/// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance /// @param source_override Replacement for the default source used in events /// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, - size_t buffer_size, + RBuffer *buffer, void *data, EventSource source_override) { RStream *rv = xmalloc(sizeof(RStream)); - rv->buffer = xmalloc(buffer_size); - rv->buffer_size = buffer_size; + rv->buffer = buffer; + rv->buffer->rstream = rv; + rv->fpos = 0; rv->data = data; rv->cb = cb; - rv->rpos = rv->wpos = rv->fpos = 0; rv->stream = NULL; rv->fread_idle = NULL; rv->free_handle = false; @@ -77,7 +207,7 @@ void rstream_free(RStream *rstream) } } - free(rstream->buffer); + rbuffer_free(rstream->buffer); free(rstream); } @@ -166,51 +296,21 @@ void rstream_stop(RStream *rstream) } } +/// Returns the number of bytes ready for consumption in `rstream` +size_t rstream_pending(RStream *rstream) +{ + return rbuffer_pending(rstream->buffer); +} + /// Reads data from a `RStream` instance into a buffer. /// /// @param rstream The `RStream` instance /// @param buffer The buffer which will receive the data /// @param count Number of bytes that `buffer` can accept /// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buf, size_t count) -{ - size_t read_count = rstream->wpos - rstream->rpos; - - if (count < read_count) { - read_count = count; - } - - if (read_count > 0) { - memcpy(buf, rstream->buffer + rstream->rpos, read_count); - rstream->rpos += read_count; - } - - if (rstream->wpos == rstream->buffer_size) { - // `wpos` is at the end of the buffer, so free some space by moving unread - // data... - memmove( - rstream->buffer, // ...To the beginning of the buffer(rpos 0) - rstream->buffer + rstream->rpos, // ...From the first unread position - rstream->wpos - rstream->rpos); // ...By the number of unread bytes - rstream->wpos -= rstream->rpos; - rstream->rpos = 0; - - if (rstream->wpos < rstream->buffer_size) { - // Restart reading since we have freed some space - rstream_start(rstream); - } - } - - return read_count; -} - -/// Returns the number of bytes available for reading from `rstream` -/// -/// @param rstream The `RStream` instance -/// @return The number of bytes available -size_t rstream_available(RStream *rstream) +size_t rstream_read(RStream *rstream, char *buffer, size_t count) { - return rstream->wpos - rstream->rpos; + return rbuffer_read(rstream->buffer, buffer, count); } /// Runs the read callback associated with the rstream @@ -235,8 +335,8 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *rstream = handle_get_rstream(handle); - buf->len = rstream->buffer_size - rstream->wpos; - buf->base = rstream->buffer + rstream->wpos; + buf->len = rbuffer_available(rstream->buffer); + buf->base = rbuffer_data(rstream->buffer); } // Callback invoked by libuv after it copies the data into the buffer provided @@ -264,20 +364,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. - rstream->wpos += nread; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rstream, - rstream_event_source(rstream)); - - if (rstream->wpos == rstream->buffer_size) { - // The last read filled the buffer, stop reading for now - rstream_stop(rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); - } - + rbuffer_produced(rstream->buffer, nread); emit_read_event(rstream, false); } @@ -287,8 +374,8 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - rstream->uvbuf.base = rstream->buffer + rstream->wpos; - rstream->uvbuf.len = rstream->buffer_size - rstream->wpos; + rstream->uvbuf.len = rstream->buffer->capacity - rstream->buffer->wpos; + rstream->uvbuf.base = rstream->buffer->data + rstream->buffer->wpos; // the offset argument to uv_fs_read is int64_t, could someone really try // to read more than 9 quintillion (9e18) bytes? @@ -319,15 +406,8 @@ static void fread_idle_cb(uv_idle_t *handle) // no errors (req.result (ssize_t) is positive), it's safe to cast. size_t nread = (size_t) req.result; - - rstream->wpos += nread; + rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - - if (rstream->wpos == rstream->buffer_size) { - // The last read filled the buffer, stop reading for now - rstream_stop(rstream); - } - emit_read_event(rstream, false); } @@ -349,3 +429,14 @@ static void emit_read_event(RStream *rstream, bool eof) }; event_push(event); } + +static void rbuffer_relocate(RBuffer *rbuffer) +{ + // Move data ... + memmove( + rbuffer->data, // ...to the beginning of the buffer(rpos 0) + rbuffer->data + rbuffer->rpos, // ...From the first unread position + rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes + rbuffer->wpos -= rbuffer->rpos; + rbuffer->rpos = 0; +} diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h index 5c7183c4c3..1d71160963 100644 --- a/src/nvim/os/rstream_defs.h +++ b/src/nvim/os/rstream_defs.h @@ -3,6 +3,7 @@ #include <stdbool.h> +typedef struct rbuffer RBuffer; typedef struct rstream RStream; /// Type of function called when the RStream receives data diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 912dc95aca..453cc6d605 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -341,7 +341,7 @@ static void system_data_cb(RStream *rstream, void *data, bool eof) Job *job = data; dyn_buffer_t *buf = job_data(job); - size_t nread = rstream_available(rstream); + size_t nread = rstream_pending(rstream); dyn_buf_ensure(buf, buf->len + nread + 1); rstream_read(rstream, buf->data + buf->len, nread); |