aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-04-16 20:30:00 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-04-18 16:11:59 -0300
commitc40428c934534b44f847b62cad54c38169bbdb83 (patch)
tree69cc70b57ddf5762c82110ca21e6be3a2290671f
parent9d16a6b3709a7f2a467ce7866c04c5fa84ecbf65 (diff)
downloadrneovim-c40428c934534b44f847b62cad54c38169bbdb83.tar.gz
rneovim-c40428c934534b44f847b62cad54c38169bbdb83.tar.bz2
rneovim-c40428c934534b44f847b62cad54c38169bbdb83.zip
Deal with backpressure on RStream instances
Each RStream instance will now stop its libuv watcher when the buffer is full, and automatically restart when some data is read with `rstream_read`.
-rw-r--r--src/os/rstream.c21
1 files changed, 20 insertions, 1 deletions
diff --git a/src/os/rstream.c b/src/os/rstream.c
index 1d3716284c..5f4cd5ed94 100644
--- a/src/os/rstream.c
+++ b/src/os/rstream.c
@@ -147,6 +147,11 @@ uint32_t rstream_read(RStream *rstream, char *buf, uint32_t count)
rstream->wpos - rstream->rpos); // ...By the number of unread bytes
rstream->wpos -= rstream->rpos;
rstream->rpos = 0;
+
+ if (rstream->wpos < rstream->buffer_size) {
+ // Restart reading since we have freed some space
+ rstream_start(rstream);
+ }
}
return read_count;
@@ -167,8 +172,9 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
return;
}
- buf->base = rstream->buffer + rstream->wpos;
buf->len = rstream->buffer_size - rstream->wpos;
+ buf->base = rstream->buffer + rstream->wpos;
+
// Avoid `alloc_cb`, `alloc_cb` sequences on windows
rstream->reading = true;
}
@@ -193,10 +199,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rstream->wpos += cnt;
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // The last read filled the buffer, stop reading for now
+ rstream_stop(rstream);
+ }
+
// Invoke the callback passing in the number of bytes available and data
// associated with the stream
rstream->cb(rstream, rstream->data, false);
rstream->reading = false;
+
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -228,5 +241,11 @@ static void fread_idle_cb(uv_idle_t *handle)
rstream->wpos += req.result;
rstream->fpos += req.result;
+
+ if (rstream->wpos == rstream->buffer_size) {
+ // The last read filled the buffer, stop reading for now
+ rstream_stop(rstream);
+ }
+
rstream->cb(rstream, rstream->data, false);
}