aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-01 09:27:42 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-01 09:27:42 -0300
commitb656a954cfec0379a5bb2f7f5d1f28cbc03324df (patch)
treec8d6f5631df1e8eb69022cae647f6e0436254830 /src/nvim/os
parentbfadf5a28b550bf99101c17244d5ea1b926e40c3 (diff)
parent0ef80b9c2b922280c3ba2c0a8638f23ae57d6618 (diff)
downloadrneovim-b656a954cfec0379a5bb2f7f5d1f28cbc03324df.tar.gz
rneovim-b656a954cfec0379a5bb2f7f5d1f28cbc03324df.tar.bz2
rneovim-b656a954cfec0379a5bb2f7f5d1f28cbc03324df.zip
Merge PR #2650 'Data structure improvements'
Diffstat (limited to 'src/nvim/os')
-rw-r--r--src/nvim/os/event.c7
-rw-r--r--src/nvim/os/fs.c33
-rw-r--r--src/nvim/os/input.c31
-rw-r--r--src/nvim/os/job.c6
-rw-r--r--src/nvim/os/rstream.c211
-rw-r--r--src/nvim/os/rstream.h1
-rw-r--r--src/nvim/os/rstream_defs.h7
-rw-r--r--src/nvim/os/shell.c29
8 files changed, 86 insertions, 239 deletions
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index 4c3a4581c3..56874b495d 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -149,7 +149,7 @@ void event_push(Event event, bool deferred)
// returns(user hits a key for example). To avoid this scenario, we call
// uv_stop when a event is enqueued.
uv_stop(uv_default_loop());
- *kl_pushp(Event, deferred ? deferred_events : immediate_events) = event;
+ kl_push(Event, deferred ? deferred_events : immediate_events, event);
}
void event_process(void)
@@ -159,9 +159,8 @@ void event_process(void)
static void process_events_from(klist_t(Event) *queue)
{
- Event event;
-
- while (kl_shift(Event, queue, &event) == 0) {
+ while (!kl_empty(queue)) {
+ Event event = kl_shift(Event, queue);
event.handler(event);
}
}
diff --git a/src/nvim/os/fs.c b/src/nvim/os/fs.c
index 52c10d0ca7..553dda5e88 100644
--- a/src/nvim/os/fs.c
+++ b/src/nvim/os/fs.c
@@ -19,6 +19,15 @@
// Many fs functions from libuv return that value on success.
static const int kLibuvSuccess = 0;
+static uv_loop_t fs_loop;
+
+
+// Initialize the fs module
+void fs_init(void)
+{
+ uv_loop_init(&fs_loop);
+}
+
/// Change to the given directory.
///
@@ -184,7 +193,7 @@ int os_open(const char* path, int flags, int mode)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t open_req;
- int r = uv_fs_open(uv_default_loop(), &open_req, path, flags, mode, NULL);
+ int r = uv_fs_open(&fs_loop, &open_req, path, flags, mode, NULL);
uv_fs_req_cleanup(&open_req);
// r is the same as open_req.result (except for OOM: then only r is set).
return r;
@@ -197,7 +206,7 @@ static bool os_stat(const char *name, uv_stat_t *statbuf)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_stat(uv_default_loop(), &request, name, NULL);
+ int result = uv_fs_stat(&fs_loop, &request, name, NULL);
*statbuf = request.statbuf;
uv_fs_req_cleanup(&request);
return (result == kLibuvSuccess);
@@ -224,7 +233,7 @@ int os_setperm(const char_u *name, int perm)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_chmod(uv_default_loop(), &request,
+ int result = uv_fs_chmod(&fs_loop, &request,
(const char*)name, perm, NULL);
uv_fs_req_cleanup(&request);
@@ -245,7 +254,7 @@ int os_fchown(int file_descriptor, uv_uid_t owner, uv_gid_t group)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_fchown(uv_default_loop(), &request, file_descriptor,
+ int result = uv_fs_fchown(&fs_loop, &request, file_descriptor,
owner, group, NULL);
uv_fs_req_cleanup(&request);
return result;
@@ -294,7 +303,7 @@ int os_rename(const char_u *path, const char_u *new_path)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_rename(uv_default_loop(), &request,
+ int result = uv_fs_rename(&fs_loop, &request,
(const char *)path, (const char *)new_path, NULL);
uv_fs_req_cleanup(&request);
@@ -312,7 +321,7 @@ int os_mkdir(const char *path, int32_t mode)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_mkdir(uv_default_loop(), &request, path, mode, NULL);
+ int result = uv_fs_mkdir(&fs_loop, &request, path, mode, NULL);
uv_fs_req_cleanup(&request);
return result;
}
@@ -328,7 +337,7 @@ int os_mkdtemp(const char *template, char *path)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_mkdtemp(uv_default_loop(), &request, template, NULL);
+ int result = uv_fs_mkdtemp(&fs_loop, &request, template, NULL);
if (result == kLibuvSuccess) {
STRNCPY(path, request.path, TEMP_FILE_PATH_MAXLEN);
}
@@ -343,7 +352,7 @@ int os_rmdir(const char *path)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_rmdir(uv_default_loop(), &request, path, NULL);
+ int result = uv_fs_rmdir(&fs_loop, &request, path, NULL);
uv_fs_req_cleanup(&request);
return result;
}
@@ -356,7 +365,7 @@ int os_rmdir(const char *path)
bool os_scandir(Directory *dir, const char *path)
FUNC_ATTR_NONNULL_ALL
{
- int r = uv_fs_scandir(uv_default_loop(), &dir->request, path, 0, NULL);
+ int r = uv_fs_scandir(&fs_loop, &dir->request, path, 0, NULL);
if (r <= 0) {
os_closedir(dir);
}
@@ -388,7 +397,7 @@ int os_remove(const char *path)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_unlink(uv_default_loop(), &request, path, NULL);
+ int result = uv_fs_unlink(&fs_loop, &request, path, NULL);
uv_fs_req_cleanup(&request);
return result;
}
@@ -413,7 +422,7 @@ bool os_fileinfo_link(const char *path, FileInfo *file_info)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_lstat(uv_default_loop(), &request, path, NULL);
+ int result = uv_fs_lstat(&fs_loop, &request, path, NULL);
file_info->stat = request.statbuf;
uv_fs_req_cleanup(&request);
return (result == kLibuvSuccess);
@@ -428,7 +437,7 @@ bool os_fileinfo_fd(int file_descriptor, FileInfo *file_info)
FUNC_ATTR_NONNULL_ALL
{
uv_fs_t request;
- int result = uv_fs_fstat(uv_default_loop(), &request, file_descriptor, NULL);
+ int result = uv_fs_fstat(&fs_loop, &request, file_descriptor, NULL);
file_info->stat = request.statbuf;
uv_fs_req_cleanup(&request);
return (result == kLibuvSuccess);
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 74a5d3bc2e..726335bd9a 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -79,7 +79,7 @@ void input_stop(void)
// Low level input function
int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
{
- if (rbuffer_pending(input_buffer)) {
+ if (rbuffer_size(input_buffer)) {
return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
}
@@ -108,7 +108,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
return 0;
}
- if (rbuffer_pending(input_buffer)) {
+ if (rbuffer_size(input_buffer)) {
// Safe to convert rbuffer_read to int, it will never overflow since we use
// relatively small buffers.
return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
@@ -153,7 +153,7 @@ size_t input_enqueue(String keys)
{
char *ptr = keys.data, *end = ptr + keys.size;
- while (rbuffer_available(input_buffer) >= 6 && ptr < end) {
+ while (rbuffer_space(input_buffer) >= 6 && ptr < end) {
uint8_t buf[6] = {0};
unsigned int new_size = trans_special((uint8_t **)&ptr, buf, true);
@@ -309,16 +309,17 @@ static InbufPollResult inbuf_poll(int ms)
return input_eof ? kInputEof : kInputNone;
}
-static void read_cb(RStream *rstream, void *data, bool at_eof)
+static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool at_eof)
{
if (at_eof) {
input_eof = true;
}
- char *buf = rbuffer_read_ptr(read_buffer);
- size_t buf_size = rbuffer_pending(read_buffer);
- (void)rbuffer_write(input_buffer, buf, buf_size);
- rbuffer_consumed(read_buffer, buf_size);
+ assert(rbuffer_space(input_buffer) >= rbuffer_size(read_buffer));
+ RBUFFER_UNTIL_EMPTY(read_buffer, ptr, len) {
+ (void)rbuffer_write(input_buffer, ptr, len);
+ rbuffer_consumed(read_buffer, len);
+ }
}
static void process_interrupts(void)
@@ -327,18 +328,16 @@ static void process_interrupts(void)
return;
}
- char *inbuf = rbuffer_read_ptr(input_buffer);
- size_t count = rbuffer_pending(input_buffer), consume_count = 0;
-
- for (int i = (int)count - 1; i >= 0; i--) {
- if (inbuf[i] == 3) {
+ size_t consume_count = 0;
+ RBUFFER_EACH_REVERSE(input_buffer, c, i) {
+ if ((uint8_t)c == 3) {
got_int = true;
- consume_count = (size_t)i;
+ consume_count = i;
break;
}
}
- if (got_int) {
+ if (got_int && consume_count) {
// Remove everything typed before the CTRL-C
rbuffer_consumed(input_buffer, consume_count);
}
@@ -362,7 +361,7 @@ static int push_event_key(uint8_t *buf, int maxlen)
static bool input_ready(void)
{
return typebuf_was_filled || // API call filled typeahead
- rbuffer_pending(input_buffer) > 0 || // Input buffer filled
+ rbuffer_size(input_buffer) || // Input buffer filled
event_has_deferred(); // Events must be processed
}
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 038d0e3c26..4769ee4d2f 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -404,17 +404,17 @@ static void job_stop_timer_cb(uv_timer_t *handle)
}
// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
-static void read_cb(RStream *rstream, void *data, bool eof)
+static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
if (rstream == job->out) {
- job->opts.stdout_cb(rstream, data, eof);
+ job->opts.stdout_cb(rstream, buf, data, eof);
if (eof) {
close_job_out(job);
}
} else {
- job->opts.stderr_cb(rstream, data, eof);
+ job->opts.stderr_cb(rstream, buf, data, eof);
if (eof) {
close_job_err(job);
}
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 702f282d53..af84288f0f 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -14,12 +14,6 @@
#include "nvim/log.h"
#include "nvim/misc1.h"
-struct rbuffer {
- char *data;
- size_t capacity, rpos, wpos;
- RStream *rstream;
-};
-
struct rstream {
void *data;
uv_buf_t uvbuf;
@@ -37,135 +31,6 @@ struct rstream {
# include "os/rstream.c.generated.h"
#endif
-/// Creates a new `RBuffer` instance.
-RBuffer *rbuffer_new(size_t capacity)
-{
- RBuffer *rv = xmalloc(sizeof(RBuffer));
- rv->data = xmalloc(capacity);
- rv->capacity = capacity;
- rv->rpos = rv->wpos = 0;
- rv->rstream = NULL;
- return rv;
-}
-
-/// Advances `rbuffer` read pointers to consume data. If the associated
-/// RStream had stopped because the buffer was full, this will restart it.
-///
-/// This is called automatically by rbuffer_read, but when using
-/// `rbuffer_read_ptr` directly, this needs to called after the data was
-/// consumed.
-void rbuffer_consumed(RBuffer *rbuffer, size_t count)
-{
- rbuffer->rpos += count;
- if (count && rbuffer->wpos == rbuffer->capacity) {
- // `wpos` is at the end of the buffer, so free some space by moving unread
- // data...
- rbuffer_relocate(rbuffer);
- if (rbuffer->rstream) {
- // restart the associated RStream
- rstream_start(rbuffer->rstream);
- }
- }
-}
-
-/// Advances `rbuffer` write pointers. If the internal buffer becomes full,
-/// this will stop the associated RStream instance.
-void rbuffer_produced(RBuffer *rbuffer, size_t count)
-{
- rbuffer->wpos += count;
- DLOG("Received %u bytes from RStream(%p)", (size_t)count, rbuffer->rstream);
-
- rbuffer_relocate(rbuffer);
- if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
- // The last read filled the buffer, stop reading for now
- //
- rstream_stop(rbuffer->rstream);
- DLOG("Buffer for RStream(%p) is full, stopping it", rbuffer->rstream);
- }
-}
-
-/// Reads data from a `RBuffer` instance into a raw buffer.
-///
-/// @param rbuffer The `RBuffer` instance
-/// @param buffer The buffer which will receive the data
-/// @param count Number of bytes that `buffer` can accept
-/// @return The number of bytes copied into `buffer`
-size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count)
-{
- size_t read_count = rbuffer_pending(rbuffer);
-
- if (count < read_count) {
- read_count = count;
- }
-
- if (read_count > 0) {
- memcpy(buffer, rbuffer_read_ptr(rbuffer), read_count);
- rbuffer_consumed(rbuffer, read_count);
- }
-
- return read_count;
-}
-
-/// Copies data to `rbuffer` read queue.
-///
-/// @param rbuffer the `RBuffer` instance
-/// @param buffer The buffer containing data to be copied
-/// @param count Number of bytes that should be copied
-/// @return The number of bytes actually copied
-size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count)
-{
- size_t write_count = rbuffer_available(rbuffer);
-
- if (count < write_count) {
- write_count = count;
- }
-
- if (write_count > 0) {
- memcpy(rbuffer_write_ptr(rbuffer), buffer, write_count);
- rbuffer_produced(rbuffer, write_count);
- }
-
- return write_count;
-}
-
-/// Returns a pointer to a raw buffer containing the first byte available for
-/// reading.
-char *rbuffer_read_ptr(RBuffer *rbuffer)
-{
- return rbuffer->data + rbuffer->rpos;
-}
-
-/// Returns a pointer to a raw buffer containing the first byte available for
-/// write.
-char *rbuffer_write_ptr(RBuffer *rbuffer)
-{
- return rbuffer->data + rbuffer->wpos;
-}
-
-/// Returns the number of bytes ready for consumption in `rbuffer`
-///
-/// @param rbuffer The `RBuffer` instance
-/// @return The number of bytes ready for consumption
-size_t rbuffer_pending(RBuffer *rbuffer)
-{
- return rbuffer->wpos - rbuffer->rpos;
-}
-
-/// Returns available space in `rbuffer`
-///
-/// @param rbuffer The `RBuffer` instance
-/// @return The space available in number of bytes
-size_t rbuffer_available(RBuffer *rbuffer)
-{
- return rbuffer->capacity - rbuffer->wpos;
-}
-
-void rbuffer_free(RBuffer *rbuffer)
-{
- xfree(rbuffer->data);
- xfree(rbuffer);
-}
-
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
/// necessary for reading from a libuv stream.
///
@@ -177,8 +42,10 @@ void rbuffer_free(RBuffer *rbuffer)
RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
+ buffer->data = rv;
+ buffer->full_cb = on_rbuffer_full;
+ buffer->nonfull_cb = on_rbuffer_nonfull;
rv->buffer = buffer;
- rv->buffer->rstream = rv;
rv->fpos = 0;
rv->data = data;
rv->cb = cb;
@@ -190,16 +57,14 @@ RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
return rv;
}
-/// Returns the read pointer used by the rstream.
-char *rstream_read_ptr(RStream *rstream)
+static void on_rbuffer_full(RBuffer *buf, void *data)
{
- return rbuffer_read_ptr(rstream->buffer);
+ rstream_stop(data);
}
-/// Returns the number of bytes before the rstream is full.
-size_t rstream_available(RStream *rstream)
+static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
- return rbuffer_available(rstream->buffer);
+ rstream_start(data);
}
/// Frees all memory allocated for a RStream instance
@@ -297,37 +162,13 @@ void rstream_stop(RStream *rstream)
}
}
-/// Returns the number of bytes ready for consumption in `rstream`
-size_t rstream_pending(RStream *rstream)
-{
- return rbuffer_pending(rstream->buffer);
-}
-
-/// Reads data from a `RStream` instance into a buffer.
-///
-/// @param rstream The `RStream` instance
-/// @param buffer The buffer which will receive the data
-/// @param count Number of bytes that `buffer` can accept
-/// @return The number of bytes copied into `buffer`
-size_t rstream_read(RStream *rstream, char *buffer, size_t count)
-{
- return rbuffer_read(rstream->buffer, buffer, count);
-}
-
-RBuffer *rstream_buffer(RStream *rstream)
-{
- return rstream->buffer;
-}
-
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
RStream *rstream = handle_get_rstream(handle);
-
- buf->len = rbuffer_available(rstream->buffer);
- buf->base = rbuffer_write_ptr(rstream->buffer);
+ buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len);
}
// Callback invoked by libuv after it copies the data into the buffer provided
@@ -338,23 +179,30 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
RStream *rstream = handle_get_rstream((uv_handle_t *)stream);
if (cnt <= 0) {
- if (cnt != UV_ENOBUFS) {
- DLOG("Closing RStream(%p)", rstream);
+ if (cnt != UV_ENOBUFS
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ && cnt != 0) {
+ DLOG("Closing RStream(%p) because of %s(%zd)", rstream,
+ uv_strerror((int)cnt), cnt);
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- rstream->cb(rstream, rstream->data, true);
+ rstream->cb(rstream, rstream->buffer, rstream->data, true);
}
return;
}
// at this point we're sure that cnt is positive, no error occurred
- size_t nread = (size_t) cnt;
-
+ size_t nread = (size_t)cnt;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
- rstream->cb(rstream, rstream->data, false);
+ rstream->cb(rstream, rstream->buffer, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -363,8 +211,7 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_t req;
RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
- rstream->uvbuf.len = rbuffer_available(rstream->buffer);
- rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer);
+ rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len);
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
@@ -389,7 +236,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- rstream->cb(rstream, rstream->data, true);
+ rstream->cb(rstream, rstream->buffer, rstream->data, true);
return;
}
@@ -404,15 +251,3 @@ static void close_cb(uv_handle_t *handle)
xfree(handle->data);
xfree(handle);
}
-
-static void rbuffer_relocate(RBuffer *rbuffer)
-{
- assert(rbuffer->rpos <= rbuffer->wpos);
- // Move data ...
- memmove(
- rbuffer->data, // ...to the beginning of the buffer(rpos 0)
- rbuffer->data + rbuffer->rpos, // ...From the first unread position
- rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes
- rbuffer->wpos -= rbuffer->rpos;
- rbuffer->rpos = 0;
-}
diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h
index 713d1e77e6..3e24724573 100644
--- a/src/nvim/os/rstream.h
+++ b/src/nvim/os/rstream.h
@@ -5,7 +5,6 @@
#include <stdint.h>
#include <uv.h>
#include "nvim/os/event_defs.h"
-
#include "nvim/os/rstream_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h
index 1d71160963..45dced0b62 100644
--- a/src/nvim/os/rstream_defs.h
+++ b/src/nvim/os/rstream_defs.h
@@ -3,15 +3,18 @@
#include <stdbool.h>
-typedef struct rbuffer RBuffer;
+#include "nvim/rbuffer.h"
+
typedef struct rstream RStream;
/// Type of function called when the RStream receives data
///
/// @param rstream The RStream instance
+/// @param rbuffer The associated RBuffer instance
/// @param data State associated with the RStream instance
/// @param eof If the stream reached EOF.
-typedef void (*rstream_cb)(RStream *rstream, void *data, bool eof);
+typedef void (*rstream_cb)(RStream *rstream, RBuffer *buf, void *data,
+ bool eof);
#endif // NVIM_OS_RSTREAM_DEFS_H
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 2de3b1aeed..48174533a6 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -283,25 +283,28 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
buf->data = xrealloc(buf->data, buf->cap);
}
-static void system_data_cb(RStream *rstream, void *data, bool eof)
+static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
- DynamicBuffer *buf = job_data(job);
+ DynamicBuffer *dbuf = job_data(job);
- size_t nread = rstream_pending(rstream);
-
- dynamic_buffer_ensure(buf, buf->len + nread + 1);
- rstream_read(rstream, buf->data + buf->len, nread);
-
- buf->len += nread;
+ size_t nread = buf->size;
+ dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1);
+ rbuffer_read(buf, dbuf->data + dbuf->len, nread);
+ dbuf->len += nread;
}
-static void out_data_cb(RStream *rstream, void *data, bool eof)
+static void out_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
{
- RBuffer *rbuffer = rstream_buffer(rstream);
- size_t written = write_output(rbuffer_read_ptr(rbuffer),
- rbuffer_pending(rbuffer), false, eof);
- rbuffer_consumed(rbuffer, written);
+ RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
+ size_t written = write_output(ptr, len, false,
+ eof && len <= rbuffer_size(buf));
+ if (written) {
+ rbuffer_consumed(buf, written);
+ } else {
+ break;
+ }
+ }
}
/// Parses a command string into a sequence of words, taking quotes into