diff options
| author | Jakob Schnitzer <mail@jakobschnitzer.de> | 2017-06-28 16:52:04 +0200 |
|---|---|---|
| committer | Jakob Schnitzer <mail@jakobschnitzer.de> | 2017-06-28 16:52:04 +0200 |
| commit | e8829710bc5f38208499e0ad38402eac24a67ac2 (patch) | |
| tree | 4e1ae954c2e301adadbfa7038b823ea9ea2fb08e /src/nvim/event | |
| parent | ff8b2eb435c518f0eafd0e509afe1f5ee4a81fd1 (diff) | |
| parent | f0dafa89c2b7602cfedf0bd3409858e4c212b0a2 (diff) | |
| download | rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.tar.gz rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.tar.bz2 rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.zip | |
Merge branch 'master' into option-fixes
Diffstat (limited to 'src/nvim/event')
| -rw-r--r-- | src/nvim/event/defs.h | 8 | ||||
| -rw-r--r-- | src/nvim/event/libuv_process.c | 10 | ||||
| -rw-r--r-- | src/nvim/event/loop.c | 24 | ||||
| -rw-r--r-- | src/nvim/event/multiqueue.c | 48 | ||||
| -rw-r--r-- | src/nvim/event/multiqueue.h | 2 | ||||
| -rw-r--r-- | src/nvim/event/process.c | 10 | ||||
| -rw-r--r-- | src/nvim/event/socket.c | 206 | ||||
| -rw-r--r-- | src/nvim/event/socket.h | 2 | ||||
| -rw-r--r-- | src/nvim/event/stream.c | 8 |
9 files changed, 226 insertions, 92 deletions
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h index e5335d9f25..cc875d74b9 100644 --- a/src/nvim/event/defs.h +++ b/src/nvim/event/defs.h @@ -8,16 +8,14 @@ typedef void (*argv_callback)(void **argv); typedef struct message { - int priority; argv_callback handler; void *argv[EVENT_HANDLER_MAX_ARGC]; } Event; typedef void(*event_scheduler)(Event event, void *data); -#define VA_EVENT_INIT(event, p, h, a) \ +#define VA_EVENT_INIT(event, h, a) \ do { \ assert(a <= EVENT_HANDLER_MAX_ARGC); \ - (event)->priority = p; \ (event)->handler = h; \ if (a) { \ va_list args; \ @@ -29,11 +27,11 @@ typedef void(*event_scheduler)(Event event, void *data); } \ } while (0) -static inline Event event_create(int priority, argv_callback cb, int argc, ...) +static inline Event event_create(argv_callback cb, int argc, ...) { assert(argc <= EVENT_HANDLER_MAX_ARGC); Event event; - VA_EVENT_INIT(&event, priority, cb, argc); + VA_EVENT_INIT(&event, cb, argc); return event; } diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c index 3116adbde8..f6a567a520 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_process.c @@ -11,6 +11,7 @@ #include "nvim/event/process.h" #include "nvim/event/libuv_process.h" #include "nvim/log.h" +#include "nvim/macros.h" #include "nvim/os/os.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -47,17 +48,20 @@ int libuv_process_spawn(LibuvProcess *uvproc) if (proc->in) { uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - uvproc->uvstdio[0].data.stream = (uv_stream_t *)&proc->in->uv.pipe; + uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t, + &proc->in->uv.pipe); } if (proc->out) { uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[1].data.stream = (uv_stream_t *)&proc->out->uv.pipe; + uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t, + &proc->out->uv.pipe); } if (proc->err) { uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; - uvproc->uvstdio[2].data.stream = (uv_stream_t *)&proc->err->uv.pipe; + uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t, + &proc->err->uv.pipe); } int status; diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 6963978581..25701a1621 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -8,6 +8,7 @@ #include "nvim/event/loop.h" #include "nvim/event/process.h" +#include "nvim/log.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.c.generated.h" @@ -44,8 +45,7 @@ void loop_poll_events(Loop *loop, int ms) // we do not block indefinitely for I/O. uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms); } else if (ms == 0) { - // For ms == 0, we need to do a non-blocking event poll by - // setting the run mode to UV_RUN_NOWAIT. + // For ms == 0, do a non-blocking event poll. mode = UV_RUN_NOWAIT; } @@ -79,20 +79,34 @@ void loop_on_put(MultiQueue *queue, void *data) uv_stop(&loop->uv); } -void loop_close(Loop *loop, bool wait) +/// @returns false if the loop could not be closed gracefully +bool loop_close(Loop *loop, bool wait) { + bool rv = true; uv_mutex_destroy(&loop->mutex); uv_close((uv_handle_t *)&loop->children_watcher, NULL); uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); uv_close((uv_handle_t *)&loop->poll_timer, NULL); uv_close((uv_handle_t *)&loop->async, NULL); - do { + uint64_t start = wait ? os_hrtime() : 0; + while (true) { uv_run(&loop->uv, wait ? UV_RUN_DEFAULT : UV_RUN_NOWAIT); - } while (uv_loop_close(&loop->uv) && wait); + if (!uv_loop_close(&loop->uv) || !wait) { + break; + } + if (os_hrtime() - start >= 2 * 1000000000) { + // Some libuv resource was not correctly deref'd. Log and bail. + rv = false; + ELOG("uv_loop_close() hang?"); + log_uv_handles(&loop->uv); + break; + } + } multiqueue_free(loop->fast_events); multiqueue_free(loop->thread_events); multiqueue_free(loop->events); kl_destroy(WatcherPtr, loop->children); + return rv; } void loop_purge(Loop *loop) diff --git a/src/nvim/event/multiqueue.c b/src/nvim/event/multiqueue.c index a17bae31e3..ef9f3f1870 100644 --- a/src/nvim/event/multiqueue.c +++ b/src/nvim/event/multiqueue.c @@ -126,6 +126,7 @@ void multiqueue_free(MultiQueue *this) xfree(this); } +/// Removes the next item and returns its Event. Event multiqueue_get(MultiQueue *this) { return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this); @@ -144,7 +145,7 @@ void multiqueue_process_events(MultiQueue *this) { assert(this); while (!multiqueue_empty(this)) { - Event event = multiqueue_get(this); + Event event = multiqueue_remove(this); if (event.handler) { event.handler(event.argv); } @@ -178,36 +179,48 @@ size_t multiqueue_size(MultiQueue *this) return this->size; } -static Event multiqueue_remove(MultiQueue *this) +/// Gets an Event from an item. +/// +/// @param remove Remove the node from its queue, and free it. +static Event multiqueueitem_get_event(MultiQueueItem *item, bool remove) { - assert(!multiqueue_empty(this)); - QUEUE *h = QUEUE_HEAD(&this->headtail); - QUEUE_REMOVE(h); - MultiQueueItem *item = multiqueue_node_data(h); - Event rv; - + assert(item != NULL); + Event ev; if (item->link) { - assert(!this->parent); - // remove the next node in the linked queue + // get the next node in the linked queue MultiQueue *linked = item->data.queue; assert(!multiqueue_empty(linked)); MultiQueueItem *child = multiqueue_node_data(QUEUE_HEAD(&linked->headtail)); - QUEUE_REMOVE(&child->node); - rv = child->data.item.event; - xfree(child); + ev = child->data.item.event; + // remove the child node + if (remove) { + QUEUE_REMOVE(&child->node); + xfree(child); + } } else { - if (this->parent) { - // remove the corresponding link node in the parent queue + // remove the corresponding link node in the parent queue + if (remove && item->data.item.parent_item) { QUEUE_REMOVE(&item->data.item.parent_item->node); xfree(item->data.item.parent_item); + item->data.item.parent_item = NULL; } - rv = item->data.item.event; + ev = item->data.item.event; } + return ev; +} +static Event multiqueue_remove(MultiQueue *this) +{ + assert(!multiqueue_empty(this)); + QUEUE *h = QUEUE_HEAD(&this->headtail); + QUEUE_REMOVE(h); + MultiQueueItem *item = multiqueue_node_data(h); + assert(!item->link || !this->parent); // Only a parent queue has link-nodes + Event ev = multiqueueitem_get_event(item, true); this->size--; xfree(item); - return rv; + return ev; } static void multiqueue_push(MultiQueue *this, Event event) @@ -215,6 +228,7 @@ static void multiqueue_push(MultiQueue *this, Event event) MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem)); item->link = false; item->data.item.event = event; + item->data.item.parent_item = NULL; QUEUE_INSERT_TAIL(&this->headtail, &item->node); if (this->parent) { // push link node to the parent queue diff --git a/src/nvim/event/multiqueue.h b/src/nvim/event/multiqueue.h index def6b95a10..a688107665 100644 --- a/src/nvim/event/multiqueue.h +++ b/src/nvim/event/multiqueue.h @@ -10,7 +10,7 @@ typedef struct multiqueue MultiQueue; typedef void (*put_callback)(MultiQueue *multiq, void *data); #define multiqueue_put(q, h, ...) \ - multiqueue_put_event(q, event_create(1, h, __VA_ARGS__)); + multiqueue_put_event(q, event_create(h, __VA_ARGS__)); #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index ffda10a494..cad49e2007 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -14,6 +14,7 @@ #include "nvim/event/libuv_process.h" #include "nvim/os/pty_process.h" #include "nvim/globals.h" +#include "nvim/macros.h" #include "nvim/log.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -82,7 +83,8 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->in) { - stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe); + stream_init(NULL, proc->in, -1, + STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe)); proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; @@ -90,7 +92,8 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->out) { - stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe); + stream_init(NULL, proc->out, -1, + STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe)); proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; @@ -98,7 +101,8 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->err) { - stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe); + stream_init(NULL, proc->err, -1, + STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe)); proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index e536d79a2a..a796f303ab 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -15,99 +15,122 @@ #include "nvim/vim.h" #include "nvim/strings.h" #include "nvim/path.h" +#include "nvim/main.h" #include "nvim/memory.h" +#include "nvim/macros.h" +#include "nvim/charset.h" +#include "nvim/log.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/socket.c.generated.h" #endif -#define NVIM_DEFAULT_TCP_PORT 7450 - -void socket_watcher_init(Loop *loop, SocketWatcher *watcher, - const char *endpoint, void *data) - FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) +int socket_watcher_init(Loop *loop, SocketWatcher *watcher, + const char *endpoint) + FUNC_ATTR_NONNULL_ALL { - // Trim to `ADDRESS_MAX_SIZE` - if (xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr)) - >= sizeof(watcher->addr)) { - // TODO(aktau): since this is not what the user wanted, perhaps we - // should return an error here - WLOG("Address was too long, truncated to %s", watcher->addr); - } + xstrlcpy(watcher->addr, endpoint, sizeof(watcher->addr)); + char *addr = watcher->addr; + char *host_end = strrchr(addr, ':'); - bool tcp = true; - char ip[16], *ip_end = xstrchrnul(watcher->addr, ':'); + if (host_end && addr != host_end) { + // Split user specified address into two strings, addr(hostname) and port. + // The port part in watcher->addr will be updated later. + *host_end = '\0'; + char *port = host_end + 1; + intmax_t iport; - // (ip_end - addr) is always > 0, so convert to size_t - size_t addr_len = (size_t)(ip_end - watcher->addr); - - if (addr_len > sizeof(ip) - 1) { - // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) - addr_len = sizeof(ip) - 1; - } + int ret = getdigits_safe(&(char_u *){ (char_u *)port }, &iport); + if (ret == FAIL || iport < 0 || iport > UINT16_MAX) { + ELOG("Invalid port: %s", port); + return UV_EINVAL; + } - // Extract the address part - xstrlcpy(ip, watcher->addr, addr_len + 1); - int port = NVIM_DEFAULT_TCP_PORT; - - if (*ip_end == ':') { - // Extract the port - long lport = strtol(ip_end + 1, NULL, 10); // NOLINT - if (lport <= 0 || lport > 0xffff) { - // Invalid port, treat as named pipe or unix socket - tcp = false; - } else { - port = (int) lport; + if (*port == NUL) { + // When no port is given, (uv_)getaddrinfo expects NULL otherwise the + // implementation may attempt to lookup the service by name (and fail) + port = NULL; } - } - if (tcp) { - // Try to parse ip address - if (uv_ip4_addr(ip, port, &watcher->uv.tcp.addr)) { - // Invalid address, treat as named pipe or unix socket - tcp = false; + uv_getaddrinfo_t request; + + int retval = uv_getaddrinfo(&loop->uv, &request, NULL, addr, port, + &(struct addrinfo){ + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + }); + if (retval != 0) { + ELOG("Host lookup failed: %s", endpoint); + return retval; } - } + watcher->uv.tcp.addrinfo = request.addrinfo; - if (tcp) { uv_tcp_init(&loop->uv, &watcher->uv.tcp.handle); - watcher->stream = (uv_stream_t *)&watcher->uv.tcp.handle; + uv_tcp_nodelay(&watcher->uv.tcp.handle, true); + watcher->stream = STRUCT_CAST(uv_stream_t, &watcher->uv.tcp.handle); } else { uv_pipe_init(&loop->uv, &watcher->uv.pipe.handle, 0); - watcher->stream = (uv_stream_t *)&watcher->uv.pipe.handle; + watcher->stream = STRUCT_CAST(uv_stream_t, &watcher->uv.pipe.handle); } watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; watcher->events = NULL; + watcher->data = NULL; + + return 0; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) FUNC_ATTR_NONNULL_ALL { watcher->cb = cb; - int result; + int result = UV_EINVAL; if (watcher->stream->type == UV_TCP) { - result = uv_tcp_bind(&watcher->uv.tcp.handle, - (const struct sockaddr *)&watcher->uv.tcp.addr, 0); + struct addrinfo *ai = watcher->uv.tcp.addrinfo; + + for (; ai; ai = ai->ai_next) { + result = uv_tcp_bind(&watcher->uv.tcp.handle, ai->ai_addr, 0); + if (result != 0) { + continue; + } + result = uv_listen(watcher->stream, backlog, connection_cb); + if (result == 0) { + struct sockaddr_storage sas; + + // When the endpoint in socket_watcher_init() didn't specify a port + // number, a free random port number will be assigned. sin_port will + // contain 0 in this case, unless uv_tcp_getsockname() is used first. + uv_tcp_getsockname(&watcher->uv.tcp.handle, (struct sockaddr *)&sas, + &(int){ sizeof(sas) }); + uint16_t port = (uint16_t)((sas.ss_family == AF_INET) + ? ((struct sockaddr_in *)&sas)->sin_port + : ((struct sockaddr_in6 *)&sas)->sin6_port); + // v:servername uses the string from watcher->addr + size_t len = strlen(watcher->addr); + snprintf(watcher->addr+len, sizeof(watcher->addr)-len, ":%" PRIu16, + ntohs(port)); + break; + } + } + uv_freeaddrinfo(watcher->uv.tcp.addrinfo); } else { result = uv_pipe_bind(&watcher->uv.pipe.handle, watcher->addr); - } - - if (result == 0) { - result = uv_listen(watcher->stream, backlog, connection_cb); + if (result == 0) { + result = uv_listen(watcher->stream, backlog, connection_cb); + } } assert(result <= 0); // libuv should return negative error code or zero. if (result < 0) { - if (result == -EACCES) { + if (result == UV_EACCES) { // Libuv converts ENOENT to EACCES for Windows compatibility, but if // the parent directory does not exist, ENOENT would be more accurate. *path_tail((char_u *)watcher->addr) = NUL; if (!os_path_exists((char_u *)watcher->addr)) { - result = -ENOENT; + result = UV_ENOENT; } } return result; @@ -122,10 +145,11 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) uv_stream_t *client; if (watcher->stream->type == UV_TCP) { - client = (uv_stream_t *)&stream->uv.tcp; + client = STRUCT_CAST(uv_stream_t, &stream->uv.tcp); uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); + uv_tcp_nodelay((uv_tcp_t *)client, true); } else { - client = (uv_stream_t *)&stream->uv.pipe; + client = STRUCT_CAST(uv_stream_t, &stream->uv.pipe); uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); } @@ -168,3 +192,77 @@ static void close_cb(uv_handle_t *handle) watcher->close_cb(watcher, watcher->data); } } + +static void connect_cb(uv_connect_t *req, int status) +{ + int *ret_status = req->data; + *ret_status = status; + if (status != 0) { + uv_close((uv_handle_t *)req->handle, NULL); + } +} + +bool socket_connect(Loop *loop, Stream *stream, + bool is_tcp, const char *address, + int timeout, const char **error) +{ + bool success = false; + int status; + uv_connect_t req; + req.data = &status; + uv_stream_t *uv_stream; + + uv_tcp_t *tcp = &stream->uv.tcp; + uv_getaddrinfo_t addr_req; + addr_req.addrinfo = NULL; + const struct addrinfo *addrinfo = NULL; + char *addr = NULL; + if (is_tcp) { + addr = xstrdup(address); + char *host_end = strrchr(addr, ':'); + if (!host_end) { + *error = _("tcp address must be host:port"); + goto cleanup; + } + *host_end = NUL; + + const struct addrinfo hints = { .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + .ai_flags = AI_NUMERICSERV }; + int retval = uv_getaddrinfo(&loop->uv, &addr_req, NULL, + addr, host_end+1, &hints); + if (retval != 0) { + *error = _("failed to lookup host or port"); + goto cleanup; + } + addrinfo = addr_req.addrinfo; + +tcp_retry: + uv_tcp_init(&loop->uv, tcp); + uv_tcp_nodelay(tcp, true); + uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb); + uv_stream = (uv_stream_t *)tcp; + + } else { + uv_pipe_t *pipe = &stream->uv.pipe; + uv_pipe_init(&loop->uv, pipe, 0); + uv_pipe_connect(&req, pipe, address, connect_cb); + uv_stream = (uv_stream_t *)pipe; + } + status = 1; + LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1); + if (status == 0) { + stream_init(NULL, stream, -1, uv_stream); + success = true; + } else if (is_tcp && addrinfo->ai_next) { + addrinfo = addrinfo->ai_next; + goto tcp_retry; + } else { + *error = _("connection refused"); + } + +cleanup: + xfree(addr); + uv_freeaddrinfo(addr_req.addrinfo); + return success; +} diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index eb0823c76d..d30ae45502 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -20,7 +20,7 @@ struct socket_watcher { union { struct { uv_tcp_t handle; - struct sockaddr_in addr; + struct addrinfo *addrinfo; } tcp; struct { uv_pipe_t handle; diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 860a957b3e..60ceff9b24 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -8,6 +8,7 @@ #include <uv.h> #include "nvim/rbuffer.h" +#include "nvim/macros.h" #include "nvim/event/stream.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -26,8 +27,9 @@ int stream_set_blocking(int fd, bool blocking) uv_loop_init(&loop); uv_pipe_init(&loop, &stream, 0); uv_pipe_open(&stream, fd); - int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); - uv_close((uv_handle_t *)&stream, NULL); + int retval = uv_stream_set_blocking(STRUCT_CAST(uv_stream_t, &stream), + blocking); + uv_close(STRUCT_CAST(uv_handle_t, &stream), NULL); uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt. uv_loop_close(&loop); return retval; @@ -52,7 +54,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) assert(type == UV_NAMED_PIPE || type == UV_TTY); uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); uv_pipe_open(&stream->uv.pipe, fd); - stream->uvstream = (uv_stream_t *)&stream->uv.pipe; + stream->uvstream = STRUCT_CAST(uv_stream_t, &stream->uv.pipe); } } |