diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-05-28 09:05:13 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-05-28 09:05:13 -0300 |
commit | e4fe2dbd777a59a9a9b386d960eb9dddc459e84e (patch) | |
tree | aa99bd39c8a87c95f93457dcf4456caab45f7310 | |
parent | 1faf546ea286ae324c79cfef3b5613a0004182bb (diff) | |
parent | cd8421537034d0b5cc567d81afc4fa5316da171e (diff) | |
download | rneovim-e4fe2dbd777a59a9a9b386d960eb9dddc459e84e.tar.gz rneovim-e4fe2dbd777a59a9a9b386d960eb9dddc459e84e.tar.bz2 rneovim-e4fe2dbd777a59a9a9b386d960eb9dddc459e84e.zip |
Merge 'Refactor WStream to enable writing the same buffer to multiple targets'
-rw-r--r-- | scripts/msgpack-gen.lua | 25 | ||||
-rwxr-xr-x | scripts/run-api-tests.exp | 4 | ||||
-rw-r--r-- | src/nvim/api/buffer.c | 1 | ||||
-rw-r--r-- | src/nvim/api/vim.c | 19 | ||||
-rw-r--r-- | src/nvim/api/vim.h | 12 | ||||
-rw-r--r-- | src/nvim/eval.c | 2 | ||||
-rw-r--r-- | src/nvim/lib/kvec.h | 91 | ||||
-rw-r--r-- | src/nvim/os/channel.c | 174 | ||||
-rw-r--r-- | src/nvim/os/channel.h | 17 | ||||
-rw-r--r-- | src/nvim/os/job.c | 2 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 4 | ||||
-rw-r--r-- | src/nvim/os/rstream.h | 4 | ||||
-rw-r--r-- | src/nvim/os/wstream.c | 63 | ||||
-rw-r--r-- | src/nvim/os/wstream.h | 19 | ||||
-rw-r--r-- | src/nvim/os/wstream_defs.h | 1 |
15 files changed, 354 insertions, 84 deletions
diff --git a/scripts/msgpack-gen.lua b/scripts/msgpack-gen.lua index 3cc1558fd7..fac1b10b45 100644 --- a/scripts/msgpack-gen.lua +++ b/scripts/msgpack-gen.lua @@ -62,6 +62,12 @@ for i = 1, #arg - 1 do api.functions[#api.functions + 1] = tmp[i] local fn_id = #api.functions local fn = api.functions[fn_id] + if #fn.parameters ~= 0 and fn.parameters[1][2] == 'channel_id' then + -- this function should receive the channel id + fn.receives_channel_id = true + -- remove the parameter since it won't be passed by the api client + table.remove(fn.parameters, 1) + end if #fn.parameters ~= 0 and fn.parameters[#fn.parameters][1] == 'error' then -- function can fail if the last parameter type is 'Error' fn.can_fail = true @@ -111,7 +117,7 @@ end output:write([[ }; -void msgpack_rpc_dispatch(uint64_t id, msgpack_object *req, msgpack_packer *res) +void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res) { Error error = { .set = false }; uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64; @@ -121,7 +127,7 @@ void msgpack_rpc_dispatch(uint64_t id, msgpack_object *req, msgpack_packer *res) msgpack_pack_nil(res); // The result is the [channel_id, metadata] array msgpack_pack_array(res, 2); - msgpack_pack_uint64(res, id); + msgpack_pack_uint64(res, channel_id); msgpack_pack_raw(res, sizeof(msgpack_metadata)); msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata)); return; @@ -169,8 +175,19 @@ for i = 1, #api.functions do output:write(fn.return_type..' rv = ') end - -- write the call without the closing parenthesis - output:write(fn.name..'('..call_args) + -- write the function name and the opening parenthesis + output:write(fn.name..'(') + + if fn.receives_channel_id then + -- if the function receives the channel id, pass it as first argument + if #args > 0 then + output:write('channel_id, '..call_args) + else + output:write('channel_id)') + end + else + output:write(call_args) + end if fn.can_fail then -- if the function can fail, also pass a pointer to the local error object diff --git a/scripts/run-api-tests.exp b/scripts/run-api-tests.exp index 6a568e17cb..0cfa512626 100755 --- a/scripts/run-api-tests.exp +++ b/scripts/run-api-tests.exp @@ -28,9 +28,9 @@ send { for group in split(groups) exe 'augroup '.group autocmd! - augroup NONE - exe 'augroup! '.group + augroup END endfor + autocmd! tabnew let curbufnum = eval(bufnr('%')) redir => buflist diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c index 4721045048..d2b4cafaac 100644 --- a/src/nvim/api/buffer.c +++ b/src/nvim/api/buffer.c @@ -121,6 +121,7 @@ end: } free(rv.items); + rv.items = NULL; } return rv; diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index 694781e0a3..39e2c32d6d 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -7,6 +7,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/buffer.h" +#include "nvim/os/channel.h" #include "nvim/vim.h" #include "nvim/buffer.h" #include "nvim/window.h" @@ -327,6 +328,24 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err) try_end(err); } +void vim_subscribe(uint64_t channel_id, String event) +{ + size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); + char e[EVENT_MAXLEN + 1]; + memcpy(e, event.data, length); + e[length] = NUL; + channel_subscribe(channel_id, e); +} + +void vim_unsubscribe(uint64_t channel_id, String event) +{ + size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); + char e[EVENT_MAXLEN + 1]; + memcpy(e, event.data, length); + e[length] = NUL; + channel_unsubscribe(channel_id, e); +} + static void write_msg(String message, bool to_err) { static int pos = 0; diff --git a/src/nvim/api/vim.h b/src/nvim/api/vim.h index acfab11cf7..4d1ac9023e 100644 --- a/src/nvim/api/vim.h +++ b/src/nvim/api/vim.h @@ -155,5 +155,17 @@ Tabpage vim_get_current_tabpage(void); /// @param[out] err Details of an error that may have occurred void vim_set_current_tabpage(Tabpage tabpage, Error *err); +/// Subscribes to event broadcasts +/// +/// @param channel_id The channel id(passed automatically by the dispatcher) +/// @param event The event type string +void vim_subscribe(uint64_t channel_id, String event); + +/// Unsubscribes to event broadcasts +/// +/// @param channel_id The channel id(passed automatically by the dispatcher) +/// @param event The event type string +void vim_unsubscribe(uint64_t channel_id, String event); + #endif // NVIM_API_VIM_H diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 87054aa9df..83bbd9a24c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -13069,7 +13069,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv) return; } - if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) { + if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number < 0) { EMSG2(_(e_invarg2), "Channel id must be a positive integer"); return; } diff --git a/src/nvim/lib/kvec.h b/src/nvim/lib/kvec.h new file mode 100644 index 0000000000..fe17afb7c2 --- /dev/null +++ b/src/nvim/lib/kvec.h @@ -0,0 +1,91 @@ +/* The MIT License + + Copyright (c) 2008, by Attractive Chaos <attractor@live.co.uk> + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +/* + An example: + +#include "kvec.h" +int main() { + kvec_t(int) array; + kv_init(array); + kv_push(int, array, 10); // append + kv_a(int, array, 20) = 5; // dynamic + kv_A(array, 20) = 4; // static + kv_destroy(array); + return 0; +} +*/ + +/* + 2008-09-22 (0.1.0): + + * The initial version. + +*/ + +#ifndef AC_KVEC_H +#define AC_KVEC_H + +#include <stdlib.h> +#include "nvim/memory.h" + +#define kv_roundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x)) + +#define kvec_t(type) struct { size_t n, m; type *a; } +#define kv_init(v) ((v).n = (v).m = 0, (v).a = 0) +#define kv_destroy(v) free((v).a) +#define kv_A(v, i) ((v).a[(i)]) +#define kv_pop(v) ((v).a[--(v).n]) +#define kv_size(v) ((v).n) +#define kv_max(v) ((v).m) + +#define kv_resize(type, v, s) ((v).m = (s), (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m)) + +#define kv_copy(type, v1, v0) do { \ + if ((v1).m < (v0).n) kv_resize(type, v1, (v0).n); \ + (v1).n = (v0).n; \ + memcpy((v1).a, (v0).a, sizeof(type) * (v0).n); \ + } while (0) \ + +#define kv_push(type, v, x) do { \ + if ((v).n == (v).m) { \ + (v).m = (v).m? (v).m<<1 : 2; \ + (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m); \ + } \ + (v).a[(v).n++] = (x); \ + } while (0) + +#define kv_pushp(type, v) (((v).n == (v).m)? \ + ((v).m = ((v).m? (v).m<<1 : 2), \ + (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \ + : 0), ((v).a + ((v).n++)) + +#define kv_a(type, v, i) (((v).m <= (size_t)(i)? \ + ((v).m = (v).n = (i) + 1, kv_roundup32((v).m), \ + (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \ + : (v).n <= (size_t)(i)? (v).n = (i) + 1 \ + : 0), (v).a[(i)]) + +#endif diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 2923ab0912..b8cceede6f 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -15,9 +15,11 @@ #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/map.h" +#include "nvim/lib/kvec.h" typedef struct { uint64_t id; + Map(cstr_t) *subscribed_events; bool is_job; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; @@ -33,17 +35,24 @@ typedef struct { static uint64_t next_id = 1; static Map(uint64_t) *channels = NULL; +static Map(cstr_t) *event_strings = NULL; static msgpack_sbuffer msgpack_event_buffer; -static void on_job_stdout(RStream *rstream, void *data, bool eof); -static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void job_out(RStream *rstream, void *data, bool eof); +static void job_err(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); +static void send_event(Channel *channel, char *type, typval_T *data); +static void broadcast_event(char *type, typval_T *data); +static void unsubscribe(Channel *channel, char *event); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); +static WBuffer *serialize_event(char *type, typval_T *data); +static Channel *register_channel(void); void channel_init() { channels = map_new(uint64_t)(); + event_strings = map_new(cstr_t)(); msgpack_sbuffer_init(&msgpack_event_buffer); } @@ -62,71 +71,78 @@ void channel_teardown() void channel_from_job(char **argv) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = on_job_stdout; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - - channel->id = next_id++; + Channel *channel = register_channel(); channel->is_job = true; - channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); - map_put(uint64_t)(channels, channel->id, channel); + channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL); } void channel_from_stream(uv_stream_t *stream) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = parse_msgpack; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - + Channel *channel = register_channel(); stream->data = NULL; - channel->id = next_id++; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(rcb, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream channel->data.streams.write = wstream_new(1024 * 1024); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; - map_put(uint64_t)(channels, channel->id, channel); } bool channel_send_event(uint64_t id, char *type, typval_T *data) { - Channel *channel = map_get(uint64_t)(channels, id); + Channel *channel = NULL; - if (!channel) { - return false; + if (id > 0) { + if (!(channel = map_get(uint64_t)(channels, id))) { + return false; + } + send_event(channel, type, data); + } else { + broadcast_event(type, data); } - String event_type = {.size = strnlen(type, 1024), .data = type}; - Object event_data = vim_to_object(data); - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, event_data, &packer); - char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); + return true; +} - wstream_write(channel->data.streams.write, - bytes, - msgpack_event_buffer.size, - true); +void channel_subscribe(uint64_t id, char *event) +{ + Channel *channel; - msgpack_rpc_free_object(event_data); - msgpack_sbuffer_clear(&msgpack_event_buffer); + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } - return true; + char *event_string = map_get(cstr_t)(event_strings, event); + + if (!event_string) { + event_string = xstrdup(event); + map_put(cstr_t)(event_strings, event_string, event_string); + } + + map_put(cstr_t)(channel->subscribed_events, event_string, event_string); +} + +void channel_unsubscribe(uint64_t id, char *event) +{ + Channel *channel; + + if (!(channel = map_get(uint64_t)(channels, id))) { + return; + } + + unsubscribe(channel, event); } -static void on_job_stdout(RStream *rstream, void *data, bool eof) +static void job_out(RStream *rstream, void *data, bool eof) { Job *job = data; parse_msgpack(rstream, job_data(job), eof); } -static void on_job_stderr(RStream *rstream, void *data, bool eof) +static void job_err(RStream *rstream, void *data, bool eof) { // TODO(tarruda): plugin error messages should be sent to the error buffer } @@ -158,15 +174,62 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); wstream_write(channel->data.streams.write, - xmemdup(channel->sbuffer->data, channel->sbuffer->size), - channel->sbuffer->size, - true); + wstream_new_buffer(channel->sbuffer->data, + channel->sbuffer->size, + true)); // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } } +static void send_event(Channel *channel, char *type, typval_T *data) +{ + wstream_write(channel->data.streams.write, serialize_event(type, data)); +} + +static void broadcast_event(char *type, typval_T *data) +{ + kvec_t(Channel *) subscribed; + kv_init(subscribed); + Channel *channel; + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, type)) { + kv_push(Channel *, subscribed, channel); + } + }); + + if (!kv_size(subscribed)) { + goto end; + } + + WBuffer *buffer = serialize_event(type, data); + + for (size_t i = 0; i < kv_size(subscribed); i++) { + wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + } + +end: + kv_destroy(subscribed); +} + +static void unsubscribe(Channel *channel, char *event) +{ + char *event_string = map_get(cstr_t)(event_strings, event); + map_del(cstr_t)(channel->subscribed_events, event_string); + + map_foreach_value(channels, channel, { + if (map_has(cstr_t)(channel->subscribed_events, event_string)) { + return; + } + }); + + // Since the string is no longer used by other channels, release it's memory + map_del(cstr_t)(event_strings, event_string); + free(event_string); +} + static void close_channel(Channel *channel) { map_del(uint64_t)(channels, channel->id); @@ -181,6 +244,13 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + map_free(cstr_t)(channel->subscribed_events); free(channel); } @@ -190,3 +260,29 @@ static void close_cb(uv_handle_t *handle) free(handle); } +static WBuffer *serialize_event(char *type, typval_T *data) +{ + String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; + Object event_data = vim_to_object(data); + msgpack_packer packer; + msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); + msgpack_rpc_notification(event_type, event_data, &packer); + WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_event_buffer.size, + true); + msgpack_rpc_free_object(event_data); + msgpack_sbuffer_clear(&msgpack_event_buffer); + + return rv; +} + +static Channel *register_channel() +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->sbuffer = msgpack_sbuffer_new(); + rv->id = next_id++; + rv->subscribed_events = map_new(cstr_t)(); + map_put(uint64_t)(channels, rv->id, rv); + return rv; +} diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index a1c650a378..b88cd2445f 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -5,6 +5,8 @@ #include "nvim/vim.h" +#define EVENT_MAXLEN 512 + /// Initializes the module void channel_init(void); @@ -25,11 +27,24 @@ void channel_from_job(char **argv); /// Sends event/data to channel /// -/// @param id The channel id +/// @param id The channel id. If 0, the event will be sent to all +/// channels that have subscribed to the event type /// @param type The event type, an arbitrary string /// @param obj The event data /// @return True if the data was sent successfully, false otherwise. bool channel_send_event(uint64_t id, char *type, typval_T *data); +/// Subscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_subscribe(uint64_t id, char *event); + +/// Unsubscribes to event broadcasts +/// +/// @param id The channel id +/// @param event The event type string +void channel_unsubscribe(uint64_t id, char *event); + #endif // NVIM_OS_CHANNEL_H diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index ac79a5b882..c4a9c85d1d 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -234,7 +234,7 @@ bool job_write(int id, char *data, uint32_t len) return false; } - if (!wstream_write(job->in, data, len, true)) { + if (!wstream_write(job->in, wstream_new_buffer(data, len, false))) { job_stop(job->id); return false; } diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 1dc30be582..b1e4cc7a57 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -33,7 +33,7 @@ static void close_cb(uv_handle_t *handle); static void emit_read_event(RStream *rstream, bool eof); RStream * rstream_new(rstream_cb cb, - uint32_t buffer_size, + size_t buffer_size, void *data, bool async) { @@ -133,7 +133,7 @@ void rstream_stop(RStream *rstream) } } -size_t rstream_read(RStream *rstream, char *buf, uint32_t count) +size_t rstream_read(RStream *rstream, char *buf, size_t count) { size_t read_count = rstream->wpos - rstream->rpos; diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h index 7a565099b7..5afa864f04 100644 --- a/src/nvim/os/rstream.h +++ b/src/nvim/os/rstream.h @@ -21,7 +21,7 @@ /// this to false /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, - uint32_t buffer_size, + size_t buffer_size, void *data, bool async); @@ -65,7 +65,7 @@ void rstream_stop(RStream *rstream); /// @param buffer The buffer which will receive the data /// @param count Number of bytes that `buffer` can accept /// @return The number of bytes copied into `buffer` -size_t rstream_read(RStream *rstream, char *buffer, uint32_t count); +size_t rstream_read(RStream *rstream, char *buffer, size_t count); /// Returns the number of bytes available for reading from `rstream` /// diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 382c95e31d..57afdd0e8f 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -12,27 +12,27 @@ struct wstream { uv_stream_t *stream; // Memory currently used by pending buffers - uint32_t curmem; + size_t curmem; // Maximum memory used by this instance - uint32_t maxmem; + size_t maxmem; // Number of pending requests - uint32_t pending_reqs; + size_t pending_reqs; bool freed; }; +struct wbuffer { + size_t refcount, size; + char *data; +}; + typedef struct { WStream *wstream; - // Buffer containing data to be written - char *buffer; - // Size of the buffer - uint32_t length; - // If it's our responsibility to free the buffer - bool free; + WBuffer *buffer; } WriteData; static void write_cb(uv_write_t *req, int status); -WStream * wstream_new(uint32_t maxmem) +WStream * wstream_new(size_t maxmem) { WStream *rv = xmalloc(sizeof(WStream)); rv->maxmem = maxmem; @@ -59,51 +59,60 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream) wstream->stream = stream; } -bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free) +bool wstream_write(WStream *wstream, WBuffer *buffer) { WriteData *data; uv_buf_t uvbuf; uv_write_t *req; - if (wstream->freed) { - // Don't accept write requests after the WStream instance was freed - return false; - } + // This should not be called after a wstream was freed + assert(!wstream->freed); - if (wstream->curmem + length > wstream->maxmem) { + if (wstream->curmem + buffer->size > wstream->maxmem) { return false; } - if (free) { - // We should only account for buffers that are ours to free - wstream->curmem += length; - } - + buffer->refcount++; + wstream->curmem += buffer->size; data = xmalloc(sizeof(WriteData)); data->wstream = wstream; data->buffer = buffer; - data->length = length; - data->free = free; req = xmalloc(sizeof(uv_write_t)); req->data = data; - uvbuf.base = buffer; - uvbuf.len = length; + uvbuf.base = buffer->data; + uvbuf.len = buffer->size; wstream->pending_reqs++; uv_write(req, wstream->stream, &uvbuf, 1, write_cb); return true; } +WBuffer *wstream_new_buffer(char *data, size_t size, bool copy) +{ + WBuffer *rv = xmalloc(sizeof(WBuffer)); + rv->size = size; + rv->refcount = 0; + + if (copy) { + rv->data = xmemdup(data, size); + } else { + rv->data = data; + } + + return rv; +} + static void write_cb(uv_write_t *req, int status) { WriteData *data = req->data; free(req); + data->wstream->curmem -= data->buffer->size; - if (data->free) { + if (!--data->buffer->refcount) { // Free the data written to the stream + free(data->buffer->data); free(data->buffer); - data->wstream->curmem -= data->length; } data->wstream->pending_reqs--; diff --git a/src/nvim/os/wstream.h b/src/nvim/os/wstream.h index a12d26fd5e..1f61f6afd0 100644 --- a/src/nvim/os/wstream.h +++ b/src/nvim/os/wstream.h @@ -12,7 +12,7 @@ /// /// @param maxmem Maximum amount memory used by this `WStream` instance. /// @return The newly-allocated `WStream` instance -WStream * wstream_new(uint32_t maxmem); +WStream * wstream_new(size_t maxmem); /// Frees all memory allocated for a WStream instance /// @@ -31,10 +31,19 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream); /// /// @param wstream The `WStream` instance /// @param buffer The buffer which contains data to be written -/// @param length Number of bytes that should be written from `buffer` -/// @param free If true, `buffer` will be freed after the write is complete -/// @return true if the data was successfully queued, false otherwise. -bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free); +/// @return false if the write failed +bool wstream_write(WStream *wstream, WBuffer *buffer); + +/// Creates a WBuffer object for holding output data. Instances of this +/// object can be reused across WStream instances, and the memory is freed +/// automatically when no longer needed(it tracks the number of references +/// internally) +/// +/// @param data Data stored by the WBuffer +/// @param size The size of the data array +/// @param copy If true, the data will be copied into the WBuffer +/// @return The allocated WBuffer instance +WBuffer *wstream_new_buffer(char *data, size_t size, bool copy); #endif // NVIM_OS_WSTREAM_H diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h index e044ecfd8e..a7565c9bc7 100644 --- a/src/nvim/os/wstream_defs.h +++ b/src/nvim/os/wstream_defs.h @@ -1,6 +1,7 @@ #ifndef NVIM_OS_WSTREAM_DEFS_H #define NVIM_OS_WSTREAM_DEFS_H +typedef struct wbuffer WBuffer; typedef struct wstream WStream; #endif // NVIM_OS_WSTREAM_DEFS_H |