diff options
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r-- | src/nvim/os/channel.c | 110 |
1 files changed, 29 insertions, 81 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index f275c70805..2923ab0912 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -5,7 +5,6 @@ #include "nvim/api/private/helpers.h" #include "nvim/os/channel.h" -#include "nvim/os/channel_defs.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" @@ -19,14 +18,9 @@ typedef struct { uint64_t id; - ChannelProtocol protocol; bool is_job; - union { - struct { - msgpack_unpacker *unpacker; - msgpack_sbuffer *sbuffer; - } msgpack; - } proto; + msgpack_unpacker *unpacker; + msgpack_sbuffer *sbuffer; union { int job_id; struct { @@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer; static void on_job_stdout(RStream *rstream, void *data, bool eof); static void on_job_stderr(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); -static void send_msgpack(Channel *channel, String type, Object data); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); @@ -67,48 +60,28 @@ void channel_teardown() }); } -void channel_from_job(char **argv, ChannelProtocol prot) +void channel_from_job(char **argv) { Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = NULL; - - switch (prot) { - case kChannelProtocolMsgpack: - rcb = on_job_stdout; - channel->proto.msgpack.unpacker = - msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); - break; - default: - abort(); - } + rstream_cb rcb = on_job_stdout; + channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + channel->sbuffer = msgpack_sbuffer_new(); channel->id = next_id++; - channel->protocol = prot; channel->is_job = true; channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); map_put(uint64_t)(channels, channel->id, channel); } -void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) +void channel_from_stream(uv_stream_t *stream) { Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = NULL; - - switch (prot) { - case kChannelProtocolMsgpack: - rcb = parse_msgpack; - channel->proto.msgpack.unpacker = - msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); - break; - default: - abort(); - } + rstream_cb rcb = parse_msgpack; + channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + channel->sbuffer = msgpack_sbuffer_new(); stream->data = NULL; channel->id = next_id++; - channel->protocol = prot; channel->is_job = false; // read stream channel->data.streams.read = rstream_new(rcb, 1024, channel, true); @@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data) String event_type = {.size = strnlen(type, 1024), .data = type}; Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); - switch (channel->protocol) { - case kChannelProtocolMsgpack: - send_msgpack(channel, event_type, event_data); - break; - default: - abort(); - } + wstream_write(channel->data.streams.write, + bytes, + msgpack_event_buffer.size, + true); msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); return true; } @@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) uint32_t count = rstream_available(rstream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count); - rstream_read(rstream, - msgpack_unpacker_buffer(channel->proto.msgpack.unpacker), - count); - msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count); + msgpack_unpacker_reserve_buffer(channel->unpacker, count); + rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); + msgpack_unpacker_buffer_consumed(channel->unpacker, count); msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); // Deserialize everything we can. - while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) { + while (msgpack_unpacker_next(channel->unpacker, &unpacked)) { // Each object is a new msgpack-rpc request and requires an empty response msgpack_packer response; - msgpack_packer_init(&response, - channel->proto.msgpack.sbuffer, - msgpack_sbuffer_write); + msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); wstream_write(channel->data.streams.write, - xmemdup(channel->proto.msgpack.sbuffer->data, - channel->proto.msgpack.sbuffer->size), - channel->proto.msgpack.sbuffer->size, + xmemdup(channel->sbuffer->data, channel->sbuffer->size), + channel->sbuffer->size, true); // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer); + msgpack_sbuffer_clear(channel->sbuffer); } } -static void send_msgpack(Channel *channel, String type, Object data) -{ - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(type, data, &packer); - char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); - - wstream_write(channel->data.streams.write, - bytes, - msgpack_event_buffer.size, - true); - - msgpack_sbuffer_clear(&msgpack_event_buffer); -} - static void close_channel(Channel *channel) { map_del(uint64_t)(channels, channel->id); - - switch (channel->protocol) { - case kChannelProtocolMsgpack: - msgpack_sbuffer_free(channel->proto.msgpack.sbuffer); - msgpack_unpacker_free(channel->proto.msgpack.unpacker); - break; - default: - abort(); - } + msgpack_sbuffer_free(channel->sbuffer); + msgpack_unpacker_free(channel->unpacker); if (channel->is_job) { job_stop(channel->data.job_id); |