diff options
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r-- | src/nvim/os/channel.c | 89 |
1 files changed, 70 insertions, 19 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 9a692cf9fe..653f09756a 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -24,7 +24,7 @@ typedef struct { msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; union { - int job_id; + Job *job; struct { RStream *read; WStream *write; @@ -68,11 +68,26 @@ void channel_teardown() /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process -void channel_from_job(char **argv) +bool channel_from_job(char **argv) { Channel *channel = register_channel(); channel->is_job = true; - channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL); + + int status; + channel->data.job = job_start(argv, + channel, + job_out, + job_err, + job_exit, + true, + &status); + + if (status <= 0) { + close_channel(channel); + return false; + } + + return true; } /// Creates an API channel from a libuv stream representing a tcp or @@ -101,12 +116,13 @@ void channel_from_stream(uv_stream_t *stream) /// @param type The event type, an arbitrary string /// @param obj The event data /// @return True if the data was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *type, typval_T *data) +bool channel_send_event(uint64_t id, char *type, Object data) { Channel *channel = NULL; if (id > 0) { if (!(channel = pmap_get(uint64_t)(channels, id))) { + msgpack_rpc_free_object(data); return false; } send_event(channel, type, data); @@ -126,7 +142,7 @@ void channel_subscribe(uint64_t id, char *event) Channel *channel; if (!(channel = pmap_get(uint64_t)(channels, id))) { - return; + abort(); } char *event_string = pmap_get(cstr_t)(event_strings, event); @@ -148,7 +164,7 @@ void channel_unsubscribe(uint64_t id, char *event) Channel *channel; if (!(channel = pmap_get(uint64_t)(channels, id))) { - return; + abort(); } unsubscribe(channel, event); @@ -165,6 +181,11 @@ static void job_err(RStream *rstream, void *data, bool eof) // TODO(tarruda): plugin error messages should be sent to the error buffer } +static void job_exit(Job *job, void *data) +{ + // TODO(tarruda): what should be done here? +} + static void parse_msgpack(RStream *rstream, void *data, bool eof) { Channel *channel = data; @@ -183,30 +204,57 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); + UnpackResult result; + msgpack_packer response; // Deserialize everything we can. - while (msgpack_unpacker_next(channel->unpacker, &unpacked)) { + while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked)) + == kUnpackResultOk) { // Each object is a new msgpack-rpc request and requires an empty response - msgpack_packer response; msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); wstream_write(channel->data.streams.write, - wstream_new_buffer(channel->sbuffer->data, + wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), channel->sbuffer->size, - true)); + free)); // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } + + if (result == kUnpackResultFail) { + // See src/msgpack/unpack_template.h in msgpack source tree for + // causes for this error(search for 'goto _failed') + // + // A not so uncommon cause for this might be deserializing objects with + // a high nesting level: msgpack will break when it's internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&response, 4); + msgpack_pack_int(&response, 1); + msgpack_pack_int(&response, 0); + msgpack_rpc_error("Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting", + &response); + wstream_write(channel->data.streams.write, + wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free)); + // Clear the buffer for future calls + msgpack_sbuffer_clear(channel->sbuffer); + } } -static void send_event(Channel *channel, char *type, typval_T *data) +static void send_event(Channel *channel, char *type, Object data) { wstream_write(channel->data.streams.write, serialize_event(type, data)); } -static void broadcast_event(char *type, typval_T *data) +static void broadcast_event(char *type, Object data) { kvec_t(Channel *) subscribed; kv_init(subscribed); @@ -219,6 +267,7 @@ static void broadcast_event(char *type, typval_T *data) }); if (!kv_size(subscribed)) { + msgpack_rpc_free_object(data); goto end; } @@ -255,7 +304,9 @@ static void close_channel(Channel *channel) msgpack_unpacker_free(channel->unpacker); if (channel->is_job) { - job_stop(channel->data.job_id); + if (channel->data.job) { + job_stop(channel->data.job); + } } else { rstream_free(channel->data.streams.read); wstream_free(channel->data.streams.write); @@ -278,17 +329,17 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static WBuffer *serialize_event(char *type, typval_T *data) +static WBuffer *serialize_event(char *type, Object data) { String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; - Object event_data = vim_to_object(data); msgpack_packer packer; msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, event_data, &packer); - WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data, + msgpack_rpc_notification(event_type, data, &packer); + WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data, + msgpack_event_buffer.size), msgpack_event_buffer.size, - true); - msgpack_rpc_free_object(event_data); + free); + msgpack_rpc_free_object(data); msgpack_sbuffer_clear(&msgpack_event_buffer); return rv; |