aboutsummaryrefslogtreecommitdiff
path: root/test/client
diff options
context:
space:
mode:
Diffstat (limited to 'test/client')
-rw-r--r--test/client/rpc_stream.lua (renamed from test/client/msgpack_rpc_stream.lua)49
-rw-r--r--test/client/session.lua58
-rw-r--r--test/client/uv_stream.lua105
3 files changed, 160 insertions, 52 deletions
diff --git a/test/client/msgpack_rpc_stream.lua b/test/client/rpc_stream.lua
index 7131940a58..9f2672bcf9 100644
--- a/test/client/msgpack_rpc_stream.lua
+++ b/test/client/rpc_stream.lua
@@ -1,34 +1,41 @@
+---
+--- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`.
+--- Does not implement the RPC protocol, see `session.lua` for that.
+---
+
local mpack = vim.mpack
local Response = {}
Response.__index = Response
-function Response.new(msgpack_rpc_stream, request_id)
+function Response.new(rpc_stream, request_id)
return setmetatable({
- _msgpack_rpc_stream = msgpack_rpc_stream,
+ _rpc_stream = rpc_stream,
_request_id = request_id,
}, Response)
end
function Response:send(value, is_error)
- local data = self._msgpack_rpc_stream._session:reply(self._request_id)
+ local data = self._rpc_stream._session:reply(self._request_id)
if is_error then
- data = data .. self._msgpack_rpc_stream._pack(value)
- data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
+ data = data .. self._rpc_stream._pack(value)
+ data = data .. self._rpc_stream._pack(mpack.NIL)
else
- data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
- data = data .. self._msgpack_rpc_stream._pack(value)
+ data = data .. self._rpc_stream._pack(mpack.NIL)
+ data = data .. self._rpc_stream._pack(value)
end
- self._msgpack_rpc_stream._stream:write(data)
+ self._rpc_stream._stream:write(data)
end
---- @class test.MsgpackRpcStream
+--- Nvim msgpack RPC stream.
+---
+--- @class test.RpcStream
--- @field private _stream test.Stream
--- @field private __pack table
-local MsgpackRpcStream = {}
-MsgpackRpcStream.__index = MsgpackRpcStream
+local RpcStream = {}
+RpcStream.__index = RpcStream
-function MsgpackRpcStream.new(stream)
+function RpcStream.new(stream)
return setmetatable({
_stream = stream,
_pack = mpack.Packer(),
@@ -50,10 +57,10 @@ function MsgpackRpcStream.new(stream)
},
}),
}),
- }, MsgpackRpcStream)
+ }, RpcStream)
end
-function MsgpackRpcStream:write(method, args, response_cb)
+function RpcStream:write(method, args, response_cb)
local data
if response_cb then
assert(type(response_cb) == 'function')
@@ -66,10 +73,10 @@ function MsgpackRpcStream:write(method, args, response_cb)
self._stream:write(data)
end
-function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
+function RpcStream:read_start(on_request, on_notification, on_eof)
self._stream:read_start(function(data)
if not data then
- return eof_cb()
+ return on_eof()
end
local type, id_or_cb, method_or_error, args_or_result
local pos = 1
@@ -78,9 +85,9 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
type, id_or_cb, method_or_error, args_or_result, pos = self._session:receive(data, pos)
if type == 'request' or type == 'notification' then
if type == 'request' then
- request_cb(method_or_error, args_or_result, Response.new(self, id_or_cb))
+ on_request(method_or_error, args_or_result, Response.new(self, id_or_cb))
else
- notification_cb(method_or_error, args_or_result)
+ on_notification(method_or_error, args_or_result)
end
elseif type == 'response' then
if method_or_error == mpack.NIL then
@@ -94,12 +101,12 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
end)
end
-function MsgpackRpcStream:read_stop()
+function RpcStream:read_stop()
self._stream:read_stop()
end
-function MsgpackRpcStream:close(signal)
+function RpcStream:close(signal)
self._stream:close(signal)
end
-return MsgpackRpcStream
+return RpcStream
diff --git a/test/client/session.lua b/test/client/session.lua
index f1f46c5efe..a5839e012a 100644
--- a/test/client/session.lua
+++ b/test/client/session.lua
@@ -1,12 +1,18 @@
+---
+--- Nvim msgpack-RPC protocol session. Manages requests/notifications/responses.
+---
+
local uv = vim.uv
-local MsgpackRpcStream = require('test.client.msgpack_rpc_stream')
+local RpcStream = require('test.client.rpc_stream')
+--- Nvim msgpack-RPC protocol session. Manages requests/notifications/responses.
+---
--- @class test.Session
---- @field private _pending_messages string[]
---- @field private _msgpack_rpc_stream test.MsgpackRpcStream
+--- @field private _pending_messages string[] Requests/notifications received from the remote end.
+--- @field private _rpc_stream test.RpcStream
--- @field private _prepare uv.uv_prepare_t
--- @field private _timer uv.uv_timer_t
---- @field private _is_running boolean
+--- @field private _is_running boolean true during `Session:run()` scope.
--- @field exec_lua_setup boolean
local Session = {}
Session.__index = Session
@@ -51,9 +57,10 @@ local function coroutine_exec(func, ...)
end))
end
+--- Creates a new msgpack-RPC session.
function Session.new(stream)
return setmetatable({
- _msgpack_rpc_stream = MsgpackRpcStream.new(stream),
+ _rpc_stream = RpcStream.new(stream),
_pending_messages = {},
_prepare = uv.new_prepare(),
_timer = uv.new_timer(),
@@ -91,10 +98,13 @@ function Session:next_message(timeout)
return table.remove(self._pending_messages, 1)
end
+--- Sends a notification to the RPC endpoint.
function Session:notify(method, ...)
- self._msgpack_rpc_stream:write(method, { ... })
+ self._rpc_stream:write(method, { ... })
end
+--- Sends a request to the RPC endpoint.
+---
--- @param method string
--- @param ... any
--- @return boolean, table
@@ -114,8 +124,16 @@ function Session:request(method, ...)
return true, result
end
---- Runs the event loop.
+--- Processes incoming RPC requests/notifications until exhausted.
+---
+--- TODO(justinmk): luaclient2 avoids this via uvutil.cb_wait() + uvutil.add_idle_call()?
+---
+--- @param request_cb function Handles requests from the sever to the local end.
+--- @param notification_cb function Handles notifications from the sever to the local end.
+--- @param setup_cb function
+--- @param timeout number
function Session:run(request_cb, notification_cb, setup_cb, timeout)
+ --- Handles an incoming request.
local function on_request(method, args, response)
coroutine_exec(request_cb, method, args, function(status, result, flag)
if status then
@@ -126,6 +144,7 @@ function Session:run(request_cb, notification_cb, setup_cb, timeout)
end)
end
+ --- Handles an incoming notification.
local function on_notification(method, args)
coroutine_exec(notification_cb, method, args)
end
@@ -160,39 +179,45 @@ function Session:close(signal)
if not self._prepare:is_closing() then
self._prepare:close()
end
- self._msgpack_rpc_stream:close(signal)
+ self._rpc_stream:close(signal)
self.closed = true
end
+--- Sends a request to the RPC endpoint, without blocking (schedules a coroutine).
function Session:_yielding_request(method, args)
return coroutine.yield(function(co)
- self._msgpack_rpc_stream:write(method, args, function(err, result)
+ self._rpc_stream:write(method, args, function(err, result)
resume(co, err, result)
end)
end)
end
+--- Sends a request to the RPC endpoint, and blocks (polls event loop) until a response is received.
function Session:_blocking_request(method, args)
local err, result
+ -- Invoked when a request is received from the remote end.
local function on_request(method_, args_, response)
table.insert(self._pending_messages, { 'request', method_, args_, response })
end
+ -- Invoked when a notification is received from the remote end.
local function on_notification(method_, args_)
table.insert(self._pending_messages, { 'notification', method_, args_ })
end
- self._msgpack_rpc_stream:write(method, args, function(e, r)
+ self._rpc_stream:write(method, args, function(e, r)
err = e
result = r
uv.stop()
end)
+ -- Poll for incoming requests/notifications received from the remote end.
self:_run(on_request, on_notification)
return (err or self.eof_err), result
end
+--- Polls for incoming requests/notifications received from the remote end.
function Session:_run(request_cb, notification_cb, timeout)
if type(timeout) == 'number' then
self._prepare:start(function()
@@ -202,14 +227,21 @@ function Session:_run(request_cb, notification_cb, timeout)
self._prepare:stop()
end)
end
- self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function()
+ self._rpc_stream:read_start(request_cb, notification_cb, function()
uv.stop()
- self.eof_err = { 1, 'EOF was received from Nvim. Likely the Nvim process crashed.' }
+
+ --- @diagnostic disable-next-line: invisible
+ local stderr = self._rpc_stream._stream.stderr --[[@as string?]]
+ -- See if `ProcStream.stderr` has anything useful.
+ stderr = '' ~= ((stderr or ''):match('^%s*(.*%S)') or '') and ' stderr:\n' .. stderr or ''
+
+ self.eof_err = { 1, 'EOF was received from Nvim. Likely the Nvim process crashed.' .. stderr }
end)
uv.run()
self._prepare:stop()
self._timer:stop()
- self._msgpack_rpc_stream:read_stop()
+ self._rpc_stream:read_stop()
end
+--- Nvim msgpack-RPC session.
return Session
diff --git a/test/client/uv_stream.lua b/test/client/uv_stream.lua
index adf002ba1e..6e1a6995be 100644
--- a/test/client/uv_stream.lua
+++ b/test/client/uv_stream.lua
@@ -1,3 +1,8 @@
+---
+--- Basic stream types.
+--- See `rpc_stream.lua` for the msgpack layer.
+---
+
local uv = vim.uv
--- @class test.Stream
@@ -6,6 +11,8 @@ local uv = vim.uv
--- @field read_stop fun(self)
--- @field close fun(self, signal?: string)
+--- Stream over given pipes.
+---
--- @class vim.StdioStream : test.Stream
--- @field private _in uv.uv_pipe_t
--- @field private _out uv.uv_pipe_t
@@ -45,6 +52,8 @@ function StdioStream:close()
self._out:close()
end
+--- Stream over a named pipe or TCP socket.
+---
--- @class test.SocketStream : test.Stream
--- @field package _stream_error? string
--- @field package _socket uv.uv_pipe_t
@@ -109,26 +118,54 @@ function SocketStream:close()
uv.close(self._socket)
end
---- @class test.ChildProcessStream : test.Stream
+--- Stream over child process stdio.
+---
+--- @class test.ProcStream : test.Stream
--- @field private _proc uv.uv_process_t
--- @field private _pid integer
--- @field private _child_stdin uv.uv_pipe_t
--- @field private _child_stdout uv.uv_pipe_t
+--- @field private _child_stderr uv.uv_pipe_t
+--- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF).
+--- @field stdout string
+--- Collects stderr as raw data.
+--- @field stderr string
+--- Gets stderr+stdout as text (CRLF converted to LF).
+--- @field output fun(): string
+--- @field stdout_eof boolean
+--- @field stderr_eof boolean
+--- Collects text into the `stdout` field.
+--- @field collect_text boolean
+--- Exit code
--- @field status integer
--- @field signal integer
-local ChildProcessStream = {}
-ChildProcessStream.__index = ChildProcessStream
+local ProcStream = {}
+ProcStream.__index = ProcStream
+--- Starts child process specified by `argv`.
+---
--- @param argv string[]
--- @param env string[]?
--- @param io_extra uv.uv_pipe_t?
---- @return test.ChildProcessStream
-function ChildProcessStream.spawn(argv, env, io_extra)
+--- @return test.ProcStream
+function ProcStream.spawn(argv, env, io_extra)
local self = setmetatable({
- _child_stdin = uv.new_pipe(false),
- _child_stdout = uv.new_pipe(false),
+ collect_text = false,
+ output = function(self)
+ if not self.collect_text then
+ error('set collect_text=true')
+ end
+ return (self.stderr .. self.stdout):gsub('\r\n', '\n')
+ end,
+ stdout = '',
+ stderr = '',
+ stdout_eof = false,
+ stderr_eof = false,
+ _child_stdin = assert(uv.new_pipe(false)),
+ _child_stdout = assert(uv.new_pipe(false)),
+ _child_stderr = assert(uv.new_pipe(false)),
_exiting = false,
- }, ChildProcessStream)
+ }, ProcStream)
local prog = argv[1]
local args = {} --- @type string[]
for i = 2, #argv do
@@ -136,13 +173,14 @@ function ChildProcessStream.spawn(argv, env, io_extra)
end
--- @diagnostic disable-next-line:missing-fields
self._proc, self._pid = uv.spawn(prog, {
- stdio = { self._child_stdin, self._child_stdout, 1, io_extra },
+ stdio = { self._child_stdin, self._child_stdout, self._child_stderr, io_extra },
args = args,
--- @diagnostic disable-next-line:assign-type-mismatch
env = env,
}, function(status, signal)
- self.status = status
self.signal = signal
+ -- "Abort" exit may not set status; force to nonzero in that case.
+ self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0))
end)
if not self._proc then
@@ -153,24 +191,54 @@ function ChildProcessStream.spawn(argv, env, io_extra)
return self
end
-function ChildProcessStream:write(data)
+function ProcStream:write(data)
self._child_stdin:write(data)
end
-function ChildProcessStream:read_start(cb)
- self._child_stdout:read_start(function(err, chunk)
- if err then
- error(err)
+function ProcStream:on_read(stream, cb, err, chunk)
+ if err then
+ error(err) -- stream read failed?
+ elseif chunk then
+ -- Always collect stderr, in case it gives useful info on failure.
+ if stream == 'stderr' then
+ self.stderr = self.stderr .. chunk --[[@as string]]
+ elseif stream == 'stdout' and self.collect_text then
+ -- Set `stdout` and convert CRLF => LF.
+ self.stdout = (self.stdout .. chunk):gsub('\r\n', '\n')
end
+ else
+ -- stderr_eof/stdout_eof
+ self[stream .. '_eof'] = true ---@type boolean
+ end
+
+ -- Handler provided by the caller.
+ if cb then
cb(chunk)
+ end
+end
+
+--- Collects output until the process exits.
+function ProcStream:wait()
+ while not (self.stdout_eof and self.stderr_eof and (self.status or self.signal)) do
+ uv.run('once')
+ end
+end
+
+function ProcStream:read_start(on_stdout, on_stderr)
+ self._child_stdout:read_start(function(err, chunk)
+ self:on_read('stdout', on_stdout, err, chunk)
+ end)
+ self._child_stderr:read_start(function(err, chunk)
+ self:on_read('stderr', on_stderr, err, chunk)
end)
end
-function ChildProcessStream:read_stop()
+function ProcStream:read_stop()
self._child_stdout:read_stop()
+ self._child_stderr:read_stop()
end
-function ChildProcessStream:close(signal)
+function ProcStream:close(signal)
if self._closed then
return
end
@@ -178,6 +246,7 @@ function ChildProcessStream:close(signal)
self:read_stop()
self._child_stdin:close()
self._child_stdout:close()
+ self._child_stderr:close()
if type(signal) == 'string' then
self._proc:kill('sig' .. signal)
end
@@ -189,6 +258,6 @@ end
return {
StdioStream = StdioStream,
- ChildProcessStream = ChildProcessStream,
+ ProcStream = ProcStream,
SocketStream = SocketStream,
}