aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/os/event.c4
-rw-r--r--src/os/event_defs.h7
-rw-r--r--src/os/input.c2
-rw-r--r--src/os/job.c4
-rw-r--r--src/os/job.h1
-rw-r--r--src/os/rstream.c45
-rw-r--r--src/os/rstream.h15
7 files changed, 64 insertions, 14 deletions
diff --git a/src/os/event.c b/src/os/event.c
index c96cc692c7..4c5be1e16b 100644
--- a/src/os/event.c
+++ b/src/os/event.c
@@ -8,6 +8,7 @@
#include "os/event.h"
#include "os/input.h"
#include "os/signal.h"
+#include "os/rstream.h"
#include "os/job.h"
#include "vim.h"
#include "memory.h"
@@ -112,6 +113,9 @@ void event_process()
case kEventJobActivity:
job_handle(event);
break;
+ case kEventRStreamData:
+ rstream_read_event(event);
+ break;
default:
abort();
}
diff --git a/src/os/event_defs.h b/src/os/event_defs.h
index 8e00324ba6..5925a31718 100644
--- a/src/os/event_defs.h
+++ b/src/os/event_defs.h
@@ -6,7 +6,8 @@
typedef enum {
kEventSignal,
- kEventJobActivity
+ kEventJobActivity,
+ kEventRStreamData
} EventType;
typedef struct {
@@ -14,6 +15,10 @@ typedef struct {
union {
int signum;
struct {
+ RStream *ptr;
+ bool eof;
+ } rstream;
+ struct {
Job *ptr;
RStream *target;
bool from_stdout;
diff --git a/src/os/input.c b/src/os/input.c
index 4311d70e54..e0a00e006a 100644
--- a/src/os/input.c
+++ b/src/os/input.c
@@ -34,7 +34,7 @@ static int push_event_key(uint8_t *buf, int maxlen);
void input_init()
{
- read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL);
+ read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false);
rstream_set_file(read_stream, read_cmd_fd);
}
diff --git a/src/os/job.c b/src/os/job.c
index 5dd4f7abe1..5bd404e5be 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -159,8 +159,8 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
diff --git a/src/os/job.h b/src/os/job.h
index 594a734bab..c0f6734467 100644
--- a/src/os/job.h
+++ b/src/os/job.h
@@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdbool.h>
+#include "os/event_defs.h"
#include "os/event.h"
/// Initializes job control resources
diff --git a/src/os/rstream.c b/src/os/rstream.c
index 5f4cd5ed94..63dfb2aa39 100644
--- a/src/os/rstream.c
+++ b/src/os/rstream.c
@@ -6,6 +6,8 @@
#include "os/rstream_defs.h"
#include "os/rstream.h"
+#include "os/event_defs.h"
+#include "os/event.h"
#include "vim.h"
#include "memory.h"
@@ -19,20 +21,25 @@ struct rstream {
uv_file fd;
rstream_cb cb;
uint32_t buffer_size, rpos, wpos, fpos;
- bool reading, free_handle;
+ bool reading, free_handle, async;
};
// Callbacks used by libuv
static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *);
static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *);
static void fread_idle_cb(uv_idle_t *);
+static void emit_read_event(RStream *rstream, bool eof);
-RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data)
+RStream * rstream_new(rstream_cb cb,
+ uint32_t buffer_size,
+ void *data,
+ bool async)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
+ rv->async = async;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
@@ -162,6 +169,13 @@ uint32_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
+void rstream_read_event(Event event)
+{
+ RStream *rstream = event.data.rstream.ptr;
+
+ rstream->cb(rstream, rstream->data, event.data.rstream.eof);
+}
+
// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
@@ -191,7 +205,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- rstream->cb(rstream, rstream->data, true);
+ emit_read_event(rstream, true);
}
return;
}
@@ -205,11 +219,8 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
rstream_stop(rstream);
}
- // Invoke the callback passing in the number of bytes available and data
- // associated with the stream
- rstream->cb(rstream, rstream->data, false);
rstream->reading = false;
-
+ emit_read_event(rstream, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -235,7 +246,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- rstream->cb(rstream, rstream->data, true);
+ emit_read_event(rstream, true);
return;
}
@@ -247,5 +258,21 @@ static void fread_idle_cb(uv_idle_t *handle)
rstream_stop(rstream);
}
- rstream->cb(rstream, rstream->data, false);
+ emit_read_event(rstream, false);
+}
+
+static void emit_read_event(RStream *rstream, bool eof)
+{
+ if (rstream->async) {
+ Event event;
+
+ event.type = kEventRStreamData;
+ event.data.rstream.ptr = rstream;
+ event.data.rstream.eof = eof;
+ event_push(event);
+ } else {
+ // Invoke the callback passing in the number of bytes available and data
+ // associated with the stream
+ rstream->cb(rstream, rstream->data, eof);
+ }
}
diff --git a/src/os/rstream.h b/src/os/rstream.h
index 1b3b679f9f..2ca85bdf23 100644
--- a/src/os/rstream.h
+++ b/src/os/rstream.h
@@ -5,6 +5,7 @@
#include <stdint.h>
#include <uv.h>
+#include "os/event_defs.h"
#include "os/rstream_defs.h"
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
@@ -14,8 +15,15 @@
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
+/// @param async Flag that specifies if the callback should only be called
+/// outside libuv event loop(When processing async events with
+/// KE_EVENT). Only the RStream instance reading user input should set
+/// this to false
/// @return The newly-allocated `RStream` instance
-RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data);
+RStream * rstream_new(rstream_cb cb,
+ uint32_t buffer_size,
+ void *data,
+ bool async);
/// Frees all memory allocated for a RStream instance
///
@@ -71,5 +79,10 @@ uint32_t rstream_read(RStream *rstream, char *buffer, uint32_t count);
/// @return The number of bytes available
uint32_t rstream_available(RStream *rstream);
+/// Runs the read callback associated with the rstream
+///
+/// @param event Object containing data necessary to invoke the callback
+void rstream_read_event(Event event);
+
#endif // NEOVIM_OS_RSTREAM_H