aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/eval.c26
-rw-r--r--src/os/event_defs.h7
-rw-r--r--src/os/job.c87
-rw-r--r--src/os/job.h15
-rw-r--r--src/os/job_defs.h15
5 files changed, 64 insertions, 86 deletions
diff --git a/src/eval.c b/src/eval.c
index 4704af1a68..ba34d83e41 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -65,6 +65,8 @@
#include "os/os.h"
#include "os/job.h"
#include "os/shell.h"
+#include "os/rstream.h"
+#include "os/rstream_defs.h"
#if defined(FEAT_FLOAT)
# include <math.h>
@@ -406,8 +408,7 @@ static dictitem_T vimvars_var; /* variable used for v: */
static void apply_job_autocmds(int id,
void *data,
- char *buffer,
- uint32_t len,
+ RStream *target,
bool from_stdout);
static void prepare_vimvar(int idx, typval_T *save_tv);
static void restore_vimvar(int idx, typval_T *save_tv);
@@ -19798,18 +19799,29 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
static void apply_job_autocmds(int id,
void *data,
- char *buffer,
- uint32_t len,
+ RStream *target,
bool from_stdout)
{
list_T *list;
-
- // Call JobActivity autocommands
+ char *str;
+ listitem_T *str_slot = listitem_alloc();
+ uint32_t read_count = rstream_available(target);
+
+ // Prepare the list item containing the data read
+ str = xmalloc(read_count + 1);
+ rstream_read(target, str, read_count);
+ str[read_count] = NUL;
+ str_slot->li_tv.v_type = VAR_STRING;
+ str_slot->li_tv.v_lock = 0;
+ str_slot->li_tv.vval.v_string = (char_u *)str;
+ // Create the list which will be set to v:job_data
list = list_alloc();
list_append_number(list, id);
- list_append_string(list, (char_u *)buffer, len);
+ list_append(list, str_slot);
list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3);
+ // Update v:job_data for the autocommands
set_vim_var_list(VV_JOB_DATA, list);
+ // Call JobActivity autocommands
apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL);
}
diff --git a/src/os/event_defs.h b/src/os/event_defs.h
index a72462b18a..8e00324ba6 100644
--- a/src/os/event_defs.h
+++ b/src/os/event_defs.h
@@ -2,6 +2,7 @@
#define NEOVIM_OS_EVENT_DEFS_H
#include "os/job_defs.h"
+#include "os/rstream_defs.h"
typedef enum {
kEventSignal,
@@ -12,7 +13,11 @@ typedef struct {
EventType type;
union {
int signum;
- Job *job;
+ struct {
+ Job *ptr;
+ RStream *target;
+ bool from_stdout;
+ } job;
} data;
} Event;
diff --git a/src/os/job.c b/src/os/job.c
index 0d62940013..5dd4f7abe1 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -5,6 +5,8 @@
#include "os/job.h"
#include "os/job_defs.h"
+#include "os/rstream.h"
+#include "os/rstream_defs.h"
#include "os/time.h"
#include "os/shell.h"
#include "vim.h"
@@ -15,13 +17,6 @@
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 1024
-/// Possible lock states of the job buffer
-typedef enum {
- kBufferLockNone = 0, ///< No data was read
- kBufferLockStdout, ///< Data read from stdout
- kBufferLockStderr ///< Data read from stderr
-} BufferLock;
-
struct job {
// Job id the index in the job table plus one.
int id;
@@ -31,14 +26,10 @@ struct job {
bool stopped;
// Data associated with the job
void *data;
- // Buffer for reading from stdout or stderr
- char buffer[JOB_BUFFER_SIZE];
- // Size of the data from the last read
- uint32_t length;
- // Buffer lock state
- BufferLock lock;
// Callback for consuming data from the buffer
job_read_cb read_cb;
+ // Readable streams
+ RStream *out, *err;
// Structures for process spawning/management used by libuv
uv_process_t proc;
uv_process_options_t proc_opts;
@@ -49,6 +40,7 @@ struct job {
static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uv_prepare_t job_prepare;
+static void read_cb(RStream *rstream, void *data, bool eof);
// Some helpers shared in this module
static bool is_alive(Job *job);
static Job * find_job(int id);
@@ -56,8 +48,7 @@ static void free_job(Job *job);
// Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle);
-static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf);
-static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
+// static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void write_cb(uv_write_t *req, int status);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
@@ -154,12 +145,10 @@ int job_start(char **argv, void *data, job_read_cb cb)
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
- job->proc_stdout.data = job;
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
- job->proc_stderr.data = job;
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
@@ -170,8 +159,12 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
- uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb);
- uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
+ rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
+ rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
+ rstream_start(job->out);
+ rstream_start(job->err);
// Give the callback a reference to the job
job->proc.data = job;
// Save the job to the table
@@ -217,19 +210,13 @@ bool job_write(int id, char *data, uint32_t len)
void job_handle(Event event)
{
- Job *job = event.data.job;
+ Job *job = event.data.job.ptr;
// Invoke the job callback
job->read_cb(job->id,
job->data,
- job->buffer,
- job->length,
- job->lock == kBufferLockStdout);
-
- // restart reading
- job->lock = kBufferLockNone;
- uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb);
- uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb);
+ event.data.job.target,
+ event.data.job.from_stdout);
}
static bool is_alive(Job *job)
@@ -252,6 +239,8 @@ static void free_job(Job *job)
uv_close((uv_handle_t *)&job->proc_stdin, NULL);
uv_close((uv_handle_t *)&job->proc_stderr, NULL);
uv_close((uv_handle_t *)&job->proc, NULL);
+ rstream_free(job->out);
+ rstream_free(job->err);
free(job);
}
@@ -277,50 +266,22 @@ static void job_prepare_cb(uv_prepare_t *handle)
}
}
-/// Puts the job into a 'reading state' which 'locks' the job buffer
-/// until the data is consumed
-static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
-{
- Job *job = (Job *)handle->data;
-
- if (job->lock != kBufferLockNone) {
- // Already reserved the buffer for reading from stdout or stderr.
- buf->len = 0;
- return;
- }
-
- buf->base = job->buffer;
- buf->len = JOB_BUFFER_SIZE;
- // Avoid `alloc_cb`, `alloc_cb` sequences on windows and also mark which
- // stream we are reading from
- job->lock =
- (handle == (uv_handle_t *)&job->proc_stdout) ?
- kBufferLockStdout :
- kBufferLockStderr;
-}
-
/// Pushes a event object to the event queue, which will be handled later by
/// `job_handle`
-static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
+static void read_cb(RStream *rstream, void *data, bool eof)
{
Event event;
- Job *job = (Job *)stream->data;
- // pause reading on both streams
- uv_read_stop((uv_stream_t *)&job->proc_stdout);
- uv_read_stop((uv_stream_t *)&job->proc_stderr);
+ Job *job = data;
- if (cnt <= 0) {
- if (cnt != UV_ENOBUFS) {
- // Assume it's EOF and exit the job. Doesn't harm sending a SIGTERM
- // at this point
- uv_process_kill(&job->proc, SIGTERM);
- }
+ if (eof) {
+ uv_process_kill(&job->proc, SIGTERM);
return;
}
- job->length = cnt;
event.type = kEventJobActivity;
- event.data.job = job;
+ event.data.job.ptr = job;
+ event.data.job.target = rstream;
+ event.data.job.from_stdout = rstream == job->out;
event_push(event);
}
diff --git a/src/os/job.h b/src/os/job.h
index b0f9c99bb8..594a734bab 100644
--- a/src/os/job.h
+++ b/src/os/job.h
@@ -12,21 +12,6 @@
#include "os/event.h"
-/// Function called when the job reads data
-///
-/// @param id The job is
-/// @param data Some data associated with the job by the caller
-/// @param buffer Buffer containing the data read. It must be copied
-/// immediately.
-/// @param len Amount of bytes that must be read from `buffer`
-/// @param from_stdout This is true if data was read from the job's stdout,
-/// false if it came from stderr.
-typedef void (*job_read_cb)(int id,
- void *data,
- char *buffer,
- uint32_t len,
- bool from_stdout);
-
/// Initializes job control resources
void job_init(void);
diff --git a/src/os/job_defs.h b/src/os/job_defs.h
index 27a1133c2c..ada33d757e 100644
--- a/src/os/job_defs.h
+++ b/src/os/job_defs.h
@@ -1,6 +1,21 @@
#ifndef NEOVIM_OS_JOB_DEFS_H
#define NEOVIM_OS_JOB_DEFS_H
+#include "os/rstream_defs.h"
+
typedef struct job Job;
+/// Function called when the job reads data
+///
+/// @param id The job id
+/// @param data Some data associated with the job by the caller
+/// @param target The `RStream` instance containing data to be read
+/// @param from_stdout This is true if data was read from the job's stdout,
+/// false if it came from stderr.
+typedef void (*job_read_cb)(int id,
+ void *data,
+ RStream *target,
+ bool from_stdout);
+
#endif // NEOVIM_OS_JOB_DEFS_H
+