diff options
author | bfredl <bjorn.linse@gmail.com> | 2024-06-09 11:33:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-09 11:33:00 +0200 |
commit | 2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c (patch) | |
tree | db5c33d8cc633fae5d74cd361e0a00c12f6541cc /src/nvim/event | |
parent | 9afa1fd35510c5fe485f4a1dfdabf94e5f051a1c (diff) | |
parent | 78d21593a35cf89692224f1000a04d3c9fff8add (diff) | |
download | rneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.tar.gz rneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.tar.bz2 rneovim-2b2dd6accf249f03f78f72c75ce7e516f1c1ac3c.zip |
Merge pull request #29141 from bfredl/rstream2
refactor(io): make rstream use a linear buffer
Diffstat (limited to 'src/nvim/event')
-rw-r--r-- | src/nvim/event/defs.h | 16 | ||||
-rw-r--r-- | src/nvim/event/process.c | 3 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 163 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 3 |
4 files changed, 107 insertions, 78 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index 8563006159..41690ead88 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -6,7 +6,6 @@ #include <uv.h> #include "nvim/eval/typval_defs.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" enum { EVENT_HANDLER_MAX_ARGC = 10, }; @@ -59,11 +58,13 @@ typedef struct rstream RStream; /// Type of function called when the RStream buffer is filled with data /// /// @param stream The Stream instance -/// @param buf The associated RBuffer instance +/// @param read_data data that was read /// @param count Number of bytes that was read. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof); +/// @return number of bytes which were consumed +typedef size_t (*stream_read_cb)(RStream *stream, const char *read_data, size_t count, void *data, + bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -102,11 +103,16 @@ struct stream { struct rstream { Stream s; bool did_eof; - RBuffer *buffer; + bool want_read; + bool pending_read; + bool paused_full; + char *buffer; // ARENA_BLOCK_SIZE + char *read_pos; + char *write_pos; uv_buf_t uvbuf; stream_read_cb read_cb; size_t num_bytes; - size_t fpos; + int64_t fpos; }; #define ADDRESS_MAX_SIZE 256 diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 710376cd62..70fc31ba21 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -18,7 +18,6 @@ #include "nvim/os/pty_process.h" #include "nvim/os/shell.h" #include "nvim/os/time.h" -#include "nvim/rbuffer_defs.h" #include "nvim/ui_client.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -355,7 +354,7 @@ static void flush_stream(Process *proc, RStream *stream) int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { - system_buffer_size = (int)rbuffer_capacity(stream->buffer); + system_buffer_size = ARENA_BLOCK_SIZE; } size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 6c7fa20bd8..71290d0c0d 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -11,38 +11,44 @@ #include "nvim/macros_defs.h" #include "nvim/main.h" #include "nvim/os/os_defs.h" -#include "nvim/rbuffer.h" -#include "nvim/rbuffer_defs.h" #include "nvim/types_defs.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize) +void rstream_init_fd(Loop *loop, RStream *stream, int fd) FUNC_ATTR_NONNULL_ARG(1, 2) { stream_init(loop, &stream->s, fd, NULL); - rstream_init(stream, bufsize); + rstream_init(stream); } -void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize) +void rstream_init_stream(RStream *stream, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(1, 2) { stream_init(NULL, &stream->s, -1, uvstream); - rstream_init(stream, bufsize); + rstream_init(stream); } -void rstream_init(RStream *stream, size_t bufsize) +void rstream_init(RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { stream->fpos = 0; stream->read_cb = NULL; stream->num_bytes = 0; - stream->buffer = rbuffer_new(bufsize); - stream->buffer->data = stream; - stream->buffer->full_cb = on_rbuffer_full; - stream->buffer->nonfull_cb = on_rbuffer_nonfull; + stream->buffer = alloc_block(); + stream->read_pos = stream->write_pos = stream->buffer; +} + +void rstream_start_inner(RStream *stream) + FUNC_ATTR_NONNULL_ARG(1) +{ + if (stream->s.uvstream) { + uv_read_start(stream->s.uvstream, alloc_cb, read_cb); + } else { + uv_idle_start(&stream->s.uv.idle, fread_idle_cb); + } } /// Starts watching for events from a `Stream` instance. @@ -53,17 +59,16 @@ void rstream_start(RStream *stream, stream_read_cb cb, void *data) { stream->read_cb = cb; stream->s.cb_data = data; - if (stream->s.uvstream) { - uv_read_start(stream->s.uvstream, alloc_cb, read_cb); - } else { - uv_idle_start(&stream->s.uv.idle, fread_idle_cb); + stream->want_read = true; + if (!stream->paused_full) { + rstream_start_inner(stream); } } /// Stops watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_stop(RStream *stream) +void rstream_stop_inner(RStream *stream) FUNC_ATTR_NONNULL_ALL { if (stream->s.uvstream) { @@ -73,16 +78,14 @@ void rstream_stop(RStream *stream) } } -static void on_rbuffer_full(RBuffer *buf, void *data) -{ - rstream_stop(data); -} - -static void on_rbuffer_nonfull(RBuffer *buf, void *data) +/// Stops watching for events from a `Stream` instance. +/// +/// @param stream The `Stream` instance +void rstream_stop(RStream *stream) + FUNC_ATTR_NONNULL_ALL { - RStream *stream = data; - assert(stream->read_cb); - rstream_start(stream, stream->read_cb, stream->s.cb_data); + rstream_stop_inner(stream); + stream->want_read = false; } // Callbacks used by libuv @@ -91,10 +94,9 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *stream = handle->data; - // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - buf->base = rbuffer_write_ptr(stream->buffer, &write_count); - buf->len = UV_BUF_LEN(write_count); + buf->base = stream->write_pos; + // `uv_buf_t.len` happens to have different size on Windows (as a treat) + buf->len = UV_BUF_LEN(rstream_space(stream)); } /// Callback invoked by libuv after it copies the data into the buffer provided @@ -108,21 +110,21 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. // - // We don't need to do anything with the RBuffer because the next call + // We don't need to do anything with the buffer because the next call // to `alloc_cb` will return the same unused pointer (`rbuffer_produced` // won't be called) if (cnt == UV_ENOBUFS || cnt == 0) { return; } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { // The TTY driver might signal EOF without closing the stream - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } else { DLOG("closing Stream (%p): %s (%s)", (void *)stream, uv_err_name((int)cnt), os_strerror((int)cnt)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); } return; } @@ -130,10 +132,13 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // at this point we're sure that cnt is positive, no error occurred size_t nread = (size_t)cnt; stream->num_bytes += nread; - // Data was already written, so all we need is to update 'wpos' to reflect - // the space actually used in the buffer. - rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, nread, false); + stream->write_pos += cnt; + invoke_read_cb(stream, false); +} + +static size_t rstream_space(RStream *stream) +{ + return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos); } /// Called by the by the 'idle' handle to emulate a reading event @@ -146,52 +151,37 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *stream = handle->data; + stream->uvbuf.base = stream->write_pos; // `uv_buf_t.len` happens to have different size on Windows. - size_t write_count; - stream->uvbuf.base = rbuffer_write_ptr(stream->buffer, &write_count); - stream->uvbuf.len = UV_BUF_LEN(write_count); - - // the offset argument to uv_fs_read is int64_t, could someone really try - // to read more than 9 quintillion (9e18) bytes? - // upcast is meant to avoid tautological condition warning on 32 bits - uintmax_t fpos_intmax = stream->fpos; - if (fpos_intmax > INT64_MAX) { - ELOG("stream offset overflow"); - preserve_exit("stream offset overflow"); - } + stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream)); // Synchronous read - uv_fs_read(handle->loop, - &req, - stream->s.fd, - &stream->uvbuf, - 1, - (int64_t)stream->fpos, - NULL); + uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->fpos, NULL); uv_fs_req_cleanup(&req); if (req.result <= 0) { uv_idle_stop(&stream->s.uv.idle); - invoke_read_cb(stream, 0, true); + invoke_read_cb(stream, true); return; } - // no errors (req.result (ssize_t) is positive), it's safe to cast. - size_t nread = (size_t)req.result; - rbuffer_produced(stream->buffer, nread); - stream->fpos += nread; - invoke_read_cb(stream, nread, false); + // no errors (req.result (ssize_t) is positive), it's safe to use. + stream->write_pos += req.result; + stream->fpos += req.result; + invoke_read_cb(stream, false); } static void read_event(void **argv) { RStream *stream = argv[0]; + stream->pending_read = false; if (stream->read_cb) { - size_t count = (uintptr_t)argv[1]; - bool eof = (uintptr_t)argv[2]; - stream->did_eof = eof; - stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof); + size_t available = rstream_available(stream); + size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data, + stream->did_eof); + assert(consumed <= available); + rstream_consume(stream, consumed); } stream->s.pending_reqs--; if (stream->s.closed && !stream->s.pending_reqs) { @@ -199,13 +189,48 @@ static void read_event(void **argv) } } -static void invoke_read_cb(RStream *stream, size_t count, bool eof) +size_t rstream_available(RStream *stream) { + return (size_t)(stream->write_pos - stream->read_pos); +} + +void rstream_consume(RStream *stream, size_t consumed) +{ + stream->read_pos += consumed; + size_t remaining = (size_t)(stream->write_pos - stream->read_pos); + if (remaining > 0 && stream->read_pos > stream->buffer) { + memmove(stream->buffer, stream->read_pos, remaining); + stream->read_pos = stream->buffer; + stream->write_pos = stream->buffer + remaining; + } else if (remaining == 0) { + stream->read_pos = stream->write_pos = stream->buffer; + } + + if (stream->want_read && stream->paused_full && rstream_space(stream)) { + assert(stream->read_cb); + stream->paused_full = false; + rstream_start_inner(stream); + } +} + +static void invoke_read_cb(RStream *stream, bool eof) +{ + stream->did_eof |= eof; + + if (!rstream_space(stream)) { + rstream_stop_inner(stream); + stream->paused_full = true; + } + + // we cannot use pending_reqs as a socket can have both pending reads and writes + if (stream->pending_read) { + return; + } + // Don't let the stream be closed before the event is processed. stream->s.pending_reqs++; - - CREATE_EVENT(stream->s.events, read_event, - stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); + stream->pending_read = true; + CREATE_EVENT(stream->s.events, read_event, stream); } void rstream_may_close(RStream *stream) diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 3d26dd868f..bc1b503f4c 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -8,7 +8,6 @@ #include "nvim/event/loop.h" #include "nvim/event/stream.h" #include "nvim/log.h" -#include "nvim/rbuffer.h" #include "nvim/types_defs.h" #ifdef MSWIN # include "nvim/os/os_win_console.h" @@ -149,7 +148,7 @@ static void rstream_close_cb(uv_handle_t *handle) { RStream *stream = handle->data; if (stream->buffer) { - rbuffer_free(stream->buffer); + free_block(stream->buffer); } close_cb(handle); } |