aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os/channel.c
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-07-17 12:06:31 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-07-17 12:06:31 -0300
commit953d61cbf82d5f1acd68bd1ae2101d92f5ec5492 (patch)
treed4aa1fe08ad3f0a7e27b6628aad4925cd1fbfb2a /src/nvim/os/channel.c
parentb92630c2fff7950141630f0d62b11404d0589ece (diff)
parent4dc642aa622cfac09f2f4752907137d68d8508fe (diff)
downloadrneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.gz
rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.bz2
rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.zip
Merge PR #895 'Core service providers...'
Diffstat (limited to 'src/nvim/os/channel.c')
-rw-r--r--src/nvim/os/channel.c131
1 files changed, 65 insertions, 66 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 504c1ca05b..d5f29aa667 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -6,6 +6,7 @@
#include <msgpack.h>
#include "nvim/api/private/helpers.h"
+#include "nvim/api/vim.h"
#include "nvim/os/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
@@ -17,9 +18,11 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
+#include "nvim/ascii.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/map.h"
+#include "nvim/log.h"
#include "nvim/lib/kvec.h"
typedef struct {
@@ -81,7 +84,8 @@ void channel_teardown(void)
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
-bool channel_from_job(char **argv)
+/// @return The channel id
+uint64_t channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
@@ -91,17 +95,17 @@ bool channel_from_job(char **argv)
channel,
job_out,
job_err,
- job_exit,
+ NULL,
true,
0,
&status);
if (status <= 0) {
close_channel(channel);
- return false;
+ return 0;
}
- return true;
+ return channel->id;
}
/// Creates an API channel from a libuv stream representing a tcp or
@@ -114,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream)
stream->data = NULL;
channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true);
+ channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@@ -123,6 +127,13 @@ void channel_from_stream(uv_stream_t *stream)
channel->data.streams.uv = stream;
}
+bool channel_exists(uint64_t id)
+{
+ Channel *channel;
+ return (channel = pmap_get(uint64_t)(channels, id)) != NULL
+ && channel->enabled;
+}
+
/// Sends event/data to channel
///
/// @param id The channel id. If 0, the event will be sent to all
@@ -135,7 +146,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
Channel *channel = NULL;
if (id > 0) {
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -155,7 +166,7 @@ bool channel_send_call(uint64_t id,
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -170,22 +181,18 @@ bool channel_send_call(uint64_t id,
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(cstr_to_string(buf));
+ msgpack_rpc_free_object(arg);
+ return false;
}
uint64_t request_id = channel->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, name, arg);
- if (!kv_size(channel->call_stack)) {
- // This is the first frame, we must disable event deferral for this
- // channel because we won't be returning until the client sends a
- // response
- if (channel->is_job) {
- job_set_defer(channel->data.job, false);
- } else {
- rstream_set_defer(channel->data.streams.read, false);
- }
- }
+ EventSource channel_source = channel->is_job
+ ? job_event_source(channel->data.job)
+ : rstream_event_source(channel->data.streams.read);
+ EventSource sources[] = {channel_source, NULL};
// Push the frame
ChannelCallFrame frame = {request_id, false, NIL};
@@ -193,24 +200,18 @@ bool channel_send_call(uint64_t id,
size_t size = kv_size(channel->call_stack);
do {
- event_poll(-1);
+ event_poll(-1, sources);
} while (
// Continue running if ...
channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return
- if (!kv_size(channel->call_stack)) {
- // Popped last frame, restore event deferral
- if (channel->is_job) {
- job_set_defer(channel->data.job, true);
- } else {
- rstream_set_defer(channel->data.streams.read, true);
- }
- if (!channel->enabled && !channel->rpc_call_level) {
+ if (!(kv_size(channel->call_stack)
+ || channel->enabled
+ || channel->rpc_call_level)) {
// Close the channel if it has been disabled and we have not been called
// by `parse_msgpack`(It would be unsafe to close the channel otherwise)
close_channel(channel);
- }
}
*errored = frame.errored;
@@ -227,7 +228,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -249,7 +250,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -264,12 +265,15 @@ static void job_out(RStream *rstream, void *data, bool eof)
static void job_err(RStream *rstream, void *data, bool eof)
{
- // TODO(tarruda): plugin error messages should be sent to the error buffer
-}
-
-static void job_exit(Job *job, void *data)
-{
- // TODO(tarruda): what should be done here?
+ size_t count;
+ char buf[256];
+ Channel *channel = job_data(data);
+
+ while ((count = rstream_available(rstream))) {
+ size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
+ buf[read] = NUL;
+ ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
+ }
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
@@ -283,12 +287,15 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
return;
}
channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
+ DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ count,
+ rstream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@@ -313,7 +320,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
" a matching id for the current RPC call. Ensure the client "
" is properly synchronized",
channel->id);
- call_stack_unwind(channel, buf, 1);
+ call_set_error(channel, buf);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
@@ -366,7 +373,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed due to a failed write",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
}
return success;
@@ -383,7 +390,7 @@ static void send_request(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1));
}
static void send_event(Channel *channel,
@@ -391,7 +398,7 @@ static void send_event(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1));
}
static void broadcast_event(char *name, Object arg)
@@ -412,7 +419,11 @@ static void broadcast_event(char *name, Object arg)
}
String method = {.size = strlen(name), .data = name};
- WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
+ WBuffer *buffer = serialize_request(0,
+ method,
+ arg,
+ &out_buffer,
+ kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
channel_write(kv_A(subscribed, i), buffer);
@@ -443,6 +454,15 @@ static void close_channel(Channel *channel)
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
+
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@@ -453,14 +473,6 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
free(channel);
}
@@ -503,10 +515,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
static void call_stack_pop(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1);
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
- (void)kv_pop(channel->call_stack);
if (frame->errored) {
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
@@ -515,24 +525,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
}
}
-static void call_stack_unwind(Channel *channel, char *msg, int count)
+static void call_set_error(Channel *channel, char *msg)
{
- while (kv_size(channel->call_stack) && count--) {
+ for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
-}
-static void disable_channel(Channel *channel, char *msg)
-{
- if (kv_size(channel->call_stack)) {
- // Channel is currently in the middle of a call, remove all frames and mark
- // it as "dead"
- channel->enabled = false;
- call_stack_unwind(channel, msg, -1);
- } else {
- // Safe to close it now
- close_channel(channel);
- }
+ channel->enabled = false;
}