diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/os/channel.c | 82 |
1 files changed, 54 insertions, 28 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 10766ca76e..8184003593 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -3,7 +3,6 @@ #include <uv.h> #include <msgpack.h> -#include "nvim/lib/klist.h" #include "nvim/os/channel.h" #include "nvim/os/channel_defs.h" #include "nvim/os/rstream.h" @@ -15,8 +14,10 @@ #include "nvim/os/msgpack_rpc.h" #include "nvim/vim.h" #include "nvim/memory.h" +#include "nvim/map.h" typedef struct { + uint64_t id; ChannelProtocol protocol; bool is_job; union { @@ -30,22 +31,23 @@ typedef struct { struct { RStream *read; WStream *write; + uv_stream_t *uv; } streams; } data; } Channel; -#define _destroy_channel(x) +static uint64_t next_id = 1; +static Map(uint64_t) *channels = NULL; -KLIST_INIT(Channel, Channel *, _destroy_channel) - -static klist_t(Channel) *channels = NULL; static void on_job_stdout(RStream *rstream, void *data, bool eof); static void on_job_stderr(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); +static void close_channel(Channel *channel); +static void close_cb(uv_handle_t *handle); void channel_init() { - channels = kl_init(Channel); + channels = map_new(uint64_t)(); } void channel_teardown() @@ -56,24 +58,9 @@ void channel_teardown() Channel *channel; - while (kl_shift(Channel, channels, &channel) == 0) { - - switch (channel->protocol) { - case kChannelProtocolMsgpack: - msgpack_sbuffer_free(channel->proto.msgpack.sbuffer); - msgpack_unpacker_free(channel->proto.msgpack.unpacker); - break; - default: - abort(); - } - - if (channel->is_job) { - job_stop(channel->data.job_id); - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - } - } + map_foreach_value(channels, channel, { + close_channel(channel); + }); } void channel_from_job(char **argv, ChannelProtocol prot) @@ -92,10 +79,11 @@ void channel_from_job(char **argv, ChannelProtocol prot) abort(); } + channel->id = next_id++; channel->protocol = prot; channel->is_job = true; channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); - *kl_pushp(Channel, channels) = channel; + map_put(uint64_t)(channels, channel->id, channel); } void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) @@ -115,6 +103,7 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) } stream->data = NULL; + channel->id = next_id++; channel->protocol = prot; channel->is_job = false; // read stream @@ -124,8 +113,8 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) // write stream channel->data.streams.write = wstream_new(1024 * 1024); wstream_set_stream(channel->data.streams.write, stream); - // push to channel list - *kl_pushp(Channel, channels) = channel; + channel->data.streams.uv = stream; + map_put(uint64_t)(channels, channel->id, channel); } static void on_job_stdout(RStream *rstream, void *data, bool eof) @@ -141,8 +130,13 @@ static void on_job_stderr(RStream *rstream, void *data, bool eof) static void parse_msgpack(RStream *rstream, void *data, bool eof) { - msgpack_unpacked unpacked; Channel *channel = data; + + if (eof) { + close_channel(channel); + return; + } + uint32_t count = rstream_available(rstream); // Feed the unpacker with data @@ -152,6 +146,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) count); msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count); + msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); // Deserialize everything we can. @@ -173,3 +168,34 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer); } } + +static void close_channel(Channel *channel) +{ + map_del(uint64_t)(channels, channel->id); + + switch (channel->protocol) { + case kChannelProtocolMsgpack: + msgpack_sbuffer_free(channel->proto.msgpack.sbuffer); + msgpack_unpacker_free(channel->proto.msgpack.unpacker); + break; + default: + abort(); + } + + if (channel->is_job) { + job_stop(channel->data.job_id); + } else { + rstream_free(channel->data.streams.read); + wstream_free(channel->data.streams.write); + uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + } + + free(channel); +} + +static void close_cb(uv_handle_t *handle) +{ + free(handle->data); + free(handle); +} + |