-- path: root/test/client/msgpack_rpc_stream.lua
-- blob: ff054ae62bb2a01aa2ec34a9a9d5db47d798375a
local mpack = require('mpack')

-- temporary hack to be able to manipulate buffer/window/tabpage
--- Marker type for buffer handles: a table holding an `id` whose metatable
-- is `Buffer`, so the handle kind can be recognised from the metatable.
local Buffer = {}
Buffer.__index = Buffer

--- Wrap an id in a Buffer handle.
-- @param id value stored in the handle's `id` field
-- @return table `{ id = id }` with `Buffer` as its metatable
function Buffer.new(id)
  local handle = { id = id }
  setmetatable(handle, Buffer)
  return handle
end
--- Marker type for window handles: a table holding an `id` whose metatable
-- is `Window`, so the handle kind can be recognised from the metatable.
local Window = {}
Window.__index = Window

--- Wrap an id in a Window handle.
-- @param id value stored in the handle's `id` field
-- @return table `{ id = id }` with `Window` as its metatable
function Window.new(id)
  local handle = { id = id }
  setmetatable(handle, Window)
  return handle
end
--- Marker type for tabpage handles: a table holding an `id` whose metatable
-- is `Tabpage`, so the handle kind can be recognised from the metatable.
local Tabpage = {}
Tabpage.__index = Tabpage

--- Wrap an id in a Tabpage handle.
-- @param id value stored in the handle's `id` field
-- @return table `{ id = id }` with `Tabpage` as its metatable
function Tabpage.new(id)
  local handle = { id = id }
  setmetatable(handle, Tabpage)
  return handle
end

--- Reply handle for one incoming request: remembers the owning stream and
-- the request id so a reply can be written later.
local Response = {}
Response.__index = Response

--- Create a reply handle.
-- @param msgpack_rpc_stream stream object providing `_session`, `_pack`
--   and `_stream` (all three are used by `send`)
-- @param request_id id passed to the session when building the reply
-- @return table with `Response` as its metatable
function Response.new(msgpack_rpc_stream, request_id)
  local response = {
    _msgpack_rpc_stream = msgpack_rpc_stream,
    _request_id = request_id,
  }
  return setmetatable(response, Response)
end

--- Write the reply for the stored request id to the underlying stream.
-- The payload is the session's reply header followed by two packed values:
-- first the error slot, then the result slot. `mpack.NIL` fills whichever
-- slot `value` does not occupy.
-- @param value the result, or the error when `is_error` is truthy
-- @param is_error truthy to place `value` in the error slot
function Response:send(value, is_error)
  local rpc = self._msgpack_rpc_stream
  local header = rpc._session:reply(self._request_id)

  -- Decide which slot carries the value; the other one is an explicit NIL.
  local err_slot, result_slot = mpack.NIL, value
  if is_error then
    err_slot, result_slot = value, mpack.NIL
  end

  local packed_err = rpc._pack(err_slot)
  local packed_result = rpc._pack(result_slot)
  rpc._stream:write(header .. packed_err .. packed_result)
end

--- Msgpack-RPC framing on top of a byte stream, built from an mpack
-- session, packer and unpacker.
local MsgpackRpcStream = {}
MsgpackRpcStream.__index = MsgpackRpcStream

--- Create an RPC stream around `stream`.
-- Buffer, Window and Tabpage handles are mapped to ext type codes 0, 1
-- and 2 respectively; the ext payload is the mpack-encoded `id` field.
-- @param stream underlying transport, stored as `_stream`
-- @return table with `MsgpackRpcStream` as its metatable
function MsgpackRpcStream.new(stream)
  local pack_ext, unpack_ext = {}, {}

  -- Register both directions of the ext mapping for one handle class.
  local function register(code, class)
    pack_ext[class] = function(obj)
      return code, mpack.encode(obj.id)
    end
    unpack_ext[code] = function(_c, payload)
      return class.new(mpack.decode(payload))
    end
  end

  register(0, Buffer)
  register(1, Window)
  register(2, Tabpage)

  local packer = mpack.Packer({ ext = pack_ext })
  local unpacker = mpack.Unpacker({ ext = unpack_ext })
  local session = mpack.Session({ unpack = unpacker })

  local instance = {
    _stream = stream,
    _pack = packer,
    _session = session,
  }
  return setmetatable(instance, MsgpackRpcStream)
end

--- Send a request or a notification over the underlying stream.
-- When `response_cb` is truthy it must be a function and is handed to the
-- session's `request`; otherwise the session's `notify` header is used.
-- The header is followed by the packed `method` and the packed `args`.
-- @param method value packed right after the header
-- @param args value packed after `method`
-- @param response_cb optional function registered with the session
function MsgpackRpcStream:write(method, args, response_cb)
  local header
  if not response_cb then
    header = self._session:notify()
  else
    assert(type(response_cb) == 'function')
    header = self._session:request(response_cb)
  end

  local packed_method = self._pack(method)
  local packed_args = self._pack(args)
  self._stream:write(header .. packed_method .. packed_args)
end

--- Start reading from the underlying stream and dispatch decoded messages.
-- Each data chunk is fed to the session's `receive` repeatedly until the
-- returned position passes the end of the chunk.
-- @param request_cb called as `request_cb(method, args, response)` where
--   `response` is a `Response` bound to this stream and the request id
-- @param notification_cb called as `notification_cb(method, args)`
-- @param eof_cb called with no arguments when the stream delivers a
--   falsy chunk
function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
  local function on_data(chunk)
    if not chunk then
      return eof_cb()
    end

    local offset, size = 1, #chunk
    while offset <= size do
      local kind, id_or_cb, method_or_error, args_or_result
      kind, id_or_cb, method_or_error, args_or_result, offset =
        self._session:receive(chunk, offset)

      if kind == 'request' then
        request_cb(method_or_error, args_or_result, Response.new(self, id_or_cb))
      elseif kind == 'notification' then
        notification_cb(method_or_error, args_or_result)
      elseif kind == 'response' then
        -- For responses the second value is the callback returned by the
        -- session. It receives either (nil, result) or (error, nil).
        if method_or_error == mpack.NIL then
          id_or_cb(nil, args_or_result)
        else
          id_or_cb(method_or_error, nil)
        end
      end
      -- Any other kind is ignored; the loop continues from the new offset.
    end
  end

  self._stream:read_start(on_data)
end

--- Stop reading by delegating to the underlying stream's `read_stop`.
function MsgpackRpcStream:read_stop()
  local stream = self._stream
  stream:read_stop()
end

--- Close the underlying stream.
-- @param signal forwarded unchanged to the stream's `close` method
function MsgpackRpcStream:close(signal)
  local stream = self._stream
  stream:close(signal)
end

-- Module value: the MsgpackRpcStream class table.
return MsgpackRpcStream