aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-06-26 13:29:46 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-07-17 11:30:01 -0300
commit0e20afe37e7ad99036ab98356a3f72281e1a8017 (patch)
tree1299f1a8e7878311bcf2bd4aa74a35fc4923cff0
parent5d9c73ce70420321b92519d56c1c9e03fea94ecc (diff)
downloadrneovim-0e20afe37e7ad99036ab98356a3f72281e1a8017.tar.gz
rneovim-0e20afe37e7ad99036ab98356a3f72281e1a8017.tar.bz2
rneovim-0e20afe37e7ad99036ab98356a3f72281e1a8017.zip
wstream: Pass WBuffer refcount as a constructor parameter
This is required to handle broadcasting when the first write fails. Ref: https://github.com/tarruda/neovim/commit/11916b6b595421ce2ece10f7aa40757cc4937c0c#commitcomment-6792287
-rw-r--r--src/nvim/eval.c1
-rw-r--r--src/nvim/os/channel.c10
-rw-r--r--src/nvim/os/msgpack_rpc.c6
-rw-r--r--src/nvim/os/wstream.c27
4 files changed, 29 insertions, 15 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index d82f71b836..ec80be36c5 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -10560,6 +10560,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
strlen((char *)argvars[1].vval.v_string),
+ 1,
free);
rettv->vval.v_number = job_write(job, buf);
}
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 504c1ca05b..ae33ca31a3 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -383,7 +383,7 @@ static void send_request(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1));
}
static void send_event(Channel *channel,
@@ -391,7 +391,7 @@ static void send_event(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1));
}
static void broadcast_event(char *name, Object arg)
@@ -412,7 +412,11 @@ static void broadcast_event(char *name, Object arg)
}
String method = {.size = strlen(name), .data = name};
- WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
+ WBuffer *buffer = serialize_request(0,
+ method,
+ arg,
+ &out_buffer,
+ kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
channel_write(kv_A(subscribed, i), buffer);
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
index 85569372da..402e741370 100644
--- a/src/nvim/os/msgpack_rpc.c
+++ b/src/nvim/os/msgpack_rpc.c
@@ -113,7 +113,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
WBuffer *serialize_request(uint64_t request_id,
String method,
Object arg,
- msgpack_sbuffer *sbuffer)
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
@@ -130,6 +131,7 @@ WBuffer *serialize_request(uint64_t request_id,
msgpack_rpc_from_object(arg, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ refcount,
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@@ -165,6 +167,7 @@ WBuffer *serialize_response(uint64_t response_id,
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ 1, // responses only go though 1 channel
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@@ -190,6 +193,7 @@ WBuffer *serialize_metadata(uint64_t id,
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ 1,
free);
msgpack_sbuffer_clear(sbuffer);
return rv;
diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c
index 20aa5ddeb2..0978d33a10 100644
--- a/src/nvim/os/wstream.c
+++ b/src/nvim/os/wstream.c
@@ -92,33 +92,33 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
/// @return false if the write failed
bool wstream_write(WStream *wstream, WBuffer *buffer)
{
- WriteData *data;
- uv_buf_t uvbuf;
- uv_write_t *req;
-
// This should not be called after a wstream was freed
assert(!wstream->freed);
- buffer->refcount++;
-
if (wstream->curmem > wstream->maxmem) {
goto err;
}
wstream->curmem += buffer->size;
- data = xmalloc(sizeof(WriteData));
+
+ WriteData *data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
data->buffer = buffer;
- req = xmalloc(sizeof(uv_write_t));
+
+ uv_write_t *req = xmalloc(sizeof(uv_write_t));
req->data = data;
+
+ uv_buf_t uvbuf;
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
- wstream->pending_reqs++;
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
+ free(data);
+ free(req);
goto err;
}
+ wstream->pending_reqs++;
return true;
err:
@@ -133,14 +133,19 @@ err:
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
+/// @param refcount The number of references for the WBuffer. This will be used
+/// by WStream instances to decide when a WBuffer should be freed.
/// @param cb Pointer to function that will be responsible for freeing
/// the buffer data(passing 'free' will work as expected).
/// @return The allocated WBuffer instance
-WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
+WBuffer *wstream_new_buffer(char *data,
+ size_t size,
+ size_t refcount,
+ wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
- rv->refcount = 0;
+ rv->refcount = refcount;
rv->cb = cb;
rv->data = data;