diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-06-30 13:37:19 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-01 05:40:53 -0300 |
commit | 0ef80b9c2b922280c3ba2c0a8638f23ae57d6618 (patch) | |
tree | c8d6f5631df1e8eb69022cae647f6e0436254830 /src/nvim/os | |
parent | dcaf9c6bc3d5f83782fca7a145ba5feac7746b1e (diff) | |
download | rneovim-0ef80b9c2b922280c3ba2c0a8638f23ae57d6618.tar.gz rneovim-0ef80b9c2b922280c3ba2c0a8638f23ae57d6618.tar.bz2 rneovim-0ef80b9c2b922280c3ba2c0a8638f23ae57d6618.zip |
rbuffer: Reimplement as a ring buffer and decouple from rstream
Extract the RBuffer class from rstream.c and reimplement it as a ring buffer,
a more efficient version that doesn't need to relocate memory.
The old rbuffer_read/rbuffer_write interfaces are kept for simple
reading/writing, and the RBUFFER_UNTIL_{FULL,EMPTY} macros are introduced to
hide wrapping logic when more control is required(such as passing the buffer
pointer to a library function that writes directly to the pointer)
Also add a basic infrastructure for writing helper C files that are only
compiled in the unit test library, and use this to write unit tests for RBuffer
which contains some macros that can't be accessed directly by luajit.
Helped-by: oni-link <knil.ino@gmail.com>
Reviewed-by: oni-link <knil.ino@gmail.com>
Reviewed-by: Scott Prager <splinterofchaos@gmail.com>
Reviewed-by: Justin M. Keyes <justinkz@gmail.com>
Reviewed-by: Michael Reed <m.reed@mykolab.com>
Diffstat (limited to 'src/nvim/os')
-rw-r--r-- | src/nvim/os/input.c | 31 | ||||
-rw-r--r-- | src/nvim/os/job.c | 6 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 201 | ||||
-rw-r--r-- | src/nvim/os/rstream.h | 1 | ||||
-rw-r--r-- | src/nvim/os/rstream_defs.h | 7 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 29 |
6 files changed, 53 insertions, 222 deletions
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 74a5d3bc2e..726335bd9a 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -79,7 +79,7 @@ void input_stop(void) // Low level input function int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) { - if (rbuffer_pending(input_buffer)) { + if (rbuffer_size(input_buffer)) { return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } @@ -108,7 +108,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) return 0; } - if (rbuffer_pending(input_buffer)) { + if (rbuffer_size(input_buffer)) { // Safe to convert rbuffer_read to int, it will never overflow since we use // relatively small buffers. return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); @@ -153,7 +153,7 @@ size_t input_enqueue(String keys) { char *ptr = keys.data, *end = ptr + keys.size; - while (rbuffer_available(input_buffer) >= 6 && ptr < end) { + while (rbuffer_space(input_buffer) >= 6 && ptr < end) { uint8_t buf[6] = {0}; unsigned int new_size = trans_special((uint8_t **)&ptr, buf, true); @@ -309,16 +309,17 @@ static InbufPollResult inbuf_poll(int ms) return input_eof ? kInputEof : kInputNone; } -static void read_cb(RStream *rstream, void *data, bool at_eof) +static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool at_eof) { if (at_eof) { input_eof = true; } - char *buf = rbuffer_read_ptr(read_buffer); - size_t buf_size = rbuffer_pending(read_buffer); - (void)rbuffer_write(input_buffer, buf, buf_size); - rbuffer_consumed(read_buffer, buf_size); + assert(rbuffer_space(input_buffer) >= rbuffer_size(read_buffer)); + RBUFFER_UNTIL_EMPTY(read_buffer, ptr, len) { + (void)rbuffer_write(input_buffer, ptr, len); + rbuffer_consumed(read_buffer, len); + } } static void process_interrupts(void) @@ -327,18 +328,16 @@ static void process_interrupts(void) return; } - char *inbuf = rbuffer_read_ptr(input_buffer); - size_t count = rbuffer_pending(input_buffer), consume_count = 0; - - for (int i = (int)count - 1; i >= 0; i--) { - if (inbuf[i] == 3) { + size_t consume_count = 0; + RBUFFER_EACH_REVERSE(input_buffer, c, i) { + if ((uint8_t)c == 3) { got_int = true; - consume_count = (size_t)i; + consume_count = i; break; } } - if (got_int) { + if (got_int && consume_count) { // Remove everything typed before the CTRL-C rbuffer_consumed(input_buffer, consume_count); } @@ -362,7 +361,7 @@ static int push_event_key(uint8_t *buf, int maxlen) static bool input_ready(void) { return typebuf_was_filled || // API call filled typeahead - rbuffer_pending(input_buffer) > 0 || // Input buffer filled + rbuffer_size(input_buffer) || // Input buffer filled event_has_deferred(); // Events must be processed } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 038d0e3c26..4769ee4d2f 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -404,17 +404,17 @@ static void job_stop_timer_cb(uv_timer_t *handle) } // Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. -static void read_cb(RStream *rstream, void *data, bool eof) +static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) { Job *job = data; if (rstream == job->out) { - job->opts.stdout_cb(rstream, data, eof); + job->opts.stdout_cb(rstream, buf, data, eof); if (eof) { close_job_out(job); } } else { - job->opts.stderr_cb(rstream, data, eof); + job->opts.stderr_cb(rstream, buf, data, eof); if (eof) { close_job_err(job); } diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index a99745f068..af84288f0f 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -14,12 +14,6 @@ #include "nvim/log.h" #include "nvim/misc1.h" -struct rbuffer { - char *data; - size_t capacity, rpos, wpos; - RStream *rstream; -}; - struct rstream { void *data; uv_buf_t uvbuf; @@ -37,135 +31,6 @@ struct rstream { # include "os/rstream.c.generated.h" #endif -/// Creates a new `RBuffer` instance. -RBuffer *rbuffer_new(size_t capacity) -{ - RBuffer *rv = xmalloc(sizeof(RBuffer)); - rv->data = xmalloc(capacity); - rv->capacity = capacity; - rv->rpos = rv->wpos = 0; - rv->rstream = NULL; - return rv; -} - -/// Advances `rbuffer` read pointers to consume data. If the associated -/// RStream had stopped because the buffer was full, this will restart it. -/// -/// This is called automatically by rbuffer_read, but when using -/// `rbuffer_read_ptr` directly, this needs to called after the data was -/// consumed. -void rbuffer_consumed(RBuffer *rbuffer, size_t count) -{ - rbuffer->rpos += count; - if (count && rbuffer->wpos == rbuffer->capacity) { - // `wpos` is at the end of the buffer, so free some space by moving unread - // data... - rbuffer_relocate(rbuffer); - if (rbuffer->rstream) { - // restart the associated RStream - rstream_start(rbuffer->rstream); - } - } -} - -/// Advances `rbuffer` write pointers. If the internal buffer becomes full, -/// this will stop the associated RStream instance. -void rbuffer_produced(RBuffer *rbuffer, size_t count) -{ - rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream); - - rbuffer_relocate(rbuffer); - if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { - // The last read filled the buffer, stop reading for now - // - rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream); - } -} - -/// Reads data from a `RBuffer` instance into a raw buffer. -/// -/// @param rbuffer The `RBuffer` instance -/// @param buffer The buffer which will receive the data -/// @param count Number of bytes that `buffer` can accept -/// @return The number of bytes copied into `buffer` -size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count) -{ - size_t read_count = rbuffer_pending(rbuffer); - - if (count < read_count) { - read_count = count; - } - - if (read_count > 0) { - memcpy(buffer, rbuffer_read_ptr(rbuffer), read_count); - rbuffer_consumed(rbuffer, read_count); - } - - return read_count; -} - -/// Copies data to `rbuffer` read queue. -/// -/// @param rbuffer the `RBuffer` instance -/// @param buffer The buffer containing data to be copied -/// @param count Number of bytes that should be copied -/// @return The number of bytes actually copied -size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count) -{ - size_t write_count = rbuffer_available(rbuffer); - - if (count < write_count) { - write_count = count; - } - - if (write_count > 0) { - memcpy(rbuffer_write_ptr(rbuffer), buffer, write_count); - rbuffer_produced(rbuffer, write_count); - } - - return write_count; -} - -/// Returns a pointer to a raw buffer containing the first byte available for -/// reading. -char *rbuffer_read_ptr(RBuffer *rbuffer) -{ - return rbuffer->data + rbuffer->rpos; -} - -/// Returns a pointer to a raw buffer containing the first byte available for -/// write. -char *rbuffer_write_ptr(RBuffer *rbuffer) -{ - return rbuffer->data + rbuffer->wpos; -} - -/// Returns the number of bytes ready for consumption in `rbuffer` -/// -/// @param rbuffer The `RBuffer` instance -/// @return The number of bytes ready for consumption -size_t rbuffer_pending(RBuffer *rbuffer) -{ - return rbuffer->wpos - rbuffer->rpos; -} - -/// Returns available space in `rbuffer` -/// -/// @param rbuffer The `RBuffer` instance -/// @return The space available in number of bytes -size_t rbuffer_available(RBuffer *rbuffer) -{ - return rbuffer->capacity - rbuffer->wpos; -} - -void rbuffer_free(RBuffer *rbuffer) -{ - xfree(rbuffer->data); - xfree(rbuffer); -} - /// Creates a new RStream instance. A RStream encapsulates all the boilerplate /// necessary for reading from a libuv stream. /// @@ -177,8 +42,10 @@ void rbuffer_free(RBuffer *rbuffer) RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); + buffer->data = rv; + buffer->full_cb = on_rbuffer_full; + buffer->nonfull_cb = on_rbuffer_nonfull; rv->buffer = buffer; - rv->buffer->rstream = rv; rv->fpos = 0; rv->data = data; rv->cb = cb; @@ -190,16 +57,14 @@ RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) return rv; } -/// Returns the read pointer used by the rstream. -char *rstream_read_ptr(RStream *rstream) +static void on_rbuffer_full(RBuffer *buf, void *data) { - return rbuffer_read_ptr(rstream->buffer); + rstream_stop(data); } -/// Returns the number of bytes before the rstream is full. -size_t rstream_available(RStream *rstream) +static void on_rbuffer_nonfull(RBuffer *buf, void *data) { - return rbuffer_available(rstream->buffer); + rstream_start(data); } /// Frees all memory allocated for a RStream instance @@ -297,37 +162,13 @@ void rstream_stop(RStream *rstream) } } -/// Returns the number of bytes ready for consumption in `rstream` -size_t rstream_pending(RStream *rstream) -{ - return rbuffer_pending(rstream->buffer); -} - -/// Reads data from a `RStream` instance into a buffer. -/// -/// @param rstream The `RStream` instance -/// @param buffer The buffer which will receive the data -/// @param count Number of bytes that `buffer` can accept -/// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buffer, size_t count) -{ - return rbuffer_read(rstream->buffer, buffer, count); -} - -RBuffer *rstream_buffer(RStream *rstream) -{ - return rstream->buffer; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) { RStream *rstream = handle_get_rstream(handle); - - buf->len = rbuffer_available(rstream->buffer); - buf->base = rbuffer_write_ptr(rstream->buffer); + buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len); } // Callback invoked by libuv after it copies the data into the buffer provided @@ -341,7 +182,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt != UV_ENOBUFS // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. - // + // // We don't need to do anything with the RBuffer because the next call // to `alloc_cb` will return the same unused pointer(`rbuffer_produced` // won't be called) @@ -351,18 +192,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - rstream->cb(rstream, rstream->data, true); + rstream->cb(rstream, rstream->buffer, rstream->data, true); } return; } // at this point we're sure that cnt is positive, no error occurred - size_t nread = (size_t) cnt; - + size_t nread = (size_t)cnt; // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - rstream->cb(rstream, rstream->data, false); + rstream->cb(rstream, rstream->buffer, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -371,8 +211,7 @@ static void fread_idle_cb(uv_idle_t *handle) uv_fs_t req; RStream *rstream = handle_get_rstream((uv_handle_t *)handle); - rstream->uvbuf.len = rbuffer_available(rstream->buffer); - rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer); + rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len); // the offset argument to uv_fs_read is int64_t, could someone really try // to read more than 9 quintillion (9e18) bytes? @@ -397,7 +236,7 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - rstream->cb(rstream, rstream->data, true); + rstream->cb(rstream, rstream->buffer, rstream->data, true); return; } @@ -412,15 +251,3 @@ static void close_cb(uv_handle_t *handle) xfree(handle->data); xfree(handle); } - -static void rbuffer_relocate(RBuffer *rbuffer) -{ - assert(rbuffer->rpos <= rbuffer->wpos); - // Move data ... - memmove( - rbuffer->data, // ...to the beginning of the buffer(rpos 0) - rbuffer->data + rbuffer->rpos, // ...From the first unread position - rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes - rbuffer->wpos -= rbuffer->rpos; - rbuffer->rpos = 0; -} diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h index 713d1e77e6..3e24724573 100644 --- a/src/nvim/os/rstream.h +++ b/src/nvim/os/rstream.h @@ -5,7 +5,6 @@ #include <stdint.h> #include <uv.h> #include "nvim/os/event_defs.h" - #include "nvim/os/rstream_defs.h" #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h index 1d71160963..45dced0b62 100644 --- a/src/nvim/os/rstream_defs.h +++ b/src/nvim/os/rstream_defs.h @@ -3,15 +3,18 @@ #include <stdbool.h> -typedef struct rbuffer RBuffer; +#include "nvim/rbuffer.h" + typedef struct rstream RStream; /// Type of function called when the RStream receives data /// /// @param rstream The RStream instance +/// @param rbuffer The associated RBuffer instance /// @param data State associated with the RStream instance /// @param eof If the stream reached EOF. -typedef void (*rstream_cb)(RStream *rstream, void *data, bool eof); +typedef void (*rstream_cb)(RStream *rstream, RBuffer *buf, void *data, + bool eof); #endif // NVIM_OS_RSTREAM_DEFS_H diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 2de3b1aeed..48174533a6 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -283,25 +283,28 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired) buf->data = xrealloc(buf->data, buf->cap); } -static void system_data_cb(RStream *rstream, void *data, bool eof) +static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) { Job *job = data; - DynamicBuffer *buf = job_data(job); + DynamicBuffer *dbuf = job_data(job); - size_t nread = rstream_pending(rstream); - - dynamic_buffer_ensure(buf, buf->len + nread + 1); - rstream_read(rstream, buf->data + buf->len, nread); - - buf->len += nread; + size_t nread = buf->size; + dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1); + rbuffer_read(buf, dbuf->data + dbuf->len, nread); + dbuf->len += nread; } -static void out_data_cb(RStream *rstream, void *data, bool eof) +static void out_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof) { - RBuffer *rbuffer = rstream_buffer(rstream); - size_t written = write_output(rbuffer_read_ptr(rbuffer), - rbuffer_pending(rbuffer), false, eof); - rbuffer_consumed(rbuffer, written); + RBUFFER_UNTIL_EMPTY(buf, ptr, len) { + size_t written = write_output(ptr, len, false, + eof && len <= rbuffer_size(buf)); + if (written) { + rbuffer_consumed(buf, written); + } else { + break; + } + } } /// Parses a command string into a sequence of words, taking quotes into |