diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/nvim/event/socket.c | 157 | ||||
| -rw-r--r-- | src/nvim/event/socket.h | 38 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 12 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 1 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/server.c | 223 | 
5 files changed, 242 insertions, 189 deletions
| diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c new file mode 100644 index 0000000000..bdc632abf0 --- /dev/null +++ b/src/nvim/event/socket.c @@ -0,0 +1,157 @@ +#include <assert.h> +#include <stdint.h> + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/socket.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/os/os.h" +#include "nvim/ascii.h" +#include "nvim/vim.h" +#include "nvim/strings.h" +#include "nvim/path.h" +#include "nvim/memory.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.c.generated.h" +#endif + +#define NVIM_DEFAULT_TCP_PORT 7450 + +void socket_watcher_init(Loop *loop, SocketWatcher *watcher, +    const char *endpoint, void *data) +  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) +{ +  // Trim to `ADDRESS_MAX_SIZE` +  if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr)) +      >= sizeof(watcher->addr)) { +    // TODO(aktau): since this is not what the user wanted, perhaps we +    // should return an error here +    WLOG("Address was too long, truncated to %s", watcher->addr); +  } + +  bool tcp = true; +  char ip[16], *ip_end = xstrchrnul(watcher->addr, ':'); + +  // (ip_end - addr) is always > 0, so convert to size_t +  size_t addr_len = (size_t)(ip_end - watcher->addr); + +  if (addr_len > sizeof(ip) - 1) { +    // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) +    addr_len = sizeof(ip) - 1; +  } + +  // Extract the address part +  xstrlcpy(ip, watcher->addr, addr_len + 1); +  int port = NVIM_DEFAULT_TCP_PORT; + +  if (*ip_end == ':') { +    // Extract the port +    long lport = strtol(ip_end + 1, NULL, 10); // NOLINT +    if (lport <= 0 || lport > 0xffff) { +      // Invalid port, treat as named pipe or unix socket +      tcp = false; +    } else { +      port = (int) lport; +    } +  } + +  if (tcp) { +    // Try to parse ip address +    if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) { +      // Invalid address, treat as named pipe or unix socket +      tcp = false; +    } +  } + +  if (tcp) { +    uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle); +    watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle; +  } else { +    uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0); +    watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle; +  } + +  watcher->stream->data = watcher; +  watcher->cb = NULL; +  watcher->close_cb = NULL; +} + +int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) +  FUNC_ATTR_NONNULL_ALL +{ +  watcher->cb = cb; +  int result; + +  if (watcher->stream->type == UV_TCP) { +    result = uv_tcp_bind(&watcher->uv.tcp.handle, +                         (const struct sockaddr *)&watcher->uv.tcp.addr, 0); +  } else { +    result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr); +  } + +  if (result == 0) { +    result = uv_listen(watcher->stream, backlog, connection_cb); +  } + +  assert(result <= 0);  // libuv should have returned -errno or zero. +  if (result < 0) { +    if (result == -EACCES) { +      // Libuv converts ENOENT to EACCES for Windows compatibility, but if +      // the parent directory does not exist, ENOENT would be more accurate. +      *path_tail((char_u *)watcher->addr) = NUL; +      if (!os_file_exists((char_u *)watcher->addr)) { +        result = -ENOENT; +      } +    } +    return result; +  } + +  return 0; +} + +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +{ +  uv_stream_t *client; + +  if (watcher->stream->type == UV_TCP) { +    client = (uv_stream_t *)&stream->uv.tcp; +    uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); +  } else { +    client = (uv_stream_t *)&stream->uv.pipe; +    uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); +  } + +  int result = uv_accept(watcher->stream, client); + +  if (result) { +    uv_close((uv_handle_t *)client, NULL); +    return result; +  } + +  stream_init(NULL, stream, -1, client, data); +  return 0; +} + +void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) +  FUNC_ATTR_NONNULL_ARG(1) +{ +  watcher->close_cb = cb; +  uv_close((uv_handle_t *)watcher->stream, close_cb); +} + +static void connection_cb(uv_stream_t *handle, int status) +{ +  SocketWatcher *watcher = handle->data; +  watcher->cb(watcher, status, watcher->data); +} + +static void close_cb(uv_handle_t *handle) +{ +  SocketWatcher *watcher = handle->data; +  if (watcher->close_cb) { +    watcher->close_cb(watcher, watcher->data); +  } +} diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h new file mode 100644 index 0000000000..17fd39f33b --- /dev/null +++ b/src/nvim/event/socket.h @@ -0,0 +1,38 @@ +#ifndef NVIM_EVENT_SOCKET_H +#define NVIM_EVENT_SOCKET_H + +#include <uv.h> + +#include "nvim/event/loop.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" + +#define ADDRESS_MAX_SIZE 256 + +typedef struct socket_watcher SocketWatcher; +typedef void (*socket_cb)(SocketWatcher *watcher, int result, void *data); +typedef void (*socket_close_cb)(SocketWatcher *watcher, void *data); + +struct socket_watcher { +  // Pipe/socket path, or TCP address string +  char addr[ADDRESS_MAX_SIZE]; +  // TCP server or unix socket (named pipe on Windows) +  union { +    struct { +      uv_tcp_t handle; +      struct sockaddr_in addr; +    } tcp; +    struct { +      uv_pipe_t handle; +    } pipe; +  } uv; +  uv_stream_t *stream; +  void *data; +  socket_cb cb; +  socket_close_cb close_cb; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/socket.h.generated.h" +#endif +#endif  // NVIM_EVENT_SOCKET_H diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 05badc72d4..577965e5ba 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -12,6 +12,7 @@  #include "nvim/event/loop.h"  #include "nvim/event/rstream.h"  #include "nvim/event/wstream.h" +#include "nvim/event/socket.h"  #include "nvim/os/job.h"  #include "nvim/os/job_defs.h"  #include "nvim/msgpack_rpc/helpers.h" @@ -140,17 +141,14 @@ uint64_t channel_from_job(char **argv)    return channel->id;  } -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection +/// Creates an API channel from a tcp/pipe socket connection  /// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *uvstream) +/// @param watcher The SocketWatcher ready to accept the connection +void channel_from_connection(SocketWatcher *watcher)  {    Channel *channel = register_channel(kChannelTypeSocket); -  stream_init(NULL, &channel->data.stream, -1, uvstream, channel); -  // write stream +  socket_watcher_accept(watcher, &channel->data.stream, channel);    wstream_init(&channel->data.stream, 0); -  // read stream    rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);    rstream_start(&channel->data.stream, parse_msgpack);  } diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index df742fe368..104547a7b8 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -5,6 +5,7 @@  #include <uv.h>  #include "nvim/api/private/defs.h" +#include "nvim/event/socket.h"  #include "nvim/vim.h"  #define METHOD_MAXLEN 512 diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index 1a78b3498f..8dffea35ca 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -3,11 +3,10 @@  #include <string.h>  #include <stdint.h> -#include <uv.h> -  #include "nvim/msgpack_rpc/channel.h"  #include "nvim/msgpack_rpc/server.h"  #include "nvim/os/os.h" +#include "nvim/event/socket.h"  #include "nvim/ascii.h"  #include "nvim/eval.h"  #include "nvim/garray.h" @@ -19,35 +18,9 @@  #include "nvim/strings.h"  #define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NVIM_DEFAULT_TCP_PORT 7450  #define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" -typedef enum { -  kServerTypeTcp, -  kServerTypePipe -} ServerType; - -typedef struct { -  // Pipe/socket path, or TCP address string -  char addr[ADDRESS_MAX_SIZE]; - -  // Type of the union below -  ServerType type; - -  // TCP server or unix socket (named pipe on Windows) -  union { -    struct { -      uv_tcp_t handle; -      struct sockaddr_in addr; -    } tcp; -    struct { -      uv_pipe_t handle; -    } pipe; -  } socket; -} Server; - -static garray_T servers = GA_EMPTY_INIT_VALUE; +static garray_T watchers = GA_EMPTY_INIT_VALUE;  #ifdef INCLUDE_GENERATED_DECLARATIONS  # include "msgpack_rpc/server.c.generated.h" @@ -56,7 +29,7 @@ static garray_T servers = GA_EMPTY_INIT_VALUE;  /// Initializes the module  bool server_init(void)  { -  ga_init(&servers, sizeof(Server *), 1); +  ga_init(&watchers, sizeof(SocketWatcher *), 1);    bool must_free = false;    const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); @@ -72,18 +45,10 @@ bool server_init(void)    return ok;  } -/// Retrieve the file handle from a server. -static uv_handle_t *server_handle(Server *server) -{ -  return server->type == kServerTypeTcp -    ? (uv_handle_t *)&server->socket.tcp.handle -    : (uv_handle_t *) &server->socket.pipe.handle; -} -  /// Teardown a single server -static void server_close_cb(Server **server) +static void close_socket_watcher(SocketWatcher **watcher)  { -  uv_close(server_handle(*server), free_server); +  socket_watcher_close(*watcher, free_server);  }  /// Set v:servername to the first server in the server list, or unset it if no @@ -91,7 +56,7 @@ static void server_close_cb(Server **server)  static void set_vservername(garray_T *srvs)  {    char *default_server = (srvs->ga_len > 0) -    ? ((Server **)srvs->ga_data)[0]->addr +    ? ((SocketWatcher **)srvs->ga_data)[0]->addr      : NULL;    set_vim_var_string(VV_SEND_SERVER, (char_u *)default_server, -1);  } @@ -99,7 +64,7 @@ static void set_vservername(garray_T *srvs)  /// Teardown the server module  void server_teardown(void)  { -  GA_DEEP_CLEAR(&servers, Server *, server_close_cb); +  GA_DEEP_CLEAR(&watchers, SocketWatcher *, close_socket_watcher);  }  /// Starts listening for API calls on the TCP address or pipe path `endpoint`. @@ -116,120 +81,38 @@ void server_teardown(void)  int server_start(const char *endpoint)    FUNC_ATTR_NONNULL_ALL  { -  char addr[ADDRESS_MAX_SIZE]; - -  // Trim to `ADDRESS_MAX_SIZE` -  if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { -    // TODO(aktau): since this is not what the user wanted, perhaps we -    // should return an error here -    WLOG("Address was too long, truncated to %s", addr); -  } - -  // Check if the server already exists -  for (int i = 0; i < servers.ga_len; i++) { -    if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) { -      ELOG("Already listening on %s", addr); +  SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); +  socket_watcher_init(&loop, watcher, endpoint, NULL); + +  // Check if a watcher for the endpoint already exists +  for (int i = 0; i < watchers.ga_len; i++) { +    if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { +      ELOG("Already listening on %s", watcher->addr); +      socket_watcher_close(watcher, free_server);        return 1;      }    } -  ServerType server_type = kServerTypeTcp; -  Server *server = xmalloc(sizeof(Server)); -  char ip[16], *ip_end = strrchr(addr, ':'); - -  if (!ip_end) { -    ip_end = strchr(addr, NUL); -  } - -  // (ip_end - addr) is always > 0, so convert to size_t -  size_t addr_len = (size_t)(ip_end - addr); - -  if (addr_len > sizeof(ip) - 1) { -    // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) -    addr_len = sizeof(ip) - 1; -  } - -  // Extract the address part -  xstrlcpy(ip, addr, addr_len + 1); - -  int port = NVIM_DEFAULT_TCP_PORT; - -  if (*ip_end == ':') { -    // Extract the port -    long lport = strtol(ip_end + 1, NULL, 10); // NOLINT -    if (lport <= 0 || lport > 0xffff) { -      // Invalid port, treat as named pipe or unix socket -      server_type = kServerTypePipe; -    } else { -      port = (int) lport; -    } -  } - -  if (server_type == kServerTypeTcp) { -    // Try to parse ip address -    if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { -      // Invalid address, treat as named pipe or unix socket -      server_type = kServerTypePipe; -    } -  } - -  int result; -  uv_stream_t *stream = NULL; - -  xstrlcpy(server->addr, addr, sizeof(server->addr)); - -  if (server_type == kServerTypeTcp) { -    // Listen on tcp address/port -    uv_tcp_init(&loop.uv, &server->socket.tcp.handle); -    result = uv_tcp_bind(&server->socket.tcp.handle, -                         (const struct sockaddr *)&server->socket.tcp.addr, -                         0); -    stream = (uv_stream_t *)&server->socket.tcp.handle; -  } else { -    // Listen on named pipe or unix socket -    uv_pipe_init(&loop.uv, &server->socket.pipe.handle, 0); -    result = uv_pipe_bind(&server->socket.pipe.handle, server->addr); -    stream = (uv_stream_t *)&server->socket.pipe.handle; -  } - -  stream->data = server; - -  if (result == 0) { -    result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, -                       MAX_CONNECTIONS, -                       connection_cb); -  } - -  assert(result <= 0);  // libuv should have returned -errno or zero. +  int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);    if (result < 0) { -    if (result == -EACCES) { -      // Libuv converts ENOENT to EACCES for Windows compatibility, but if -      // the parent directory does not exist, ENOENT would be more accurate. -      *path_tail((char_u *) addr) = NUL; -      if (!os_file_exists((char_u *) addr)) { -        result = -ENOENT; -      } -    } -    uv_close((uv_handle_t *)stream, free_server);      ELOG("Failed to start server: %s", uv_strerror(result)); +    socket_watcher_close(watcher, free_server);      return result;    }    // Update $NVIM_LISTEN_ADDRESS, if not set.    const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);    if (listen_address == NULL) { -    os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1); +    os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1);    } -  server->type = server_type; - -  // Add the server to the list. -  ga_grow(&servers, 1); -  ((Server **)servers.ga_data)[servers.ga_len++] = server; +  // Add the watcher to the list. +  ga_grow(&watchers, 1); +  ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher;    // Update v:servername, if not set.    if (STRLEN(get_vim_var_str(VV_SEND_SERVER)) == 0) { -    set_vservername(&servers); +    set_vservername(&watchers);    }    return 0; @@ -240,21 +123,21 @@ int server_start(const char *endpoint)  /// @param endpoint Address of the server.  void server_stop(char *endpoint)  { -  Server *server; +  SocketWatcher *watcher;    char addr[ADDRESS_MAX_SIZE];    // Trim to `ADDRESS_MAX_SIZE`    xstrlcpy(addr, endpoint, sizeof(addr));    int i = 0;  // Index of the server whose address equals addr. -  for (; i < servers.ga_len; i++) { -    server = ((Server **)servers.ga_data)[i]; -    if (strcmp(addr, server->addr) == 0) { +  for (; i < watchers.ga_len; i++) { +    watcher = ((SocketWatcher **)watchers.ga_data)[i]; +    if (strcmp(addr, watcher->addr) == 0) {        break;      }    } -  if (i >= servers.ga_len) { +  if (i >= watchers.ga_len) {      ELOG("Not listening on %s", addr);      return;    } @@ -265,18 +148,18 @@ void server_stop(char *endpoint)      os_unsetenv(LISTEN_ADDRESS_ENV_VAR);    } -  uv_close(server_handle(server), free_server); +  socket_watcher_close(watcher, free_server);    // Remove this server from the list by swapping it with the last item. -  if (i != servers.ga_len - 1) { -    ((Server **)servers.ga_data)[i] = -      ((Server **)servers.ga_data)[servers.ga_len - 1]; +  if (i != watchers.ga_len - 1) { +    ((SocketWatcher **)watchers.ga_data)[i] = +      ((SocketWatcher **)watchers.ga_data)[watchers.ga_len - 1];    } -  servers.ga_len--; +  watchers.ga_len--;    // If v:servername is the stopped address, re-initialize it.    if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) { -    set_vservername(&servers); +    set_vservername(&watchers);    }  } @@ -285,52 +168,28 @@ void server_stop(char *endpoint)  char **server_address_list(size_t *size)    FUNC_ATTR_NONNULL_ALL  { -  if ((*size = (size_t) servers.ga_len) == 0) { +  if ((*size = (size_t)watchers.ga_len) == 0) {      return NULL;    } -  char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char *)); -  for (int i = 0; i < servers.ga_len; i++) { -    addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr); +  char **addrs = xcalloc((size_t)watchers.ga_len, sizeof(const char *)); +  for (int i = 0; i < watchers.ga_len; i++) { +    addrs[i] = xstrdup(((SocketWatcher **)watchers.ga_data)[i]->addr);    }    return addrs;  } -static void connection_cb(uv_stream_t *server, int status) +static void connection_cb(SocketWatcher *watcher, int result, void *data)  { -  int result; -  uv_stream_t *client; -  Server *srv = server->data; - -  if (status < 0) { -    abort(); -  } - -  if (srv->type == kServerTypeTcp) { -    client = xmalloc(sizeof(uv_tcp_t)); -    uv_tcp_init(&loop.uv, (uv_tcp_t *)client); -  } else { -    client = xmalloc(sizeof(uv_pipe_t)); -    uv_pipe_init(&loop.uv, (uv_pipe_t *)client, 0); -  } - -  result = uv_accept(server, client); -    if (result) {      ELOG("Failed to accept connection: %s", uv_strerror(result)); -    uv_close((uv_handle_t *)client, free_client);      return;    } -  channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ -  xfree(handle); +  channel_from_connection(watcher);  } -static void free_server(uv_handle_t *handle) +static void free_server(SocketWatcher *watcher, void *data)  { -  xfree(handle->data); +  xfree(watcher);  } | 
