From d6279f9392073cb1422d76c57baf3fd283ed954e Mon Sep 17 00:00:00 2001 From: bfredl Date: Tue, 31 Jan 2023 23:35:04 +0100 Subject: refactor(tests): move lua-client into core and use it for functionaltests Eliminates lua-client and non-static libluv as test time dependencies Note: the API for a public lua-client is not yet finished. The interface needs to be adjusted to work in the embedded loop of a nvim instance (to use it to talk between instances) --- test/client/msgpack_rpc_stream.lua | 112 ++++++++++++++++++++++ test/client/session.lua | 192 +++++++++++++++++++++++++++++++++++++ test/client/uv_stream.lua | 164 +++++++++++++++++++++++++++++++ 3 files changed, 468 insertions(+) create mode 100644 test/client/msgpack_rpc_stream.lua create mode 100644 test/client/session.lua create mode 100644 test/client/uv_stream.lua (limited to 'test/client') diff --git a/test/client/msgpack_rpc_stream.lua b/test/client/msgpack_rpc_stream.lua new file mode 100644 index 0000000000..5711616b17 --- /dev/null +++ b/test/client/msgpack_rpc_stream.lua @@ -0,0 +1,112 @@ +local mpack = require('mpack') + +-- temporary hack to be able to manipulate buffer/window/tabpage +local Buffer = {} +Buffer.__index = Buffer +function Buffer.new(id) return setmetatable({id=id}, Buffer) end +local Window = {} +Window.__index = Window +function Window.new(id) return setmetatable({id=id}, Window) end +local Tabpage = {} +Tabpage.__index = Tabpage +function Tabpage.new(id) return setmetatable({id=id}, Tabpage) end + +local Response = {} +Response.__index = Response + +function Response.new(msgpack_rpc_stream, request_id) + return setmetatable({ + _msgpack_rpc_stream = msgpack_rpc_stream, + _request_id = request_id + }, Response) +end + +function Response:send(value, is_error) + local data = self._msgpack_rpc_stream._session:reply(self._request_id) + if is_error then + data = data .. self._msgpack_rpc_stream._pack(value) + data = data .. self._msgpack_rpc_stream._pack(mpack.NIL) + else + data = data .. self._msgpack_rpc_stream._pack(mpack.NIL) + data = data .. self._msgpack_rpc_stream._pack(value) + end + self._msgpack_rpc_stream._stream:write(data) +end + +local MsgpackRpcStream = {} +MsgpackRpcStream.__index = MsgpackRpcStream + +function MsgpackRpcStream.new(stream) + return setmetatable({ + _stream = stream, + _pack = mpack.Packer({ + ext = { + [Buffer] = function(o) return 0, mpack.encode(o.id) end, + [Window] = function(o) return 1, mpack.encode(o.id) end, + [Tabpage] = function(o) return 2, mpack.encode(o.id) end + } + }), + _session = mpack.Session({ + unpack = mpack.Unpacker({ + ext = { + [0] = function(_c, s) return Buffer.new(mpack.decode(s)) end, + [1] = function(_c, s) return Window.new(mpack.decode(s)) end, + [2] = function(_c, s) return Tabpage.new(mpack.decode(s)) end + } + }) + }), + }, MsgpackRpcStream) +end + +function MsgpackRpcStream:write(method, args, response_cb) + local data + if response_cb then + assert(type(response_cb) == 'function') + data = self._session:request(response_cb) + else + data = self._session:notify() + end + + data = data .. self._pack(method) .. self._pack(args) + self._stream:write(data) +end + +function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb) + self._stream:read_start(function(data) + if not data then + return eof_cb() + end + local type, id_or_cb, method_or_error, args_or_result + local pos = 1 + local len = #data + while pos <= len do + type, id_or_cb, method_or_error, args_or_result, pos = + self._session:receive(data, pos) + if type == 'request' or type == 'notification' then + if type == 'request' then + request_cb(method_or_error, args_or_result, Response.new(self, + id_or_cb)) + else + notification_cb(method_or_error, args_or_result) + end + elseif type == 'response' then + if method_or_error == mpack.NIL then + method_or_error = nil + else + args_or_result = nil + end + id_or_cb(method_or_error, args_or_result) + end + end + end) +end + +function MsgpackRpcStream:read_stop() + self._stream:read_stop() +end + +function MsgpackRpcStream:close(signal) + self._stream:close(signal) +end + +return MsgpackRpcStream diff --git a/test/client/session.lua b/test/client/session.lua new file mode 100644 index 0000000000..0509fa88be --- /dev/null +++ b/test/client/session.lua @@ -0,0 +1,192 @@ +local uv = require('luv') +local MsgpackRpcStream = require('test.client.msgpack_rpc_stream') + +local Session = {} +Session.__index = Session +if package.loaded['jit'] then + -- luajit pcall is already coroutine safe + Session.safe_pcall = pcall +else + Session.safe_pcall = require'coxpcall'.pcall +end + +local function resume(co, ...) + local status, result = coroutine.resume(co, ...) + + if coroutine.status(co) == 'dead' then + if not status then + error(result) + end + return + end + + assert(coroutine.status(co) == 'suspended') + result(co) +end + +local function coroutine_exec(func, ...) + local args = {...} + local on_complete + + if #args > 0 and type(args[#args]) == 'function' then + -- completion callback + on_complete = table.remove(args) + end + + resume(coroutine.create(function() + local status, result, flag = Session.safe_pcall(func, unpack(args)) + if on_complete then + coroutine.yield(function() + -- run the completion callback on the main thread + on_complete(status, result, flag) + end) + end + end)) +end + +function Session.new(stream) + return setmetatable({ + _msgpack_rpc_stream = MsgpackRpcStream.new(stream), + _pending_messages = {}, + _prepare = uv.new_prepare(), + _timer = uv.new_timer(), + _is_running = false + }, Session) +end + +function Session:next_message(timeout) + local function on_request(method, args, response) + table.insert(self._pending_messages, {'request', method, args, response}) + uv.stop() + end + + local function on_notification(method, args) + table.insert(self._pending_messages, {'notification', method, args}) + uv.stop() + end + + if self._is_running then + error('Event loop already running') + end + + if #self._pending_messages > 0 then + return table.remove(self._pending_messages, 1) + end + + self:_run(on_request, on_notification, timeout) + return table.remove(self._pending_messages, 1) +end + +function Session:notify(method, ...) + self._msgpack_rpc_stream:write(method, {...}) +end + +function Session:request(method, ...) + local args = {...} + local err, result + if self._is_running then + err, result = self:_yielding_request(method, args) + else + err, result = self:_blocking_request(method, args) + end + + if err then + return false, err + end + + return true, result +end + +function Session:run(request_cb, notification_cb, setup_cb, timeout) + local function on_request(method, args, response) + coroutine_exec(request_cb, method, args, function(status, result, flag) + if status then + response:send(result, flag) + else + response:send(result, true) + end + end) + end + + local function on_notification(method, args) + coroutine_exec(notification_cb, method, args) + end + + self._is_running = true + + if setup_cb then + coroutine_exec(setup_cb) + end + + while #self._pending_messages > 0 do + local msg = table.remove(self._pending_messages, 1) + if msg[1] == 'request' then + on_request(msg[2], msg[3], msg[4]) + else + on_notification(msg[2], msg[3]) + end + end + + self:_run(on_request, on_notification, timeout) + self._is_running = false +end + +function Session:stop() + uv.stop() +end + +function Session:close(signal) + if not self._timer:is_closing() then self._timer:close() end + if not self._prepare:is_closing() then self._prepare:close() end + self._msgpack_rpc_stream:close(signal) +end + +function Session:_yielding_request(method, args) + return coroutine.yield(function(co) + self._msgpack_rpc_stream:write(method, args, function(err, result) + resume(co, err, result) + end) + end) +end + +function Session:_blocking_request(method, args) + local err, result + + local function on_request(method_, args_, response) + table.insert(self._pending_messages, {'request', method_, args_, response}) + end + + local function on_notification(method_, args_) + table.insert(self._pending_messages, {'notification', method_, args_}) + end + + self._msgpack_rpc_stream:write(method, args, function(e, r) + err = e + result = r + uv.stop() + end) + + self:_run(on_request, on_notification) + return (err or self.eof_err), result +end + +function Session:_run(request_cb, notification_cb, timeout) + if type(timeout) == 'number' then + self._prepare:start(function() + self._timer:start(timeout, 0, function() + uv.stop() + end) + self._prepare:stop() + end) + end + self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function() + uv.stop() + self.eof_err = {1, "EOF was received from Nvim. Likely the Nvim process crashed."} + end) + uv.run() + self._prepare:stop() + self._timer:stop() + self._msgpack_rpc_stream:read_stop() +end + +return Session diff --git a/test/client/uv_stream.lua b/test/client/uv_stream.lua new file mode 100644 index 0000000000..1f4414c934 --- /dev/null +++ b/test/client/uv_stream.lua @@ -0,0 +1,164 @@ +local uv = require('luv') + +local StdioStream = {} +StdioStream.__index = StdioStream + +function StdioStream.open() + local self = setmetatable({ + _in = uv.new_pipe(false), + _out = uv.new_pipe(false) + }, StdioStream) + self._in:open(0) + self._out:open(1) + return self +end + +function StdioStream:write(data) + self._out:write(data) +end + +function StdioStream:read_start(cb) + self._in:read_start(function(err, chunk) + if err then + error(err) + end + cb(chunk) + end) +end + +function StdioStream:read_stop() + self._in:read_stop() +end + +function StdioStream:close() + self._in:close() + self._out:close() +end + +local SocketStream = {} +SocketStream.__index = SocketStream + +function SocketStream.open(file) + local socket = uv.new_pipe(false) + local self = setmetatable({ + _socket = socket, + _stream_error = nil + }, SocketStream) + uv.pipe_connect(socket, file, function (err) + self._stream_error = self._stream_error or err + end) + return self +end + +function SocketStream.connect(host, port) + local socket = uv.new_tcp() + local self = setmetatable({ + _socket = socket, + _stream_error = nil + }, SocketStream) + uv.tcp_connect(socket, host, port, function (err) + self._stream_error = self._stream_error or err + end) + return self +end + + +function SocketStream:write(data) + if self._stream_error then + error(self._stream_error) + end + uv.write(self._socket, data, function(err) + if err then + error(self._stream_error or err) + end + end) +end + +function SocketStream:read_start(cb) + if self._stream_error then + error(self._stream_error) + end + uv.read_start(self._socket, function(err, chunk) + if err then + error(err) + end + cb(chunk) + end) +end + +function SocketStream:read_stop() + if self._stream_error then + error(self._stream_error) + end + uv.read_stop(self._socket) +end + +function SocketStream:close() + uv.close(self._socket) +end + +local ChildProcessStream = {} +ChildProcessStream.__index = ChildProcessStream + +function ChildProcessStream.spawn(argv, env, io_extra) + local self = setmetatable({ + _child_stdin = uv.new_pipe(false), + _child_stdout = uv.new_pipe(false) + }, ChildProcessStream) + local prog = argv[1] + local args = {} + for i = 2, #argv do + args[#args + 1] = argv[i] + end + self._proc, self._pid = uv.spawn(prog, { + stdio = {self._child_stdin, self._child_stdout, 2, io_extra}, + args = args, + env = env, + }, function() + self:close() + end) + + if not self._proc then + local err = self._pid + error(err) + end + + return self +end + +function ChildProcessStream:write(data) + self._child_stdin:write(data) +end + +function ChildProcessStream:read_start(cb) + self._child_stdout:read_start(function(err, chunk) + if err then + error(err) + end + cb(chunk) + end) +end + +function ChildProcessStream:read_stop() + self._child_stdout:read_stop() +end + +function ChildProcessStream:close(signal) + if self._closed then + return + end + self._closed = true + self:read_stop() + self._child_stdin:close() + self._child_stdout:close() + if type(signal) == 'string' then + self._proc:kill('sig'..signal) + end + uv.run('nowait') +end + +return { + StdioStream = StdioStream; + ChildProcessStream = ChildProcessStream; + SocketStream = SocketStream; +} -- cgit From 0837980db4958baca96449869d31120f349f3500 Mon Sep 17 00:00:00 2001 From: bfredl Date: Fri, 10 Feb 2023 10:26:18 +0100 Subject: fix(client): wait for session to exit This replicates the old native.pid_wait(self._pid) call, except using the proper libuv pattern (run loop unitil exit callback) --- test/client/uv_stream.lua | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'test/client') diff --git a/test/client/uv_stream.lua b/test/client/uv_stream.lua index 1f4414c934..cea77f0dbd 100644 --- a/test/client/uv_stream.lua +++ b/test/client/uv_stream.lua @@ -102,8 +102,9 @@ ChildProcessStream.__index = ChildProcessStream function ChildProcessStream.spawn(argv, env, io_extra) local self = setmetatable({ - _child_stdin = uv.new_pipe(false), - _child_stdout = uv.new_pipe(false) + _child_stdin = uv.new_pipe(false); + _child_stdout = uv.new_pipe(false); + _exiting = false; }, ChildProcessStream) local prog = argv[1] local args = {} @@ -114,8 +115,9 @@ function ChildProcessStream.spawn(argv, env, io_extra) stdio = {self._child_stdin, self._child_stdout, 2, io_extra}, args = args, env = env, - }, function() - self:close() + }, function(status, signal) + self.status = status + self.signal = signal end) if not self._proc then @@ -154,7 +156,10 @@ function ChildProcessStream:close(signal) if type(signal) == 'string' then self._proc:kill('sig'..signal) end - uv.run('nowait') + while self.status == nil do + uv.run 'once' + end + return self.status, self.signal end return { -- cgit From 5970157e1d22fd5e05ae5d3bd949f807fb7a744c Mon Sep 17 00:00:00 2001 From: bfredl Date: Wed, 17 May 2023 16:08:06 +0200 Subject: refactor(map): enhanced implementation, Clean Codeā„¢, etc etc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This involves two redesigns of the map.c implementations: 1. Change of macro style and code organization The old khash.h and map.c implementation used huge #define blocks with a lot of backslash line continuations. This instead uses the "implementation file" .c.h pattern. Such a file is meant to be included multiple times, with different macros set prior to inclusion as parameters. we already use this pattern e.g. for eval/typval_encode.c.h to implement different typval encoders reusing a similar structure. We can structure this code into two parts. one that only depends on key type and is enough to implement sets, and one which depends on both key and value to implement maps (as a wrapper around sets, with an added value[] array) 2. Separate the main hash buckets from the key / value arrays Change the hack buckets to only contain an index into separate key / value arrays This is a common pattern in modern, state of the art hashmap implementations. Even though this leads to one more allocated array, it is this often is a net reduction of memory consumption. Consider key+value consuming at least 12 bytes per pair. On average, we will have twice as many buckets per item. Thus old implementation: 2*12 = 24 bytes per item New implementation 1*12 + 2*4 = 20 bytes per item And the difference gets bigger with larger items. One might think we have pulled a fast one here, as wouldn't the average size of the new key/value arrays be 1.5 slots per items due to amortized grows? But remember, these arrays are fully dense, and thus the accessed memory, measured in _cache lines_, the unit which actually matters, will be the fully used memory but just rounded up to the nearest cache line boundary. This has some other interesting properties, such as an insert-only set/map will be fully ordered by insert only. Preserving this ordering in face of deletions is more tricky tho. As we currently don't use ordered maps, the "delete" operation maintains compactness of the item arrays in the simplest way by breaking the ordering. It would be possible to implement an order-preserving delete although at some cost, like allowing the items array to become non-dense until the next rehash. Finally, in face of these two major changes, all code used in khash.h has been integrated into map.c and friends. Given the heavy edits it makes no sense to "layer" the code into a vendored and a wrapper part. Rather, the layered cake follows the specialization depth: code shared for all maps, code specialized to a key type (and its equivalence relation), and finally code specialized to value+key type. --- test/client/session.lua | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'test/client') diff --git a/test/client/session.lua b/test/client/session.lua index 0509fa88be..b1bf5ea75e 100644 --- a/test/client/session.lua +++ b/test/client/session.lua @@ -73,6 +73,11 @@ function Session:next_message(timeout) return table.remove(self._pending_messages, 1) end + -- if closed, only return pending messages + if self.closed then + return nil + end + self:_run(on_request, on_notification, timeout) return table.remove(self._pending_messages, 1) end @@ -139,6 +144,7 @@ function Session:close(signal) if not self._timer:is_closing() then self._timer:close() end if not self._prepare:is_closing() then self._prepare:close() end self._msgpack_rpc_stream:close(signal) + self.closed = true end function Session:_yielding_request(method, args) -- cgit