aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--runtime/autoload/provider/pythonx.vim21
-rw-r--r--runtime/autoload/provider/ruby.vim18
-rw-r--r--runtime/autoload/remote/host.vim5
-rw-r--r--runtime/doc/eval.txt49
-rw-r--r--runtime/doc/msgpack_rpc.txt34
-rw-r--r--src/nvim/eval.c116
-rw-r--r--src/nvim/globals.h3
-rw-r--r--src/nvim/msgpack_rpc/channel.c73
-rw-r--r--src/nvim/msgpack_rpc/channel.h1
-rw-r--r--test/functional/api/rpc_fixture.lua38
-rw-r--r--test/functional/api/server_requests_spec.lua46
-rw-r--r--test/functional/core/job_spec.lua8
12 files changed, 272 insertions, 140 deletions
diff --git a/runtime/autoload/provider/pythonx.vim b/runtime/autoload/provider/pythonx.vim
index 0ebf00112f..6d6b38978c 100644
--- a/runtime/autoload/provider/pythonx.vim
+++ b/runtime/autoload/provider/pythonx.vim
@@ -5,11 +5,24 @@ endif
let s:loaded_pythonx_provider = 1
+let s:stderr = {}
+let s:job_opts = {'rpc': v:true}
+
+" TODO(bfredl): this logic is common and should be builtin
+function! s:job_opts.on_stderr(chan_id, data, event)
+ let stderr = get(s:stderr, a:chan_id, [''])
+ let last = remove(stderr, -1)
+ let a:data[0] = last.a:data[0]
+ call extend(stderr, a:data)
+ let s:stderr[a:chan_id] = stderr
+endfunction
+
function! provider#pythonx#Require(host) abort
let ver = (a:host.orig_name ==# 'python') ? 2 : 3
" Python host arguments
- let args = ['-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
+ let prog = (ver == '2' ? provider#python#Prog() : provider#python3#Prog())
+ let args = [prog, '-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
" Collect registered Python plugins into args
let python_plugins = remote#host#PluginsForHost(a:host.name)
@@ -18,14 +31,16 @@ function! provider#pythonx#Require(host) abort
endfor
try
- let channel_id = rpcstart((ver ==# '2' ?
- \ provider#python#Prog() : provider#python3#Prog()), args)
+ let channel_id = jobstart(args, s:job_opts)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
+ for row in get(s:stderr, channel_id, [])
+ echomsg row
+ endfor
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name,
\ '$NVIM_PYTHON_LOG_FILE')
diff --git a/runtime/autoload/provider/ruby.vim b/runtime/autoload/provider/ruby.vim
index e9130b98c1..c8ede20a75 100644
--- a/runtime/autoload/provider/ruby.vim
+++ b/runtime/autoload/provider/ruby.vim
@@ -4,6 +4,17 @@ if exists('g:loaded_ruby_provider')
endif
let g:loaded_ruby_provider = 1
+let s:stderr = {}
+let s:job_opts = {'rpc': v:true}
+
+function! s:job_opts.on_stderr(chan_id, data, event)
+ let stderr = get(s:stderr, a:chan_id, [''])
+ let last = remove(stderr, -1)
+ let a:data[0] = last.a:data[0]
+ call extend(stderr, a:data)
+ let s:stderr[a:chan_id] = stderr
+endfunction
+
function! provider#ruby#Detect() abort
return exepath('neovim-ruby-host')
endfunction
@@ -13,7 +24,7 @@ function! provider#ruby#Prog()
endfunction
function! provider#ruby#Require(host) abort
- let args = []
+ let args = [provider#ruby#Prog()]
let ruby_plugins = remote#host#PluginsForHost(a:host.name)
for plugin in ruby_plugins
@@ -21,13 +32,16 @@ function! provider#ruby#Require(host) abort
endfor
try
- let channel_id = rpcstart(provider#ruby#Prog(), args)
+ let channel_id = jobstart(args, s:job_opts)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
+ for row in get(s:stderr, channel_id, [])
+ echomsg row
+ endfor
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE')
endfunction
diff --git a/runtime/autoload/remote/host.vim b/runtime/autoload/remote/host.vim
index 5948de2b3d..065644121a 100644
--- a/runtime/autoload/remote/host.vim
+++ b/runtime/autoload/remote/host.vim
@@ -261,9 +261,8 @@ function! remote#host#LoadErrorForHost(host, log) abort
\ 'You can try to see what happened '.
\ 'by starting Neovim with the environment variable '.
\ a:log . ' set to a file and opening the generated '.
- \ 'log file. Also, the host stderr will be available '.
- \ 'in Neovim log, so it may contain useful information. '.
- \ 'See also ~/.nvimlog.'
+ \ 'log file. Also, the host stderr is available '.
+ \ 'in messages.'
endfunction
diff --git a/runtime/doc/eval.txt b/runtime/doc/eval.txt
index 3fa5474a7e..41373c135d 100644
--- a/runtime/doc/eval.txt
+++ b/runtime/doc/eval.txt
@@ -2043,7 +2043,6 @@ rpcnotify({channel}, {event}[, {args}...])
Sends an |RPC| notification to {channel}
rpcrequest({channel}, {method}[, {args}...])
Sends an |RPC| request to {channel}
-rpcstart({prog}[, {argv}]) Spawns {prog} and opens an |RPC| channel
rpcstop({channel}) Closes an |RPC| {channel}
screenattr({row}, {col}) Number attribute at screen position
screenchar({row}, {col}) Number character at screen position
@@ -4395,8 +4394,10 @@ items({dict}) *items()*
order.
jobclose({job}[, {stream}]) {Nvim} *jobclose()*
- Close {job}'s {stream}, which can be one "stdin", "stdout" or
- "stderr". If {stream} is omitted, all streams are closed.
+ Close {job}'s {stream}, which can be one of "stdin", "stdout",
+ "stderr" or "rpc" (closes the rpc channel for a job started
+ with the "rpc" option.) If {stream} is omitted, all streams
+ are closed.
jobpid({job}) {Nvim} *jobpid()*
Return the pid (process id) of {job}.
@@ -4418,6 +4419,10 @@ jobsend({job}, {data}) {Nvim} *jobsend()*
:call jobsend(j, ["abc", "123\n456", ""])
< will send "abc<NL>123<NUL>456<NL>".
+ If the job was started with the rpc option this function
+ cannot be used, instead use |rpcnotify()| and |rpcrequest()|
+ to communicate with the job.
+
jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
Spawns {cmd} as a job. If {cmd} is a |List| it is run
directly. If {cmd} is a |String| it is processed like this: >
@@ -4433,9 +4438,14 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
on_exit : exit event handler (function name or |Funcref|)
cwd : Working directory of the job; defaults to
|current-directory|.
+ rpc : If set, |msgpack-rpc| will be used to communicate
+ with the job over stdin and stdout. "on_stdout" is
+ then ignored, but "on_stderr" can still be used.
pty : If set, the job will be connected to a new pseudo
- terminal, and the job streams are connected to
- the master file descriptor.
+ terminal, and the job streams are connected to
+ the master file descriptor. "on_stderr" is ignored
+ as all output will be received on stdout.
+
width : (pty only) Width of the terminal screen
height : (pty only) Height of the terminal screen
TERM : (pty only) $TERM environment variable
@@ -4447,10 +4457,12 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
{opts} is passed as |self| to the callback; the caller may
pass arbitrary data by setting other keys.
Returns:
- - job ID on success, used by |jobsend()| and |jobstop()|
+ - The job ID on success, which is used by |jobsend()| (or
+ |rpcnotify()| and |rpcrequest()| if "rpc" option was used)
+ and |jobstop()|
- 0 on invalid arguments or if the job table is full
- -1 if {cmd}[0] is not executable.
- See |job-control| for more information.
+ See |job-control| and |msgpack-rpc| for more information.
jobstop({job}) {Nvim} *jobstop()*
Stop a job created with |jobstart()| by sending a `SIGTERM`
@@ -5649,19 +5661,20 @@ rpcrequest({channel}, {method}[, {args}...]) {Nvim} *rpcrequest()*
:let result = rpcrequest(rpc_chan, "func", 1, 2, 3)
rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
- Spawns {prog} as a job (optionally passing the list {argv}),
- and opens an |RPC| channel with the spawned process's
- stdin/stdout. Returns:
- - channel id on success, which is used by |rpcrequest()|,
- |rpcnotify()| and |rpcstop()|
- - 0 on failure
- Example: >
- :let rpc_chan = rpcstart('prog', ['arg1', 'arg2'])
+ Deprecated. Replace >
+ :let id = rpcstart('prog', ['arg1', 'arg2'])
+< with >
+ :let id = jobstart(['prog', 'arg1', 'arg2'],
+ {'rpc': v:true})
rpcstop({channel}) {Nvim} *rpcstop()*
- Closes an |RPC| {channel}, possibly created via
- |rpcstart()|. Also closes channels created by connections to
- |v:servername|.
+ Closes an |RPC| {channel}. If the channel is a job
+ started with |jobstart()| the job is killed.
+ It is better to use |jobstop()| in this case, or use
+ |jobclose|(id, "rpc") to only close the channel without
+ killing the job.
+ Closes the socket connection if the channel was opened by
+ connecting to |v:servername|.
screenattr(row, col) *screenattr()*
Like screenchar(), but return the attribute. This is a rather
diff --git a/runtime/doc/msgpack_rpc.txt b/runtime/doc/msgpack_rpc.txt
index cfd9084cfc..18c0ff8a58 100644
--- a/runtime/doc/msgpack_rpc.txt
+++ b/runtime/doc/msgpack_rpc.txt
@@ -11,7 +11,6 @@ RPC API for Nvim *RPC* *rpc* *msgpack-rpc*
3. Connecting |rpc-connecting|
4. Clients |rpc-api-client|
5. Types |rpc-types|
-6. Vimscript functions |rpc-vim-functions|
==============================================================================
1. Introduction *rpc-intro*
@@ -66,12 +65,16 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
==============================================================================
3. Connecting *rpc-connecting*
-There are several ways to open a msgpack-rpc stream to an Nvim server:
+There are several ways to open a msgpack-rpc channel to an Nvim instance:
1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
applications can embed Nvim.
- 2. Through stdin/stdout of some other process spawned by |rpcstart()|.
+ 2. Through stdin/stdout of some other process spawned by |jobstart()|.
+ Set the "rpc" key to |v:true| in the options dict to use the job's stdin
+ and stdout as a single msgpack channel that is processed directly by
+ Nvim. Then it is not possible to process raw data to or from the
+ process's stdin and stdout. stderr can still be used, though.
3. Through the socket automatically created with each instance. The socket
location is stored in |v:servername|.
@@ -110,11 +113,12 @@ functions can be called interactively:
>>> nvim = attach('socket', path='[address]')
>>> nvim.command('echo "hello world!"')
<
-You can also embed an Nvim instance via |rpcstart()|
+You can also embed an Nvim instance via |jobstart()|, and communicate using
+|rpcrequest()| and |rpcnotify()|:
>
- let vim = rpcstart('nvim', ['--embed'])
+ let vim = jobstart(['nvim', '--embed'], {'rpc': v:true})
echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"')
- call rpcstop(vim)
+ call jobstop(vim)
<
==============================================================================
4. Implementing API clients *rpc-api-client* *api-client*
@@ -234,22 +238,4 @@ the type codes, because a client may be built against one Nvim version but
connect to another with different type codes.
==============================================================================
-6. Vimscript functions *rpc-vim-functions*
-
-RPC functions are available in Vimscript:
-
- 1. |rpcstart()|: Similarly to |jobstart()|, this will spawn a co-process
- with its standard handles connected to Nvim. The difference is that it's
- not possible to process raw data to or from the process's stdin, stdout,
- or stderr. This is because the job's stdin and stdout are used as
- a single msgpack channel that is processed directly by Nvim.
- 2. |rpcstop()|: Same as |jobstop()|, but operates on handles returned by
- |rpcstart()|.
- 3. |rpcrequest()|: Sends a msgpack-rpc request to the process.
- 4. |rpcnotify()|: Sends a msgpack-rpc notification to the process.
-
-|rpcrequest()| and |rpcnotify()| can also be used with channels connected to
-a nvim server. |v:servername|
-
-==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl:
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 2a6adbdc8b..dce24230b0 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -408,6 +408,7 @@ typedef struct {
Terminal *term;
bool stopped;
bool exited;
+ bool rpc;
int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self;
@@ -448,8 +449,7 @@ typedef struct {
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
-static uint64_t current_job_id = 1;
-static PMap(uint64_t) *jobs = NULL;
+static PMap(uint64_t) *jobs = NULL;
static uint64_t last_timer_id = 0;
static PMap(uint64_t) *timers = NULL;
@@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
- process_close_in(proc);
+ if (data->rpc) {
+ EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
+ } else {
+ process_close_in(proc);
+ }
} else if (!strcmp(stream, "stdout")) {
- process_close_out(proc);
+ if (data->rpc) {
+ EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
+ } else {
+ process_close_out(proc);
+ }
} else if (!strcmp(stream, "stderr")) {
process_close_err(proc);
+ } else if (!strcmp(stream, "rpc")) {
+ if (data->rpc) {
+ channel_close(data->id);
+ } else {
+ EMSG(_("Invalid job stream: Not an rpc job"));
+ }
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
- process_close_streams(proc);
+ if (data->rpc) {
+ channel_close(data->id);
+ process_close_err(proc);
+ } else {
+ process_close_streams(proc);
+ }
}
}
@@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
+ if (data->rpc) {
+ EMSG(_("Can't send raw data to rpc channel"));
+ return;
+ }
+
ssize_t input_len;
char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);
if (!input) {
@@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
return;
}
+
dict_T *job_opts = NULL;
+ bool detach = false, rpc = false, pty = false;
ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;
char *cwd = NULL;
if (argvars[1].v_type == VAR_DICT) {
job_opts = argvars[1].vval.v_dict;
+ detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0;
+ rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0;
+ pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0;
+ if (pty && rpc) {
+ EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set");
+ shell_free_argv(argv);
+ return;
+ }
+
char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);
if (new_cwd && strlen(new_cwd) > 0) {
cwd = new_cwd;
@@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
- bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
- bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- job_opts, pty, detach, cwd);
+ job_opts, pty, rpc, detach, cwd);
Process *proc = (Process *)&data->proc;
if (pty) {
@@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
- if (!on_stdout) {
+ if (!rpc && !on_stdout) {
proc->out = NULL;
}
if (!on_stderr) {
@@ -14105,7 +14138,7 @@ end:
api_free_object(result);
}
-// "rpcstart()" function
+// "rpcstart()" function (DEPRECATED)
static void f_rpcstart(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
@@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
- uint64_t channel_id = channel_from_process(argv);
- if (!channel_id) {
- EMSG(_(e_api_spawn_failed));
- }
-
- rettv->vval.v_number = (varnumber_T)channel_id;
+ TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL,
+ NULL, false, true, false, NULL);
+ common_job_start(data, rettv);
}
// "rpcstop()" function
static void f_rpcstop(typval_T *argvars, typval_T *rettv)
{
- rettv->v_type = VAR_NUMBER;
- rettv->vval.v_number = 0;
-
- if (check_restricted() || check_secure()) {
- return;
- }
-
if (argvars[0].v_type != VAR_NUMBER) {
// Wrong argument types
EMSG(_(e_invarg));
return;
}
- rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
+ // if called with a job, stop it, else closes the channel
+ if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
+ f_jobstop(argvars, rettv);
+ } else {
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
+ }
}
/*
@@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
}
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- job_opts, true, false, cwd);
+ job_opts, true, false, false, cwd);
data->proc.pty.width = curwin->w_width;
data->proc.pty.height = curwin->w_height;
data->proc.pty.term_name = xstrdup("xterm-256color");
@@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,
ufunc_T *on_exit,
dict_T *self,
bool pty,
+ bool rpc,
bool detach,
char *cwd)
{
@@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,
data->on_exit = on_exit;
data->self = self;
data->events = queue_new_child(main_loop.events);
+ data->rpc = rpc;
if (pty) {
data->proc.pty = pty_process_init(&main_loop, data);
} else {
@@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,
return data;
}
-/// Return true/false on success/failure.
+/// common code for getting job callbacks for jobstart, termopen and rpcstart
+///
+/// @return true/false on success/failure.
static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
ufunc_T **on_stderr, ufunc_T **on_exit)
{
@@ -22174,12 +22206,19 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
}
xfree(cmd);
- data->id = current_job_id++;
- wstream_init(proc->in, 0);
- if (proc->out) {
- rstream_init(proc->out, 0);
- rstream_start(proc->out, on_job_stdout, data);
+ data->id = next_chan_id++;
+
+ if (data->rpc) {
+ // the rpc channel takes over the in and out streams
+ channel_from_process(proc, data->id);
+ } else {
+ wstream_init(proc->in, 0);
+ if (proc->out) {
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, on_job_stdout, data);
+ }
}
+
if (proc->err) {
rstream_init(proc->err, 0);
rstream_start(proc->err, on_job_stderr, data);
@@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
terminal_close(data->term, msg);
}
+ if (data->rpc) {
+ channel_process_exit(data->id, status);
+ }
if (data->status_ptr) {
*data->status_ptr = status;
}
process_job_event(data, data->on_exit, "exit", NULL, 0, status);
+
+ pmap_del(uint64_t)(jobs, data->id);
+ term_job_data_decref(data);
}
static void term_write(char *buf, size_t size, void *d)
@@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)
static void on_job_event(JobEvent *ev)
{
if (!ev->callback) {
- goto end;
+ return;
}
typval_T argv[3];
@@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)
call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,
curwin->w_cursor.lnum, ev->data->self);
clear_tv(&rettv);
-
-end:
- if (!ev->received) {
- // exit event, safe to free job data now
- pmap_del(uint64_t)(jobs, ev->data->id);
- term_job_data_decref(ev->data);
- }
}
static TerminalJobData *find_job(uint64_t id)
diff --git a/src/nvim/globals.h b/src/nvim/globals.h
index 950ceb4c74..4c014010c2 100644
--- a/src/nvim/globals.h
+++ b/src/nvim/globals.h
@@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false);
+/// next free id for a job or rpc channel
+EXTERN uint64_t next_chan_id INIT(= 1);
+
/// Used to track the status of external functions.
/// Currently only used for iconv().
typedef enum {
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index c3378783f2..8b5f212d66 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -19,6 +19,7 @@
#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
+#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
@@ -55,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker;
union {
Stream stream;
- struct {
- LibuvProcess uvproc;
- Stream in;
- Stream out;
- Stream err;
- } process;
+ Process *proc;
struct {
Stream in;
Stream out;
@@ -79,7 +75,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
-static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -112,33 +107,20 @@ void channel_teardown(void)
}
/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is forwarded to the editor error stream.
+/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_process(char **argv)
-{
- Channel *channel = register_channel(kChannelTypeProc);
- channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
- Process *proc = &channel->data.process.uvproc.process;
- proc->argv = argv;
- proc->in = &channel->data.process.in;
- proc->out = &channel->data.process.out;
- proc->err = &channel->data.process.err;
- proc->cb = process_exit;
- if (!process_spawn(proc)) {
- loop_poll_events(&main_loop, 0);
- decref(channel);
- return 0;
- }
-
+uint64_t channel_from_process(Process *proc, uint64_t id)
+{
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
incref(channel); // process channels are only closed by the exit_cb
+ channel->data.proc = proc;
+
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
rstream_start(proc->out, parse_msgpack, channel);
- rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr, channel);
return channel->id;
}
@@ -148,7 +130,7 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
@@ -314,7 +296,7 @@ bool channel_close(uint64_t id)
/// Neovim
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
@@ -323,20 +305,12 @@ void channel_from_stdio(void)
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
-static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
- void *data, bool eof)
+void channel_process_exit(uint64_t id, int status)
{
- while (rbuffer_size(rbuf)) {
- char buf[256];
- size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
- buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
- }
-}
+ Channel *channel = pmap_get(uint64_t)(channels, id);
-static void process_exit(Process *proc, int status, void *data)
-{
- decref(data);
+ channel->closed = true;
+ decref(channel);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@@ -511,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
- success = wstream_write(&channel->data.process.in, buffer);
+ success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@@ -639,9 +613,10 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
- if (!channel->data.process.uvproc.process.closed) {
- process_stop(&channel->data.process.uvproc.process);
- }
+ // Only close the rpc channel part,
+ // there could be an error message on the stderr stream
+ process_close_in(channel->data.proc);
+ process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL, NULL);
@@ -679,7 +654,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
- queue_free(channel->events);
+ if (channel->type != kChannelTypeProc) {
+ queue_free(channel->events);
+ }
xfree(channel);
}
@@ -688,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
-static Channel *register_channel(ChannelType type)
+static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->events = queue_new_child(main_loop.events);
+ rv->events = events ? events : queue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = next_id++;
+ rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index 104547a7b8..0d92976d02 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/event/socket.h"
+#include "nvim/event/process.h"
#include "nvim/vim.h"
#define METHOD_MAXLEN 512
diff --git a/test/functional/api/rpc_fixture.lua b/test/functional/api/rpc_fixture.lua
new file mode 100644
index 0000000000..423864740f
--- /dev/null
+++ b/test/functional/api/rpc_fixture.lua
@@ -0,0 +1,38 @@
+local deps_prefix = './.deps/usr'
+if os.getenv('DEPS_PREFIX') then
+ deps_prefix = os.getenv('DEPS_PREFIX')
+end
+
+package.path = deps_prefix .. '/share/lua/5.1/?.lua;' ..
+ deps_prefix .. '/share/lua/5.1/?/init.lua;' ..
+ package.path
+
+package.cpath = deps_prefix .. '/lib/lua/5.1/?.so;' ..
+ package.cpath
+
+local mpack = require('mpack')
+local StdioStream = require('nvim.stdio_stream')
+local Session = require('nvim.session')
+
+local stdio_stream = StdioStream.open()
+local session = Session.new(stdio_stream)
+
+local function on_request(method, args)
+ if method == 'poll' then
+ return 'ok'
+ elseif method == 'write_stderr' then
+ io.stderr:write(args[1])
+ return "done!"
+ elseif method == "exit" then
+ session:stop()
+ return mpack.NIL
+ end
+end
+
+local function on_notification(event, args)
+ if event == 'ping' and #args == 0 then
+ session:notify("vim_eval", "rpcnotify(g:channel, 'pong')")
+ end
+end
+
+session:run(on_request, on_notification)
diff --git a/test/functional/api/server_requests_spec.lua b/test/functional/api/server_requests_spec.lua
index eb63834cb0..b76c3b9cd6 100644
--- a/test/functional/api/server_requests_spec.lua
+++ b/test/functional/api/server_requests_spec.lua
@@ -4,7 +4,9 @@
local helpers = require('test.functional.helpers')(after_each)
local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval
local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop
-local nvim_prog = helpers.nvim_prog
+local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs
+local source, next_message = helpers.source, helpers.next_message
+local meths = helpers.meths
describe('server -> client', function()
@@ -144,11 +146,11 @@ describe('server -> client', function()
end
before_each(function()
- nvim('command', "let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
+ command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
neq(0, eval('vim'))
end)
- after_each(function() nvim('command', 'call rpcstop(vim)') end)
+ after_each(function() command('call rpcstop(vim)') end)
it('can send/recieve notifications and make requests', function()
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
@@ -181,4 +183,42 @@ describe('server -> client', function()
eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression')
end)
end)
+
+ describe('when using jobstart', function()
+ local jobid
+ before_each(function()
+ local channel = nvim('get_api_info')[1]
+ nvim('set_var', 'channel', channel)
+ source([[
+ function! s:OnEvent(id, data, event)
+ call rpcnotify(g:channel, a:event, 0, a:data)
+ endfunction
+ let g:job_opts = {
+ \ 'on_stderr': function('s:OnEvent'),
+ \ 'on_exit': function('s:OnEvent'),
+ \ 'user': 0,
+ \ 'rpc': v:true
+ \ }
+ ]])
+ local lua_prog = arg[-1]
+ meths.set_var("args", {lua_prog, 'test/functional/api/rpc_fixture.lua'})
+ jobid = eval("jobstart(g:args, g:job_opts)")
+ neq(0, 'jobid')
+ end)
+
+ after_each(function()
+ funcs.jobstop(jobid)
+ end)
+
+ it('rpc and text stderr can be combined', function()
+ eq("ok",funcs.rpcrequest(jobid, "poll"))
+ funcs.rpcnotify(jobid, "ping")
+ eq({'notification', 'pong', {}}, next_message())
+ eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
+ eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
+ funcs.rpcrequest(jobid, "exit")
+ eq({'notification', 'exit', {0, 0}}, next_message())
+ end)
+ end)
+
end)
diff --git a/test/functional/core/job_spec.lua b/test/functional/core/job_spec.lua
index 1d11374e4d..921bf1655e 100644
--- a/test/functional/core/job_spec.lua
+++ b/test/functional/core/job_spec.lua
@@ -5,6 +5,7 @@ local clear, eq, eval, execute, feed, insert, neq, next_msg, nvim,
helpers.insert, helpers.neq, helpers.next_message, helpers.nvim,
helpers.nvim_dir, helpers.ok, helpers.source,
helpers.write_file, helpers.mkdir, helpers.rmdir
+local command = helpers.command
local Screen = require('test.functional.ui.screen')
@@ -429,6 +430,13 @@ describe('jobs', function()
eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg())
end)
+ it('cannot have both rpc and pty options', function()
+ command("let g:job_opts.pty = v:true")
+ command("let g:job_opts.rpc = v:true")
+ local _, err = pcall(command, "let j = jobstart(['cat', '-'], g:job_opts)")
+ ok(string.find(err, "E475: Invalid argument: job cannot have both 'pty' and 'rpc' options set") ~= nil)
+ end)
+
describe('running tty-test program', function()
local function next_chunk()
local rv