aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/eval.c48
-rw-r--r--src/nvim/eval.lua1
-rw-r--r--src/nvim/event/socket.c74
-rw-r--r--src/nvim/msgpack_rpc/channel.c95
-rw-r--r--src/nvim/msgpack_rpc/server.c12
-rw-r--r--src/nvim/path.c2
6 files changed, 217 insertions, 15 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 7bfd638e86..4e303414a3 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -15058,6 +15058,54 @@ static void f_simplify(typval_T *argvars, typval_T *rettv, FunPtr fptr)
rettv->v_type = VAR_STRING;
}
+/// "sockconnect()" function
+static void f_sockconnect(typval_T *argvars, typval_T *rettv, FunPtr fptr)
+{
+ if (argvars[0].v_type != VAR_STRING || argvars[1].v_type != VAR_STRING) {
+ EMSG(_(e_invarg));
+ return;
+ }
+ if (argvars[2].v_type != VAR_DICT && argvars[2].v_type != VAR_UNKNOWN) {
+ // Wrong argument types
+ EMSG2(_(e_invarg2), "expected dictionary");
+ return;
+ }
+
+ const char *mode = tv_get_string(&argvars[0]);
+ const char *address = tv_get_string(&argvars[1]);
+
+ bool tcp;
+ if (strcmp(mode, "tcp") == 0) {
+ tcp = true;
+ } else if (strcmp(mode, "pipe") == 0) {
+ tcp = false;
+ } else {
+ EMSG2(_(e_invarg2), "invalid mode");
+ return;
+ }
+
+ bool rpc = false;
+ if (argvars[2].v_type == VAR_DICT) {
+ dict_T *opts = argvars[2].vval.v_dict;
+ rpc = tv_dict_get_number(opts, "rpc") != 0;
+ }
+
+ if (!rpc) {
+ EMSG2(_(e_invarg2), "rpc option must be true");
+ return;
+ }
+
+ const char *error = NULL;
+ uint64_t id = channel_connect(tcp, address, 50, &error);
+
+ if (error) {
+ EMSG2(_("connection failed: %s"), error);
+ }
+
+ rettv->vval.v_number = (varnumber_T)id;
+ rettv->v_type = VAR_NUMBER;
+}
+
/// struct used in the array that's given to qsort()
typedef struct {
listitem_T *item;
diff --git a/src/nvim/eval.lua b/src/nvim/eval.lua
index 533403b2b0..334e10eb6c 100644
--- a/src/nvim/eval.lua
+++ b/src/nvim/eval.lua
@@ -268,6 +268,7 @@ return {
simplify={args=1},
sin={args=1, func="float_op_wrapper", data="&sin"},
sinh={args=1, func="float_op_wrapper", data="&sinh"},
+ sockconnect={args={2,3}},
sort={args={1, 3}},
soundfold={args=1},
spellbadword={args={0, 1}},
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index bc5a4ec75e..30a71a5586 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -15,6 +15,7 @@
#include "nvim/vim.h"
#include "nvim/strings.h"
#include "nvim/path.h"
+#include "nvim/main.h"
#include "nvim/memory.h"
#include "nvim/macros.h"
#include "nvim/charset.h"
@@ -189,3 +190,76 @@ static void close_cb(uv_handle_t *handle)
watcher->close_cb(watcher, watcher->data);
}
}
+
+static void connect_cb(uv_connect_t *req, int status)
+{
+ int *ret_status = req->data;
+ *ret_status = status;
+ if (status != 0) {
+ uv_close((uv_handle_t *)req->handle, NULL);
+ }
+}
+
+bool socket_connect(Loop *loop, Stream *stream,
+ bool is_tcp, const char *address,
+ int timeout, const char **error)
+{
+ bool success = false;
+ int status;
+ uv_connect_t req;
+ req.data = &status;
+ uv_stream_t *uv_stream;
+
+ uv_tcp_t *tcp = &stream->uv.tcp;
+ uv_getaddrinfo_t addr_req;
+ addr_req.addrinfo = NULL;
+ const struct addrinfo *addrinfo = NULL;
+ char *addr = NULL;
+ if (is_tcp) {
+ addr = xstrdup(address);
+ char *host_end = strrchr(addr, ':');
+ if (!host_end) {
+ *error = _("tcp address must be host:port");
+ goto cleanup;
+ }
+ *host_end = NUL;
+
+ const struct addrinfo hints = { .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_STREAM,
+ .ai_flags = AI_NUMERICSERV };
+ int retval = uv_getaddrinfo(&loop->uv, &addr_req, NULL,
+ addr, host_end+1, &hints);
+ if (retval != 0) {
+ *error = _("failed to lookup host or port");
+ goto cleanup;
+ }
+ addrinfo = addr_req.addrinfo;
+
+tcp_retry:
+ uv_tcp_init(&loop->uv, tcp);
+ uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb);
+ uv_stream = (uv_stream_t *)tcp;
+
+ } else {
+ uv_pipe_t *pipe = &stream->uv.pipe;
+ uv_pipe_init(&loop->uv, pipe, 0);
+ uv_pipe_connect(&req, pipe, address, connect_cb);
+ uv_stream = (uv_stream_t *)pipe;
+ }
+ status = 1;
+ LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
+ if (status == 0) {
+ stream_init(NULL, stream, -1, uv_stream);
+ success = true;
+ } else if (is_tcp && addrinfo->ai_next) {
+ addrinfo = addrinfo->ai_next;
+ goto tcp_retry;
+ } else {
+ *error = _("connection refused");
+ }
+
+cleanup:
+ xfree(addr);
+ uv_freeaddrinfo(addr_req.addrinfo);
+ return success;
+}
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index cd64e14976..e8ee0ede75 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -12,6 +12,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -28,6 +29,7 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
+#include "nvim/path.h"
#include "nvim/lib/kvec.h"
#include "nvim/os/input.h"
@@ -41,7 +43,8 @@
typedef enum {
kChannelTypeSocket,
kChannelTypeProc,
- kChannelTypeStdio
+ kChannelTypeStdio,
+ kChannelTypeInternal
} ChannelType;
typedef struct {
@@ -125,7 +128,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id)
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack, channel);
+ rstream_start(proc->out, receive_msgpack, channel);
return channel->id;
}
@@ -142,7 +145,36 @@ void channel_from_connection(SocketWatcher *watcher)
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack, channel);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+}
+
+uint64_t channel_connect(bool tcp, const char *address,
+ int timeout, const char **error)
+{
+ if (!tcp) {
+ char *path = fix_fname(address);
+ if (server_owns_pipe_address(path)) {
+ // avoid deadlock
+ xfree(path);
+ return channel_create_internal();
+ }
+ xfree(path);
+ }
+
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ if (!socket_connect(&main_loop, &channel->data.stream,
+ tcp, address, timeout, error)) {
+ decref(channel);
+ return 0;
+ }
+
+ incref(channel); // close channel only after the stream is closed
+ channel->data.stream.internal_close_cb = close_cb;
+ channel->data.stream.internal_data = channel;
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+ return channel->id;
}
/// Sends event/arguments to channel
@@ -305,11 +337,20 @@ void channel_from_stdio(void)
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.std.in, parse_msgpack, channel);
+ rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
+/// Creates a loopback channel. This is used to avoid deadlock
+/// when an instance connects to its own named pipe.
+uint64_t channel_create_internal(void)
+{
+ Channel *channel = register_channel(kChannelTypeInternal, 0, NULL);
+ incref(channel); // internal channel lives until process exit
+ return channel->id;
+}
+
void channel_process_exit(uint64_t id, int status)
{
Channel *channel = pmap_get(uint64_t)(channels, id);
@@ -318,8 +359,8 @@ void channel_process_exit(uint64_t id, int status)
decref(channel);
}
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
- bool eof)
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
+ void *data, bool eof)
{
Channel *channel = data;
incref(channel);
@@ -341,6 +382,14 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ parse_msgpack(channel);
+
+end:
+ decref(channel);
+}
+
+static void parse_msgpack(Channel *channel)
+{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return result;
@@ -364,7 +413,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -388,11 +437,9 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- decref(channel);
}
+
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
@@ -502,8 +549,11 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
break;
- default:
- abort();
+ case kChannelTypeInternal:
+ incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
+ break;
}
if (!success) {
@@ -520,6 +570,22 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return success;
}
+static void internal_read_event(void **argv)
+{
+ Channel *channel = argv[0];
+ WBuffer *buffer = argv[1];
+
+ msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->unpacker),
+ buffer->data, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
+
+ parse_msgpack(channel);
+
+ decref(channel);
+ wstream_release_wbuffer(buffer);
+}
+
static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
@@ -636,8 +702,9 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.std.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
- default:
- abort();
+ case kChannelTypeInternal:
+ // nothing to free.
+ break;
}
decref(channel);
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
index bae5a32850..c9edd05dc2 100644
--- a/src/nvim/msgpack_rpc/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -97,6 +97,18 @@ char *server_address_new(void)
#endif
}
+/// Check if this instance owns a pipe address.
+/// The argument must already be resolved to an absolute path!
+bool server_owns_pipe_address(const char *path)
+{
+ for (int i = 0; i < watchers.ga_len; i++) {
+ if (!strcmp(path, ((SocketWatcher **)watchers.ga_data)[i]->addr)) {
+ return true;
+ }
+ }
+ return false;
+}
+
/// Starts listening for API calls.
///
/// The socket type is determined by parsing `endpoint`: If it's a valid IPv4
diff --git a/src/nvim/path.c b/src/nvim/path.c
index 9162b6da4d..f2339c8046 100644
--- a/src/nvim/path.c
+++ b/src/nvim/path.c
@@ -1715,7 +1715,7 @@ int vim_FullName(const char *fname, char *buf, size_t len, bool force)
///
/// @param fname is the filename to expand
/// @return [allocated] Full path (NULL for failure).
-char *fix_fname(char *fname)
+char *fix_fname(const char *fname)
{
#ifdef UNIX
return FullName_save(fname, true);