aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/channel.c55
1 files changed, 26 insertions, 29 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 4e6ca8d278..b37fa10b12 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -25,7 +25,8 @@ typedef struct {
Channel *chan;
Callback *callback;
const char *type;
- list_T *received;
+ // if reader is set, status is ignored.
+ CallbackReader *reader;
int status;
} ChannelEvent;
@@ -253,17 +254,12 @@ void channel_decref(Channel *chan)
void callback_reader_free(CallbackReader *reader)
{
callback_free(&reader->cb);
- if (reader->buffered) {
- ga_clear(&reader->buffer);
- }
+ ga_clear(&reader->buffer);
}
void callback_reader_start(CallbackReader *reader)
{
- if (reader->buffered) {
- ga_init(&reader->buffer, sizeof(char *), 32);
- ga_grow(&reader->buffer, 32);
- }
+ ga_init(&reader->buffer, sizeof(char *), 32);
}
static void free_channel_event(void **argv)
@@ -533,29 +529,27 @@ err:
///
/// @return [allocated] Converted list.
static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
- FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
+ FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
{
list_T *const l = tv_list_alloc(kListLenMayKnow);
// Empty buffer should be represented by [''], encode_list_write() thinks
// empty list is fine for the case.
tv_list_append_string(l, "", 0);
- encode_list_write(l, buf, len);
+ if (len > 0) {
+ encode_list_write(l, buf, len);
+ }
return l;
}
// vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback,
- const char *type, char *buf,
- size_t count, int status)
+ const char *type,
+ CallbackReader *reader, int status)
{
assert(callback);
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
- event_data->received = NULL;
- if (buf) {
- event_data->received = buffer_to_tv_list(buf, count);
- } else {
- event_data->status = status;
- }
+ event_data->reader = reader;
+ event_data->status = status;
channel_incref(chan); // Hold on ref to callback
event_data->chan = chan;
event_data->callback = callback;
@@ -605,8 +599,7 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (eof) {
if (reader->buffered) {
if (reader->cb.type != kCallbackNone) {
- process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
- (size_t)reader->buffer.ga_len, 0);
+ process_channel_event(chan, &reader->cb, type, reader, 0);
} else if (reader->self) {
if (tv_dict_find(reader->self, type, -1) == NULL) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
@@ -617,12 +610,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
channel_incref(chan);
multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
}
+ ga_clear(&reader->buffer);
} else {
abort();
}
- ga_clear(&reader->buffer);
} else if (reader->cb.type != kCallbackNone) {
- process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
+ process_channel_event(chan, &reader->cb, type, reader, 0);
}
return;
}
@@ -634,10 +627,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
}
rbuffer_consumed(buf, count);
- if (reader->buffered) {
- ga_concat_len(&reader->buffer, ptr, count);
- } else if (callback_reader_set(*reader)) {
- process_channel_event(chan, &reader->cb, type, ptr, count, 0);
+ // if buffer wasn't consumed, a pending callback is stalled. Aggregate the
+ // received data and avoid a "burst" of multiple callbacks.
+ bool buffer_set = reader->buffer.ga_len > 0;
+ ga_concat_len(&reader->buffer, ptr, count);
+ if (!reader->buffered && !buffer_set && callback_reader_set(*reader)) {
+ process_channel_event(chan, &reader->cb, type, reader, 0);
}
}
@@ -661,7 +656,7 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
// If process did not exit, we only closed the handle of a detached process.
bool exited = (status >= 0);
if (exited) {
- process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
+ process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
}
channel_decref(chan);
@@ -677,11 +672,13 @@ static void on_channel_event(void **args)
argv[0].v_lock = VAR_UNLOCKED;
argv[0].vval.v_number = (varnumber_T)ev->chan->id;
- if (ev->received) {
+ if (ev->reader) {
argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED;
- argv[1].vval.v_list = ev->received;
+ argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
+ (size_t)ev->reader->buffer.ga_len);
tv_list_ref(argv[1].vval.v_list);
+ ga_clear(&ev->reader->buffer);
} else {
argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED;