diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2015-07-17 00:46:34 -0300 |
commit | 883b78d29864f39b8032468c4374766dad7d142f (patch) | |
tree | b555f3a48c08862c07ef7518a8ba6c8fa58c1aee /src/nvim/msgpack_rpc | |
parent | d88c93acf390ea9d5e8674283927cff60fb41e0d (diff) | |
parent | aa9cb48bf08af14068178619414590254b263882 (diff) | |
download | rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.gz rneovim-883b78d29864f39b8032468c4374766dad7d142f.tar.bz2 rneovim-883b78d29864f39b8032468c4374766dad7d142f.zip |
Merge PR #2980 'Refactor event loop layer'
Helped-by: oni-link <knil.ino@gmail.com>
Reviewed-by: oni-link <knil.ino@gmail.com>
Reviewed-by: Scott Prager <splinterofchaos@gmail.com>
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 195 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 1 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.h | 2 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/server.c | 223 |
4 files changed, 141 insertions, 280 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 2a81b4f160..861614f147 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -9,13 +9,11 @@ #include "nvim/api/vim.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/remote_ui.h" -#include "nvim/os/event.h" -#include "nvim/os/rstream.h" -#include "nvim/os/rstream_defs.h" -#include "nvim/os/wstream.h" -#include "nvim/os/wstream_defs.h" -#include "nvim/os/job.h" -#include "nvim/os/job_defs.h" +#include "nvim/event/loop.h" +#include "nvim/event/uv_process.h" +#include "nvim/event/rstream.h" +#include "nvim/event/wstream.h" +#include "nvim/event/socket.h" #include "nvim/msgpack_rpc/helpers.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -34,6 +32,12 @@ #define log_server_msg(...) #endif +typedef enum { + kChannelTypeSocket, + kChannelTypeProc, + kChannelTypeStdio +} ChannelType; + typedef struct { uint64_t request_id; bool returned, errored; @@ -45,15 +49,21 @@ typedef struct { size_t refcount; size_t pending_requests; PMap(cstr_t) *subscribed_events; - bool is_job, closed; + bool closed; + ChannelType type; msgpack_unpacker *unpacker; union { - Job *job; + Stream stream; + struct { + UvProcess uvproc; + Stream in; + Stream out; + Stream err; + } process; struct { - RStream *read; - WStream *write; - uv_stream_t *uv; - } streams; + Stream in; + Stream out; + } std; } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; @@ -104,57 +114,48 @@ void channel_teardown(void) }); } -/// Creates an API channel by starting a job and connecting to its +/// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process. [consumed] /// @return The channel id (> 0), on success. /// 0, on error. -uint64_t channel_from_job(char **argv) -{ - Channel *channel = register_channel(); - channel->is_job = true; - incref(channel); // job channels are only closed by the exit_cb - - int status; - JobOptions opts = JOB_OPTIONS_INIT; - opts.argv = argv; - opts.data = channel; - opts.stdout_cb = job_out; - opts.stderr_cb = job_err; - opts.exit_cb = job_exit; - channel->data.job = job_start(opts, &status); - - if (status <= 0) { - if (status == 0) { // Two decrefs needed if status == 0. - decref(channel); // Only one needed if status < 0, - } // because exit_cb will do the second one. +uint64_t channel_from_process(char **argv) +{ + Channel *channel = register_channel(kChannelTypeProc); + channel->data.process.uvproc = uv_process_init(channel); + Process *proc = &channel->data.process.uvproc.process; + proc->argv = argv; + proc->in = &channel->data.process.in; + proc->out = &channel->data.process.out; + proc->err = &channel->data.process.err; + proc->cb = process_exit; + if (!process_spawn(&loop, proc)) { + loop_poll_events(&loop, 0); decref(channel); return 0; } + incref(channel); // process channels are only closed by the exit_cb + wstream_init(proc->in, 0); + rstream_init(proc->out, 0); + rstream_start(proc->out, parse_msgpack); + rstream_init(proc->err, 0); + rstream_start(proc->err, forward_stderr); + return channel->id; } -/// Creates an API channel from a libuv stream representing a tcp or -/// pipe/socket client connection +/// Creates an API channel from a tcp/pipe socket connection /// -/// @param stream The established connection -void channel_from_stream(uv_stream_t *stream) +/// @param watcher The SocketWatcher ready to accept the connection +void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(); - stream->data = NULL; - channel->is_job = false; - // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_stream(channel->data.streams.read, stream); - rstream_start(channel->data.streams.read); - // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_stream(channel->data.streams.write, stream); - channel->data.streams.uv = stream; + Channel *channel = register_channel(kChannelTypeSocket); + socket_watcher_accept(watcher, &channel->data.stream, channel); + wstream_init(&channel->data.stream, 0); + rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.stream, parse_msgpack); } /// Sends event/arguments to channel @@ -220,7 +221,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - event_poll_until(-1, frame.returned); + LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -313,44 +314,32 @@ bool channel_close(uint64_t id) /// Neovim static void channel_from_stdio(void) { - Channel *channel = register_channel(); + Channel *channel = register_channel(kChannelTypeStdio); incref(channel); // stdio channels are only closed on exit - channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, - rbuffer_new(CHANNEL_BUFFER_SIZE), - channel); - rstream_set_file(channel->data.streams.read, 0); - rstream_start(channel->data.streams.read); + rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, + channel); + rstream_start(&channel->data.std.in, parse_msgpack); // write stream - channel->data.streams.write = wstream_new(0); - wstream_set_file(channel->data.streams.write, 1); - channel->data.streams.uv = NULL; -} - -static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof) -{ - Job *job = data; - parse_msgpack(rstream, buf, job_data(job), eof); + wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1); buf[read] = NUL; - ELOG("Channel %" PRIu64 " stderr: %s", - ((Channel *)job_data(data))->id, buf); + ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf); } } -static void job_exit(Job *job, int status, void *data) +static void process_exit(Process *proc, int status, void *data) { decref(data); } -static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) { Channel *channel = data; incref(channel); @@ -362,9 +351,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof) } size_t count = rbuffer_size(rbuf); - DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)", count, - rstream); + stream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -474,7 +463,7 @@ static void handle_request(Channel *channel, msgpack_object *request) event_data->args = args; event_data->request_id = request_id; incref(channel); - event_push((Event) { + loop_push_event(&loop, (Event) { .handler = on_request_event, .data = event_data }, defer); @@ -516,10 +505,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return false; } - if (channel->is_job) { - success = job_write(channel->data.job, buffer); - } else { - success = wstream_write(channel->data.streams.write, buffer); + switch (channel->type) { + case kChannelTypeSocket: + success = wstream_write(&channel->data.stream, buffer); + break; + case kChannelTypeProc: + success = wstream_write(&channel->data.process.in, buffer); + break; + case kChannelTypeStdio: + success = wstream_write(&channel->data.std.out, buffer); + break; + default: + abort(); } if (!success) { @@ -628,7 +625,7 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/job and free the channel resources. +/// Close the channel streams/process and free the channel resources. static void close_channel(Channel *channel) { if (channel->closed) { @@ -637,19 +634,23 @@ static void close_channel(Channel *channel) channel->closed = true; - if (channel->is_job) { - if (channel->data.job) { - job_stop(channel->data.job); - } - } else { - rstream_free(channel->data.streams.read); - wstream_free(channel->data.streams.write); - uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; - if (handle) { - uv_close(handle, close_cb); - } else { - event_push((Event) { .handler = on_stdio_close, .data = channel }, false); - } + switch (channel->type) { + case kChannelTypeSocket: + stream_close(&channel->data.stream, close_cb); + break; + case kChannelTypeProc: + if (!channel->data.process.uvproc.process.closed) { + process_stop(&channel->data.process.uvproc.process); + } + break; + case kChannelTypeStdio: + stream_close(&channel->data.std.in, NULL); + stream_close(&channel->data.std.out, NULL); + loop_push_event(&loop, + (Event) { .handler = on_stdio_close, .data = channel }, false); + break; + default: + abort(); } decref(channel); @@ -682,15 +683,15 @@ static void free_channel(Channel *channel) xfree(channel); } -static void close_cb(uv_handle_t *handle) +static void close_cb(Stream *stream, void *data) { - xfree(handle->data); - xfree(handle); + xfree(data); } -static Channel *register_channel(void) +static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->type = type; rv->refcount = 1; rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index df742fe368..104547a7b8 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -5,6 +5,7 @@ #include <uv.h> #include "nvim/api/private/defs.h" +#include "nvim/event/socket.h" #include "nvim/vim.h" #define METHOD_MAXLEN 512 diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index bf161d54e0..7d9f114140 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -6,7 +6,7 @@ #include <msgpack.h> -#include "nvim/os/wstream.h" +#include "nvim/event/wstream.h" #include "nvim/api/private/defs.h" #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index 388b5a04cf..8dffea35ca 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -3,11 +3,10 @@ #include <string.h> #include <stdint.h> -#include <uv.h> - #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" #include "nvim/os/os.h" +#include "nvim/event/socket.h" #include "nvim/ascii.h" #include "nvim/eval.h" #include "nvim/garray.h" @@ -19,35 +18,9 @@ #include "nvim/strings.h" #define MAX_CONNECTIONS 32 -#define ADDRESS_MAX_SIZE 256 -#define NVIM_DEFAULT_TCP_PORT 7450 #define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" -typedef enum { - kServerTypeTcp, - kServerTypePipe -} ServerType; - -typedef struct { - // Pipe/socket path, or TCP address string - char addr[ADDRESS_MAX_SIZE]; - - // Type of the union below - ServerType type; - - // TCP server or unix socket (named pipe on Windows) - union { - struct { - uv_tcp_t handle; - struct sockaddr_in addr; - } tcp; - struct { - uv_pipe_t handle; - } pipe; - } socket; -} Server; - -static garray_T servers = GA_EMPTY_INIT_VALUE; +static garray_T watchers = GA_EMPTY_INIT_VALUE; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/server.c.generated.h" @@ -56,7 +29,7 @@ static garray_T servers = GA_EMPTY_INIT_VALUE; /// Initializes the module bool server_init(void) { - ga_init(&servers, sizeof(Server *), 1); + ga_init(&watchers, sizeof(SocketWatcher *), 1); bool must_free = false; const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); @@ -72,18 +45,10 @@ bool server_init(void) return ok; } -/// Retrieve the file handle from a server. -static uv_handle_t *server_handle(Server *server) -{ - return server->type == kServerTypeTcp - ? (uv_handle_t *)&server->socket.tcp.handle - : (uv_handle_t *) &server->socket.pipe.handle; -} - /// Teardown a single server -static void server_close_cb(Server **server) +static void close_socket_watcher(SocketWatcher **watcher) { - uv_close(server_handle(*server), free_server); + socket_watcher_close(*watcher, free_server); } /// Set v:servername to the first server in the server list, or unset it if no @@ -91,7 +56,7 @@ static void server_close_cb(Server **server) static void set_vservername(garray_T *srvs) { char *default_server = (srvs->ga_len > 0) - ? ((Server **)srvs->ga_data)[0]->addr + ? ((SocketWatcher **)srvs->ga_data)[0]->addr : NULL; set_vim_var_string(VV_SEND_SERVER, (char_u *)default_server, -1); } @@ -99,7 +64,7 @@ static void set_vservername(garray_T *srvs) /// Teardown the server module void server_teardown(void) { - GA_DEEP_CLEAR(&servers, Server *, server_close_cb); + GA_DEEP_CLEAR(&watchers, SocketWatcher *, close_socket_watcher); } /// Starts listening for API calls on the TCP address or pipe path `endpoint`. @@ -116,120 +81,38 @@ void server_teardown(void) int server_start(const char *endpoint) FUNC_ATTR_NONNULL_ALL { - char addr[ADDRESS_MAX_SIZE]; - - // Trim to `ADDRESS_MAX_SIZE` - if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) { - // TODO(aktau): since this is not what the user wanted, perhaps we - // should return an error here - WLOG("Address was too long, truncated to %s", addr); - } - - // Check if the server already exists - for (int i = 0; i < servers.ga_len; i++) { - if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) { - ELOG("Already listening on %s", addr); + SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); + socket_watcher_init(&loop, watcher, endpoint, NULL); + + // Check if a watcher for the endpoint already exists + for (int i = 0; i < watchers.ga_len; i++) { + if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { + ELOG("Already listening on %s", watcher->addr); + socket_watcher_close(watcher, free_server); return 1; } } - ServerType server_type = kServerTypeTcp; - Server *server = xmalloc(sizeof(Server)); - char ip[16], *ip_end = strrchr(addr, ':'); - - if (!ip_end) { - ip_end = strchr(addr, NUL); - } - - // (ip_end - addr) is always > 0, so convert to size_t - size_t addr_len = (size_t)(ip_end - addr); - - if (addr_len > sizeof(ip) - 1) { - // Maximum length of an IPv4 address buffer is 15 (eg: 255.255.255.255) - addr_len = sizeof(ip) - 1; - } - - // Extract the address part - xstrlcpy(ip, addr, addr_len + 1); - - int port = NVIM_DEFAULT_TCP_PORT; - - if (*ip_end == ':') { - // Extract the port - long lport = strtol(ip_end + 1, NULL, 10); // NOLINT - if (lport <= 0 || lport > 0xffff) { - // Invalid port, treat as named pipe or unix socket - server_type = kServerTypePipe; - } else { - port = (int) lport; - } - } - - if (server_type == kServerTypeTcp) { - // Try to parse ip address - if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) { - // Invalid address, treat as named pipe or unix socket - server_type = kServerTypePipe; - } - } - - int result; - uv_stream_t *stream = NULL; - - xstrlcpy(server->addr, addr, sizeof(server->addr)); - - if (server_type == kServerTypeTcp) { - // Listen on tcp address/port - uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); - result = uv_tcp_bind(&server->socket.tcp.handle, - (const struct sockaddr *)&server->socket.tcp.addr, - 0); - stream = (uv_stream_t *)&server->socket.tcp.handle; - } else { - // Listen on named pipe or unix socket - uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); - result = uv_pipe_bind(&server->socket.pipe.handle, server->addr); - stream = (uv_stream_t *)&server->socket.pipe.handle; - } - - stream->data = server; - - if (result == 0) { - result = uv_listen((uv_stream_t *)&server->socket.tcp.handle, - MAX_CONNECTIONS, - connection_cb); - } - - assert(result <= 0); // libuv should have returned -errno or zero. + int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb); if (result < 0) { - if (result == -EACCES) { - // Libuv converts ENOENT to EACCES for Windows compatibility, but if - // the parent directory does not exist, ENOENT would be more accurate. - *path_tail((char_u *) addr) = NUL; - if (!os_file_exists((char_u *) addr)) { - result = -ENOENT; - } - } - uv_close((uv_handle_t *)stream, free_server); ELOG("Failed to start server: %s", uv_strerror(result)); + socket_watcher_close(watcher, free_server); return result; } // Update $NVIM_LISTEN_ADDRESS, if not set. const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); if (listen_address == NULL) { - os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1); + os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1); } - server->type = server_type; - - // Add the server to the list. - ga_grow(&servers, 1); - ((Server **)servers.ga_data)[servers.ga_len++] = server; + // Add the watcher to the list. + ga_grow(&watchers, 1); + ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher; // Update v:servername, if not set. if (STRLEN(get_vim_var_str(VV_SEND_SERVER)) == 0) { - set_vservername(&servers); + set_vservername(&watchers); } return 0; @@ -240,21 +123,21 @@ int server_start(const char *endpoint) /// @param endpoint Address of the server. void server_stop(char *endpoint) { - Server *server; + SocketWatcher *watcher; char addr[ADDRESS_MAX_SIZE]; // Trim to `ADDRESS_MAX_SIZE` xstrlcpy(addr, endpoint, sizeof(addr)); int i = 0; // Index of the server whose address equals addr. - for (; i < servers.ga_len; i++) { - server = ((Server **)servers.ga_data)[i]; - if (strcmp(addr, server->addr) == 0) { + for (; i < watchers.ga_len; i++) { + watcher = ((SocketWatcher **)watchers.ga_data)[i]; + if (strcmp(addr, watcher->addr) == 0) { break; } } - if (i >= servers.ga_len) { + if (i >= watchers.ga_len) { ELOG("Not listening on %s", addr); return; } @@ -265,18 +148,18 @@ void server_stop(char *endpoint) os_unsetenv(LISTEN_ADDRESS_ENV_VAR); } - uv_close(server_handle(server), free_server); + socket_watcher_close(watcher, free_server); // Remove this server from the list by swapping it with the last item. - if (i != servers.ga_len - 1) { - ((Server **)servers.ga_data)[i] = - ((Server **)servers.ga_data)[servers.ga_len - 1]; + if (i != watchers.ga_len - 1) { + ((SocketWatcher **)watchers.ga_data)[i] = + ((SocketWatcher **)watchers.ga_data)[watchers.ga_len - 1]; } - servers.ga_len--; + watchers.ga_len--; // If v:servername is the stopped address, re-initialize it. if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) { - set_vservername(&servers); + set_vservername(&watchers); } } @@ -285,52 +168,28 @@ void server_stop(char *endpoint) char **server_address_list(size_t *size) FUNC_ATTR_NONNULL_ALL { - if ((*size = (size_t) servers.ga_len) == 0) { + if ((*size = (size_t)watchers.ga_len) == 0) { return NULL; } - char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char *)); - for (int i = 0; i < servers.ga_len; i++) { - addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr); + char **addrs = xcalloc((size_t)watchers.ga_len, sizeof(const char *)); + for (int i = 0; i < watchers.ga_len; i++) { + addrs[i] = xstrdup(((SocketWatcher **)watchers.ga_data)[i]->addr); } return addrs; } -static void connection_cb(uv_stream_t *server, int status) +static void connection_cb(SocketWatcher *watcher, int result, void *data) { - int result; - uv_stream_t *client; - Server *srv = server->data; - - if (status < 0) { - abort(); - } - - if (srv->type == kServerTypeTcp) { - client = xmalloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client); - } else { - client = xmalloc(sizeof(uv_pipe_t)); - uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0); - } - - result = uv_accept(server, client); - if (result) { ELOG("Failed to accept connection: %s", uv_strerror(result)); - uv_close((uv_handle_t *)client, free_client); return; } - channel_from_stream(client); -} - -static void free_client(uv_handle_t *handle) -{ - xfree(handle); + channel_from_connection(watcher); } -static void free_server(uv_handle_t *handle) +static void free_server(SocketWatcher *watcher, void *data) { - xfree(handle->data); + xfree(watcher); } |