aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r--src/nvim/msgpack_rpc/channel.c195
1 files changed, 98 insertions, 97 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 2a81b4f160..861614f147 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -9,13 +9,11 @@
#include "nvim/api/vim.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
-#include "nvim/os/event.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/socket.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@@ -34,6 +32,12 @@
#define log_server_msg(...)
#endif
+typedef enum {
+ kChannelTypeSocket,
+ kChannelTypeProc,
+ kChannelTypeStdio
+} ChannelType;
+
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -45,15 +49,21 @@ typedef struct {
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
- bool is_job, closed;
+ bool closed;
+ ChannelType type;
msgpack_unpacker *unpacker;
union {
- Job *job;
+ Stream stream;
+ struct {
+ UvProcess uvproc;
+ Stream in;
+ Stream out;
+ Stream err;
+ } process;
struct {
- RStream *read;
- WStream *write;
- uv_stream_t *uv;
- } streams;
+ Stream in;
+ Stream out;
+ } std;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
@@ -104,57 +114,48 @@ void channel_teardown(void)
});
}
-/// Creates an API channel by starting a job and connecting to its
+/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_job(char **argv)
-{
- Channel *channel = register_channel();
- channel->is_job = true;
- incref(channel); // job channels are only closed by the exit_cb
-
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = channel;
- opts.stdout_cb = job_out;
- opts.stderr_cb = job_err;
- opts.exit_cb = job_exit;
- channel->data.job = job_start(opts, &status);
-
- if (status <= 0) {
- if (status == 0) { // Two decrefs needed if status == 0.
- decref(channel); // Only one needed if status < 0,
- } // because exit_cb will do the second one.
+uint64_t channel_from_process(char **argv)
+{
+ Channel *channel = register_channel(kChannelTypeProc);
+ channel->data.process.uvproc = uv_process_init(channel);
+ Process *proc = &channel->data.process.uvproc.process;
+ proc->argv = argv;
+ proc->in = &channel->data.process.in;
+ proc->out = &channel->data.process.out;
+ proc->err = &channel->data.process.err;
+ proc->cb = process_exit;
+ if (!process_spawn(&loop, proc)) {
+ loop_poll_events(&loop, 0);
decref(channel);
return 0;
}
+ incref(channel); // process channels are only closed by the exit_cb
+ wstream_init(proc->in, 0);
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, parse_msgpack);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, forward_stderr);
+
return channel->id;
}
-/// Creates an API channel from a libuv stream representing a tcp or
-/// pipe/socket client connection
+/// Creates an API channel from a tcp/pipe socket connection
///
-/// @param stream The established connection
-void channel_from_stream(uv_stream_t *stream)
+/// @param watcher The SocketWatcher ready to accept the connection
+void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel();
- stream->data = NULL;
- channel->is_job = false;
- // read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_stream(channel->data.streams.read, stream);
- rstream_start(channel->data.streams.read);
- // write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_stream(channel->data.streams.write, stream);
- channel->data.streams.uv = stream;
+ Channel *channel = register_channel(kChannelTypeSocket);
+ socket_watcher_accept(watcher, &channel->data.stream, channel);
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, parse_msgpack);
}
/// Sends event/arguments to channel
@@ -220,7 +221,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
- event_poll_until(-1, frame.returned);
+ LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -313,44 +314,32 @@ bool channel_close(uint64_t id)
/// Neovim
static void channel_from_stdio(void)
{
- Channel *channel = register_channel();
+ Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit
- channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_file(channel->data.streams.read, 0);
- rstream_start(channel->data.streams.read);
+ rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
+ channel);
+ rstream_start(&channel->data.std.in, parse_msgpack);
// write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_file(channel->data.streams.write, 1);
- channel->data.streams.uv = NULL;
-}
-
-static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof)
-{
- Job *job = data;
- parse_msgpack(rstream, buf, job_data(job), eof);
+ wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
+static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s",
- ((Channel *)job_data(data))->id, buf);
+ ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
-static void job_exit(Job *job, int status, void *data)
+static void process_exit(Process *proc, int status, void *data)
{
decref(data);
}
-static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
+static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
Channel *channel = data;
incref(channel);
@@ -362,9 +351,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
}
size_t count = rbuffer_size(rbuf);
- DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
count,
- rstream);
+ stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@@ -474,7 +463,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
- event_push((Event) {
+ loop_push_event(&loop, (Event) {
.handler = on_request_event,
.data = event_data
}, defer);
@@ -516,10 +505,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return false;
}
- if (channel->is_job) {
- success = job_write(channel->data.job, buffer);
- } else {
- success = wstream_write(channel->data.streams.write, buffer);
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ success = wstream_write(&channel->data.stream, buffer);
+ break;
+ case kChannelTypeProc:
+ success = wstream_write(&channel->data.process.in, buffer);
+ break;
+ case kChannelTypeStdio:
+ success = wstream_write(&channel->data.std.out, buffer);
+ break;
+ default:
+ abort();
}
if (!success) {
@@ -628,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/job and free the channel resources.
+/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
{
if (channel->closed) {
@@ -637,19 +634,23 @@ static void close_channel(Channel *channel)
channel->closed = true;
- if (channel->is_job) {
- if (channel->data.job) {
- job_stop(channel->data.job);
- }
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
- if (handle) {
- uv_close(handle, close_cb);
- } else {
- event_push((Event) { .handler = on_stdio_close, .data = channel }, false);
- }
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ stream_close(&channel->data.stream, close_cb);
+ break;
+ case kChannelTypeProc:
+ if (!channel->data.process.uvproc.process.closed) {
+ process_stop(&channel->data.process.uvproc.process);
+ }
+ break;
+ case kChannelTypeStdio:
+ stream_close(&channel->data.std.in, NULL);
+ stream_close(&channel->data.std.out, NULL);
+ loop_push_event(&loop,
+ (Event) { .handler = on_stdio_close, .data = channel }, false);
+ break;
+ default:
+ abort();
}
decref(channel);
@@ -682,15 +683,15 @@ static void free_channel(Channel *channel)
xfree(channel);
}
-static void close_cb(uv_handle_t *handle)
+static void close_cb(Stream *stream, void *data)
{
- xfree(handle->data);
- xfree(handle);
+ xfree(data);
}
-static Channel *register_channel(void)
+static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);