aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-06-20 10:53:05 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-06-24 13:02:24 -0300
commitea7a389ec77c0031160ce860129101c603d8e0ec (patch)
tree64b415d9d820a170e82fb22d2315d3817e07b175 /src
parent09605cec03ea23e87ee285fd950a23ce8d23678d (diff)
downloadrneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.tar.gz
rneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.tar.bz2
rneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.zip
channel: Implement the 'channel_send_call' function
This function is used to send RPC calls to clients. In contrast to `channel_send_event`, this function will block until the client sends a response(But it will continue processing requests from that client). The RPC call stack has a maximum depth of 20.
Diffstat (limited to 'src')
-rw-r--r--src/nvim/api/private/helpers.c2
-rw-r--r--src/nvim/eval.c43
-rw-r--r--src/nvim/os/channel.c189
3 files changed, 226 insertions, 8 deletions
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c
index 30301e9368..d5ebc93f7c 100644
--- a/src/nvim/api/private/helpers.c
+++ b/src/nvim/api/private/helpers.c
@@ -341,7 +341,7 @@ String cstr_to_string(const char *str)
};
}
-static bool object_to_vim(Object obj, typval_T *tv, Error *err)
+bool object_to_vim(Object obj, typval_T *tv, Error *err)
{
tv->v_type = VAR_UNKNOWN;
tv->v_lock = 0;
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 7300e60b1a..4c39950344 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -71,6 +71,7 @@
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/api/private/helpers.h"
+#include "nvim/os/msgpack_rpc.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@@ -6453,6 +6454,7 @@ static struct fst {
{"searchpair", 3, 7, f_searchpair},
{"searchpairpos", 3, 7, f_searchpairpos},
{"searchpos", 1, 4, f_searchpos},
+ {"send_call", 3, 3, f_send_call},
{"send_event", 3, 3, f_send_event},
{"setbufvar", 3, 3, f_setbufvar},
{"setcmdpos", 1, 1, f_setcmdpos},
@@ -12525,6 +12527,47 @@ do_searchpair (
return retval;
}
+// "send_call()" function
+static void f_send_call(typval_T *argvars, typval_T *rettv)
+{
+ rettv->v_type = VAR_NUMBER;
+ rettv->vval.v_number = 0;
+
+ if (check_restricted() || check_secure()) {
+ return;
+ }
+
+ if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
+ EMSG2(_(e_invarg2), "Channel id must be a positive integer");
+ return;
+ }
+
+ if (argvars[1].v_type != VAR_STRING) {
+ EMSG2(_(e_invarg2), "Method name must be a string");
+ return;
+ }
+
+ bool errored;
+ Object result;
+ if (!channel_send_call((uint64_t)argvars[0].vval.v_number,
+ (char *)argvars[1].vval.v_string,
+ vim_to_object(&argvars[2]),
+ &result,
+ &errored)) {
+ EMSG2(_(e_invarg2), "Channel doesn't exist");
+ return;
+ }
+
+ Error conversion_error = {.set = false};
+ if (errored || !object_to_vim(result, rettv, &conversion_error)) {
+ EMSG(errored ?
+ result.data.string.data :
+ _("Error converting the call result"));
+ }
+
+ msgpack_rpc_free_object(result);
+}
+
// "send_event()" function
static void f_send_event(typval_T *argvars, typval_T *rettv)
{
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 4299f2a06d..552c962b3a 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -5,6 +5,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
+#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@@ -19,9 +20,15 @@
#include "nvim/lib/kvec.h"
typedef struct {
+ uint64_t request_id;
+ bool errored;
+ Object result;
+} ChannelCallFrame;
+
+typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
- bool is_job, is_alive;
+ bool is_job, enabled;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
@@ -32,6 +39,9 @@ typedef struct {
uv_stream_t *uv;
} streams;
} data;
+ uint64_t next_request_id;
+ kvec_t(ChannelCallFrame *) call_stack;
+ size_t rpc_call_level;
} Channel;
static uint64_t next_id = 1;
@@ -135,6 +145,78 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
return true;
}
+bool channel_send_call(uint64_t id,
+ char *name,
+ Object arg,
+ Object *result,
+ bool *errored)
+{
+ Channel *channel = NULL;
+
+ if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ msgpack_rpc_free_object(arg);
+ return false;
+ }
+
+ if (kv_size(channel->call_stack) > 20) {
+ // 20 stack depth is more than anyone should ever need for RPC calls
+ *errored = true;
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Channel %" PRIu64 " was closed due to a high stack depth "
+ "while processing a RPC call",
+ channel->id);
+ *result = STRING_OBJ(buf);
+ }
+
+ uint64_t request_id = channel->next_request_id++;
+ // Send the msgpack-rpc request
+ channel_write(channel, serialize_message(0, request_id, name, arg));
+
+ if (!kv_size(channel->call_stack)) {
+ // This is the first frame, we must disable event deferral for this
+ // channel because we won't be returning until the client sends a
+ // response
+ if (channel->is_job) {
+ job_set_defer(channel->data.job, false);
+ } else {
+ rstream_set_defer(channel->data.streams.read, false);
+ }
+ }
+
+ // Push the frame
+ ChannelCallFrame frame = {request_id, false, NIL};
+ kv_push(ChannelCallFrame *, channel->call_stack, &frame);
+ size_t size = kv_size(channel->call_stack);
+
+ do {
+ event_poll(-1);
+ } while (
+ // Continue running if ...
+ channel->enabled && // the channel is still enabled
+ kv_size(channel->call_stack) >= size); // the call didn't return
+
+ if (!kv_size(channel->call_stack)) {
+ // Popped last frame, restore event deferral
+ if (channel->is_job) {
+ job_set_defer(channel->data.job, true);
+ } else {
+ rstream_set_defer(channel->data.streams.read, true);
+ }
+ if (!channel->enabled && !channel->rpc_call_level) {
+ // Close the channel if it has been disabled and we have not been called
+ // by `parse_msgpack`(It would be unsafe to close the channel otherwise)
+ close_channel(channel);
+ }
+ }
+
+ *errored = frame.errored;
+ *result = frame.result;
+
+ return true;
+}
+
/// Subscribes to event broadcasts
///
/// @param id The channel id
@@ -193,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
Channel *channel = data;
if (eof) {
- close_channel(channel);
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed by the client",
+ channel->id);
+ disable_channel(channel, buf);
return;
}
+ channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
@@ -211,6 +300,24 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// Deserialize everything we can.
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
== kUnpackResultOk) {
+ if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
+ if (is_valid_rpc_response(&unpacked.data, channel)) {
+ call_stack_pop(&unpacked.data, channel);
+ } else {
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Channel %" PRIu64 " returned a response that doesn't have "
+ " a matching id for the current RPC call. Ensure the client "
+ " is properly synchronized",
+ channel->id);
+ call_stack_unwind(channel, buf, 1);
+ }
+ msgpack_unpacked_destroy(&unpacked);
+ // Bail out from this event loop iteration
+ goto end;
+ }
+
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
@@ -221,7 +328,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
channel->sbuffer->size,
free);
if (!channel_write(channel, buffer)) {
- return;
+ goto end;
}
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
@@ -238,6 +345,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"This error can also happen when deserializing "
"an object with high level of nesting");
}
+
+end:
+ channel->rpc_call_level--;
+ if (!channel->enabled && !kv_size(channel->call_stack)) {
+ // Now it's safe to destroy the channel
+ close_channel(channel);
+ }
}
static void send_error(Channel *channel, char *msg)
@@ -276,9 +390,14 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
}
if (!success) {
- // If the write failed for whatever reason, mark the channel as not alive so
- // it can be freed later
- channel->is_alive = false;
+ // If the write failed for any reason, close the channel
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Before returning from a RPC call, channel %" PRIu64 " was "
+ "closed due to a failed write",
+ channel->id);
+ disable_channel(channel, buf);
}
return success;
@@ -359,6 +478,7 @@ static void close_channel(Channel *channel)
});
pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
free(channel);
}
@@ -390,12 +510,67 @@ static WBuffer *serialize_message(int type,
static Channel *register_channel()
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->is_alive = true;
+ rv->enabled = true;
+ rv->rpc_call_level = 0;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
+ rv->next_request_id = 1;
+ kv_init(rv->call_stack);
pmap_put(uint64_t)(channels, rv->id, rv);
return rv;
}
+static bool is_rpc_response(msgpack_object *obj)
+{
+ return obj->type == MSGPACK_OBJECT_ARRAY
+ && obj->via.array.size == 4
+ && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
+ && obj->via.array.ptr[0].via.u64 == 1
+ && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
+}
+
+static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
+{
+ uint64_t response_id = obj->via.array.ptr[1].via.u64;
+ // Must be equal to the frame at the stack's bottom
+ return response_id == kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1)->request_id;
+}
+
+static void call_stack_pop(msgpack_object *obj, Channel *channel)
+{
+ ChannelCallFrame *frame = kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1);
+ frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
+ (void)kv_pop(channel->call_stack);
+
+ if (frame->errored) {
+ msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
+ } else {
+ msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
+ }
+}
+
+static void call_stack_unwind(Channel *channel, char *msg, int count)
+{
+ while (kv_size(channel->call_stack) && count--) {
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ frame->errored = true;
+ frame->result = STRING_OBJ(msg);
+ }
+}
+
+static void disable_channel(Channel *channel, char *msg)
+{
+ if (kv_size(channel->call_stack)) {
+ // Channel is currently in the middle of a call, remove all frames and mark
+ // it as "dead"
+ channel->enabled = false;
+ call_stack_unwind(channel, msg, -1);
+ } else {
+ // Safe to close it now
+ close_channel(channel);
+ }
+}