aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBjörn Linse <bjorn.linse@gmail.com>2019-05-17 18:16:45 +0200
committerBjörn Linse <bjorn.linse@gmail.com>2019-06-18 10:49:38 +0200
commitd4938743e6aef04c83d02907048768d0d79aaa30 (patch)
treeafdd0cbd21e494e48da65d346c67aa68416e0b31 /src
parent4946751906370134cd02e8da0736bef221171172 (diff)
downloadrneovim-d4938743e6aef04c83d02907048768d0d79aaa30.tar.gz
rneovim-d4938743e6aef04c83d02907048768d0d79aaa30.tar.bz2
rneovim-d4938743e6aef04c83d02907048768d0d79aaa30.zip
channel: refactor events, prevent recursive invocation of events
Diffstat (limited to 'src')
-rw-r--r--src/nvim/channel.c223
-rw-r--r--src/nvim/channel.h11
-rw-r--r--src/nvim/eval.c2
3 files changed, 120 insertions, 116 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 8b8d27affd..3a45a8aec7 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL;
/// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR+1;
-
-typedef struct {
- Channel *chan;
- Callback *callback;
- const char *type;
- // if reader is set, status is ignored.
- CallbackReader *reader;
- int status;
-} ChannelEvent;
-
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "channel.c.generated.h"
#endif
+
/// Teardown the module
void channel_teardown(void)
{
@@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type)
}
chan->events = multiqueue_new_child(main_loop.events);
chan->refcount = 1;
+ chan->exit_status = -1;
chan->streamtype = type;
pmap_put(uint64_t)(channels, chan->id, chan);
return chan;
@@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader)
ga_clear(&reader->buffer);
}
-void callback_reader_start(CallbackReader *reader)
+void callback_reader_start(CallbackReader *reader, const char *type)
{
ga_init(&reader->buffer, sizeof(char *), 32);
+ reader->type = type;
}
static void free_channel_event(void **argv)
@@ -246,7 +239,7 @@ static void free_channel_event(void **argv)
rpc_free(chan);
}
- callback_reader_free(&chan->on_stdout);
+ callback_reader_free(&chan->on_data);
callback_reader_free(&chan->on_stderr);
callback_free(&chan->on_exit);
@@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
assert(cwd == NULL || os_isdir_executable(cwd));
Channel *chan = channel_alloc(kChannelStreamProc);
- chan->on_stdout = on_stdout;
+ chan->on_data = on_stdout;
chan->on_stderr = on_stderr;
chan->on_exit = on_exit;
@@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
has_out = true;
has_err = false;
} else {
- has_out = rpc || callback_reader_set(chan->on_stdout);
+ has_out = rpc || callback_reader_set(chan->on_data);
has_err = callback_reader_set(chan->on_stderr);
}
int status = process_spawn(proc, true, has_out, has_err);
@@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
rpc_start(chan);
} else {
if (has_out) {
- callback_reader_start(&chan->on_stdout);
- rstream_start(&proc->out, on_job_stdout, chan);
+ callback_reader_start(&chan->on_data, "stdout");
+ rstream_start(&proc->out, on_channel_data, chan);
}
}
if (has_err) {
- callback_reader_start(&chan->on_stderr);
+ callback_reader_start(&chan->on_stderr, "stderr");
rstream_init(&proc->err, 0);
rstream_start(&proc->err, on_job_stderr, chan);
}
@@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address,
if (rpc) {
rpc_start(channel);
} else {
- channel->on_stdout = on_output;
- callback_reader_start(&channel->on_stdout);
- rstream_start(&channel->stream.socket, on_socket_output, channel);
+ channel->on_data = on_output;
+ callback_reader_start(&channel->on_data, "data");
+ rstream_start(&channel->stream.socket, on_channel_data, channel);
}
end:
@@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
if (rpc) {
rpc_start(channel);
} else {
- channel->on_stdout = on_output;
- callback_reader_start(&channel->on_stdout);
- rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
+ channel->on_data = on_output;
+ callback_reader_start(&channel->on_data, "stdin");
+ rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
}
return channel->id;
@@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
return l;
}
-// vimscript job callbacks must be executed on Nvim main loop
-static inline void process_channel_event(Channel *chan, Callback *callback,
- const char *type,
- CallbackReader *reader, int status)
-{
- assert(callback);
- ChannelEvent *event_data = xmalloc(sizeof(*event_data));
- event_data->reader = reader;
- event_data->status = status;
- channel_incref(chan); // Hold on ref to callback
- event_data->chan = chan;
- event_data->callback = callback;
- event_data->type = type;
-
- multiqueue_put(chan->events, on_channel_event, 1, event_data);
-}
-
-void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
- void *data, bool eof)
+void on_channel_data(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
}
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
-}
-
-static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
- void *data, bool eof)
-{
- Channel *chan = data;
- on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
-}
-
-static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
- void *data, bool eof)
-{
- Channel *chan = data;
- on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
}
-/// @param type must have static lifetime
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
- size_t count, bool eof, CallbackReader *reader,
- const char *type)
+ size_t count, bool eof, CallbackReader *reader)
{
// stub variable, to keep reading consistent with the order of events, only
// consider the count parameter.
@@ -575,57 +535,93 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
char *ptr = rbuffer_read_ptr(buf, &r);
if (eof) {
- if (reader->buffered) {
- if (reader->cb.type != kCallbackNone) {
- process_channel_event(chan, &reader->cb, type, reader, 0);
- } else if (reader->self) {
- if (tv_dict_find(reader->self, type, -1) == NULL) {
- list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
- (size_t)reader->buffer.ga_len);
- tv_dict_add_list(reader->self, type, strlen(type), data);
- } else {
- // can't display error message now, defer it.
- channel_incref(chan);
- multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
- }
- ga_clear(&reader->buffer);
- } else {
- abort();
- }
- } else if (reader->cb.type != kCallbackNone) {
- process_channel_event(chan, &reader->cb, type, reader, 0);
+ reader->eof = true;
+ } else {
+ if (chan->term) {
+ terminal_receive(chan->term, ptr, count);
+ terminal_flush_output(chan->term);
}
- return;
- }
- // The order here matters, the terminal must receive the data first because
- // process_channel_event will modify the read buffer(convert NULs into NLs)
- if (chan->term) {
- terminal_receive(chan->term, ptr, count);
- terminal_flush_output(chan->term);
+ rbuffer_consumed(buf, count);
+
+ if (callback_reader_set(*reader)) {
+ ga_concat_len(&reader->buffer, ptr, count);
+ }
}
- rbuffer_consumed(buf, count);
+ if (callback_reader_set(*reader)) {
+ schedule_channel_event(chan);
+ }
+}
- if (callback_reader_set(*reader) || reader->buffered) {
- // if buffer wasn't consumed, a pending callback is stalled. Aggregate the
- // received data and avoid a "burst" of multiple callbacks.
- bool buffer_set = reader->buffer.ga_len > 0;
- ga_concat_len(&reader->buffer, ptr, count);
- if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) {
- process_channel_event(chan, &reader->cb, type, reader, 0);
+/// schedule the necessary callbacks to be invoked as a deferred event
+static void schedule_channel_event(Channel *chan)
+{
+ if (!chan->callback_scheduled) {
+ if (!chan->callback_busy) {
+ multiqueue_put(chan->events, on_channel_event, 1, chan);
+ channel_incref(chan);
}
+ chan->callback_scheduled = true;
}
}
-static void on_buffered_error(void **args)
+static void on_channel_event(void **args)
{
Channel *chan = (Channel *)args[0];
- const char *stream = (const char *)args[1];
- EMSG3(_(e_streamkey), stream, chan->id);
+
+ chan->callback_busy = true;
+ chan->callback_scheduled = false;
+
+ int exit_status = chan->exit_status;
+ channel_reader_callbacks(chan, &chan->on_data);
+ channel_reader_callbacks(chan, &chan->on_stderr);
+ if (exit_status > -1) {
+ channel_callback_call(chan, NULL);
+ chan->exit_status = -1;
+ }
+
+ chan->callback_busy = false;
+ if (chan->callback_scheduled) {
+ // further callback was deferred to avoid recursion.
+ multiqueue_put(chan->events, on_channel_event, 1, chan);
+ channel_incref(chan);
+ }
+
channel_decref(chan);
}
+void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
+{
+ if (reader->buffered) {
+ if (reader->eof) {
+ if (reader->self) {
+ if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
+ list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
+ tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
+ data);
+ } else {
+ EMSG3(_(e_streamkey), reader->type, chan->id);
+ }
+ } else {
+ channel_callback_call(chan, reader);
+ }
+ reader->eof = false;
+ }
+ } else {
+ bool is_eof = reader->eof;
+ if (reader->buffer.ga_len > 0) {
+ channel_callback_call(chan, reader);
+ }
+ // if the stream reached eof, invoke extra callback with no data
+ if (is_eof) {
+ channel_callback_call(chan, reader);
+ reader->eof = false;
+ }
+ }
+}
+
static void channel_process_exit_cb(Process *proc, int status, void *data)
{
Channel *chan = data;
@@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
// If process did not exit, we only closed the handle of a detached process.
bool exited = (status >= 0);
- if (exited) {
- process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
+ if (exited && chan->on_exit.type != kCallbackNone) {
+ schedule_channel_event(chan);
+ chan->exit_status = status;
}
channel_decref(chan);
}
-static void on_channel_event(void **args)
+static void channel_callback_call(Channel *chan, CallbackReader *reader)
{
- ChannelEvent *ev = (ChannelEvent *)args[0];
-
+ Callback *cb;
typval_T argv[4];
argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = VAR_UNLOCKED;
- argv[0].vval.v_number = (varnumber_T)ev->chan->id;
+ argv[0].vval.v_number = (varnumber_T)chan->id;
- if (ev->reader) {
+ if (reader) {
argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED;
- argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
- (size_t)ev->reader->buffer.ga_len);
+ argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
tv_list_ref(argv[1].vval.v_list);
- ga_clear(&ev->reader->buffer);
+ ga_clear(&reader->buffer);
+ cb = &reader->cb;
+ argv[2].vval.v_string = (char_u *)reader->type;
} else {
argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED;
- argv[1].vval.v_number = ev->status;
+ argv[1].vval.v_number = chan->exit_status;
+ cb = &chan->on_exit;
+ argv[2].vval.v_string = (char_u *)"exit";
}
argv[2].v_type = VAR_STRING;
argv[2].v_lock = VAR_UNLOCKED;
- argv[2].vval.v_string = (uint8_t *)ev->type;
typval_T rettv = TV_INITIAL_VALUE;
- callback_call(ev->callback, 3, argv, &rettv);
+ callback_call(cb, 3, argv, &rettv);
tv_clear(&rettv);
- channel_decref(ev->chan);
- xfree(ev);
}
diff --git a/src/nvim/channel.h b/src/nvim/channel.h
index b856d197f1..c733e276be 100644
--- a/src/nvim/channel.h
+++ b/src/nvim/channel.h
@@ -42,13 +42,16 @@ typedef struct {
Callback cb;
dict_T *self;
garray_T buffer;
+ bool eof;
bool buffered;
+ const char *type;
} CallbackReader;
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
.self = NULL, \
.buffer = GA_EMPTY_INIT_VALUE, \
- .buffered = false })
+ .buffered = false, \
+ .type = NULL })
static inline bool callback_reader_set(CallbackReader reader)
{
return reader.cb.type != kCallbackNone || reader.self;
@@ -73,9 +76,13 @@ struct Channel {
RpcState rpc;
Terminal *term;
- CallbackReader on_stdout;
+ CallbackReader on_data;
CallbackReader on_stderr;
Callback on_exit;
+ int exit_status;
+
+ bool callback_busy;
+ bool callback_scheduled;
};
EXTERN PMap(uint64_t) *channels;
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 15fc994957..42ff6ceef2 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing)
{
Channel *data;
map_foreach_value(channels, data, {
- set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL);
+ set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL);
set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
})