diff options
Diffstat (limited to 'test/client')
-rw-r--r-- | test/client/rpc_stream.lua (renamed from test/client/msgpack_rpc_stream.lua) | 49 | ||||
-rw-r--r-- | test/client/session.lua | 58 | ||||
-rw-r--r-- | test/client/uv_stream.lua | 105 |
3 files changed, 160 insertions, 52 deletions
diff --git a/test/client/msgpack_rpc_stream.lua b/test/client/rpc_stream.lua index 7131940a58..9f2672bcf9 100644 --- a/test/client/msgpack_rpc_stream.lua +++ b/test/client/rpc_stream.lua @@ -1,34 +1,41 @@ +--- +--- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`. +--- Does not implement the RPC protocol, see `session.lua` for that. +--- + local mpack = vim.mpack local Response = {} Response.__index = Response -function Response.new(msgpack_rpc_stream, request_id) +function Response.new(rpc_stream, request_id) return setmetatable({ - _msgpack_rpc_stream = msgpack_rpc_stream, + _rpc_stream = rpc_stream, _request_id = request_id, }, Response) end function Response:send(value, is_error) - local data = self._msgpack_rpc_stream._session:reply(self._request_id) + local data = self._rpc_stream._session:reply(self._request_id) if is_error then - data = data .. self._msgpack_rpc_stream._pack(value) - data = data .. self._msgpack_rpc_stream._pack(mpack.NIL) + data = data .. self._rpc_stream._pack(value) + data = data .. self._rpc_stream._pack(mpack.NIL) else - data = data .. self._msgpack_rpc_stream._pack(mpack.NIL) - data = data .. self._msgpack_rpc_stream._pack(value) + data = data .. self._rpc_stream._pack(mpack.NIL) + data = data .. self._rpc_stream._pack(value) end - self._msgpack_rpc_stream._stream:write(data) + self._rpc_stream._stream:write(data) end ---- @class test.MsgpackRpcStream +--- Nvim msgpack RPC stream. +--- +--- @class test.RpcStream --- @field private _stream test.Stream --- @field private __pack table -local MsgpackRpcStream = {} -MsgpackRpcStream.__index = MsgpackRpcStream +local RpcStream = {} +RpcStream.__index = RpcStream -function MsgpackRpcStream.new(stream) +function RpcStream.new(stream) return setmetatable({ _stream = stream, _pack = mpack.Packer(), @@ -50,10 +57,10 @@ function MsgpackRpcStream.new(stream) }, }), }), - }, MsgpackRpcStream) + }, RpcStream) end -function MsgpackRpcStream:write(method, args, response_cb) +function RpcStream:write(method, args, response_cb) local data if response_cb then assert(type(response_cb) == 'function') @@ -66,10 +73,10 @@ function MsgpackRpcStream:write(method, args, response_cb) self._stream:write(data) end -function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb) +function RpcStream:read_start(on_request, on_notification, on_eof) self._stream:read_start(function(data) if not data then - return eof_cb() + return on_eof() end local type, id_or_cb, method_or_error, args_or_result local pos = 1 @@ -78,9 +85,9 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb) type, id_or_cb, method_or_error, args_or_result, pos = self._session:receive(data, pos) if type == 'request' or type == 'notification' then if type == 'request' then - request_cb(method_or_error, args_or_result, Response.new(self, id_or_cb)) + on_request(method_or_error, args_or_result, Response.new(self, id_or_cb)) else - notification_cb(method_or_error, args_or_result) + on_notification(method_or_error, args_or_result) end elseif type == 'response' then if method_or_error == mpack.NIL then @@ -94,12 +101,12 @@ function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb) end) end -function MsgpackRpcStream:read_stop() +function RpcStream:read_stop() self._stream:read_stop() end -function MsgpackRpcStream:close(signal) +function RpcStream:close(signal) self._stream:close(signal) end -return MsgpackRpcStream +return RpcStream diff --git a/test/client/session.lua b/test/client/session.lua index f1f46c5efe..a5839e012a 100644 --- a/test/client/session.lua +++ b/test/client/session.lua @@ -1,12 +1,18 @@ +--- +--- Nvim msgpack-RPC protocol session. Manages requests/notifications/responses. +--- + local uv = vim.uv -local MsgpackRpcStream = require('test.client.msgpack_rpc_stream') +local RpcStream = require('test.client.rpc_stream') +--- Nvim msgpack-RPC protocol session. Manages requests/notifications/responses. +--- --- @class test.Session ---- @field private _pending_messages string[] ---- @field private _msgpack_rpc_stream test.MsgpackRpcStream +--- @field private _pending_messages string[] Requests/notifications received from the remote end. +--- @field private _rpc_stream test.RpcStream --- @field private _prepare uv.uv_prepare_t --- @field private _timer uv.uv_timer_t ---- @field private _is_running boolean +--- @field private _is_running boolean true during `Session:run()` scope. --- @field exec_lua_setup boolean local Session = {} Session.__index = Session @@ -51,9 +57,10 @@ local function coroutine_exec(func, ...) end)) end +--- Creates a new msgpack-RPC session. function Session.new(stream) return setmetatable({ - _msgpack_rpc_stream = MsgpackRpcStream.new(stream), + _rpc_stream = RpcStream.new(stream), _pending_messages = {}, _prepare = uv.new_prepare(), _timer = uv.new_timer(), @@ -91,10 +98,13 @@ function Session:next_message(timeout) return table.remove(self._pending_messages, 1) end +--- Sends a notification to the RPC endpoint. function Session:notify(method, ...) - self._msgpack_rpc_stream:write(method, { ... }) + self._rpc_stream:write(method, { ... }) end +--- Sends a request to the RPC endpoint. +--- --- @param method string --- @param ... any --- @return boolean, table @@ -114,8 +124,16 @@ function Session:request(method, ...) return true, result end ---- Runs the event loop. +--- Processes incoming RPC requests/notifications until exhausted. +--- +--- TODO(justinmk): luaclient2 avoids this via uvutil.cb_wait() + uvutil.add_idle_call()? +--- +--- @param request_cb function Handles requests from the sever to the local end. +--- @param notification_cb function Handles notifications from the sever to the local end. +--- @param setup_cb function +--- @param timeout number function Session:run(request_cb, notification_cb, setup_cb, timeout) + --- Handles an incoming request. local function on_request(method, args, response) coroutine_exec(request_cb, method, args, function(status, result, flag) if status then @@ -126,6 +144,7 @@ function Session:run(request_cb, notification_cb, setup_cb, timeout) end) end + --- Handles an incoming notification. local function on_notification(method, args) coroutine_exec(notification_cb, method, args) end @@ -160,39 +179,45 @@ function Session:close(signal) if not self._prepare:is_closing() then self._prepare:close() end - self._msgpack_rpc_stream:close(signal) + self._rpc_stream:close(signal) self.closed = true end +--- Sends a request to the RPC endpoint, without blocking (schedules a coroutine). function Session:_yielding_request(method, args) return coroutine.yield(function(co) - self._msgpack_rpc_stream:write(method, args, function(err, result) + self._rpc_stream:write(method, args, function(err, result) resume(co, err, result) end) end) end +--- Sends a request to the RPC endpoint, and blocks (polls event loop) until a response is received. function Session:_blocking_request(method, args) local err, result + -- Invoked when a request is received from the remote end. local function on_request(method_, args_, response) table.insert(self._pending_messages, { 'request', method_, args_, response }) end + -- Invoked when a notification is received from the remote end. local function on_notification(method_, args_) table.insert(self._pending_messages, { 'notification', method_, args_ }) end - self._msgpack_rpc_stream:write(method, args, function(e, r) + self._rpc_stream:write(method, args, function(e, r) err = e result = r uv.stop() end) + -- Poll for incoming requests/notifications received from the remote end. self:_run(on_request, on_notification) return (err or self.eof_err), result end +--- Polls for incoming requests/notifications received from the remote end. function Session:_run(request_cb, notification_cb, timeout) if type(timeout) == 'number' then self._prepare:start(function() @@ -202,14 +227,21 @@ function Session:_run(request_cb, notification_cb, timeout) self._prepare:stop() end) end - self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function() + self._rpc_stream:read_start(request_cb, notification_cb, function() uv.stop() - self.eof_err = { 1, 'EOF was received from Nvim. Likely the Nvim process crashed.' } + + --- @diagnostic disable-next-line: invisible + local stderr = self._rpc_stream._stream.stderr --[[@as string?]] + -- See if `ProcStream.stderr` has anything useful. + stderr = '' ~= ((stderr or ''):match('^%s*(.*%S)') or '') and ' stderr:\n' .. stderr or '' + + self.eof_err = { 1, 'EOF was received from Nvim. Likely the Nvim process crashed.' .. stderr } end) uv.run() self._prepare:stop() self._timer:stop() - self._msgpack_rpc_stream:read_stop() + self._rpc_stream:read_stop() end +--- Nvim msgpack-RPC session. return Session diff --git a/test/client/uv_stream.lua b/test/client/uv_stream.lua index adf002ba1e..6e1a6995be 100644 --- a/test/client/uv_stream.lua +++ b/test/client/uv_stream.lua @@ -1,3 +1,8 @@ +--- +--- Basic stream types. +--- See `rpc_stream.lua` for the msgpack layer. +--- + local uv = vim.uv --- @class test.Stream @@ -6,6 +11,8 @@ local uv = vim.uv --- @field read_stop fun(self) --- @field close fun(self, signal?: string) +--- Stream over given pipes. +--- --- @class vim.StdioStream : test.Stream --- @field private _in uv.uv_pipe_t --- @field private _out uv.uv_pipe_t @@ -45,6 +52,8 @@ function StdioStream:close() self._out:close() end +--- Stream over a named pipe or TCP socket. +--- --- @class test.SocketStream : test.Stream --- @field package _stream_error? string --- @field package _socket uv.uv_pipe_t @@ -109,26 +118,54 @@ function SocketStream:close() uv.close(self._socket) end ---- @class test.ChildProcessStream : test.Stream +--- Stream over child process stdio. +--- +--- @class test.ProcStream : test.Stream --- @field private _proc uv.uv_process_t --- @field private _pid integer --- @field private _child_stdin uv.uv_pipe_t --- @field private _child_stdout uv.uv_pipe_t +--- @field private _child_stderr uv.uv_pipe_t +--- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF). +--- @field stdout string +--- Collects stderr as raw data. +--- @field stderr string +--- Gets stderr+stdout as text (CRLF converted to LF). +--- @field output fun(): string +--- @field stdout_eof boolean +--- @field stderr_eof boolean +--- Collects text into the `stdout` field. +--- @field collect_text boolean +--- Exit code --- @field status integer --- @field signal integer -local ChildProcessStream = {} -ChildProcessStream.__index = ChildProcessStream +local ProcStream = {} +ProcStream.__index = ProcStream +--- Starts child process specified by `argv`. +--- --- @param argv string[] --- @param env string[]? --- @param io_extra uv.uv_pipe_t? ---- @return test.ChildProcessStream -function ChildProcessStream.spawn(argv, env, io_extra) +--- @return test.ProcStream +function ProcStream.spawn(argv, env, io_extra) local self = setmetatable({ - _child_stdin = uv.new_pipe(false), - _child_stdout = uv.new_pipe(false), + collect_text = false, + output = function(self) + if not self.collect_text then + error('set collect_text=true') + end + return (self.stderr .. self.stdout):gsub('\r\n', '\n') + end, + stdout = '', + stderr = '', + stdout_eof = false, + stderr_eof = false, + _child_stdin = assert(uv.new_pipe(false)), + _child_stdout = assert(uv.new_pipe(false)), + _child_stderr = assert(uv.new_pipe(false)), _exiting = false, - }, ChildProcessStream) + }, ProcStream) local prog = argv[1] local args = {} --- @type string[] for i = 2, #argv do @@ -136,13 +173,14 @@ function ChildProcessStream.spawn(argv, env, io_extra) end --- @diagnostic disable-next-line:missing-fields self._proc, self._pid = uv.spawn(prog, { - stdio = { self._child_stdin, self._child_stdout, 1, io_extra }, + stdio = { self._child_stdin, self._child_stdout, self._child_stderr, io_extra }, args = args, --- @diagnostic disable-next-line:assign-type-mismatch env = env, }, function(status, signal) - self.status = status self.signal = signal + -- "Abort" exit may not set status; force to nonzero in that case. + self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0)) end) if not self._proc then @@ -153,24 +191,54 @@ function ChildProcessStream.spawn(argv, env, io_extra) return self end -function ChildProcessStream:write(data) +function ProcStream:write(data) self._child_stdin:write(data) end -function ChildProcessStream:read_start(cb) - self._child_stdout:read_start(function(err, chunk) - if err then - error(err) +function ProcStream:on_read(stream, cb, err, chunk) + if err then + error(err) -- stream read failed? + elseif chunk then + -- Always collect stderr, in case it gives useful info on failure. + if stream == 'stderr' then + self.stderr = self.stderr .. chunk --[[@as string]] + elseif stream == 'stdout' and self.collect_text then + -- Set `stdout` and convert CRLF => LF. + self.stdout = (self.stdout .. chunk):gsub('\r\n', '\n') end + else + -- stderr_eof/stdout_eof + self[stream .. '_eof'] = true ---@type boolean + end + + -- Handler provided by the caller. + if cb then cb(chunk) + end +end + +--- Collects output until the process exits. +function ProcStream:wait() + while not (self.stdout_eof and self.stderr_eof and (self.status or self.signal)) do + uv.run('once') + end +end + +function ProcStream:read_start(on_stdout, on_stderr) + self._child_stdout:read_start(function(err, chunk) + self:on_read('stdout', on_stdout, err, chunk) + end) + self._child_stderr:read_start(function(err, chunk) + self:on_read('stderr', on_stderr, err, chunk) end) end -function ChildProcessStream:read_stop() +function ProcStream:read_stop() self._child_stdout:read_stop() + self._child_stderr:read_stop() end -function ChildProcessStream:close(signal) +function ProcStream:close(signal) if self._closed then return end @@ -178,6 +246,7 @@ function ChildProcessStream:close(signal) self:read_stop() self._child_stdin:close() self._child_stdout:close() + self._child_stderr:close() if type(signal) == 'string' then self._proc:kill('sig' .. signal) end @@ -189,6 +258,6 @@ end return { StdioStream = StdioStream, - ChildProcessStream = ChildProcessStream, + ProcStream = ProcStream, SocketStream = SocketStream, } |