aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c124
1 files changed, 64 insertions, 60 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 3932fa1f36..05badc72d4 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -10,10 +10,8 @@
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
@@ -34,6 +32,12 @@
#define log_server_msg(...)
#endif
+typedef enum {
+ kChannelTypeSocket,
+ kChannelTypeJob,
+ kChannelTypeStdio
+} ChannelType;
+
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -45,15 +49,16 @@ typedef struct {
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
- bool is_job, closed;
+ bool closed;
+ ChannelType type;
msgpack_unpacker *unpacker;
union {
Job *job;
+ Stream stream;
struct {
- RStream *read;
- WStream *write;
- uv_stream_t *uv;
- } streams;
+ Stream in;
+ Stream out;
+ } std;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
@@ -112,8 +117,7 @@ void channel_teardown(void)
/// 0, on error.
uint64_t channel_from_job(char **argv)
{
- Channel *channel = register_channel();
- channel->is_job = true;
+ Channel *channel = register_channel(kChannelTypeJob);
incref(channel); // job channels are only closed by the exit_cb
int status;
@@ -140,21 +144,15 @@ uint64_t channel_from_job(char **argv)
/// pipe/socket client connection
///
/// @param stream The established connection
-void channel_from_stream(uv_stream_t *stream)
+void channel_from_stream(uv_stream_t *uvstream)
{
- Channel *channel = register_channel();
- stream->data = NULL;
- channel->is_job = false;
- // read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_stream(channel->data.streams.read, stream);
- rstream_start(channel->data.streams.read);
+ Channel *channel = register_channel(kChannelTypeSocket);
+ stream_init(NULL, &channel->data.stream, -1, uvstream, channel);
// write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_stream(channel->data.streams.write, stream);
- channel->data.streams.uv = stream;
+ wstream_init(&channel->data.stream, 0);
+ // read stream
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, parse_msgpack);
}
/// Sends event/arguments to channel
@@ -313,28 +311,23 @@ bool channel_close(uint64_t id)
/// Neovim
static void channel_from_stdio(void)
{
- Channel *channel = register_channel();
+ Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit
- channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_file(channel->data.streams.read, 0);
- rstream_start(channel->data.streams.read);
+ rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
+ channel);
+ rstream_start(&channel->data.std.in, parse_msgpack);
// write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_file(channel->data.streams.write, 1);
- channel->data.streams.uv = NULL;
+ wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof)
+static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
- parse_msgpack(rstream, buf, job_data(job), eof);
+ parse_msgpack(stream, buf, job_data(job), eof);
}
-static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
+static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
@@ -350,7 +343,7 @@ static void job_exit(Job *job, int status, void *data)
decref(data);
}
-static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
+static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
Channel *channel = data;
incref(channel);
@@ -362,9 +355,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
}
size_t count = rbuffer_size(rbuf);
- DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
count,
- rstream);
+ stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@@ -516,10 +509,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return false;
}
- if (channel->is_job) {
- success = job_write(channel->data.job, buffer);
- } else {
- success = wstream_write(channel->data.streams.write, buffer);
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ success = wstream_write(&channel->data.stream, buffer);
+ break;
+ case kChannelTypeJob:
+ success = job_write(channel->data.job, buffer);
+ break;
+ case kChannelTypeStdio:
+ success = wstream_write(&channel->data.std.out, buffer);
+ break;
+ default:
+ abort();
}
if (!success) {
@@ -637,20 +638,23 @@ static void close_channel(Channel *channel)
channel->closed = true;
- if (channel->is_job) {
- if (channel->data.job) {
- job_stop(channel->data.job);
- }
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
- if (handle) {
- uv_close(handle, close_cb);
- } else {
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ stream_close(&channel->data.stream, close_cb);
+ break;
+ case kChannelTypeJob:
+ if (channel->data.job) {
+ job_stop(channel->data.job);
+ }
+ break;
+ case kChannelTypeStdio:
+ stream_close(&channel->data.std.in, NULL);
+ stream_close(&channel->data.std.out, NULL);
loop_push_event(&loop,
(Event) { .handler = on_stdio_close, .data = channel }, false);
- }
+ break;
+ default:
+ abort();
}
decref(channel);
@@ -683,15 +687,15 @@ static void free_channel(Channel *channel)
xfree(channel);
}
-static void close_cb(uv_handle_t *handle)
+static void close_cb(Stream *stream, void *data)
{
- xfree(handle->data);
- xfree(handle);
+ xfree(data);
}
-static Channel *register_channel(void)
+static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);