diff options
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r-- | src/nvim/os/channel.c | 174 |
1 files changed, 135 insertions, 39 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 2923ab0912..b8cceede6f 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -15,9 +15,11 @@ #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/map.h" +#include "nvim/lib/kvec.h" typedef struct { uint64_t id; + Map(cstr_t) *subscribed_events; bool is_job; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; @@ -33,17 +35,24 @@ typedef struct { static uint64_t next_id = 1; static Map(uint64_t) *channels = NULL; +static Map(cstr_t) *event_strings = NULL; static msgpack_sbuffer msgpack_event_buffer; -static void on_job_stdout(RStream *rstream, void *data, bool eof); -static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void job_out(RStream *rstream, void *data, bool eof); +static void job_err(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); +static void send_event(Channel *channel, char *type, typval_T *data); +static void broadcast_event(char *type, typval_T *data); +static void unsubscribe(Channel *channel, char *event); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); +static WBuffer *serialize_event(char *type, typval_T *data); +static Channel *register_channel(void); void channel_init() { channels = map_new(uint64_t)(); + event_strings = map_new(cstr_t)(); msgpack_sbuffer_init(&msgpack_event_buffer); } @@ -62,71 +71,78 @@ void channel_teardown() void channel_from_job(char **argv) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = on_job_stdout; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - - channel->id = next_id++; + Channel *channel = register_channel(); channel->is_job = true; - channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); - map_put(uint64_t)(channels, channel->id, channel); + channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL); } void channel_from_stream(uv_stream_t *stream) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = parse_msgpack; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - + Channel *channel = register_channel(); stream->data = NULL; - channel->id = next_id++; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(rcb, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream channel->data.streams.write = wstream_new(1024 * 1024); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; - map_put(uint64_t)(channels, channel->id, channel); } bool channel_send_event(uint64_t id, char *type, typval_T *data) { - Channel *channel = map_get(uint64_t)(channels, id); + Channel *channel = NULL; - if (!channel) { - return false; + if (id > 0) { + if (!(channel = map_get(uint64_t)(channels, id))) { + return false; + } + send_event(channel, type, data); + } else { + broadcast_event(type, data); } - String event_type = {.size = strnlen(type, 1024), .data = type}; - Object event_data = vim_to_object(data); - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, event_data, &packer); - char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); + return true; +} - wstream_write(channel->data.streams.write, - bytes, - msgpack_event_buffer.size, - true); +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; - msgpack_rpc_free_object(event_data); - msgpack_sbuffer_clear(&msgpack_event_buffer); + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } - return true; + char *event_string = map_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + map_put(cstr_t)(event_strings, event_string, event_string); + } + + map_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } + + unsubscribe(channel, event); } -static void on_job_stdout(RStream *rstream, void *data, bool eof) +static void job_out(RStream *rstream, void *data, bool eof) { Job *job = data; parse_msgpack(rstream, job_data(job), eof); } -static void on_job_stderr(RStream *rstream, void *data, bool eof) +static void job_err(RStream *rstream, void *data, bool eof) { // TODO(tarruda): plugin error messages should be sent to the error buffer } @@ -158,15 +174,62 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); wstream_write(channel->data.streams.write, - xmemdup(channel->sbuffer->data, channel->sbuffer->size), - channel->sbuffer->size, - true); + wstream_new_buffer(channel->sbuffer->data, + channel->sbuffer->size, + true)); // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } } +static void send_event(Channel *channel, char *type, typval_T *data) +{ + wstream_write(channel->data.streams.write, serialize_event(type, data)); +} + +static void broadcast_event(char *type, typval_T *data) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, type)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + goto end; + } + + WBuffer *buffer = serialize_event(type, data); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = map_get(cstr_t)(event_strings, event); + map_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + map_del(cstr_t)(event_strings, event_string); + free(event_string); +} + static void close_channel(Channel *channel) { map_del(uint64_t)(channels, channel->id); @@ -181,6 +244,13 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + map_free(cstr_t)(channel->subscribed_events); free(channel); } @@ -190,3 +260,29 @@ static void close_cb(uv_handle_t *handle) free(handle); } +static WBuffer *serialize_event(char *type, typval_T *data) +{ + String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; + Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_event_buffer.size, + true); + msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); + + return rv; +} + +static Channel *register_channel() +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->sbuffer = msgpack_sbuffer_new(); + rv->id = next_id++; + rv->subscribed_events = map_new(cstr_t)(); + map_put(uint64_t)(channels, rv->id, rv); + return rv; +} |