aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-06-24 13:47:36 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-06-24 13:47:36 -0300
commit9f1b9726fb28f78fba7c50efa5316c4bf2413cf8 (patch)
tree4fb8f7d5e1d7f4153b508e987f268c0222431152
parente1264412f80ac8bab15e8e960b93a30ca9254418 (diff)
parent296da85198a7d5da36dbb2e6f213edb5da511635 (diff)
downloadrneovim-9f1b9726fb28f78fba7c50efa5316c4bf2413cf8.tar.gz
rneovim-9f1b9726fb28f78fba7c50efa5316c4bf2413cf8.tar.bz2
rneovim-9f1b9726fb28f78fba7c50efa5316c4bf2413cf8.zip
Merge pull request #872 'Add support for client-side RPC'
-rw-r--r--scripts/msgpack-gen.lua50
-rw-r--r--src/nvim/api/private/defs.h18
-rw-r--r--src/nvim/api/private/helpers.c4
-rw-r--r--src/nvim/api/private/helpers.h51
-rw-r--r--src/nvim/api/vim.c10
-rw-r--r--src/nvim/eval.c45
-rw-r--r--src/nvim/os/channel.c307
-rw-r--r--src/nvim/os/channel.h2
-rw-r--r--src/nvim/os/event.c22
-rw-r--r--src/nvim/os/input.c22
-rw-r--r--src/nvim/os/job.c5
-rw-r--r--src/nvim/os/msgpack_rpc.c560
-rw-r--r--src/nvim/os/msgpack_rpc.h164
-rw-r--r--src/nvim/os/msgpack_rpc_helpers.c380
-rw-r--r--src/nvim/os/msgpack_rpc_helpers.h124
-rw-r--r--src/nvim/os/wstream.c32
16 files changed, 1126 insertions, 670 deletions
diff --git a/scripts/msgpack-gen.lua b/scripts/msgpack-gen.lua
index e7d5d5a503..e2cc267191 100644
--- a/scripts/msgpack-gen.lua
+++ b/scripts/msgpack-gen.lua
@@ -91,6 +91,8 @@ output:write([[
#include <msgpack.h>
#include "nvim/os/msgpack_rpc.h"
+#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/api/private/helpers.h"
]])
for i = 1, #headers do
@@ -120,20 +122,13 @@ output:write([[
};
const unsigned int msgpack_metadata_size = sizeof(msgpack_metadata);
-void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res)
+Object msgpack_rpc_dispatch(uint64_t channel_id,
+ uint64_t method_id,
+ msgpack_object *req,
+ Error *error)
{
- Error error = { .set = false };
- uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64;
-
+ Object ret = NIL;
switch (method_id) {
- case 0:
- msgpack_pack_nil(res);
- // The result is the [channel_id, metadata] array
- msgpack_pack_array(res, 2);
- msgpack_pack_uint64(res, channel_id);
- msgpack_pack_raw(res, sizeof(msgpack_metadata));
- msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata));
- return;
]])
-- Visit each function metadata to build the case label with code generated
@@ -145,8 +140,7 @@ for i = 1, #api.functions do
output:write('\n case '..fn.id..': {')
output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {')
- output:write('\n snprintf(error.msg, sizeof(error.msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
- output:write('\n msgpack_rpc_error(error.msg, res);')
+ output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
-- Declare/initialize variables that will hold converted arguments
@@ -164,7 +158,9 @@ for i = 1, #api.functions do
converted = 'arg_'..j
convert_arg = 'msgpack_rpc_to_'..string.lower(param[1])
output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {')
- output:write('\n msgpack_rpc_error("Wrong type for argument '..j..', expecting '..param[1]..'", res);')
+ output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
+
+ output:write('\n error->set = true;')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
args[#args + 1] = converted
@@ -195,28 +191,20 @@ for i = 1, #api.functions do
if fn.can_fail then
-- if the function can fail, also pass a pointer to the local error object
if #args > 0 then
- output:write(', &error);\n')
+ output:write(', error);\n')
else
- output:write('&error);\n')
+ output:write('error);\n')
end
-- and check for the error
- output:write('\n if (error.set) {')
- output:write('\n msgpack_rpc_error(error.msg, res);')
+ output:write('\n if (error->set) {')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
else
output:write(');\n')
end
- -- nil error
- output:write('\n msgpack_pack_nil(res);');
-
- if fn.return_type == 'void' then
- output:write('\n msgpack_pack_nil(res);');
- else
- output:write('\n msgpack_rpc_from_'..string.lower(fn.return_type)..'(rv, res);')
- -- free the return value
- output:write('\n msgpack_rpc_free_'..string.lower(fn.return_type)..'(rv);')
+ if fn.return_type ~= 'void' then
+ output:write('\n ret = '..string.upper(fn.return_type)..'_OBJ(rv);')
end
-- Now generate the cleanup label for freeing memory allocated for the
-- arguments
@@ -226,7 +214,7 @@ for i = 1, #api.functions do
local param = fn.parameters[j]
output:write('\n msgpack_rpc_free_'..string.lower(param[1])..'(arg_'..j..');')
end
- output:write('\n return;');
+ output:write('\n break;');
output:write('\n };\n');
end
@@ -235,8 +223,10 @@ output:write([[
default:
- msgpack_rpc_error("Invalid function id", res);
+ snprintf(error->msg, sizeof(error->msg), "Invalid function id");
+ error->set = true;
}
+ return ret;
}
]])
output:close()
diff --git a/src/nvim/api/private/defs.h b/src/nvim/api/private/defs.h
index ee0fc02c4d..b049412014 100644
--- a/src/nvim/api/private/defs.h
+++ b/src/nvim/api/private/defs.h
@@ -65,8 +65,16 @@ typedef enum {
kObjectTypeInteger,
kObjectTypeFloat,
kObjectTypeString,
+ kObjectTypeBuffer,
+ kObjectTypeWindow,
+ kObjectTypeTabpage,
kObjectTypeArray,
- kObjectTypeDictionary
+ kObjectTypeDictionary,
+ kObjectTypePosition,
+ kObjectTypeStringArray,
+ kObjectTypeBufferArray,
+ kObjectTypeWindowArray,
+ kObjectTypeTabpageArray,
} ObjectType;
struct object {
@@ -76,8 +84,16 @@ struct object {
Integer integer;
Float floating;
String string;
+ Buffer buffer;
+ Window window;
+ Tabpage tabpage;
Array array;
Dictionary dictionary;
+ Position position;
+ StringArray stringarray;
+ BufferArray bufferarray;
+ WindowArray windowarray;
+ TabpageArray tabpagearray;
} data;
};
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c
index 30301e9368..024f0c2405 100644
--- a/src/nvim/api/private/helpers.c
+++ b/src/nvim/api/private/helpers.c
@@ -341,7 +341,7 @@ String cstr_to_string(const char *str)
};
}
-static bool object_to_vim(Object obj, typval_T *tv, Error *err)
+bool object_to_vim(Object obj, typval_T *tv, Error *err)
{
tv->v_type = VAR_UNKNOWN;
tv->v_lock = 0;
@@ -426,6 +426,8 @@ static bool object_to_vim(Object obj, typval_T *tv, Error *err)
}
tv->vval.v_dict->dv_refcount++;
break;
+ default:
+ abort();
}
return true;
diff --git a/src/nvim/api/private/helpers.h b/src/nvim/api/private/helpers.h
index e1e1a35490..f1b9dc3bc8 100644
--- a/src/nvim/api/private/helpers.h
+++ b/src/nvim/api/private/helpers.h
@@ -14,7 +14,9 @@
err->set = true; \
} while (0)
-#define BOOL_OBJ(b) ((Object) { \
+#define OBJECT_OBJ(o) o
+
+#define BOOLEAN_OBJ(b) ((Object) { \
.type = kObjectTypeBoolean, \
.data.boolean = b \
})
@@ -26,26 +28,59 @@
#define STRING_OBJ(s) ((Object) { \
.type = kObjectTypeString, \
- .data.string = cstr_to_string(s) \
+ .data.string = s \
})
-#define STRINGL_OBJ(d, s) ((Object) { \
- .type = kObjectTypeString, \
- .data.string = (String) { \
- .size = s, \
- .data = xmemdup(d, s) \
- }})
+#define BUFFER_OBJ(s) ((Object) { \
+ .type = kObjectTypeBuffer, \
+ .data.buffer = s \
+ })
+
+#define WINDOW_OBJ(s) ((Object) { \
+ .type = kObjectTypeWindow, \
+ .data.window = s \
+ })
+
+#define TABPAGE_OBJ(s) ((Object) { \
+ .type = kObjectTypeTabpage, \
+ .data.tabpage = s \
+ })
#define ARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeArray, \
.data.array = a \
})
+#define STRINGARRAY_OBJ(a) ((Object) { \
+ .type = kObjectTypeStringArray, \
+ .data.stringarray = a \
+ })
+
+#define BUFFERARRAY_OBJ(a) ((Object) { \
+ .type = kObjectTypeBufferArray, \
+ .data.bufferarray = a \
+ })
+
+#define WINDOWARRAY_OBJ(a) ((Object) { \
+ .type = kObjectTypeWindowArray, \
+ .data.windowarray = a \
+ })
+
+#define TABPAGEARRAY_OBJ(a) ((Object) { \
+ .type = kObjectTypeTabpageArray, \
+ .data.tabpagearray = a \
+ })
+
#define DICTIONARY_OBJ(d) ((Object) { \
.type = kObjectTypeDictionary, \
.data.dictionary = d \
})
+#define POSITION_OBJ(p) ((Object) { \
+ .type = kObjectTypePosition, \
+ .data.position = p \
+ })
+
#define NIL ((Object) {.type = kObjectTypeNil})
#define PUT(dict, k, v) \
diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c
index e7261e1096..fbeb42cf4b 100644
--- a/src/nvim/api/vim.c
+++ b/src/nvim/api/vim.c
@@ -424,8 +424,8 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err)
/// @param event The event type string
void vim_subscribe(uint64_t channel_id, String event)
{
- size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
- char e[EVENT_MAXLEN + 1];
+ size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN);
+ char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_subscribe(channel_id, e);
@@ -437,8 +437,10 @@ void vim_subscribe(uint64_t channel_id, String event)
/// @param event The event type string
void vim_unsubscribe(uint64_t channel_id, String event)
{
- size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
- char e[EVENT_MAXLEN + 1];
+ size_t length = (event.size < METHOD_MAXLEN ?
+ event.size :
+ METHOD_MAXLEN);
+ char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_unsubscribe(channel_id, e);
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 92075e46f8..adc411afc7 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -71,6 +71,7 @@
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/api/private/helpers.h"
+#include "nvim/os/msgpack_rpc_helpers.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@@ -6453,6 +6454,7 @@ static struct fst {
{"searchpair", 3, 7, f_searchpair},
{"searchpairpos", 3, 7, f_searchpairpos},
{"searchpos", 1, 4, f_searchpos},
+ {"send_call", 3, 3, f_send_call},
{"send_event", 3, 3, f_send_event},
{"setbufvar", 3, 3, f_setbufvar},
{"setcmdpos", 1, 1, f_setcmdpos},
@@ -10474,6 +10476,7 @@ static void f_job_start(typval_T *argvars, typval_T *rettv)
on_job_stderr,
on_job_exit,
true,
+ 0,
&rettv->vval.v_number);
if (rettv->vval.v_number <= 0) {
@@ -10535,6 +10538,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
if (!job) {
// Invalid job id
EMSG(_(e_invjob));
+ return;
}
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
@@ -12523,6 +12527,47 @@ do_searchpair (
return retval;
}
+// "send_call()" function
+static void f_send_call(typval_T *argvars, typval_T *rettv)
+{
+ rettv->v_type = VAR_NUMBER;
+ rettv->vval.v_number = 0;
+
+ if (check_restricted() || check_secure()) {
+ return;
+ }
+
+ if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
+ EMSG2(_(e_invarg2), "Channel id must be a positive integer");
+ return;
+ }
+
+ if (argvars[1].v_type != VAR_STRING) {
+ EMSG2(_(e_invarg2), "Method name must be a string");
+ return;
+ }
+
+ bool errored;
+ Object result;
+ if (!channel_send_call((uint64_t)argvars[0].vval.v_number,
+ (char *)argvars[1].vval.v_string,
+ vim_to_object(&argvars[2]),
+ &result,
+ &errored)) {
+ EMSG2(_(e_invarg2), "Channel doesn't exist");
+ return;
+ }
+
+ Error conversion_error = {.set = false};
+ if (errored || !object_to_vim(result, rettv, &conversion_error)) {
+ EMSG(errored ?
+ result.data.string.data :
+ _("Error converting the call result"));
+ }
+
+ msgpack_rpc_free_object(result);
+}
+
// "send_event()" function
static void f_send_event(typval_T *argvars, typval_T *rettv)
{
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 653f09756a..9bba247a7b 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -5,6 +5,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
+#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@@ -12,17 +13,24 @@
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/msgpack_rpc.h"
+#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
+#include "nvim/message.h"
#include "nvim/map.h"
#include "nvim/lib/kvec.h"
typedef struct {
+ uint64_t request_id;
+ bool errored;
+ Object result;
+} ChannelCallFrame;
+
+typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
- bool is_job;
+ bool is_job, enabled;
msgpack_unpacker *unpacker;
- msgpack_sbuffer *sbuffer;
union {
Job *job;
struct {
@@ -31,12 +39,15 @@ typedef struct {
uv_stream_t *uv;
} streams;
} data;
+ uint64_t next_request_id;
+ kvec_t(ChannelCallFrame *) call_stack;
+ size_t rpc_call_level;
} Channel;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
-static msgpack_sbuffer msgpack_event_buffer;
+static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.c.generated.h"
@@ -47,7 +58,7 @@ void channel_init()
{
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
- msgpack_sbuffer_init(&msgpack_event_buffer);
+ msgpack_sbuffer_init(&out_buffer);
}
/// Teardown the module
@@ -80,6 +91,7 @@ bool channel_from_job(char **argv)
job_err,
job_exit,
true,
+ 0,
&status);
if (status <= 0) {
@@ -104,7 +116,7 @@ void channel_from_stream(uv_stream_t *stream)
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
- channel->data.streams.write = wstream_new(1024 * 1024);
+ channel->data.streams.write = wstream_new(0);
wstream_set_stream(channel->data.streams.write, stream);
channel->data.streams.uv = stream;
}
@@ -113,26 +125,98 @@ void channel_from_stream(uv_stream_t *stream)
///
/// @param id The channel id. If 0, the event will be sent to all
/// channels that have subscribed to the event type
-/// @param type The event type, an arbitrary string
-/// @param obj The event data
+/// @param name The event name, an arbitrary string
+/// @param arg The event arg
/// @return True if the data was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *type, Object data)
+bool channel_send_event(uint64_t id, char *name, Object arg)
{
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
- msgpack_rpc_free_object(data);
+ msgpack_rpc_free_object(arg);
return false;
}
- send_event(channel, type, data);
+ send_event(channel, name, arg);
} else {
- broadcast_event(type, data);
+ broadcast_event(name, arg);
}
return true;
}
+bool channel_send_call(uint64_t id,
+ char *name,
+ Object arg,
+ Object *result,
+ bool *errored)
+{
+ Channel *channel = NULL;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ msgpack_rpc_free_object(arg);
+ return false;
+ }
+
+ if (kv_size(channel->call_stack) > 20) {
+ // 20 stack depth is more than anyone should ever need for RPC calls
+ *errored = true;
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Channel %" PRIu64 " was closed due to a high stack depth "
+ "while processing a RPC call",
+ channel->id);
+ *result = STRING_OBJ(cstr_to_string(buf));
+ }
+
+ uint64_t request_id = channel->next_request_id++;
+ // Send the msgpack-rpc request
+ send_request(channel, request_id, name, arg);
+
+ if (!kv_size(channel->call_stack)) {
+ // This is the first frame, we must disable event deferral for this
+ // channel because we won't be returning until the client sends a
+ // response
+ if (channel->is_job) {
+ job_set_defer(channel->data.job, false);
+ } else {
+ rstream_set_defer(channel->data.streams.read, false);
+ }
+ }
+
+ // Push the frame
+ ChannelCallFrame frame = {request_id, false, NIL};
+ kv_push(ChannelCallFrame *, channel->call_stack, &frame);
+ size_t size = kv_size(channel->call_stack);
+
+ do {
+ event_poll(-1);
+ } while (
+ // Continue running if ...
+ channel->enabled && // the channel is still enabled
+ kv_size(channel->call_stack) >= size); // the call didn't return
+
+ if (!kv_size(channel->call_stack)) {
+ // Popped last frame, restore event deferral
+ if (channel->is_job) {
+ job_set_defer(channel->data.job, true);
+ } else {
+ rstream_set_defer(channel->data.streams.read, true);
+ }
+ if (!channel->enabled && !channel->rpc_call_level) {
+ // Close the channel if it has been disabled and we have not been called
+ // by `parse_msgpack`(It would be unsafe to close the channel otherwise)
+ close_channel(channel);
+ }
+ }
+
+ *errored = frame.errored;
+ *result = frame.result;
+
+ return true;
+}
+
/// Subscribes to event broadcasts
///
/// @param id The channel id
@@ -191,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
Channel *channel = data;
if (eof) {
- close_channel(channel);
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed by the client",
+ channel->id);
+ disable_channel(channel, buf);
return;
}
+ channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
@@ -205,23 +296,34 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
UnpackResult result;
- msgpack_packer response;
// Deserialize everything we can.
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
== kUnpackResultOk) {
- // Each object is a new msgpack-rpc request and requires an empty response
- msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
+ if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
+ if (is_valid_rpc_response(&unpacked.data, channel)) {
+ call_stack_pop(&unpacked.data, channel);
+ } else {
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Channel %" PRIu64 " returned a response that doesn't have "
+ " a matching id for the current RPC call. Ensure the client "
+ " is properly synchronized",
+ channel->id);
+ call_stack_unwind(channel, buf, 1);
+ }
+ msgpack_unpacked_destroy(&unpacked);
+ // Bail out from this event loop iteration
+ goto end;
+ }
+
// Perform the call
- msgpack_rpc_call(channel->id, &unpacked.data, &response);
- wstream_write(channel->data.streams.write,
- wstream_new_buffer(xmemdup(channel->sbuffer->data,
- channel->sbuffer->size),
- channel->sbuffer->size,
- free));
-
- // Clear the buffer for future calls
- msgpack_sbuffer_clear(channel->sbuffer);
+ WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
+ // write the response
+ if (!channel_write(channel, resp)) {
+ goto end;
+ }
}
if (result == kUnpackResultFail) {
@@ -231,50 +333,87 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when it's internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
- msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&response, 4);
- msgpack_pack_int(&response, 1);
- msgpack_pack_int(&response, 0);
- msgpack_rpc_error("Invalid msgpack payload. "
- "This error can also happen when deserializing "
- "an object with high level of nesting",
- &response);
- wstream_write(channel->data.streams.write,
- wstream_new_buffer(xmemdup(channel->sbuffer->data,
- channel->sbuffer->size),
- channel->sbuffer->size,
- free));
- // Clear the buffer for future calls
- msgpack_sbuffer_clear(channel->sbuffer);
+ send_error(channel, 0, "Invalid msgpack payload. "
+ "This error can also happen when deserializing "
+ "an object with high level of nesting");
+ }
+
+end:
+ channel->rpc_call_level--;
+ if (!channel->enabled && !kv_size(channel->call_stack)) {
+ // Now it's safe to destroy the channel
+ close_channel(channel);
}
}
-static void send_event(Channel *channel, char *type, Object data)
+static bool channel_write(Channel *channel, WBuffer *buffer)
{
- wstream_write(channel->data.streams.write, serialize_event(type, data));
+ bool success;
+
+ if (channel->is_job) {
+ success = job_write(channel->data.job, buffer);
+ } else {
+ success = wstream_write(channel->data.streams.write, buffer);
+ }
+
+ if (!success) {
+ // If the write failed for any reason, close the channel
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed due to a failed write",
+ channel->id);
+ disable_channel(channel, buf);
+ }
+
+ return success;
}
-static void broadcast_event(char *type, Object data)
+static void send_error(Channel *channel, uint64_t id, char *err)
+{
+ channel_write(channel, serialize_response(id, err, NIL, &out_buffer));
+}
+
+static void send_request(Channel *channel,
+ uint64_t id,
+ char *name,
+ Object arg)
+{
+ String method = {.size = strlen(name), .data = name};
+ channel_write(channel, serialize_request(id, method, arg, &out_buffer));
+}
+
+static void send_event(Channel *channel,
+ char *name,
+ Object arg)
+{
+ String method = {.size = strlen(name), .data = name};
+ channel_write(channel, serialize_request(0, method, arg, &out_buffer));
+}
+
+static void broadcast_event(char *name, Object arg)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
Channel *channel;
map_foreach_value(channels, channel, {
- if (pmap_has(cstr_t)(channel->subscribed_events, type)) {
+ if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
kv_push(Channel *, subscribed, channel);
}
});
if (!kv_size(subscribed)) {
- msgpack_rpc_free_object(data);
+ msgpack_rpc_free_object(arg);
goto end;
}
- WBuffer *buffer = serialize_event(type, data);
+ String method = {.size = strlen(name), .data = name};
+ WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
for (size_t i = 0; i < kv_size(subscribed); i++) {
- wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
+ channel_write(kv_A(subscribed, i), buffer);
}
end:
@@ -300,7 +439,6 @@ static void unsubscribe(Channel *channel, char *event)
static void close_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
- msgpack_sbuffer_free(channel->sbuffer);
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
@@ -320,6 +458,7 @@ static void close_channel(Channel *channel)
});
pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
free(channel);
}
@@ -329,29 +468,69 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static WBuffer *serialize_event(char *type, Object data)
-{
- String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
- msgpack_packer packer;
- msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(event_type, data, &packer);
- WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data,
- msgpack_event_buffer.size),
- msgpack_event_buffer.size,
- free);
- msgpack_rpc_free_object(data);
- msgpack_sbuffer_clear(&msgpack_event_buffer);
-
- return rv;
-}
-
static Channel *register_channel()
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->enabled = true;
+ rv->rpc_call_level = 0;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
+ rv->next_request_id = 1;
+ kv_init(rv->call_stack);
pmap_put(uint64_t)(channels, rv->id, rv);
return rv;
}
+
+static bool is_rpc_response(msgpack_object *obj)
+{
+ return obj->type == MSGPACK_OBJECT_ARRAY
+ && obj->via.array.size == 4
+ && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.array.ptr[0].via.u64 == 1
+ && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
+}
+
+static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
+{
+ uint64_t response_id = obj->via.array.ptr[1].via.u64;
+ // Must be equal to the frame at the stack's bottom
+ return response_id == kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1)->request_id;
+}
+
+static void call_stack_pop(msgpack_object *obj, Channel *channel)
+{
+ ChannelCallFrame *frame = kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1);
+ frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
+ (void)kv_pop(channel->call_stack);
+
+ if (frame->errored) {
+ msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
+ } else {
+ msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
+ }
+}
+
+static void call_stack_unwind(Channel *channel, char *msg, int count)
+{
+ while (kv_size(channel->call_stack) && count--) {
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ frame->errored = true;
+ frame->result = STRING_OBJ(cstr_to_string(msg));
+ }
+}
+
+static void disable_channel(Channel *channel, char *msg)
+{
+ if (kv_size(channel->call_stack)) {
+ // Channel is currently in the middle of a call, remove all frames and mark
+ // it as "dead"
+ channel->enabled = false;
+ call_stack_unwind(channel, msg, -1);
+ } else {
+ // Safe to close it now
+ close_channel(channel);
+ }
+}
diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h
index f12d54cede..ce04abb76d 100644
--- a/src/nvim/os/channel.h
+++ b/src/nvim/os/channel.h
@@ -6,7 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
-#define EVENT_MAXLEN 512
+#define METHOD_MAXLEN 512
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.h.generated.h"
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index 6723b97e0c..a8bd6ca886 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -63,11 +63,6 @@ bool event_poll(int32_t ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
- if (input_ready()) {
- // If there's a pending input event to be consumed, do it now
- return true;
- }
-
static int recursive = 0;
if (!(recursive++)) {
@@ -95,17 +90,16 @@ bool event_poll(int32_t ms)
run_mode = UV_RUN_NOWAIT;
}
+ bool events_processed;
+
do {
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
uv_run(uv_default_loop(), run_mode);
- // Process immediate events outside uv_run since libuv event loop not
- // support recursion(processing events may cause a recursive event_poll
- // call)
- event_process(false);
+ events_processed = event_process(false);
} while (
// Continue running if ...
- !input_ready() && // we have no input
+ !events_processed && // we didn't process any immediate events
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timer_data.timed_out); // we didn't get a timeout
@@ -124,7 +118,7 @@ bool event_poll(int32_t ms)
event_process(false);
}
- return input_ready() || event_has_deferred();
+ return !timer_data.timed_out && (events_processed || event_has_deferred());
}
bool event_has_deferred()
@@ -139,11 +133,13 @@ void event_push(Event event, bool deferred)
}
// Runs the appropriate action for each queued event
-void event_process(bool deferred)
+bool event_process(bool deferred)
{
+ bool processed_events = false;
Event event;
while (kl_shift(Event, get_queue(deferred), &event) == 0) {
+ processed_events = true;
switch (event.type) {
case kEventSignal:
signal_handle(event);
@@ -158,6 +154,8 @@ void event_process(bool deferred)
abort();
}
}
+
+ return processed_events;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 6e42cba4ad..0f6d2df12f 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -37,12 +37,6 @@ void input_init()
rstream_set_file(read_stream, read_cmd_fd);
}
-// Check if there's pending input
-bool input_ready()
-{
- return rstream_available(read_stream) > 0 || eof;
-}
-
// Listen for input
void input_start()
{
@@ -119,7 +113,7 @@ bool os_char_avail()
// In cooked mode we should get SIGINT, no need to check.
void os_breakcheck()
{
- if (curr_tmode == TMODE_RAW && event_poll(0))
+ if (curr_tmode == TMODE_RAW && input_poll(0))
fill_input_buf(false);
}
@@ -132,6 +126,11 @@ bool os_isatty(int fd)
return uv_guess_handle(fd) == UV_TTY;
}
+static bool input_poll(int32_t ms)
+{
+ return input_ready() || event_poll(ms) || input_ready();
+}
+
// This is a replacement for the old `WaitForChar` function in os_unix.c
static InbufPollResult inbuf_poll(int32_t ms)
{
@@ -139,7 +138,7 @@ static InbufPollResult inbuf_poll(int32_t ms)
return kInputAvail;
}
- if (event_poll(ms)) {
+ if (input_poll(ms)) {
return eof && rstream_available(read_stream) == 0 ?
kInputEof :
kInputAvail;
@@ -196,3 +195,10 @@ static int push_event_key(uint8_t *buf, int maxlen)
return buf_idx;
}
+
+// Check if there's pending input
+bool input_ready()
+{
+ return rstream_available(read_stream) > 0 || eof;
+}
+
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index b369004e47..dcf50243a9 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -21,7 +21,6 @@
#define EXIT_TIMEOUT 25
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 1024
-#define JOB_WRITE_MAXMEM 1024 * 1024
struct job {
// Job id the index in the job table plus one.
@@ -131,6 +130,7 @@ void job_teardown()
/// @param exit_cb Callback that will be invoked when the job exits
/// @param defer If the job callbacks invocation should be deferred to vim
/// main loop
+/// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] The job id if the job started successfully, 0 if the job table
/// is full, -1 if the program could not be executed.
/// @return The job pointer if the job started successfully, NULL otherwise
@@ -140,6 +140,7 @@ Job *job_start(char **argv,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb,
bool defer,
+ size_t maxmem,
int *status)
{
int i;
@@ -210,7 +211,7 @@ Job *job_start(char **argv,
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
- job->in = wstream_new(JOB_WRITE_MAXMEM);
+ job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
index 63e1245028..85569372da 100644
--- a/src/nvim/os/msgpack_rpc.c
+++ b/src/nvim/os/msgpack_rpc.c
@@ -3,128 +3,102 @@
#include <msgpack.h>
-#include "nvim/os/msgpack_rpc.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
+#include "nvim/os/wstream.h"
+#include "nvim/os/msgpack_rpc.h"
+#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/api/private/helpers.h"
+#include "nvim/func_attr.h"
-#define REMOTE_FUNCS_IMPL(t, lt) \
- bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
- { \
- *arg = obj->via.u64; \
- return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \
- } \
- \
- void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \
- { \
- msgpack_pack_uint64(res, result); \
- }
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "os/msgpack_rpc.c.generated.h"
+#endif
-#define TYPED_ARRAY_IMPL(t, lt) \
- bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \
- { \
- if (obj->type != MSGPACK_OBJECT_ARRAY) { \
- return false; \
- } \
- \
- arg->size = obj->via.array.size; \
- arg->items = xcalloc(obj->via.array.size, sizeof(t)); \
- \
- for (size_t i = 0; i < obj->via.array.size; i++) { \
- if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \
- return false; \
- } \
- } \
- \
- return true; \
- } \
- \
- void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \
- { \
- msgpack_pack_array(res, result.size); \
- \
- for (size_t i = 0; i < result.size; i++) { \
- msgpack_rpc_from_##lt(result.items[i], res); \
- } \
- } \
- \
- void msgpack_rpc_free_##lt##array(t##Array value) { \
- for (size_t i = 0; i < value.size; i++) { \
- msgpack_rpc_free_##lt(value.items[i]); \
- } \
- \
- free(value.items); \
- }
+extern const uint8_t msgpack_metadata[];
+extern const unsigned int msgpack_metadata_size;
-void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
+/// Validates the basic structure of the msgpack-rpc call and fills `res`
+/// with the basic response structure.
+///
+/// @param channel_id The channel id
+/// @param req The parsed request object
+/// @param res A packer that contains the response
+WBuffer *msgpack_rpc_call(uint64_t channel_id,
+ msgpack_object *req,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(2)
+ FUNC_ATTR_NONNULL_ARG(3)
{
- // The initial response structure is the same no matter what happens,
- // we set it up here
- // Array of size 4
- msgpack_pack_array(res, 4);
- // Response type is 1
- msgpack_pack_int(res, 1);
+ uint64_t response_id;
+ char *err = msgpack_rpc_validate(&response_id, req);
- // Validate the basic structure of the msgpack-rpc payload
- if (req->type != MSGPACK_OBJECT_ARRAY) {
- msgpack_pack_int(res, 0); // no message id yet
- msgpack_rpc_error("Request is not an array", res);
- return;
+ if (err) {
+ return serialize_response(response_id, err, NIL, sbuffer);
}
- if (req->via.array.size != 4) {
- msgpack_pack_int(res, 0); // no message id yet
- char error_msg[256];
- snprintf(error_msg,
- sizeof(error_msg),
- "Request array size is %u, it should be 4",
- req->via.array.size);
- msgpack_rpc_error(error_msg, res);
- return;
- }
+ uint64_t method_id = req->via.array.ptr[2].via.u64;
- if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- msgpack_pack_int(res, 0); // no message id yet
- msgpack_rpc_error("Id must be a positive integer", res);
- return;
+ if (method_id == 0) {
+ return serialize_metadata(response_id, channel_id, sbuffer);
}
- // Set the response id, which is the same as the request
- msgpack_pack_uint64(res, req->via.array.ptr[1].via.u64);
+ // dispatch the call
+ Error error = { .set = false };
+ Object rv = msgpack_rpc_dispatch(channel_id, method_id, req, &error);
+ // send the response
+ msgpack_packer response;
+ msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
- if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- msgpack_rpc_error("Message type must be an integer", res);
- return;
+ if (error.set) {
+ return serialize_response(response_id, error.msg, NIL, sbuffer);
}
- if (req->via.array.ptr[0].via.u64 != 0) {
- msgpack_rpc_error("Message type must be 0", res);
- return;
- }
+ return serialize_response(response_id, NULL, rv, sbuffer);
+}
- if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- msgpack_rpc_error("Method id must be a positive integer", res);
- return;
+/// Try to unpack a msgpack document from the data in the unpacker buffer. This
+/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
+/// the called know if the unpacking failed due to bad input or due to missing
+/// data.
+///
+/// @param unpacker The unpacker containing the parse buffer
+/// @param result The result which will contain the parsed object
+/// @return kUnpackResultOk : An object was parsed
+/// kUnpackResultFail : Got bad input
+/// kUnpackResultNeedMore: Need more data
+UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
+ msgpack_unpacked* result)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (result->zone != NULL) {
+ msgpack_zone_free(result->zone);
}
- if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
- msgpack_rpc_error("Paremeters must be an array", res);
- return;
+ int res = msgpack_unpacker_execute(unpacker);
+
+ if (res > 0) {
+ result->zone = msgpack_unpacker_release_zone(unpacker);
+ result->data = msgpack_unpacker_data(unpacker);
+ msgpack_unpacker_reset(unpacker);
+ return kUnpackResultOk;
}
- // dispatch the message
- msgpack_rpc_dispatch(id, req, res);
-}
+ if (res < 0) {
+ // Since we couldn't parse it, destroy the data consumed so far
+ msgpack_unpacker_reset(unpacker);
+ return kUnpackResultFail;
+ }
-void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
-{
- msgpack_pack_array(pac, 3);
- msgpack_pack_int(pac, 2);
- msgpack_pack_raw(pac, type.size);
- msgpack_pack_raw_body(pac, type.data, type.size);
- msgpack_rpc_from_object(data, pac);
+ return kUnpackResultNeedMore;
}
+/// Finishes the msgpack-rpc call with an error message.
+///
+/// @param msg The error message
+/// @param res A packer that contains the response
void msgpack_rpc_error(char *msg, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ALL
{
size_t len = strlen(msg);
@@ -135,302 +109,128 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
msgpack_pack_nil(res);
}
-bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
-{
- *arg = obj->via.boolean;
- return obj->type == MSGPACK_OBJECT_BOOLEAN;
-}
-
-bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
-{
- if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
- && obj->via.u64 <= INT64_MAX) {
- *arg = (int64_t)obj->via.u64;
- return true;
- }
-
- *arg = obj->via.i64;
- return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
-}
-
-bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
-{
- *arg = obj->via.dec;
- return obj->type == MSGPACK_OBJECT_DOUBLE;
-}
-
-bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
-{
- if (obj->type != MSGPACK_OBJECT_RAW) {
- return false;
- }
-
- arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size);
- arg->size = obj->via.raw.size;
- return true;
-}
-
-bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
-{
- switch (obj->type) {
- case MSGPACK_OBJECT_NIL:
- arg->type = kObjectTypeNil;
- return true;
-
- case MSGPACK_OBJECT_BOOLEAN:
- arg->type = kObjectTypeBoolean;
- return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
-
- case MSGPACK_OBJECT_POSITIVE_INTEGER:
- case MSGPACK_OBJECT_NEGATIVE_INTEGER:
- arg->type = kObjectTypeInteger;
- return msgpack_rpc_to_integer(obj, &arg->data.integer);
-
- case MSGPACK_OBJECT_DOUBLE:
- arg->type = kObjectTypeFloat;
- return msgpack_rpc_to_float(obj, &arg->data.floating);
-
- case MSGPACK_OBJECT_RAW:
- arg->type = kObjectTypeString;
- return msgpack_rpc_to_string(obj, &arg->data.string);
-
- case MSGPACK_OBJECT_ARRAY:
- arg->type = kObjectTypeArray;
- return msgpack_rpc_to_array(obj, &arg->data.array);
-
- case MSGPACK_OBJECT_MAP:
- arg->type = kObjectTypeDictionary;
- return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
-
- default:
- return false;
- }
-}
-
-bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
-{
- return obj->type == MSGPACK_OBJECT_ARRAY
- && obj->via.array.size == 2
- && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row)
- && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col);
-}
-
-
-bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
-{
- if (obj->type != MSGPACK_OBJECT_ARRAY) {
- return false;
- }
-
- arg->size = obj->via.array.size;
- arg->items = xcalloc(obj->via.array.size, sizeof(Object));
-
- for (uint32_t i = 0; i < obj->via.array.size; i++) {
- if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
- return false;
- }
- }
-
- return true;
-}
-
-bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
-{
- if (obj->type != MSGPACK_OBJECT_MAP) {
- return false;
- }
-
- arg->size = obj->via.array.size;
- arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
-
-
- for (uint32_t i = 0; i < obj->via.map.size; i++) {
- if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
- &arg->items[i].key)) {
- return false;
- }
-
- if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
- &arg->items[i].value)) {
- return false;
- }
- }
-
- return true;
-}
-
-void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
-{
- if (result) {
- msgpack_pack_true(res);
+/// Serializes a msgpack-rpc request or notification(id == 0)
+WBuffer *serialize_request(uint64_t request_id,
+ String method,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, request_id ? 4 : 3);
+ msgpack_pack_int(&pac, request_id ? 0 : 2);
+
+ if (request_id) {
+ msgpack_pack_uint64(&pac, request_id);
+ }
+
+ msgpack_pack_raw(&pac, method.size);
+ msgpack_pack_raw_body(&pac, method.data, method.size);
+ msgpack_rpc_from_object(arg, &pac);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ free);
+ msgpack_rpc_free_object(arg);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+/// Serializes a msgpack-rpc response
+WBuffer *serialize_response(uint64_t response_id,
+ char *err_msg,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, 4);
+ msgpack_pack_int(&pac, 1);
+ msgpack_pack_uint64(&pac, response_id);
+
+ if (err_msg) {
+ String err = {.size = strlen(err_msg), .data = err_msg};
+ // error message
+ msgpack_pack_raw(&pac, err.size);
+ msgpack_pack_raw_body(&pac, err.data, err.size);
+ // Nil result
+ msgpack_pack_nil(&pac);
} else {
- msgpack_pack_false(res);
- }
-}
-
-void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
-{
- msgpack_pack_int64(res, result);
-}
-
-void msgpack_rpc_from_float(Float result, msgpack_packer *res)
-{
- msgpack_pack_double(res, result);
-}
-
-void msgpack_rpc_from_string(String result, msgpack_packer *res)
-{
- msgpack_pack_raw(res, result.size);
- msgpack_pack_raw_body(res, result.data, result.size);
-}
-
-void msgpack_rpc_from_object(Object result, msgpack_packer *res)
-{
- switch (result.type) {
- case kObjectTypeNil:
- msgpack_pack_nil(res);
- break;
-
- case kObjectTypeBoolean:
- msgpack_rpc_from_boolean(result.data.boolean, res);
- break;
-
- case kObjectTypeInteger:
- msgpack_rpc_from_integer(result.data.integer, res);
- break;
-
- case kObjectTypeFloat:
- msgpack_rpc_from_float(result.data.floating, res);
- break;
-
- case kObjectTypeString:
- msgpack_rpc_from_string(result.data.string, res);
- break;
-
- case kObjectTypeArray:
- msgpack_rpc_from_array(result.data.array, res);
- break;
-
- case kObjectTypeDictionary:
- msgpack_rpc_from_dictionary(result.data.dictionary, res);
- break;
-
- default:
- abort();
- }
-}
-
-void msgpack_rpc_from_position(Position result, msgpack_packer *res)
-{
- msgpack_pack_array(res, 2);;
- msgpack_pack_int64(res, result.row);
- msgpack_pack_int64(res, result.col);
-}
-
-void msgpack_rpc_from_array(Array result, msgpack_packer *res)
-{
- msgpack_pack_array(res, result.size);
-
- for (size_t i = 0; i < result.size; i++) {
- msgpack_rpc_from_object(result.items[i], res);
- }
-}
-
-void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
-{
- msgpack_pack_map(res, result.size);
-
- for (size_t i = 0; i < result.size; i++) {
- msgpack_rpc_from_string(result.items[i].key, res);
- msgpack_rpc_from_object(result.items[i].value, res);
- }
-}
-
-void msgpack_rpc_free_string(String value)
-{
- if (!value.data) {
- return;
+ // Nil error
+ msgpack_pack_nil(&pac);
+ // Return value
+ msgpack_rpc_from_object(arg, &pac);
+ }
+
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ free);
+ msgpack_rpc_free_object(arg);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+WBuffer *serialize_metadata(uint64_t id,
+ uint64_t channel_id,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ALL
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, 4);
+ msgpack_pack_int(&pac, 1);
+ msgpack_pack_uint64(&pac, id);
+ // Nil error
+ msgpack_pack_nil(&pac);
+ // The result is the [channel_id, metadata] array
+ msgpack_pack_array(&pac, 2);
+ msgpack_pack_uint64(&pac, channel_id);
+ msgpack_pack_raw(&pac, msgpack_metadata_size);
+ msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ free);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+static char *msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req)
+{
+ // response id not known yet
+
+ *response_id = 0;
+ // Validate the basic structure of the msgpack-rpc payload
+ if (req->type != MSGPACK_OBJECT_ARRAY) {
+ return "Request is not an array";
}
- free(value.data);
-}
-
-void msgpack_rpc_free_object(Object value)
-{
- switch (value.type) {
- case kObjectTypeNil:
- case kObjectTypeBoolean:
- case kObjectTypeInteger:
- case kObjectTypeFloat:
- break;
-
- case kObjectTypeString:
- msgpack_rpc_free_string(value.data.string);
- break;
-
- case kObjectTypeArray:
- msgpack_rpc_free_array(value.data.array);
- break;
-
- case kObjectTypeDictionary:
- msgpack_rpc_free_dictionary(value.data.dictionary);
- break;
-
- default:
- abort();
+ if (req->via.array.size != 4) {
+ return "Request array size should be 4";
}
-}
-void msgpack_rpc_free_array(Array value)
-{
- for (uint32_t i = 0; i < value.size; i++) {
- msgpack_rpc_free_object(value.items[i]);
+ if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ return "Id must be a positive integer";
}
- free(value.items);
-}
+ // Set the response id, which is the same as the request
+ *response_id = req->via.array.ptr[1].via.u64;
-void msgpack_rpc_free_dictionary(Dictionary value)
-{
- for (uint32_t i = 0; i < value.size; i++) {
- msgpack_rpc_free_string(value.items[i].key);
- msgpack_rpc_free_object(value.items[i].value);
+ if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ return "Message type must be an integer";
}
- free(value.items);
-}
-
-UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
- msgpack_unpacked* result)
-{
- if (result->zone != NULL) {
- msgpack_zone_free(result->zone);
+ if (req->via.array.ptr[0].via.u64 != 0) {
+ return "Message type must be 0";
}
- int res = msgpack_unpacker_execute(unpacker);
-
- if (res > 0) {
- result->zone = msgpack_unpacker_release_zone(unpacker);
- result->data = msgpack_unpacker_data(unpacker);
- msgpack_unpacker_reset(unpacker);
- return kUnpackResultOk;
+ if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ return "Method id must be a positive integer";
}
- if (res < 0) {
- // Since we couldn't parse it, destroy the data consumed so far
- msgpack_unpacker_reset(unpacker);
- return kUnpackResultFail;
+ if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
+ return "Paremeters must be an array";
}
- return kUnpackResultNeedMore;
+ return NULL;
}
-
-REMOTE_FUNCS_IMPL(Buffer, buffer)
-REMOTE_FUNCS_IMPL(Window, window)
-REMOTE_FUNCS_IMPL(Tabpage, tabpage)
-
-TYPED_ARRAY_IMPL(Buffer, buffer)
-TYPED_ARRAY_IMPL(Window, window)
-TYPED_ARRAY_IMPL(Tabpage, tabpage)
-TYPED_ARRAY_IMPL(String, string)
-
diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h
index baabff20aa..b8b947c0ec 100644
--- a/src/nvim/os/msgpack_rpc.h
+++ b/src/nvim/os/msgpack_rpc.h
@@ -8,6 +8,7 @@
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
+#include "nvim/os/wstream.h"
typedef enum {
kUnpackResultOk, /// Successfully parsed a document
@@ -15,167 +16,26 @@ typedef enum {
kUnpackResultNeedMore /// Need more data
} UnpackResult;
-/// Validates the basic structure of the msgpack-rpc call and fills `res`
-/// with the basic response structure.
-///
-/// @param id The channel id
-/// @param req The parsed request object
-/// @param res A packer that contains the response
-void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
-
-/// Packs a notification message
-///
-/// @param type The message type, an arbitrary string
-/// @param data The notification data
-/// @param packer Where the notification will be packed to
-void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
- FUNC_ATTR_NONNULL_ARG(3);
-
/// Dispatches to the actual API function after basic payload validation by
/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
/// to C types, and converting the return value back to msgpack types.
/// The implementation is generated at compile time with metadata extracted
/// from the api/*.h headers,
///
-/// @param id The channel id
+/// @param channel_id The channel id
+/// @param method_id The method id
/// @param req The parsed request object
-/// @param res A packer that contains the response
-void msgpack_rpc_dispatch(uint64_t id,
- msgpack_object *req,
- msgpack_packer *res)
+/// @param err Pointer to error structure
+/// @return Some object
+Object msgpack_rpc_dispatch(uint64_t channel_id,
+ uint64_t method_id,
+ msgpack_object *req,
+ Error *err)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
-/// Try to unpack a msgpack document from the data in the unpacker buffer. This
-/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
-/// the called know if the unpacking failed due to bad input or due to missing
-/// data.
-///
-/// @param unpacker The unpacker containing the parse buffer
-/// @param result The result which will contain the parsed object
-/// @return kUnpackResultOk : An object was parsed
-/// kUnpackResultFail : Got bad input
-/// kUnpackResultNeedMore: Need more data
-UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
- msgpack_unpacked* result);
-
-/// Finishes the msgpack-rpc call with an error message.
-///
-/// @param msg The error message
-/// @param res A packer that contains the response
-void msgpack_rpc_error(char *msg, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ALL;
-
-/// Functions for validating and converting from msgpack types to C types.
-/// These are used by `msgpack_rpc_dispatch` to validate and convert each
-/// argument.
-///
-/// @param obj The object to convert
-/// @param[out] arg A pointer to the avalue
-/// @return true if the conversion succeeded, false otherwise
-bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
- FUNC_ATTR_NONNULL_ALL;
-bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
- FUNC_ATTR_NONNULL_ALL;
-
-/// Functions for converting from C types to msgpack types.
-/// These are used by `msgpack_rpc_dispatch` to convert return values
-/// from the API
-///
-/// @param result A pointer to the result
-/// @param res A packer that contains the response
-void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_float(Float result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_position(Position result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_string(String result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_window(Window result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_object(Object result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_array(Array result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ARG(2);
-
-/// Helpers for initializing types that may be freed later
-#define msgpack_rpc_init_boolean
-#define msgpack_rpc_init_integer
-#define msgpack_rpc_init_float
-#define msgpack_rpc_init_position
-#define msgpack_rpc_init_string = STRING_INIT
-#define msgpack_rpc_init_buffer
-#define msgpack_rpc_init_window
-#define msgpack_rpc_init_tabpage
-#define msgpack_rpc_init_object = {.type = kObjectTypeNil}
-#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT
-#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT
-#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT
-#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT
-#define msgpack_rpc_init_array = ARRAY_DICT_INIT
-#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT
-
-/// Helpers for freeing arguments/return value
-///
-/// @param value The value to be freed
-#define msgpack_rpc_free_boolean(value)
-#define msgpack_rpc_free_integer(value)
-#define msgpack_rpc_free_float(value)
-#define msgpack_rpc_free_position(value)
-void msgpack_rpc_free_string(String value);
-#define msgpack_rpc_free_buffer(value)
-#define msgpack_rpc_free_window(value)
-#define msgpack_rpc_free_tabpage(value)
-void msgpack_rpc_free_object(Object value);
-void msgpack_rpc_free_stringarray(StringArray value);
-void msgpack_rpc_free_bufferarray(BufferArray value);
-void msgpack_rpc_free_windowarray(WindowArray value);
-void msgpack_rpc_free_tabpagearray(TabpageArray value);
-void msgpack_rpc_free_array(Array value);
-void msgpack_rpc_free_dictionary(Dictionary value);
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "os/msgpack_rpc.h.generated.h"
+#endif
#endif // NVIM_OS_MSGPACK_RPC_H
diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c
new file mode 100644
index 0000000000..e2c277abe4
--- /dev/null
+++ b/src/nvim/os/msgpack_rpc_helpers.c
@@ -0,0 +1,380 @@
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <msgpack.h>
+
+#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+
+#define REMOTE_FUNCS_IMPL(t, lt) \
+ bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
+ { \
+ *arg = obj->via.u64; \
+ return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \
+ } \
+ \
+ void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \
+ { \
+ msgpack_pack_uint64(res, result); \
+ }
+
+#define TYPED_ARRAY_IMPL(t, lt) \
+ bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \
+ { \
+ if (obj->type != MSGPACK_OBJECT_ARRAY) { \
+ return false; \
+ } \
+ \
+ arg->size = obj->via.array.size; \
+ arg->items = xcalloc(obj->via.array.size, sizeof(t)); \
+ \
+ for (size_t i = 0; i < obj->via.array.size; i++) { \
+ if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \
+ return false; \
+ } \
+ } \
+ \
+ return true; \
+ } \
+ \
+ void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \
+ { \
+ msgpack_pack_array(res, result.size); \
+ \
+ for (size_t i = 0; i < result.size; i++) { \
+ msgpack_rpc_from_##lt(result.items[i], res); \
+ } \
+ } \
+ \
+ void msgpack_rpc_free_##lt##array(t##Array value) { \
+ for (size_t i = 0; i < value.size; i++) { \
+ msgpack_rpc_free_##lt(value.items[i]); \
+ } \
+ \
+ free(value.items); \
+ }
+
+bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
+{
+ *arg = obj->via.boolean;
+ return obj->type == MSGPACK_OBJECT_BOOLEAN;
+}
+
+bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
+{
+ if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.u64 <= INT64_MAX) {
+ *arg = (int64_t)obj->via.u64;
+ return true;
+ }
+
+ *arg = obj->via.i64;
+ return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
+}
+
+bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
+{
+ *arg = obj->via.dec;
+ return obj->type == MSGPACK_OBJECT_DOUBLE;
+}
+
+bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
+{
+ if (obj->type != MSGPACK_OBJECT_RAW) {
+ return false;
+ }
+
+ arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size);
+ arg->size = obj->via.raw.size;
+ return true;
+}
+
+bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
+{
+ switch (obj->type) {
+ case MSGPACK_OBJECT_NIL:
+ arg->type = kObjectTypeNil;
+ return true;
+
+ case MSGPACK_OBJECT_BOOLEAN:
+ arg->type = kObjectTypeBoolean;
+ return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
+
+ case MSGPACK_OBJECT_POSITIVE_INTEGER:
+ case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+ arg->type = kObjectTypeInteger;
+ return msgpack_rpc_to_integer(obj, &arg->data.integer);
+
+ case MSGPACK_OBJECT_DOUBLE:
+ arg->type = kObjectTypeFloat;
+ return msgpack_rpc_to_float(obj, &arg->data.floating);
+
+ case MSGPACK_OBJECT_RAW:
+ arg->type = kObjectTypeString;
+ return msgpack_rpc_to_string(obj, &arg->data.string);
+
+ case MSGPACK_OBJECT_ARRAY:
+ arg->type = kObjectTypeArray;
+ return msgpack_rpc_to_array(obj, &arg->data.array);
+
+ case MSGPACK_OBJECT_MAP:
+ arg->type = kObjectTypeDictionary;
+ return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
+
+ default:
+ return false;
+ }
+}
+
+bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
+{
+ return obj->type == MSGPACK_OBJECT_ARRAY
+ && obj->via.array.size == 2
+ && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row)
+ && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col);
+}
+
+
+bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
+{
+ if (obj->type != MSGPACK_OBJECT_ARRAY) {
+ return false;
+ }
+
+ arg->size = obj->via.array.size;
+ arg->items = xcalloc(obj->via.array.size, sizeof(Object));
+
+ for (uint32_t i = 0; i < obj->via.array.size; i++) {
+ if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
+{
+ if (obj->type != MSGPACK_OBJECT_MAP) {
+ return false;
+ }
+
+ arg->size = obj->via.array.size;
+ arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
+
+
+ for (uint32_t i = 0; i < obj->via.map.size; i++) {
+ if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
+ &arg->items[i].key)) {
+ return false;
+ }
+
+ if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
+ &arg->items[i].value)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
+{
+ if (result) {
+ msgpack_pack_true(res);
+ } else {
+ msgpack_pack_false(res);
+ }
+}
+
+void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
+{
+ msgpack_pack_int64(res, result);
+}
+
+void msgpack_rpc_from_float(Float result, msgpack_packer *res)
+{
+ msgpack_pack_double(res, result);
+}
+
+void msgpack_rpc_from_string(String result, msgpack_packer *res)
+{
+ msgpack_pack_raw(res, result.size);
+ msgpack_pack_raw_body(res, result.data, result.size);
+}
+
+void msgpack_rpc_from_object(Object result, msgpack_packer *res)
+{
+ switch (result.type) {
+ case kObjectTypeNil:
+ msgpack_pack_nil(res);
+ break;
+
+ case kObjectTypeBoolean:
+ msgpack_rpc_from_boolean(result.data.boolean, res);
+ break;
+
+ case kObjectTypeInteger:
+ msgpack_rpc_from_integer(result.data.integer, res);
+ break;
+
+ case kObjectTypeFloat:
+ msgpack_rpc_from_float(result.data.floating, res);
+ break;
+
+ case kObjectTypeString:
+ msgpack_rpc_from_string(result.data.string, res);
+ break;
+
+ case kObjectTypeArray:
+ msgpack_rpc_from_array(result.data.array, res);
+ break;
+
+ case kObjectTypePosition:
+ msgpack_rpc_from_position(result.data.position, res);
+ break;
+
+ case kObjectTypeBuffer:
+ msgpack_rpc_from_buffer(result.data.buffer, res);
+ break;
+
+ case kObjectTypeWindow:
+ msgpack_rpc_from_window(result.data.window, res);
+ break;
+
+ case kObjectTypeTabpage:
+ msgpack_rpc_from_tabpage(result.data.tabpage, res);
+ break;
+
+ case kObjectTypeStringArray:
+ msgpack_rpc_from_stringarray(result.data.stringarray, res);
+ break;
+
+ case kObjectTypeBufferArray:
+ msgpack_rpc_from_bufferarray(result.data.bufferarray, res);
+ break;
+
+ case kObjectTypeWindowArray:
+ msgpack_rpc_from_windowarray(result.data.windowarray, res);
+ break;
+
+ case kObjectTypeTabpageArray:
+ msgpack_rpc_from_tabpagearray(result.data.tabpagearray, res);
+ break;
+
+ case kObjectTypeDictionary:
+ msgpack_rpc_from_dictionary(result.data.dictionary, res);
+ break;
+ }
+}
+
+void msgpack_rpc_from_position(Position result, msgpack_packer *res)
+{
+ msgpack_pack_array(res, 2);;
+ msgpack_pack_int64(res, result.row);
+ msgpack_pack_int64(res, result.col);
+}
+
+void msgpack_rpc_from_array(Array result, msgpack_packer *res)
+{
+ msgpack_pack_array(res, result.size);
+
+ for (size_t i = 0; i < result.size; i++) {
+ msgpack_rpc_from_object(result.items[i], res);
+ }
+}
+
+void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
+{
+ msgpack_pack_map(res, result.size);
+
+ for (size_t i = 0; i < result.size; i++) {
+ msgpack_rpc_from_string(result.items[i].key, res);
+ msgpack_rpc_from_object(result.items[i].value, res);
+ }
+}
+
+void msgpack_rpc_free_string(String value)
+{
+ if (!value.data) {
+ return;
+ }
+
+ free(value.data);
+}
+
+void msgpack_rpc_free_object(Object value)
+{
+ switch (value.type) {
+ case kObjectTypeNil:
+ case kObjectTypeBoolean:
+ case kObjectTypeInteger:
+ case kObjectTypeFloat:
+ case kObjectTypePosition:
+ case kObjectTypeBuffer:
+ case kObjectTypeWindow:
+ case kObjectTypeTabpage:
+ break;
+
+ case kObjectTypeString:
+ msgpack_rpc_free_string(value.data.string);
+ break;
+
+ case kObjectTypeArray:
+ msgpack_rpc_free_array(value.data.array);
+ break;
+
+ case kObjectTypeStringArray:
+ msgpack_rpc_free_stringarray(value.data.stringarray);
+ break;
+
+ case kObjectTypeBufferArray:
+ msgpack_rpc_free_bufferarray(value.data.bufferarray);
+ break;
+
+ case kObjectTypeWindowArray:
+ msgpack_rpc_free_windowarray(value.data.windowarray);
+ break;
+
+ case kObjectTypeTabpageArray:
+ msgpack_rpc_free_tabpagearray(value.data.tabpagearray);
+ break;
+
+ case kObjectTypeDictionary:
+ msgpack_rpc_free_dictionary(value.data.dictionary);
+ break;
+
+ default:
+ abort();
+ }
+}
+
+void msgpack_rpc_free_array(Array value)
+{
+ for (uint32_t i = 0; i < value.size; i++) {
+ msgpack_rpc_free_object(value.items[i]);
+ }
+
+ free(value.items);
+}
+
+void msgpack_rpc_free_dictionary(Dictionary value)
+{
+ for (uint32_t i = 0; i < value.size; i++) {
+ msgpack_rpc_free_string(value.items[i].key);
+ msgpack_rpc_free_object(value.items[i].value);
+ }
+
+ free(value.items);
+}
+
+REMOTE_FUNCS_IMPL(Buffer, buffer)
+REMOTE_FUNCS_IMPL(Window, window)
+REMOTE_FUNCS_IMPL(Tabpage, tabpage)
+
+TYPED_ARRAY_IMPL(Buffer, buffer)
+TYPED_ARRAY_IMPL(Window, window)
+TYPED_ARRAY_IMPL(Tabpage, tabpage)
+TYPED_ARRAY_IMPL(String, string)
+
diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h
new file mode 100644
index 0000000000..e3d1e756ef
--- /dev/null
+++ b/src/nvim/os/msgpack_rpc_helpers.h
@@ -0,0 +1,124 @@
+#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H
+#define NVIM_OS_MSGPACK_RPC_HELPERS_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <msgpack.h>
+
+#include "nvim/func_attr.h"
+#include "nvim/api/private/defs.h"
+
+/// Functions for validating and converting from msgpack types to C types.
+/// These are used by `msgpack_rpc_dispatch` to validate and convert each
+/// argument.
+///
+/// @param obj The object to convert
+/// @param[out] arg A pointer to the avalue
+/// @return true if the conversion succeeded, false otherwise
+bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
+ FUNC_ATTR_NONNULL_ALL;
+
+/// Functions for converting from C types to msgpack types.
+/// These are used by `msgpack_rpc_dispatch` to convert return values
+/// from the API
+///
+/// @param result A pointer to the result
+/// @param res A packer that contains the response
+void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_float(Float result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_position(Position result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_string(String result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_window(Window result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_object(Object result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_array(Array result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+
+/// Helpers for initializing types that may be freed later
+#define msgpack_rpc_init_boolean
+#define msgpack_rpc_init_integer
+#define msgpack_rpc_init_float
+#define msgpack_rpc_init_position
+#define msgpack_rpc_init_string = STRING_INIT
+#define msgpack_rpc_init_buffer
+#define msgpack_rpc_init_window
+#define msgpack_rpc_init_tabpage
+#define msgpack_rpc_init_object = {.type = kObjectTypeNil}
+#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT
+#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT
+#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT
+#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT
+#define msgpack_rpc_init_array = ARRAY_DICT_INIT
+#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT
+
+/// Helpers for freeing arguments/return value
+///
+/// @param value The value to be freed
+#define msgpack_rpc_free_boolean(value)
+#define msgpack_rpc_free_integer(value)
+#define msgpack_rpc_free_float(value)
+#define msgpack_rpc_free_position(value)
+void msgpack_rpc_free_string(String value);
+#define msgpack_rpc_free_buffer(value)
+#define msgpack_rpc_free_window(value)
+#define msgpack_rpc_free_tabpage(value)
+void msgpack_rpc_free_object(Object value);
+void msgpack_rpc_free_stringarray(StringArray value);
+void msgpack_rpc_free_bufferarray(BufferArray value);
+void msgpack_rpc_free_windowarray(WindowArray value);
+void msgpack_rpc_free_tabpagearray(TabpageArray value);
+void msgpack_rpc_free_array(Array value);
+void msgpack_rpc_free_dictionary(Dictionary value);
+
+#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H
+
diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c
index 9a908a4348..13b8e8d9dc 100644
--- a/src/nvim/os/wstream.c
+++ b/src/nvim/os/wstream.c
@@ -9,6 +9,8 @@
#include "nvim/vim.h"
#include "nvim/memory.h"
+#define DEFAULT_MAXMEM 1024 * 1024 * 10
+
struct wstream {
uv_stream_t *stream;
// Memory currently used by pending buffers
@@ -43,6 +45,10 @@ typedef struct {
/// @return The newly-allocated `WStream` instance
WStream * wstream_new(size_t maxmem)
{
+ if (!maxmem) {
+ maxmem = DEFAULT_MAXMEM;
+ }
+
WStream *rv = xmalloc(sizeof(WStream));
rv->maxmem = maxmem;
rv->stream = NULL;
@@ -91,11 +97,12 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
// This should not be called after a wstream was freed
assert(!wstream->freed);
+ buffer->refcount++;
+
if (wstream->curmem > wstream->maxmem) {
- return false;
+ goto err;
}
- buffer->refcount++;
wstream->curmem += buffer->size;
data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
@@ -105,9 +112,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
wstream->pending_reqs++;
- uv_write(req, wstream->stream, &uvbuf, 1, write_cb);
+
+ if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
+ goto err;
+ }
return true;
+
+err:
+ release_wbuffer(buffer);
+ return false;
}
/// Creates a WBuffer object for holding output data. Instances of this
@@ -138,10 +152,7 @@ static void write_cb(uv_write_t *req, int status)
free(req);
data->wstream->curmem -= data->buffer->size;
- if (!--data->buffer->refcount) {
- data->buffer->cb(data->buffer->data);
- free(data->buffer);
- }
+ release_wbuffer(data->buffer);
data->wstream->pending_reqs--;
if (data->wstream->freed && data->wstream->pending_reqs == 0) {
@@ -152,3 +163,10 @@ static void write_cb(uv_write_t *req, int status)
free(data);
}
+static void release_wbuffer(WBuffer *buffer)
+{
+ if (!--buffer->refcount) {
+ buffer->cb(buffer->data);
+ free(buffer);
+ }
+}