diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-08 13:08:29 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-17 11:37:42 -0300 |
commit | 2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f (patch) | |
tree | 5f43aacf9a42b18e02c9873d18a4549c6b7cb2c1 | |
parent | cf30837951120bb27563054ab9aadd4ccf6fadbf (diff) | |
download | rneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.tar.gz rneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.tar.bz2 rneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.zip |
events: Refactor how event deferral is handled
- Remove all *_set_defer methods and the 'defer' flag from rstream/jobs
- Added {signal,rstream,job}_event_source functions. Each return a pointer that
represent the event source for the object in question(For signals, a static
pointer is returned)
- Added a 'source' field to the Event struct, which is set to the appropriate
value by the code that created the event.
- Added a 'sources' parameter to `event_poll`. It should point to a
NULL-terminated array of event sources that will be used to decide which
events should be processed immediately
- Added a 'source_override' parameter to `rstream_new`. This was required to use
jobs as event sources of RStream instances(When "focusing" on a job, for
example).
- Extracted `process_from` static function from `event_process`.
- Remove 'defer' parameter from `event_process`, which now operates only on
deferred events.
- Refactor `channel_send_call` to use the new lock mechanism
What changed in a single sentence: Code that calls `event_poll` have to specify
which event sources should NOT be deferred. This change was necessary for a
number of reasons:
- To fix a bug where due to race conditions, a client request
could end in the deferred queue in the middle of a `channel_send_call`
invocation, resulting in a deadlock since the client process would never
receive a response, and channel_send_call would never return because
the client would still be waiting for the response.
- To handle "event locking" correctly in recursive `channel_send_call`
invocations when the frames are waiting for responses from different
clients. Not much of an issue now since there's only a python client, but
could break things later.
- To simplify the process of implementing synchronous functions that depend on
asynchronous events.
-rw-r--r-- | src/nvim/edit.c | 2 | ||||
-rw-r--r-- | src/nvim/ex_getln.c | 4 | ||||
-rw-r--r-- | src/nvim/getchar.c | 2 | ||||
-rw-r--r-- | src/nvim/message.c | 7 | ||||
-rw-r--r-- | src/nvim/normal.c | 2 | ||||
-rw-r--r-- | src/nvim/os/channel.c | 30 | ||||
-rw-r--r-- | src/nvim/os/event.c | 88 | ||||
-rw-r--r-- | src/nvim/os/event_defs.h | 3 | ||||
-rw-r--r-- | src/nvim/os/input.c | 9 | ||||
-rw-r--r-- | src/nvim/os/job.c | 31 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 52 | ||||
-rw-r--r-- | src/nvim/os/signal.c | 8 |
12 files changed, 152 insertions, 86 deletions
diff --git a/src/nvim/edit.c b/src/nvim/edit.c index 8cfaddde86..3dddaea39d 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -944,7 +944,7 @@ doESCkey: break; case K_EVENT: - event_process(true); + event_process(); break; case K_HOME: /* <Home> */ diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index 67748fa164..bf5076bdc3 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -762,7 +762,7 @@ getcmdline ( */ switch (c) { case K_EVENT: - event_process(true); + event_process(); // Force a redraw even though the command line didn't change shell_resized(); goto cmdline_not_changed; @@ -1878,7 +1878,7 @@ redraw: if (IS_SPECIAL(c1)) { // Process deferred events - event_process(true); + event_process(); // Ignore other special key codes continue; } diff --git a/src/nvim/getchar.c b/src/nvim/getchar.c index 31fa6a702f..340b31d80a 100644 --- a/src/nvim/getchar.c +++ b/src/nvim/getchar.c @@ -2473,7 +2473,7 @@ inchar ( char_u dum[DUM_LEN + 1]; for (;; ) { - event_process(true); + event_process(); len = ui_inchar(dum, DUM_LEN, 0L, 0); if (len == 0 || (len == 1 && dum[0] == 3)) break; diff --git a/src/nvim/message.c b/src/nvim/message.c index dea02e21fa..ef0faa35ee 100644 --- a/src/nvim/message.c +++ b/src/nvim/message.c @@ -2074,7 +2074,7 @@ static int do_more_prompt(int typed_char) toscroll = 0; switch (c) { case K_EVENT: - event_process(true); + event_process(); break; case BS: /* scroll one line back */ case K_BS: @@ -2734,8 +2734,11 @@ do_dialog ( retval = 0; break; default: /* Could be a hotkey? */ - if (c < 0) /* special keys are ignored here */ + if (c < 0) { /* special keys are ignored here */ + // drain event queue to prevent infinite loop + event_process(); continue; + } if (c == ':' && ex_cmd) { retval = dfltbutton; ins_char_typebuf(':'); diff --git a/src/nvim/normal.c b/src/nvim/normal.c index 5a4c3a326a..55b86f61dd 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -7375,5 +7375,5 @@ static void nv_cursorhold(cmdarg_T *cap) static void nv_event(cmdarg_T *cap) { - event_process(true); + event_process(); } diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 11a58f246a..d5f29aa667 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -118,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream) stream->data = NULL; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -189,16 +189,10 @@ bool channel_send_call(uint64_t id, // Send the msgpack-rpc request send_request(channel, request_id, name, arg); - if (!kv_size(channel->call_stack)) { - // This is the first frame, we must disable event deferral for this - // channel because we won't be returning until the client sends a - // response - if (channel->is_job) { - job_set_defer(channel->data.job, false); - } else { - rstream_set_defer(channel->data.streams.read, false); - } - } + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; // Push the frame ChannelCallFrame frame = {request_id, false, NIL}; @@ -206,24 +200,18 @@ bool channel_send_call(uint64_t id, size_t size = kv_size(channel->call_stack); do { - event_poll(-1); + event_poll(-1, sources); } while ( // Continue running if ... channel->enabled && // the channel is still enabled kv_size(channel->call_stack) >= size); // the call didn't return - if (!kv_size(channel->call_stack)) { - // Popped last frame, restore event deferral - if (channel->is_job) { - job_set_defer(channel->data.job, true); - } else { - rstream_set_defer(channel->data.streams.read, true); - } - if (!channel->enabled && !channel->rpc_call_level) { + if (!(kv_size(channel->call_stack) + || channel->enabled + || channel->rpc_call_level)) { // Close the channel if it has been disabled and we have not been called // by `parse_msgpack`(It would be unsafe to close the channel otherwise) close_channel(channel); - } } *errored = frame.errored; diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 6367a60af7..a460b2db96 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -33,6 +33,11 @@ typedef struct { # include "os/event.c.generated.h" #endif static klist_t(Event) *deferred_events, *immediate_events; +// NULL-terminated array of event sources that we should process immediately. +// +// Events from sources that are not contained in this array are processed +// later when `event_process` is called +static EventSource *immediate_sources = NULL; void event_init(void) { @@ -63,7 +68,8 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms) +bool event_poll(int32_t ms, EventSource sources[]) + FUNC_ATTR_NONNULL_ARG(2) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -99,10 +105,7 @@ bool event_poll(int32_t ms) do { // Run one event loop iteration, blocking for events if run_mode is // UV_RUN_ONCE - DLOG("Entering event loop"); - uv_run(uv_default_loop(), run_mode); - processed_events = event_process(false); - DLOG("Exited event loop, processed %u events", processed_events); + processed_events = loop(run_mode, sources); } while ( // Continue running if ... !processed_events && // we didn't process any immediate events @@ -120,8 +123,7 @@ bool event_poll(int32_t ms) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - event_process(false); + processed_events += loop(UV_RUN_NOWAIT, sources); } return !timer_data.timed_out && (processed_events || event_has_deferred()); @@ -129,22 +131,41 @@ bool event_poll(int32_t ms) bool event_has_deferred(void) { - return !kl_empty(get_queue(true)); + return !kl_empty(deferred_events); } -// Push an event to the queue -void event_push(Event event, bool deferred) +// Queue an event +void event_push(Event event) { - *kl_pushp(Event, get_queue(deferred)) = event; + bool defer = true; + + if (immediate_sources) { + size_t i; + EventSource src; + + for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { + if (src == event.source) { + defer = false; + break; + } + } + } + + *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; +} + +void event_process(void) +{ + process_from(deferred_events); } // Runs the appropriate action for each queued event -size_t event_process(bool deferred) +static size_t process_from(klist_t(Event) *queue) { size_t count = 0; Event event; - while (kl_shift(Event, get_queue(deferred), &event) == 0) { + while (kl_shift(Event, queue, &event) == 0) { switch (event.type) { case kEventSignal: signal_handle(event); @@ -161,6 +182,8 @@ size_t event_process(bool deferred) count++; } + DLOG("Processed %u events", count); + return count; } @@ -179,7 +202,42 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static klist_t(Event) *get_queue(bool deferred) +static void requeue_deferred_events(void) { - return deferred ? deferred_events : immediate_events; + size_t remaining = deferred_events->size; + + DLOG("Number of deferred events: %u", remaining); + + while (remaining--) { + // Re-push each deferred event to ensure it will be in the right queue + Event event; + kl_shift(Event, deferred_events, &event); + event_push(event); + DLOG("Re-queueing event"); + } + + DLOG("Number of deferred events: %u", deferred_events->size); +} + +static size_t loop(uv_run_mode run_mode, EventSource *sources) +{ + size_t count; + immediate_sources = sources; + // It's possible that some events from the immediate sources are waiting + // in the deferred queue. If so, move them to the immediate queue so they + // will be processed in order of arrival by the next `process_from` call. + requeue_deferred_events(); + count = process_from(immediate_events); + + if (count) { + // No need to enter libuv, events were already processed + return count; + } + + DLOG("Enter event loop"); + uv_run(uv_default_loop(), run_mode); + DLOG("Exit event loop"); + immediate_sources = NULL; + count = process_from(immediate_events); + return count; } diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index ca2cabd75a..dbee3e2ba7 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,6 +6,8 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" +typedef void * EventSource; + typedef enum { kEventSignal, kEventRStreamData, @@ -13,6 +15,7 @@ typedef enum { } EventType; typedef struct { + EventSource source; EventType type; union { int signum; diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 58bdf0cf52..15aebdbf3d 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -34,7 +34,7 @@ static bool eof = false, started_reading = false; void input_init(void) { - read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false); + read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -129,7 +129,12 @@ bool os_isatty(int fd) static bool input_poll(int32_t ms) { - return input_ready() || event_poll(ms) || input_ready(); + EventSource input_sources[] = { + rstream_event_source(read_stream), + NULL + }; + + return input_ready() || event_poll(ms, input_sources) || input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 2f5b257b91..203aa2c990 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -214,8 +214,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -269,18 +269,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Sets the `defer` flag for a Job instance -/// -/// @param rstream The Job id -/// @param defer The new value for the flag -void job_set_defer(Job *job, bool defer) -{ - job->defer = defer; - rstream_set_defer(job->out, defer); - rstream_set_defer(job->err, defer); -} - - /// Runs the read callback associated with the job exit event /// /// @param event Object containing data necessary to invoke the callback @@ -307,6 +295,11 @@ void *job_data(Job *job) return job->data; } +EventSource job_event_source(Job *job) +{ + return job; +} + static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -391,10 +384,12 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) static void emit_exit_event(Job *job) { - Event event; - event.type = kEventJobExit; - event.data.job = job; - event_push(event, true); + Event event = { + .source = job_event_source(job), + .type = kEventJobExit, + .data.job = job + }; + event_push(event); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 96dd26407a..d7ab5b8a64 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -26,7 +26,8 @@ struct rstream { uv_file fd; rstream_cb cb; size_t buffer_size, rpos, wpos, fpos; - bool reading, free_handle, defer; + bool reading, free_handle; + EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -40,25 +41,25 @@ struct rstream { /// for reading with `rstream_read` /// @param buffer_size Size in bytes of the internal buffer. /// @param data Some state to associate with the `RStream` instance -/// @param defer Flag that specifies if callback invocation should be deferred -/// to vim main loop(as a KE_EVENT special key) +/// @param source_override Replacement for the default source used in events +/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, size_t buffer_size, void *data, - bool defer) + EventSource source_override) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = xmalloc(buffer_size); rv->buffer_size = buffer_size; rv->data = data; - rv->defer = defer; rv->cb = cb; rv->rpos = rv->wpos = rv->fpos = 0; rv->stream = NULL; rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; + rv->source_override = source_override ? source_override : rv; return rv; } @@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream) return rstream->wpos - rstream->rpos; } -/// Sets the `defer` flag for a a RStream instance -/// -/// @param rstream The RStream instance -/// @param defer The new value for the flag -void rstream_set_defer(RStream *rstream, bool defer) -{ - rstream->defer = defer; -} - /// Runs the read callback associated with the rstream /// /// @param event Object containing data necessary to invoke the callback @@ -232,6 +224,11 @@ void rstream_read_event(Event event) rstream->cb(rstream, rstream->data, event.data.rstream.eof); } +EventSource rstream_event_source(RStream *rstream) +{ + return rstream->source_override; +} + // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -260,7 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { - DLOG("Closing RStream(%p)", rstream); + DLOG("Closing RStream(address: %p, source: %p)", + rstream, + rstream_event_source(rstream)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); @@ -275,12 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rstream->wpos += nread; - DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rstream); + DLOG("Received %u bytes from RStream(address: %p, source: %p)", + (size_t)cnt, + rstream, + rstream_event_source(rstream)); if (rstream->wpos == rstream->buffer_size) { // The last read filled the buffer, stop reading for now rstream_stop(rstream); - DLOG("Buffer for RStream(%p) is full, stopping it", rstream); + DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", + rstream, + rstream_event_source(rstream)); } rstream->reading = false; @@ -345,9 +349,13 @@ static void close_cb(uv_handle_t *handle) static void emit_read_event(RStream *rstream, bool eof) { - Event event; - event.type = kEventRStreamData; - event.data.rstream.ptr = rstream; - event.data.rstream.eof = eof; - event_push(event, rstream->defer); + Event event = { + .source = rstream_event_source(rstream), + .type = kEventRStreamData, + .data.rstream = { + .ptr = rstream, + .eof = eof + } + }; + event_push(event); } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 65657fda9c..17f270a5cc 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -103,6 +103,11 @@ void signal_handle(Event event) } } +EventSource signal_event_source(void) +{ + return &sint; +} + static char * signal_name(int signum) { switch (signum) { @@ -155,10 +160,11 @@ static void signal_cb(uv_signal_t *handle, int signum) } Event event = { + .source = signal_event_source(), .type = kEventSignal, .data = { .signum = signum } }; - event_push(event, true); + event_push(event); } |