diff options
| author | Björn Linse <bjorn.linse@gmail.com> | 2016-05-12 22:25:15 +0200 | 
|---|---|---|
| committer | Björn Linse <bjorn.linse@gmail.com> | 2016-08-20 12:55:35 +0200 | 
| commit | 2d60a15e25f487eda1ac00a9e6cdf9a6564fb416 (patch) | |
| tree | 7844b2d185b27a0303183e90792a5ef807933e88 | |
| parent | 215922120c43163f4e1cc00851bd1b86890d3a28 (diff) | |
| download | rneovim-2d60a15e25f487eda1ac00a9e6cdf9a6564fb416.tar.gz rneovim-2d60a15e25f487eda1ac00a9e6cdf9a6564fb416.tar.bz2 rneovim-2d60a15e25f487eda1ac00a9e6cdf9a6564fb416.zip | |
job control: reuse common job code for rpc jobs
This makes stderr and exit callbacks work for rpc jobs
| -rw-r--r-- | runtime/autoload/provider/pythonx.vim | 21 | ||||
| -rw-r--r-- | runtime/autoload/provider/ruby.vim | 18 | ||||
| -rw-r--r-- | runtime/autoload/remote/host.vim | 5 | ||||
| -rw-r--r-- | runtime/doc/eval.txt | 49 | ||||
| -rw-r--r-- | runtime/doc/msgpack_rpc.txt | 34 | ||||
| -rw-r--r-- | src/nvim/eval.c | 116 | ||||
| -rw-r--r-- | src/nvim/globals.h | 3 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 73 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 1 | ||||
| -rw-r--r-- | test/functional/api/rpc_fixture.lua | 38 | ||||
| -rw-r--r-- | test/functional/api/server_requests_spec.lua | 46 | ||||
| -rw-r--r-- | test/functional/core/job_spec.lua | 8 | 
12 files changed, 272 insertions, 140 deletions
| diff --git a/runtime/autoload/provider/pythonx.vim b/runtime/autoload/provider/pythonx.vim index 0ebf00112f..6d6b38978c 100644 --- a/runtime/autoload/provider/pythonx.vim +++ b/runtime/autoload/provider/pythonx.vim @@ -5,11 +5,24 @@ endif  let s:loaded_pythonx_provider = 1 +let s:stderr = {} +let s:job_opts = {'rpc': v:true} + +" TODO(bfredl): this logic is common and should be builtin +function! s:job_opts.on_stderr(chan_id, data, event) +  let stderr = get(s:stderr, a:chan_id, ['']) +  let last = remove(stderr, -1) +  let a:data[0] = last.a:data[0] +  call extend(stderr, a:data) +  let s:stderr[a:chan_id] = stderr +endfunction +  function! provider#pythonx#Require(host) abort    let ver = (a:host.orig_name ==# 'python') ? 2 : 3    " Python host arguments -  let args = ['-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()'] +  let prog = (ver == '2' ?  provider#python#Prog() : provider#python3#Prog()) +  let args = [prog, '-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']    " Collect registered Python plugins into args    let python_plugins = remote#host#PluginsForHost(a:host.name) @@ -18,14 +31,16 @@ function! provider#pythonx#Require(host) abort    endfor    try -    let channel_id = rpcstart((ver ==# '2' ? -          \ provider#python#Prog() : provider#python3#Prog()), args) +    let channel_id = jobstart(args, s:job_opts)      if rpcrequest(channel_id, 'poll') ==# 'ok'        return channel_id      endif    catch      echomsg v:throwpoint      echomsg v:exception +    for row in get(s:stderr, channel_id, []) +      echomsg row +    endfor    endtry    throw remote#host#LoadErrorForHost(a:host.orig_name,          \ '$NVIM_PYTHON_LOG_FILE') diff --git a/runtime/autoload/provider/ruby.vim b/runtime/autoload/provider/ruby.vim index e9130b98c1..c8ede20a75 100644 --- a/runtime/autoload/provider/ruby.vim +++ b/runtime/autoload/provider/ruby.vim @@ -4,6 +4,17 @@ if exists('g:loaded_ruby_provider')  endif  let g:loaded_ruby_provider = 1 +let s:stderr = {} +let s:job_opts = {'rpc': v:true} + +function! s:job_opts.on_stderr(chan_id, data, event) +  let stderr = get(s:stderr, a:chan_id, ['']) +  let last = remove(stderr, -1) +  let a:data[0] = last.a:data[0] +  call extend(stderr, a:data) +  let s:stderr[a:chan_id] = stderr +endfunction +  function! provider#ruby#Detect() abort    return exepath('neovim-ruby-host')  endfunction @@ -13,7 +24,7 @@ function! provider#ruby#Prog()  endfunction  function! provider#ruby#Require(host) abort -  let args = [] +  let args = [provider#ruby#Prog()]    let ruby_plugins = remote#host#PluginsForHost(a:host.name)    for plugin in ruby_plugins @@ -21,13 +32,16 @@ function! provider#ruby#Require(host) abort    endfor    try -    let channel_id = rpcstart(provider#ruby#Prog(), args) +    let channel_id = jobstart(args, s:job_opts)      if rpcrequest(channel_id, 'poll') ==# 'ok'        return channel_id      endif    catch      echomsg v:throwpoint      echomsg v:exception +    for row in get(s:stderr, channel_id, []) +      echomsg row +    endfor    endtry    throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE')  endfunction diff --git a/runtime/autoload/remote/host.vim b/runtime/autoload/remote/host.vim index 5948de2b3d..065644121a 100644 --- a/runtime/autoload/remote/host.vim +++ b/runtime/autoload/remote/host.vim @@ -261,9 +261,8 @@ function! remote#host#LoadErrorForHost(host, log) abort          \ 'You can try to see what happened '.          \ 'by starting Neovim with the environment variable '.          \ a:log . ' set to a file and opening the generated '. -        \ 'log file. Also, the host stderr will be available '. -        \ 'in Neovim log, so it may contain useful information. '. -        \ 'See also ~/.nvimlog.' +        \ 'log file. Also, the host stderr is available '. +        \ 'in messages.'  endfunction diff --git a/runtime/doc/eval.txt b/runtime/doc/eval.txt index 3fa5474a7e..41373c135d 100644 --- a/runtime/doc/eval.txt +++ b/runtime/doc/eval.txt @@ -2043,7 +2043,6 @@ rpcnotify({channel}, {event}[, {args}...])  				Sends an |RPC| notification to {channel}  rpcrequest({channel}, {method}[, {args}...])  				Sends an |RPC| request to {channel} -rpcstart({prog}[, {argv}])	Spawns {prog} and opens an |RPC| channel  rpcstop({channel})		Closes an |RPC| {channel}  screenattr({row}, {col})	Number	attribute at screen position  screenchar({row}, {col})	Number	character at screen position @@ -4395,8 +4394,10 @@ items({dict})						*items()*  		order.  jobclose({job}[, {stream}])				{Nvim} *jobclose()* -		Close {job}'s {stream}, which can be one "stdin", "stdout" or -		"stderr". If {stream} is omitted, all streams are closed. +		Close {job}'s {stream}, which can be one of "stdin", "stdout", +		"stderr" or "rpc" (closes the rpc channel for a job started +		with the "rpc" option.) If {stream} is omitted, all streams +		are closed.  jobpid({job})						{Nvim} *jobpid()*  		Return the pid (process id) of {job}. @@ -4418,6 +4419,10 @@ jobsend({job}, {data})					{Nvim} *jobsend()*  			:call jobsend(j, ["abc", "123\n456", ""])  < 		will send "abc<NL>123<NUL>456<NL>". +		If the job was started with the rpc option this function +		cannot be used, instead use |rpcnotify()| and |rpcrequest()| +		to communicate with the job. +  jobstart({cmd}[, {opts}])				{Nvim} *jobstart()*  		Spawns {cmd} as a job.  If {cmd} is a |List| it is run  		directly.  If {cmd} is a |String| it is processed like this: > @@ -4433,9 +4438,14 @@ jobstart({cmd}[, {opts}])				{Nvim} *jobstart()*  		  on_exit  : exit event handler (function name or |Funcref|)  		  cwd      : Working directory of the job; defaults to  		             |current-directory|. +		  rpc      : If set, |msgpack-rpc| will be used to communicate +			     with the job over stdin and stdout. "on_stdout" is +			     then ignored, but "on_stderr" can still be used.  		  pty      : If set, the job will be connected to a new pseudo -		             terminal, and the job streams are connected to -		             the master file descriptor. +			     terminal, and the job streams are connected to +			     the master file descriptor. "on_stderr" is ignored +			     as all output will be received on stdout. +  		  width    : (pty only) Width of the terminal screen  		  height   : (pty only) Height of the terminal screen  		  TERM     : (pty only) $TERM environment variable @@ -4447,10 +4457,12 @@ jobstart({cmd}[, {opts}])				{Nvim} *jobstart()*  		{opts} is passed as |self| to the callback; the caller may  		pass arbitrary data by setting other keys.  		Returns: -		  - job ID on success, used by |jobsend()| and |jobstop()| +		  - The job ID on success, which is used by |jobsend()| (or +		    |rpcnotify()| and |rpcrequest()| if "rpc" option was used) +		    and |jobstop()|  		  - 0 on invalid arguments or if the job table is full  		  - -1 if {cmd}[0] is not executable. -		See |job-control| for more information. +		See |job-control| and |msgpack-rpc| for more information.  jobstop({job})						{Nvim} *jobstop()*  		Stop a job created with |jobstart()| by sending a `SIGTERM` @@ -5649,19 +5661,20 @@ rpcrequest({channel}, {method}[, {args}...])		 {Nvim} *rpcrequest()*  			:let result = rpcrequest(rpc_chan, "func", 1, 2, 3)  rpcstart({prog}[, {argv}])				   {Nvim} *rpcstart()* -		Spawns {prog} as a job (optionally passing the list {argv}), -		and opens an |RPC| channel with the spawned process's -		stdin/stdout. Returns: -		  - channel id on success, which is used by |rpcrequest()|, -		    |rpcnotify()| and |rpcstop()| -		  - 0 on failure -		Example: > -			:let rpc_chan = rpcstart('prog', ['arg1', 'arg2']) +		Deprecated. Replace  > +			:let id = rpcstart('prog', ['arg1', 'arg2']) +<		with > +			:let id = jobstart(['prog', 'arg1', 'arg2'], +					   {'rpc': v:true})  rpcstop({channel})					    {Nvim} *rpcstop()* -		Closes an |RPC| {channel}, possibly created via -		|rpcstart()|. Also closes channels created by connections to -		|v:servername|. +		Closes an |RPC| {channel}.  If the channel is a job +		started with |jobstart()|  the job is killed. +		It is better to use |jobstop()| in this case, or use +		|jobclose|(id, "rpc") to only close the channel without +		killing the job. +		Closes the socket connection if the channel was opened by +		connecting to |v:servername|.  screenattr(row, col)						*screenattr()*  		Like screenchar(), but return the attribute.  This is a rather diff --git a/runtime/doc/msgpack_rpc.txt b/runtime/doc/msgpack_rpc.txt index cfd9084cfc..18c0ff8a58 100644 --- a/runtime/doc/msgpack_rpc.txt +++ b/runtime/doc/msgpack_rpc.txt @@ -11,7 +11,6 @@ RPC API for Nvim				     *RPC* *rpc* *msgpack-rpc*  3. Connecting			|rpc-connecting|  4. Clients			|rpc-api-client|  5. Types			|rpc-types| -6. Vimscript functions		|rpc-vim-functions|  ==============================================================================  1. Introduction						            *rpc-intro* @@ -66,12 +65,16 @@ To get a formatted dump of the API using python (requires the `pyyaml` and  ==============================================================================  3. Connecting						      *rpc-connecting* -There are several ways to open a msgpack-rpc stream to an Nvim server: +There are several ways to open a msgpack-rpc channel to an Nvim instance:    1. Through stdin/stdout when `nvim` is started with `--embed`. This is how       applications can embed Nvim. -  2. Through stdin/stdout of some other process spawned by |rpcstart()|. +  2. Through stdin/stdout of some other process spawned by |jobstart()|. +     Set the "rpc" key to |v:true| in the options dict to use the job's stdin +     and stdout as a single msgpack channel that is processed directly by +     Nvim.  Then it is not possible to process raw data to or from the +     process's stdin and stdout. stderr can still be used, though.    3. Through the socket automatically created with each instance. The socket       location is stored in |v:servername|. @@ -110,11 +113,12 @@ functions can be called interactively:      >>> nvim = attach('socket', path='[address]')      >>> nvim.command('echo "hello world!"')  < -You can also embed an Nvim instance via |rpcstart()| +You can also embed an Nvim instance via |jobstart()|, and communicate using +|rpcrequest()| and |rpcnotify()|:  > -    let vim = rpcstart('nvim', ['--embed']) +    let vim = jobstart(['nvim', '--embed'], {'rpc': v:true})      echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"') -    call rpcstop(vim) +    call jobstop(vim)  <  ==============================================================================  4. Implementing API clients			*rpc-api-client* *api-client* @@ -234,22 +238,4 @@ the type codes, because a client may be built against one Nvim version but  connect to another with different type codes.  ============================================================================== -6. Vimscript functions				           *rpc-vim-functions* - -RPC functions are available in Vimscript: - -  1. |rpcstart()|: Similarly to |jobstart()|, this will spawn a co-process -     with its standard handles connected to Nvim. The difference is that it's -     not possible to process raw data to or from the process's stdin, stdout, -     or stderr.  This is because the job's stdin and stdout are used as -     a single msgpack channel that is processed directly by Nvim. -  2. |rpcstop()|: Same as |jobstop()|, but operates on handles returned by -     |rpcstart()|. -  3. |rpcrequest()|: Sends a msgpack-rpc request to the process. -  4. |rpcnotify()|: Sends a msgpack-rpc notification to the process. - -|rpcrequest()| and |rpcnotify()| can also be used with channels connected to -a nvim server. |v:servername| - -==============================================================================   vim:tw=78:ts=8:noet:ft=help:norl: diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 2a6adbdc8b..dce24230b0 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -408,6 +408,7 @@ typedef struct {    Terminal *term;    bool stopped;    bool exited; +  bool rpc;    int refcount;    ufunc_T *on_stdout, *on_stderr, *on_exit;    dict_T *self; @@ -448,8 +449,7 @@ typedef struct {  #define FNE_INCL_BR     1       /* find_name_end(): include [] in name */  #define FNE_CHECK_START 2       /* find_name_end(): check name starts with                                     valid character */ -static uint64_t current_job_id = 1; -static PMap(uint64_t) *jobs = NULL;  +static PMap(uint64_t) *jobs = NULL;  static uint64_t last_timer_id = 0;  static PMap(uint64_t) *timers = NULL; @@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)    if (argvars[1].v_type == VAR_STRING) {      char *stream = (char *)argvars[1].vval.v_string;      if (!strcmp(stream, "stdin")) { -      process_close_in(proc); +      if (data->rpc) { +        EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); +      } else { +        process_close_in(proc); +      }      } else if (!strcmp(stream, "stdout")) { -      process_close_out(proc); +      if (data->rpc) { +        EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); +      } else { +        process_close_out(proc); +      }      } else if (!strcmp(stream, "stderr")) {        process_close_err(proc); +    } else if (!strcmp(stream, "rpc")) { +      if (data->rpc) { +        channel_close(data->id); +      } else { +        EMSG(_("Invalid job stream: Not an rpc job")); +      }      } else {        EMSG2(_("Invalid job stream \"%s\""), stream);      }    } else { -    process_close_streams(proc); +    if (data->rpc) { +      channel_close(data->id); +      process_close_err(proc); +    } else { +      process_close_streams(proc); +    }    }  } @@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)      return;    } +  if (data->rpc) { +    EMSG(_("Can't send raw data to rpc channel")); +    return; +  } +    ssize_t input_len;    char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);    if (!input) { @@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)      return;    } +    dict_T *job_opts = NULL; +  bool detach = false, rpc = false, pty = false;    ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;    char *cwd = NULL;    if (argvars[1].v_type == VAR_DICT) {      job_opts = argvars[1].vval.v_dict; +    detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0; +    rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0; +    pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0; +    if (pty && rpc) { +      EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set"); +      shell_free_argv(argv); +      return; +    } +      char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);      if (new_cwd && strlen(new_cwd) > 0) {        cwd = new_cwd; @@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)      }    } -  bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0; -  bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;    TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, -                                          job_opts, pty, detach, cwd); +                                          job_opts, pty, rpc, detach, cwd);    Process *proc = (Process *)&data->proc;    if (pty) { @@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)      }    } -  if (!on_stdout) { +  if (!rpc && !on_stdout) {      proc->out = NULL;    }    if (!on_stderr) { @@ -14105,7 +14138,7 @@ end:    api_free_object(result);  } -// "rpcstart()" function +// "rpcstart()" function (DEPRECATED)  static void f_rpcstart(typval_T *argvars, typval_T *rettv)  {    rettv->v_type = VAR_NUMBER; @@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)    // The last item of argv must be NULL    argv[i] = NULL; -  uint64_t channel_id = channel_from_process(argv); -  if (!channel_id) { -    EMSG(_(e_api_spawn_failed)); -  } - -  rettv->vval.v_number = (varnumber_T)channel_id; +  TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL, +                                          NULL, false, true, false, NULL); +  common_job_start(data, rettv);  }  // "rpcstop()" function  static void f_rpcstop(typval_T *argvars, typval_T *rettv)  { -  rettv->v_type = VAR_NUMBER; -  rettv->vval.v_number = 0; - -  if (check_restricted() || check_secure()) { -    return; -  } -    if (argvars[0].v_type != VAR_NUMBER) {      // Wrong argument types      EMSG(_(e_invarg));      return;    } -  rettv->vval.v_number = channel_close(argvars[0].vval.v_number); +  // if called with a job, stop it, else closes the channel +  if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { +    f_jobstop(argvars, rettv); +  } else { +    rettv->vval.v_number = channel_close(argvars[0].vval.v_number); +  }  }  /* @@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)    }    TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, -                                          job_opts, true, false, cwd); +                                          job_opts, true, false, false, cwd);    data->proc.pty.width = curwin->w_width;    data->proc.pty.height = curwin->w_height;    data->proc.pty.term_name = xstrdup("xterm-256color"); @@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,                                                 ufunc_T *on_exit,                                                 dict_T *self,                                                 bool pty, +                                               bool rpc,                                                 bool detach,                                                 char *cwd)  { @@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,    data->on_exit = on_exit;    data->self = self;    data->events = queue_new_child(main_loop.events); +  data->rpc = rpc;    if (pty) {      data->proc.pty = pty_process_init(&main_loop, data);    } else { @@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,    return data;  } -/// Return true/false on success/failure. +/// common code for getting job callbacks for jobstart, termopen and rpcstart +/// +/// @return true/false on success/failure.  static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,                                          ufunc_T **on_stderr, ufunc_T **on_exit)  { @@ -22174,12 +22206,19 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)    }    xfree(cmd); -  data->id = current_job_id++; -  wstream_init(proc->in, 0); -  if (proc->out) { -    rstream_init(proc->out, 0); -    rstream_start(proc->out, on_job_stdout, data); +  data->id = next_chan_id++; + +  if (data->rpc) { +    // the rpc channel takes over the in and out streams +    channel_from_process(proc, data->id); +  } else { +    wstream_init(proc->in, 0); +    if (proc->out) { +      rstream_init(proc->out, 0); +      rstream_start(proc->out, on_job_stdout, data); +    }    } +    if (proc->err) {      rstream_init(proc->err, 0);      rstream_start(proc->err, on_job_stderr, data); @@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)      snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);      terminal_close(data->term, msg);    } +  if (data->rpc) { +    channel_process_exit(data->id, status); +  }    if (data->status_ptr) {      *data->status_ptr = status;    }    process_job_event(data, data->on_exit, "exit", NULL, 0, status); + +  pmap_del(uint64_t)(jobs, data->id); +  term_job_data_decref(data);  }  static void term_write(char *buf, size_t size, void *d) @@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)  static void on_job_event(JobEvent *ev)  {    if (!ev->callback) { -    goto end; +    return;    }    typval_T argv[3]; @@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)    call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,        curwin->w_cursor.lnum, ev->data->self);    clear_tv(&rettv); - -end: -  if (!ev->received) { -    // exit event, safe to free job data now -    pmap_del(uint64_t)(jobs, ev->data->id); -    term_job_data_decref(ev->data); -  }  }  static TerminalJobData *find_job(uint64_t id) diff --git a/src/nvim/globals.h b/src/nvim/globals.h index 950ceb4c74..4c014010c2 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;  // If a msgpack-rpc channel should be started over stdin/stdout  EXTERN bool embedded_mode INIT(= false); +/// next free id for a job or rpc channel +EXTERN uint64_t next_chan_id INIT(= 1); +  /// Used to track the status of external functions.  /// Currently only used for iconv().  typedef enum { diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index c3378783f2..8b5f212d66 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -19,6 +19,7 @@  #include "nvim/main.h"  #include "nvim/ascii.h"  #include "nvim/memory.h" +#include "nvim/eval.h"  #include "nvim/os_unix.h"  #include "nvim/message.h"  #include "nvim/map.h" @@ -55,12 +56,7 @@ typedef struct {    msgpack_unpacker *unpacker;    union {      Stream stream; -    struct { -      LibuvProcess uvproc; -      Stream in; -      Stream out; -      Stream err; -    } process; +    Process *proc;      struct {        Stream in;        Stream out; @@ -79,7 +75,6 @@ typedef struct {    uint64_t request_id;  } RequestEvent; -static uint64_t next_id = 1;  static PMap(uint64_t) *channels = NULL;  static PMap(cstr_t) *event_strings = NULL;  static msgpack_sbuffer out_buffer; @@ -112,33 +107,20 @@ void channel_teardown(void)  }  /// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is forwarded to the editor error stream. +/// stdin/stdout. stderr is handled by the job infrastructure.  ///  /// @param argv The argument vector for the process. [consumed]  /// @return The channel id (> 0), on success.  ///         0, on error. -uint64_t channel_from_process(char **argv) -{ -  Channel *channel = register_channel(kChannelTypeProc); -  channel->data.process.uvproc = libuv_process_init(&main_loop, channel); -  Process *proc = &channel->data.process.uvproc.process; -  proc->argv = argv; -  proc->in = &channel->data.process.in; -  proc->out = &channel->data.process.out; -  proc->err = &channel->data.process.err; -  proc->cb = process_exit; -  if (!process_spawn(proc)) { -    loop_poll_events(&main_loop, 0); -    decref(channel); -    return 0; -  } - +uint64_t channel_from_process(Process *proc, uint64_t id) +{ +  Channel *channel = register_channel(kChannelTypeProc, id, proc->events);    incref(channel);  // process channels are only closed by the exit_cb +  channel->data.proc = proc; +    wstream_init(proc->in, 0);    rstream_init(proc->out, 0);    rstream_start(proc->out, parse_msgpack, channel); -  rstream_init(proc->err, 0); -  rstream_start(proc->err, forward_stderr, channel);    return channel->id;  } @@ -148,7 +130,7 @@ uint64_t channel_from_process(char **argv)  /// @param watcher The SocketWatcher ready to accept the connection  void channel_from_connection(SocketWatcher *watcher)  { -  Channel *channel = register_channel(kChannelTypeSocket); +  Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);    socket_watcher_accept(watcher, &channel->data.stream);    incref(channel);  // close channel only after the stream is closed    channel->data.stream.internal_close_cb = close_cb; @@ -314,7 +296,7 @@ bool channel_close(uint64_t id)  /// Neovim  void channel_from_stdio(void)  { -  Channel *channel = register_channel(kChannelTypeStdio); +  Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);    incref(channel);  // stdio channels are only closed on exit    // read stream    rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); @@ -323,20 +305,12 @@ void channel_from_stdio(void)    wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);  } -static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, -    void *data, bool eof) +void channel_process_exit(uint64_t id, int status)  { -  while (rbuffer_size(rbuf)) { -    char buf[256]; -    size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); -    buf[read] = NUL; -    ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); -  } -} +  Channel *channel = pmap_get(uint64_t)(channels, id); -static void process_exit(Process *proc, int status, void *data) -{ -  decref(data); +  channel->closed = true; +  decref(channel);  }  static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, @@ -511,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)        success = wstream_write(&channel->data.stream, buffer);        break;      case kChannelTypeProc: -      success = wstream_write(&channel->data.process.in, buffer); +      success = wstream_write(channel->data.proc->in, buffer);        break;      case kChannelTypeStdio:        success = wstream_write(&channel->data.std.out, buffer); @@ -639,9 +613,10 @@ static void close_channel(Channel *channel)        stream_close(&channel->data.stream, NULL, NULL);        break;      case kChannelTypeProc: -      if (!channel->data.process.uvproc.process.closed) { -        process_stop(&channel->data.process.uvproc.process); -      } +      // Only close the rpc channel part, +      // there could be an error message on the stderr stream +      process_close_in(channel->data.proc); +      process_close_out(channel->data.proc);        break;      case kChannelTypeStdio:        stream_close(&channel->data.std.in, NULL, NULL); @@ -679,7 +654,9 @@ static void free_channel(Channel *channel)    pmap_free(cstr_t)(channel->subscribed_events);    kv_destroy(channel->call_stack);    kv_destroy(channel->delayed_notifications); -  queue_free(channel->events); +  if (channel->type != kChannelTypeProc) { +    queue_free(channel->events); +  }    xfree(channel);  } @@ -688,15 +665,15 @@ static void close_cb(Stream *stream, void *data)    decref(data);  } -static Channel *register_channel(ChannelType type) +static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)  {    Channel *rv = xmalloc(sizeof(Channel)); -  rv->events = queue_new_child(main_loop.events); +  rv->events = events ? events : queue_new_child(main_loop.events);    rv->type = type;    rv->refcount = 1;    rv->closed = false;    rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); -  rv->id = next_id++; +  rv->id = id > 0 ? id : next_chan_id++;    rv->pending_requests = 0;    rv->subscribed_events = pmap_new(cstr_t)();    rv->next_request_id = 1; diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index 104547a7b8..0d92976d02 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -6,6 +6,7 @@  #include "nvim/api/private/defs.h"  #include "nvim/event/socket.h" +#include "nvim/event/process.h"  #include "nvim/vim.h"  #define METHOD_MAXLEN 512 diff --git a/test/functional/api/rpc_fixture.lua b/test/functional/api/rpc_fixture.lua new file mode 100644 index 0000000000..423864740f --- /dev/null +++ b/test/functional/api/rpc_fixture.lua @@ -0,0 +1,38 @@ +local deps_prefix = './.deps/usr' +if os.getenv('DEPS_PREFIX') then +  deps_prefix = os.getenv('DEPS_PREFIX') +end + +package.path = deps_prefix .. '/share/lua/5.1/?.lua;' .. +               deps_prefix .. '/share/lua/5.1/?/init.lua;' .. +               package.path + +package.cpath = deps_prefix .. '/lib/lua/5.1/?.so;' .. +                package.cpath + +local mpack = require('mpack') +local StdioStream = require('nvim.stdio_stream') +local Session = require('nvim.session') + +local stdio_stream = StdioStream.open() +local session = Session.new(stdio_stream) + +local function on_request(method, args) +  if method == 'poll' then +    return 'ok' +  elseif method == 'write_stderr' then +    io.stderr:write(args[1]) +    return "done!" +  elseif method == "exit" then +    session:stop() +    return mpack.NIL +  end +end + +local function on_notification(event, args) +  if event == 'ping' and #args == 0 then +    session:notify("vim_eval", "rpcnotify(g:channel, 'pong')") +  end +end + +session:run(on_request, on_notification) diff --git a/test/functional/api/server_requests_spec.lua b/test/functional/api/server_requests_spec.lua index eb63834cb0..b76c3b9cd6 100644 --- a/test/functional/api/server_requests_spec.lua +++ b/test/functional/api/server_requests_spec.lua @@ -4,7 +4,9 @@  local helpers = require('test.functional.helpers')(after_each)  local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval  local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop -local nvim_prog = helpers.nvim_prog +local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs +local source, next_message = helpers.source, helpers.next_message +local meths = helpers.meths  describe('server -> client', function() @@ -144,11 +146,11 @@ describe('server -> client', function()      end      before_each(function() -      nvim('command', "let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])") +      command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")        neq(0, eval('vim'))      end) -    after_each(function() nvim('command', 'call rpcstop(vim)') end) +    after_each(function() command('call rpcstop(vim)') end)      it('can send/recieve notifications and make requests', function()        nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')") @@ -181,4 +183,42 @@ describe('server -> client', function()        eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression')      end)    end) + +  describe('when using jobstart', function() +    local jobid +    before_each(function() +      local channel = nvim('get_api_info')[1] +      nvim('set_var', 'channel', channel) +      source([[ +        function! s:OnEvent(id, data, event) +          call rpcnotify(g:channel, a:event, 0, a:data) +        endfunction +        let g:job_opts = { +        \ 'on_stderr': function('s:OnEvent'), +        \ 'on_exit': function('s:OnEvent'), +        \ 'user': 0, +        \ 'rpc': v:true +        \ } +      ]]) +      local lua_prog = arg[-1] +      meths.set_var("args", {lua_prog, 'test/functional/api/rpc_fixture.lua'}) +      jobid = eval("jobstart(g:args, g:job_opts)") +      neq(0, 'jobid') +    end) + +    after_each(function() +      funcs.jobstop(jobid) +    end) + +    it('rpc and text stderr can be combined', function() +      eq("ok",funcs.rpcrequest(jobid, "poll")) +      funcs.rpcnotify(jobid, "ping") +      eq({'notification', 'pong', {}}, next_message()) +      eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n")) +      eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message()) +      funcs.rpcrequest(jobid, "exit") +      eq({'notification', 'exit', {0, 0}}, next_message()) +    end) +  end) +  end) diff --git a/test/functional/core/job_spec.lua b/test/functional/core/job_spec.lua index 1d11374e4d..921bf1655e 100644 --- a/test/functional/core/job_spec.lua +++ b/test/functional/core/job_spec.lua @@ -5,6 +5,7 @@ local clear, eq, eval, execute, feed, insert, neq, next_msg, nvim,    helpers.insert, helpers.neq, helpers.next_message, helpers.nvim,    helpers.nvim_dir, helpers.ok, helpers.source,    helpers.write_file, helpers.mkdir, helpers.rmdir +local command = helpers.command  local Screen = require('test.functional.ui.screen') @@ -429,6 +430,13 @@ describe('jobs', function()      eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg())    end) +  it('cannot have both rpc and pty options', function() +    command("let g:job_opts.pty = v:true") +    command("let g:job_opts.rpc = v:true") +    local _, err = pcall(command, "let j = jobstart(['cat', '-'], g:job_opts)") +    ok(string.find(err, "E475: Invalid argument: job cannot have both 'pty' and 'rpc' options set") ~= nil) +  end) +    describe('running tty-test program', function()      local function next_chunk()        local rv | 
