diff options
Diffstat (limited to 'src/nvim/os/rstream.c')
-rw-r--r-- | src/nvim/os/rstream.c | 237 |
1 files changed, 164 insertions, 73 deletions
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index b3a5196351..3e5a99acd1 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -16,16 +16,22 @@ #include "nvim/log.h" #include "nvim/misc1.h" +struct rbuffer { + char *data; + size_t capacity, rpos, wpos; + RStream *rstream; +}; + struct rstream { - uv_buf_t uvbuf; void *data; - char *buffer; + uv_buf_t uvbuf; + size_t fpos; + RBuffer *buffer; uv_stream_t *stream; uv_idle_t *fread_idle; uv_handle_type file_type; uv_file fd; rstream_cb cb; - size_t buffer_size, rpos, wpos, fpos; bool free_handle; EventSource source_override; }; @@ -34,27 +40,151 @@ struct rstream { # include "os/rstream.c.generated.h" #endif +/// Creates a new `RBuffer` instance. +RBuffer *rbuffer_new(size_t capacity) +{ + RBuffer *rv = xmalloc(sizeof(RBuffer)); + rv->data = xmalloc(capacity); + rv->capacity = capacity; + rv->rpos = rv->wpos = 0; + return rv; +} + +/// Advances `rbuffer` read pointers to consume data. If the associated +/// RStream had stopped because the buffer was full, this will restart it. +/// +/// This is called automatically by rbuffer_read, but when using `rbuffer_data` +/// directly, this needs to called after the data was consumed. +void rbuffer_consumed(RBuffer *rbuffer, size_t count) +{ + rbuffer->rpos += count; + if (count && rbuffer->wpos == rbuffer->capacity) { + // `wpos` is at the end of the buffer, so free some space by moving unread + // data... + rbuffer_relocate(rbuffer); + if (rbuffer->rstream) { + // restart the associated RStream + rstream_start(rbuffer->rstream); + } + } +} + +/// Advances `rbuffer` write pointers. If the internal buffer becomes full, +/// this will stop the associated RStream instance. +void rbuffer_produced(RBuffer *rbuffer, size_t count) +{ + rbuffer->wpos += count; + DLOG("Received %u bytes from RStream(address: %p, source: %p)", + (size_t)cnt, + rbuffer->rstream, + rstream_event_source(rbuffer->rstream)); + + rbuffer_relocate(rbuffer); + if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { + // The last read filled the buffer, stop reading for now + rstream_stop(rbuffer->rstream); + DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", + rstream, + rstream_event_source(rstream)); + } +} + +/// Reads data from a `RBuffer` instance into a raw buffer. +/// +/// @param rbuffer The `RBuffer` instance +/// @param buffer The buffer which will receive the data +/// @param count Number of bytes that `buffer` can accept +/// @return The number of bytes copied into `buffer` +size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count) +{ + size_t read_count = rbuffer_pending(rbuffer); + + if (count < read_count) { + read_count = count; + } + + if (read_count > 0) { + memcpy(buffer, rbuffer_data(rbuffer), read_count); + rbuffer_consumed(rbuffer, read_count); + } + + return read_count; +} + +/// Copies data to `rbuffer` read queue. +/// +/// @param rbuffer the `RBuffer` instance +/// @param buffer The buffer containing data to be copied +/// @param count Number of bytes that should be copied +/// @return The number of bytes actually copied +size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count) +{ + size_t write_count = rbuffer_available(rbuffer); + + if (count < write_count) { + write_count = count; + } + + if (write_count > 0) { + memcpy(rbuffer_data(rbuffer), buffer, write_count); + rbuffer_produced(rbuffer, write_count); + } + + return write_count; +} + +/// Returns a pointer to a raw buffer containing the first byte available for +/// reading. +char *rbuffer_data(RBuffer *rbuffer) +{ + return rbuffer->data + rbuffer->rpos; +} + +/// Returns the number of bytes ready for consumption in `rbuffer` +/// +/// @param rbuffer The `RBuffer` instance +/// @return The number of bytes ready for consumption +size_t rbuffer_pending(RBuffer *rbuffer) +{ + return rbuffer->wpos - rbuffer->rpos; +} + +/// Returns available space in `rbuffer` +/// +/// @param rbuffer The `RBuffer` instance +/// @return The space available in number of bytes +size_t rbuffer_available(RBuffer *rbuffer) +{ + return rbuffer->capacity - rbuffer->wpos; +} + +void rbuffer_free(RBuffer *rbuffer) +{ + free(rbuffer->data); + free(rbuffer); +} + /// Creates a new RStream instance. A RStream encapsulates all the boilerplate /// necessary for reading from a libuv stream. /// /// @param cb A function that will be called whenever some data is available /// for reading with `rstream_read` -/// @param buffer_size Size in bytes of the internal buffer. +/// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance /// @param source_override Replacement for the default source used in events /// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, - size_t buffer_size, + RBuffer *buffer, void *data, EventSource source_override) { RStream *rv = xmalloc(sizeof(RStream)); - rv->buffer = xmalloc(buffer_size); - rv->buffer_size = buffer_size; + rv->buffer = buffer; + rv->buffer->rstream = rv; + rv->fpos = 0; rv->data = data; rv->cb = cb; - rv->rpos = rv->wpos = rv->fpos = 0; rv->stream = NULL; rv->fread_idle = NULL; rv->free_handle = false; @@ -77,7 +207,7 @@ void rstream_free(RStream *rstream) } } - free(rstream->buffer); + rbuffer_free(rstream->buffer); free(rstream); } @@ -166,51 +296,21 @@ void rstream_stop(RStream *rstream) } } +/// Returns the number of bytes ready for consumption in `rstream` +size_t rstream_pending(RStream *rstream) +{ + return rbuffer_pending(rstream->buffer); +} + /// Reads data from a `RStream` instance into a buffer. /// /// @param rstream The `RStream` instance /// @param buffer The buffer which will receive the data /// @param count Number of bytes that `buffer` can accept /// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buf, size_t count) -{ - size_t read_count = rstream->wpos - rstream->rpos; - - if (count < read_count) { - read_count = count; - } - - if (read_count > 0) { - memcpy(buf, rstream->buffer + rstream->rpos, read_count); - rstream->rpos += read_count; - } - - if (rstream->wpos == rstream->buffer_size) { - // `wpos` is at the end of the buffer, so free some space by moving unread - // data... - memmove( - rstream->buffer, // ...To the beginning of the buffer(rpos 0) - rstream->buffer + rstream->rpos, // ...From the first unread position - rstream->wpos - rstream->rpos); // ...By the number of unread bytes - rstream->wpos -= rstream->rpos; - rstream->rpos = 0; - - if (rstream->wpos < rstream->buffer_size) { - // Restart reading since we have freed some space - rstream_start(rstream); - } - } - - return read_count; -} - -/// Returns the number of bytes available for reading from `rstream` -/// -/// @param rstream The `RStream` instance -/// @return The number of bytes available -size_t rstream_available(RStream *rstream) +size_t rstream_read(RStream *rstream, char *buffer, size_t count) { - return rstream->wpos - rstream->rpos; + return rbuffer_read(rstream->buffer, buffer, count); } /// Runs the read callback associated with the rstream @@ -235,8 +335,8 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *rstream = handle_get_rstream(handle); - buf->len = rstream->buffer_size - rstream->wpos; - buf->base = rstream->buffer + rstream->wpos; + buf->len = rbuffer_available(rstream->buffer); + buf->base = rbuffer_data(rstream->buffer); } // Callback invoked by libuv after it copies the data into the buffer provided @@ -264,20 +364,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. - rstream->wpos += nread; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rstream, - rstream_event_source(rstream)); - - if (rstream->wpos == rstream->buffer_size) { - // The last read filled the buffer, stop reading for now - rstream_stop(rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); - } - + rbuffer_produced(rstream->buffer, nread); emit_read_event(rstream, false); } @@ -287,8 +374,8 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - rstream->uvbuf.base = rstream->buffer + rstream->wpos; - rstream->uvbuf.len = rstream->buffer_size - rstream->wpos; + rstream->uvbuf.len = rstream->buffer->capacity - rstream->buffer->wpos; + rstream->uvbuf.base = rstream->buffer->data + rstream->buffer->wpos; // the offset argument to uv_fs_read is int64_t, could someone really try // to read more than 9 quintillion (9e18) bytes? @@ -319,15 +406,8 @@ static void fread_idle_cb(uv_idle_t *handle) // no errors (req.result (ssize_t) is positive), it's safe to cast. size_t nread = (size_t) req.result; - - rstream->wpos += nread; + rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - - if (rstream->wpos == rstream->buffer_size) { - // The last read filled the buffer, stop reading for now - rstream_stop(rstream); - } - emit_read_event(rstream, false); } @@ -349,3 +429,14 @@ static void emit_read_event(RStream *rstream, bool eof) }; event_push(event); } + +static void rbuffer_relocate(RBuffer *rbuffer) +{ + // Move data ... + memmove( + rbuffer->data, // ...to the beginning of the buffer(rpos 0) + rbuffer->data + rbuffer->rpos, // ...From the first unread position + rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes + rbuffer->wpos -= rbuffer->rpos; + rbuffer->rpos = 0; +} |