aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjörn Linse <bjorn.linse@gmail.com>2017-09-17 16:23:39 +0200
committerBjörn Linse <bjorn.linse@gmail.com>2017-11-25 09:37:00 +0100
commita97cdff14df1bb788a4b659e0db94e2b2ba1f539 (patch)
tree3f596ce114554d8f9e3f1286420f9f3e52c54971
parentfee367a74f3269fd0543bae128c8aaee21f5e592 (diff)
downloadrneovim-a97cdff14df1bb788a4b659e0db94e2b2ba1f539.tar.gz
rneovim-a97cdff14df1bb788a4b659e0db94e2b2ba1f539.tar.bz2
rneovim-a97cdff14df1bb788a4b659e0db94e2b2ba1f539.zip
channels: improvements to buffering
-rw-r--r--src/nvim/channel.c70
-rw-r--r--src/nvim/channel.h4
-rw-r--r--src/nvim/eval.c25
3 files changed, 70 insertions, 29 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 1c34b2ffce..40af470bde 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -257,7 +257,8 @@ void callback_reader_free(CallbackReader *reader)
void callback_reader_start(CallbackReader *reader)
{
if (reader->buffered) {
- ga_init(&reader->buffer, sizeof(char *), 1);
+ ga_init(&reader->buffer, sizeof(char *), 32);
+ ga_grow(&reader->buffer, 32);
}
}
@@ -521,6 +522,34 @@ err:
return 0;
}
+/// NB: mutates buf in place!
+static list_T *buffer_to_tv_list(char *buf, size_t count)
+{
+ list_T *ret = tv_list_alloc();
+ char *ptr = buf;
+ size_t remaining = count;
+ size_t off = 0;
+
+ while (off < remaining) {
+ // append the line
+ if (ptr[off] == NL) {
+ tv_list_append_string(ret, ptr, (ssize_t)off);
+ size_t skip = off + 1;
+ ptr += skip;
+ remaining -= skip;
+ off = 0;
+ continue;
+ }
+ if (ptr[off] == NUL) {
+ // Translate NUL to NL
+ ptr[off] = NL;
+ }
+ off++;
+ }
+ tv_list_append_string(ret, ptr, (ssize_t)off);
+ return ret;
+}
+
// vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf,
@@ -530,28 +559,7 @@ static inline void process_channel_event(Channel *chan, Callback *callback,
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->received = NULL;
if (buf) {
- event_data->received = tv_list_alloc();
- char *ptr = buf;
- size_t remaining = count;
- size_t off = 0;
-
- while (off < remaining) {
- // append the line
- if (ptr[off] == NL) {
- tv_list_append_string(event_data->received, ptr, (ssize_t)off);
- size_t skip = off + 1;
- ptr += skip;
- remaining -= skip;
- off = 0;
- continue;
- }
- if (ptr[off] == NUL) {
- // Translate NUL to NL
- ptr[off] = NL;
- }
- off++;
- }
- tv_list_append_string(event_data->received, ptr, (ssize_t)off);
+ event_data->received = buffer_to_tv_list(buf, count);
} else {
event_data->status = status;
}
@@ -602,10 +610,18 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (eof) {
if (reader->buffered) {
- process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
- (size_t)reader->buffer.ga_len, 0);
- ga_clear(&reader->buffer);
- } else if (callback_reader_set(*reader)) {
+ if (reader->cb.type != kCallbackNone) {
+ process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len, 0);
+ ga_clear(&reader->buffer);
+ } else if (reader->self) {
+ list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
+ tv_dict_add_list(reader->self, type, strlen(type), data);
+ } else {
+ abort();
+ }
+ } else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
}
return;
diff --git a/src/nvim/channel.h b/src/nvim/channel.h
index 48c21e37f8..b856d197f1 100644
--- a/src/nvim/channel.h
+++ b/src/nvim/channel.h
@@ -40,16 +40,18 @@ typedef struct {
typedef struct {
Callback cb;
+ dict_T *self;
garray_T buffer;
bool buffered;
} CallbackReader;
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
+ .self = NULL, \
.buffer = GA_EMPTY_INIT_VALUE, \
.buffered = false })
static inline bool callback_reader_set(CallbackReader reader)
{
- return reader.cb.type != kCallbackNone;
+ return reader.cb.type != kCallbackNone || reader.self;
}
struct Channel {
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 5fa92cedbd..577aa67c60 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -15090,6 +15090,9 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr)
return;
}
on_data.buffered = tv_dict_get_number(opts, "data_buffered");
+ if (on_data.buffered && on_data.cb.type == kCallbackNone) {
+ on_data.self = opts;
+ }
}
const char *error = NULL;
@@ -15490,6 +15493,10 @@ static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)
if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) {
return;
}
+ on_stdin.buffered = tv_dict_get_number(opts, "stdin_buffered");
+ if (on_stdin.buffered && on_stdin.cb.type == kCallbackNone) {
+ on_stdin.self = opts;
+ }
const char *error;
uint64_t id = channel_from_stdio(rpc, on_stdin, &error);
@@ -16764,7 +16771,17 @@ static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID,
ht_stack_T **ht_stack,
list_stack_T **list_stack)
{
- return set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack);
+ if (set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack)) {
+ return true;
+ }
+
+ if (reader->self) {
+ typval_T tv;
+ tv.v_type = VAR_DICT;
+ tv.vval.v_dict = reader->self;
+ return set_ref_in_item(&tv, copyID, ht_stack, list_stack);
+ }
+ return false;
}
static void add_timer_info(typval_T *rettv, timer_T *timer)
@@ -22344,6 +22361,12 @@ static inline bool common_job_callbacks(dict_T *vopts,
&& tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) {
on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered");
on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered");
+ if (on_stdout->buffered && on_stdout->cb.type == kCallbackNone) {
+ on_stdout->self = vopts;
+ }
+ if (on_stderr->buffered && on_stderr->cb.type == kCallbackNone) {
+ on_stderr->self = vopts;
+ }
vopts->dv_refcount++;
return true;
}