aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r--src/nvim/msgpack_rpc/channel.c234
-rw-r--r--src/nvim/msgpack_rpc/channel.h1
-rw-r--r--src/nvim/msgpack_rpc/defs.h5
-rw-r--r--src/nvim/msgpack_rpc/helpers.c9
-rw-r--r--src/nvim/msgpack_rpc/helpers.h2
-rw-r--r--src/nvim/msgpack_rpc/remote_ui.c23
-rw-r--r--src/nvim/msgpack_rpc/server.c223
7 files changed, 182 insertions, 315 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index df78f822d6..0e3b8200c9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -9,13 +9,11 @@
#include "nvim/api/vim.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
-#include "nvim/os/event.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/event/socket.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@@ -34,6 +32,12 @@
#define log_server_msg(...)
#endif
+typedef enum {
+ kChannelTypeSocket,
+ kChannelTypeProc,
+ kChannelTypeStdio
+} ChannelType;
+
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -45,19 +49,26 @@ typedef struct {
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
- bool is_job, closed;
+ bool closed;
+ ChannelType type;
msgpack_unpacker *unpacker;
union {
- Job *job;
+ Stream stream;
+ struct {
+ UvProcess uvproc;
+ Stream in;
+ Stream out;
+ Stream err;
+ } process;
struct {
- RStream *read;
- WStream *write;
- uv_stream_t *uv;
- } streams;
+ Stream in;
+ Stream out;
+ } std;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
+ Queue *events;
} Channel;
typedef struct {
@@ -104,57 +115,51 @@ void channel_teardown(void)
});
}
-/// Creates an API channel by starting a job and connecting to its
+/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
-uint64_t channel_from_job(char **argv)
-{
- Channel *channel = register_channel();
- channel->is_job = true;
- incref(channel); // job channels are only closed by the exit_cb
-
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = channel;
- opts.stdout_cb = job_out;
- opts.stderr_cb = job_err;
- opts.exit_cb = job_exit;
- channel->data.job = job_start(opts, &status);
-
- if (status <= 0) {
- if (status == 0) { // Two decrefs needed if status == 0.
- decref(channel); // Only one needed if status < 0,
- } // because exit_cb will do the second one.
+uint64_t channel_from_process(char **argv)
+{
+ Channel *channel = register_channel(kChannelTypeProc);
+ channel->data.process.uvproc = uv_process_init(&loop, channel);
+ Process *proc = &channel->data.process.uvproc.process;
+ proc->argv = argv;
+ proc->in = &channel->data.process.in;
+ proc->out = &channel->data.process.out;
+ proc->err = &channel->data.process.err;
+ proc->cb = process_exit;
+ if (!process_spawn(proc)) {
+ loop_poll_events(&loop, 0);
decref(channel);
return 0;
}
+ incref(channel); // process channels are only closed by the exit_cb
+ wstream_init(proc->in, 0);
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, parse_msgpack);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, forward_stderr);
+
return channel->id;
}
-/// Creates an API channel from a libuv stream representing a tcp or
-/// pipe/socket client connection
+/// Creates an API channel from a tcp/pipe socket connection
///
-/// @param stream The established connection
-void channel_from_stream(uv_stream_t *stream)
+/// @param watcher The SocketWatcher ready to accept the connection
+void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel();
- stream->data = NULL;
- channel->is_job = false;
- // read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_stream(channel->data.streams.read, stream);
- rstream_start(channel->data.streams.read);
- // write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_stream(channel->data.streams.write, stream);
- channel->data.streams.uv = stream;
+ Channel *channel = register_channel(kChannelTypeSocket);
+ socket_watcher_accept(watcher, &channel->data.stream, channel);
+ incref(channel); // close channel only after the stream is closed
+ channel->data.stream.internal_close_cb = close_cb;
+ channel->data.stream.internal_data = channel;
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, parse_msgpack);
}
/// Sends event/arguments to channel
@@ -220,7 +225,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
- event_poll_until(-1, frame.returned);
+ LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -313,46 +318,34 @@ bool channel_close(uint64_t id)
/// Neovim
static void channel_from_stdio(void)
{
- Channel *channel = register_channel();
+ Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit
- channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(parse_msgpack,
- rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel);
- rstream_set_file(channel->data.streams.read, 0);
- rstream_start(channel->data.streams.read);
+ rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
+ channel);
+ rstream_start(&channel->data.std.in, parse_msgpack);
// write stream
- channel->data.streams.write = wstream_new(0);
- wstream_set_file(channel->data.streams.write, 1);
- channel->data.streams.uv = NULL;
+ wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
-static void job_out(RStream *rstream, void *data, bool eof)
+static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
+ void *data, bool eof)
{
- Job *job = data;
- parse_msgpack(rstream, job_data(job), eof);
-}
-
-static void job_err(RStream *rstream, void *data, bool eof)
-{
- size_t count;
- char buf[256];
-
- while ((count = rstream_pending(rstream))) {
- size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
+ while (rbuffer_size(rbuf)) {
+ char buf[256];
+ size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
- ELOG("Channel %" PRIu64 " stderr: %s",
- ((Channel *)job_data(data))->id, buf);
+ ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
-static void job_exit(Job *job, int status, void *data)
+static void process_exit(Process *proc, int status, void *data)
{
decref(data);
}
-static void parse_msgpack(RStream *rstream, void *data, bool eof)
+static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
+ bool eof)
{
Channel *channel = data;
incref(channel);
@@ -363,14 +356,14 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
- size_t count = rstream_pending(rstream);
- DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ size_t count = rbuffer_size(rbuf);
+ DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
count,
- rstream);
+ stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
- rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
+ rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
msgpack_unpacked unpacked;
@@ -460,31 +453,31 @@ static void handle_request(Channel *channel, msgpack_object *request)
method->via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
- handler.defer = false;
+ handler.async = true;
}
Array args = ARRAY_DICT_INIT;
if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
handler.fn = msgpack_rpc_handle_invalid_arguments;
- handler.defer = false;
+ handler.async = true;
}
- bool defer = (!kv_size(channel->call_stack) && handler.defer);
RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
- event_push((Event) {
- .handler = on_request_event,
- .data = event_data
- }, defer);
+ if (handler.async) {
+ on_request_event((void **)&event_data);
+ } else {
+ queue_put(channel->events, on_request_event, 1, event_data);
+ }
}
-static void on_request_event(Event event)
+static void on_request_event(void **argv)
{
- RequestEvent *e = event.data;
+ RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
@@ -518,10 +511,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return false;
}
- if (channel->is_job) {
- success = job_write(channel->data.job, buffer);
- } else {
- success = wstream_write(channel->data.streams.write, buffer);
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ success = wstream_write(&channel->data.stream, buffer);
+ break;
+ case kChannelTypeProc:
+ success = wstream_write(&channel->data.process.in, buffer);
+ break;
+ case kChannelTypeStdio:
+ success = wstream_write(&channel->data.std.out, buffer);
+ break;
+ default:
+ abort();
}
if (!success) {
@@ -630,7 +631,7 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/job and free the channel resources.
+/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
{
if (channel->closed) {
@@ -639,27 +640,30 @@ static void close_channel(Channel *channel)
channel->closed = true;
- if (channel->is_job) {
- if (channel->data.job) {
- job_stop(channel->data.job);
- }
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
- if (handle) {
- uv_close(handle, close_cb);
- } else {
- event_push((Event) { .handler = on_stdio_close, .data = channel }, false);
- }
+ switch (channel->type) {
+ case kChannelTypeSocket:
+ stream_close(&channel->data.stream, NULL);
+ break;
+ case kChannelTypeProc:
+ if (!channel->data.process.uvproc.process.closed) {
+ process_stop(&channel->data.process.uvproc.process);
+ }
+ break;
+ case kChannelTypeStdio:
+ stream_close(&channel->data.std.in, NULL);
+ stream_close(&channel->data.std.out, NULL);
+ queue_put(loop.fast_events, exit_event, 1, channel);
+ return;
+ default:
+ abort();
}
decref(channel);
}
-static void on_stdio_close(Event e)
+static void exit_event(void **argv)
{
- decref(e.data);
+ decref(argv[0]);
if (!exiting) {
mch_exit(0);
@@ -681,18 +685,20 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
+ queue_free(channel->events);
xfree(channel);
}
-static void close_cb(uv_handle_t *handle)
+static void close_cb(Stream *stream, void *data)
{
- xfree(handle->data);
- xfree(handle);
+ decref(data);
}
-static Channel *register_channel(void)
+static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
+ rv->events = queue_new_child(loop.events);
+ rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index df742fe368..104547a7b8 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -5,6 +5,7 @@
#include <uv.h>
#include "nvim/api/private/defs.h"
+#include "nvim/event/socket.h"
#include "nvim/vim.h"
#define METHOD_MAXLEN 512
diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h
index 0492a65290..d97cf28ca1 100644
--- a/src/nvim/msgpack_rpc/defs.h
+++ b/src/nvim/msgpack_rpc/defs.h
@@ -11,9 +11,8 @@ typedef struct {
uint64_t request_id,
Array args,
Error *error);
- bool defer; // Should the call be deferred to the main loop? This should
- // be true if the function mutates editor data structures such
- // as buffers, windows, tabs, or if it executes vimscript code.
+ bool async; // function is always safe to run immediately instead of being
+ // put in a request queue for handling when nvim waits for input.
} MsgpackRpcRequestHandler;
/// Initializes the msgpack-rpc method table
diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c
index acfd3fe94f..473958c765 100644
--- a/src/nvim/msgpack_rpc/helpers.c
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -94,13 +94,14 @@ bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
FUNC_ATTR_NONNULL_ALL
{
if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) {
+ if (obj->via.bin.ptr == NULL) {
+ return false;
+ }
arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size);
arg->size = obj->via.bin.size;
- } else {
- return false;
+ return true;
}
-
- return true;
+ return false;
}
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h
index bf161d54e0..7d9f114140 100644
--- a/src/nvim/msgpack_rpc/helpers.h
+++ b/src/nvim/msgpack_rpc/helpers.h
@@ -6,7 +6,7 @@
#include <msgpack.h>
-#include "nvim/os/wstream.h"
+#include "nvim/event/wstream.h"
#include "nvim/api/private/defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
diff --git a/src/nvim/msgpack_rpc/remote_ui.c b/src/nvim/msgpack_rpc/remote_ui.c
index 07d78dd9d7..3334b0e6af 100644
--- a/src/nvim/msgpack_rpc/remote_ui.c
+++ b/src/nvim/msgpack_rpc/remote_ui.c
@@ -28,7 +28,7 @@ void remote_ui_init(void)
connected_uis = pmap_new(uint64_t)();
// Add handler for "attach_ui"
String method = cstr_as_string("ui_attach");
- MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .defer = false};
+ MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .async = true};
msgpack_rpc_add_method_handler(method, handler);
method = cstr_as_string("ui_detach");
handler.fn = remote_ui_detach;
@@ -86,8 +86,7 @@ static Object remote_ui_attach(uint64_t channel_id, uint64_t request_id,
ui->busy_stop = remote_ui_busy_stop;
ui->mouse_on = remote_ui_mouse_on;
ui->mouse_off = remote_ui_mouse_off;
- ui->insert_mode = remote_ui_insert_mode;
- ui->normal_mode = remote_ui_normal_mode;
+ ui->mode_change = remote_ui_mode_change;
ui->set_scroll_region = remote_ui_set_scroll_region;
ui->scroll = remote_ui_scroll;
ui->highlight_set = remote_ui_highlight_set;
@@ -214,16 +213,18 @@ static void remote_ui_mouse_off(UI *ui)
push_call(ui, "mouse_off", args);
}
-static void remote_ui_insert_mode(UI *ui)
+static void remote_ui_mode_change(UI *ui, int mode)
{
Array args = ARRAY_DICT_INIT;
- push_call(ui, "insert_mode", args);
-}
-
-static void remote_ui_normal_mode(UI *ui)
-{
- Array args = ARRAY_DICT_INIT;
- push_call(ui, "normal_mode", args);
+ if (mode == INSERT) {
+ ADD(args, STRING_OBJ(cstr_to_string("insert")));
+ } else if (mode == REPLACE) {
+ ADD(args, STRING_OBJ(cstr_to_string("replace")));
+ } else {
+ assert(mode == NORMAL);
+ ADD(args, STRING_OBJ(cstr_to_string("normal")));
+ }
+ push_call(ui, "mode_change", args);
}
static void remote_ui_set_scroll_region(UI *ui, int top, int bot, int left,
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
index 388b5a04cf..8dffea35ca 100644
--- a/src/nvim/msgpack_rpc/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -3,11 +3,10 @@
#include <string.h>
#include <stdint.h>
-#include <uv.h>
-
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/os.h"
+#include "nvim/event/socket.h"
#include "nvim/ascii.h"
#include "nvim/eval.h"
#include "nvim/garray.h"
@@ -19,35 +18,9 @@
#include "nvim/strings.h"
#define MAX_CONNECTIONS 32
-#define ADDRESS_MAX_SIZE 256
-#define NVIM_DEFAULT_TCP_PORT 7450
#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS"
-typedef enum {
- kServerTypeTcp,
- kServerTypePipe
-} ServerType;
-
-typedef struct {
- // Pipe/socket path, or TCP address string
- char addr[ADDRESS_MAX_SIZE];
-
- // Type of the union below
- ServerType type;
-
- // TCP server or unix socket (named pipe on Windows)
- union {
- struct {
- uv_tcp_t handle;
- struct sockaddr_in addr;
- } tcp;
- struct {
- uv_pipe_t handle;
- } pipe;
- } socket;
-} Server;
-
-static garray_T servers = GA_EMPTY_INIT_VALUE;
+static garray_T watchers = GA_EMPTY_INIT_VALUE;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/server.c.generated.h"
@@ -56,7 +29,7 @@ static garray_T servers = GA_EMPTY_INIT_VALUE;
/// Initializes the module
bool server_init(void)
{
- ga_init(&servers, sizeof(Server *), 1);
+ ga_init(&watchers, sizeof(SocketWatcher *), 1);
bool must_free = false;
const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
@@ -72,18 +45,10 @@ bool server_init(void)
return ok;
}
-/// Retrieve the file handle from a server.
-static uv_handle_t *server_handle(Server *server)
-{
- return server->type == kServerTypeTcp
- ? (uv_handle_t *)&server->socket.tcp.handle
- : (uv_handle_t *) &server->socket.pipe.handle;
-}
-
/// Teardown a single server
-static void server_close_cb(Server **server)
+static void close_socket_watcher(SocketWatcher **watcher)
{
- uv_close(server_handle(*server), free_server);
+ socket_watcher_close(*watcher, free_server);
}
/// Set v:servername to the first server in the server list, or unset it if no
@@ -91,7 +56,7 @@ static void server_close_cb(Server **server)
static void set_vservername(garray_T *srvs)
{
char *default_server = (srvs->ga_len > 0)
- ? ((Server **)srvs->ga_data)[0]->addr
+ ? ((SocketWatcher **)srvs->ga_data)[0]->addr
: NULL;
set_vim_var_string(VV_SEND_SERVER, (char_u *)default_server, -1);
}
@@ -99,7 +64,7 @@ static void set_vservername(garray_T *srvs)
/// Teardown the server module
void server_teardown(void)
{
- GA_DEEP_CLEAR(&servers, Server *, server_close_cb);
+ GA_DEEP_CLEAR(&watchers, SocketWatcher *, close_socket_watcher);
}
/// Starts listening for API calls on the TCP address or pipe path `endpoint`.
@@ -116,120 +81,38 @@ void server_teardown(void)
int server_start(const char *endpoint)
FUNC_ATTR_NONNULL_ALL
{
- char addr[ADDRESS_MAX_SIZE];
-
- // Trim to `ADDRESS_MAX_SIZE`
- if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
- // TODO(aktau): since this is not what the user wanted, perhaps we
- // should return an error here
- WLOG("Address was too long, truncated to %s", addr);
- }
-
- // Check if the server already exists
- for (int i = 0; i < servers.ga_len; i++) {
- if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) {
- ELOG("Already listening on %s", addr);
+ SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher));
+ socket_watcher_init(&loop, watcher, endpoint, NULL);
+
+ // Check if a watcher for the endpoint already exists
+ for (int i = 0; i < watchers.ga_len; i++) {
+ if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) {
+ ELOG("Already listening on %s", watcher->addr);
+ socket_watcher_close(watcher, free_server);
return 1;
}
}
- ServerType server_type = kServerTypeTcp;
- Server *server = xmalloc(sizeof(Server));
- char ip[16], *ip_end = strrchr(addr, ':');
-
- if (!ip_end) {
- ip_end = strchr(addr, NUL);
- }
-
- // (ip_end - addr) is always > 0, so convert to size_t
- size_t addr_len = (size_t)(ip_end - addr);
-
- if (addr_len > sizeof(ip) - 1) {
- // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255)
- addr_len = sizeof(ip) - 1;
- }
-
- // Extract the address part
- xstrlcpy(ip, addr, addr_len + 1);
-
- int port = NVIM_DEFAULT_TCP_PORT;
-
- if (*ip_end == ':') {
- // Extract the port
- long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
- if (lport <= 0 || lport > 0xffff) {
- // Invalid port, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- } else {
- port = (int) lport;
- }
- }
-
- if (server_type == kServerTypeTcp) {
- // Try to parse ip address
- if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) {
- // Invalid address, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- }
- }
-
- int result;
- uv_stream_t *stream = NULL;
-
- xstrlcpy(server->addr, addr, sizeof(server->addr));
-
- if (server_type == kServerTypeTcp) {
- // Listen on tcp address/port
- uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
- result = uv_tcp_bind(&server->socket.tcp.handle,
- (const struct sockaddr *)&server->socket.tcp.addr,
- 0);
- stream = (uv_stream_t *)&server->socket.tcp.handle;
- } else {
- // Listen on named pipe or unix socket
- uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
- result = uv_pipe_bind(&server->socket.pipe.handle, server->addr);
- stream = (uv_stream_t *)&server->socket.pipe.handle;
- }
-
- stream->data = server;
-
- if (result == 0) {
- result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
- MAX_CONNECTIONS,
- connection_cb);
- }
-
- assert(result <= 0); // libuv should have returned -errno or zero.
+ int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);
if (result < 0) {
- if (result == -EACCES) {
- // Libuv converts ENOENT to EACCES for Windows compatibility, but if
- // the parent directory does not exist, ENOENT would be more accurate.
- *path_tail((char_u *) addr) = NUL;
- if (!os_file_exists((char_u *) addr)) {
- result = -ENOENT;
- }
- }
- uv_close((uv_handle_t *)stream, free_server);
ELOG("Failed to start server: %s", uv_strerror(result));
+ socket_watcher_close(watcher, free_server);
return result;
}
// Update $NVIM_LISTEN_ADDRESS, if not set.
const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
if (listen_address == NULL) {
- os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1);
+ os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1);
}
- server->type = server_type;
-
- // Add the server to the list.
- ga_grow(&servers, 1);
- ((Server **)servers.ga_data)[servers.ga_len++] = server;
+ // Add the watcher to the list.
+ ga_grow(&watchers, 1);
+ ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher;
// Update v:servername, if not set.
if (STRLEN(get_vim_var_str(VV_SEND_SERVER)) == 0) {
- set_vservername(&servers);
+ set_vservername(&watchers);
}
return 0;
@@ -240,21 +123,21 @@ int server_start(const char *endpoint)
/// @param endpoint Address of the server.
void server_stop(char *endpoint)
{
- Server *server;
+ SocketWatcher *watcher;
char addr[ADDRESS_MAX_SIZE];
// Trim to `ADDRESS_MAX_SIZE`
xstrlcpy(addr, endpoint, sizeof(addr));
int i = 0; // Index of the server whose address equals addr.
- for (; i < servers.ga_len; i++) {
- server = ((Server **)servers.ga_data)[i];
- if (strcmp(addr, server->addr) == 0) {
+ for (; i < watchers.ga_len; i++) {
+ watcher = ((SocketWatcher **)watchers.ga_data)[i];
+ if (strcmp(addr, watcher->addr) == 0) {
break;
}
}
- if (i >= servers.ga_len) {
+ if (i >= watchers.ga_len) {
ELOG("Not listening on %s", addr);
return;
}
@@ -265,18 +148,18 @@ void server_stop(char *endpoint)
os_unsetenv(LISTEN_ADDRESS_ENV_VAR);
}
- uv_close(server_handle(server), free_server);
+ socket_watcher_close(watcher, free_server);
// Remove this server from the list by swapping it with the last item.
- if (i != servers.ga_len - 1) {
- ((Server **)servers.ga_data)[i] =
- ((Server **)servers.ga_data)[servers.ga_len - 1];
+ if (i != watchers.ga_len - 1) {
+ ((SocketWatcher **)watchers.ga_data)[i] =
+ ((SocketWatcher **)watchers.ga_data)[watchers.ga_len - 1];
}
- servers.ga_len--;
+ watchers.ga_len--;
// If v:servername is the stopped address, re-initialize it.
if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) {
- set_vservername(&servers);
+ set_vservername(&watchers);
}
}
@@ -285,52 +168,28 @@ void server_stop(char *endpoint)
char **server_address_list(size_t *size)
FUNC_ATTR_NONNULL_ALL
{
- if ((*size = (size_t) servers.ga_len) == 0) {
+ if ((*size = (size_t)watchers.ga_len) == 0) {
return NULL;
}
- char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char *));
- for (int i = 0; i < servers.ga_len; i++) {
- addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr);
+ char **addrs = xcalloc((size_t)watchers.ga_len, sizeof(const char *));
+ for (int i = 0; i < watchers.ga_len; i++) {
+ addrs[i] = xstrdup(((SocketWatcher **)watchers.ga_data)[i]->addr);
}
return addrs;
}
-static void connection_cb(uv_stream_t *server, int status)
+static void connection_cb(SocketWatcher *watcher, int result, void *data)
{
- int result;
- uv_stream_t *client;
- Server *srv = server->data;
-
- if (status < 0) {
- abort();
- }
-
- if (srv->type == kServerTypeTcp) {
- client = xmalloc(sizeof(uv_tcp_t));
- uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client);
- } else {
- client = xmalloc(sizeof(uv_pipe_t));
- uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0);
- }
-
- result = uv_accept(server, client);
-
if (result) {
ELOG("Failed to accept connection: %s", uv_strerror(result));
- uv_close((uv_handle_t *)client, free_client);
return;
}
- channel_from_stream(client);
-}
-
-static void free_client(uv_handle_t *handle)
-{
- xfree(handle);
+ channel_from_connection(watcher);
}
-static void free_server(uv_handle_t *handle)
+static void free_server(SocketWatcher *watcher, void *data)
{
- xfree(handle->data);
+ xfree(watcher);
}