diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-06-20 10:53:05 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-06-24 13:02:24 -0300 |
commit | ea7a389ec77c0031160ce860129101c603d8e0ec (patch) | |
tree | 64b415d9d820a170e82fb22d2315d3817e07b175 /src | |
parent | 09605cec03ea23e87ee285fd950a23ce8d23678d (diff) | |
download | rneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.tar.gz rneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.tar.bz2 rneovim-ea7a389ec77c0031160ce860129101c603d8e0ec.zip |
channel: Implement the 'channel_send_call' function
This function is used to send RPC calls to clients. In contrast to
`channel_send_event`, this function will block until the client sends a
response(But it will continue processing requests from that client).
The RPC call stack has a maximum depth of 20.
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/api/private/helpers.c | 2 | ||||
-rw-r--r-- | src/nvim/eval.c | 43 | ||||
-rw-r--r-- | src/nvim/os/channel.c | 189 |
3 files changed, 226 insertions, 8 deletions
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c index 30301e9368..d5ebc93f7c 100644 --- a/src/nvim/api/private/helpers.c +++ b/src/nvim/api/private/helpers.c @@ -341,7 +341,7 @@ String cstr_to_string(const char *str) }; } -static bool object_to_vim(Object obj, typval_T *tv, Error *err) +bool object_to_vim(Object obj, typval_T *tv, Error *err) { tv->v_type = VAR_UNKNOWN; tv->v_lock = 0; diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 7300e60b1a..4c39950344 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -71,6 +71,7 @@ #include "nvim/os/time.h" #include "nvim/os/channel.h" #include "nvim/api/private/helpers.h" +#include "nvim/os/msgpack_rpc.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ @@ -6453,6 +6454,7 @@ static struct fst { {"searchpair", 3, 7, f_searchpair}, {"searchpairpos", 3, 7, f_searchpairpos}, {"searchpos", 1, 4, f_searchpos}, + {"send_call", 3, 3, f_send_call}, {"send_event", 3, 3, f_send_event}, {"setbufvar", 3, 3, f_setbufvar}, {"setcmdpos", 1, 1, f_setcmdpos}, @@ -12525,6 +12527,47 @@ do_searchpair ( return retval; } +// "send_call()" function +static void f_send_call(typval_T *argvars, typval_T *rettv) +{ + rettv->v_type = VAR_NUMBER; + rettv->vval.v_number = 0; + + if (check_restricted() || check_secure()) { + return; + } + + if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) { + EMSG2(_(e_invarg2), "Channel id must be a positive integer"); + return; + } + + if (argvars[1].v_type != VAR_STRING) { + EMSG2(_(e_invarg2), "Method name must be a string"); + return; + } + + bool errored; + Object result; + if (!channel_send_call((uint64_t)argvars[0].vval.v_number, + (char *)argvars[1].vval.v_string, + vim_to_object(&argvars[2]), + &result, + &errored)) { + EMSG2(_(e_invarg2), "Channel doesn't exist"); + return; + } + + Error conversion_error = {.set = false}; + if (errored || !object_to_vim(result, rettv, &conversion_error)) { + EMSG(errored ? + result.data.string.data : + _("Error converting the call result")); + } + + msgpack_rpc_free_object(result); +} + // "send_event()" function static void f_send_event(typval_T *argvars, typval_T *rettv) { diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 4299f2a06d..552c962b3a 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -5,6 +5,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/os/channel.h" +#include "nvim/os/event.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" @@ -19,9 +20,15 @@ #include "nvim/lib/kvec.h" typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + +typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, is_alive; + bool is_job, enabled; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; union { @@ -32,6 +39,9 @@ typedef struct { uv_stream_t *uv; } streams; } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; } Channel; static uint64_t next_id = 1; @@ -135,6 +145,78 @@ bool channel_send_event(uint64_t id, char *name, Object arg) return true; } +bool channel_send_call(uint64_t id, + char *name, + Object arg, + Object *result, + bool *errored) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id))) { + msgpack_rpc_free_object(arg); + return false; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + *errored = true; + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " was closed due to a high stack depth " + "while processing a RPC call", + channel->id); + *result = STRING_OBJ(buf); + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + channel_write(channel, serialize_message(0, request_id, name, arg)); + + if (!kv_size(channel->call_stack)) { + // This is the first frame, we must disable event deferral for this + // channel because we won't be returning until the client sends a + // response + if (channel->is_job) { + job_set_defer(channel->data.job, false); + } else { + rstream_set_defer(channel->data.streams.read, false); + } + } + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (!kv_size(channel->call_stack)) { + // Popped last frame, restore event deferral + if (channel->is_job) { + job_set_defer(channel->data.job, true); + } else { + rstream_set_defer(channel->data.streams.read, true); + } + if (!channel->enabled && !channel->rpc_call_level) { + // Close the channel if it has been disabled and we have not been called + // by `parse_msgpack`(It would be unsafe to close the channel otherwise) + close_channel(channel); + } + } + + *errored = frame.errored; + *result = frame.result; + + return true; +} + /// Subscribes to event broadcasts /// /// @param id The channel id @@ -193,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - close_channel(channel); + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + disable_channel(channel, buf); return; } + channel->rpc_call_level++; uint32_t count = rstream_available(rstream); // Feed the unpacker with data @@ -211,6 +300,24 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Deserialize everything we can. while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked)) == kUnpackResultOk) { + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_stack_unwind(channel, buf, 1); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } + // Each object is a new msgpack-rpc request and requires an empty response msgpack_packer response; msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); @@ -221,7 +328,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) channel->sbuffer->size, free); if (!channel_write(channel, buffer)) { - return; + goto end; } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); @@ -238,6 +345,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting"); } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); + } } static void send_error(Channel *channel, char *msg) @@ -276,9 +390,14 @@ static bool channel_write(Channel *channel, WBuffer *buffer) } if (!success) { - // If the write failed for whatever reason, mark the channel as not alive so - // it can be freed later - channel->is_alive = false; + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + disable_channel(channel, buf); } return success; @@ -359,6 +478,7 @@ static void close_channel(Channel *channel) }); pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); free(channel); } @@ -390,12 +510,67 @@ static WBuffer *serialize_message(int type, static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); - rv->is_alive = true; + rv->enabled = true; + rv->rpc_call_level = 0; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); pmap_put(uint64_t)(channels, rv->id, rv); return rv; } +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + (void)kv_pop(channel->call_stack); + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_stack_unwind(Channel *channel, char *msg, int count) +{ + while (kv_size(channel->call_stack) && count--) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(msg); + } +} + +static void disable_channel(Channel *channel, char *msg) +{ + if (kv_size(channel->call_stack)) { + // Channel is currently in the middle of a call, remove all frames and mark + // it as "dead" + channel->enabled = false; + call_stack_unwind(channel, msg, -1); + } else { + // Safe to close it now + close_channel(channel); + } +} |