diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 124 |
1 files changed, 64 insertions, 60 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 3932fa1f36..05badc72d4 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -10,10 +10,8 @@ #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/remote_ui.h" #include "nvim/event/loop.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" #include "nvim/os/job.h" #include "nvim/os/job_defs.h" #include "nvim/msgpack_rpc/helpers.h" @@ -34,6 +32,12 @@ #define log_server_msg(...) #endif +typedef enum { + kChannelTypeSocket, + kChannelTypeJob, + kChannelTypeStdio +} ChannelType; + typedef struct { uint64_t request_id; bool returned, errored; @@ -45,15 +49,16 @@ typedef struct { size_t refcount; size_t pending_requests; PMap(cstr_t) *subscribed_events; - bool is_job, closed; + bool closed; + ChannelType type; msgpack_unpacker *unpacker; union { Job *job; + Stream stream; struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; + Stream in; + Stream out; + } std; } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; @@ -112,8 +117,7 @@ void channel_teardown(void) /// 0, on error. uint64_t channel_from_job(char **argv) { - Channel *channel = register_channel(); - channel->is_job = true; + Channel *channel = register_channel(kChannelTypeJob); incref(channel); // job channels are only closed by the exit_cb int status; @@ -140,21 +144,15 @@ uint64_t channel_from_job(char **argv) /// pipe/socket client connection /// /// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) +void channel_from_stream(uv_stream_t *uvstream) { - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); + Channel *channel = register_channel(kChannelTypeSocket); + stream_init(NULL, &channel->data.stream, -1, uvstream, channel); // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; + wstream_init(&channel->data.stream, 0); + // read stream + rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.stream, parse_msgpack); } /// Sends event/arguments to channel @@ -313,28 +311,23 @@ bool channel_close(uint64_t id) /// Neovim static void channel_from_stdio(void) { - Channel *channel = register_channel(); + Channel *channel = register_channel(kChannelTypeStdio); incref(channel); // stdio channels are only closed on exit - channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); + rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, + channel); + rstream_start(&channel->data.std.in, parse_msgpack); // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; + wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof) +static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof) { Job *job = data; - parse_msgpack(rstream, buf, job_data(job), eof); + parse_msgpack(stream, buf, job_data(job), eof); } -static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; @@ -350,7 +343,7 @@ static void job_exit(Job *job, int status, void *data) decref(data); } -static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) { Channel *channel = data; incref(channel); @@ -362,9 +355,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) } size_t count = rbuffer_size(rbuf); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)", count, - rstream); + stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -516,10 +509,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return false; } - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); + switch (channel->type) { + case kChannelTypeSocket: + success = wstream_write(&channel->data.stream, buffer); + break; + case kChannelTypeJob: + success = job_write(channel->data.job, buffer); + break; + case kChannelTypeStdio: + success = wstream_write(&channel->data.std.out, buffer); + break; + default: + abort(); } if (!success) { @@ -637,20 +638,23 @@ static void close_channel(Channel *channel) channel->closed = true; - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; - if (handle) { - uv_close(handle, close_cb); - } else { + switch (channel->type) { + case kChannelTypeSocket: + stream_close(&channel->data.stream, close_cb); + break; + case kChannelTypeJob: + if (channel->data.job) { + job_stop(channel->data.job); + } + break; + case kChannelTypeStdio: + stream_close(&channel->data.std.in, NULL); + stream_close(&channel->data.std.out, NULL); loop_push_event(&loop, (Event) { .handler = on_stdio_close, .data = channel }, false); - } + break; + default: + abort(); } decref(channel); @@ -683,15 +687,15 @@ static void free_channel(Channel *channel) xfree(channel); } -static void close_cb(uv_handle_t *handle) +static void close_cb(Stream *stream, void *data) { - xfree(handle->data); - xfree(handle); + xfree(data); } -static Channel *register_channel(void) +static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->type = type; rv->refcount = 1; rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); |