diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-05-28 08:42:05 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-05-28 08:52:54 -0300 |
commit | 5e3fb4ae9545678299a4fc4314de7a086303e9bb (patch) | |
tree | 1e879ef2492db8e2577fbe74d01d8d652160225e | |
parent | 48ac06506ba64f21e59e0120ca87759ddb85efd0 (diff) | |
download | rneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.tar.gz rneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.tar.bz2 rneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.zip |
API: Events: Add support for broadcasting events
The channel_send_event will now broadcast events to all subscribed channels if
the 'id' parameter is 0.
-rw-r--r-- | src/nvim/eval.c | 2 | ||||
-rw-r--r-- | src/nvim/os/channel.c | 85 | ||||
-rw-r--r-- | src/nvim/os/channel.h | 3 |
3 files changed, 71 insertions, 19 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 87054aa9df..83bbd9a24c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -13069,7 +13069,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv) return; } - if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) { + if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number < 0) { EMSG2(_(e_invarg2), "Channel id must be a positive integer"); return; } diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 2584340950..46c2a3d394 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -15,9 +15,11 @@ #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/map.h" +#include "nvim/lib/kvec.h" typedef struct { uint64_t id; + Map(cstr_t) *subscribed_events; bool is_job; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; @@ -38,6 +40,8 @@ static msgpack_sbuffer msgpack_event_buffer; static void on_job_stdout(RStream *rstream, void *data, bool eof); static void on_job_stderr(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); +static void send_event(Channel *channel, char *type, typval_T *data); +static void broadcast_event(char *type, typval_T *data); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); @@ -68,6 +72,7 @@ void channel_from_job(char **argv) channel->sbuffer = msgpack_sbuffer_new(); channel->id = next_id++; + channel->subscribed_events = map_new(cstr_t)(); channel->is_job = true; channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); map_put(uint64_t)(channels, channel->id, channel); @@ -82,6 +87,7 @@ void channel_from_stream(uv_stream_t *stream) stream->data = NULL; channel->id = next_id++; + channel->subscribed_events = map_new(cstr_t)(); channel->is_job = false; // read stream channel->data.streams.read = rstream_new(rcb, 1024, channel, true); @@ -96,26 +102,17 @@ void channel_from_stream(uv_stream_t *stream) bool channel_send_event(uint64_t id, char *type, typval_T *data) { - Channel *channel = map_get(uint64_t)(channels, id); + Channel *channel = NULL; - if (!channel) { - return false; + if (id > 0) { + if (!(channel = map_get(uint64_t)(channels, id))) { + return false; + } + send_event(channel, type, data); + } else { + broadcast_event(type, data); } - String event_type = {.size = strnlen(type, 1024), .data = type}; - Object event_data = vim_to_object(data); - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, event_data, &packer); - - wstream_write(channel->data.streams.write, - wstream_new_buffer(msgpack_event_buffer.data, - msgpack_event_buffer.size, - true)); - - msgpack_rpc_free_object(event_data); - msgpack_sbuffer_clear(&msgpack_event_buffer); - return true; } @@ -166,6 +163,59 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } } +static void send_event(Channel *channel, char *type, typval_T *data) +{ + String event_type = {.size = strnlen(type, 1024), .data = type}; + Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + WBuffer *buffer = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_event_buffer.size, + true); + + wstream_write(channel->data.streams.write, buffer); + + msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); +} + +static void broadcast_event(char *type, typval_T *data) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, type)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + goto end; + } + + String event_type = {.size = strnlen(type, 1024), .data = type}; + Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + WBuffer *buffer = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_event_buffer.size, + true); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + } + + msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); + +end: + kv_destroy(subscribed); +} + static void close_channel(Channel *channel) { map_del(uint64_t)(channels, channel->id); @@ -180,6 +230,7 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } + map_free(cstr_t)(channel->subscribed_events); free(channel); } diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index a1c650a378..05588151a3 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -25,7 +25,8 @@ void channel_from_job(char **argv); /// Sends event/data to channel /// -/// @param id The channel id +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type /// @param type The event type, an arbitrary string /// @param obj The event data /// @return True if the data was sent successfully, false otherwise. |