aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r--src/nvim/os/channel.c174
1 files changed, 135 insertions, 39 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 2923ab0912..b8cceede6f 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -15,9 +15,11 @@
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/map.h"
+#include "nvim/lib/kvec.h"
typedef struct {
uint64_t id;
+ Map(cstr_t) *subscribed_events;
bool is_job;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
@@ -33,17 +35,24 @@ typedef struct {
static uint64_t next_id = 1;
static Map(uint64_t) *channels = NULL;
+static Map(cstr_t) *event_strings = NULL;
static msgpack_sbuffer msgpack_event_buffer;
-static void on_job_stdout(RStream *rstream, void *data, bool eof);
-static void on_job_stderr(RStream *rstream, void *data, bool eof);
+static void job_out(RStream *rstream, void *data, bool eof);
+static void job_err(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
+static void send_event(Channel *channel, char *type, typval_T *data);
+static void broadcast_event(char *type, typval_T *data);
+static void unsubscribe(Channel *channel, char *event);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
+static WBuffer *serialize_event(char *type, typval_T *data);
+static Channel *register_channel(void);
void channel_init()
{
channels = map_new(uint64_t)();
+ event_strings = map_new(cstr_t)();
msgpack_sbuffer_init(&msgpack_event_buffer);
}
@@ -62,71 +71,78 @@ void channel_teardown()
void channel_from_job(char **argv)
{
- Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = on_job_stdout;
- channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->sbuffer = msgpack_sbuffer_new();
-
- channel->id = next_id++;
+ Channel *channel = register_channel();
channel->is_job = true;
- channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
- map_put(uint64_t)(channels, channel->id, channel);
+ channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL);
}
void channel_from_stream(uv_stream_t *stream)
{
- Channel *channel = xmalloc(sizeof(Channel));
- rstream_cb rcb = parse_msgpack;
- channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
- channel->sbuffer = msgpack_sbuffer_new();
-
+ Channel *channel = register_channel();
stream->data = NULL;
- channel->id = next_id++;
channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
+ channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
wstream_set_stream(channel->data.streams.write, stream);
channel->data.streams.uv = stream;
- map_put(uint64_t)(channels, channel->id, channel);
}
bool channel_send_event(uint64_t id, char *type, typval_T *data)
{
- Channel *channel = map_get(uint64_t)(channels, id);
+ Channel *channel = NULL;
- if (!channel) {
- return false;
+ if (id > 0) {
+ if (!(channel = map_get(uint64_t)(channels, id))) {
+ return false;
+ }
+ send_event(channel, type, data);
+ } else {
+ broadcast_event(type, data);
}
- String event_type = {.size = strnlen(type, 1024), .data = type};
- Object event_data = vim_to_object(data);
- msgpack_packer packer;
- msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(event_type, event_data, &packer);
- char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
+ return true;
+}
- wstream_write(channel->data.streams.write,
- bytes,
- msgpack_event_buffer.size,
- true);
+void channel_subscribe(uint64_t id, char *event)
+{
+ Channel *channel;
- msgpack_rpc_free_object(event_data);
- msgpack_sbuffer_clear(&msgpack_event_buffer);
+ if (!(channel = map_get(uint64_t)(channels, id))) {
+ return;
+ }
- return true;
+ char *event_string = map_get(cstr_t)(event_strings, event);
+
+ if (!event_string) {
+ event_string = xstrdup(event);
+ map_put(cstr_t)(event_strings, event_string, event_string);
+ }
+
+ map_put(cstr_t)(channel->subscribed_events, event_string, event_string);
+}
+
+void channel_unsubscribe(uint64_t id, char *event)
+{
+ Channel *channel;
+
+ if (!(channel = map_get(uint64_t)(channels, id))) {
+ return;
+ }
+
+ unsubscribe(channel, event);
}
-static void on_job_stdout(RStream *rstream, void *data, bool eof)
+static void job_out(RStream *rstream, void *data, bool eof)
{
Job *job = data;
parse_msgpack(rstream, job_data(job), eof);
}
-static void on_job_stderr(RStream *rstream, void *data, bool eof)
+static void job_err(RStream *rstream, void *data, bool eof)
{
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
@@ -158,15 +174,62 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
- xmemdup(channel->sbuffer->data, channel->sbuffer->size),
- channel->sbuffer->size,
- true);
+ wstream_new_buffer(channel->sbuffer->data,
+ channel->sbuffer->size,
+ true));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
}
+static void send_event(Channel *channel, char *type, typval_T *data)
+{
+ wstream_write(channel->data.streams.write, serialize_event(type, data));
+}
+
+static void broadcast_event(char *type, typval_T *data)
+{
+ kvec_t(Channel *) subscribed;
+ kv_init(subscribed);
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ if (map_has(cstr_t)(channel->subscribed_events, type)) {
+ kv_push(Channel *, subscribed, channel);
+ }
+ });
+
+ if (!kv_size(subscribed)) {
+ goto end;
+ }
+
+ WBuffer *buffer = serialize_event(type, data);
+
+ for (size_t i = 0; i < kv_size(subscribed); i++) {
+ wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
+ }
+
+end:
+ kv_destroy(subscribed);
+}
+
+static void unsubscribe(Channel *channel, char *event)
+{
+ char *event_string = map_get(cstr_t)(event_strings, event);
+ map_del(cstr_t)(channel->subscribed_events, event_string);
+
+ map_foreach_value(channels, channel, {
+ if (map_has(cstr_t)(channel->subscribed_events, event_string)) {
+ return;
+ }
+ });
+
+ // Since the string is no longer used by other channels, release it's memory
+ map_del(cstr_t)(event_strings, event_string);
+ free(event_string);
+}
+
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
@@ -181,6 +244,13 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ map_free(cstr_t)(channel->subscribed_events);
free(channel);
}
@@ -190,3 +260,29 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
+static WBuffer *serialize_event(char *type, typval_T *data)
+{
+ String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
+ Object event_data = vim_to_object(data);
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(event_type, event_data, &packer);
+ WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data,
+ msgpack_event_buffer.size,
+ true);
+ msgpack_rpc_free_object(event_data);
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
+
+ return rv;
+}
+
+static Channel *register_channel()
+{
+ Channel *rv = xmalloc(sizeof(Channel));
+ rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ rv->sbuffer = msgpack_sbuffer_new();
+ rv->id = next_id++;
+ rv->subscribed_events = map_new(cstr_t)();
+ map_put(uint64_t)(channels, rv->id, rv);
+ return rv;
+}