aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/rstream.c
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-07-08 13:08:29 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-07-17 11:37:42 -0300
commit2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f (patch)
tree5f43aacf9a42b18e02c9873d18a4549c6b7cb2c1 /src/nvim/os/rstream.c
parentcf30837951120bb27563054ab9aadd4ccf6fadbf (diff)
downloadrneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.tar.gz
rneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.tar.bz2
rneovim-2e4ea29d2c7b62eb8baf1c41cd43433e085dda0f.zip
events: Refactor how event deferral is handled
- Remove all *_set_defer methods and the 'defer' flag from rstream/jobs - Added {signal,rstream,job}_event_source functions. Each return a pointer that represent the event source for the object in question(For signals, a static pointer is returned) - Added a 'source' field to the Event struct, which is set to the appropriate value by the code that created the event. - Added a 'sources' parameter to `event_poll`. It should point to a NULL-terminated array of event sources that will be used to decide which events should be processed immediately - Added a 'source_override' parameter to `rstream_new`. This was required to use jobs as event sources of RStream instances(When "focusing" on a job, for example). - Extracted `process_from` static function from `event_process`. - Remove 'defer' parameter from `event_process`, which now operates only on deferred events. - Refactor `channel_send_call` to use the new lock mechanism What changed in a single sentence: Code that calls `event_poll` have to specify which event sources should NOT be deferred. This change was necessary for a number of reasons: - To fix a bug where due to race conditions, a client request could end in the deferred queue in the middle of a `channel_send_call` invocation, resulting in a deadlock since the client process would never receive a response, and channel_send_call would never return because the client would still be waiting for the response. - To handle "event locking" correctly in recursive `channel_send_call` invocations when the frames are waiting for responses from different clients. Not much of an issue now since there's only a python client, but could break things later. - To simplify the process of implementing synchronous functions that depend on asynchronous events.
Diffstat (limited to 'src/nvim/os/rstream.c')
-rw-r--r--src/nvim/os/rstream.c52
1 files changed, 30 insertions, 22 deletions
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 96dd26407a..d7ab5b8a64 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -26,7 +26,8 @@ struct rstream {
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
- bool reading, free_handle, defer;
+ bool reading, free_handle;
+ EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -40,25 +41,25 @@ struct rstream {
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
-/// @param defer Flag that specifies if callback invocation should be deferred
-/// to vim main loop(as a KE_EVENT special key)
+/// @param source_override Replacement for the default source used in events
+/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
- bool defer)
+ EventSource source_override)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
- rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
+ rv->source_override = source_override ? source_override : rv;
return rv;
}
@@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
-/// Sets the `defer` flag for a a RStream instance
-///
-/// @param rstream The RStream instance
-/// @param defer The new value for the flag
-void rstream_set_defer(RStream *rstream, bool defer)
-{
- rstream->defer = defer;
-}
-
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
@@ -232,6 +224,11 @@ void rstream_read_event(Event event)
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
+EventSource rstream_event_source(RStream *rstream)
+{
+ return rstream->source_override;
+}
+
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@@ -260,7 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
- DLOG("Closing RStream(%p)", rstream);
+ DLOG("Closing RStream(address: %p, source: %p)",
+ rstream,
+ rstream_event_source(rstream));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
@@ -275,12 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rstream->wpos += nread;
- DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rstream);
+ DLOG("Received %u bytes from RStream(address: %p, source: %p)",
+ (size_t)cnt,
+ rstream,
+ rstream_event_source(rstream));
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
- DLOG("Buffer for RStream(%p) is full, stopping it", rstream);
+ DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
+ rstream,
+ rstream_event_source(rstream));
}
rstream->reading = false;
@@ -345,9 +349,13 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof)
{
- Event event;
- event.type = kEventRStreamData;
- event.data.rstream.ptr = rstream;
- event.data.rstream.eof = eof;
- event_push(event, rstream->defer);
+ Event event = {
+ .source = rstream_event_source(rstream),
+ .type = kEventRStreamData,
+ .data.rstream = {
+ .ptr = rstream,
+ .eof = eof
+ }
+ };
+ event_push(event);
}