diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-17 12:06:31 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-17 12:06:31 -0300 |
commit | 953d61cbf82d5f1acd68bd1ae2101d92f5ec5492 (patch) | |
tree | d4aa1fe08ad3f0a7e27b6628aad4925cd1fbfb2a /src/nvim/os/channel.c | |
parent | b92630c2fff7950141630f0d62b11404d0589ece (diff) | |
parent | 4dc642aa622cfac09f2f4752907137d68d8508fe (diff) | |
download | rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.gz rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.bz2 rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.zip |
Merge PR #895 'Core service providers...'
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r-- | src/nvim/os/channel.c | 131 |
1 files changed, 65 insertions, 66 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 504c1ca05b..d5f29aa667 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -6,6 +6,7 @@ #include <msgpack.h> #include "nvim/api/private/helpers.h" +#include "nvim/api/vim.h" #include "nvim/os/channel.h" #include "nvim/os/event.h" #include "nvim/os/rstream.h" @@ -17,9 +18,11 @@ #include "nvim/os/msgpack_rpc.h" #include "nvim/os/msgpack_rpc_helpers.h" #include "nvim/vim.h" +#include "nvim/ascii.h" #include "nvim/memory.h" #include "nvim/message.h" #include "nvim/map.h" +#include "nvim/log.h" #include "nvim/lib/kvec.h" typedef struct { @@ -81,7 +84,8 @@ void channel_teardown(void) /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process -bool channel_from_job(char **argv) +/// @return The channel id +uint64_t channel_from_job(char **argv) { Channel *channel = register_channel(); channel->is_job = true; @@ -91,17 +95,17 @@ bool channel_from_job(char **argv) channel, job_out, job_err, - job_exit, + NULL, true, 0, &status); if (status <= 0) { close_channel(channel); - return false; + return 0; } - return true; + return channel->id; } /// Creates an API channel from a libuv stream representing a tcp or @@ -114,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream) stream->data = NULL; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -123,6 +127,13 @@ void channel_from_stream(uv_stream_t *stream) channel->data.streams.uv = stream; } +bool channel_exists(uint64_t id) +{ + Channel *channel; + return (channel = pmap_get(uint64_t)(channels, id)) != NULL + && channel->enabled; +} + /// Sends event/data to channel /// /// @param id The channel id. If 0, the event will be sent to all @@ -135,7 +146,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { msgpack_rpc_free_object(arg); return false; } @@ -155,7 +166,7 @@ bool channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { msgpack_rpc_free_object(arg); return false; } @@ -170,22 +181,18 @@ bool channel_send_call(uint64_t id, "while processing a RPC call", channel->id); *result = STRING_OBJ(cstr_to_string(buf)); + msgpack_rpc_free_object(arg); + return false; } uint64_t request_id = channel->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, name, arg); - if (!kv_size(channel->call_stack)) { - // This is the first frame, we must disable event deferral for this - // channel because we won't be returning until the client sends a - // response - if (channel->is_job) { - job_set_defer(channel->data.job, false); - } else { - rstream_set_defer(channel->data.streams.read, false); - } - } + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; // Push the frame ChannelCallFrame frame = {request_id, false, NIL}; @@ -193,24 +200,18 @@ bool channel_send_call(uint64_t id, size_t size = kv_size(channel->call_stack); do { - event_poll(-1); + event_poll(-1, sources); } while ( // Continue running if ... channel->enabled && // the channel is still enabled kv_size(channel->call_stack) >= size); // the call didn't return - if (!kv_size(channel->call_stack)) { - // Popped last frame, restore event deferral - if (channel->is_job) { - job_set_defer(channel->data.job, true); - } else { - rstream_set_defer(channel->data.streams.read, true); - } - if (!channel->enabled && !channel->rpc_call_level) { + if (!(kv_size(channel->call_stack) + || channel->enabled + || channel->rpc_call_level)) { // Close the channel if it has been disabled and we have not been called // by `parse_msgpack`(It would be unsafe to close the channel otherwise) close_channel(channel); - } } *errored = frame.errored; @@ -227,7 +228,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { abort(); } @@ -249,7 +250,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { abort(); } @@ -264,12 +265,15 @@ static void job_out(RStream *rstream, void *data, bool eof) static void job_err(RStream *rstream, void *data, bool eof) { - // TODO(tarruda): plugin error messages should be sent to the error buffer -} - -static void job_exit(Job *job, void *data) -{ - // TODO(tarruda): what should be done here? + size_t count; + char buf[256]; + Channel *channel = job_data(data); + + while ((count = rstream_available(rstream))) { + size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); + buf[read] = NUL; + ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); + } } static void parse_msgpack(RStream *rstream, void *data, bool eof) @@ -283,12 +287,15 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "Before returning from a RPC call, channel %" PRIu64 " was " "closed by the client", channel->id); - disable_channel(channel, buf); + call_set_error(channel, buf); return; } channel->rpc_call_level++; uint32_t count = rstream_available(rstream); + DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + count, + rstream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -313,7 +320,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) " a matching id for the current RPC call. Ensure the client " " is properly synchronized", channel->id); - call_stack_unwind(channel, buf, 1); + call_set_error(channel, buf); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration @@ -366,7 +373,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "Before returning from a RPC call, channel %" PRIu64 " was " "closed due to a failed write", channel->id); - disable_channel(channel, buf); + call_set_error(channel, buf); } return success; @@ -383,7 +390,7 @@ static void send_request(Channel *channel, Object arg) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, arg, &out_buffer)); + channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1)); } static void send_event(Channel *channel, @@ -391,7 +398,7 @@ static void send_event(Channel *channel, Object arg) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, arg, &out_buffer)); + channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1)); } static void broadcast_event(char *name, Object arg) @@ -412,7 +419,11 @@ static void broadcast_event(char *name, Object arg) } String method = {.size = strlen(name), .data = name}; - WBuffer *buffer = serialize_request(0, method, arg, &out_buffer); + WBuffer *buffer = serialize_request(0, + method, + arg, + &out_buffer, + kv_size(subscribed)); for (size_t i = 0; i < kv_size(subscribed); i++) { channel_write(kv_A(subscribed, i), buffer); @@ -443,6 +454,15 @@ static void close_channel(Channel *channel) pmap_del(uint64_t)(channels, channel->id); msgpack_unpacker_free(channel->unpacker); + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -453,14 +473,6 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); free(channel); } @@ -503,10 +515,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) static void call_stack_pop(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_pop(channel->call_stack); frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - (void)kv_pop(channel->call_stack); if (frame->errored) { msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); @@ -515,24 +525,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) } } -static void call_stack_unwind(Channel *channel, char *msg, int count) +static void call_set_error(Channel *channel, char *msg) { - while (kv_size(channel->call_stack) && count--) { + for (size_t i = 0; i < kv_size(channel->call_stack); i++) { ChannelCallFrame *frame = kv_pop(channel->call_stack); frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } -} -static void disable_channel(Channel *channel, char *msg) -{ - if (kv_size(channel->call_stack)) { - // Channel is currently in the middle of a call, remove all frames and mark - // it as "dead" - channel->enabled = false; - call_stack_unwind(channel, msg, -1); - } else { - // Safe to close it now - close_channel(channel); - } + channel->enabled = false; } |