aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-05-28 08:42:05 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-05-28 08:52:54 -0300
commit5e3fb4ae9545678299a4fc4314de7a086303e9bb (patch)
tree1e879ef2492db8e2577fbe74d01d8d652160225e /src
parent48ac06506ba64f21e59e0120ca87759ddb85efd0 (diff)
downloadrneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.tar.gz
rneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.tar.bz2
rneovim-5e3fb4ae9545678299a4fc4314de7a086303e9bb.zip
API: Events: Add support for broadcasting events
The channel_send_event will now broadcast events to all subscribed channels if the 'id' parameter is 0.
Diffstat (limited to 'src')
-rw-r--r--src/nvim/eval.c2
-rw-r--r--src/nvim/os/channel.c85
-rw-r--r--src/nvim/os/channel.h3
3 files changed, 71 insertions, 19 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 87054aa9df..83bbd9a24c 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -13069,7 +13069,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv)
return;
}
- if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
+ if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number < 0) {
EMSG2(_(e_invarg2), "Channel id must be a positive integer");
return;
}
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 2584340950..46c2a3d394 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -15,9 +15,11 @@
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/map.h"
+#include "nvim/lib/kvec.h"
typedef struct {
uint64_t id;
+ Map(cstr_t) *subscribed_events;
bool is_job;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
@@ -38,6 +40,8 @@ static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
+static void send_event(Channel *channel, char *type, typval_T *data);
+static void broadcast_event(char *type, typval_T *data);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
@@ -68,6 +72,7 @@ void channel_from_job(char **argv)
channel->sbuffer = msgpack_sbuffer_new();
channel->id = next_id++;
+ channel->subscribed_events = map_new(cstr_t)();
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel);
@@ -82,6 +87,7 @@ void channel_from_stream(uv_stream_t *stream)
stream->data = NULL;
channel->id = next_id++;
+ channel->subscribed_events = map_new(cstr_t)();
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
@@ -96,26 +102,17 @@ void channel_from_stream(uv_stream_t *stream)
bool channel_send_event(uint64_t id, char *type, typval_T *data)
{
- Channel *channel = map_get(uint64_t)(channels, id);
+ Channel *channel = NULL;
- if (!channel) {
- return false;
+ if (id > 0) {
+ if (!(channel = map_get(uint64_t)(channels, id))) {
+ return false;
+ }
+ send_event(channel, type, data);
+ } else {
+ broadcast_event(type, data);
}
- String event_type = {.size = strnlen(type, 1024), .data = type};
- Object event_data = vim_to_object(data);
- msgpack_packer packer;
- msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(event_type, event_data, &packer);
-
- wstream_write(channel->data.streams.write,
- wstream_new_buffer(msgpack_event_buffer.data,
- msgpack_event_buffer.size,
- true));
-
- msgpack_rpc_free_object(event_data);
- msgpack_sbuffer_clear(&msgpack_event_buffer);
-
return true;
}
@@ -166,6 +163,59 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
}
}
+static void send_event(Channel *channel, char *type, typval_T *data)
+{
+ String event_type = {.size = strnlen(type, 1024), .data = type};
+ Object event_data = vim_to_object(data);
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(event_type, event_data, &packer);
+ WBuffer *buffer = wstream_new_buffer(msgpack_event_buffer.data,
+ msgpack_event_buffer.size,
+ true);
+
+ wstream_write(channel->data.streams.write, buffer);
+
+ msgpack_rpc_free_object(event_data);
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
+}
+
+static void broadcast_event(char *type, typval_T *data)
+{
+ kvec_t(Channel *) subscribed;
+ kv_init(subscribed);
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ if (map_has(cstr_t)(channel->subscribed_events, type)) {
+ kv_push(Channel *, subscribed, channel);
+ }
+ });
+
+ if (!kv_size(subscribed)) {
+ goto end;
+ }
+
+ String event_type = {.size = strnlen(type, 1024), .data = type};
+ Object event_data = vim_to_object(data);
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(event_type, event_data, &packer);
+ WBuffer *buffer = wstream_new_buffer(msgpack_event_buffer.data,
+ msgpack_event_buffer.size,
+ true);
+
+ for (size_t i = 0; i < kv_size(subscribed); i++) {
+ wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
+ }
+
+ msgpack_rpc_free_object(event_data);
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
+
+end:
+ kv_destroy(subscribed);
+}
+
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
@@ -180,6 +230,7 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
+ map_free(cstr_t)(channel->subscribed_events);
free(channel);
}
diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h
index a1c650a378..05588151a3 100644
--- a/src/nvim/os/channel.h
+++ b/src/nvim/os/channel.h
@@ -25,7 +25,8 @@ void channel_from_job(char **argv);
/// Sends event/data to channel
///
-/// @param id The channel id
+/// @param id The channel id. If 0, the event will be sent to all
+/// channels that have subscribed to the event type
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @return True if the data was sent successfully, false otherwise.