aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 98d5d8c6cb..6a0dc10214 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -31,8 +31,6 @@
#include "nvim/msgpack_rpc/packer.h"
#include "nvim/msgpack_rpc/unpacker.h"
#include "nvim/os/input.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#include "nvim/ui.h"
#include "nvim/ui_client.h"
@@ -202,10 +200,25 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
return frame.errored ? NIL : frame.result;
}
-static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
+static size_t receive_msgpack(RStream *stream, const char *rbuf, size_t c, void *data, bool eof)
{
Channel *channel = data;
channel_incref(channel);
+ size_t consumed = 0;
+
+ DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
+ channel->id, c, (void *)stream);
+
+ if (c > 0) {
+ Unpacker *p = channel->rpc.unpacker;
+ p->read_ptr = rbuf;
+ p->read_size = c;
+ parse_msgpack(channel);
+
+ if (!unpacker_closed(p)) {
+ consumed = c - p->read_size;
+ }
+ }
if (eof) {
channel_close(channel->id, kChannelPartRpc, NULL);
@@ -213,25 +226,10 @@ static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
chan_close_with_error(channel, buf, LOGLVL_INF);
- goto end;
- }
-
- DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
- channel->id, rbuffer_size(rbuf), (void *)stream);
-
- Unpacker *p = channel->rpc.unpacker;
- size_t size = 0;
- p->read_ptr = rbuffer_read_ptr(rbuf, &size);
- p->read_size = size;
- parse_msgpack(channel);
-
- if (!unpacker_closed(p)) {
- size_t consumed = size - p->read_size;
- rbuffer_consumed_compact(rbuf, consumed);
}
-end:
channel_decref(channel);
+ return consumed;
}
static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id)