diff options
Diffstat (limited to 'src/nvim/os')
-rw-r--r-- | src/nvim/os/channel.c | 174 | ||||
-rw-r--r-- | src/nvim/os/channel.h | 17 | ||||
-rw-r--r-- | src/nvim/os/job.c | 2 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 4 | ||||
-rw-r--r-- | src/nvim/os/rstream.h | 4 | ||||
-rw-r--r-- | src/nvim/os/wstream.c | 63 | ||||
-rw-r--r-- | src/nvim/os/wstream.h | 19 | ||||
-rw-r--r-- | src/nvim/os/wstream_defs.h | 1 |
8 files changed, 207 insertions, 77 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 2923ab0912..b8cceede6f 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -15,9 +15,11 @@ #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/map.h" +#include "nvim/lib/kvec.h" typedef struct { uint64_t id; + Map(cstr_t) *subscribed_events; bool is_job; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; @@ -33,17 +35,24 @@ typedef struct { static uint64_t next_id = 1; static Map(uint64_t) *channels = NULL; +static Map(cstr_t) *event_strings = NULL; static msgpack_sbuffer msgpack_event_buffer; -static void on_job_stdout(RStream *rstream, void *data, bool eof); -static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void job_out(RStream *rstream, void *data, bool eof); +static void job_err(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); +static void send_event(Channel *channel, char *type, typval_T *data); +static void broadcast_event(char *type, typval_T *data); +static void unsubscribe(Channel *channel, char *event); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); +static WBuffer *serialize_event(char *type, typval_T *data); +static Channel *register_channel(void); void channel_init() { channels = map_new(uint64_t)(); + event_strings = map_new(cstr_t)(); msgpack_sbuffer_init(&msgpack_event_buffer); } @@ -62,71 +71,78 @@ void channel_teardown() void channel_from_job(char **argv) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = on_job_stdout; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - - channel->id = next_id++; + Channel *channel = register_channel(); channel->is_job = true; - channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); - map_put(uint64_t)(channels, channel->id, channel); + channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL); } void channel_from_stream(uv_stream_t *stream) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = parse_msgpack; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - + Channel *channel = register_channel(); stream->data = NULL; - channel->id = next_id++; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(rcb, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream channel->data.streams.write = wstream_new(1024 * 1024); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; - map_put(uint64_t)(channels, channel->id, channel); } bool channel_send_event(uint64_t id, char *type, typval_T *data) { - Channel *channel = map_get(uint64_t)(channels, id); + Channel *channel = NULL; - if (!channel) { - return false; + if (id > 0) { + if (!(channel = map_get(uint64_t)(channels, id))) { + return false; + } + send_event(channel, type, data); + } else { + broadcast_event(type, data); } - String event_type = {.size = strnlen(type, 1024), .data = type}; - Object event_data = vim_to_object(data); - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, event_data, &packer); - char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); + return true; +} - wstream_write(channel->data.streams.write, - bytes, - msgpack_event_buffer.size, - true); +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; - msgpack_rpc_free_object(event_data); - msgpack_sbuffer_clear(&msgpack_event_buffer); + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } - return true; + char *event_string = map_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + map_put(cstr_t)(event_strings, event_string, event_string); + } + + map_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } + + unsubscribe(channel, event); } -static void on_job_stdout(RStream *rstream, void *data, bool eof) +static void job_out(RStream *rstream, void *data, bool eof) { Job *job = data; parse_msgpack(rstream, job_data(job), eof); } -static void on_job_stderr(RStream *rstream, void *data, bool eof) +static void job_err(RStream *rstream, void *data, bool eof) { // TODO(tarruda): plugin error messages should be sent to the error buffer } @@ -158,15 +174,62 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); wstream_write(channel->data.streams.write, - xmemdup(channel->sbuffer->data, channel->sbuffer->size), - channel->sbuffer->size, - true); + wstream_new_buffer(channel->sbuffer->data, + channel->sbuffer->size, + true)); // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } } +static void send_event(Channel *channel, char *type, typval_T *data) +{ + wstream_write(channel->data.streams.write, serialize_event(type, data)); +} + +static void broadcast_event(char *type, typval_T *data) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, type)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + goto end; + } + + WBuffer *buffer = serialize_event(type, data); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = map_get(cstr_t)(event_strings, event); + map_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + map_del(cstr_t)(event_strings, event_string); + free(event_string); +} + static void close_channel(Channel *channel) { map_del(uint64_t)(channels, channel->id); @@ -181,6 +244,13 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + map_free(cstr_t)(channel->subscribed_events); free(channel); } @@ -190,3 +260,29 @@ static void close_cb(uv_handle_t *handle) free(handle); } +static WBuffer *serialize_event(char *type, typval_T *data) +{ + String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; + Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_event_buffer.size, + true); + msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); + + return rv; +} + +static Channel *register_channel() +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->sbuffer = msgpack_sbuffer_new(); + rv->id = next_id++; + rv->subscribed_events = map_new(cstr_t)(); + map_put(uint64_t)(channels, rv->id, rv); + return rv; +} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index a1c650a378..b88cd2445f 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -5,6 +5,8 @@ #include "nvim/vim.h" +#define EVENT_MAXLEN 512 + /// Initializes the module void channel_init(void); @@ -25,11 +27,24 @@ void channel_from_job(char **argv); /// Sends event/data to channel /// -/// @param id The channel id +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type /// @param type The event type, an arbitrary string /// @param obj The event data /// @return True if the data was sent successfully, false otherwise. bool channel_send_event(uint64_t id, char *type, typval_T *data); +/// Subscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_subscribe(uint64_t id, char *event); + +/// Unsubscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_unsubscribe(uint64_t id, char *event); + #endif // NVIM_OS_CHANNEL_H diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index ac79a5b882..c4a9c85d1d 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -234,7 +234,7 @@ bool job_write(int id, char *data, uint32_t len) return false; } - if (!wstream_write(job->in, data, len, true)) { + if (!wstream_write(job->in, wstream_new_buffer(data, len, false))) { job_stop(job->id); return false; } diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 1dc30be582..b1e4cc7a57 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -33,7 +33,7 @@ static void close_cb(uv_handle_t *handle); static void emit_read_event(RStream *rstream, bool eof); RStream * rstream_new(rstream_cb cb, - uint32_t buffer_size, + size_t buffer_size, void *data, bool async) { @@ -133,7 +133,7 @@ void rstream_stop(RStream *rstream) } } -size_t rstream_read(RStream *rstream, char *buf, uint32_t count) +size_t rstream_read(RStream *rstream, char *buf, size_t count) { size_t read_count = rstream->wpos - rstream->rpos; diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h index 7a565099b7..5afa864f04 100644 --- a/src/nvim/os/rstream.h +++ b/src/nvim/os/rstream.h @@ -21,7 +21,7 @@ /// this to false /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, - uint32_t buffer_size, + size_t buffer_size, void *data, bool async); @@ -65,7 +65,7 @@ void rstream_stop(RStream *rstream); /// @param buffer The buffer which will receive the data /// @param count Number of bytes that `buffer` can accept /// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buffer, uint32_t count); +size_t rstream_read(RStream *rstream, char *buffer, size_t count); /// Returns the number of bytes available for reading from `rstream` /// diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 382c95e31d..57afdd0e8f 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -12,27 +12,27 @@ struct wstream { uv_stream_t *stream; // Memory currently used by pending buffers - uint32_t curmem; + size_t curmem; // Maximum memory used by this instance - uint32_t maxmem; + size_t maxmem; // Number of pending requests - uint32_t pending_reqs; + size_t pending_reqs; bool freed; }; +struct wbuffer { + size_t refcount, size; + char *data; +}; + typedef struct { WStream *wstream; - // Buffer containing data to be written - char *buffer; - // Size of the buffer - uint32_t length; - // If it's our responsibility to free the buffer - bool free; + WBuffer *buffer; } WriteData; static void write_cb(uv_write_t *req, int status); -WStream * wstream_new(uint32_t maxmem) +WStream * wstream_new(size_t maxmem) { WStream *rv = xmalloc(sizeof(WStream)); rv->maxmem = maxmem; @@ -59,51 +59,60 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream) wstream->stream = stream; } -bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free) +bool wstream_write(WStream *wstream, WBuffer *buffer) { WriteData *data; uv_buf_t uvbuf; uv_write_t *req; - if (wstream->freed) { - // Don't accept write requests after the WStream instance was freed - return false; - } + // This should not be called after a wstream was freed + assert(!wstream->freed); - if (wstream->curmem + length > wstream->maxmem) { + if (wstream->curmem + buffer->size > wstream->maxmem) { return false; } - if (free) { - // We should only account for buffers that are ours to free - wstream->curmem += length; - } - + buffer->refcount++; + wstream->curmem += buffer->size; data = xmalloc(sizeof(WriteData)); data->wstream = wstream; data->buffer = buffer; - data->length = length; - data->free = free; req = xmalloc(sizeof(uv_write_t)); req->data = data; - uvbuf.base = buffer; - uvbuf.len = length; + uvbuf.base = buffer->data; + uvbuf.len = buffer->size; wstream->pending_reqs++; uv_write(req, wstream->stream, &uvbuf, 1, write_cb); return true; } +WBuffer *wstream_new_buffer(char *data, size_t size, bool copy) +{ + WBuffer *rv = xmalloc(sizeof(WBuffer)); + rv->size = size; + rv->refcount = 0; + + if (copy) { + rv->data = xmemdup(data, size); + } else { + rv->data = data; + } + + return rv; +} + static void write_cb(uv_write_t *req, int status) { WriteData *data = req->data; free(req); + data->wstream->curmem -= data->buffer->size; - if (data->free) { + if (!--data->buffer->refcount) { // Free the data written to the stream + free(data->buffer->data); free(data->buffer); - data->wstream->curmem -= data->length; } data->wstream->pending_reqs--; diff --git a/src/nvim/os/wstream.h b/src/nvim/os/wstream.h index a12d26fd5e..1f61f6afd0 100644 --- a/src/nvim/os/wstream.h +++ b/src/nvim/os/wstream.h @@ -12,7 +12,7 @@ /// /// @param maxmem Maximum amount memory used by this `WStream` instance. /// @return The newly-allocated `WStream` instance -WStream * wstream_new(uint32_t maxmem); +WStream * wstream_new(size_t maxmem); /// Frees all memory allocated for a WStream instance /// @@ -31,10 +31,19 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream); /// /// @param wstream The `WStream` instance /// @param buffer The buffer which contains data to be written -/// @param length Number of bytes that should be written from `buffer` -/// @param free If true, `buffer` will be freed after the write is complete -/// @return true if the data was successfully queued, false otherwise. -bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free); +/// @return false if the write failed +bool wstream_write(WStream *wstream, WBuffer *buffer); + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across WStream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param copy If true, the data will be copied into the WBuffer +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, size_t size, bool copy); #endif // NVIM_OS_WSTREAM_H diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h index e044ecfd8e..a7565c9bc7 100644 --- a/src/nvim/os/wstream_defs.h +++ b/src/nvim/os/wstream_defs.h @@ -1,6 +1,7 @@ #ifndef NVIM_OS_WSTREAM_DEFS_H #define NVIM_OS_WSTREAM_DEFS_H +typedef struct wbuffer WBuffer; typedef struct wstream WStream; #endif // NVIM_OS_WSTREAM_DEFS_H |