aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/rstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/rstream.c')
-rw-r--r--src/nvim/os/rstream.c201
1 files changed, 14 insertions, 187 deletions
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index a99745f068..af84288f0f 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -14,12 +14,6 @@
#include "nvim/log.h"
#include "nvim/misc1.h"
-struct rbuffer {
- char *data;
- size_t capacity, rpos, wpos;
- RStream *rstream;
-};
-
struct rstream {
void *data;
uv_buf_t uvbuf;
@@ -37,135 +31,6 @@ struct rstream {
# include "os/rstream.c.generated.h"
#endif
-/// Creates a new `RBuffer` instance.
-RBuffer *rbuffer_new(size_t capacity)
-{
- RBuffer *rv = xmalloc(sizeof(RBuffer));
- rv->data = xmalloc(capacity);
- rv->capacity = capacity;
- rv->rpos = rv->wpos = 0;
- rv->rstream = NULL;
- return rv;
-}
-
-/// Advances `rbuffer` read pointers to consume data. If the associated
-/// RStream had stopped because the buffer was full, this will restart it.
-///
-/// This is called automatically by rbuffer_read, but when using
-/// `rbuffer_read_ptr` directly, this needs to called after the data was
-/// consumed.
-void rbuffer_consumed(RBuffer *rbuffer, size_t count)
-{
- rbuffer->rpos += count;
- if (count && rbuffer->wpos == rbuffer->capacity) {
- // `wpos` is at the end of the buffer, so free some space by moving unread
- // data...
- rbuffer_relocate(rbuffer);
- if (rbuffer->rstream) {
- // restart the associated RStream
- rstream_start(rbuffer->rstream);
- }
- }
-}
-
-/// Advances `rbuffer` write pointers. If the internal buffer becomes full,
-/// this will stop the associated RStream instance.
-void rbuffer_produced(RBuffer *rbuffer, size_t count)
-{
- rbuffer->wpos += count;
- DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream);
-
- rbuffer_relocate(rbuffer);
- if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
- // The last read filled the buffer, stop reading for now
- //
- rstream_stop(rbuffer->rstream);
- DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream);
- }
-}
-
-/// Reads data from a `RBuffer` instance into a raw buffer.
-///
-/// @param rbuffer The `RBuffer` instance
-/// @param buffer The buffer which will receive the data
-/// @param count Number of bytes that `buffer` can accept
-/// @return The number of bytes copied into `buffer`
-size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count)
-{
- size_t read_count = rbuffer_pending(rbuffer);
-
- if (count < read_count) {
- read_count = count;
- }
-
- if (read_count > 0) {
- memcpy(buffer, rbuffer_read_ptr(rbuffer), read_count);
- rbuffer_consumed(rbuffer, read_count);
- }
-
- return read_count;
-}
-
-/// Copies data to `rbuffer` read queue.
-///
-/// @param rbuffer the `RBuffer` instance
-/// @param buffer The buffer containing data to be copied
-/// @param count Number of bytes that should be copied
-/// @return The number of bytes actually copied
-size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count)
-{
- size_t write_count = rbuffer_available(rbuffer);
-
- if (count < write_count) {
- write_count = count;
- }
-
- if (write_count > 0) {
- memcpy(rbuffer_write_ptr(rbuffer), buffer, write_count);
- rbuffer_produced(rbuffer, write_count);
- }
-
- return write_count;
-}
-
-/// Returns a pointer to a raw buffer containing the first byte available for
-/// reading.
-char *rbuffer_read_ptr(RBuffer *rbuffer)
-{
- return rbuffer->data + rbuffer->rpos;
-}
-
-/// Returns a pointer to a raw buffer containing the first byte available for
-/// write.
-char *rbuffer_write_ptr(RBuffer *rbuffer)
-{
- return rbuffer->data + rbuffer->wpos;
-}
-
-/// Returns the number of bytes ready for consumption in `rbuffer`
-///
-/// @param rbuffer The `RBuffer` instance
-/// @return The number of bytes ready for consumption
-size_t rbuffer_pending(RBuffer *rbuffer)
-{
- return rbuffer->wpos - rbuffer->rpos;
-}
-
-/// Returns available space in `rbuffer`
-///
-/// @param rbuffer The `RBuffer` instance
-/// @return The space available in number of bytes
-size_t rbuffer_available(RBuffer *rbuffer)
-{
- return rbuffer->capacity - rbuffer->wpos;
-}
-
-void rbuffer_free(RBuffer *rbuffer)
-{
- xfree(rbuffer->data);
- xfree(rbuffer);
-}
-
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
/// necessary for reading from a libuv stream.
///
@@ -177,8 +42,10 @@ void rbuffer_free(RBuffer *rbuffer)
RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
+ buffer->data = rv;
+ buffer->full_cb = on_rbuffer_full;
+ buffer->nonfull_cb = on_rbuffer_nonfull;
rv->buffer = buffer;
- rv->buffer->rstream = rv;
rv->fpos = 0;
rv->data = data;
rv->cb = cb;
@@ -190,16 +57,14 @@ RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
return rv;
}
-/// Returns the read pointer used by the rstream.
-char *rstream_read_ptr(RStream *rstream)
+static void on_rbuffer_full(RBuffer *buf, void *data)
{
- return rbuffer_read_ptr(rstream->buffer);
+ rstream_stop(data);
}
-/// Returns the number of bytes before the rstream is full.
-size_t rstream_available(RStream *rstream)
+static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
- return rbuffer_available(rstream->buffer);
+ rstream_start(data);
}
/// Frees all memory allocated for a RStream instance
@@ -297,37 +162,13 @@ void rstream_stop(RStream *rstream)
}
}
-/// Returns the number of bytes ready for consumption in `rstream`
-size_t rstream_pending(RStream *rstream)
-{
- return rbuffer_pending(rstream->buffer);
-}
-
-/// Reads data from a `RStream` instance into a buffer.
-///
-/// @param rstream The `RStream` instance
-/// @param buffer The buffer which will receive the data
-/// @param count Number of bytes that `buffer` can accept
-/// @return The number of bytes copied into `buffer`
-size_t rstream_read(RStream *rstream, char *buffer, size_t count)
-{
- return rbuffer_read(rstream->buffer, buffer, count);
-}
-
-RBuffer *rstream_buffer(RStream *rstream)
-{
- return rstream->buffer;
-}
-
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
RStream *rstream = handle_get_rstream(handle);
-
- buf->len = rbuffer_available(rstream->buffer);
- buf->base = rbuffer_write_ptr(rstream->buffer);
+ buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len);
}
// Callback invoked by libuv after it copies the data into the buffer provided
@@ -341,7 +182,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt != UV_ENOBUFS
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
- //
+ //
// We don't need to do anything with the RBuffer because the next call
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
// won't be called)
@@ -351,18 +192,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- rstream->cb(rstream, rstream->data, true);
+ rstream->cb(rstream, rstream->buffer, rstream->data, true);
}
return;
}
// at this point we're sure that cnt is positive, no error occurred
- size_t nread = (size_t) cnt;
-
+ size_t nread = (size_t)cnt;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
- rstream->cb(rstream, rstream->data, false);
+ rstream->cb(rstream, rstream->buffer, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -371,8 +211,7 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_t req;
RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
- rstream->uvbuf.len = rbuffer_available(rstream->buffer);
- rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer);
+ rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len);
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
@@ -397,7 +236,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- rstream->cb(rstream, rstream->data, true);
+ rstream->cb(rstream, rstream->buffer, rstream->data, true);
return;
}
@@ -412,15 +251,3 @@ static void close_cb(uv_handle_t *handle)
xfree(handle->data);
xfree(handle);
}
-
-static void rbuffer_relocate(RBuffer *rbuffer)
-{
- assert(rbuffer->rpos <= rbuffer->wpos);
- // Move data ...
- memmove(
- rbuffer->data, // ...to the beginning of the buffer(rpos 0)
- rbuffer->data + rbuffer->rpos, // ...From the first unread position
- rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes
- rbuffer->wpos -= rbuffer->rpos;
- rbuffer->rpos = 0;
-}