aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/os/channel.c82
1 files changed, 54 insertions, 28 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 10766ca76e..8184003593 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -3,7 +3,6 @@
#include <uv.h>
#include <msgpack.h>
-#include "nvim/lib/klist.h"
#include "nvim/os/channel.h"
#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h"
@@ -15,8 +14,10 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
+#include "nvim/map.h"
typedef struct {
+ uint64_t id;
ChannelProtocol protocol;
bool is_job;
union {
@@ -30,22 +31,23 @@ typedef struct {
struct {
RStream *read;
WStream *write;
+ uv_stream_t *uv;
} streams;
} data;
} Channel;
-#define _destroy_channel(x)
+static uint64_t next_id = 1;
+static Map(uint64_t) *channels = NULL;
-KLIST_INIT(Channel, Channel *, _destroy_channel)
-
-static klist_t(Channel) *channels = NULL;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
+static void close_channel(Channel *channel);
+static void close_cb(uv_handle_t *handle);
void channel_init()
{
- channels = kl_init(Channel);
+ channels = map_new(uint64_t)();
}
void channel_teardown()
@@ -56,24 +58,9 @@ void channel_teardown()
Channel *channel;
- while (kl_shift(Channel, channels, &channel) == 0) {
-
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
- msgpack_unpacker_free(channel->proto.msgpack.unpacker);
- break;
- default:
- abort();
- }
-
- if (channel->is_job) {
- job_stop(channel->data.job_id);
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- }
- }
+ map_foreach_value(channels, channel, {
+ close_channel(channel);
+ });
}
void channel_from_job(char **argv, ChannelProtocol prot)
@@ -92,10 +79,11 @@ void channel_from_job(char **argv, ChannelProtocol prot)
abort();
}
+ channel->id = next_id++;
channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
- *kl_pushp(Channel, channels) = channel;
+ map_put(uint64_t)(channels, channel->id, channel);
}
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
@@ -115,6 +103,7 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
}
stream->data = NULL;
+ channel->id = next_id++;
channel->protocol = prot;
channel->is_job = false;
// read stream
@@ -124,8 +113,8 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
wstream_set_stream(channel->data.streams.write, stream);
- // push to channel list
- *kl_pushp(Channel, channels) = channel;
+ channel->data.streams.uv = stream;
+ map_put(uint64_t)(channels, channel->id, channel);
}
static void on_job_stdout(RStream *rstream, void *data, bool eof)
@@ -141,8 +130,13 @@ static void on_job_stderr(RStream *rstream, void *data, bool eof)
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
- msgpack_unpacked unpacked;
Channel *channel = data;
+
+ if (eof) {
+ close_channel(channel);
+ return;
+ }
+
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
@@ -152,6 +146,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
count);
msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
+ msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
// Deserialize everything we can.
@@ -173,3 +168,34 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
}
}
+
+static void close_channel(Channel *channel)
+{
+ map_del(uint64_t)(channels, channel->id);
+
+ switch (channel->protocol) {
+ case kChannelProtocolMsgpack:
+ msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
+ msgpack_unpacker_free(channel->proto.msgpack.unpacker);
+ break;
+ default:
+ abort();
+ }
+
+ if (channel->is_job) {
+ job_stop(channel->data.job_id);
+ } else {
+ rstream_free(channel->data.streams.read);
+ wstream_free(channel->data.streams.write);
+ uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+ }
+
+ free(channel);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ free(handle->data);
+ free(handle);
+}
+