diff options
Diffstat (limited to 'src/nvim/os/rstream.c')
-rw-r--r-- | src/nvim/os/rstream.c | 201 |
1 files changed, 14 insertions, 187 deletions
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index a99745f068..af84288f0f 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -14,12 +14,6 @@ #include "nvim/log.h" #include "nvim/misc1.h" -struct rbuffer { - char *data; - size_t capacity, rpos, wpos; - RStream *rstream; -}; - struct rstream { void *data; uv_buf_t uvbuf; @@ -37,135 +31,6 @@ struct rstream { # include "os/rstream.c.generated.h" #endif -/// Creates a new `RBuffer` instance. -RBuffer *rbuffer_new(size_t capacity) -{ - RBuffer *rv = xmalloc(sizeof(RBuffer)); - rv->data = xmalloc(capacity); - rv->capacity = capacity; - rv->rpos = rv->wpos = 0; - rv->rstream = NULL; - return rv; -} - -/// Advances `rbuffer` read pointers to consume data. If the associated -/// RStream had stopped because the buffer was full, this will restart it. -/// -/// This is called automatically by rbuffer_read, but when using -/// `rbuffer_read_ptr` directly, this needs to called after the data was -/// consumed. -void rbuffer_consumed(RBuffer *rbuffer, size_t count) -{ - rbuffer->rpos += count; - if (count && rbuffer->wpos == rbuffer->capacity) { - // `wpos` is at the end of the buffer, so free some space by moving unread - // data... - rbuffer_relocate(rbuffer); - if (rbuffer->rstream) { - // restart the associated RStream - rstream_start(rbuffer->rstream); - } - } -} - -/// Advances `rbuffer` write pointers. If the internal buffer becomes full, -/// this will stop the associated RStream instance. -void rbuffer_produced(RBuffer *rbuffer, size_t count) -{ - rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream); - - rbuffer_relocate(rbuffer); - if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { - // The last read filled the buffer, stop reading for now - // - rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream); - } -} - -/// Reads data from a `RBuffer` instance into a raw buffer. -/// -/// @param rbuffer The `RBuffer` instance -/// @param buffer The buffer which will receive the data -/// @param count Number of bytes that `buffer` can accept -/// @return The number of bytes copied into `buffer` -size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count) -{ - size_t read_count = rbuffer_pending(rbuffer); - - if (count < read_count) { - read_count = count; - } - - if (read_count > 0) { - memcpy(buffer, rbuffer_read_ptr(rbuffer), read_count); - rbuffer_consumed(rbuffer, read_count); - } - - return read_count; -} - -/// Copies data to `rbuffer` read queue. -/// -/// @param rbuffer the `RBuffer` instance -/// @param buffer The buffer containing data to be copied -/// @param count Number of bytes that should be copied -/// @return The number of bytes actually copied -size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count) -{ - size_t write_count = rbuffer_available(rbuffer); - - if (count < write_count) { - write_count = count; - } - - if (write_count > 0) { - memcpy(rbuffer_write_ptr(rbuffer), buffer, write_count); - rbuffer_produced(rbuffer, write_count); - } - - return write_count; -} - -/// Returns a pointer to a raw buffer containing the first byte available for -/// reading. -char *rbuffer_read_ptr(RBuffer *rbuffer) -{ - return rbuffer->data + rbuffer->rpos; -} - -/// Returns a pointer to a raw buffer containing the first byte available for -/// write. -char *rbuffer_write_ptr(RBuffer *rbuffer) -{ - return rbuffer->data + rbuffer->wpos; -} - -/// Returns the number of bytes ready for consumption in `rbuffer` -/// -/// @param rbuffer The `RBuffer` instance -/// @return The number of bytes ready for consumption -size_t rbuffer_pending(RBuffer *rbuffer) -{ - return rbuffer->wpos - rbuffer->rpos; -} - -/// Returns available space in `rbuffer` -/// -/// @param rbuffer The `RBuffer` instance -/// @return The space available in number of bytes -size_t rbuffer_available(RBuffer *rbuffer) -{ - return rbuffer->capacity - rbuffer->wpos; -} - -void rbuffer_free(RBuffer *rbuffer) -{ - xfree(rbuffer->data); - xfree(rbuffer); -} - /// Creates a new RStream instance. A RStream encapsulates all the boilerplate /// necessary for reading from a libuv stream. /// @@ -177,8 +42,10 @@ void rbuffer_free(RBuffer *rbuffer) RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); + buffer->data = rv; + buffer->full_cb = on_rbuffer_full; + buffer->nonfull_cb = on_rbuffer_nonfull; rv->buffer = buffer; - rv->buffer->rstream = rv; rv->fpos = 0; rv->data = data; rv->cb = cb; @@ -190,16 +57,14 @@ RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) return rv; } -/// Returns the read pointer used by the rstream. -char *rstream_read_ptr(RStream *rstream) +static void on_rbuffer_full(RBuffer *buf, void *data) { - return rbuffer_read_ptr(rstream->buffer); + rstream_stop(data); } -/// Returns the number of bytes before the rstream is full. -size_t rstream_available(RStream *rstream) +static void on_rbuffer_nonfull(RBuffer *buf, void *data) { - return rbuffer_available(rstream->buffer); + rstream_start(data); } /// Frees all memory allocated for a RStream instance @@ -297,37 +162,13 @@ void rstream_stop(RStream *rstream) } } -/// Returns the number of bytes ready for consumption in `rstream` -size_t rstream_pending(RStream *rstream) -{ - return rbuffer_pending(rstream->buffer); -} - -/// Reads data from a `RStream` instance into a buffer. -/// -/// @param rstream The `RStream` instance -/// @param buffer The buffer which will receive the data -/// @param count Number of bytes that `buffer` can accept -/// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buffer, size_t count) -{ - return rbuffer_read(rstream->buffer, buffer, count); -} - -RBuffer *rstream_buffer(RStream *rstream) -{ - return rstream->buffer; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *rstream = handle_get_rstream(handle); - - buf->len = rbuffer_available(rstream->buffer); - buf->base = rbuffer_write_ptr(rstream->buffer); + buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len); } // Callback invoked by libuv after it copies the data into the buffer provided @@ -341,7 +182,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt != UV_ENOBUFS // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. - // + // // We don't need to do anything with the RBuffer because the next call // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` // won't be called) @@ -351,18 +192,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - rstream->cb(rstream, rstream->data, true); + rstream->cb(rstream, rstream->buffer, rstream->data, true); } return; } // at this point we're sure that cnt is positive, no error occurred - size_t nread = (size_t) cnt; - + size_t nread = (size_t)cnt; // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - rstream->cb(rstream, rstream->data, false); + rstream->cb(rstream, rstream->buffer, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -371,8 +211,7 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - rstream->uvbuf.len = rbuffer_available(rstream->buffer); - rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer); + rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len); // the offset argument to uv_fs_read is int64_t, could someone really try // to read more than 9 quintillion (9e18) bytes? @@ -397,7 +236,7 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - rstream->cb(rstream, rstream->data, true); + rstream->cb(rstream, rstream->buffer, rstream->data, true); return; } @@ -412,15 +251,3 @@ static void close_cb(uv_handle_t *handle) xfree(handle->data); xfree(handle); } - -static void rbuffer_relocate(RBuffer *rbuffer) -{ - assert(rbuffer->rpos <= rbuffer->wpos); - // Move data ... - memmove( - rbuffer->data, // ...to the beginning of the buffer(rpos 0) - rbuffer->data + rbuffer->rpos, // ...From the first unread position - rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes - rbuffer->wpos -= rbuffer->rpos; - rbuffer->rpos = 0; -} |