aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBjörn Linse <bjorn.linse@gmail.com>2016-08-20 12:58:37 +0200
committerGitHub <noreply@github.com>2016-08-20 12:58:37 +0200
commit71b3e20d0fba20d4957c1949352bde1744a0a947 (patch)
tree7844b2d185b27a0303183e90792a5ef807933e88 /src
parent1b825a9ada4d89059645bc7a458e1e8d931c6161 (diff)
parent2d60a15e25f487eda1ac00a9e6cdf9a6564fb416 (diff)
downloadrneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.tar.gz
rneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.tar.bz2
rneovim-71b3e20d0fba20d4957c1949352bde1744a0a947.zip
Merge pull request #4723 from bfredl/rpcstderr
allow stderr handler for rpc jobs and use it to display python/ruby startup error
Diffstat (limited to 'src')
-rw-r--r--src/nvim/eval.c118
-rw-r--r--src/nvim/event/process.c12
-rw-r--r--src/nvim/event/rstream.c17
-rw-r--r--src/nvim/event/socket.c4
-rw-r--r--src/nvim/event/stream.c9
-rw-r--r--src/nvim/event/stream.h3
-rw-r--r--src/nvim/event/wstream.c17
-rw-r--r--src/nvim/globals.h3
-rw-r--r--src/nvim/msgpack_rpc/channel.c92
-rw-r--r--src/nvim/msgpack_rpc/channel.h1
-rw-r--r--src/nvim/os/input.c6
-rw-r--r--src/nvim/os/shell.c8
-rw-r--r--src/nvim/tui/input.c12
13 files changed, 158 insertions, 144 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index d936c9572a..dce24230b0 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -408,6 +408,7 @@ typedef struct {
Terminal *term;
bool stopped;
bool exited;
+ bool rpc;
int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self;
@@ -448,8 +449,7 @@ typedef struct {
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
-static uint64_t current_job_id = 1;
-static PMap(uint64_t) *jobs = NULL;
+static PMap(uint64_t) *jobs = NULL;
static uint64_t last_timer_id = 0;
static PMap(uint64_t) *timers = NULL;
@@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
- process_close_in(proc);
+ if (data->rpc) {
+ EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
+ } else {
+ process_close_in(proc);
+ }
} else if (!strcmp(stream, "stdout")) {
- process_close_out(proc);
+ if (data->rpc) {
+ EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
+ } else {
+ process_close_out(proc);
+ }
} else if (!strcmp(stream, "stderr")) {
process_close_err(proc);
+ } else if (!strcmp(stream, "rpc")) {
+ if (data->rpc) {
+ channel_close(data->id);
+ } else {
+ EMSG(_("Invalid job stream: Not an rpc job"));
+ }
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
- process_close_streams(proc);
+ if (data->rpc) {
+ channel_close(data->id);
+ process_close_err(proc);
+ } else {
+ process_close_streams(proc);
+ }
}
}
@@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
+ if (data->rpc) {
+ EMSG(_("Can't send raw data to rpc channel"));
+ return;
+ }
+
ssize_t input_len;
char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);
if (!input) {
@@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
return;
}
+
dict_T *job_opts = NULL;
+ bool detach = false, rpc = false, pty = false;
ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;
char *cwd = NULL;
if (argvars[1].v_type == VAR_DICT) {
job_opts = argvars[1].vval.v_dict;
+ detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0;
+ rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0;
+ pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0;
+ if (pty && rpc) {
+ EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set");
+ shell_free_argv(argv);
+ return;
+ }
+
char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);
if (new_cwd && strlen(new_cwd) > 0) {
cwd = new_cwd;
@@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
- bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
- bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- job_opts, pty, detach, cwd);
+ job_opts, pty, rpc, detach, cwd);
Process *proc = (Process *)&data->proc;
if (pty) {
@@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
- if (!on_stdout) {
+ if (!rpc && !on_stdout) {
proc->out = NULL;
}
if (!on_stderr) {
@@ -14105,7 +14138,7 @@ end:
api_free_object(result);
}
-// "rpcstart()" function
+// "rpcstart()" function (DEPRECATED)
static void f_rpcstart(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
@@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
- uint64_t channel_id = channel_from_process(argv);
- if (!channel_id) {
- EMSG(_(e_api_spawn_failed));
- }
-
- rettv->vval.v_number = (varnumber_T)channel_id;
+ TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL,
+ NULL, false, true, false, NULL);
+ common_job_start(data, rettv);
}
// "rpcstop()" function
static void f_rpcstop(typval_T *argvars, typval_T *rettv)
{
- rettv->v_type = VAR_NUMBER;
- rettv->vval.v_number = 0;
-
- if (check_restricted() || check_secure()) {
- return;
- }
-
if (argvars[0].v_type != VAR_NUMBER) {
// Wrong argument types
EMSG(_(e_invarg));
return;
}
- rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
+ // if called with a job, stop it, else closes the channel
+ if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
+ f_jobstop(argvars, rettv);
+ } else {
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
+ }
}
/*
@@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
}
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- job_opts, true, false, cwd);
+ job_opts, true, false, false, cwd);
data->proc.pty.width = curwin->w_width;
data->proc.pty.height = curwin->w_height;
data->proc.pty.term_name = xstrdup("xterm-256color");
@@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,
ufunc_T *on_exit,
dict_T *self,
bool pty,
+ bool rpc,
bool detach,
char *cwd)
{
@@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,
data->on_exit = on_exit;
data->self = self;
data->events = queue_new_child(main_loop.events);
+ data->rpc = rpc;
if (pty) {
data->proc.pty = pty_process_init(&main_loop, data);
} else {
@@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,
return data;
}
-/// Return true/false on success/failure.
+/// common code for getting job callbacks for jobstart, termopen and rpcstart
+///
+/// @return true/false on success/failure.
static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
ufunc_T **on_stderr, ufunc_T **on_exit)
{
@@ -22174,15 +22206,22 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
}
xfree(cmd);
- data->id = current_job_id++;
- wstream_init(proc->in, 0);
- if (proc->out) {
- rstream_init(proc->out, 0);
- rstream_start(proc->out, on_job_stdout);
+ data->id = next_chan_id++;
+
+ if (data->rpc) {
+ // the rpc channel takes over the in and out streams
+ channel_from_process(proc, data->id);
+ } else {
+ wstream_init(proc->in, 0);
+ if (proc->out) {
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, on_job_stdout, data);
+ }
}
+
if (proc->err) {
rstream_init(proc->err, 0);
- rstream_start(proc->err, on_job_stderr);
+ rstream_start(proc->err, on_job_stderr, data);
}
pmap_put(uint64_t)(jobs, data->id, data);
rettv->vval.v_number = data->id;
@@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
terminal_close(data->term, msg);
}
+ if (data->rpc) {
+ channel_process_exit(data->id, status);
+ }
if (data->status_ptr) {
*data->status_ptr = status;
}
process_job_event(data, data->on_exit, "exit", NULL, 0, status);
+
+ pmap_del(uint64_t)(jobs, data->id);
+ term_job_data_decref(data);
}
static void term_write(char *buf, size_t size, void *d)
@@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)
static void on_job_event(JobEvent *ev)
{
if (!ev->callback) {
- goto end;
+ return;
}
typval_T argv[3];
@@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)
call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,
curwin->w_cursor.lnum, ev->data->self);
clear_tv(&rettv);
-
-end:
- if (!ev->received) {
- // exit event, safe to free job data now
- pmap_del(uint64_t)(jobs, ev->data->id);
- term_job_data_decref(ev->data);
- }
}
static TerminalJobData *find_job(uint64_t id)
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 317e40e43a..f507e3d71d 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -25,7 +25,7 @@
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL); \
+ stream_close(proc->stream, NULL, NULL); \
} \
} while (0)
@@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return false;
}
- void *data = proc->data;
-
if (proc->in) {
- stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
@@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->out) {
- stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
@@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->err) {
- stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
@@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb) {
// Stream callback could miss EOF handling if a child keeps the stream
// open.
- stream->read_cb(stream, stream->buffer, 0, stream->data, true);
+ stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
}
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index a520143064..5126dfd84e 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -17,21 +17,19 @@
# include "event/rstream.c.generated.h"
#endif
-void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
- void *data)
+void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
rstream_init(stream, bufsize);
}
-void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
- void *data)
+void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
rstream_init(stream, bufsize);
}
@@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_start(Stream *stream, stream_read_cb cb)
+void rstream_start(Stream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
+ stream->cb_data = data;
if (stream->uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb);
} else {
@@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
Stream *stream = data;
assert(stream->read_cb);
- rstream_start(stream, stream->read_cb);
+ rstream_start(stream, stream->read_cb, stream->cb_data);
}
// Callbacks used by libuv
@@ -179,7 +178,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
- stream->read_cb(stream, stream->buffer, count, stream->data, eof);
+ stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
if (stream->closed && !stream->pending_reqs) {
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index cdaf40849b..8f9327f3d4 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0;
}
-int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
return result;
}
- stream_init(NULL, stream, -1, client, data);
+ stream_init(NULL, stream, -1, client);
return 0;
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 33404158cf..26083c20f4 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
return retval;
}
-void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
- void *data)
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->uvstream->data = stream;
}
- stream->data = data;
stream->internal_data = NULL;
stream->fpos = 0;
stream->curmem = 0;
@@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->num_bytes = 0;
}
-void stream_close(Stream *stream, stream_close_cb on_stream_close)
+void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
stream->closed = true;
stream->close_cb = on_stream_close;
+ stream->close_cb_data = data;
if (!stream->pending_reqs) {
stream_close_handle(stream);
@@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
rbuffer_free(stream->buffer);
}
if (stream->close_cb) {
- stream->close_cb(stream, stream->data);
+ stream->close_cb(stream, stream->close_cb_data);
}
if (stream->internal_close_cb) {
stream->internal_close_cb(stream, stream->internal_data);
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index ad4e24775b..a176fac1c0 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -44,13 +44,14 @@ struct stream {
uv_file fd;
stream_read_cb read_cb;
stream_write_cb write_cb;
+ void *cb_data;
stream_close_cb close_cb, internal_close_cb;
+ void *close_cb_data, *internal_data;
size_t fpos;
size_t curmem;
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
- void *data, *internal_data;
bool closed;
Queue *events;
};
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index 8028e35e6b..fc7aad8eb9 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -22,19 +22,17 @@ typedef struct {
# include "event/wstream.c.generated.h"
#endif
-void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
- void *data)
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
wstream_init(stream, maxmem);
}
-void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
- void *data)
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
wstream_init(stream, maxmem);
}
@@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
///
/// @param stream The `Stream` instance
/// @param cb The callback
-void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
- FUNC_ATTR_NONNULL_ALL
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
+ FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream->write_cb = cb;
+ stream->cb_data = data;
}
/// Queues data for writing to the backing file descriptor of a `Stream`
@@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
wstream_release_wbuffer(data->buffer);
if (data->stream->write_cb) {
- data->stream->write_cb(data->stream, data->stream->data, status);
+ data->stream->write_cb(data->stream, data->stream->cb_data, status);
}
data->stream->pending_reqs--;
diff --git a/src/nvim/globals.h b/src/nvim/globals.h
index 950ceb4c74..4c014010c2 100644
--- a/src/nvim/globals.h
+++ b/src/nvim/globals.h
@@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false);
+/// next free id for a job or rpc channel
+EXTERN uint64_t next_chan_id INIT(= 1);
+
/// Used to track the status of external functions.
/// Currently only used for iconv().
typedef enum {
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5b249ee1c7..8b5f212d66 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -19,6 +19,7 @@
#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
+#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
@@ -55,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker;
union {
Stream stream;
- struct {
- LibuvProcess uvproc;
- Stream in;
- Stream out;
- Stream err;
- } process;
+ Process *proc;
struct {
Stream in;
Stream out;
@@ -79,7 +75,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
-static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -112,33 +107,20 @@ void channel_teardown(void)
}
/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
+/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_process(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
- Process *proc = &channel->data.process.uvproc.process;
- proc->argv = argv;
- proc->in = &channel->data.process.in;
- proc->out = &channel->data.process.out;
- proc->err = &channel->data.process.err;
- proc->cb = process_exit;
- if (!process_spawn(proc)) {
- loop_poll_events(&main_loop, 0);
- decref(channel);
- return 0;
- }
-
+uint64_t channel_from_process(Process *proc, uint64_t id)
+{
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
incref(channel); // process channels are only closed by the exit_cb
+ channel->data.proc = proc;
+
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack);
- rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr);
+ rstream_start(proc->out, parse_msgpack, channel);
return channel->id;
}
@@ -148,14 +130,14 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket);
- socket_watcher_accept(watcher, &channel->data.stream, channel);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack);
+ rstream_start(&channel->data.stream, parse_msgpack, channel);
}
/// Sends event/arguments to channel
@@ -314,30 +296,21 @@ bool channel_close(uint64_t id)
/// Neovim
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
- rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
- channel);
- rstream_start(&channel->data.std.in, parse_msgpack);
+ rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.std.in, parse_msgpack, channel);
// write stream
- wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
+ wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
-static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
- void *data, bool eof)
+void channel_process_exit(uint64_t id, int status)
{
- while (rbuffer_size(rbuf)) {
- char buf[256];
- size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
- }
-}
+ Channel *channel = pmap_get(uint64_t)(channels, id);
-static void process_exit(Process *proc, int status, void *data)
-{
- decref(data);
+ channel->closed = true;
+ decref(channel);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@@ -512,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
- success = wstream_write(&channel->data.process.in, buffer);
+ success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -637,16 +610,17 @@ static void close_channel(Channel *channel)
switch (channel->type) {
case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL);
+ stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
- if (!channel->data.process.uvproc.process.closed) {
- process_stop(&channel->data.process.uvproc.process);
- }
+ // Only close the rpc channel part,
+ // there could be an error message on the stderr stream
+ process_close_in(channel->data.proc);
+ process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL);
- stream_close(&channel->data.std.out, NULL);
+ stream_close(&channel->data.std.in, NULL, NULL);
+ stream_close(&channel->data.std.out, NULL, NULL);
queue_put(main_loop.fast_events, exit_event, 1, channel);
return;
default:
@@ -680,7 +654,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
- queue_free(channel->events);
+ if (channel->type != kChannelTypeProc) {
+ queue_free(channel->events);
+ }
xfree(channel);
}
@@ -689,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
-static Channel *register_channel(ChannelType type)
+static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->events = queue_new_child(main_loop.events);
+ rv->events = events ? events : queue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
+ rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index 104547a7b8..0d92976d02 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/event/socket.h"
+#include "nvim/event/process.h"
#include "nvim/vim.h"
#define METHOD_MAXLEN 512
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index a4e01b18cd..c0c73364c0 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -60,8 +60,8 @@ void input_start(int fd)
}
global_fd = fd;
- rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL);
- rstream_start(&read_stream, read_cb);
+ rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE);
+ rstream_start(&read_stream, read_cb, NULL);
}
void input_stop(void)
@@ -71,7 +71,7 @@ void input_stop(void)
}
rstream_stop(&read_stream);
- stream_close(&read_stream, NULL);
+ stream_close(&read_stream, NULL, NULL);
}
static void cursorhold_event(void **argv)
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 64c673930a..ba52b9f661 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -236,10 +236,10 @@ static int do_os_system(char **argv,
}
proc->out->events = NULL;
rstream_init(proc->out, 0);
- rstream_start(proc->out, data_cb);
+ rstream_start(proc->out, data_cb, &buf);
proc->err->events = NULL;
rstream_init(proc->err, 0);
- rstream_start(proc->err, data_cb);
+ rstream_start(proc->err, data_cb, &buf);
// write the input, if any
if (input) {
@@ -251,7 +251,7 @@ static int do_os_system(char **argv,
return -1;
}
// close the input stream after everything is written
- wstream_set_write_cb(&in, shell_write_cb);
+ wstream_set_write_cb(&in, shell_write_cb, NULL);
}
// invoke busy_start here so event_poll_until wont change the busy state for
@@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status)
{
- stream_close(stream, NULL);
+ stream_close(stream, NULL, NULL);
}
diff --git a/src/nvim/tui/input.c b/src/nvim/tui/input.c
index be256f3ebc..3ef4d34c9a 100644
--- a/src/nvim/tui/input.c
+++ b/src/nvim/tui/input.c
@@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop)
int curflags = termkey_get_canonflags(input->tk);
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
// setup input handle
- rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input);
+ rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff);
// initialize a timer handle for handling ESC with libtermkey
time_watcher_init(loop, &input->timer_handle, input);
}
@@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input)
uv_mutex_destroy(&input->key_buffer_mutex);
uv_cond_destroy(&input->key_buffer_cond);
time_watcher_close(&input->timer_handle, NULL);
- stream_close(&input->read_stream, NULL);
+ stream_close(&input->read_stream, NULL, NULL);
termkey_destroy(input->tk);
}
void term_input_start(TermInput *input)
{
- rstream_start(&input->read_stream, read_cb);
+ rstream_start(&input->read_stream, read_cb, input);
}
void term_input_stop(TermInput *input)
@@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
//
// ls *.md | xargs nvim
input->in_fd = 2;
- stream_close(&input->read_stream, NULL);
+ stream_close(&input->read_stream, NULL, NULL);
queue_put(input->loop->fast_events, restart_reading, 1, input);
} else {
loop_schedule(&main_loop, event_create(1, input_done_event, 0));
@@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
static void restart_reading(void **argv)
{
TermInput *input = argv[0];
- rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input);
- rstream_start(&input->read_stream, read_cb);
+ rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff);
+ rstream_start(&input->read_stream, read_cb, input);
}