aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event
diff options
context:
space:
mode:
authorbfredl <bjorn.linse@gmail.com>2024-06-09 11:33:00 +0200
committerGitHub <noreply@github.com>2024-06-09 11:33:00 +0200
commit2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c (patch)
treedb5c33d8cc633fae5d74cd361e0a00c12f6541cc /src/nvim/event
parent9afa1fd35510c5fe485f4a1dfdabf94e5f051a1c (diff)
parent78d21593a35cf89692224f1000a04d3c9fff8add (diff)
downloadrneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.tar.gz
rneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.tar.bz2
rneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.zip
Merge pull request #29141 from bfredl/rstream2
refactor(io): make rstream use a linear buffer
Diffstat (limited to 'src/nvim/event')
-rw-r--r--src/nvim/event/defs.h16
-rw-r--r--src/nvim/event/process.c3
-rw-r--r--src/nvim/event/rstream.c163
-rw-r--r--src/nvim/event/stream.c3
4 files changed, 107 insertions, 78 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
index 8563006159..41690ead88 100644
--- a/src/nvim/event/defs.h
+++ b/src/nvim/event/defs.h
@@ -6,7 +6,6 @@
#include <uv.h>
#include "nvim/eval/typval_defs.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
enum { EVENT_HANDLER_MAX_ARGC = 10, };
@@ -59,11 +58,13 @@ typedef struct rstream RStream;
/// Type of function called when the RStream buffer is filled with data
///
/// @param stream The Stream instance
-/// @param buf The associated RBuffer instance
+/// @param read_data data that was read
/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
-typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof);
+/// @return number of bytes which were consumed
+typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data,
+ bool eof);
/// Type of function called when the Stream has information about a write
/// request.
@@ -102,11 +103,16 @@ struct stream {
struct rstream {
Stream s;
bool did_eof;
- RBuffer *buffer;
+ bool want_read;
+ bool pending_read;
+ bool paused_full;
+ char *buffer; // ARENA_BLOCK_SIZE
+ char *read_pos;
+ char *write_pos;
uv_buf_t uvbuf;
stream_read_cb read_cb;
size_t num_bytes;
- size_t fpos;
+ int64_t fpos;
};
#define ADDRESS_MAX_SIZE 256
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 710376cd62..70fc31ba21 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -18,7 +18,6 @@
#include "nvim/os/pty_process.h"
#include "nvim/os/shell.h"
#include "nvim/os/time.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/ui_client.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -355,7 +354,7 @@ static void flush_stream(Process *proc, RStream *stream)
int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe,
&system_buffer_size);
if (err) {
- system_buffer_size = (int)rbuffer_capacity(stream->buffer);
+ system_buffer_size = ARENA_BLOCK_SIZE;
}
size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 6c7fa20bd8..71290d0c0d 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -11,38 +11,44 @@
#include "nvim/macros_defs.h"
#include "nvim/main.h"
#include "nvim/os/os_defs.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/rstream.c.generated.h"
#endif
-void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize)
+void rstream_init_fd(Loop *loop, RStream *stream, int fd)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream_init(loop, &stream->s, fd, NULL);
- rstream_init(stream, bufsize);
+ rstream_init(stream);
}
-void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize)
+void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream_init(NULL, &stream->s, -1, uvstream);
- rstream_init(stream, bufsize);
+ rstream_init(stream);
}
-void rstream_init(RStream *stream, size_t bufsize)
+void rstream_init(RStream *stream)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->fpos = 0;
stream->read_cb = NULL;
stream->num_bytes = 0;
- stream->buffer = rbuffer_new(bufsize);
- stream->buffer->data = stream;
- stream->buffer->full_cb = on_rbuffer_full;
- stream->buffer->nonfull_cb = on_rbuffer_nonfull;
+ stream->buffer = alloc_block();
+ stream->read_pos = stream->write_pos = stream->buffer;
+}
+
+void rstream_start_inner(RStream *stream)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ if (stream->s.uvstream) {
+ uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
+ } else {
+ uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
+ }
}
/// Starts watching for events from a `Stream` instance.
@@ -53,17 +59,16 @@ void rstream_start(RStream *stream, stream_read_cb cb, void *data)
{
stream->read_cb = cb;
stream->s.cb_data = data;
- if (stream->s.uvstream) {
- uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
- } else {
- uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
+ stream->want_read = true;
+ if (!stream->paused_full) {
+ rstream_start_inner(stream);
}
}
/// Stops watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_stop(RStream *stream)
+void rstream_stop_inner(RStream *stream)
FUNC_ATTR_NONNULL_ALL
{
if (stream->s.uvstream) {
@@ -73,16 +78,14 @@ void rstream_stop(RStream *stream)
}
}
-static void on_rbuffer_full(RBuffer *buf, void *data)
-{
- rstream_stop(data);
-}
-
-static void on_rbuffer_nonfull(RBuffer *buf, void *data)
+/// Stops watching for events from a `Stream` instance.
+///
+/// @param stream The `Stream` instance
+void rstream_stop(RStream *stream)
+ FUNC_ATTR_NONNULL_ALL
{
- RStream *stream = data;
- assert(stream->read_cb);
- rstream_start(stream, stream->read_cb, stream->s.cb_data);
+ rstream_stop_inner(stream);
+ stream->want_read = false;
}
// Callbacks used by libuv
@@ -91,10 +94,9 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
RStream *stream = handle->data;
- // `uv_buf_t.len` happens to have different size on Windows.
- size_t write_count;
- buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
- buf->len = UV_BUF_LEN(write_count);
+ buf->base = stream->write_pos;
+ // `uv_buf_t.len` happens to have different size on Windows (as a treat)
+ buf->len = UV_BUF_LEN(rstream_space(stream));
}
/// Callback invoked by libuv after it copies the data into the buffer provided
@@ -108,21 +110,21 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
//
- // We don't need to do anything with the RBuffer because the next call
+ // We don't need to do anything with the buffer because the next call
// to `alloc_cb` will return the same unused pointer (`rbuffer_produced`
// won't be called)
if (cnt == UV_ENOBUFS || cnt == 0) {
return;
} else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
// The TTY driver might signal EOF without closing the stream
- invoke_read_cb(stream, 0, true);
+ invoke_read_cb(stream, true);
} else {
DLOG("closing Stream (%p): %s (%s)", (void *)stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
- invoke_read_cb(stream, 0, true);
+ invoke_read_cb(stream, true);
}
return;
}
@@ -130,10 +132,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
stream->num_bytes += nread;
- // Data was already written, so all we need is to update 'wpos' to reflect
- // the space actually used in the buffer.
- rbuffer_produced(stream->buffer, nread);
- invoke_read_cb(stream, nread, false);
+ stream->write_pos += cnt;
+ invoke_read_cb(stream, false);
+}
+
+static size_t rstream_space(RStream *stream)
+{
+ return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos);
}
/// Called by the by the 'idle' handle to emulate a reading event
@@ -146,52 +151,37 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_t req;
RStream *stream = handle->data;
+ stream->uvbuf.base = stream->write_pos;
// `uv_buf_t.len` happens to have different size on Windows.
- size_t write_count;
- stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count);
- stream->uvbuf.len = UV_BUF_LEN(write_count);
-
- // the offset argument to uv_fs_read is int64_t, could someone really try
- // to read more than 9 quintillion (9e18) bytes?
- // upcast is meant to avoid tautological condition warning on 32 bits
- uintmax_t fpos_intmax = stream->fpos;
- if (fpos_intmax > INT64_MAX) {
- ELOG("stream offset overflow");
- preserve_exit("stream offset overflow");
- }
+ stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream));
// Synchronous read
- uv_fs_read(handle->loop,
- &req,
- stream->s.fd,
- &stream->uvbuf,
- 1,
- (int64_t)stream->fpos,
- NULL);
+ uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL);
uv_fs_req_cleanup(&req);
if (req.result <= 0) {
uv_idle_stop(&stream->s.uv.idle);
- invoke_read_cb(stream, 0, true);
+ invoke_read_cb(stream, true);
return;
}
- // no errors (req.result (ssize_t) is positive), it's safe to cast.
- size_t nread = (size_t)req.result;
- rbuffer_produced(stream->buffer, nread);
- stream->fpos += nread;
- invoke_read_cb(stream, nread, false);
+ // no errors (req.result (ssize_t) is positive), it's safe to use.
+ stream->write_pos += req.result;
+ stream->fpos += req.result;
+ invoke_read_cb(stream, false);
}
static void read_event(void **argv)
{
RStream *stream = argv[0];
+ stream->pending_read = false;
if (stream->read_cb) {
- size_t count = (uintptr_t)argv[1];
- bool eof = (uintptr_t)argv[2];
- stream->did_eof = eof;
- stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof);
+ size_t available = rstream_available(stream);
+ size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data,
+ stream->did_eof);
+ assert(consumed <= available);
+ rstream_consume(stream, consumed);
}
stream->s.pending_reqs--;
if (stream->s.closed && !stream->s.pending_reqs) {
@@ -199,13 +189,48 @@ static void read_event(void **argv)
}
}
-static void invoke_read_cb(RStream *stream, size_t count, bool eof)
+size_t rstream_available(RStream *stream)
{
+ return (size_t)(stream->write_pos - stream->read_pos);
+}
+
+void rstream_consume(RStream *stream, size_t consumed)
+{
+ stream->read_pos += consumed;
+ size_t remaining = (size_t)(stream->write_pos - stream->read_pos);
+ if (remaining > 0 && stream->read_pos > stream->buffer) {
+ memmove(stream->buffer, stream->read_pos, remaining);
+ stream->read_pos = stream->buffer;
+ stream->write_pos = stream->buffer + remaining;
+ } else if (remaining == 0) {
+ stream->read_pos = stream->write_pos = stream->buffer;
+ }
+
+ if (stream->want_read && stream->paused_full && rstream_space(stream)) {
+ assert(stream->read_cb);
+ stream->paused_full = false;
+ rstream_start_inner(stream);
+ }
+}
+
+static void invoke_read_cb(RStream *stream, bool eof)
+{
+ stream->did_eof |= eof;
+
+ if (!rstream_space(stream)) {
+ rstream_stop_inner(stream);
+ stream->paused_full = true;
+ }
+
+ // we cannot use pending_reqs as a socket can have both pending reads and writes
+ if (stream->pending_read) {
+ return;
+ }
+
// Don't let the stream be closed before the event is processed.
stream->s.pending_reqs++;
-
- CREATE_EVENT(stream->s.events, read_event,
- stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
+ stream->pending_read = true;
+ CREATE_EVENT(stream->s.events, read_event, stream);
}
void rstream_may_close(RStream *stream)
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 3d26dd868f..bc1b503f4c 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -8,7 +8,6 @@
#include "nvim/event/loop.h"
#include "nvim/event/stream.h"
#include "nvim/log.h"
-#include "nvim/rbuffer.h"
#include "nvim/types_defs.h"
#ifdef MSWIN
# include "nvim/os/os_win_console.h"
@@ -149,7 +148,7 @@ static void rstream_close_cb(uv_handle_t *handle)
{
RStream *stream = handle->data;
if (stream->buffer) {
- rbuffer_free(stream->buffer);
+ free_block(stream->buffer);
}
close_cb(handle);
}