aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c65
1 files changed, 30 insertions, 35 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5737a0440f..626312b666 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,8 +1,5 @@
#include <assert.h>
#include <inttypes.h>
-#include <msgpack/object.h>
-#include <msgpack/sbuffer.h>
-#include <msgpack/unpack.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
@@ -17,7 +14,7 @@
#include "nvim/event/defs.h"
#include "nvim/event/loop.h"
#include "nvim/event/multiqueue.h"
-#include "nvim/event/process.h"
+#include "nvim/event/proc.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/globals.h"
@@ -31,8 +28,6 @@
#include "nvim/msgpack_rpc/packer.h"
#include "nvim/msgpack_rpc/unpacker.h"
#include "nvim/os/input.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/types_defs.h"
#include "nvim/ui.h"
#include "nvim/ui_client.h"
@@ -85,11 +80,11 @@ void rpc_start(Channel *channel)
rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker);
unpacker_init(rpc->unpacker);
rpc->next_request_id = 1;
- rpc->info = (Dictionary)ARRAY_DICT_INIT;
+ rpc->info = (Dict)ARRAY_DICT_INIT;
kv_init(rpc->call_stack);
if (channel->streamtype != kChannelStreamInternal) {
- Stream *out = channel_outstream(channel);
+ RStream *out = channel_outstream(channel);
#ifdef NVIM_LOG_DEBUG
Stream *in = channel_instream(channel);
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
@@ -202,10 +197,25 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
return frame.errored ? NIL : frame.result;
}
-static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
+static size_t receive_msgpack(RStream *stream, const char *rbuf, size_t c, void *data, bool eof)
{
Channel *channel = data;
channel_incref(channel);
+ size_t consumed = 0;
+
+ DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
+ channel->id, c, (void *)stream);
+
+ if (c > 0) {
+ Unpacker *p = channel->rpc.unpacker;
+ p->read_ptr = rbuf;
+ p->read_size = c;
+ parse_msgpack(channel);
+
+ if (!unpacker_closed(p)) {
+ consumed = c - p->read_size;
+ }
+ }
if (eof) {
channel_close(channel->id, kChannelPartRpc, NULL);
@@ -213,25 +223,10 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
chan_close_with_error(channel, buf, LOGLVL_INF);
- goto end;
- }
-
- DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
- channel->id, rbuffer_size(rbuf), (void *)stream);
-
- Unpacker *p = channel->rpc.unpacker;
- size_t size = 0;
- p->read_ptr = rbuffer_read_ptr(rbuf, &size);
- p->read_size = size;
- parse_msgpack(channel);
-
- if (!unpacker_closed(p)) {
- size_t consumed = size - p->read_size;
- rbuffer_consumed_compact(rbuf, consumed);
}
-end:
channel_decref(channel);
+ return consumed;
}
static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id)
@@ -505,7 +500,7 @@ void rpc_free(Channel *channel)
xfree(channel->rpc.unpacker);
kv_destroy(channel->rpc.call_stack);
- api_free_dictionary(channel->rpc.info);
+ api_free_dict(channel->rpc.info);
}
static void chan_close_with_error(Channel *channel, char *msg, int loglevel)
@@ -592,16 +587,16 @@ static void packer_buffer_init_channels(Channel **chans, size_t nchans, PackerBu
packer->endptr = packer->startptr + ARENA_BLOCK_SIZE;
packer->packer_flush = channel_flush_callback;
packer->anydata = chans;
- packer->anylen = nchans;
+ packer->anyint = (int64_t)nchans;
}
static void packer_buffer_finish_channels(PackerBuffer *packer)
{
size_t len = (size_t)(packer->ptr - packer->startptr);
if (len > 0) {
- WBuffer *buf = wstream_new_buffer(packer->startptr, len, packer->anylen, free_block);
+ WBuffer *buf = wstream_new_buffer(packer->startptr, len, (size_t)packer->anyint, free_block);
Channel **chans = packer->anydata;
- for (size_t i = 0; i < packer->anylen; i++) {
+ for (int64_t i = 0; i < packer->anyint; i++) {
channel_write(chans[i], buf);
}
} else {
@@ -612,17 +607,17 @@ static void packer_buffer_finish_channels(PackerBuffer *packer)
static void channel_flush_callback(PackerBuffer *packer)
{
packer_buffer_finish_channels(packer);
- packer_buffer_init_channels(packer->anydata, packer->anylen, packer);
+ packer_buffer_init_channels(packer->anydata, (size_t)packer->anyint, packer);
}
-void rpc_set_client_info(uint64_t id, Dictionary info)
+void rpc_set_client_info(uint64_t id, Dict info)
{
Channel *chan = find_rpc_channel(id);
if (!chan) {
abort();
}
- api_free_dictionary(chan->rpc.info);
+ api_free_dict(chan->rpc.info);
chan->rpc.info = info;
// Parse "type" on "info" and set "client_type"
@@ -646,9 +641,9 @@ void rpc_set_client_info(uint64_t id, Dictionary info)
channel_info_changed(chan, false);
}
-Dictionary rpc_client_info(Channel *chan)
+Dict rpc_client_info(Channel *chan)
{
- return copy_dictionary(chan->rpc.info, NULL);
+ return copy_dict(chan->rpc.info, NULL);
}
const char *get_client_info(Channel *chan, const char *key)
@@ -657,7 +652,7 @@ const char *get_client_info(Channel *chan, const char *key)
if (!chan->is_rpc) {
return NULL;
}
- Dictionary info = chan->rpc.info;
+ Dict info = chan->rpc.info;
for (size_t i = 0; i < info.size; i++) {
if (strequal(key, info.items[i].key.data)
&& info.items[i].value.type == kObjectTypeString) {