aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/channel.c
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-05-27 10:16:40 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-05-27 10:19:38 -0300
commitb8e563f516a6d184c36d15b8b9c795f84fd40983 (patch)
tree1624bebe6b86dba6793e812e057a54d39595a965 /src/nvim/os/channel.c
parent277554a9ebaaa34bea1862802254db9b9c0f92d1 (diff)
downloadrneovim-b8e563f516a6d184c36d15b8b9c795f84fd40983.tar.gz
rneovim-b8e563f516a6d184c36d15b8b9c795f84fd40983.tar.bz2
rneovim-b8e563f516a6d184c36d15b8b9c795f84fd40983.zip
Refactor: Remove support for multiple protocols
This removes the boilerplate code supporting more than one RPC protocol as it was becoming hard to maintain and we probably won't ever need it.
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r--src/nvim/os/channel.c110
1 files changed, 29 insertions, 81 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index f275c70805..2923ab0912 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -5,7 +5,6 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
-#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@@ -19,14 +18,9 @@
typedef struct {
uint64_t id;
- ChannelProtocol protocol;
bool is_job;
- union {
- struct {
- msgpack_unpacker *unpacker;
- msgpack_sbuffer *sbuffer;
- } msgpack;
- } proto;
+ msgpack_unpacker *unpacker;
+ msgpack_sbuffer *sbuffer;
union {
int job_id;
struct {
@@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
-static void send_msgpack(Channel *channel, String type, Object data);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
@@ -67,48 +60,28 @@ void channel_teardown()
});
}
-void channel_from_job(char **argv, ChannelProtocol prot)
+void channel_from_job(char **argv)
{
Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = NULL;
-
- switch (prot) {
- case kChannelProtocolMsgpack:
- rcb = on_job_stdout;
- channel->proto.msgpack.unpacker =
- msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
- break;
- default:
- abort();
- }
+ rstream_cb rcb = on_job_stdout;
+ channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->sbuffer = msgpack_sbuffer_new();
channel->id = next_id++;
- channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel);
}
-void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
+void channel_from_stream(uv_stream_t *stream)
{
Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = NULL;
-
- switch (prot) {
- case kChannelProtocolMsgpack:
- rcb = parse_msgpack;
- channel->proto.msgpack.unpacker =
- msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
- break;
- default:
- abort();
- }
+ rstream_cb rcb = parse_msgpack;
+ channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->sbuffer = msgpack_sbuffer_new();
stream->data = NULL;
channel->id = next_id++;
- channel->protocol = prot;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
@@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data)
String event_type = {.size = strnlen(type, 1024), .data = type};
Object event_data = vim_to_object(data);
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(event_type, event_data, &packer);
+ char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- send_msgpack(channel, event_type, event_data);
- break;
- default:
- abort();
- }
+ wstream_write(channel->data.streams.write,
+ bytes,
+ msgpack_event_buffer.size,
+ true);
msgpack_rpc_free_object(event_data);
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
return true;
}
@@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count);
- rstream_read(rstream,
- msgpack_unpacker_buffer(channel->proto.msgpack.unpacker),
- count);
- msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
+ msgpack_unpacker_reserve_buffer(channel->unpacker, count);
+ rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, count);
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
// Deserialize everything we can.
- while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
+ while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
- msgpack_packer_init(&response,
- channel->proto.msgpack.sbuffer,
- msgpack_sbuffer_write);
+ msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
- xmemdup(channel->proto.msgpack.sbuffer->data,
- channel->proto.msgpack.sbuffer->size),
- channel->proto.msgpack.sbuffer->size,
+ xmemdup(channel->sbuffer->data, channel->sbuffer->size),
+ channel->sbuffer->size,
true);
// Clear the buffer for future calls
- msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
+ msgpack_sbuffer_clear(channel->sbuffer);
}
}
-static void send_msgpack(Channel *channel, String type, Object data)
-{
- msgpack_packer packer;
- msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(type, data, &packer);
- char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
-
- wstream_write(channel->data.streams.write,
- bytes,
- msgpack_event_buffer.size,
- true);
-
- msgpack_sbuffer_clear(&msgpack_event_buffer);
-}
-
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
-
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
- msgpack_unpacker_free(channel->proto.msgpack.unpacker);
- break;
- default:
- abort();
- }
+ msgpack_sbuffer_free(channel->sbuffer);
+ msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
job_stop(channel->data.job_id);