diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/channel.c | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 4e6ca8d278..b37fa10b12 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -25,7 +25,8 @@ typedef struct { Channel *chan; Callback *callback; const char *type; - list_T *received; + // if reader is set, status is ignored. + CallbackReader *reader; int status; } ChannelEvent; @@ -253,17 +254,12 @@ void channel_decref(Channel *chan) void callback_reader_free(CallbackReader *reader) { callback_free(&reader->cb); - if (reader->buffered) { - ga_clear(&reader->buffer); - } + ga_clear(&reader->buffer); } void callback_reader_start(CallbackReader *reader) { - if (reader->buffered) { - ga_init(&reader->buffer, sizeof(char *), 32); - ga_grow(&reader->buffer, 32); - } + ga_init(&reader->buffer, sizeof(char *), 32); } static void free_channel_event(void **argv) @@ -533,29 +529,27 @@ err: /// /// @return [allocated] Converted list. static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) - FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE + FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE { list_T *const l = tv_list_alloc(kListLenMayKnow); // Empty buffer should be represented by [''], encode_list_write() thinks // empty list is fine for the case. tv_list_append_string(l, "", 0); - encode_list_write(l, buf, len); + if (len > 0) { + encode_list_write(l, buf, len); + } return l; } // vimscript job callbacks must be executed on Nvim main loop static inline void process_channel_event(Channel *chan, Callback *callback, - const char *type, char *buf, - size_t count, int status) + const char *type, + CallbackReader *reader, int status) { assert(callback); ChannelEvent *event_data = xmalloc(sizeof(*event_data)); - event_data->received = NULL; - if (buf) { - event_data->received = buffer_to_tv_list(buf, count); - } else { - event_data->status = status; - } + event_data->reader = reader; + event_data->status = status; channel_incref(chan); // Hold on ref to callback event_data->chan = chan; event_data->callback = callback; @@ -605,8 +599,7 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, if (eof) { if (reader->buffered) { if (reader->cb.type != kCallbackNone) { - process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data, - (size_t)reader->buffer.ga_len, 0); + process_channel_event(chan, &reader->cb, type, reader, 0); } else if (reader->self) { if (tv_dict_find(reader->self, type, -1) == NULL) { list_T *data = buffer_to_tv_list(reader->buffer.ga_data, @@ -617,12 +610,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, channel_incref(chan); multiqueue_put(chan->events, on_buffered_error, 2, chan, type); } + ga_clear(&reader->buffer); } else { abort(); } - ga_clear(&reader->buffer); } else if (reader->cb.type != kCallbackNone) { - process_channel_event(chan, &reader->cb, type, ptr, 0, 0); + process_channel_event(chan, &reader->cb, type, reader, 0); } return; } @@ -634,10 +627,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, } rbuffer_consumed(buf, count); - if (reader->buffered) { - ga_concat_len(&reader->buffer, ptr, count); - } else if (callback_reader_set(*reader)) { - process_channel_event(chan, &reader->cb, type, ptr, count, 0); + // if buffer wasn't consumed, a pending callback is stalled. Aggregate the + // received data and avoid a "burst" of multiple callbacks. + bool buffer_set = reader->buffer.ga_len > 0; + ga_concat_len(&reader->buffer, ptr, count); + if (!reader->buffered && !buffer_set && callback_reader_set(*reader)) { + process_channel_event(chan, &reader->cb, type, reader, 0); } } @@ -661,7 +656,7 @@ static void channel_process_exit_cb(Process *proc, int status, void *data) // If process did not exit, we only closed the handle of a detached process. bool exited = (status >= 0); if (exited) { - process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status); + process_channel_event(chan, &chan->on_exit, "exit", NULL, status); } channel_decref(chan); @@ -677,11 +672,13 @@ static void on_channel_event(void **args) argv[0].v_lock = VAR_UNLOCKED; argv[0].vval.v_number = (varnumber_T)ev->chan->id; - if (ev->received) { + if (ev->reader) { argv[1].v_type = VAR_LIST; argv[1].v_lock = VAR_UNLOCKED; - argv[1].vval.v_list = ev->received; + argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data, + (size_t)ev->reader->buffer.ga_len); tv_list_ref(argv[1].vval.v_list); + ga_clear(&ev->reader->buffer); } else { argv[1].v_type = VAR_NUMBER; argv[1].v_lock = VAR_UNLOCKED; |