aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r--src/nvim/os/channel.c110
1 files changed, 29 insertions, 81 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index f275c70805..2923ab0912 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -5,7 +5,6 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
-#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@@ -19,14 +18,9 @@
typedef struct {
uint64_t id;
- ChannelProtocol protocol;
bool is_job;
- union {
- struct {
- msgpack_unpacker *unpacker;
- msgpack_sbuffer *sbuffer;
- } msgpack;
- } proto;
+ msgpack_unpacker *unpacker;
+ msgpack_sbuffer *sbuffer;
union {
int job_id;
struct {
@@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
-static void send_msgpack(Channel *channel, String type, Object data);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
@@ -67,48 +60,28 @@ void channel_teardown()
});
}
-void channel_from_job(char **argv, ChannelProtocol prot)
+void channel_from_job(char **argv)
{
Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = NULL;
-
- switch (prot) {
- case kChannelProtocolMsgpack:
- rcb = on_job_stdout;
- channel->proto.msgpack.unpacker =
- msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
- break;
- default:
- abort();
- }
+ rstream_cb rcb = on_job_stdout;
+ channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->sbuffer = msgpack_sbuffer_new();
channel->id = next_id++;
- channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel);
}
-void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
+void channel_from_stream(uv_stream_t *stream)
{
Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = NULL;
-
- switch (prot) {
- case kChannelProtocolMsgpack:
- rcb = parse_msgpack;
- channel->proto.msgpack.unpacker =
- msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
- break;
- default:
- abort();
- }
+ rstream_cb rcb = parse_msgpack;
+ channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->sbuffer = msgpack_sbuffer_new();
stream->data = NULL;
channel->id = next_id++;
- channel->protocol = prot;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
@@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data)
String event_type = {.size = strnlen(type, 1024), .data = type};
Object event_data = vim_to_object(data);
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(event_type, event_data, &packer);
+ char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- send_msgpack(channel, event_type, event_data);
- break;
- default:
- abort();
- }
+ wstream_write(channel->data.streams.write,
+ bytes,
+ msgpack_event_buffer.size,
+ true);
msgpack_rpc_free_object(event_data);
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
return true;
}
@@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
- msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count);
- rstream_read(rstream,
- msgpack_unpacker_buffer(channel->proto.msgpack.unpacker),
- count);
- msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
+ msgpack_unpacker_reserve_buffer(channel->unpacker, count);
+ rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, count);
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
// Deserialize everything we can.
- while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
+ while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
- msgpack_packer_init(&response,
- channel->proto.msgpack.sbuffer,
- msgpack_sbuffer_write);
+ msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
- xmemdup(channel->proto.msgpack.sbuffer->data,
- channel->proto.msgpack.sbuffer->size),
- channel->proto.msgpack.sbuffer->size,
+ xmemdup(channel->sbuffer->data, channel->sbuffer->size),
+ channel->sbuffer->size,
true);
// Clear the buffer for future calls
- msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
+ msgpack_sbuffer_clear(channel->sbuffer);
}
}
-static void send_msgpack(Channel *channel, String type, Object data)
-{
- msgpack_packer packer;
- msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(type, data, &packer);
- char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
-
- wstream_write(channel->data.streams.write,
- bytes,
- msgpack_event_buffer.size,
- true);
-
- msgpack_sbuffer_clear(&msgpack_event_buffer);
-}
-
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
-
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
- msgpack_unpacker_free(channel->proto.msgpack.unpacker);
- break;
- default:
- abort();
- }
+ msgpack_sbuffer_free(channel->sbuffer);
+ msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
job_stop(channel->data.job_id);