aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--runtime/autoload/provider.vim20
-rw-r--r--runtime/autoload/provider/clipboard.vim6
-rw-r--r--runtime/autoload/provider/node.vim9
-rw-r--r--runtime/autoload/provider/pythonx.vim9
-rw-r--r--runtime/doc/channel.txt168
-rw-r--r--runtime/doc/deprecated.txt2
-rw-r--r--runtime/doc/eval.txt140
-rw-r--r--runtime/doc/job_control.txt38
-rw-r--r--runtime/doc/msgpack_rpc.txt29
-rw-r--r--runtime/doc/nvim_terminal_emulator.txt4
-rw-r--r--runtime/doc/options.txt6
-rw-r--r--runtime/doc/starting.txt2
-rw-r--r--src/nvim/api/ui.c2
-rw-r--r--src/nvim/api/vim.c4
-rw-r--r--src/nvim/buffer_defs.h1
-rw-r--r--src/nvim/channel.c752
-rw-r--r--src/nvim/channel.h134
-rw-r--r--src/nvim/eval.c820
-rw-r--r--src/nvim/eval.h4
-rw-r--r--src/nvim/eval.lua7
-rw-r--r--src/nvim/eval/typval.c27
-rw-r--r--src/nvim/event/libuv_process.c12
-rw-r--r--src/nvim/event/process.c123
-rw-r--r--src/nvim/event/process.h16
-rw-r--r--src/nvim/event/rstream.c28
-rw-r--r--src/nvim/event/stream.c7
-rw-r--r--src/nvim/event/stream.h8
-rw-r--r--src/nvim/globals.h18
-rw-r--r--src/nvim/if_cscope.c1
-rw-r--r--src/nvim/main.c35
-rw-r--r--src/nvim/misc1.c5
-rw-r--r--src/nvim/msgpack_rpc/channel.c454
-rw-r--r--src/nvim/msgpack_rpc/channel.h2
-rw-r--r--src/nvim/msgpack_rpc/channel_defs.h36
-rw-r--r--src/nvim/option.c6
-rw-r--r--src/nvim/option_defs.h1
-rw-r--r--src/nvim/options.lua8
-rw-r--r--src/nvim/os/input.c2
-rw-r--r--src/nvim/os/pty_process_unix.c31
-rw-r--r--src/nvim/os/pty_process_win.c12
-rw-r--r--src/nvim/os/shell.c31
-rw-r--r--src/nvim/os_unix.c4
-rw-r--r--src/nvim/rbuffer.c2
-rw-r--r--src/nvim/terminal.c5
-rw-r--r--test/functional/api/server_requests_spec.lua1
-rw-r--r--test/functional/core/channels_spec.lua266
-rw-r--r--test/functional/core/job_spec.lua50
-rw-r--r--test/functional/helpers.lua45
-rw-r--r--test/helpers.lua17
49 files changed, 2170 insertions, 1240 deletions
diff --git a/runtime/autoload/provider.vim b/runtime/autoload/provider.vim
deleted file mode 100644
index e6514f5ba8..0000000000
--- a/runtime/autoload/provider.vim
+++ /dev/null
@@ -1,20 +0,0 @@
-" Common functionality for providers
-
-let s:stderr = {}
-
-function! provider#stderr_collector(chan_id, data, event)
- let stderr = get(s:stderr, a:chan_id, [''])
- let stderr[-1] .= a:data[0]
- call extend(stderr, a:data[1:])
- let s:stderr[a:chan_id] = stderr
-endfunction
-
-function! provider#clear_stderr(chan_id)
- if has_key(s:stderr, a:chan_id)
- call remove(s:stderr, a:chan_id)
- endif
-endfunction
-
-function! provider#get_stderr(chan_id)
- return get(s:stderr, a:chan_id, [])
-endfunction
diff --git a/runtime/autoload/provider/clipboard.vim b/runtime/autoload/provider/clipboard.vim
index 6454a01c2a..381fe2cf2d 100644
--- a/runtime/autoload/provider/clipboard.vim
+++ b/runtime/autoload/provider/clipboard.vim
@@ -7,7 +7,7 @@ let s:clipboard = {}
" When caching is enabled, store the jobid of the xclip/xsel process keeping
" ownership of the selection, so we know how long the cache is valid.
-let s:selection = { 'owner': 0, 'data': [], 'on_stderr': function('provider#stderr_collector') }
+let s:selection = { 'owner': 0, 'data': [], 'stderr_buffered': v:true }
function! s:selection.on_exit(jobid, data, event) abort
" At this point this nvim instance might already have launched
@@ -16,12 +16,10 @@ function! s:selection.on_exit(jobid, data, event) abort
let self.owner = 0
endif
if a:data != 0
- let stderr = provider#get_stderr(a:jobid)
echohl WarningMsg
- echomsg 'clipboard: error invoking '.get(self.argv, 0, '?').': '.join(stderr)
+ echomsg 'clipboard: error invoking '.get(self.argv, 0, '?').': '.join(self.stderr)
echohl None
endif
- call provider#clear_stderr(a:jobid)
endfunction
let s:selections = { '*': s:selection, '+': copy(s:selection) }
diff --git a/runtime/autoload/provider/node.vim b/runtime/autoload/provider/node.vim
index b08ad4f316..419dd517cd 100644
--- a/runtime/autoload/provider/node.vim
+++ b/runtime/autoload/provider/node.vim
@@ -3,7 +3,7 @@ if exists('g:loaded_node_provider')
endif
let g:loaded_node_provider = 1
-let s:job_opts = {'rpc': v:true, 'on_stderr': function('provider#stderr_collector')}
+let s:job_opts = {'rpc': v:true, 'stderr_buffered': v:true}
function! provider#node#Detect() abort
return has('win32') ? exepath('neovim-node-host.cmd') : exepath('neovim-node-host')
@@ -32,19 +32,18 @@ function! provider#node#Require(host) abort
endif
try
- let channel_id = jobstart(args, s:job_opts)
+ let job = copy(s:job_opts)
+ let channel_id = jobstart(args, job)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
- for row in provider#get_stderr(channel_id)
+ for row in job.stderr
echomsg row
endfor
endtry
- finally
- call provider#clear_stderr(channel_id)
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_NODE_LOG_FILE')
endfunction
diff --git a/runtime/autoload/provider/pythonx.vim b/runtime/autoload/provider/pythonx.vim
index 7285ed43ea..1c77eabe23 100644
--- a/runtime/autoload/provider/pythonx.vim
+++ b/runtime/autoload/provider/pythonx.vim
@@ -5,7 +5,7 @@ endif
let s:loaded_pythonx_provider = 1
-let s:job_opts = {'rpc': v:true, 'on_stderr': function('provider#stderr_collector')}
+let s:job_opts = {'rpc': v:true, 'stderr_buffered': v:true}
function! provider#pythonx#Require(host) abort
let ver = (a:host.orig_name ==# 'python') ? 2 : 3
@@ -21,18 +21,17 @@ function! provider#pythonx#Require(host) abort
endfor
try
- let channel_id = jobstart(args, s:job_opts)
+ let job = copy(s:job_opts)
+ let channel_id = jobstart(args, job)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
- for row in provider#get_stderr(channel_id)
+ for row in job.stderr
echomsg row
endfor
- finally
- call provider#clear_stderr(channel_id)
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name,
\ '$NVIM_PYTHON_LOG_FILE')
diff --git a/runtime/doc/channel.txt b/runtime/doc/channel.txt
new file mode 100644
index 0000000000..c94c64eb84
--- /dev/null
+++ b/runtime/doc/channel.txt
@@ -0,0 +1,168 @@
+*channel.txt* Nvim
+
+
+ NVIM REFERENCE MANUAL by Thiago de Arruda
+
+
+Nvim's facilities for async io *channel*
+
+ Type <M-]> to see the table of contents.
+
+==============================================================================
+1. Introduction *channel-intro*
+
+Channels are nvim's way of communicating with external processes.
+
+There are several ways to open a channel:
+
+ 1. Through stdin/stdout when `nvim` is started with `--headless`, and a startup
+ script or --cmd command opens the stdio channel using |stdioopen()|.
+
+ 2. Through stdin, stdout and stderr of a process spawned by |jobstart()|.
+
+ 3. Through the PTY master end of a PTY opened with
+ `jobstart(..., {'pty': v:true})` or |termopen()|.
+
+ 4. By connecting to a TCP/IP socket or named pipe with |sockconnect()|.
+
+ 5. By another process connecting to a socket listened to by nvim. This only
+ supports RPC channels, see |rpc-connecting|.
+
+Channels support multiple modes or protocols. In the most basic
+mode of operation, raw bytes are read and written to the channel.
+The |rpc| protocol, based on the msgpack-rpc standard, enables nvim and the
+process at the other end to send remote calls and events to each other.
+Additionally, the builtin |terminal-emulator|, is implemented on top of PTY
+channels.
+
+==============================================================================
+2. Reading and writing raw bytes *channel-bytes*
+
+By default, channels opened by vimscript functions will operate with raw
+bytes. Additionally, for a job channel using rpc, bytes can still be
+read over its stderr. Similarily, only bytes can be written to nvim's own stderr.
+
+ *channel-callback* *buffered*
+ *on_stdout* *on_stderr* *on_stdin* *on_data*
+A callback function `on_{stream}` will be invoked with data read from the
+channel. By default, the callback will be invoked immediately when data is
+available, to facilitate interactive communication. The same callback will
+then be invoked with empty data, to indicate that the stream reached EOF.
+Alternatively the `{stream}_buffered` option can be set to invoke the callback
+only when the underlying stream reaches EOF, and will then be passed in
+complete output. This is helpful when only the complete output is useful, and
+not partial data. Futhermore if `{stream}_buffered` is set but not a callback,
+the data is saved in the options dict, with the stream name as key.
+
+- The arguments passed to the callback function are:
+
+ 0: The channel id
+ 1: the raw data read from the channel, formatted as a |readfile()|-style
+ list. If EOF occured, a single empty string `['']` will be passed in.
+ Note that the items in this list do not directly correspond to actual
+ lines in the output. See |channel-lines|
+ 2: Stream name as a string, like `"stdout"`. This is to allow multiple
+ on_{event} handlers to be implemented by the same function. The available
+ events depend on how the channel was opened and in what mode/protocol.
+
+ *channel-lines*
+ Note:
+ stream event handlers may receive partial (incomplete) lines. For a given
+ invocation of on_stdout etc, `a:data` is not guaranteed to end
+ with a newline.
+ - `abcdefg` may arrive as `['abc']`, `['defg']`.
+ - `abc\nefg` may arrive as `['abc', '']`, `['efg']` or `['abc']`,
+ `['','efg']`, or even `['ab']`, `['c','efg']`.
+
+ If you only are interested in complete output when the process exits,
+ use buffered mode. Otherwise, an easy way to deal with this:
+ initialize a list as `['']`, then append to it as follows: >
+ let s:chunks = ['']
+ func! s:on_event(job_id, data, event) dict
+ let s:chunks[-1] .= a:data[0]
+ call extend(s:chunks, a:data[1:])
+ endf
+<
+
+Additionally, if the callbacks are Dictionary functions, |self| can be used to
+refer to the options dictionary containing the callbacks. |Partial|s can also be
+used as callbacks.
+
+Data can be sent to the channel using the |chansend()| function. Here is a
+simple example, echoing some data through a cat-process:
+>
+ function! s:OnEvent(id, data, event) dict
+ let str = join(a:data, "\n")
+ echomsg str
+ endfunction
+ let id = jobstart(['cat'], {'on_stdout': function('s:OnEvent') } )
+ call chansend(id, "hello!")
+<
+
+Here is a example of setting a buffer to the result of grep, but only after
+all data has been processed:
+>
+ function! s:OnEvent(id, data, event) dict
+ call nvim_buf_set_lines(2, 0, -1, v:true, a:data)
+ endfunction
+ let id = jobstart(['grep', '^[0-9]'], { 'on_stdout': function('s:OnEvent'),
+ \ 'stdout_buffered':v:true } )
+
+ call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")
+ " no output is received, buffer is empty
+
+ call chansend(id, "xx\n20 GOTO 10\nzz\n")
+ call chanclose(id, 'stdin')
+ " now buffer has result
+<
+For additional examples with jobs, see |job-control|.
+
+ *channel-pty*
+A special case is PTY channels opened by `jobstart(..., {'pty': v:true})` .
+No preprocessing of ANSI escape sequences is done, these will be sent raw to
+the callback. However, change of PTY size can be signaled to the slave using
+|jobresize()|. See also |terminal-emulator|.
+
+==============================================================================
+3. Communicating using msgpack-rpc *channel-rpc*
+
+When channels are opened with the `rpc` option set to true, the channel can be
+used for remote method calls in both directions, see |msgpack-rpc|. Note that
+rpc channels are implicitly trusted and the process at the other end can
+invoke any |api| function!
+
+==============================================================================
+4. Using the stdio channel *channel-stdio*
+
+When invoked normally, nvim will use stdin and stdout to interact with the
+user over the terminal interface (TUI). However when invoked with
+`--headless`, the TUI is not started and stdin and stdout can be used as a
+channel. To open the stdio channel |stdioopen()| must be called during
+|startup|, as there later will be no way of invoking a command. As a
+convenience, the stdio channel will always have channel id 1.
+
+Here is an example:
+>
+ func! OnEvent(id, data, event)
+ if a:data == [""]
+ quit
+ end
+ call chansend(a:id, map(a:data, {i,v -> toupper(v)}))
+ endfunc
+
+ call stdioopen({'on_stdin': 'OnEvent'})
+<
+Put this in `uppercase.vim` and invoke nvim with
+>
+ nvim --headless --cmd "source uppercase.vim"
+<
+ *--embed*
+An common use case is another program embedding nvim and communicating with it
+over rpc. Therefore, the option `--embed` exists as a shorthand for
+`nvim --headless --cmd "call stdioopen({'rpc': v:true})"`
+
+Nvim's stderr is implicitly open as a write-only bytes channel. It will
+always have channel id 2, however to be explicit |v:stderr| can be used.
+
+==============================================================================
+ vim:tw=78:ts=8:noet:ft=help:norl:
diff --git a/runtime/doc/deprecated.txt b/runtime/doc/deprecated.txt
index f3d4f16244..72dfe1230e 100644
--- a/runtime/doc/deprecated.txt
+++ b/runtime/doc/deprecated.txt
@@ -37,6 +37,8 @@ Functions ~
*file_readable()* Obsolete name for |filereadable()|.
*highlight_exists()* Obsolete name for |hlexists()|.
*highlightID()* Obsolete name for |hlID()|.
+*jobclose()* Obsolete name for |chanclose()|
+*jobsend()* Obsolete name for |chansend()|
*last_buffer_nr()* Obsolete name for bufnr("$").
Modifiers ~
diff --git a/runtime/doc/eval.txt b/runtime/doc/eval.txt
index ea4393bc6f..d2a3a962e6 100644
--- a/runtime/doc/eval.txt
+++ b/runtime/doc/eval.txt
@@ -1818,6 +1818,13 @@ v:shell_error Result of the last shell command. When non-zero, the last
*v:statusmsg* *statusmsg-variable*
v:statusmsg Last given status message. It's allowed to set this variable.
+ *v:stderr* *stderr-variable*
+v:stderr Channel id for stderr. Unlike stdin and stdout (see
+ |stdioopen()|), stderr is always open for writing. This channel
+ ID is always 2, but this variable can be used to be explicit.
+ Example: >
+ :call chansend(v:stderr, "something bad happened\n")
+<
*v:swapname* *swapname-variable*
v:swapname Only valid when executing |SwapExists| autocommands: Name of
the swap file found. Read-only.
@@ -1989,6 +1996,8 @@ call({func}, {arglist} [, {dict}])
any call {func} with arguments {arglist}
ceil({expr}) Float round {expr} up
changenr() Number current change number
+chanclose({id}[, {stream}]) Number Closes a channel or one of its streams
+chansend({id}, {data}) Number Writes {data} to channel
char2nr({expr}[, {utf8}]) Number ASCII/UTF8 value of first char in {expr}
cindent({lnum}) Number C indent for line {lnum}
clearmatches() none clear all matches
@@ -2137,13 +2146,11 @@ isdirectory({directory}) Number |TRUE| if {directory} is a directory
islocked({expr}) Number |TRUE| if {expr} is locked
id({expr}) String identifier of the container
items({dict}) List key-value pairs in {dict}
-jobclose({job}[, {stream}]) Number Closes a job stream(s)
-jobpid({job}) Number Returns pid of a job.
-jobresize({job}, {width}, {height})
- Number Resize {job}'s pseudo terminal window
-jobsend({job}, {data}) Number Writes {data} to {job}'s stdin
+jobpid({id}) Number Returns pid of a job.
+jobresize({id}, {width}, {height})
+ Number Resize pseudo terminal window of a job
jobstart({cmd}[, {opts}]) Number Spawns {cmd} as a job
-jobstop({job}) Number Stops a job
+jobstop({id}) Number Stops a job
jobwait({ids}[, {timeout}]) Number Wait for a set of jobs
join({list} [, {sep}]) String join {list} items into one String
json_decode({expr}) any Convert {expr} from JSON
@@ -2226,7 +2233,6 @@ rpcnotify({channel}, {event}[, {args}...])
Sends an |RPC| notification to {channel}
rpcrequest({channel}, {method}[, {args}...])
Sends an |RPC| request to {channel}
-rpcstop({channel}) Closes an |RPC| {channel}
screenattr({row}, {col}) Number attribute at screen position
screenchar({row}, {col}) Number character at screen position
screencol() Number current cursor column
@@ -2268,6 +2274,8 @@ shiftwidth() Number effective value of 'shiftwidth'
simplify({filename}) String simplify filename as much as possible
sin({expr}) Float sine of {expr}
sinh({expr}) Float hyperbolic sine of {expr}
+sockconnect({mode}, {address} [, {opts}])
+ Number Connects to socket
sort({list} [, {func} [, {dict}]])
List sort {list}, using {func} to compare
soundfold({word}) String sound-fold {word}
@@ -2277,6 +2285,7 @@ spellsuggest({word} [, {max} [, {capital}]])
split({expr} [, {pat} [, {keepempty}]])
List make |List| from {pat} separated {expr}
sqrt({expr}) Float square root of {expr}
+stdioopen({dict}) Number open stdio in a headless instance.
str2float({expr}) Float convert String to Float
str2nr({expr} [, {base}]) Number convert String to Number
strchars({expr} [, {skipcc}]) Number character length of the String {expr}
@@ -2761,6 +2770,35 @@ changenr() *changenr()*
redo it is the number of the redone change. After undo it is
one less than the number of the undone change.
+chanclose({id}[, {stream}]) {Nvim} *chanclose()*
+ Close a channel or a specific stream associated with it.
+ For a job, {stream} can be one of "stdin", "stdout",
+ "stderr" or "rpc" (closes stdin/stdout for a job started
+ with `"rpc":v:true`) If {stream} is omitted, all streams
+ are closed. If the channel is a pty, this will then close the
+ pty master, sending SIGHUP to the job process.
+ For a socket, there is only one stream, and {stream} should be
+ ommited.
+
+chansend({id}, {data}) {Nvim} *chansend()*
+ Send data to channel {id}. For a job, it writes it to the
+ stdin of the process. For the stdio channel |channel-stdio|,
+ it writes to Nvim's stdout. Returns the number of bytes
+ written if the write succeeded, 0 otherwise.
+ See |channel-bytes| for more information.
+
+ {data} may be a string, string convertible, or a list. If
+ {data} is a list, the items will be joined by newlines; any
+ newlines in an item will be sent as NUL. To send a final
+ newline, include a final empty string. Example: >
+ :call chansend(id, ["abc", "123\n456", ""])
+< will send "abc<NL>123<NUL>456<NL>".
+
+ chansend() writes raw data, not RPC messages. If the channel
+ was created with `"rpc":v:true` then the channel expects RPC
+ messages, use |rpcnotify()| and |rpcrequest()| instead.
+
+
char2nr({expr} [, {utf8}]) *char2nr()*
Return number value of the first char in {expr}. Examples: >
char2nr(" ") returns 32
@@ -4931,12 +4969,6 @@ items({dict}) *items()*
entry and the value of this entry. The |List| is in arbitrary
order.
-jobclose({job}[, {stream}]) *jobclose()*
- Close {stream} of |job-id| {job}, where {stream} is one of:
- "stdin", "stdout", "stderr", "rpc" (RPC channel of a job
- started with `"rpc":v:true`). If {stream} is omitted, all
- streams are closed. If the job is a pty job, this will close
- the pty master, sending SIGHUP to the job process.
jobpid({job}) *jobpid()*
Return the PID (process id) of |job-id| {job}.
@@ -4946,22 +4978,6 @@ jobresize({job}, {width}, {height}) *jobresize()*
columns and {height} rows.
Fails if the job was not started with `"pty":v:true`.
-jobsend({job}, {data}) *jobsend()*
- Writes to stdin of the process associated with |job-id| {job}.
- Returns 1 if the write succeeded, 0 otherwise.
- See |job-control|.
-
- {data} may be a string, string convertible, or a list. If
- {data} is a list, the items will be joined by newlines; any
- newlines in an item will be sent as NUL. To send a final
- newline, include a final empty string. Example: >
- :call jobsend(j, ["abc", "123\n456", ""])
-< will send "abc<NL>123<NUL>456<NL>".
-
- jobsend() writes raw data, not RPC messages. If the job was
- created with `"rpc":v:true` then the channel expects RPC
- messages, use |rpcnotify()| and |rpcrequest()| instead.
-
jobstart({cmd}[, {opts}]) *jobstart()*
Spawns {cmd} as a job.
If {cmd} is a List it runs directly (no 'shell').
@@ -4971,6 +4987,11 @@ jobstart({cmd}[, {opts}]) *jobstart()*
Returns |job-id| on success, 0 on invalid arguments (or job
table is full), -1 if {cmd}[0] or 'shell' is not executable.
+ For communication over the job's stdio, it is represented as a
+ |channel|, and a channel ID is returned on success. Use
+ |chansend()| (or |rpcnotify()| and |rpcrequest()| if "rpc" option
+ was used) to send data to stdin and |chanclose()| to close stdio
+ streams without stopping the job explicitly.
See |job-control| and |rpc|.
@@ -4987,7 +5008,9 @@ jobstart({cmd}[, {opts}]) *jobstart()*
*jobstart-options*
{opts} is a dictionary with these keys:
|on_stdout|: stdout event handler (function name or |Funcref|)
+ stdout_buffered : read stdout in |buffered| mode.
|on_stderr|: stderr event handler (function name or |Funcref|)
+ stderr_buffered : read stderr in |buffered| mode.
|on_exit| : exit event handler (function name or |Funcref|)
cwd : Working directory of the job; defaults to
|current-directory|.
@@ -5009,9 +5032,14 @@ jobstart({cmd}[, {opts}]) *jobstart()*
{opts} is passed as |self| dictionary to the callback; the
caller may set other keys to pass application-specific data.
+ Returns:
+ - The channel ID on success
+ - 0 on invalid arguments
+ - -1 if {cmd}[0] is not executable.
+ See |job-control|, |channels|, and |msgpack-rpc| for more information.
-jobstop({job}) *jobstop()*
- Stop |job-id| {job} by sending SIGTERM to the job process. If
+jobstop({id}) *jobstop()*
+ Stop |job-id| {id} by sending SIGTERM to the job process. If
the process does not terminate after a timeout then SIGKILL
will be sent. When the job terminates its |on_exit| handler
(if any) will be invoked.
@@ -6328,13 +6356,11 @@ rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
:let id = jobstart(['prog', 'arg1', 'arg2'], {'rpc': v:true})
rpcstop({channel}) {Nvim} *rpcstop()*
- Closes an |RPC| {channel}. If the channel is a job
- started with |jobstart()| the job is killed.
- It is better to use |jobstop()| in this case, or use
- |jobclose|(id, "rpc") to only close the channel without
- killing the job.
- Closes the socket connection if the channel was opened by
- connecting to |v:servername|.
+ Deprecated. This function was used to stop a job with |rpc|
+ channel, and additionally closed rpc sockets. Instead use
+ |jobstop()| to stop any job, and |chanclose|(id, "rpc") to close
+ rpc communication without stopping the job. Use |chanclose|(id)
+ to close any socket.
screenattr({row}, {col}) *screenattr()*
Like |screenchar()|, but return the attribute. This is a rather
@@ -7034,15 +7060,20 @@ sockconnect({mode}, {address}, {opts}) *sockconnect()*
{address} should be the path of a named pipe. If {mode} is
"tcp" then {address} should be of the form "host:port" where
the host should be an ip adderess or host name, and port the
- port number. Currently only rpc sockets are supported, so
- {opts} must be passed with "rpc" set to |TRUE|.
+ port number.
+
+ Returns a |channel| ID. Close the socket with |chanclose()|.
+ Use |chansend()| to send data over a bytes socket, and
+ |rpcrequest()| and |rpcnotify()| to communicate with a RPC
+ socket.
{opts} is a dictionary with these keys:
- rpc : If set, |msgpack-rpc| will be used to communicate
- over the socket.
+ |on_data| : callback invoked when data was read from socket
+ data_buffered : read data from socket in |buffered| mode.
+ rpc : If set, |msgpack-rpc| will be used to communicate
+ over the socket.
Returns:
- - The channel ID on success, which is used by
- |rpcnotify()| and |rpcrequest()| and |rpcstop()|.
+ - The channel ID on success (greater than zero)
- 0 on invalid arguments or connection failure.
sort({list} [, {func} [, {dict}]]) *sort()* *E702*
@@ -7194,6 +7225,27 @@ sqrt({expr}) *sqrt()*
"nan" may be different, it depends on system libraries.
+stdioopen({opts}) *stdioopen()*
+ In a nvim launched with the |--headless| option, this opens
+ stdin and stdout as a |channel|. This function can only be
+ invoked once per instance. See |channel-stdio| for more
+ information and examples. Note that stderr is not handled by
+ this function, see |v:stderr|.
+
+ Returns a |channel| ID. Close the stdio descriptors with |chanclose()|.
+ Use |chansend()| to send data to stdout, and
+ |rpcrequest()| and |rpcnotify()| to communicate over RPC.
+
+ {opts} is a dictionary with these keys:
+ |on_stdin| : callback invoked when stdin is written to.
+ stdin_buffered : read stdin in |buffered| mode.
+ rpc : If set, |msgpack-rpc| will be used to communicate
+ over stdio
+ Returns:
+ - The channel ID on success (this is always 1)
+ - 0 on invalid arguments
+
+
str2float({expr}) *str2float()*
Convert String {expr} to a Float. This mostly works the same
as when using a floating point number in an expression, see
diff --git a/runtime/doc/job_control.txt b/runtime/doc/job_control.txt
index 7ba0acff48..7df43d6793 100644
--- a/runtime/doc/job_control.txt
+++ b/runtime/doc/job_control.txt
@@ -20,6 +20,8 @@ When a job starts it is assigned a number, unique for the life of the current
Nvim session. Functions like |jobstart()| return job ids. Functions like
|jobsend()|, |jobstop()|, |rpcnotify()|, and |rpcrequest()| take job ids.
+The job's stdio streams are represented as a |channel|. It is possible to send
+and recieve raw bytes, or use |msgpack-rpc|.
==============================================================================
Usage *job-control-usage*
@@ -40,9 +42,9 @@ Example: >
call append(line('$'), str)
endfunction
let s:callbacks = {
- \ 'on_stdout': function('s:JobHandler'),
- \ 'on_stderr': function('s:JobHandler'),
- \ 'on_exit': function('s:JobHandler')
+ \ 'on_stdout': function('s:OnEvent'),
+ \ 'on_stderr': function('s:OnEvent'),
+ \ 'on_exit': function('s:OnEvent')
\ }
let job1 = jobstart(['bash'], extend({'shell': 'shell 1'}, s:callbacks))
let job2 = jobstart(['bash', '-c', 'for i in {1..10}; do echo hello $i!; sleep 1; done'], extend({'shell': 'shell 2'}, s:callbacks))
@@ -59,26 +61,14 @@ Description of what happens:
- `JobHandler()` callback is passed to |jobstart()| to handle various job
events. It displays stdout/stderr data received from the shells.
- *on_stdout*
-Arguments passed to on_stdout callback:
- 0: |job-id|
- 1: List of lines read from the stream. If the last item is not "" (empty
- string), then it is an incomplete line that might be continued at the
- next on_stdout invocation. See Note 2 below.
- 2: Event type: "stdout"
- *on_stderr*
-Arguments passed to on_stderr callback:
- 0: |job-id|
- 1: List of lines read from the stream. If the last item is not "" (empty
- string), then it is an incomplete line that might be continued at the
- next on_stderr invocation. See Note 2 below.
- 2: Event type: "stderr"
+For |on_stdout| and |on_stderr| see |channel-callback|.
*on_exit*
Arguments passed to on_exit callback:
0: |job-id|
1: Exit-code of the process.
2: Event type: "exit"
+
Note: Buffered stdout/stderr data which has not been flushed by the sender
will not trigger the on_stdout/on_stderr callback (but if the process
ends, the on_exit callback will be invoked).
@@ -137,13 +127,19 @@ The above example could be written in this "object-oriented" style: >
let instance = Shell.new('bomb',
\ 'for i in $(seq 9 -1 1); do echo $i 1>&$((i % 2 + 1)); sleep 1; done')
<
-To send data to the job's stdin, use |jobsend()|: >
- :call jobsend(job1, "ls\n")
- :call jobsend(job1, "invalid-command\n")
- :call jobsend(job1, "exit\n")
+To send data to the job's stdin, use |chansend()|: >
+ :call chansend(job1, "ls\n")
+ :call chansend(job1, "invalid-command\n")
+ :call chansend(job1, "exit\n")
<
A job may be killed with |jobstop()|: >
:call jobstop(job1)
<
+A job may be killed at any time with the |jobstop()| function:
+>
+ :call jobstop(job1)
+<
+Individual streams can be closed without killing the job, see |chanclose()|.
+
==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl:
diff --git a/runtime/doc/msgpack_rpc.txt b/runtime/doc/msgpack_rpc.txt
index a1453a6cc6..2d8f5af6d2 100644
--- a/runtime/doc/msgpack_rpc.txt
+++ b/runtime/doc/msgpack_rpc.txt
@@ -1,4 +1,3 @@
-*msgpack_rpc.txt* Nvim
NVIM REFERENCE MANUAL by Thiago de Arruda
@@ -61,24 +60,24 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
==============================================================================
3. Connecting *rpc-connecting*
-There are several ways to open a msgpack-rpc channel to an Nvim instance:
+See |channel-intro|, for various ways to open a channel. Most of the channel
+opening functions take an `rpc` key in the options dictionary, to enable rpc.
- 1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
- applications can embed Nvim.
+Additionally, rpc channels can be opened by other processes connecting to
+TCP/IP sockets or named pipes listened to by nvim.
- 2. Through stdin/stdout of some other process spawned by |jobstart()|.
- Set the "rpc" key to |v:true| in the options dict to use the job's stdin
- and stdout as a single msgpack channel that is processed directly by
- Nvim. Then it is not possible to process raw data to or from the
- process's stdin and stdout. stderr can still be used, though.
+An rpc socket is automatically created with each instance. The socket
+ location is stored in |v:servername|. By default this is a named pipe
+with an automatically generated address. See |XXX|.
- 3. Through the socket automatically created with each instance. The socket
- location is stored in |v:servername|.
-
- 4. Through a TCP/IP socket. To make Nvim listen on a TCP/IP socket, set the
- |$NVIM_LISTEN_ADDRESS| environment variable before starting Nvim: >
+To make Nvim listen on a TCP/IP socket instead, set the
+ |$NVIM_LISTEN_ADDRESS| environment variable before starting Nvim: >
NVIM_LISTEN_ADDRESS=127.0.0.1:6666 nvim
-<
+<Also, more sockets and named pipes can be listened on using |serverstart()|.
+
+Note that localhost TCP sockets are generally less secure than named pipes,
+and can lead to vunerabilities like remote code execution.
+
Connecting to the socket is the easiest way a programmer can test the API,
which can be done through any msgpack-rpc client library or full-featured
|api-client|. Here's a Ruby script that prints 'hello world!' in the current
diff --git a/runtime/doc/nvim_terminal_emulator.txt b/runtime/doc/nvim_terminal_emulator.txt
index 9dae69ae26..94e7899b6a 100644
--- a/runtime/doc/nvim_terminal_emulator.txt
+++ b/runtime/doc/nvim_terminal_emulator.txt
@@ -117,8 +117,8 @@ variables:
- *b:term_title* The settable title of the terminal, typically displayed in
the window title or tab title of a graphical terminal emulator. Programs
running in the terminal can set this title via an escape sequence.
-- *b:terminal_job_id* The nvim job ID of the job running in the terminal. See
- |job-control| for more information.
+- |'channel'| The nvim channel ID for the underlying PTY.
+ |chansend()| can be used to send input to the terminal.
- *b:terminal_job_pid* The PID of the top-level process running in the
terminal.
diff --git a/runtime/doc/options.txt b/runtime/doc/options.txt
index 8a8ad58efd..4fe2e07909 100644
--- a/runtime/doc/options.txt
+++ b/runtime/doc/options.txt
@@ -1213,6 +1213,12 @@ A jump table for the options with a short description can be found at |Q_op|.
< |Nvi| also has this option, but it only uses the first character.
See |cmdwin|.
+ *'channel'*
+'channel' number (default: 0)
+ local to buffer
+ |Channel| connected to the buffer. Currently only used by
+ |terminal-emulator|. Is 0 if no terminal is open. Cannot be changed.
+
*'charconvert'* *'ccv'* *E202* *E214* *E513*
'charconvert' 'ccv' string (default "")
global
diff --git a/runtime/doc/starting.txt b/runtime/doc/starting.txt
index 05e3f72b8d..9b33926d04 100644
--- a/runtime/doc/starting.txt
+++ b/runtime/doc/starting.txt
@@ -351,6 +351,8 @@ argument.
*--headless*
--headless Do not start the built-in UI.
+ See |channel-stdio| for how to use stdio for other purposes
+ instead.
See also |silent-mode|, which does start a (limited) UI.
==============================================================================
diff --git a/src/nvim/api/ui.c b/src/nvim/api/ui.c
index afbee09c1c..a9eaccfac5 100644
--- a/src/nvim/api/ui.c
+++ b/src/nvim/api/ui.c
@@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui)
{
UIData *data = ui->data;
if (data->buffer.size > 0) {
- channel_send_event(data->channel_id, "redraw", data->buffer);
+ rpc_send_event(data->channel_id, "redraw", data->buffer);
data->buffer = (Array)ARRAY_DICT_INIT;
}
}
diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c
index d2b0e329c9..f4ccf07bec 100644
--- a/src/nvim/api/vim.c
+++ b/src/nvim/api/vim.c
@@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event)
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
- channel_subscribe(channel_id, e);
+ rpc_subscribe(channel_id, e);
}
/// Unsubscribes to event broadcasts
@@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event)
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
- channel_unsubscribe(channel_id, e);
+ rpc_unsubscribe(channel_id, e);
}
Integer nvim_get_color_by_name(String name)
diff --git a/src/nvim/buffer_defs.h b/src/nvim/buffer_defs.h
index 559dffb945..f1cbcb2627 100644
--- a/src/nvim/buffer_defs.h
+++ b/src/nvim/buffer_defs.h
@@ -603,6 +603,7 @@ struct file_buffer {
char_u *b_p_bt; ///< 'buftype'
int b_has_qf_entry; ///< quickfix exists for buffer
int b_p_bl; ///< 'buflisted'
+ long b_p_channel; ///< 'channel'
int b_p_cin; ///< 'cindent'
char_u *b_p_cino; ///< 'cinoptions'
char_u *b_p_cink; ///< 'cinkeys'
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
new file mode 100644
index 0000000000..40af470bde
--- /dev/null
+++ b/src/nvim/channel.c
@@ -0,0 +1,752 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
+#include "nvim/api/ui.h"
+#include "nvim/channel.h"
+#include "nvim/eval.h"
+#include "nvim/event/socket.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
+#include "nvim/os/shell.h"
+#include "nvim/path.h"
+#include "nvim/ascii.h"
+
+static bool did_stdio = false;
+PMap(uint64_t) *channels = NULL;
+
+/// next free id for a job or rpc channel
+/// 1 is reserved for stdio channel
+/// 2 is reserved for stderr channel
+static uint64_t next_chan_id = CHAN_STDERR+1;
+
+
+typedef struct {
+ Channel *chan;
+ Callback *callback;
+ const char *type;
+ list_T *received;
+ int status;
+} ChannelEvent;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "channel.c.generated.h"
+#endif
+/// Teardown the module
+void channel_teardown(void)
+{
+ if (!channels) {
+ return;
+ }
+
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ channel_close(channel->id, kChannelPartAll, NULL);
+ });
+}
+
+/// Closes a channel
+///
+/// @param id The channel id
+/// @return true if successful, false otherwise
+bool channel_close(uint64_t id, ChannelPart part, const char **error)
+{
+ Channel *chan;
+ Process *proc;
+
+ const char *dummy;
+ if (!error) {
+ error = &dummy;
+ }
+
+ if (!(chan = find_channel(id))) {
+ if (id < next_chan_id) {
+ // allow double close, even though we can't say what parts was valid.
+ return true;
+ }
+ *error = (const char *)e_invchan;
+ return false;
+ }
+
+ bool close_main = false;
+ if (part == kChannelPartRpc || part == kChannelPartAll) {
+ close_main = true;
+ if (chan->is_rpc) {
+ rpc_close(chan);
+ } else if (part == kChannelPartRpc) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
+ && chan->is_rpc) {
+ *error = (const char *)e_invstreamrpc;
+ return false;
+ }
+
+ switch (chan->streamtype) {
+ case kChannelStreamSocket:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ stream_may_close(&chan->stream.socket);
+ break;
+
+ case kChannelStreamProc:
+ proc = (Process *)&chan->stream.proc;
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&proc->in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&proc->out);
+ }
+ if (part == kChannelPartStderr || part == kChannelPartAll) {
+ stream_may_close(&proc->err);
+ }
+ if (proc->type == kProcessTypePty && part == kChannelPartAll) {
+ pty_process_close_master(&chan->stream.pty);
+ }
+
+ break;
+
+ case kChannelStreamStdio:
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&chan->stream.stdio.in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&chan->stream.stdio.out);
+ }
+ if (part == kChannelPartStderr) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+
+ case kChannelStreamStderr:
+ if (part != kChannelPartAll && part != kChannelPartStderr) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ if (!chan->stream.err.closed) {
+ chan->stream.err.closed = true;
+ // Don't close on exit, in case late error messages
+ if (!exiting) {
+ fclose(stderr);
+ }
+ channel_decref(chan);
+ }
+ break;
+
+ case kChannelStreamInternal:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+ }
+
+ return true;
+}
+
+/// Initializes the module
+void channel_init(void)
+{
+ channels = pmap_new(uint64_t)();
+ channel_alloc(kChannelStreamStderr);
+ rpc_init();
+ remote_ui_init();
+}
+
+/// Allocates a channel.
+///
+/// Channel is allocated with refcount 1, which should be decreased
+/// when the underlying stream closes.
+static Channel *channel_alloc(ChannelStreamType type)
+{
+ Channel *chan = xcalloc(1, sizeof(*chan));
+ if (type == kChannelStreamStdio) {
+ chan->id = CHAN_STDIO;
+ } else if (type == kChannelStreamStderr) {
+ chan->id = CHAN_STDERR;
+ } else {
+ chan->id = next_chan_id++;
+ }
+ chan->events = multiqueue_new_child(main_loop.events);
+ chan->refcount = 1;
+ chan->streamtype = type;
+ pmap_put(uint64_t)(channels, chan->id, chan);
+ return chan;
+}
+
+/// Not implemented, only logging for now
+void channel_create_event(Channel *chan, char *ext_source)
+{
+#if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
+ char *stream_desc, *mode_desc, *source;
+
+ switch (chan->streamtype) {
+ case kChannelStreamProc:
+ if (chan->stream.proc.type == kProcessTypePty) {
+ stream_desc = "pty job";
+ } else {
+ stream_desc = "job";
+ }
+ break;
+
+ case kChannelStreamStdio:
+ stream_desc = "stdio";
+ break;
+
+ case kChannelStreamSocket:
+ stream_desc = "socket";
+ break;
+
+ case kChannelStreamInternal:
+ stream_desc = "socket (internal)";
+ break;
+
+ default:
+ stream_desc = "?";
+ }
+
+ if (chan->is_rpc) {
+ mode_desc = ", rpc";
+ } else if (chan->term) {
+ mode_desc = ", terminal";
+ } else {
+ mode_desc = "";
+ }
+
+ if (ext_source) {
+ // TODO(bfredl): in a future improved traceback solution,
+ // external events should be included.
+ source = ext_source;
+ } else {
+ eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
+ source = (char *)IObuff;
+ }
+
+ ILOG("new channel %" PRIu64 " (%s%s): %s", chan->id, stream_desc,
+ mode_desc, source);
+#else
+ (void)chan;
+ (void)ext_source;
+#endif
+}
+
+void channel_incref(Channel *channel)
+{
+ channel->refcount++;
+}
+
+void channel_decref(Channel *channel)
+{
+ if (!(--channel->refcount)) {
+ multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel);
+ }
+}
+
+void callback_reader_free(CallbackReader *reader)
+{
+ callback_free(&reader->cb);
+ if (reader->buffered) {
+ ga_clear(&reader->buffer);
+ }
+}
+
+void callback_reader_start(CallbackReader *reader)
+{
+ if (reader->buffered) {
+ ga_init(&reader->buffer, sizeof(char *), 32);
+ ga_grow(&reader->buffer, 32);
+ }
+}
+
+static void free_channel_event(void **argv)
+{
+ Channel *channel = argv[0];
+ if (channel->is_rpc) {
+ rpc_free(channel);
+ }
+
+ callback_reader_free(&channel->on_stdout);
+ callback_reader_free(&channel->on_stderr);
+ callback_free(&channel->on_exit);
+
+ pmap_del(uint64_t)(channels, channel->id);
+ multiqueue_free(channel->events);
+ xfree(channel);
+}
+
+static void channel_destroy_early(Channel *chan)
+{
+ if ((chan->id != --next_chan_id)) {
+ abort();
+ }
+
+ if ((--chan->refcount != 0)) {
+ abort();
+ }
+
+ free_channel_event((void **)&chan);
+}
+
+
+static void close_cb(Stream *stream, void *data)
+{
+ channel_decref(data);
+}
+
+Channel *channel_job_start(char **argv, CallbackReader on_stdout,
+ CallbackReader on_stderr, Callback on_exit,
+ bool pty, bool rpc, bool detach, const char *cwd,
+ uint16_t pty_width, uint16_t pty_height,
+ char *term_name, varnumber_T *status_out)
+{
+ Channel *chan = channel_alloc(kChannelStreamProc);
+ chan->on_stdout = on_stdout;
+ chan->on_stderr = on_stderr;
+ chan->on_exit = on_exit;
+ chan->is_rpc = rpc;
+
+ if (pty) {
+ if (detach) {
+ EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
+ shell_free_argv(argv);
+ xfree(term_name);
+ channel_destroy_early(chan);
+ *status_out = 0;
+ return NULL;
+ }
+ chan->stream.pty = pty_process_init(&main_loop, chan);
+ if (pty_width > 0) {
+ chan->stream.pty.width = pty_width;
+ }
+ if (pty_height > 0) {
+ chan->stream.pty.height = pty_height;
+ }
+ if (term_name) {
+ chan->stream.pty.term_name = term_name;
+ }
+ } else {
+ chan->stream.uv = libuv_process_init(&main_loop, chan);
+ }
+
+ Process *proc = (Process *)&chan->stream.proc;
+ proc->argv = argv;
+ proc->cb = channel_process_exit_cb;
+ proc->events = chan->events;
+ proc->detach = detach;
+ proc->cwd = cwd;
+
+ char *cmd = xstrdup(proc->argv[0]);
+ bool has_out, has_err;
+ if (proc->type == kProcessTypePty) {
+ has_out = true;
+ has_err = false;
+ } else {
+ has_out = chan->is_rpc || callback_reader_set(chan->on_stdout);
+ has_err = callback_reader_set(chan->on_stderr);
+ }
+ int status = process_spawn(proc, true, has_out, has_err);
+ if (status) {
+ EMSG3(_(e_jobspawn), os_strerror(status), cmd);
+ xfree(cmd);
+ if (proc->type == kProcessTypePty) {
+ xfree(chan->stream.pty.term_name);
+ }
+ channel_destroy_early(chan);
+ *status_out = proc->status;
+ return NULL;
+ }
+ xfree(cmd);
+
+ wstream_init(&proc->in, 0);
+ if (has_out) {
+ rstream_init(&proc->out, 0);
+ }
+
+ if (chan->is_rpc) {
+ // the rpc takes over the in and out streams
+ rpc_start(chan);
+ } else {
+ if (has_out) {
+ callback_reader_start(&chan->on_stdout);
+ rstream_start(&proc->out, on_job_stdout, chan);
+ }
+ }
+
+ if (has_err) {
+ callback_reader_start(&chan->on_stderr);
+ rstream_init(&proc->err, 0);
+ rstream_start(&proc->err, on_job_stderr, chan);
+ }
+
+ *status_out = (varnumber_T)chan->id;
+ return chan;
+}
+
+
+uint64_t channel_connect(bool tcp, const char *address,
+ bool rpc, CallbackReader on_output,
+ int timeout, const char **error)
+{
+ if (!tcp && rpc) {
+ char *path = fix_fname(address);
+ if (server_owns_pipe_address(path)) {
+ // avoid deadlock
+ xfree(path);
+ return channel_create_internal_rpc();
+ }
+ xfree(path);
+ }
+
+ Channel *channel = channel_alloc(kChannelStreamSocket);
+ if (!socket_connect(&main_loop, &channel->stream.socket,
+ tcp, address, timeout, error)) {
+ channel_destroy_early(channel);
+ return 0;
+ }
+
+ channel->stream.socket.internal_close_cb = close_cb;
+ channel->stream.socket.internal_data = channel;
+ wstream_init(&channel->stream.socket, 0);
+ rstream_init(&channel->stream.socket, 0);
+
+ if (rpc) {
+ rpc_start(channel);
+ } else {
+ channel->on_stdout = on_output;
+ callback_reader_start(&channel->on_stdout);
+ rstream_start(&channel->stream.socket, on_socket_output, channel);
+ }
+
+ channel_create_event(channel, NULL);
+ return channel->id;
+}
+
+/// Creates an RPC channel from a tcp/pipe socket connection
+///
+/// @param watcher The SocketWatcher ready to accept the connection
+void channel_from_connection(SocketWatcher *watcher)
+{
+ Channel *channel = channel_alloc(kChannelStreamSocket);
+ socket_watcher_accept(watcher, &channel->stream.socket);
+ channel->stream.socket.internal_close_cb = close_cb;
+ channel->stream.socket.internal_data = channel;
+ wstream_init(&channel->stream.socket, 0);
+ rstream_init(&channel->stream.socket, 0);
+ rpc_start(channel);
+ channel_create_event(channel, watcher->addr);
+}
+
+/// Creates a loopback channel. This is used to avoid deadlock
+/// when an instance connects to its own named pipe.
+static uint64_t channel_create_internal_rpc(void)
+{
+ Channel *channel = channel_alloc(kChannelStreamInternal);
+ rpc_start(channel);
+ return channel->id;
+}
+
+/// Creates an API channel from stdin/stdout. This is used when embedding
+/// Neovim
+uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
+ const char **error)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (!headless_mode) {
+ *error = _("can only be opened in headless mode");
+ return 0;
+ }
+
+ if (did_stdio) {
+ *error = _("channel was already open");
+ return 0;
+ }
+ did_stdio = true;
+
+ Channel *channel = channel_alloc(kChannelStreamStdio);
+
+ rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0);
+ wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
+
+ if (rpc) {
+ rpc_start(channel);
+ } else {
+ channel->on_stdout = on_output;
+ callback_reader_start(&channel->on_stdout);
+ rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
+ }
+
+ return channel->id;
+}
+
+/// @param data will be consumed
+size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
+{
+ Channel *chan = find_channel(id);
+ if (!chan) {
+ EMSG(_(e_invchan));
+ goto err;
+ }
+
+ if (chan->streamtype == kChannelStreamStderr) {
+ if (chan->stream.err.closed) {
+ *error = _("Can't send data to closed stream");
+ goto err;
+ }
+ // unbuffered write
+ size_t written = fwrite(data, len, 1, stderr);
+ xfree(data);
+ return len * written;
+ }
+
+
+ Stream *in = channel_instream(chan);
+ if (in->closed) {
+ *error = _("Can't send data to closed stream");
+ goto err;
+ }
+
+ if (chan->is_rpc) {
+ *error = _("Can't send raw data to rpc channel");
+ goto err;
+ }
+
+ WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
+ return wstream_write(in, buf) ? len : 0;
+
+err:
+ xfree(data);
+ return 0;
+}
+
+/// NB: mutates buf in place!
+static list_T *buffer_to_tv_list(char *buf, size_t count)
+{
+ list_T *ret = tv_list_alloc();
+ char *ptr = buf;
+ size_t remaining = count;
+ size_t off = 0;
+
+ while (off < remaining) {
+ // append the line
+ if (ptr[off] == NL) {
+ tv_list_append_string(ret, ptr, (ssize_t)off);
+ size_t skip = off + 1;
+ ptr += skip;
+ remaining -= skip;
+ off = 0;
+ continue;
+ }
+ if (ptr[off] == NUL) {
+ // Translate NUL to NL
+ ptr[off] = NL;
+ }
+ off++;
+ }
+ tv_list_append_string(ret, ptr, (ssize_t)off);
+ return ret;
+}
+
+// vimscript job callbacks must be executed on Nvim main loop
+static inline void process_channel_event(Channel *chan, Callback *callback,
+ const char *type, char *buf,
+ size_t count, int status)
+{
+ assert(callback);
+ ChannelEvent *event_data = xmalloc(sizeof(*event_data));
+ event_data->received = NULL;
+ if (buf) {
+ event_data->received = buffer_to_tv_list(buf, count);
+ } else {
+ event_data->status = status;
+ }
+ channel_incref(chan); // Hold on ref to callback
+ event_data->chan = chan;
+ event_data->callback = callback;
+ event_data->type = type;
+
+ multiqueue_put(chan->events, on_channel_event, 1, event_data);
+}
+
+void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
+}
+
+void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
+}
+
+static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
+}
+
+static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
+}
+
+static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
+ size_t count, bool eof, CallbackReader *reader,
+ const char *type)
+{
+ // stub variable, to keep reading consistent with the order of events, only
+ // consider the count parameter.
+ size_t r;
+ char *ptr = rbuffer_read_ptr(buf, &r);
+
+ if (eof) {
+ if (reader->buffered) {
+ if (reader->cb.type != kCallbackNone) {
+ process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len, 0);
+ ga_clear(&reader->buffer);
+ } else if (reader->self) {
+ list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
+ tv_dict_add_list(reader->self, type, strlen(type), data);
+ } else {
+ abort();
+ }
+ } else if (reader->cb.type != kCallbackNone) {
+ process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
+ }
+ return;
+ }
+
+ // The order here matters, the terminal must receive the data first because
+ // process_channel_event will modify the read buffer(convert NULs into NLs)
+ if (chan->term) {
+ terminal_receive(chan->term, ptr, count);
+ }
+
+ rbuffer_consumed(buf, count);
+ if (reader->buffered) {
+ ga_concat_len(&reader->buffer, ptr, count);
+ } else if (callback_reader_set(*reader)) {
+ process_channel_event(chan, &reader->cb, type, ptr, count, 0);
+ }
+}
+
+static void channel_process_exit_cb(Process *proc, int status, void *data)
+{
+ Channel *chan = data;
+ if (chan->term) {
+ char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
+ snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
+ terminal_close(chan->term, msg);
+ }
+
+ // if status is -1 the process did not really exit,
+ // we just closed the handle onto a detached process
+ if (status >= 0) {
+ process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
+ }
+
+ channel_decref(chan);
+}
+
+static void on_channel_event(void **args)
+{
+ ChannelEvent *ev = (ChannelEvent *)args[0];
+
+ typval_T argv[4];
+
+ argv[0].v_type = VAR_NUMBER;
+ argv[0].v_lock = VAR_UNLOCKED;
+ argv[0].vval.v_number = (varnumber_T)ev->chan->id;
+
+ if (ev->received) {
+ argv[1].v_type = VAR_LIST;
+ argv[1].v_lock = VAR_UNLOCKED;
+ argv[1].vval.v_list = ev->received;
+ argv[1].vval.v_list->lv_refcount++;
+ } else {
+ argv[1].v_type = VAR_NUMBER;
+ argv[1].v_lock = VAR_UNLOCKED;
+ argv[1].vval.v_number = ev->status;
+ }
+
+ argv[2].v_type = VAR_STRING;
+ argv[2].v_lock = VAR_UNLOCKED;
+ argv[2].vval.v_string = (uint8_t *)ev->type;
+
+ typval_T rettv = TV_INITIAL_VALUE;
+ callback_call(ev->callback, 3, argv, &rettv);
+ tv_clear(&rettv);
+ channel_decref(ev->chan);
+ xfree(ev);
+}
+
+
+/// Open terminal for channel
+///
+/// Channel `chan` is assumed to be an open pty channel,
+/// and curbuf is assumed to be a new, unmodified buffer.
+void channel_terminal_open(Channel *chan)
+{
+ TerminalOptions topts;
+ topts.data = chan;
+ topts.width = chan->stream.pty.width;
+ topts.height = chan->stream.pty.height;
+ topts.write_cb = term_write;
+ topts.resize_cb = term_resize;
+ topts.close_cb = term_close;
+ curbuf->b_p_channel = (long)chan->id; // 'channel' option
+ Terminal *term = terminal_open(topts);
+ chan->term = term;
+ channel_incref(chan);
+}
+
+static void term_write(char *buf, size_t size, void *data)
+{
+ Channel *chan = data;
+ if (chan->stream.proc.in.closed) {
+ // If the backing stream was closed abruptly, there may be write events
+ // ahead of the terminal close event. Just ignore the writes.
+ ILOG("write failed: stream is closed");
+ return;
+ }
+ WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
+ wstream_write(&chan->stream.proc.in, wbuf);
+}
+
+static void term_resize(uint16_t width, uint16_t height, void *data)
+{
+ Channel *chan = data;
+ pty_process_resize(&chan->stream.pty, width, height);
+}
+
+static inline void term_delayed_free(void **argv)
+{
+ Channel *chan = argv[0];
+ if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
+ multiqueue_put(chan->events, term_delayed_free, 1, chan);
+ return;
+ }
+
+ terminal_destroy(chan->term);
+ chan->term = NULL;
+ channel_decref(chan);
+}
+
+static void term_close(void *data)
+{
+ Channel *chan = data;
+ process_stop(&chan->stream.proc);
+ multiqueue_put(chan->events, term_delayed_free, 1, data);
+}
+
diff --git a/src/nvim/channel.h b/src/nvim/channel.h
new file mode 100644
index 0000000000..b856d197f1
--- /dev/null
+++ b/src/nvim/channel.h
@@ -0,0 +1,134 @@
+#ifndef NVIM_CHANNEL_H
+#define NVIM_CHANNEL_H
+
+#include "nvim/main.h"
+#include "nvim/event/socket.h"
+#include "nvim/event/process.h"
+#include "nvim/os/pty_process.h"
+#include "nvim/event/libuv_process.h"
+#include "nvim/eval/typval.h"
+#include "nvim/msgpack_rpc/channel_defs.h"
+
+#define CHAN_STDIO 1
+#define CHAN_STDERR 2
+
+typedef enum {
+ kChannelStreamProc,
+ kChannelStreamSocket,
+ kChannelStreamStdio,
+ kChannelStreamStderr,
+ kChannelStreamInternal
+} ChannelStreamType;
+
+typedef enum {
+ kChannelPartStdin,
+ kChannelPartStdout,
+ kChannelPartStderr,
+ kChannelPartRpc,
+ kChannelPartAll
+} ChannelPart;
+
+
+typedef struct {
+ Stream in;
+ Stream out;
+} StdioPair;
+
+typedef struct {
+ bool closed;
+} StderrState;
+
+typedef struct {
+ Callback cb;
+ dict_T *self;
+ garray_T buffer;
+ bool buffered;
+} CallbackReader;
+
+#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
+ .self = NULL, \
+ .buffer = GA_EMPTY_INIT_VALUE, \
+ .buffered = false })
+static inline bool callback_reader_set(CallbackReader reader)
+{
+ return reader.cb.type != kCallbackNone || reader.self;
+}
+
+struct Channel {
+ uint64_t id;
+ size_t refcount;
+ MultiQueue *events;
+
+ ChannelStreamType streamtype;
+ union {
+ Process proc;
+ LibuvProcess uv;
+ PtyProcess pty;
+ Stream socket;
+ StdioPair stdio;
+ StderrState err;
+ } stream;
+
+ bool is_rpc;
+ RpcState rpc;
+ Terminal *term;
+
+ CallbackReader on_stdout;
+ CallbackReader on_stderr;
+ Callback on_exit;
+};
+
+EXTERN PMap(uint64_t) *channels;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "channel.h.generated.h"
+#endif
+
+/// @returns Channel with the id or NULL if not found
+static inline Channel *find_channel(uint64_t id)
+{
+ return pmap_get(uint64_t)(channels, id);
+}
+
+static inline Stream *channel_instream(Channel *chan)
+ FUNC_ATTR_NONNULL_ALL
+{
+ switch (chan->streamtype) {
+ case kChannelStreamProc:
+ return &chan->stream.proc.in;
+
+ case kChannelStreamSocket:
+ return &chan->stream.socket;
+
+ case kChannelStreamStdio:
+ return &chan->stream.stdio.out;
+
+ case kChannelStreamInternal:
+ case kChannelStreamStderr:
+ abort();
+ }
+ abort();
+}
+
+static inline Stream *channel_outstream(Channel *chan)
+ FUNC_ATTR_NONNULL_ALL
+{
+ switch (chan->streamtype) {
+ case kChannelStreamProc:
+ return &chan->stream.proc.out;
+
+ case kChannelStreamSocket:
+ return &chan->stream.socket;
+
+ case kChannelStreamStdio:
+ return &chan->stream.stdio.in;
+
+ case kChannelStreamInternal:
+ case kChannelStreamStderr:
+ abort();
+ }
+ abort();
+}
+
+
+#endif // NVIM_CHANNEL_H
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index f414e771d7..577aa67c60 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -24,6 +24,7 @@
#endif
#include "nvim/eval.h"
#include "nvim/buffer.h"
+#include "nvim/channel.h"
#include "nvim/charset.h"
#include "nvim/cursor.h"
#include "nvim/diff.h"
@@ -365,6 +366,7 @@ static struct vimvar {
VV(VV_DYING, "dying", VAR_NUMBER, VV_RO),
VV(VV_EXCEPTION, "exception", VAR_STRING, VV_RO),
VV(VV_THROWPOINT, "throwpoint", VAR_STRING, VV_RO),
+ VV(VV_STDERR, "stderr", VAR_NUMBER, VV_RO),
VV(VV_REG, "register", VAR_STRING, VV_RO),
VV(VV_CMDBANG, "cmdbang", VAR_NUMBER, VV_RO),
VV(VV_INSERTMODE, "insertmode", VAR_STRING, VV_RO),
@@ -437,31 +439,6 @@ static ScopeDictDictItem vimvars_var;
#define vimvarht vimvardict.dv_hashtab
typedef struct {
- union {
- LibuvProcess uv;
- PtyProcess pty;
- } proc;
- Stream in, out, err; // Initialized in common_job_start().
- Terminal *term;
- bool stopped;
- bool exited;
- bool rpc;
- int refcount;
- Callback on_stdout, on_stderr, on_exit;
- varnumber_T *status_ptr;
- uint64_t id;
- MultiQueue *events;
-} TerminalJobData;
-
-typedef struct {
- TerminalJobData *data;
- Callback *callback;
- const char *type;
- list_T *received;
- int status;
-} JobEvent;
-
-typedef struct {
TimeWatcher tw;
int timer_id;
int repeat_count;
@@ -513,7 +490,6 @@ typedef enum {
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
-static PMap(uint64_t) *jobs = NULL;
static uint64_t last_timer_id = 0;
static PMap(uint64_t) *timers = NULL;
@@ -556,7 +532,6 @@ void eval_init(void)
{
vimvars[VV_VERSION].vv_nr = VIM_VERSION_100;
- jobs = pmap_new(uint64_t)();
timers = pmap_new(uint64_t)();
struct vimvar *p;
@@ -612,6 +587,7 @@ void eval_init(void)
v_event->dv_lock = VAR_FIXED;
set_vim_var_dict(VV_EVENT, v_event);
set_vim_var_list(VV_ERRORS, tv_list_alloc());
+ set_vim_var_nr(VV_STDERR, CHAN_STDERR);
set_vim_var_nr(VV_SEARCHFORWARD, 1L);
set_vim_var_nr(VV_HLSEARCH, 1L);
set_vim_var_nr(VV_COUNT1, 1);
@@ -5139,12 +5115,12 @@ bool garbage_collect(bool testing)
// named functions (matters for closures)
ABORTING(set_ref_in_functions(copyID));
- // Jobs
+ // Channels
{
- TerminalJobData *data;
- map_foreach_value(jobs, data, {
- set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL);
- set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL);
+ Channel *data;
+ map_foreach_value(channels, data, {
+ set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL);
+ set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL);
set_ref_in_callback(&data->on_exit, copyID, NULL, NULL);
})
}
@@ -7348,6 +7324,76 @@ static void f_changenr(typval_T *argvars, typval_T *rettv, FunPtr fptr)
rettv->vval.v_number = curbuf->b_u_seq_cur;
}
+// "chanclose(id[, stream])" function
+static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
+{
+ rettv->v_type = VAR_NUMBER;
+ rettv->vval.v_number = 0;
+
+ if (check_restricted() || check_secure()) {
+ return;
+ }
+
+ if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
+ && argvars[1].v_type != VAR_UNKNOWN)) {
+ EMSG(_(e_invarg));
+ return;
+ }
+
+ ChannelPart part = kChannelPartAll;
+ if (argvars[1].v_type == VAR_STRING) {
+ char *stream = (char *)argvars[1].vval.v_string;
+ if (!strcmp(stream, "stdin")) {
+ part = kChannelPartStdin;
+ } else if (!strcmp(stream, "stdout")) {
+ part = kChannelPartStdout;
+ } else if (!strcmp(stream, "stderr")) {
+ part = kChannelPartStderr;
+ } else if (!strcmp(stream, "rpc")) {
+ part = kChannelPartRpc;
+ } else {
+ EMSG2(_("Invalid channel stream \"%s\""), stream);
+ return;
+ }
+ }
+ const char *error;
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number, part, &error);
+ if (!rettv->vval.v_number) {
+ EMSG(error);
+ }
+}
+
+// "chansend(id, data)" function
+static void f_chansend(typval_T *argvars, typval_T *rettv, FunPtr fptr)
+{
+ rettv->v_type = VAR_NUMBER;
+ rettv->vval.v_number = 0;
+
+ if (check_restricted() || check_secure()) {
+ return;
+ }
+
+ if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) {
+ // First argument is the channel id and second is the data to write
+ EMSG(_(e_invarg));
+ return;
+ }
+
+ ptrdiff_t input_len = 0;
+ char *input = save_tv_as_string(&argvars[1], &input_len, false);
+ if (!input) {
+ // Either the error has been handled by save_tv_as_string(),
+ // or there is no input to send.
+ return;
+ }
+ uint64_t id = argvars[0].vval.v_number;
+ const char *error = NULL;
+ rettv->vval.v_number = channel_send(id, input, input_len, &error);
+ if (error) {
+ EMSG(error);
+ }
+}
+
/*
* "char2nr(string)" function
*/
@@ -11417,68 +11463,6 @@ static void f_items(typval_T *argvars, typval_T *rettv, FunPtr fptr)
dict_list(argvars, rettv, 2);
}
-// "jobclose(id[, stream])" function
-static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
-{
- rettv->v_type = VAR_NUMBER;
- rettv->vval.v_number = 0;
-
- if (check_restricted() || check_secure()) {
- return;
- }
-
- if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
- && argvars[1].v_type != VAR_UNKNOWN)) {
- EMSG(_(e_invarg));
- return;
- }
-
- TerminalJobData *data = find_job(argvars[0].vval.v_number);
- if (!data) {
- EMSG(_(e_invjob));
- return;
- }
-
- Process *proc = (Process *)&data->proc;
-
- if (argvars[1].v_type == VAR_STRING) {
- char *stream = (char *)argvars[1].vval.v_string;
- if (!strcmp(stream, "stdin")) {
- if (data->rpc) {
- EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
- } else {
- process_close_in(proc);
- }
- } else if (!strcmp(stream, "stdout")) {
- if (data->rpc) {
- EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
- } else {
- process_close_out(proc);
- }
- } else if (!strcmp(stream, "stderr")) {
- process_close_err(proc);
- } else if (!strcmp(stream, "rpc")) {
- if (data->rpc) {
- channel_close(data->id);
- } else {
- EMSG(_("Invalid job stream: Not an rpc job"));
- }
- } else {
- EMSG2(_("Invalid job stream \"%s\""), stream);
- }
- } else {
- if (data->rpc) {
- channel_close(data->id);
- process_close_err(proc);
- } else {
- process_close_streams(proc);
- if (proc->type == kProcessTypePty) {
- pty_process_close_master(&data->proc.pty);
- }
- }
- }
-}
-
// "jobpid(id)" function
static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
@@ -11494,61 +11478,15 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)
return;
}
- TerminalJobData *data = find_job(argvars[0].vval.v_number);
+ Channel *data = find_job(argvars[0].vval.v_number, true);
if (!data) {
- EMSG(_(e_invjob));
return;
}
- Process *proc = (Process *)&data->proc;
+ Process *proc = (Process *)&data->stream.proc;
rettv->vval.v_number = proc->pid;
}
-// "jobsend()" function
-static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr)
-{
- rettv->v_type = VAR_NUMBER;
- rettv->vval.v_number = 0;
-
- if (check_restricted() || check_secure()) {
- return;
- }
-
- if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) {
- // First argument is the job id and second is the string or list to write
- // to the job's stdin
- EMSG(_(e_invarg));
- return;
- }
-
- TerminalJobData *data = find_job(argvars[0].vval.v_number);
- if (!data) {
- EMSG(_(e_invjob));
- return;
- }
-
- if (((Process *)&data->proc)->in->closed) {
- EMSG(_("Can't send data to the job: stdin is closed"));
- return;
- }
-
- if (data->rpc) {
- EMSG(_("Can't send raw data to rpc channel"));
- return;
- }
-
- ptrdiff_t input_len = 0;
- char *input = save_tv_as_string(&argvars[1], &input_len, false);
- if (!input) {
- // Either the error has been handled by save_tv_as_string(), or there is no
- // input to send.
- return;
- }
-
- WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree);
- rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf);
-}
-
// "jobresize(job, width, height)" function
static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
@@ -11567,19 +11505,18 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
- TerminalJobData *data = find_job(argvars[0].vval.v_number);
+ Channel *data = find_job(argvars[0].vval.v_number, true);
if (!data) {
- EMSG(_(e_invjob));
return;
}
- if (data->proc.uv.process.type != kProcessTypePty) {
- EMSG(_(e_jobnotpty));
+ if (data->stream.proc.type != kProcessTypePty) {
+ EMSG(_(e_channotpty));
return;
}
- pty_process_resize(&data->proc.pty, argvars[1].vval.v_number,
- argvars[2].vval.v_number);
+ pty_process_resize(&data->stream.pty, argvars[1].vval.v_number,
+ argvars[2].vval.v_number);
rettv->vval.v_number = 1;
}
@@ -11664,8 +11601,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)
bool detach = false;
bool rpc = false;
bool pty = false;
- Callback on_stdout = CALLBACK_NONE;
- Callback on_stderr = CALLBACK_NONE;
+ CallbackReader on_stdout = CALLBACK_READER_INIT,
+ on_stderr = CALLBACK_READER_INIT;
Callback on_exit = CALLBACK_NONE;
char *cwd = NULL;
if (argvars[1].v_type == VAR_DICT) {
@@ -11697,32 +11634,21 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
}
- TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- pty, rpc, detach, cwd);
- Process *proc = (Process *)&data->proc;
+ uint16_t width = 0, height = 0;
+ char *term_name = NULL;
if (pty) {
- uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width");
- if (width > 0) {
- data->proc.pty.width = width;
- }
- uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height");
- if (height > 0) {
- data->proc.pty.height = height;
- }
- char *term = tv_dict_get_string(job_opts, "TERM", true);
- if (term) {
- data->proc.pty.term_name = term;
- }
+ width = (uint16_t)tv_dict_get_number(job_opts, "width");
+ height = (uint16_t)tv_dict_get_number(job_opts, "height");
+ term_name = tv_dict_get_string(job_opts, "TERM", true);
}
- if (!rpc && on_stdout.type == kCallbackNone) {
- proc->out = NULL;
+ Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit, pty,
+ rpc, detach, cwd, width, height, term_name,
+ &rettv->vval.v_number);
+ if (chan) {
+ channel_create_event(chan, NULL);
}
- if (on_stderr.type == kCallbackNone) {
- proc->err = NULL;
- }
- common_job_start(data, rettv);
}
// "jobstop()" function
@@ -11742,14 +11668,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
- TerminalJobData *data = find_job(argvars[0].vval.v_number);
+ Channel *data = find_job(argvars[0].vval.v_number, true);
if (!data) {
- EMSG(_(e_invjob));
return;
}
- process_stop((Process *)&data->proc);
- data->stopped = true;
+ process_stop((Process *)&data->stream.proc);
rettv->vval.v_number = 1;
}
@@ -11769,28 +11693,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
return;
}
+
list_T *args = argvars[0].vval.v_list;
- list_T *rv = tv_list_alloc();
+ Channel **jobs = xcalloc(args->lv_len, sizeof(*jobs));
ui_busy_start();
MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop);
// For each item in the input list append an integer to the output list. -3
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
- for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- TerminalJobData *data = NULL;
+
+ int i = 0;
+ for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next, i++) {
+ Channel *chan = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = find_job(arg->li_tv.vval.v_number))) {
- tv_list_append_number(rv, -3);
+ || !(chan = find_job(arg->li_tv.vval.v_number, false))) {
+ jobs[i] = NULL;
} else {
- // append the list item and set the status pointer so we'll collect the
- // status code when the job exits
- tv_list_append_number(rv, -1);
- data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
- // Process any pending events for the job because we'll temporarily
- // replace the parent queue
- multiqueue_process_events(data->events);
- multiqueue_replace_parent(data->events, waiting_jobs);
+ jobs[i] = chan;
+ channel_incref(chan);
+ if (chan->stream.proc.status < 0) {
+ // Process any pending events for the job because we'll temporarily
+ // replace the parent queue
+ multiqueue_process_events(chan->events);
+ multiqueue_replace_parent(chan->events, waiting_jobs);
+ }
}
}
@@ -11801,24 +11728,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
before = os_hrtime();
}
- for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- TerminalJobData *data = NULL;
+ for (i = 0; i < args->lv_len; i++) {
if (remaining == 0) {
// timed out
break;
}
- if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = find_job(arg->li_tv.vval.v_number))) {
+
+ // if the job already exited, but wasn't freed yet
+ if (jobs[i] == NULL || jobs[i]->stream.proc.status >= 0) {
continue;
}
- int status = process_wait((Process *)&data->proc, remaining, waiting_jobs);
+
+ int status = process_wait(&jobs[i]->stream.proc, remaining,
+ waiting_jobs);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
- if (status == -2) {
- // set the status so the user can distinguish between interrupted and
- // skipped/timeout jobs.
- *data->status_ptr = -2;
- }
break;
}
if (remaining > 0) {
@@ -11831,30 +11755,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
}
- for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- TerminalJobData *data = NULL;
- if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = find_job(arg->li_tv.vval.v_number))) {
- continue;
- }
- // remove the status pointer because the list may be freed before the
- // job exits
- data->status_ptr = NULL;
- }
+ list_T *rv = tv_list_alloc();
// restore the parent queue for any jobs still alive
- for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
- TerminalJobData *data = NULL;
- if (arg->li_tv.v_type != VAR_NUMBER
- || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
+ for (i = 0; i < args->lv_len; i++) {
+ if (jobs[i] == NULL) {
+ tv_list_append_number(rv, -3);
continue;
}
// restore the parent queue for the job
- multiqueue_process_events(data->events);
- multiqueue_replace_parent(data->events, main_loop.events);
+ multiqueue_process_events(jobs[i]->events);
+ multiqueue_replace_parent(jobs[i]->events, main_loop.events);
+
+ tv_list_append_number(rv, jobs[i]->stream.proc.status);
+ channel_decref(jobs[i]);
}
multiqueue_free(waiting_jobs);
+ xfree(jobs);
ui_busy_stop();
rv->lv_refcount++;
rettv->v_type = VAR_LIST;
@@ -13803,9 +13721,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr)
ADD(args, vim_to_object(tv));
}
- if (!channel_send_event((uint64_t)argvars[0].vval.v_number,
- tv_get_string(&argvars[1]),
- args)) {
+ if (!rpc_send_event((uint64_t)argvars[0].vval.v_number,
+ tv_get_string(&argvars[1]), args)) {
EMSG2(_(e_invarg2), "Channel doesn't exist");
return;
}
@@ -13870,10 +13787,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr)
Error err = ERROR_INIT;
- Object result = channel_send_call((uint64_t)argvars[0].vval.v_number,
- tv_get_string(&argvars[1]),
- args,
- &err);
+ Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number,
+ tv_get_string(&argvars[1]), args, &err);
if (l_provider_call_nesting) {
current_SID = save_current_SID;
@@ -13954,10 +13869,13 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr)
// The last item of argv must be NULL
argv[i] = NULL;
- TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE,
- CALLBACK_NONE, false, true, false,
- NULL);
- common_job_start(data, rettv);
+ Channel *chan = channel_job_start(argv, CALLBACK_READER_INIT,
+ CALLBACK_READER_INIT, CALLBACK_NONE,
+ false, true, false, NULL, 0, 0, NULL,
+ &rettv->vval.v_number);
+ if (chan) {
+ channel_create_event(chan, NULL);
+ }
}
// "rpcstop()" function
@@ -13977,10 +13895,16 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
// if called with a job, stop it, else closes the channel
- if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
+ uint64_t id = argvars[0].vval.v_number;
+ if (find_job(id, false)) {
f_jobstop(argvars, rettv, NULL);
} else {
- rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
+ const char *error;
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number,
+ kChannelPartRpc, &error);
+ if (!rettv->vval.v_number) {
+ EMSG(error);
+ }
}
}
@@ -15157,19 +15081,22 @@ static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
bool rpc = false;
+ CallbackReader on_data = CALLBACK_READER_INIT;
if (argvars[2].v_type == VAR_DICT) {
dict_T *opts = argvars[2].vval.v_dict;
rpc = tv_dict_get_number(opts, "rpc") != 0;
- }
- if (!rpc) {
- EMSG2(_(e_invarg2), "rpc option must be true");
- return;
+ if (!tv_dict_get_callback(opts, S_LEN("on_data"), &on_data.cb)) {
+ return;
+ }
+ on_data.buffered = tv_dict_get_number(opts, "data_buffered");
+ if (on_data.buffered && on_data.cb.type == kCallbackNone) {
+ on_data.self = opts;
+ }
}
const char *error = NULL;
- eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
- uint64_t id = channel_connect(tcp, address, 50, (char *)IObuff, &error);
+ uint64_t id = channel_connect(tcp, address, rpc, on_data, 50, &error);
if (error) {
EMSG2(_("connection failed: %s"), error);
@@ -15549,6 +15476,39 @@ static void f_sort(typval_T *argvars, typval_T *rettv, FunPtr fptr)
do_sort_uniq(argvars, rettv, true);
}
+/// "stdioopen()" function
+static void f_stdioopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)
+{
+ if (argvars[0].v_type != VAR_DICT) {
+ EMSG(_(e_invarg));
+ return;
+ }
+
+
+ bool rpc = false;
+ CallbackReader on_stdin = CALLBACK_READER_INIT;
+ dict_T *opts = argvars[0].vval.v_dict;
+ rpc = tv_dict_get_number(opts, "rpc") != 0;
+
+ if (!tv_dict_get_callback(opts, S_LEN("on_stdin"), &on_stdin.cb)) {
+ return;
+ }
+ on_stdin.buffered = tv_dict_get_number(opts, "stdin_buffered");
+ if (on_stdin.buffered && on_stdin.cb.type == kCallbackNone) {
+ on_stdin.self = opts;
+ }
+
+ const char *error;
+ uint64_t id = channel_from_stdio(rpc, on_stdin, &error);
+ if (!id) {
+ EMSG2(e_stdiochan2, error);
+ }
+
+
+ rettv->vval.v_number = (varnumber_T)id;
+ rettv->v_type = VAR_NUMBER;
+}
+
/// "uniq({list})" function
static void f_uniq(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
@@ -16665,8 +16625,9 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)
return;
}
- Callback on_stdout = CALLBACK_NONE, on_stderr = CALLBACK_NONE,
- on_exit = CALLBACK_NONE;
+ CallbackReader on_stdout = CALLBACK_READER_INIT,
+ on_stderr = CALLBACK_READER_INIT;
+ Callback on_exit = CALLBACK_NONE;
dict_T *job_opts = NULL;
const char *cwd = ".";
if (argvars[1].v_type == VAR_DICT) {
@@ -16690,23 +16651,16 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)
}
uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin));
- TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
- true, false, false, cwd);
- data->proc.pty.width = term_width;
- data->proc.pty.height = curwin->w_height;
- data->proc.pty.term_name = xstrdup("xterm-256color");
- if (!common_job_start(data, rettv)) {
+ Channel *chan = channel_job_start(argv, on_stdout, on_stderr, on_exit,
+ true, false, false, cwd,
+ term_width, curwin->w_height,
+ xstrdup("xterm-256color"),
+ &rettv->vval.v_number);
+ if (rettv->vval.v_number <= 0) {
return;
}
- TerminalOptions topts;
- topts.data = data;
- topts.width = term_width;
- topts.height = curwin->w_height;
- topts.write_cb = term_write;
- topts.resize_cb = term_resize;
- topts.close_cb = term_close;
- int pid = data->proc.pty.process.pid;
+ int pid = chan->stream.pty.process.pid;
char buf[1024];
// format the title with the pid to conform with the term:// URI
@@ -16717,18 +16671,16 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr)
(void)setfname(curbuf, (char_u *)buf, NULL, true);
// Save the job id and pid in b:terminal_job_{id,pid}
Error err = ERROR_INIT;
+ // deprecated: use 'channel' buffer option
dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_id"),
- INTEGER_OBJ(rettv->vval.v_number), false, false, &err);
+ INTEGER_OBJ(chan->id), false, false, &err);
api_clear_error(&err);
dict_set_var(curbuf->b_vars, cstr_as_string("terminal_job_pid"),
INTEGER_OBJ(pid), false, false, &err);
api_clear_error(&err);
- Terminal *term = terminal_open(topts);
- data->term = term;
- data->refcount++;
-
- return;
+ channel_terminal_open(chan);
+ channel_create_event(chan, NULL);
}
// "test_garbagecollect_now()" function
@@ -16761,30 +16713,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg)
return true;
}
-/// Unref/free callback
-void callback_free(Callback *const callback)
- FUNC_ATTR_NONNULL_ALL
-{
- switch (callback->type) {
- case kCallbackFuncref: {
- func_unref(callback->data.funcref);
- xfree(callback->data.funcref);
- break;
- }
- case kCallbackPartial: {
- partial_unref(callback->data.partial);
- break;
- }
- case kCallbackNone: {
- break;
- }
- default: {
- abort();
- }
- }
- callback->type = kCallbackNone;
-}
-
bool callback_call(Callback *const callback, const int argcount_in,
typval_T *const argvars_in, typval_T *const rettv)
FUNC_ATTR_NONNULL_ALL
@@ -16839,6 +16767,23 @@ static bool set_ref_in_callback(Callback *callback, int copyID,
return false;
}
+static bool set_ref_in_callback_reader(CallbackReader *reader, int copyID,
+ ht_stack_T **ht_stack,
+ list_stack_T **list_stack)
+{
+ if (set_ref_in_callback(&reader->cb, copyID, ht_stack, list_stack)) {
+ return true;
+ }
+
+ if (reader->self) {
+ typval_T tv;
+ tv.v_type = VAR_DICT;
+ tv.vval.v_dict = reader->self;
+ return set_ref_in_item(&tv, copyID, ht_stack, list_stack);
+ }
+ return false;
+}
+
static void add_timer_info(typval_T *rettv, timer_T *timer)
{
list_T *list = rettv->vval.v_list;
@@ -22403,318 +22348,54 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub,
return ret;
}
-static inline TerminalJobData *common_job_init(char **argv,
- Callback on_stdout,
- Callback on_stderr,
- Callback on_exit,
- bool pty,
- bool rpc,
- bool detach,
- const char *cwd)
-{
- TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData));
- data->stopped = false;
- data->on_stdout = on_stdout;
- data->on_stderr = on_stderr;
- data->on_exit = on_exit;
- data->events = multiqueue_new_child(main_loop.events);
- data->rpc = rpc;
- if (pty) {
- data->proc.pty = pty_process_init(&main_loop, data);
- } else {
- data->proc.uv = libuv_process_init(&main_loop, data);
- }
- Process *proc = (Process *)&data->proc;
- proc->argv = argv;
- proc->in = &data->in;
- proc->out = &data->out;
- if (!pty) {
- proc->err = &data->err;
- }
- proc->cb = eval_job_process_exit_cb;
- proc->events = data->events;
- proc->detach = detach;
- proc->cwd = cwd;
- return data;
-}
-
/// common code for getting job callbacks for jobstart, termopen and rpcstart
///
/// @return true/false on success/failure.
-static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout,
- Callback *on_stderr, Callback *on_exit)
+static inline bool common_job_callbacks(dict_T *vopts,
+ CallbackReader *on_stdout,
+ CallbackReader *on_stderr,
+ Callback *on_exit)
{
- if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), on_stdout)
- &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), on_stderr)
+ if (tv_dict_get_callback(vopts, S_LEN("on_stdout"), &on_stdout->cb)
+ &&tv_dict_get_callback(vopts, S_LEN("on_stderr"), &on_stderr->cb)
&& tv_dict_get_callback(vopts, S_LEN("on_exit"), on_exit)) {
+ on_stdout->buffered = tv_dict_get_number(vopts, "stdout_buffered");
+ on_stderr->buffered = tv_dict_get_number(vopts, "stderr_buffered");
+ if (on_stdout->buffered && on_stdout->cb.type == kCallbackNone) {
+ on_stdout->self = vopts;
+ }
+ if (on_stderr->buffered && on_stderr->cb.type == kCallbackNone) {
+ on_stderr->self = vopts;
+ }
vopts->dv_refcount++;
return true;
}
- callback_free(on_stdout);
- callback_free(on_stderr);
+ callback_reader_free(on_stdout);
+ callback_reader_free(on_stderr);
callback_free(on_exit);
return false;
}
-static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
-{
- Process *proc = (Process *)&data->proc;
- if (proc->type == kProcessTypePty && proc->detach) {
- EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
- xfree(data->proc.pty.term_name);
- shell_free_argv(proc->argv);
- free_term_job_data_event((void **)&data);
- return false;
- }
-
- data->id = next_chan_id++;
- pmap_put(uint64_t)(jobs, data->id, data);
-
- data->refcount++;
- char *cmd = xstrdup(proc->argv[0]);
- int status = process_spawn(proc);
- if (status) {
- EMSG3(_(e_jobspawn), os_strerror(status), cmd);
- xfree(cmd);
- if (proc->type == kProcessTypePty) {
- xfree(data->proc.pty.term_name);
- }
- rettv->vval.v_number = proc->status;
- term_job_data_decref(data);
- return false;
- }
- xfree(cmd);
-
- if (data->rpc) {
- eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
- // RPC channel takes over the in/out streams.
- channel_from_process(proc, data->id, (char *)IObuff);
- } else {
- wstream_init(proc->in, 0);
- if (proc->out) {
- rstream_init(proc->out, 0);
- rstream_start(proc->out, on_job_stdout, data);
- }
- }
-
- if (proc->err) {
- rstream_init(proc->err, 0);
- rstream_start(proc->err, on_job_stderr, data);
- }
- rettv->vval.v_number = data->id;
- return true;
-}
-
-static inline void free_term_job_data_event(void **argv)
+static Channel *find_job(uint64_t id, bool show_error)
{
- TerminalJobData *data = argv[0];
- callback_free(&data->on_stdout);
- callback_free(&data->on_stderr);
- callback_free(&data->on_exit);
-
- multiqueue_free(data->events);
- pmap_del(uint64_t)(jobs, data->id);
- xfree(data);
-}
-
-static inline void free_term_job_data(TerminalJobData *data)
-{
- // data->queue may still be used after this function returns(process_wait), so
- // only free in the next event loop iteration
- multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data);
-}
-
-// vimscript job callbacks must be executed on Nvim main loop
-static inline void process_job_event(TerminalJobData *data, Callback *callback,
- const char *type, char *buf, size_t count,
- int status)
-{
- JobEvent event_data;
- event_data.received = NULL;
- if (buf) {
- event_data.received = tv_list_alloc();
- char *ptr = buf;
- size_t remaining = count;
- size_t off = 0;
-
- while (off < remaining) {
- // append the line
- if (ptr[off] == NL) {
- tv_list_append_string(event_data.received, ptr, off);
- size_t skip = off + 1;
- ptr += skip;
- remaining -= skip;
- off = 0;
- continue;
- }
- if (ptr[off] == NUL) {
- // Translate NUL to NL
- ptr[off] = NL;
+ Channel *data = find_channel(id);
+ if (!data || data->streamtype != kChannelStreamProc
+ || process_is_stopped(&data->stream.proc)) {
+ if (show_error) {
+ if (data && data->streamtype != kChannelStreamProc) {
+ EMSG(_(e_invchanjob));
+ } else {
+ EMSG(_(e_invchan));
}
- off++;
}
- tv_list_append_string(event_data.received, ptr, off);
- } else {
- event_data.status = status;
- }
- event_data.data = data;
- event_data.callback = callback;
- event_data.type = type;
- on_job_event(&event_data);
-}
-
-static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
- void *job, bool eof)
-{
- TerminalJobData *data = job;
- on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout");
-}
-
-static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
- void *job, bool eof)
-{
- TerminalJobData *data = job;
- on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr");
-}
-
-static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
- size_t count, bool eof, Callback *callback,
- const char *type)
-{
- if (eof) {
- return;
- }
-
- // stub variable, to keep reading consistent with the order of events, only
- // consider the count parameter.
- size_t r;
- char *ptr = rbuffer_read_ptr(buf, &r);
-
- // The order here matters, the terminal must receive the data first because
- // process_job_event will modify the read buffer(convert NULs into NLs)
- if (data->term) {
- terminal_receive(data->term, ptr, count);
- }
-
- rbuffer_consumed(buf, count);
- if (callback->type != kCallbackNone) {
- process_job_event(data, callback, type, ptr, count, 0);
- }
-}
-
-static void eval_job_process_exit_cb(Process *proc, int status, void *d)
-{
- TerminalJobData *data = d;
- if (data->term && !data->exited) {
- data->exited = true;
- char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
- snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
- terminal_close(data->term, msg);
- }
- if (data->rpc) {
- channel_process_exit(data->id, status);
- }
-
- if (data->status_ptr) {
- *data->status_ptr = status;
- }
-
- process_job_event(data, &data->on_exit, "exit", NULL, 0, status);
-
- term_job_data_decref(data);
-}
-
-static void term_write(char *buf, size_t size, void *d)
-{
- TerminalJobData *job = d;
- if (job->in.closed) {
- // If the backing stream was closed abruptly, there may be write events
- // ahead of the terminal close event. Just ignore the writes.
- ILOG("write failed: stream is closed");
- return;
- }
- WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
- wstream_write(&job->in, wbuf);
-}
-
-static void term_resize(uint16_t width, uint16_t height, void *d)
-{
- TerminalJobData *data = d;
- pty_process_resize(&data->proc.pty, width, height);
-}
-
-static inline void term_delayed_free(void **argv)
-{
- TerminalJobData *j = argv[0];
- if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) {
- multiqueue_put(j->events, term_delayed_free, 1, j);
- return;
- }
-
- terminal_destroy(j->term);
- term_job_data_decref(j);
-}
-
-static void term_close(void *d)
-{
- TerminalJobData *data = d;
- if (!data->exited) {
- data->exited = true;
- process_stop((Process *)&data->proc);
- }
- multiqueue_put(data->events, term_delayed_free, 1, data);
-}
-
-static void term_job_data_decref(TerminalJobData *data)
-{
- if (!(--data->refcount)) {
- free_term_job_data(data);
- }
-}
-
-static void on_job_event(JobEvent *ev)
-{
- if (!ev->callback) {
- return;
- }
-
- typval_T argv[4];
-
- argv[0].v_type = VAR_NUMBER;
- argv[0].v_lock = 0;
- argv[0].vval.v_number = ev->data->id;
-
- if (ev->received) {
- argv[1].v_type = VAR_LIST;
- argv[1].v_lock = 0;
- argv[1].vval.v_list = ev->received;
- argv[1].vval.v_list->lv_refcount++;
- } else {
- argv[1].v_type = VAR_NUMBER;
- argv[1].v_lock = 0;
- argv[1].vval.v_number = ev->status;
- }
-
- argv[2].v_type = VAR_STRING;
- argv[2].v_lock = 0;
- argv[2].vval.v_string = (uint8_t *)ev->type;
-
- typval_T rettv = TV_INITIAL_VALUE;
- callback_call(ev->callback, 3, argv, &rettv);
- tv_clear(&rettv);
-}
-
-static TerminalJobData *find_job(uint64_t id)
-{
- TerminalJobData *data = pmap_get(uint64_t)(jobs, id);
- if (!data || data->stopped) {
return NULL;
}
return data;
}
+
static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv)
{
if (check_restricted() || check_secure()) {
@@ -22855,4 +22536,3 @@ void ex_checkhealth(exarg_T *eap)
xfree(buf);
}
-
diff --git a/src/nvim/eval.h b/src/nvim/eval.h
index 070bc35bd5..0c0a6881f6 100644
--- a/src/nvim/eval.h
+++ b/src/nvim/eval.h
@@ -7,6 +7,9 @@
#include "nvim/eval/typval.h"
#include "nvim/profile.h"
#include "nvim/garray.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/channel.h"
#define COPYID_INC 2
#define COPYID_MASK (~0x1)
@@ -53,6 +56,7 @@ typedef enum {
VV_DYING,
VV_EXCEPTION,
VV_THROWPOINT,
+ VV_STDERR,
VV_REG,
VV_CMDBANG,
VV_INSERTMODE,
diff --git a/src/nvim/eval.lua b/src/nvim/eval.lua
index 0e359fb61c..54cbc54d78 100644
--- a/src/nvim/eval.lua
+++ b/src/nvim/eval.lua
@@ -55,6 +55,8 @@ return {
call={args={2, 3}},
ceil={args=1, func="float_op_wrapper", data="&ceil"},
changenr={},
+ chanclose={args={1, 2}},
+ chansend={args=2},
char2nr={args={1, 2}},
cindent={args=1},
clearmatches={},
@@ -173,10 +175,10 @@ return {
islocked={args=1},
id={args=1},
items={args=1},
- jobclose={args={1, 2}},
+ jobclose={args={1, 2}, func="f_chanclose"},
jobpid={args=1},
jobresize={args=3},
- jobsend={args=2},
+ jobsend={args=2, func="f_chansend"},
jobstart={args={1, 2}},
jobstop={args=1},
jobwait={args={1, 2}},
@@ -273,6 +275,7 @@ return {
sockconnect={args={2,3}},
sort={args={1, 3}},
soundfold={args=1},
+ stdioopen={args=1},
spellbadword={args={0, 1}},
spellsuggest={args={1, 3}},
split={args={1, 3}},
diff --git a/src/nvim/eval/typval.c b/src/nvim/eval/typval.c
index 262ea922ef..4bc3a85efb 100644
--- a/src/nvim/eval/typval.c
+++ b/src/nvim/eval/typval.c
@@ -374,7 +374,7 @@ void tv_list_append_dict(list_T *const list, dict_T *const dict)
/// case string is considered to be usual zero-terminated
/// string or NULL “empty” string.
void tv_list_append_string(list_T *const l, const char *const str,
- const ptrdiff_t len)
+ const ssize_t len)
FUNC_ATTR_NONNULL_ARG(1)
{
if (str == NULL) {
@@ -824,7 +824,7 @@ void tv_dict_watcher_add(dict_T *const dict, const char *const key_pattern,
/// @param[in] cb2 Second callback to check.
///
/// @return True if they are equal, false otherwise.
-bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
+bool tv_callback_equal(const Callback *cb1, const Callback *cb2)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT
{
if (cb1->type != cb2->type) {
@@ -843,10 +843,31 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
return true;
}
}
- assert(false);
+ abort();
return false;
}
+/// Unref/free callback
+void callback_free(Callback *callback)
+ FUNC_ATTR_NONNULL_ALL
+{
+ switch (callback->type) {
+ case kCallbackFuncref: {
+ func_unref(callback->data.funcref);
+ xfree(callback->data.funcref);
+ break;
+ }
+ case kCallbackPartial: {
+ partial_unref(callback->data.partial);
+ break;
+ }
+ case kCallbackNone: {
+ break;
+ }
+ }
+ callback->type = kCallbackNone;
+}
+
/// Remove watcher from a dictionary
///
/// @param dict Dictionary to remove watcher from.
diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c
index 758b35796e..c101cb1bb9 100644
--- a/src/nvim/event/libuv_process.c
+++ b/src/nvim/event/libuv_process.c
@@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[2].flags = UV_IGNORE;
uvproc->uv.data = proc;
- if (proc->in) {
+ if (!proc->in.closed) {
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->in->uv.pipe);
+ &proc->in.uv.pipe);
}
- if (proc->out) {
+ if (!proc->out.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->out->uv.pipe);
+ &proc->out.uv.pipe);
}
- if (proc->err) {
+ if (!proc->err.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t,
- &proc->err->uv.pipe);
+ &proc->err.uv.pipe);
}
int status;
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 41e793500a..4eb2dd0baf 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -25,28 +25,28 @@
// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
#define KILL_TIMEOUT_MS 2000
-#define CLOSE_PROC_STREAM(proc, stream) \
- do { \
- if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL, NULL); \
- } \
- } while (0)
-
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
-int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
+int process_spawn(Process *proc, bool in, bool out, bool err)
+ FUNC_ATTR_NONNULL_ALL
{
- if (proc->in) {
- uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
+ if (in) {
+ uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
+ } else {
+ proc->in.closed = true;
}
- if (proc->out) {
- uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
+ if (out) {
+ uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
+ } else {
+ proc->out.closed = true;
}
- if (proc->err) {
- uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
+ if (err) {
+ uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
+ } else {
+ proc->err.closed = true;
}
int status;
@@ -62,14 +62,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (status) {
- if (proc->in) {
- uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
+ if (in) {
+ uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
- if (proc->out) {
- uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
+ if (out) {
+ uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
}
- if (proc->err) {
- uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
+ if (err) {
+ uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
}
if (proc->type == kProcessTypeUv) {
@@ -82,30 +82,27 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return status;
}
- if (proc->in) {
- stream_init(NULL, proc->in, -1,
- STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe));
- proc->in->events = proc->events;
- proc->in->internal_data = proc;
- proc->in->internal_close_cb = on_process_stream_close;
+ if (in) {
+ stream_init(NULL, &proc->in, -1,
+ STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe));
+ proc->in.internal_data = proc;
+ proc->in.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->out) {
- stream_init(NULL, proc->out, -1,
- STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe));
- proc->out->events = proc->events;
- proc->out->internal_data = proc;
- proc->out->internal_close_cb = on_process_stream_close;
+ if (out) {
+ stream_init(NULL, &proc->out, -1,
+ STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe));
+ proc->out.internal_data = proc;
+ proc->out.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
- if (proc->err) {
- stream_init(NULL, proc->err, -1,
- STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe));
- proc->err->events = proc->events;
- proc->err->internal_data = proc;
- proc->err->internal_close_cb = on_process_stream_close;
+ if (err) {
+ stream_init(NULL, &proc->err, -1,
+ STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe));
+ proc->err.internal_data = proc;
+ proc->err.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
@@ -136,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
-// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
- process_close_in(proc);
- process_close_out(proc);
- process_close_err(proc);
-}
-
-void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, in);
-}
-
-void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, out);
-}
-
-void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, err);
+ stream_may_close(&proc->in);
+ stream_may_close(&proc->out);
+ stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@@ -164,16 +145,15 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// @param process Process instance
/// @param ms Time in milliseconds to wait for the process.
/// 0 for no wait. -1 to wait until the process quits.
-/// @return Exit code of the process.
+/// @return Exit code of the process. proc->status will have the same value.
/// -1 if the timeout expired while the process is still running.
/// -2 if the user interruped the wait.
int process_wait(Process *proc, int ms, MultiQueue *events)
FUNC_ATTR_NONNULL_ARG(1)
{
- int status = -1; // default
bool interrupted = false;
if (!proc->refcount) {
- status = proc->status;
+ int status = proc->status;
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
return status;
}
@@ -209,7 +189,9 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
if (proc->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
- status = interrupted ? -2 : proc->status;
+ if (interrupted) {
+ proc->status = -2;
+ }
decref(proc);
if (events) {
// the decref call created an exit event, process it now
@@ -219,7 +201,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
proc->refcount--;
}
- return status;
+ return proc->status;
}
/// Ask a process to terminate and eventually kill if it doesn't respond
@@ -233,8 +215,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
- // stdout/stderr, they will be closed when it exits (voluntarily or not).
- process_close_in(proc);
+ // stdout/stderr, they will be closed when it exits(possibly due to being
+ // terminated after a timeout)
+ stream_may_close(&proc->in);
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);
break;
@@ -375,15 +358,15 @@ static void flush_stream(Process *proc, Stream *stream)
// Poll for data and process the generated events.
loop_poll_events(proc->loop, 0);
- if (proc->events) {
- multiqueue_process_events(proc->events);
+ if (stream->events) {
+ multiqueue_process_events(stream->events);
}
// Stream can be closed if it is empty.
if (num_bytes == stream->num_bytes) {
- if (stream->read_cb) {
+ if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream
- // open.
+ // open. But only send EOF if we haven't already.
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
@@ -395,8 +378,8 @@ static void process_close_handles(void **argv)
{
Process *proc = argv[0];
- flush_stream(proc, proc->out);
- flush_stream(proc, proc->err);
+ flush_stream(proc, &proc->out);
+ flush_stream(proc, &proc->err);
process_close_streams(proc);
process_close(proc);
diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h
index 5c00e8e7ec..033ce3604b 100644
--- a/src/nvim/event/process.h
+++ b/src/nvim/event/process.h
@@ -23,13 +23,14 @@ struct process {
uint64_t stopped_time;
const char *cwd;
char **argv;
- Stream *in, *out, *err;
+ Stream in, out, err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, detach;
MultiQueue *events;
};
+
static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
@@ -38,14 +39,14 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.loop = loop,
.events = NULL,
.pid = 0,
- .status = 0,
+ .status = -1,
.refcount = 0,
.stopped_time = 0,
.cwd = NULL,
.argv = NULL,
- .in = NULL,
- .out = NULL,
- .err = NULL,
+ .in = { .closed = false },
+ .out = { .closed = false },
+ .err = { .closed = false },
.cb = NULL,
.closed = false,
.internal_close_cb = NULL,
@@ -54,6 +55,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
};
}
+static inline bool process_is_stopped(Process *proc)
+{
+ return proc->stopped_time != 0;
+}
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.h.generated.h"
#endif
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index 2c4db08b30..e0500ba828 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -105,20 +105,20 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
Stream *stream = uvstream->data;
- if (cnt > 0) {
- stream->num_bytes += (size_t)cnt;
- }
-
if (cnt <= 0) {
- if (cnt != UV_ENOBUFS
- // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
- // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
- //
- // We don't need to do anything with the RBuffer because the next call
- // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
- // won't be called)
- && cnt != 0) {
- DLOG("closing Stream: %p: %s (%s)", stream,
+ // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
+ // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
+ //
+ // We don't need to do anything with the RBuffer because the next call
+ // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
+ // won't be called)
+ if (cnt == UV_ENOBUFS || cnt == 0) {
+ return;
+ } else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
+ // The TTY driver might signal TTY without closing the stream
+ invoke_read_cb(stream, 0, true);
+ } else {
+ DLOG("Closing Stream (%p): %s (%s)", stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
@@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
+ stream->num_bytes += nread;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
@@ -187,6 +188,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
+ stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e..ba25b76ec7 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -92,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
+void stream_may_close(Stream *stream)
+{
+ if (!stream->closed) {
+ stream_close(stream, NULL, NULL);
+ }
+}
+
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index d27497e4a4..e713323f5c 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -14,10 +14,7 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
-/// @param count Number of bytes to read. This must be respected if keeping
-/// the order of events is a requirement. This is because events
-/// may be queued and only processed later when more data is copied
-/// into to the buffer, so one read may starve another.
+/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
@@ -33,6 +30,8 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
+ bool closed;
+ bool did_eof;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
@@ -52,7 +51,6 @@ struct stream {
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
- bool closed;
MultiQueue *events;
};
diff --git a/src/nvim/globals.h b/src/nvim/globals.h
index 1ff0f7eb89..dcb8b40973 100644
--- a/src/nvim/globals.h
+++ b/src/nvim/globals.h
@@ -1074,11 +1074,17 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s"));
EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range"));
EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command"));
EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory"));
-EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id"));
+EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id"));
+EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job"));
EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full"));
EXTERN char_u e_jobspawn[] INIT(= N_(
- "E903: Process failed to start: %s: \"%s\""));
-EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty"));
+ "E903: Process failed to start: %s: \"%s\""));
+EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty"));
+EXTERN char_u e_stdiochan2[] INIT(= N_(
+ "E905: Couldn't open stdio channel: %s"));
+EXTERN char_u e_invstream[] INIT(= N_("E906: invalid stream for channel"));
+EXTERN char_u e_invstreamrpc[] INIT(= N_(
+ "E906: invalid stream for rpc channel, use 'rpc'"));
EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\""));
EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s"));
EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number"));
@@ -1189,9 +1195,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false);
-
-/// next free id for a job or rpc channel
-EXTERN uint64_t next_chan_id INIT(= 1);
+// Dont try to start an user interface
+// or read/write to stdio (unless embedding)
+EXTERN bool headless_mode INIT(= false);
/// Used to track the status of external functions.
/// Currently only used for iconv().
diff --git a/src/nvim/if_cscope.c b/src/nvim/if_cscope.c
index 0f9ecdf2d7..6834e7a802 100644
--- a/src/nvim/if_cscope.c
+++ b/src/nvim/if_cscope.c
@@ -778,7 +778,6 @@ err_closing:
if (execl("/bin/sh", "sh", "-c", cmd, (char *)NULL) == -1)
PERROR(_("cs_create_connection exec failed"));
- stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
exit(127);
/* NOTREACHED */
default: /* parent. */
diff --git a/src/nvim/main.c b/src/nvim/main.c
index 93afe11f3a..0346414697 100644
--- a/src/nvim/main.c
+++ b/src/nvim/main.c
@@ -73,6 +73,9 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/handle.h"
#include "nvim/api/private/dispatch.h"
+#ifndef WIN32
+# include "nvim/os/pty_process_unix.h"
+#endif
/* Maximum number of commands from + or -c arguments. */
#define MAX_ARG_CMDS 10
@@ -103,7 +106,6 @@ typedef struct {
bool input_isatty; // stdin is a terminal
bool output_isatty; // stdout is a terminal
bool err_isatty; // stderr is a terminal
- bool headless; // Do not start the builtin UI.
int no_swap_file; // "-n" argument used
int use_debug_break_level;
int window_count; /* number of windows to use */
@@ -298,8 +300,8 @@ int main(int argc, char **argv)
assert(p_ch >= 0 && Rows >= p_ch && Rows - p_ch <= INT_MAX);
cmdline_row = (int)(Rows - p_ch);
msg_row = cmdline_row;
- screenalloc(false); /* allocate screen buffers */
- set_init_2(params.headless);
+ screenalloc(false); // allocate screen buffers
+ set_init_2(headless_mode);
TIME_MSG("inits 2");
msg_scroll = TRUE;
@@ -311,8 +313,9 @@ int main(int argc, char **argv)
/* Set the break level after the terminal is initialized. */
debug_break_level = params.use_debug_break_level;
- bool reading_input = !params.headless && (params.input_isatty
- || params.output_isatty || params.err_isatty);
+ bool reading_input = !headless_mode
+ && (params.input_isatty || params.output_isatty
+ || params.err_isatty);
if (reading_input) {
// One of the startup commands (arguments, sourced scripts or plugins) may
@@ -448,7 +451,7 @@ int main(int argc, char **argv)
wait_return(TRUE);
}
- if (!params.headless) {
+ if (!headless_mode) {
// Stop reading from input stream, the UI layer will take over now.
input_stop();
ui_builtin_start();
@@ -809,11 +812,14 @@ static void command_line_scan(mparm_T *parmp)
}
mch_exit(0);
} else if (STRICMP(argv[0] + argv_idx, "headless") == 0) {
- parmp->headless = true;
+ headless_mode = true;
} else if (STRICMP(argv[0] + argv_idx, "embed") == 0) {
embedded_mode = true;
- parmp->headless = true;
- channel_from_stdio();
+ headless_mode = true;
+ const char *err;
+ if (!channel_from_stdio(true, CALLBACK_READER_INIT, &err)) {
+ abort();
+ }
} else if (STRNICMP(argv[0] + argv_idx, "literal", 7) == 0) {
#if !defined(UNIX)
parmp->literal = TRUE;
@@ -1216,7 +1222,6 @@ static void init_params(mparm_T *paramp, int argc, char **argv)
memset(paramp, 0, sizeof(*paramp));
paramp->argc = argc;
paramp->argv = argv;
- paramp->headless = false;
paramp->want_full_screen = true;
paramp->use_debug_break_level = -1;
paramp->window_count = -1;
@@ -1245,6 +1250,14 @@ static void check_and_set_isatty(mparm_T *paramp)
stdout_isatty
= paramp->output_isatty = os_isatty(fileno(stdout));
paramp->err_isatty = os_isatty(fileno(stderr));
+ int tty_fd = paramp->input_isatty
+ ? OS_STDIN_FILENO
+ : (paramp->output_isatty
+ ? OS_STDOUT_FILENO
+ : (paramp->err_isatty ? OS_STDERR_FILENO : -1));
+#ifndef WIN32
+ pty_process_save_termios(tty_fd);
+#endif
TIME_MSG("window checked");
}
@@ -1387,7 +1400,7 @@ static void handle_tag(char_u *tagname)
// When starting in Ex mode and commands come from a file, set Silent mode.
static void check_tty(mparm_T *parmp)
{
- if (parmp->headless) {
+ if (headless_mode) {
return;
}
diff --git a/src/nvim/misc1.c b/src/nvim/misc1.c
index 137de84953..f7ee2950ef 100644
--- a/src/nvim/misc1.c
+++ b/src/nvim/misc1.c
@@ -2622,7 +2622,10 @@ void preserve_exit(void)
// Prevent repeated calls into this method.
if (really_exiting) {
- stream_set_blocking(input_global_fd(), true); //normalize stream (#2598)
+ if (input_global_fd() >= 0) {
+ // normalize stream (#2598)
+ stream_set_blocking(input_global_fd(), true);
+ }
exit(2);
}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5efdb9a194..32781cf4d9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -11,8 +11,8 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
+#include "nvim/channel.h"
#include "nvim/msgpack_rpc/channel.h"
-#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -29,58 +29,14 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
-#include "nvim/path.h"
#include "nvim/lib/kvec.h"
#include "nvim/os/input.h"
-#define CHANNEL_BUFFER_SIZE 0xffff
-
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
#define log_client_msg(...)
#define log_server_msg(...)
#endif
-typedef enum {
- kChannelTypeSocket,
- kChannelTypeProc,
- kChannelTypeStdio,
- kChannelTypeInternal
-} ChannelType;
-
-typedef struct {
- uint64_t request_id;
- bool returned, errored;
- Object result;
-} ChannelCallFrame;
-
-typedef struct {
- uint64_t id;
- size_t refcount;
- PMap(cstr_t) *subscribed_events;
- bool closed;
- ChannelType type;
- msgpack_unpacker *unpacker;
- union {
- Stream stream; // bidirectional (socket)
- Process *proc;
- struct {
- Stream in;
- Stream out;
- } std;
- } data;
- uint64_t next_request_id;
- kvec_t(ChannelCallFrame *) call_stack;
- MultiQueue *events;
-} Channel;
-
-typedef struct {
- Channel *channel;
- MsgpackRpcRequestHandler handler;
- Array args;
- uint64_t request_id;
-} RequestEvent;
-
-static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@@ -88,102 +44,44 @@ static msgpack_sbuffer out_buffer;
# include "msgpack_rpc/channel.c.generated.h"
#endif
-/// Initializes the module
-void channel_init(void)
+void rpc_init(void)
{
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
- channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
- remote_ui_init();
}
-/// Teardown the module
-void channel_teardown(void)
-{
- if (!channels) {
- return;
- }
-
- Channel *channel;
-
- map_foreach_value(channels, channel, {
- close_channel(channel);
- });
-}
-/// Creates an API channel by starting a process and connecting to its
-/// stdin/stdout. stderr is handled by the job infrastructure.
-///
-/// @param proc process object
-/// @param id (optional) channel id
-/// @param source description of source function, rplugin name, TCP addr, etc
-///
-/// @return Channel id (> 0), on success. 0, on error.
-uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
+void rpc_start(Channel *channel)
{
- Channel *channel = register_channel(kChannelTypeProc, id, proc->events,
- source);
- incref(channel); // process channels are only closed by the exit_cb
- channel->data.proc = proc;
+ channel_incref(channel);
+ channel->is_rpc = true;
+ RpcState *rpc = &channel->rpc;
+ rpc->closed = false;
+ rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ rpc->subscribed_events = pmap_new(cstr_t)();
+ rpc->next_request_id = 1;
+ kv_init(rpc->call_stack);
- wstream_init(proc->in, 0);
- rstream_init(proc->out, 0);
- rstream_start(proc->out, receive_msgpack, channel);
-
- DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
- proc->out);
+ if (channel->streamtype != kChannelStreamInternal) {
+ Stream *out = channel_outstream(channel);
+#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
+ Stream *in = channel_instream(channel);
+ DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out);
+#endif
- return channel->id;
+ rstream_start(out, receive_msgpack, channel);
+ }
}
-/// Creates an API channel from a tcp/pipe socket connection
-///
-/// @param watcher The SocketWatcher ready to accept the connection
-void channel_from_connection(SocketWatcher *watcher)
-{
- Channel *channel = register_channel(kChannelTypeSocket, 0, NULL,
- watcher->addr);
- socket_watcher_accept(watcher, &channel->data.stream);
- incref(channel); // close channel only after the stream is closed
- channel->data.stream.internal_close_cb = close_cb;
- channel->data.stream.internal_data = channel;
- wstream_init(&channel->data.stream, 0);
- rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, receive_msgpack, channel);
-
- DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
- &channel->data.stream);
-}
-/// @param source description of source function, rplugin name, TCP addr, etc
-uint64_t channel_connect(bool tcp, const char *address, int timeout,
- char *source, const char **error)
+static Channel *find_rpc_channel(uint64_t id)
{
- if (!tcp) {
- char *path = fix_fname(address);
- if (server_owns_pipe_address(path)) {
- // avoid deadlock
- xfree(path);
- return channel_create_internal();
- }
- xfree(path);
- }
-
- Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source);
- if (!socket_connect(&main_loop, &channel->data.stream,
- tcp, address, timeout, error)) {
- decref(channel);
- return 0;
+ Channel *chan = find_channel(id);
+ if (!chan || !chan->is_rpc || chan->rpc.closed) {
+ return NULL;
}
-
- incref(channel); // close channel only after the stream is closed
- channel->data.stream.internal_close_cb = close_cb;
- channel->data.stream.internal_data = channel;
- wstream_init(&channel->data.stream, 0);
- rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, receive_msgpack, channel);
- return channel->id;
+ return chan;
}
/// Publishes an event to a channel.
@@ -192,12 +90,11 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout,
/// @param name Event name (application-defined)
/// @param args Array of event arguments
/// @return True if the event was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, const char *name, Array args)
+bool rpc_send_event(uint64_t id, const char *name, Array args)
{
Channel *channel = NULL;
- if (id && (!(channel = pmap_get(uint64_t)(channels, id))
- || channel->closed)) {
+ if (id && (!(channel = find_rpc_channel(id)))) {
api_free_array(args);
return false;
}
@@ -218,29 +115,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
/// @param args Array with method arguments
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
-Object channel_send_call(uint64_t id,
- const char *method_name,
- Array args,
- Error *err)
+Object rpc_send_call(uint64_t id,
+ const char *method_name,
+ Array args,
+ Error *err)
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
+ if (!(channel = find_rpc_channel(id))) {
api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
api_free_array(args);
return NIL;
}
- incref(channel);
- uint64_t request_id = channel->next_request_id++;
+ channel_incref(channel);
+ RpcState *rpc = &channel->rpc;
+ uint64_t request_id = rpc->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
- kv_push(channel->call_stack, &frame);
+ kv_push(rpc->call_stack, &frame);
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
- (void)kv_pop(channel->call_stack);
+ (void)kv_pop(rpc->call_stack);
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
@@ -265,7 +163,7 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}
- decref(channel);
+ channel_decref(channel);
return frame.errored ? NIL : frame.result;
}
@@ -274,11 +172,11 @@ Object channel_send_call(uint64_t id,
///
/// @param id The channel id
/// @param event The event type string
-void channel_subscribe(uint64_t id, char *event)
+void rpc_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
+ if (!(channel = find_rpc_channel(id))) {
abort();
}
@@ -289,81 +187,32 @@ void channel_subscribe(uint64_t id, char *event)
pmap_put(cstr_t)(event_strings, event_string, event_string);
}
- pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
+ pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
}
/// Unsubscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
-void channel_unsubscribe(uint64_t id, char *event)
+void rpc_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
+ if (!(channel = find_rpc_channel(id))) {
abort();
}
unsubscribe(channel, event);
}
-/// Closes a channel
-///
-/// @param id The channel id
-/// @return true if successful, false otherwise
-bool channel_close(uint64_t id)
-{
- Channel *channel;
-
- if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
- return false;
- }
-
- close_channel(channel);
- return true;
-}
-
-/// Creates an API channel from stdin/stdout. Used to embed Nvim.
-void channel_from_stdio(void)
-{
- Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL);
- incref(channel); // stdio channels are only closed on exit
- // read stream
- rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.std.in, receive_msgpack, channel);
- // write stream
- wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
-
- DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
- &channel->data.std.in, &channel->data.std.out);
-}
-
-/// Creates a loopback channel. This is used to avoid deadlock
-/// when an instance connects to its own named pipe.
-uint64_t channel_create_internal(void)
-{
- Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL);
- incref(channel); // internal channel lives until process exit
- return channel->id;
-}
-
-void channel_process_exit(uint64_t id, int status)
-{
- Channel *channel = pmap_get(uint64_t)(channels, id);
-
- channel->closed = true;
- decref(channel);
-}
-
-// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
Channel *channel = data;
- incref(channel);
+ channel_incref(channel);
if (eof) {
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
@@ -371,30 +220,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
- if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
- || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
- char buf[256];
- snprintf(buf, sizeof(buf),
- "ch %" PRIu64 ": stream closed unexpectedly. "
- "closing channel",
- channel->id);
- call_set_error(channel, buf, WARN_LOG_LEVEL);
- goto end;
- }
-
size_t count = rbuffer_size(rbuf);
- DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
+ DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
channel->id, count, stream);
// Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->unpacker, count);
- rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
- msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
+ rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
+ msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
parse_msgpack(channel);
end:
- decref(channel);
+ channel_decref(channel);
}
static void parse_msgpack(Channel *channel)
@@ -404,8 +242,8 @@ static void parse_msgpack(Channel *channel)
msgpack_unpack_return result;
// Deserialize everything we can.
- while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
- MSGPACK_UNPACK_SUCCESS) {
+ while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
+ MSGPACK_UNPACK_SUCCESS) {
bool is_response = is_rpc_response(&unpacked.data);
log_client_msg(channel->id, !is_response, unpacked.data);
@@ -431,7 +269,7 @@ static void parse_msgpack(Channel *channel)
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
mch_errmsg(e_outofmem);
mch_errmsg("\n");
- decref(channel);
+ channel_decref(channel);
preserve_exit();
}
@@ -496,7 +334,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
evdata->handler = handler;
evdata->args = args;
evdata->request_id = request_id;
- incref(channel);
+ channel_incref(channel);
if (handler.async) {
bool is_get_mode = handler.fn == handle_nvim_get_mode;
@@ -534,66 +372,30 @@ static void on_request_event(void **argv)
api_free_object(result);
}
api_free_array(args);
- decref(channel);
+ channel_decref(channel);
xfree(e);
api_clear_error(&error);
}
-/// Returns the Stream that a Channel writes to.
-static Stream *chan_wstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->in;
- case kChannelTypeStdio:
- return &chan->data.std.out;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-/// Returns the Stream that a Channel reads from.
-static Stream *chan_rstream(Channel *chan)
-{
- switch (chan->type) {
- case kChannelTypeSocket:
- return &chan->data.stream;
- case kChannelTypeProc:
- return chan->data.proc->out;
- case kChannelTypeStdio:
- return &chan->data.std.in;
- case kChannelTypeInternal:
- return NULL;
- }
- abort();
-}
-
-
static bool channel_write(Channel *channel, WBuffer *buffer)
{
- bool success = false;
+ bool success;
- if (channel->closed) {
+ if (channel->rpc.closed) {
wstream_release_wbuffer(buffer);
return false;
}
- switch (channel->type) {
- case kChannelTypeSocket:
- case kChannelTypeProc:
- case kChannelTypeStdio:
- success = wstream_write(chan_wstream(channel), buffer);
- break;
- case kChannelTypeInternal:
- incref(channel);
- CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
- success = true;
- break;
+ if (channel->streamtype == kChannelStreamInternal) {
+ channel_incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
+ } else {
+ Stream *in = channel_instream(channel);
+ success = wstream_write(in, buffer);
}
+
if (!success) {
// If the write failed for any reason, close the channel
char buf[256];
@@ -613,14 +415,14 @@ static void internal_read_event(void **argv)
Channel *channel = argv[0];
WBuffer *buffer = argv[1];
- msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
- memcpy(msgpack_unpacker_buffer(channel->unpacker),
+ msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
buffer->data, buffer->size);
- msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
parse_msgpack(channel);
- decref(channel);
+ channel_decref(channel);
wstream_release_wbuffer(buffer);
}
@@ -669,7 +471,8 @@ static void broadcast_event(const char *name, Array args)
Channel *channel;
map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
+ if (channel->is_rpc
+ && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
}
});
@@ -699,10 +502,11 @@ end:
static void unsubscribe(Channel *channel, char *event)
{
char *event_string = pmap_get(cstr_t)(event_strings, event);
- pmap_del(cstr_t)(channel->subscribed_events, event_string);
+ pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
+ if (channel->is_rpc
+ && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
return;
}
});
@@ -712,98 +516,43 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/process and free the channel resources.
-static void close_channel(Channel *channel)
+
+/// Mark rpc state as closed, and release its reference to the channel.
+/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
+void rpc_close(Channel *channel)
{
- if (channel->closed) {
+ if (channel->rpc.closed) {
return;
}
- channel->closed = true;
+ channel->rpc.closed = true;
+ channel_decref(channel);
- switch (channel->type) {
- case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL, NULL);
- break;
- case kChannelTypeProc:
- // Only close the rpc channel part,
- // there could be an error message on the stderr stream
- process_close_in(channel->data.proc);
- process_close_out(channel->data.proc);
- break;
- case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL, NULL);
- stream_close(&channel->data.std.out, NULL, NULL);
- multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
- return;
- case kChannelTypeInternal:
- // nothing to free.
- break;
+ if (channel->streamtype == kChannelStreamStdio) {
+ multiqueue_put(main_loop.fast_events, exit_event, 0);
}
-
- decref(channel);
}
static void exit_event(void **argv)
{
- decref(argv[0]);
-
if (!exiting) {
mch_exit(0);
}
}
-static void free_channel(Channel *channel)
+void rpc_free(Channel *channel)
{
remote_ui_disconnect(channel->id);
- pmap_del(uint64_t)(channels, channel->id);
- msgpack_unpacker_free(channel->unpacker);
+ msgpack_unpacker_free(channel->rpc.unpacker);
// Unsubscribe from all events
char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
+ map_foreach_value(channel->rpc.subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
- if (channel->type != kChannelTypeProc) {
- multiqueue_free(channel->events);
- }
- xfree(channel);
-}
-
-static void close_cb(Stream *stream, void *data)
-{
- decref(data);
-}
-
-/// @param source description of source function, rplugin name, TCP addr, etc
-static Channel *register_channel(ChannelType type, uint64_t id,
- MultiQueue *events, char *source)
-{
- // Jobs and channels share the same id namespace.
- assert(id == 0 || !pmap_get(uint64_t)(channels, id));
- Channel *rv = xmalloc(sizeof(Channel));
- rv->events = events ? events : multiqueue_new_child(main_loop.events);
- rv->type = type;
- rv->refcount = 1;
- rv->closed = false;
- rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->id = id > 0 ? id : next_chan_id++;
- rv->subscribed_events = pmap_new(cstr_t)();
- rv->next_request_id = 1;
- kv_init(rv->call_stack);
- pmap_put(uint64_t)(channels, rv->id, rv);
-
- ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
- (type == kChannelTypeProc ? "proc"
- : (type == kChannelTypeSocket ? "socket"
- : (type == kChannelTypeStdio ? "stdio"
- : (type == kChannelTypeInternal ? "internal" : "?")))),
- (source ? source : "?"));
-
- return rv;
+ pmap_free(cstr_t)(channel->rpc.subscribed_events);
+ kv_destroy(channel->rpc.call_stack);
}
static bool is_rpc_response(msgpack_object *obj)
@@ -818,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj)
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
{
uint64_t response_id = obj->via.array.ptr[1].via.u64;
+ if (kv_size(channel->rpc.call_stack) == 0) {
+ return false;
+ }
+
// Must be equal to the frame at the stack's bottom
- return kv_size(channel->call_stack) && response_id
- == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id;
+ ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
+ return response_id == frame->request_id;
}
static void complete_call(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1);
+ ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
@@ -840,15 +592,15 @@ static void complete_call(msgpack_object *obj, Channel *channel)
static void call_set_error(Channel *channel, char *msg, int loglevel)
{
LOG(loglevel, "RPC: %s", msg);
- for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
- ChannelCallFrame *frame = kv_A(channel->call_stack, i);
+ for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
+ ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
frame->returned = true;
frame->errored = true;
api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,
@@ -890,18 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}
-static void incref(Channel *channel)
-{
- channel->refcount++;
-}
-
-static void decref(Channel *channel)
-{
- if (!(--channel->refcount)) {
- free_channel(channel);
- }
-}
-
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
#define REQ "[request] "
#define RES "[response] "
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index f8fe6f129b..9ff5abdc5f 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -8,6 +8,7 @@
#include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/vim.h"
+#include "nvim/channel.h"
#define METHOD_MAXLEN 512
@@ -16,6 +17,7 @@
/// of os_inchar(), so they are processed "just-in-time".
MultiQueue *ch_before_blocking_events;
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/channel.h.generated.h"
#endif
diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h
new file mode 100644
index 0000000000..6d8362e8b7
--- /dev/null
+++ b/src/nvim/msgpack_rpc/channel_defs.h
@@ -0,0 +1,36 @@
+#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
+#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
+
+#include <stdbool.h>
+#include <uv.h>
+#include <msgpack.h>
+
+#include "nvim/api/private/defs.h"
+#include "nvim/event/socket.h"
+#include "nvim/event/process.h"
+#include "nvim/vim.h"
+
+typedef struct Channel Channel;
+
+typedef struct {
+ uint64_t request_id;
+ bool returned, errored;
+ Object result;
+} ChannelCallFrame;
+
+typedef struct {
+ Channel *channel;
+ MsgpackRpcRequestHandler handler;
+ Array args;
+ uint64_t request_id;
+} RequestEvent;
+
+typedef struct {
+ PMap(cstr_t) *subscribed_events;
+ bool closed;
+ msgpack_unpacker *unpacker;
+ uint64_t next_request_id;
+ kvec_t(ChannelCallFrame *) call_stack;
+} RpcState;
+
+#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
diff --git a/src/nvim/option.c b/src/nvim/option.c
index f6f334f432..65ab7a54a6 100644
--- a/src/nvim/option.c
+++ b/src/nvim/option.c
@@ -115,6 +115,7 @@ static int p_bomb;
static char_u *p_bh;
static char_u *p_bt;
static int p_bl;
+static long p_channel;
static int p_ci;
static int p_cin;
static char_u *p_cink;
@@ -4193,6 +4194,9 @@ static char *set_num_option(int opt_idx, char_u *varp, long value,
curbuf->b_p_imsearch = B_IMODE_NONE;
}
p_imsearch = curbuf->b_p_imsearch;
+ } else if (pp == &p_channel || pp == &curbuf->b_p_channel) {
+ errmsg = e_invarg;
+ *pp = old_value;
}
/* if 'titlelen' has changed, redraw the title */
else if (pp == &p_titlelen) {
@@ -5472,6 +5476,7 @@ static char_u *get_varp(vimoption_T *p)
case PV_BH: return (char_u *)&(curbuf->b_p_bh);
case PV_BT: return (char_u *)&(curbuf->b_p_bt);
case PV_BL: return (char_u *)&(curbuf->b_p_bl);
+ case PV_CHANNEL:return (char_u *)&(curbuf->b_p_channel);
case PV_CI: return (char_u *)&(curbuf->b_p_ci);
case PV_CIN: return (char_u *)&(curbuf->b_p_cin);
case PV_CINK: return (char_u *)&(curbuf->b_p_cink);
@@ -5773,6 +5778,7 @@ void buf_copy_options(buf_T *buf, int flags)
buf->b_p_nf = vim_strsave(p_nf);
buf->b_p_mps = vim_strsave(p_mps);
buf->b_p_si = p_si;
+ buf->b_p_channel = 0;
buf->b_p_ci = p_ci;
buf->b_p_cin = p_cin;
buf->b_p_cink = vim_strsave(p_cink);
diff --git a/src/nvim/option_defs.h b/src/nvim/option_defs.h
index 1f62490ab9..b16f222705 100644
--- a/src/nvim/option_defs.h
+++ b/src/nvim/option_defs.h
@@ -695,6 +695,7 @@ enum {
, BV_BIN
, BV_BL
, BV_BOMB
+ , BV_CHANNEL
, BV_CI
, BV_CIN
, BV_CINK
diff --git a/src/nvim/options.lua b/src/nvim/options.lua
index cb3e5ad856..dd28a765fd 100644
--- a/src/nvim/options.lua
+++ b/src/nvim/options.lua
@@ -295,6 +295,14 @@ return {
defaults={if_true={vi="", vim=macros('CTRL_F_STR')}}
},
{
+ full_name='channel',
+ type='number', scope={'buffer'},
+ no_mkrc=true,
+ nodefault=true,
+ varname='p_channel',
+ defaults={if_true={vi=0}}
+ },
+ {
full_name='charconvert', abbreviation='ccv',
type='string', scope={'global'},
secure=true,
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 31e06ce404..7d6f2abd7f 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -37,7 +37,7 @@ typedef enum {
static Stream read_stream = {.closed = true};
static RBuffer *input_buffer = NULL;
static bool input_eof = false;
-static int global_fd = 0;
+static int global_fd = -1;
static int events_enabled = 0;
static bool blocking = false;
diff --git a/src/nvim/os/pty_process_unix.c b/src/nvim/os/pty_process_unix.c
index ee3ab96a83..53301e4b53 100644
--- a/src/nvim/os/pty_process_unix.c
+++ b/src/nvim/os/pty_process_unix.c
@@ -36,23 +36,36 @@
# include "os/pty_process_unix.c.generated.h"
#endif
+/// termios saved at startup (for TUI) or initialized by pty_process_spawn().
+static struct termios termios_default;
+
+/// Saves the termios properties associated with `tty_fd`.
+///
+/// @param tty_fd TTY file descriptor, or -1 if not in a terminal.
+void pty_process_save_termios(int tty_fd)
+{
+ if (tty_fd == -1 || tcgetattr(tty_fd, &termios_default) != 0) {
+ return;
+ }
+}
+
/// @returns zero on success, or negative error code
int pty_process_spawn(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
- static struct termios termios;
- if (!termios.c_cflag) {
- init_termios(&termios);
+ if (!termios_default.c_cflag) {
+ // TODO(jkeyes): We could pass NULL to forkpty() instead ...
+ init_termios(&termios_default);
}
int status = 0; // zero or negative error code (libuv convention)
Process *proc = (Process *)ptyproc;
- assert(!proc->err);
+ assert(proc->err.closed);
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 };
uv_disable_stdio_inheritance();
int master;
- int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
+ int pid = forkpty(&master, NULL, &termios_default, &ptyproc->winsize);
if (pid < 0) {
status = -errno;
@@ -83,12 +96,12 @@ int pty_process_spawn(PtyProcess *ptyproc)
goto error;
}
- if (proc->in
- && (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) {
+ if (!proc->in.closed
+ && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) {
goto error;
}
- if (proc->out
- && (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) {
+ if (!proc->out.closed
+ && (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) {
goto error;
}
diff --git a/src/nvim/os/pty_process_win.c b/src/nvim/os/pty_process_win.c
index ef8a699c56..3c4839a076 100644
--- a/src/nvim/os/pty_process_win.c
+++ b/src/nvim/os/pty_process_win.c
@@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc)
wchar_t *cwd = NULL;
const char *emsg = NULL;
- assert(!proc->err);
+ assert(proc->err.closed);
cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err);
if (cfg == NULL) {
@@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc)
goto cleanup;
}
- if (proc->in != NULL) {
+ if (!proc->in.closed) {
in_req = xmalloc(sizeof(uv_connect_t));
uv_pipe_connect(
in_req,
- &proc->in->uv.pipe,
+ &proc->in.uv.pipe,
in_name,
pty_process_connect_cb);
}
- if (proc->out != NULL) {
+ if (!proc->out.closed) {
out_req = xmalloc(sizeof(uv_connect_t));
uv_pipe_connect(
out_req,
- &proc->out->uv.pipe,
+ &proc->out.uv.pipe,
out_name,
pty_process_connect_cb);
}
@@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer)
PtyProcess *ptyproc = wait_eof_timer->data;
Process *proc = (Process *)ptyproc;
- if (!proc->out || !uv_is_readable(proc->out->uvstream)) {
+ if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) {
uv_timer_stop(&ptyproc->wait_eof_timer);
pty_process_finish2(ptyproc);
}
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 32e9a70e57..e32c6e05d2 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -207,16 +207,12 @@ static int do_os_system(char **argv,
char prog[MAXPATHL];
xstrlcpy(prog, argv[0], MAXPATHL);
- Stream in, out, err;
LibuvProcess uvproc = libuv_process_init(&main_loop, &buf);
Process *proc = &uvproc.process;
MultiQueue *events = multiqueue_new_child(main_loop.events);
proc->events = events;
proc->argv = argv;
- proc->in = input != NULL ? &in : NULL;
- proc->out = &out;
- proc->err = &err;
- int status = process_spawn(proc);
+ int status = process_spawn(proc, input != NULL, true, true);
if (status) {
loop_poll_events(&main_loop, 0);
// Failed, probably 'shell' is not executable.
@@ -231,32 +227,29 @@ static int do_os_system(char **argv,
return -1;
}
- // We want to deal with stream events as fast a possible while queueing
- // process events, so reset everything to NULL. It prevents closing the
+ // Note: unlike process events, stream events are not queued, as we want to
+ // deal with stream events as fast a possible. It prevents closing the
// streams while there's still data in the OS buffer (due to the process
// exiting before all data is read).
if (input != NULL) {
- proc->in->events = NULL;
- wstream_init(proc->in, 0);
+ wstream_init(&proc->in, 0);
}
- proc->out->events = NULL;
- rstream_init(proc->out, 0);
- rstream_start(proc->out, data_cb, &buf);
- proc->err->events = NULL;
- rstream_init(proc->err, 0);
- rstream_start(proc->err, data_cb, &buf);
+ rstream_init(&proc->out, 0);
+ rstream_start(&proc->out, data_cb, &buf);
+ rstream_init(&proc->err, 0);
+ rstream_start(&proc->err, data_cb, &buf);
// write the input, if any
if (input) {
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
- if (!wstream_write(&in, input_buffer)) {
+ if (!wstream_write(&proc->in, input_buffer)) {
// couldn't write, stop the process and tell the user about it
process_stop(proc);
return -1;
}
// close the input stream after everything is written
- wstream_set_write_cb(&in, shell_write_cb, NULL);
+ wstream_set_write_cb(&proc->in, shell_write_cb, NULL);
}
// Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the
@@ -684,10 +677,6 @@ static void shell_write_cb(Stream *stream, void *data, int status)
msg_schedule_emsgf(_("E5677: Error writing input to shell-command: %s"),
uv_err_name(status));
}
- if (stream->closed) { // Process may have exited before this write.
- WLOG("stream was already closed");
- return;
- }
stream_close(stream, NULL, NULL);
}
diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c
index 692bcc97f4..d7ba675b68 100644
--- a/src/nvim/os_unix.c
+++ b/src/nvim/os_unix.c
@@ -144,7 +144,9 @@ void mch_exit(int r) FUNC_ATTR_NORETURN
if (!event_teardown() && r == 0) {
r = 1; // Exit with error if main_loop did not teardown gracefully.
}
- stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
+ if (input_global_fd() >= 0) {
+ stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
+ }
#ifdef EXITFREE
free_all_mem();
diff --git a/src/nvim/rbuffer.c b/src/nvim/rbuffer.c
index 18d453cbe9..df9394fbb2 100644
--- a/src/nvim/rbuffer.c
+++ b/src/nvim/rbuffer.c
@@ -121,7 +121,7 @@ char *rbuffer_read_ptr(RBuffer *buf, size_t *read_count) FUNC_ATTR_NONNULL_ALL
{
if (!buf->size) {
*read_count = 0;
- return NULL;
+ return buf->read_ptr;
}
if (buf->read_ptr < buf->write_ptr) {
diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c
index 1dac9c69bd..dfa758f41e 100644
--- a/src/nvim/terminal.c
+++ b/src/nvim/terminal.c
@@ -1094,11 +1094,12 @@ static void refresh_terminal(Terminal *term)
// Calls refresh_terminal() on all invalidated_terminals.
static void refresh_timer_cb(TimeWatcher *watcher, void *data)
{
+ refresh_pending = false;
if (exiting // Cannot redraw (requires event loop) during teardown/exit.
// WM_LIST (^D) is not redrawn, unlike the normal wildmenu. So we must
// skip redraws to keep it visible.
|| wild_menu_showing == WM_LIST) {
- goto end;
+ return;
}
Terminal *term;
void *stub; (void)(stub);
@@ -1113,8 +1114,6 @@ static void refresh_timer_cb(TimeWatcher *watcher, void *data)
if (any_visible) {
redraw(true);
}
-end:
- refresh_pending = false;
}
static void refresh_size(Terminal *term, buf_T *buf)
diff --git a/test/functional/api/server_requests_spec.lua b/test/functional/api/server_requests_spec.lua
index a2a198ca83..37ac532d18 100644
--- a/test/functional/api/server_requests_spec.lua
+++ b/test/functional/api/server_requests_spec.lua
@@ -262,6 +262,7 @@ describe('server -> client', function()
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
funcs.rpcrequest(jobid, "exit")
+ eq({'notification', 'stderr', {0, {''}}}, next_message())
eq({'notification', 'exit', {0, 0}}, next_message())
end)
end)
diff --git a/test/functional/core/channels_spec.lua b/test/functional/core/channels_spec.lua
new file mode 100644
index 0000000000..7a2f157df3
--- /dev/null
+++ b/test/functional/core/channels_spec.lua
@@ -0,0 +1,266 @@
+
+local helpers = require('test.functional.helpers')(after_each)
+local clear, eq, eval, next_msg, ok, source = helpers.clear, helpers.eq,
+ helpers.eval, helpers.next_message, helpers.ok, helpers.source
+local command, funcs, meths = helpers.command, helpers.funcs, helpers.meths
+local sleep = helpers.sleep
+local spawn, nvim_argv = helpers.spawn, helpers.nvim_argv
+local set_session = helpers.set_session
+local nvim_prog = helpers.nvim_prog
+local retry = helpers.retry
+local expect_twostreams = helpers.expect_twostreams
+
+describe('channels', function()
+ local init = [[
+ function! Normalize(data) abort
+ " Windows: remove ^M
+ return type([]) == type(a:data)
+ \ ? map(a:data, 'substitute(v:val, "\r", "", "g")')
+ \ : a:data
+ endfunction
+ function! OnEvent(id, data, event) dict
+ call rpcnotify(1, a:event, a:id, a:data)
+ endfunction
+ ]]
+ before_each(function()
+ clear()
+ source(init)
+ end)
+
+ it('can connect to socket', function()
+ local server = spawn(nvim_argv)
+ set_session(server)
+ local address = funcs.serverlist()[1]
+ local client = spawn(nvim_argv)
+ set_session(client, true)
+ source(init)
+
+ meths.set_var('address', address)
+ command("let g:id = sockconnect('pipe', address, {'on_data':'OnEvent'})")
+ local id = eval("g:id")
+ ok(id > 0)
+
+ command("call chansend(g:id, msgpackdump([[2,'nvim_set_var',['code',23]]]))")
+ set_session(server, true)
+ retry(nil, 1000, function()
+ eq(23, meths.get_var('code'))
+ end)
+ set_session(client, true)
+
+ command("call chansend(g:id, msgpackdump([[0,0,'nvim_eval',['2+3']]]))")
+
+
+ local res = eval("msgpackdump([[1,0,v:null,5]])")
+ eq({"\148\001\n\192\005"}, res)
+ eq({'notification', 'data', {id, res}}, next_msg())
+ command("call chansend(g:id, msgpackdump([[2,'nvim_command',['quit']]]))")
+ eq({'notification', 'data', {id, {''}}}, next_msg())
+ end)
+
+ it('can use stdio channel', function()
+ source([[
+ let g:job_opts = {
+ \ 'on_stdout': function('OnEvent'),
+ \ 'on_stderr': function('OnEvent'),
+ \ 'on_exit': function('OnEvent'),
+ \ }
+ ]])
+ meths.set_var("nvim_prog", nvim_prog)
+ meths.set_var("code", [[
+ function! OnEvent(id, data, event) dict
+ let text = string([a:id, a:data, a:event])
+ call chansend(g:x, text)
+
+ if a:data == ['']
+ call chansend(v:stderr, "*dies*")
+ quit
+ endif
+ endfunction
+ let g:x = stdioopen({'on_stdin':'OnEvent'})
+ call chansend(x, "hello")
+ ]])
+ command("let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
+ local id = eval("g:id")
+ ok(id > 0)
+
+ eq({ "notification", "stdout", {id, { "hello" } } }, next_msg())
+
+ command("call chansend(id, 'howdy')")
+ eq({"notification", "stdout", {id, {"[1, ['howdy'], 'stdin']"}}}, next_msg())
+
+ command("call chanclose(id, 'stdin')")
+ expect_twostreams({{"notification", "stdout", {id, {"[1, [''], 'stdin']"}}},
+ {'notification', 'stdout', {id, {''}}}},
+ {{"notification", "stderr", {id, {"*dies*"}}},
+ {'notification', 'stderr', {id, {''}}}})
+ eq({"notification", "exit", {3,0}}, next_msg())
+ end)
+
+ local function expect_twoline(id, stream, line1, line2, nobr)
+ local msg = next_msg()
+ local joined = nobr and {line1..line2} or {line1, line2}
+ if not pcall(eq, {"notification", stream, {id, joined}}, msg) then
+ local sep = (not nobr) and "" or nil
+ eq({"notification", stream, {id, {line1, sep}}}, msg)
+ eq({"notification", stream, {id, {line2}}}, next_msg())
+ end
+ end
+
+ it('can use stdio channel with pty', function()
+ if helpers.pending_win32(pending) then return end
+ source([[
+ let g:job_opts = {
+ \ 'on_stdout': function('OnEvent'),
+ \ 'on_exit': function('OnEvent'),
+ \ 'pty': v:true,
+ \ }
+ ]])
+ meths.set_var("nvim_prog", nvim_prog)
+ meths.set_var("code", [[
+ function! OnEvent(id, data, event) dict
+ let text = string([a:id, a:data, a:event])
+ call chansend(g:x, text)
+ endfunction
+ let g:x = stdioopen({'on_stdin':'OnEvent'})
+ ]])
+ command("let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
+ local id = eval("g:id")
+ ok(id > 0)
+
+ command("call chansend(id, 'TEXT\n')")
+ expect_twoline(id, "stdout", "TEXT\r", "[1, ['TEXT', ''], 'stdin']")
+
+
+ command("call chansend(id, 'neovan')")
+ eq({"notification", "stdout", {id, {"neovan"}}}, next_msg())
+ command("call chansend(id, '\127\127im\n')")
+ expect_twoline(id, "stdout", "\b \b\b \bim\r", "[1, ['neovim', ''], 'stdin']")
+
+ command("call chansend(id, 'incomplet\004')")
+
+ local is_freebsd = eval("system('uname') =~? 'FreeBSD'") == 1
+ local bsdlike = is_freebsd or (helpers.os_name() == "osx")
+ print("bsdlike:", bsdlike)
+ local extra = bsdlike and "^D\008\008" or ""
+ expect_twoline(id, "stdout",
+ "incomplet"..extra, "[1, ['incomplet'], 'stdin']", true)
+
+
+ command("call chansend(id, '\004')")
+ if bsdlike then
+ expect_twoline(id, "stdout", extra, "[1, [''], 'stdin']", true)
+ else
+ eq({"notification", "stdout", {id, {"[1, [''], 'stdin']"}}}, next_msg())
+ end
+
+ -- channel is still open
+ command("call chansend(id, 'hi again!\n')")
+ eq({"notification", "stdout", {id, {"hi again!\r", ""}}}, next_msg())
+ end)
+
+
+ it('stdio channel can use rpc and stderr simultaneously', function()
+ if helpers.pending_win32(pending) then return end
+ source([[
+ let g:job_opts = {
+ \ 'on_stderr': function('OnEvent'),
+ \ 'on_exit': function('OnEvent'),
+ \ 'rpc': v:true,
+ \ }
+ ]])
+ meths.set_var("nvim_prog", nvim_prog)
+ meths.set_var("code", [[
+ let id = stdioopen({'rpc':v:true})
+ call rpcnotify(id,"nvim_call_function", "rpcnotify", [1, "message", "hi there!", id])
+ call chansend(v:stderr, "trouble!")
+ ]])
+ command("let id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
+ eq({"notification", "message", {"hi there!", 1}}, next_msg())
+ eq({"notification", "stderr", {3, {"trouble!"}}}, next_msg())
+
+ eq(30, eval("rpcrequest(id, 'nvim_eval', '[chansend(v:stderr, \"math??\"), 5*6][1]')"))
+ eq({"notification", "stderr", {3, {"math??"}}}, next_msg())
+
+ local _, err = pcall(command,"call rpcrequest(id, 'nvim_command', 'call chanclose(v:stderr, \"stdin\")')")
+ ok(string.find(err,"E906: invalid stream for channel") ~= nil)
+
+ eq(1, eval("rpcrequest(id, 'nvim_eval', 'chanclose(v:stderr, \"stderr\")')"))
+ eq({"notification", "stderr", {3, {""}}}, next_msg())
+
+ command("call rpcnotify(id, 'nvim_command', 'quit')")
+ eq({"notification", "exit", {3, 0}}, next_msg())
+ end)
+
+ it('can use buffered output mode', function()
+ if helpers.pending_win32(pending) then return end
+ source([[
+ let g:job_opts = {
+ \ 'on_stdout': function('OnEvent'),
+ \ 'on_exit': function('OnEvent'),
+ \ 'stdout_buffered': v:true,
+ \ }
+ ]])
+ command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
+ local id = eval("g:id")
+
+ command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
+ sleep(10)
+ command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
+ command("call chanclose(id, 'stdin')")
+
+ eq({"notification", "stdout", {id, {'10 PRINT "NVIM"',
+ '20 GOTO 10', ''}}}, next_msg())
+ eq({"notification", "exit", {id, 0}}, next_msg())
+
+ command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
+ id = eval("g:id")
+
+ command([[call chansend(id, "is no number\nnot at all")]])
+ command("call chanclose(id, 'stdin')")
+
+ -- works correctly with no output
+ eq({"notification", "stdout", {id, {''}}}, next_msg())
+ eq({"notification", "exit", {id, 1}}, next_msg())
+
+ end)
+
+ it('can use buffered output mode with no stream callback', function()
+ if helpers.pending_win32(pending) then return end
+ source([[
+ function! OnEvent(id, data, event) dict
+ call rpcnotify(1, a:event, a:id, a:data, self.stdout)
+ endfunction
+ let g:job_opts = {
+ \ 'on_exit': function('OnEvent'),
+ \ 'stdout_buffered': v:true,
+ \ }
+ ]])
+ command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
+ local id = eval("g:id")
+
+ command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
+ sleep(10)
+ command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
+ command("call chanclose(id, 'stdin')")
+
+ eq({"notification", "exit", {id, 0, {'10 PRINT "NVIM"',
+ '20 GOTO 10', ''}}}, next_msg())
+
+ -- reset dictionary
+ source([[
+ let g:job_opts = {
+ \ 'on_exit': function('OnEvent'),
+ \ 'stdout_buffered': v:true,
+ \ }
+ ]])
+ command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
+ id = eval("g:id")
+
+ command([[call chansend(id, "is no number\nnot at all")]])
+ command("call chanclose(id, 'stdin')")
+
+ -- works correctly with no output
+ eq({"notification", "exit", {id, 1, {''}}}, next_msg())
+
+ end)
+end)
diff --git a/test/functional/core/job_spec.lua b/test/functional/core/job_spec.lua
index 1b8a5b1b95..e957650c88 100644
--- a/test/functional/core/job_spec.lua
+++ b/test/functional/core/job_spec.lua
@@ -10,6 +10,7 @@ local wait = helpers.wait
local iswin = helpers.iswin
local get_pathsep = helpers.get_pathsep
local nvim_set = helpers.nvim_set
+local expect_twostreams = helpers.expect_twostreams
local Screen = require('test.functional.ui.screen')
describe('jobs', function()
@@ -29,15 +30,14 @@ describe('jobs', function()
\ ? map(a:data, 'substitute(v:val, "\r", "", "g")')
\ : a:data
endfunction
- function! s:OnEvent(id, data, event) dict
+ function! OnEvent(id, data, event) dict
let userdata = get(self, 'user')
let data = Normalize(a:data)
call rpcnotify(g:channel, a:event, userdata, data)
endfunction
let g:job_opts = {
- \ 'on_stdout': function('s:OnEvent'),
- \ 'on_stderr': function('s:OnEvent'),
- \ 'on_exit': function('s:OnEvent'),
+ \ 'on_stdout': function('OnEvent'),
+ \ 'on_exit': function('OnEvent'),
\ 'user': 0
\ }
]])
@@ -51,6 +51,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart('echo $VAR', g:job_opts)")
end
eq({'notification', 'stdout', {0, {'abc', ''}}}, next_msg())
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -63,6 +64,7 @@ describe('jobs', function()
end
eq({'notification', 'stdout',
{0, {(iswin() and [[C:\]] or '/'), ''}}}, next_msg())
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -76,6 +78,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart('pwd', g:job_opts)")
end
eq({'notification', 'stdout', {0, {dir, ''}}}, next_msg())
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
rmdir(dir)
end)
@@ -118,8 +121,12 @@ describe('jobs', function()
it('invokes callbacks when the job writes and exits', function()
-- TODO: hangs on Windows
if helpers.pending_win32(pending) then return end
+ nvim('command', "let g:job_opts.on_stderr = function('OnEvent')")
nvim('command', "call jobstart('echo', g:job_opts)")
- eq({'notification', 'stdout', {0, {'', ''}}}, next_msg())
+ expect_twostreams({{'notification', 'stdout', {0, {'', ''}}},
+ {'notification', 'stdout', {0, {''}}}},
+ {{'notification', 'stderr', {0, {''}}}})
+
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -134,6 +141,7 @@ describe('jobs', function()
nvim('command', 'call jobsend(j, [123, "xyz", ""])')
eq({'notification', 'stdout', {0, {'123', 'xyz', ''}}}, next_msg())
nvim('command', "call jobstop(j)")
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -145,6 +153,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart(['cat', '"..filename.."'], g:job_opts)")
eq({'notification', 'stdout', {0, {'abc\ndef', ''}}}, next_msg())
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
os.remove(filename)
@@ -168,6 +177,7 @@ describe('jobs', function()
nvim('command', 'call jobsend(j, "abc\\nxyz")')
eq({'notification', 'stdout', {0, {'abc', 'xyz'}}}, next_msg())
nvim('command', "call jobstop(j)")
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -186,6 +196,7 @@ describe('jobs', function()
eq({'notification', 'stdout', {0, {'\n123\n', 'abc\nxyz\n', ''}}},
next_msg())
nvim('command', "call jobstop(j)")
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -196,6 +207,7 @@ describe('jobs', function()
eq({'notification', 'stdout', {0, {'some data', 'without\nfinal nl'}}},
next_msg())
nvim('command', "call jobstop(j)")
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -203,6 +215,7 @@ describe('jobs', function()
if helpers.pending_win32(pending) then return end -- TODO: Need `cat`.
nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)")
nvim('command', 'call jobclose(j, "stdin")')
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@@ -239,6 +252,7 @@ describe('jobs', function()
local pid = eval('jobpid(j)')
eq(0,os.execute('ps -p '..pid..' > /dev/null'))
nvim('command', 'call jobstop(j)')
+ eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
neq(0,os.execute('ps -p '..pid..' > /dev/null'))
end)
@@ -270,6 +284,7 @@ describe('jobs', function()
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
local data = {n = 5, s = 'str', l = {1}}
eq({'notification', 'stdout', {data, {'foo', ''}}}, next_msg())
+ eq({'notification', 'stdout', {data, {''}}}, next_msg())
eq({'notification', 'exit', {data, 0}}, next_msg())
end)
@@ -283,7 +298,6 @@ describe('jobs', function()
it('can omit data callbacks', function()
nvim('command', 'unlet g:job_opts.on_stdout')
- nvim('command', 'unlet g:job_opts.on_stderr')
nvim('command', 'let g:job_opts.user = 5')
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
eq({'notification', 'exit', {5, 0}}, next_msg())
@@ -294,11 +308,13 @@ describe('jobs', function()
nvim('command', 'let g:job_opts.user = 5')
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
eq({'notification', 'stdout', {5, {'foo', ''}}}, next_msg())
+ eq({'notification', 'stdout', {5, {''}}}, next_msg())
end)
it('will pass return code with the exit event', function()
nvim('command', 'let g:job_opts.user = 5')
nvim('command', "call jobstart('exit 55', g:job_opts)")
+ eq({'notification', 'stdout', {5, {''}}}, next_msg())
eq({'notification', 'exit', {5, 55}}, next_msg())
end)
@@ -341,6 +357,14 @@ describe('jobs', function()
end)
it('requires funcrefs for script-local (s:) functions', function()
+ local screen = Screen.new(60, 5)
+ screen:attach()
+ screen:set_default_attr_ids({
+ [1] = {bold = true, foreground = Screen.colors.Blue1},
+ [2] = {foreground = Screen.colors.Grey100, background = Screen.colors.Red},
+ [3] = {bold = true, foreground = Screen.colors.SeaGreen4}
+ })
+
-- Pass job callback names _without_ `function(...)`.
source([[
function! s:OnEvent(id, data, event) dict
@@ -350,14 +374,10 @@ describe('jobs', function()
\ 'on_stdout': 's:OnEvent',
\ 'on_stderr': 's:OnEvent',
\ 'on_exit': 's:OnEvent',
- \ 'user': 2349
\ })
]])
- -- The behavior is asynchronous, retry until a time limit.
- helpers.retry(nil, 10000, function()
- eq("E120:", string.match(eval("v:errmsg"), "E%d*:"))
- end)
+ screen:expect("{2:E120: Using <SID> not in a script context: s:OnEvent}",nil,nil,nil,true)
end)
it('does not repeat output with slow output handlers', function()
@@ -376,7 +396,7 @@ describe('jobs', function()
call jobwait([jobstart(cmd, d)])
call rpcnotify(g:channel, 'data', d.data)
]])
- eq({'notification', 'data', {{{'1', ''}, {'2', ''}, {'3', ''}, {'4', ''}, {'5', ''}}}}, next_msg())
+ eq({'notification', 'data', {{{'1', ''}, {'2', ''}, {'3', ''}, {'4', ''}, {'5', ''}, {''}}}}, next_msg())
end)
it('jobstart() works with partial functions', function()
@@ -497,7 +517,8 @@ describe('jobs', function()
elseif self.state == 2
let self.state = 3
call jobsend(a:id, "line3\n")
- else
+ elseif self.state == 3
+ let self.state = 4
call rpcnotify(g:channel, 'w', printf('job %d closed', self.counter))
call jobclose(a:id, 'stdin')
endif
@@ -552,6 +573,7 @@ describe('jobs', function()
-- FIXME need to wait until jobsend succeeds before calling jobstop
pending('will only emit the "exit" event after "stdout" and "stderr"', function()
+ nvim('command', "let g:job_opts.on_stderr = function('s:OnEvent')")
nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)")
local jobid = nvim('eval', 'j')
nvim('eval', 'jobsend(j, "abcdef")')
@@ -638,7 +660,7 @@ describe('jobs', function()
-- there won't be any more messages, and the test would hang.
helpers.sleep(100)
local err = exc_exec('call jobpid(j)')
- eq('Vim(call):E900: Invalid job id', err)
+ eq('Vim(call):E900: Invalid channel id', err)
-- cleanup
eq(other_pid, eval('jobpid(' .. other_jobid .. ')'))
diff --git a/test/functional/helpers.lua b/test/functional/helpers.lua
index 848f1ef477..272d2045c9 100644
--- a/test/functional/helpers.lua
+++ b/test/functional/helpers.lua
@@ -100,6 +100,22 @@ local function next_message()
return session:next_message()
end
+local function expect_twostreams(msgs1, msgs2)
+ local pos1, pos2 = 1, 1
+ while pos1 <= #msgs1 or pos2 <= #msgs2 do
+ local msg = next_message()
+ if pos1 <= #msgs1 and pcall(eq, msgs1[pos1], msg) then
+ pos1 = pos1 + 1
+ elseif pos2 <= #msgs2 then
+ eq(msgs2[pos2], msg)
+ pos2 = pos2 + 1
+ else
+ -- already failed, but show the right error message
+ eq(msgs1[pos1], msg)
+ end
+ end
+end
+
local function call_and_stop_on_error(...)
local status, result = copcall(...) -- luacheck: ignore
if not status then
@@ -618,6 +634,33 @@ local function alter_slashes(obj)
end
end
+local function hexdump(str)
+ local len = string.len( str )
+ local dump = ""
+ local hex = ""
+ local asc = ""
+
+ for i = 1, len do
+ if 1 == i % 8 then
+ dump = dump .. hex .. asc .. "\n"
+ hex = string.format( "%04x: ", i - 1 )
+ asc = ""
+ end
+
+ local ord = string.byte( str, i )
+ hex = hex .. string.format( "%02x ", ord )
+ if ord >= 32 and ord <= 126 then
+ asc = asc .. string.char( ord )
+ else
+ asc = asc .. "."
+ end
+ end
+
+ return dump .. hex
+ .. string.rep( " ", 8 - len % 8 ) .. asc
+
+end
+
local module = {
prepend_argv = prepend_argv,
clear = clear,
@@ -636,6 +679,7 @@ local module = {
command = nvim_command,
request = request,
next_message = next_message,
+ expect_twostreams = expect_twostreams,
run = run,
stop = stop,
eq = eq,
@@ -687,6 +731,7 @@ local module = {
get_pathsep = get_pathsep,
missing_provider = missing_provider,
alter_slashes = alter_slashes,
+ hexdump = hexdump,
}
return function(after_each)
diff --git a/test/helpers.lua b/test/helpers.lua
index 260f10002e..123dd111ee 100644
--- a/test/helpers.lua
+++ b/test/helpers.lua
@@ -225,21 +225,8 @@ local function check_cores(app, force)
local esigns = ('='):rep(len / 2)
out:write(('\n%s Core file %s %s\n'):format(esigns, core, esigns))
out:flush()
- local pipe = io.popen(
- db_cmd:gsub('%$_NVIM_TEST_APP', app):gsub('%$_NVIM_TEST_CORE', core)
- .. ' 2>&1', 'r')
- if pipe then
- local bt = pipe:read('*a')
- if bt then
- out:write(bt)
- out:write('\n')
- else
- out:write('Failed to read from the pipe\n')
- end
- else
- out:write('Failed to create pipe\n')
- end
- out:flush()
+ os.execute(db_cmd:gsub('%$_NVIM_TEST_APP', app):gsub('%$_NVIM_TEST_CORE', core) .. ' 2>&1')
+ out:write('\n')
found_cores = found_cores + 1
os.remove(core)
end