From b280308ac649da61e2a0f40a222eae21af5352c9 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 07:35:10 -0300 Subject: msgpack-rpc: Create subdirectory for msgpack-rpc modules Create the msgpack_rpc subdirectory and move all modules that deal with msgpack-rpc to it. Also merge msgpack_rpc.c into msgpack_rpc/helpers.c --- src/nvim/msgpack_rpc/channel.c | 597 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 597 insertions(+) create mode 100644 src/nvim/msgpack_rpc/channel.c (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c new file mode 100644 index 0000000000..dcd7e41737 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel.c @@ -0,0 +1,597 @@ +#include +#include +#include + +#include +#include + +#include "nvim/api/private/helpers.h" +#include "nvim/api/vim.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/os/event.h" +#include "nvim/os/rstream.h" +#include "nvim/os/rstream_defs.h" +#include "nvim/os/wstream.h" +#include "nvim/os/wstream_defs.h" +#include "nvim/os/job.h" +#include "nvim/os/job_defs.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/vim.h" +#include "nvim/ascii.h" +#include "nvim/memory.h" +#include "nvim/os_unix.h" +#include "nvim/message.h" +#include "nvim/term.h" +#include "nvim/map.h" +#include "nvim/log.h" +#include "nvim/misc1.h" +#include "nvim/lib/kvec.h" + +#define CHANNEL_BUFFER_SIZE 0xffff + +typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + +typedef struct { + uint64_t id; + PMap(cstr_t) *subscribed_events; + bool is_job, enabled; + msgpack_unpacker *unpacker; + union { + Job *job; + struct { + RStream *read; + WStream *write; + uv_stream_t *uv; + } streams; + } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; +} Channel; + +static uint64_t next_id = 1; +static PMap(uint64_t) *channels = NULL; +static PMap(cstr_t) *event_strings = NULL; +static msgpack_sbuffer out_buffer; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/channel.c.generated.h" +#endif + +/// Initializes the module +void channel_init(void) +{ + channels = pmap_new(uint64_t)(); + event_strings = pmap_new(cstr_t)(); + msgpack_sbuffer_init(&out_buffer); + + if (embedded_mode) { + channel_from_stdio(); + } +} + +/// Teardown the module +void channel_teardown(void) +{ + if (!channels) { + return; + } + + Channel *channel; + + map_foreach_value(channels, channel, { + close_channel(channel); + }); +} + +/// Creates an API channel by starting a job and connecting to its +/// stdin/stdout. stderr is forwarded to the editor error stream. +/// +/// @param argv The argument vector for the process +/// @return The channel id +uint64_t channel_from_job(char **argv) +{ + Channel *channel = register_channel(); + channel->is_job = true; + + int status; + channel->data.job = job_start(argv, + channel, + job_out, + job_err, + NULL, + 0, + &status); + + if (status <= 0) { + close_channel(channel); + return 0; + } + + return channel->id; +} + +/// Creates an API channel from a libuv stream representing a tcp or +/// pipe/socket client connection +/// +/// @param stream The established connection +void channel_from_stream(uv_stream_t *stream) +{ + Channel *channel = register_channel(); + stream->data = NULL; + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_stream(channel->data.streams.read, stream); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_stream(channel->data.streams.write, stream); + channel->data.streams.uv = stream; +} + +bool channel_exists(uint64_t id) +{ + Channel *channel; + return (channel = pmap_get(uint64_t)(channels, id)) != NULL + && channel->enabled; +} + +/// Sends event/arguments to channel +/// +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type +/// @param name The event name, an arbitrary string +/// @param args Array with event arguments +/// @return True if the event was sent successfully, false otherwise. +bool channel_send_event(uint64_t id, char *name, Array args) +{ + Channel *channel = NULL; + + if (id > 0) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_free_array(args); + return false; + } + send_event(channel, name, args); + } else { + broadcast_event(name, args); + } + + return true; +} + +/// Sends a method call to a channel +/// +/// @param id The channel id +/// @param method_name The method name, an arbitrary string +/// @param args Array with method arguments +/// @param[out] error True if the return value is an error +/// @return Whatever the remote method returned +Object channel_send_call(uint64_t id, + char *method_name, + Array args, + Error *err) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + api_free_array(args); + return NIL; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + api_set_error(err, + Exception, + _("Channel %" PRIu64 " crossed maximum stack depth"), + channel->id); + api_free_array(args); + return NIL; + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + send_request(channel, request_id, method_name, args); + + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1, sources); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (frame.errored) { + api_set_error(err, Exception, "%s", frame.result.data.string.data); + return NIL; + } + + return frame.result; +} + +/// Subscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + char *event_string = pmap_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + pmap_put(cstr_t)(event_strings, event_string, event_string); + } + + pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +/// Unsubscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + abort(); + } + + unsubscribe(channel, event); +} + +/// Closes a channel +/// +/// @param id The channel id +/// @return true if successful, false otherwise +bool channel_close(uint64_t id) +{ + Channel *channel; + + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + return false; + } + + channel_kill(channel); + channel->enabled = false; + return true; +} + +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim +static void channel_from_stdio(void) +{ + Channel *channel = register_channel(); + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(parse_msgpack, + rbuffer_new(CHANNEL_BUFFER_SIZE), + channel, + NULL); + rstream_set_file(channel->data.streams.read, 0); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(0); + wstream_set_file(channel->data.streams.write, 1); + channel->data.streams.uv = NULL; +} + +static void job_out(RStream *rstream, void *data, bool eof) +{ + Job *job = data; + parse_msgpack(rstream, job_data(job), eof); +} + +static void job_err(RStream *rstream, void *data, bool eof) +{ + size_t count; + char buf[256]; + Channel *channel = job_data(data); + + while ((count = rstream_pending(rstream))) { + size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); + buf[read] = NUL; + ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); + } +} + +static void parse_msgpack(RStream *rstream, void *data, bool eof) +{ + Channel *channel = data; + channel->rpc_call_level++; + + if (eof) { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + call_set_error(channel, buf); + goto end; + } + + uint32_t count = rstream_pending(rstream); + DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + count, + rstream); + + // Feed the unpacker with data + msgpack_unpacker_reserve_buffer(channel->unpacker, count); + rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); + msgpack_unpacker_buffer_consumed(channel->unpacker, count); + + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_return result; + + // Deserialize everything we can. + while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_set_error(channel, buf); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } + + // Perform the call + WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); + // write the response + if (!channel_write(channel, resp)) { + goto end; + } + } + + if (result == MSGPACK_UNPACK_NOMEM_ERROR) { + OUT_STR(e_outofmem); + out_char('\n'); + preserve_exit(); + } + + if (result == MSGPACK_UNPACK_PARSE_ERROR) { + // See src/msgpack/unpack_template.h in msgpack source tree for + // causes for this error(search for 'goto _failed') + // + // A not so uncommon cause for this might be deserializing objects with + // a high nesting level: msgpack will break when it's internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + send_error(channel, 0, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); + } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); + } +} + +static bool channel_write(Channel *channel, WBuffer *buffer) +{ + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + call_set_error(channel, buf); + } + + return success; +} + +static void send_error(Channel *channel, uint64_t id, char *err) +{ + Error e = ERROR_INIT; + api_set_error(&e, Exception, "%s", err); + channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); +} + +static void send_request(Channel *channel, + uint64_t id, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); +} + +static void send_event(Channel *channel, + char *name, + Array args) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); +} + +static void broadcast_event(char *name, Array args) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + api_free_array(args); + goto end; + } + + String method = {.size = strlen(name), .data = name}; + WBuffer *buffer = serialize_request(0, + method, + args, + &out_buffer, + kv_size(subscribed)); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + channel_write(kv_A(subscribed, i), buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = pmap_get(cstr_t)(event_strings, event); + pmap_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + pmap_del(cstr_t)(event_strings, event_string); + free(event_string); +} + +static void close_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + channel_kill(channel); + + free(channel); +} + +static void channel_kill(Channel *channel) +{ + if (channel->is_job) { + if (channel->data.job) { + job_stop(channel->data.job); + } + } else { + rstream_free(channel->data.streams.read); + wstream_free(channel->data.streams.write); + if (channel->data.streams.uv) { + uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + } else { + // When the stdin channel closes, it's time to go + mch_exit(0); + } + } +} + +static void close_cb(uv_handle_t *handle) +{ + free(handle->data); + free(handle); +} + +static Channel *register_channel(void) +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->enabled = true; + rv->rpc_call_level = 0; + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->id = next_id++; + rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); + pmap_put(uint64_t)(channels, rv->id, rv); + return rv; +} + +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_set_error(Channel *channel, char *msg) +{ + for (size_t i = 0; i < kv_size(channel->call_stack); i++) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(cstr_to_string(msg)); + } + + channel->enabled = false; +} -- cgit From cf6f60ce4dc376570e8d71facea76299ca951604 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 07:46:19 -0300 Subject: channel: Simplify resource management - Remove unused rpc_call_level field - Add `returned` field to ChannelCallFrame. This is set when the call returns and is the only condition checked by `channel_send_call`. - Add job_exit callback for properly closing channels created from job(the job_exit callback is only called after all read callbacks, so it's the only safe place to free the channel). --- src/nvim/msgpack_rpc/channel.c | 109 ++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 56 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index dcd7e41737..83e7900a54 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,14 +31,14 @@ typedef struct { uint64_t request_id; - bool errored; + bool returned, errored; Object result; } ChannelCallFrame; typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, enabled; + bool is_job, closed; msgpack_unpacker *unpacker; union { Job *job; @@ -50,7 +50,6 @@ typedef struct { } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; } Channel; static uint64_t next_id = 1; @@ -103,12 +102,12 @@ uint64_t channel_from_job(char **argv) channel, job_out, job_err, - NULL, + job_exit, 0, &status); if (status <= 0) { - close_channel(channel); + free_channel(channel); return 0; } @@ -141,7 +140,7 @@ bool channel_exists(uint64_t id) { Channel *channel; return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; + && !channel->closed; } /// Sends event/arguments to channel @@ -156,7 +155,7 @@ bool channel_send_event(uint64_t id, char *name, Array args) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_free_array(args); return false; } @@ -182,7 +181,7 @@ Object channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); api_free_array(args); return NIL; @@ -208,16 +207,14 @@ Object channel_send_call(uint64_t id, EventSource sources[] = {channel_source, NULL}; // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; + ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); do { event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return + } while (!frame.returned); + + (void)kv_pop(channel->call_stack); if (frame.errored) { api_set_error(err, Exception, "%s", frame.result.data.string.data); @@ -235,7 +232,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -257,7 +254,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -272,12 +269,11 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { return false; } - channel_kill(channel); - channel->enabled = false; + close_channel(channel); return true; } @@ -319,19 +315,16 @@ static void job_err(RStream *rstream, void *data, bool eof) } } +static void job_exit(Job *job, void *data) +{ + free_channel((Channel *)data); +} + static void parse_msgpack(RStream *rstream, void *data, bool eof) { Channel *channel = data; - channel->rpc_call_level++; if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); goto end; } @@ -354,7 +347,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) MSGPACK_UNPACK_SUCCESS) { if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); + complete_call(&unpacked.data, channel); } else { char buf[256]; snprintf(buf, @@ -397,10 +390,11 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); + if (eof && !channel->is_job && !kv_size(channel->call_stack)) { + // The free_channel call is deferred for jobs because it's possible that + // job_stderr will called after this. For non-job channels, this is the + // last callback so it must be freed now. + free_channel(channel); } } @@ -500,26 +494,11 @@ static void unsubscribe(Channel *channel, char *event) free(event_string); } +/// Close the channel streams/job. The channel resources will be freed by +/// free_channel later. static void close_channel(Channel *channel) { - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ + channel->closed = true; if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -536,6 +515,22 @@ static void channel_kill(Channel *channel) } } +static void free_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + free(channel); +} + static void close_cb(uv_handle_t *handle) { free(handle->data); @@ -545,8 +540,7 @@ static void close_cb(uv_handle_t *handle) static Channel *register_channel(void) { Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; + rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); @@ -573,9 +567,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) kv_size(channel->call_stack) - 1)->request_id; } -static void call_stack_pop(msgpack_object *obj, Channel *channel) +static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; if (frame->errored) { @@ -588,10 +584,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg) { for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, i); + frame->returned = true; frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } - channel->enabled = false; + close_channel(channel); } -- cgit From 264e0d872c598062be2b2a118d38c89a6ed5a023 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 09:25:58 -0300 Subject: event: Remove automatic event deferall This is how asynchronous events are currently handled by Nvim: - Libuv event loop is entered when Nvim blocks for user input(os_inchar is called) - Any event delivered by libuv that is not user input is queued for processing - The `K_EVENT` special key code is returned by os_inchar - `K_EVENT` is returned to a loop that is reading keys for the current Nvim mode, which will be handled by calling event_process() This approach has the advantage of integrating nicely with the current codebase, eg: vimscript code can be executed asynchronously with little surprises(Its the same as if the user typed a key). The problem with using keys to represent any event is that it also interferes with operators, and not every event needs or should do that. For example, consider this scenario: - A msgpack-rpc client calls vim_feedkeys("d") - Nvim processes K_EVENT, pushing "d" to the input queue - Nvim processes "d", entering operator-pending mode to wait for a motion - The client calls vim_feedkeys("w"), expecting Nvim to delete a word - Nvim processes K_EVENT, breaking out of operator-pending and pushing "w" - Nvim processes "w", moving a word This commit fixes the above problem by removing all automatic calls to `event_push`(which is what generates K_EVENT input). Right now this also breaks redrawing initiated by asynchronous events(and possibly other stuff too, Nvim is a complex state machine and we can't simply run vimscript code anywhere). In future commits the calls to `event_push` will be inserted only where it's absolutely necessary to run code in "key reading loops", such as when executing vimscript code or mutating editor data structures in ways that currently can only be done by the user. --- src/nvim/msgpack_rpc/channel.c | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 83e7900a54..d31e404c23 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -126,8 +126,7 @@ void channel_from_stream(uv_stream_t *stream) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -201,17 +200,12 @@ Object channel_send_call(uint64_t id, // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - // Push the frame ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); do { - event_poll(-1, sources); + event_poll(-1); } while (!frame.returned); (void)kv_pop(channel->call_stack); @@ -286,8 +280,7 @@ static void channel_from_stdio(void) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_file(channel->data.streams.read, 0); rstream_start(channel->data.streams.read); // write stream -- cgit From b527ac752fd5ebcc74c06306e7009e2b98e4ee01 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 10:39:54 -0300 Subject: event: Extract event_poll loops to `event_poll_until` macro A pattern that is becoming common across the project is to poll for events until a certain condition is true, optionally passing a timeout. To address this scenario, the event_poll_until macro was created and the job/channel/input modules were refactored to use it on their blocking functions. --- src/nvim/msgpack_rpc/channel.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index d31e404c23..91c26ca21e 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -203,11 +203,7 @@ Object channel_send_call(uint64_t id, // Push the frame ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - - do { - event_poll(-1); - } while (!frame.returned); - + event_poll_until(-1, frame.returned); (void)kv_pop(channel->call_stack); if (frame.errored) { -- cgit From 72e3e57bf1aa128b02724e853365f65fd9451f0b Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 20 Oct 2014 20:07:01 -0300 Subject: msgpack-rpc: Allow selective deferral API calls Since all API functions now run immediately after a msgpack-rpc request is parsed by libuv callbacks, a mechanism was added to override this behavior and allow certain functions to run in Nvim main loop. The mechanism is simple: Any API function tagged with the FUNC_ATTR_DEFERRED (a "dummy" attribute only used by msgpack-gen.lua) will be called when Nvim main loop receives a K_EVENT key. To implement this mechanism it was necessary some restructuration on the msgpack-rpc modules, especially in the msgpack-gen.lua script. --- src/nvim/msgpack_rpc/channel.c | 99 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 6 deletions(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 91c26ca21e..6ddda10c5f 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -5,6 +5,8 @@ #include #include +#include "nvim/lib/klist.h" + #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/msgpack_rpc/channel.h" @@ -52,6 +54,17 @@ typedef struct { kvec_t(ChannelCallFrame *) call_stack; } Channel; +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +#define RequestEventFreer(x) +KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer) +kmempool_t(RequestEventPool) *request_event_pool = NULL; + static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; @@ -64,6 +77,7 @@ static msgpack_sbuffer out_buffer; /// Initializes the module void channel_init(void) { + request_event_pool = kmp_init(RequestEventPool); channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); @@ -352,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } + handle_request(channel, &unpacked.data); } if (result == MSGPACK_UNPACK_NOMEM_ERROR) { @@ -387,6 +396,84 @@ end: } } +static void handle_request(Channel *channel, msgpack_object *request) + FUNC_ATTR_NONNULL_ALL +{ + uint64_t request_id; + Error error = ERROR_INIT; + msgpack_rpc_validate(&request_id, request, &error); + + if (error.set) { + // Validation failed, send response with error + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + return; + } + + // Retrieve the request handler + MsgpackRpcRequestHandler handler; + msgpack_object method = request->via.array.ptr[2]; + + if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) { + handler = msgpack_rpc_get_handler_for(method.via.bin.ptr, + method.via.bin.size); + } else { + handler.fn = msgpack_rpc_handle_missing_method; + handler.defer = false; + } + + Array args; + msgpack_rpc_to_array(request->via.array.ptr + 3, &args); + + if (kv_size(channel->call_stack) || !handler.defer) { + call_request_handler(channel, handler, args, request_id); + return; + } + + // Defer calling the request handler. + RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool); + event_data->channel = channel; + event_data->handler = handler; + event_data->args = args; + event_data->request_id = request_id; + event_push((Event) { + .handler = on_request_event, + .data = event_data + }); +} + +static void on_request_event(Event event) +{ + RequestEvent *e = event.data; + call_request_handler(e->channel, e->handler, e->args, e->request_id); + kmp_free(RequestEventPool, request_event_pool, e); +} + +static void call_request_handler(Channel *channel, + MsgpackRpcRequestHandler handler, + Array args, + uint64_t request_id) +{ + Error error = ERROR_INIT; + Object result = handler.fn(channel->id, request_id, args, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); + + if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + request_id); + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + } + + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + request_id); + channel_write(channel, + serialize_response(request_id, &error, result, &out_buffer)); +} + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; -- cgit From 79b7263f793206167260fcbc99bd76f73bfeb2c7 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Tue, 21 Oct 2014 08:53:55 -0300 Subject: compilation: Add -Wconversion to more files and validate CONV_SOURCES All files under the os, api and msgpack_rpc directories have -Wconversion automatically applied. CONV_SOURCES is also checked for missing files(when renaming, for example) --- src/nvim/msgpack_rpc/channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/nvim/msgpack_rpc/channel.c') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6ddda10c5f..a1ab12f7c3 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -331,7 +331,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - uint32_t count = rstream_pending(rstream); + size_t count = rstream_pending(rstream); DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", count, rstream); -- cgit