aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2015-07-16 23:10:22 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2015-07-17 00:19:55 -0300
commit9d8d2b7fa83fd69d1d616728c505a41acf8fedbb (patch)
treea15cd8f07ce4aad933781c1fe80fea5f79a46a97
parentac2bd0256183fe4255e5fcccf37f860f037d43a6 (diff)
downloadrneovim-9d8d2b7fa83fd69d1d616728c505a41acf8fedbb.tar.gz
rneovim-9d8d2b7fa83fd69d1d616728c505a41acf8fedbb.tar.bz2
rneovim-9d8d2b7fa83fd69d1d616728c505a41acf8fedbb.zip
server: Extract most logic into the new socket abstraction
- Move event loop code into event/socket - Reimplement server.c on top of the new SocketWatcher class - Adapt msgpack_rpc/channel.c
-rw-r--r--src/nvim/event/socket.c157
-rw-r--r--src/nvim/event/socket.h38
-rw-r--r--src/nvim/msgpack_rpc/channel.c12
-rw-r--r--src/nvim/msgpack_rpc/channel.h1
-rw-r--r--src/nvim/msgpack_rpc/server.c223
5 files changed, 242 insertions, 189 deletions
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
new file mode 100644
index 0000000000..bdc632abf0
--- /dev/null
+++ b/src/nvim/event/socket.c
@@ -0,0 +1,157 @@
+#include <assert.h>
+#include <stdint.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/socket.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+#include "nvim/os/os.h"
+#include "nvim/ascii.h"
+#include "nvim/vim.h"
+#include "nvim/strings.h"
+#include "nvim/path.h"
+#include "nvim/memory.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/socket.c.generated.h"
+#endif
+
+#define NVIM_DEFAULT_TCP_PORT 7450
+
+void socket_watcher_init(Loop *loop, SocketWatcher *watcher,
+ const char *endpoint, void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3)
+{
+ // Trim to `ADDRESS_MAX_SIZE`
+ if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr))
+ >= sizeof(watcher->addr)) {
+ // TODO(aktau): since this is not what the user wanted, perhaps we
+ // should return an error here
+ WLOG("Address was too long, truncated to %s", watcher->addr);
+ }
+
+ bool tcp = true;
+ char ip[16], *ip_end = xstrchrnul(watcher->addr, ':');
+
+ // (ip_end - addr) is always > 0, so convert to size_t
+ size_t addr_len = (size_t)(ip_end - watcher->addr);
+
+ if (addr_len > sizeof(ip) - 1) {
+ // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255)
+ addr_len = sizeof(ip) - 1;
+ }
+
+ // Extract the address part
+ xstrlcpy(ip, watcher->addr, addr_len + 1);
+ int port = NVIM_DEFAULT_TCP_PORT;
+
+ if (*ip_end == ':') {
+ // Extract the port
+ long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
+ if (lport <= 0 || lport > 0xffff) {
+ // Invalid port, treat as named pipe or unix socket
+ tcp = false;
+ } else {
+ port = (int) lport;
+ }
+ }
+
+ if (tcp) {
+ // Try to parse ip address
+ if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) {
+ // Invalid address, treat as named pipe or unix socket
+ tcp = false;
+ }
+ }
+
+ if (tcp) {
+ uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle);
+ watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle;
+ } else {
+ uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0);
+ watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle;
+ }
+
+ watcher->stream->data = watcher;
+ watcher->cb = NULL;
+ watcher->close_cb = NULL;
+}
+
+int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
+ FUNC_ATTR_NONNULL_ALL
+{
+ watcher->cb = cb;
+ int result;
+
+ if (watcher->stream->type == UV_TCP) {
+ result = uv_tcp_bind(&watcher->uv.tcp.handle,
+ (const struct sockaddr *)&watcher->uv.tcp.addr, 0);
+ } else {
+ result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr);
+ }
+
+ if (result == 0) {
+ result = uv_listen(watcher->stream, backlog, connection_cb);
+ }
+
+ assert(result <= 0); // libuv should have returned -errno or zero.
+ if (result < 0) {
+ if (result == -EACCES) {
+ // Libuv converts ENOENT to EACCES for Windows compatibility, but if
+ // the parent directory does not exist, ENOENT would be more accurate.
+ *path_tail((char_u *)watcher->addr) = NUL;
+ if (!os_file_exists((char_u *)watcher->addr)) {
+ result = -ENOENT;
+ }
+ }
+ return result;
+ }
+
+ return 0;
+}
+
+int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+{
+ uv_stream_t *client;
+
+ if (watcher->stream->type == UV_TCP) {
+ client = (uv_stream_t *)&stream->uv.tcp;
+ uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client);
+ } else {
+ client = (uv_stream_t *)&stream->uv.pipe;
+ uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0);
+ }
+
+ int result = uv_accept(watcher->stream, client);
+
+ if (result) {
+ uv_close((uv_handle_t *)client, NULL);
+ return result;
+ }
+
+ stream_init(NULL, stream, -1, client, data);
+ return 0;
+}
+
+void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb)
+ FUNC_ATTR_NONNULL_ARG(1)
+{
+ watcher->close_cb = cb;
+ uv_close((uv_handle_t *)watcher->stream, close_cb);
+}
+
+static void connection_cb(uv_stream_t *handle, int status)
+{
+ SocketWatcher *watcher = handle->data;
+ watcher->cb(watcher, status, watcher->data);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ SocketWatcher *watcher = handle->data;
+ if (watcher->close_cb) {
+ watcher->close_cb(watcher, watcher->data);
+ }
+}
diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h
new file mode 100644
index 0000000000..17fd39f33b
--- /dev/null
+++ b/src/nvim/event/socket.h
@@ -0,0 +1,38 @@
+#ifndef NVIM_EVENT_SOCKET_H
+#define NVIM_EVENT_SOCKET_H
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
+#include "nvim/event/wstream.h"
+
+#define ADDRESS_MAX_SIZE 256
+
+typedef struct socket_watcher SocketWatcher;
+typedef void (*socket_cb)(SocketWatcher *watcher, int result, void *data);
+typedef void (*socket_close_cb)(SocketWatcher *watcher, void *data);
+
+struct socket_watcher {
+ // Pipe/socket path, or TCP address string
+ char addr[ADDRESS_MAX_SIZE];
+ // TCP server or unix socket (named pipe on Windows)
+ union {
+ struct {
+ uv_tcp_t handle;
+ struct sockaddr_in addr;
+ } tcp;
+ struct {
+ uv_pipe_t handle;
+ } pipe;
+ } uv;
+ uv_stream_t *stream;
+ void *data;
+ socket_cb cb;
+ socket_close_cb close_cb;
+};
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/socket.h.generated.h"
+#endif
+#endif // NVIM_EVENT_SOCKET_H
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 05badc72d4..577965e5ba 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -12,6 +12,7 @@
#include "nvim/event/loop.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
+#include "nvim/event/socket.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
@@ -140,17 +141,14 @@ uint64_t channel_from_job(char **argv)
return channel->id;
}
-/// Creates an API channel from a libuv stream representing a tcp or
-/// pipe/socket client connection
+/// Creates an API channel from a tcp/pipe socket connection
///
-/// @param stream The established connection
-void channel_from_stream(uv_stream_t *uvstream)
+/// @param watcher The SocketWatcher ready to accept the connection
+void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = register_channel(kChannelTypeSocket);
- stream_init(NULL, &channel->data.stream, -1, uvstream, channel);
- // write stream
+ socket_watcher_accept(watcher, &channel->data.stream, channel);
wstream_init(&channel->data.stream, 0);
- // read stream
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, parse_msgpack);
}
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index df742fe368..104547a7b8 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -5,6 +5,7 @@
#include <uv.h>
#include "nvim/api/private/defs.h"
+#include "nvim/event/socket.h"
#include "nvim/vim.h"
#define METHOD_MAXLEN 512
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
index 1a78b3498f..8dffea35ca 100644
--- a/src/nvim/msgpack_rpc/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -3,11 +3,10 @@
#include <string.h>
#include <stdint.h>
-#include <uv.h>
-
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/os.h"
+#include "nvim/event/socket.h"
#include "nvim/ascii.h"
#include "nvim/eval.h"
#include "nvim/garray.h"
@@ -19,35 +18,9 @@
#include "nvim/strings.h"
#define MAX_CONNECTIONS 32
-#define ADDRESS_MAX_SIZE 256
-#define NVIM_DEFAULT_TCP_PORT 7450
#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS"
-typedef enum {
- kServerTypeTcp,
- kServerTypePipe
-} ServerType;
-
-typedef struct {
- // Pipe/socket path, or TCP address string
- char addr[ADDRESS_MAX_SIZE];
-
- // Type of the union below
- ServerType type;
-
- // TCP server or unix socket (named pipe on Windows)
- union {
- struct {
- uv_tcp_t handle;
- struct sockaddr_in addr;
- } tcp;
- struct {
- uv_pipe_t handle;
- } pipe;
- } socket;
-} Server;
-
-static garray_T servers = GA_EMPTY_INIT_VALUE;
+static garray_T watchers = GA_EMPTY_INIT_VALUE;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/server.c.generated.h"
@@ -56,7 +29,7 @@ static garray_T servers = GA_EMPTY_INIT_VALUE;
/// Initializes the module
bool server_init(void)
{
- ga_init(&servers, sizeof(Server *), 1);
+ ga_init(&watchers, sizeof(SocketWatcher *), 1);
bool must_free = false;
const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
@@ -72,18 +45,10 @@ bool server_init(void)
return ok;
}
-/// Retrieve the file handle from a server.
-static uv_handle_t *server_handle(Server *server)
-{
- return server->type == kServerTypeTcp
- ? (uv_handle_t *)&server->socket.tcp.handle
- : (uv_handle_t *) &server->socket.pipe.handle;
-}
-
/// Teardown a single server
-static void server_close_cb(Server **server)
+static void close_socket_watcher(SocketWatcher **watcher)
{
- uv_close(server_handle(*server), free_server);
+ socket_watcher_close(*watcher, free_server);
}
/// Set v:servername to the first server in the server list, or unset it if no
@@ -91,7 +56,7 @@ static void server_close_cb(Server **server)
static void set_vservername(garray_T *srvs)
{
char *default_server = (srvs->ga_len > 0)
- ? ((Server **)srvs->ga_data)[0]->addr
+ ? ((SocketWatcher **)srvs->ga_data)[0]->addr
: NULL;
set_vim_var_string(VV_SEND_SERVER, (char_u *)default_server, -1);
}
@@ -99,7 +64,7 @@ static void set_vservername(garray_T *srvs)
/// Teardown the server module
void server_teardown(void)
{
- GA_DEEP_CLEAR(&servers, Server *, server_close_cb);
+ GA_DEEP_CLEAR(&watchers, SocketWatcher *, close_socket_watcher);
}
/// Starts listening for API calls on the TCP address or pipe path `endpoint`.
@@ -116,120 +81,38 @@ void server_teardown(void)
int server_start(const char *endpoint)
FUNC_ATTR_NONNULL_ALL
{
- char addr[ADDRESS_MAX_SIZE];
-
- // Trim to `ADDRESS_MAX_SIZE`
- if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
- // TODO(aktau): since this is not what the user wanted, perhaps we
- // should return an error here
- WLOG("Address was too long, truncated to %s", addr);
- }
-
- // Check if the server already exists
- for (int i = 0; i < servers.ga_len; i++) {
- if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) {
- ELOG("Already listening on %s", addr);
+ SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher));
+ socket_watcher_init(&loop, watcher, endpoint, NULL);
+
+ // Check if a watcher for the endpoint already exists
+ for (int i = 0; i < watchers.ga_len; i++) {
+ if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) {
+ ELOG("Already listening on %s", watcher->addr);
+ socket_watcher_close(watcher, free_server);
return 1;
}
}
- ServerType server_type = kServerTypeTcp;
- Server *server = xmalloc(sizeof(Server));
- char ip[16], *ip_end = strrchr(addr, ':');
-
- if (!ip_end) {
- ip_end = strchr(addr, NUL);
- }
-
- // (ip_end - addr) is always > 0, so convert to size_t
- size_t addr_len = (size_t)(ip_end - addr);
-
- if (addr_len > sizeof(ip) - 1) {
- // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255)
- addr_len = sizeof(ip) - 1;
- }
-
- // Extract the address part
- xstrlcpy(ip, addr, addr_len + 1);
-
- int port = NVIM_DEFAULT_TCP_PORT;
-
- if (*ip_end == ':') {
- // Extract the port
- long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
- if (lport <= 0 || lport > 0xffff) {
- // Invalid port, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- } else {
- port = (int) lport;
- }
- }
-
- if (server_type == kServerTypeTcp) {
- // Try to parse ip address
- if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) {
- // Invalid address, treat as named pipe or unix socket
- server_type = kServerTypePipe;
- }
- }
-
- int result;
- uv_stream_t *stream = NULL;
-
- xstrlcpy(server->addr, addr, sizeof(server->addr));
-
- if (server_type == kServerTypeTcp) {
- // Listen on tcp address/port
- uv_tcp_init(&loop.uv, &server->socket.tcp.handle);
- result = uv_tcp_bind(&server->socket.tcp.handle,
- (const struct sockaddr *)&server->socket.tcp.addr,
- 0);
- stream = (uv_stream_t *)&server->socket.tcp.handle;
- } else {
- // Listen on named pipe or unix socket
- uv_pipe_init(&loop.uv, &server->socket.pipe.handle, 0);
- result = uv_pipe_bind(&server->socket.pipe.handle, server->addr);
- stream = (uv_stream_t *)&server->socket.pipe.handle;
- }
-
- stream->data = server;
-
- if (result == 0) {
- result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
- MAX_CONNECTIONS,
- connection_cb);
- }
-
- assert(result <= 0); // libuv should have returned -errno or zero.
+ int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);
if (result < 0) {
- if (result == -EACCES) {
- // Libuv converts ENOENT to EACCES for Windows compatibility, but if
- // the parent directory does not exist, ENOENT would be more accurate.
- *path_tail((char_u *) addr) = NUL;
- if (!os_file_exists((char_u *) addr)) {
- result = -ENOENT;
- }
- }
- uv_close((uv_handle_t *)stream, free_server);
ELOG("Failed to start server: %s", uv_strerror(result));
+ socket_watcher_close(watcher, free_server);
return result;
}
// Update $NVIM_LISTEN_ADDRESS, if not set.
const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
if (listen_address == NULL) {
- os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1);
+ os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1);
}
- server->type = server_type;
-
- // Add the server to the list.
- ga_grow(&servers, 1);
- ((Server **)servers.ga_data)[servers.ga_len++] = server;
+ // Add the watcher to the list.
+ ga_grow(&watchers, 1);
+ ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher;
// Update v:servername, if not set.
if (STRLEN(get_vim_var_str(VV_SEND_SERVER)) == 0) {
- set_vservername(&servers);
+ set_vservername(&watchers);
}
return 0;
@@ -240,21 +123,21 @@ int server_start(const char *endpoint)
/// @param endpoint Address of the server.
void server_stop(char *endpoint)
{
- Server *server;
+ SocketWatcher *watcher;
char addr[ADDRESS_MAX_SIZE];
// Trim to `ADDRESS_MAX_SIZE`
xstrlcpy(addr, endpoint, sizeof(addr));
int i = 0; // Index of the server whose address equals addr.
- for (; i < servers.ga_len; i++) {
- server = ((Server **)servers.ga_data)[i];
- if (strcmp(addr, server->addr) == 0) {
+ for (; i < watchers.ga_len; i++) {
+ watcher = ((SocketWatcher **)watchers.ga_data)[i];
+ if (strcmp(addr, watcher->addr) == 0) {
break;
}
}
- if (i >= servers.ga_len) {
+ if (i >= watchers.ga_len) {
ELOG("Not listening on %s", addr);
return;
}
@@ -265,18 +148,18 @@ void server_stop(char *endpoint)
os_unsetenv(LISTEN_ADDRESS_ENV_VAR);
}
- uv_close(server_handle(server), free_server);
+ socket_watcher_close(watcher, free_server);
// Remove this server from the list by swapping it with the last item.
- if (i != servers.ga_len - 1) {
- ((Server **)servers.ga_data)[i] =
- ((Server **)servers.ga_data)[servers.ga_len - 1];
+ if (i != watchers.ga_len - 1) {
+ ((SocketWatcher **)watchers.ga_data)[i] =
+ ((SocketWatcher **)watchers.ga_data)[watchers.ga_len - 1];
}
- servers.ga_len--;
+ watchers.ga_len--;
// If v:servername is the stopped address, re-initialize it.
if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) {
- set_vservername(&servers);
+ set_vservername(&watchers);
}
}
@@ -285,52 +168,28 @@ void server_stop(char *endpoint)
char **server_address_list(size_t *size)
FUNC_ATTR_NONNULL_ALL
{
- if ((*size = (size_t) servers.ga_len) == 0) {
+ if ((*size = (size_t)watchers.ga_len) == 0) {
return NULL;
}
- char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char *));
- for (int i = 0; i < servers.ga_len; i++) {
- addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr);
+ char **addrs = xcalloc((size_t)watchers.ga_len, sizeof(const char *));
+ for (int i = 0; i < watchers.ga_len; i++) {
+ addrs[i] = xstrdup(((SocketWatcher **)watchers.ga_data)[i]->addr);
}
return addrs;
}
-static void connection_cb(uv_stream_t *server, int status)
+static void connection_cb(SocketWatcher *watcher, int result, void *data)
{
- int result;
- uv_stream_t *client;
- Server *srv = server->data;
-
- if (status < 0) {
- abort();
- }
-
- if (srv->type == kServerTypeTcp) {
- client = xmalloc(sizeof(uv_tcp_t));
- uv_tcp_init(&loop.uv, (uv_tcp_t *)client);
- } else {
- client = xmalloc(sizeof(uv_pipe_t));
- uv_pipe_init(&loop.uv, (uv_pipe_t *)client, 0);
- }
-
- result = uv_accept(server, client);
-
if (result) {
ELOG("Failed to accept connection: %s", uv_strerror(result));
- uv_close((uv_handle_t *)client, free_client);
return;
}
- channel_from_stream(client);
-}
-
-static void free_client(uv_handle_t *handle)
-{
- xfree(handle);
+ channel_from_connection(watcher);
}
-static void free_server(uv_handle_t *handle)
+static void free_server(SocketWatcher *watcher, void *data)
{
- xfree(handle->data);
+ xfree(watcher);
}