aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r--src/nvim/os/channel.c89
1 files changed, 70 insertions, 19 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 9a692cf9fe..653f09756a 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -24,7 +24,7 @@ typedef struct {
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
- int job_id;
+ Job *job;
struct {
RStream *read;
WStream *write;
@@ -68,11 +68,26 @@ void channel_teardown()
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
-void channel_from_job(char **argv)
+bool channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
- channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL);
+
+ int status;
+ channel->data.job = job_start(argv,
+ channel,
+ job_out,
+ job_err,
+ job_exit,
+ true,
+ &status);
+
+ if (status <= 0) {
+ close_channel(channel);
+ return false;
+ }
+
+ return true;
}
/// Creates an API channel from a libuv stream representing a tcp or
@@ -101,12 +116,13 @@ void channel_from_stream(uv_stream_t *stream)
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @return True if the data was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *type, typval_T *data)
+bool channel_send_event(uint64_t id, char *type, Object data)
{
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ msgpack_rpc_free_object(data);
return false;
}
send_event(channel, type, data);
@@ -126,7 +142,7 @@ void channel_subscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
- return;
+ abort();
}
char *event_string = pmap_get(cstr_t)(event_strings, event);
@@ -148,7 +164,7 @@ void channel_unsubscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
- return;
+ abort();
}
unsubscribe(channel, event);
@@ -165,6 +181,11 @@ static void job_err(RStream *rstream, void *data, bool eof)
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
+static void job_exit(Job *job, void *data)
+{
+ // TODO(tarruda): what should be done here?
+}
+
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
Channel *channel = data;
@@ -183,30 +204,57 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
+ UnpackResult result;
+ msgpack_packer response;
// Deserialize everything we can.
- while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
+ while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
+ == kUnpackResultOk) {
// Each object is a new msgpack-rpc request and requires an empty response
- msgpack_packer response;
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
- wstream_new_buffer(channel->sbuffer->data,
+ wstream_new_buffer(xmemdup(channel->sbuffer->data,
+ channel->sbuffer->size),
channel->sbuffer->size,
- true));
+ free));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
+
+ if (result == kUnpackResultFail) {
+ // See src/msgpack/unpack_template.h in msgpack source tree for
+ // causes for this error(search for 'goto _failed')
+ //
+ // A not so uncommon cause for this might be deserializing objects with
+ // a high nesting level: msgpack will break when it's internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&response, 4);
+ msgpack_pack_int(&response, 1);
+ msgpack_pack_int(&response, 0);
+ msgpack_rpc_error("Invalid msgpack payload. "
+ "This error can also happen when deserializing "
+ "an object with high level of nesting",
+ &response);
+ wstream_write(channel->data.streams.write,
+ wstream_new_buffer(xmemdup(channel->sbuffer->data,
+ channel->sbuffer->size),
+ channel->sbuffer->size,
+ free));
+ // Clear the buffer for future calls
+ msgpack_sbuffer_clear(channel->sbuffer);
+ }
}
-static void send_event(Channel *channel, char *type, typval_T *data)
+static void send_event(Channel *channel, char *type, Object data)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
}
-static void broadcast_event(char *type, typval_T *data)
+static void broadcast_event(char *type, Object data)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
@@ -219,6 +267,7 @@ static void broadcast_event(char *type, typval_T *data)
});
if (!kv_size(subscribed)) {
+ msgpack_rpc_free_object(data);
goto end;
}
@@ -255,7 +304,9 @@ static void close_channel(Channel *channel)
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
- job_stop(channel->data.job_id);
+ if (channel->data.job) {
+ job_stop(channel->data.job);
+ }
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
@@ -278,17 +329,17 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static WBuffer *serialize_event(char *type, typval_T *data)
+static WBuffer *serialize_event(char *type, Object data)
{
String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
- Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(event_type, event_data, &packer);
- WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data,
+ msgpack_rpc_notification(event_type, data, &packer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data,
+ msgpack_event_buffer.size),
msgpack_event_buffer.size,
- true);
- msgpack_rpc_free_object(event_data);
+ free);
+ msgpack_rpc_free_object(data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return rv;