diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-06-20 10:52:56 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-06-24 13:02:24 -0300 |
commit | c8297e462a1f1a842370651ca67fe5abd45e3c1d (patch) | |
tree | 1ad551da8b62d19cb0d4b39e3cf9aae173993cea /src | |
parent | 0dea2682dc3b1132fb86e807d50ac9ac4c063d76 (diff) | |
download | rneovim-c8297e462a1f1a842370651ca67fe5abd45e3c1d.tar.gz rneovim-c8297e462a1f1a842370651ca67fe5abd45e3c1d.tar.bz2 rneovim-c8297e462a1f1a842370651ca67fe5abd45e3c1d.zip |
channel: Extract 'channel_write' function
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/os/channel.c | 52 |
1 files changed, 38 insertions, 14 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index f859544663..7cc40a67e0 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -14,13 +14,14 @@ #include "nvim/os/msgpack_rpc.h" #include "nvim/vim.h" #include "nvim/memory.h" +#include "nvim/message.h" #include "nvim/map.h" #include "nvim/lib/kvec.h" typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job; + bool is_job, is_alive; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; union { @@ -215,12 +216,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); - + WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free); + if (!channel_write(channel, buffer)) { + return; + } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } @@ -240,19 +242,40 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting", &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); + WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free); + if (!channel_write(channel, buffer)) { + return; + } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } } +static bool channel_write(Channel *channel, WBuffer *buffer) +{ + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for whatever reason, mark the channel as not alive so + // it can be freed later + channel->is_alive = false; + } + + return success; +} + static void send_event(Channel *channel, char *type, Object data) { - wstream_write(channel->data.streams.write, serialize_event(type, data)); + channel_write(channel, serialize_event(type, data)); } static void broadcast_event(char *type, Object data) @@ -275,7 +298,7 @@ static void broadcast_event(char *type, Object data) WBuffer *buffer = serialize_event(type, data); for (size_t i = 0; i < kv_size(subscribed); i++) { - wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + channel_write(kv_A(subscribed, i), buffer); } end: @@ -349,6 +372,7 @@ static WBuffer *serialize_event(char *type, Object data) static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); + rv->is_alive = true; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; |