diff options
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 75 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 69 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/remote_ui.c | 4 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/server.c | 126 |
4 files changed, 188 insertions, 86 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 35549ce042..b671b8b545 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -5,8 +5,6 @@ #include <uv.h> #include <msgpack.h> -#include "nvim/lib/klist.h" - #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/msgpack_rpc/channel.h" @@ -69,10 +67,6 @@ typedef struct { uint64_t request_id; } RequestEvent; -#define _noop(x) -KMEMPOOL_INIT(RequestEventPool, RequestEvent, _noop) -static kmempool_t(RequestEventPool) *request_event_pool = NULL; - static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; @@ -85,7 +79,6 @@ static msgpack_sbuffer out_buffer; /// Initializes the module void channel_init(void) { - request_event_pool = kmp_init(RequestEventPool); channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); @@ -232,7 +225,25 @@ Object channel_send_call(uint64_t id, channel->pending_requests--; if (frame.errored) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); + if (frame.result.type == kObjectTypeString) { + api_set_error(err, Exception, "%s", frame.result.data.string.data); + } else if (frame.result.type == kObjectTypeArray) { + // Should be an error in the form [type, message] + Array array = frame.result.data.array; + if (array.size == 2 && array.items[0].type == kObjectTypeInteger + && (array.items[0].data.integer == kErrorTypeException + || array.items[0].data.integer == kErrorTypeValidation) + && array.items[1].type == kObjectTypeString) { + err->type = (ErrorType) array.items[0].data.integer; + xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg)); + err->set = true; + } else { + api_set_error(err, Exception, "%s", "unknown error"); + } + } else { + api_set_error(err, Exception, "%s", "unknown error"); + } + api_free_object(frame.result); } @@ -442,20 +453,20 @@ static void handle_request(Channel *channel, msgpack_object *request) // Retrieve the request handler MsgpackRpcRequestHandler handler; - msgpack_object method = request->via.array.ptr[2]; + msgpack_object *method = msgpack_rpc_method(request); - if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) { - handler = msgpack_rpc_get_handler_for(method.via.bin.ptr, - method.via.bin.size); + if (method) { + handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, + method->via.bin.size); } else { handler.fn = msgpack_rpc_handle_missing_method; handler.defer = false; } Array args = ARRAY_DICT_INIT; - msgpack_rpc_to_array(request->via.array.ptr + 3, &args); + msgpack_rpc_to_array(msgpack_rpc_args(request), &args); bool defer = (!kv_size(channel->call_stack) && handler.defer); - RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool); + RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; @@ -476,18 +487,22 @@ static void on_request_event(Event event) uint64_t request_id = e->request_id; Error error = ERROR_INIT; Object result = handler.fn(channel->id, request_id, args, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); - channel_write(channel, serialize_response(channel->id, - request_id, - &error, - result, - &out_buffer)); + if (request_id != NO_RESPONSE) { + // send the response + msgpack_packer response; + msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); + channel_write(channel, serialize_response(channel->id, + request_id, + &error, + result, + &out_buffer)); + } else { + api_free_object(result); + } // All arguments were freed already, but we still need to free the array - free(args.items); + xfree(args.items); decref(channel); - kmp_free(RequestEventPool, request_event_pool, e); + xfree(e); } static bool channel_write(Channel *channel, WBuffer *buffer) @@ -608,7 +623,7 @@ static void unsubscribe(Channel *channel, char *event) // Since the string is no longer used by other channels, release it's memory pmap_del(cstr_t)(event_strings, event_string); - free(event_string); + xfree(event_string); } /// Close the channel streams/job and free the channel resources. @@ -662,13 +677,13 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); - free(channel); + xfree(channel); } static void close_cb(uv_handle_t *handle) { - free(handle->data); - free(handle); + xfree(handle->data); + xfree(handle); } static Channel *register_channel(void) @@ -745,7 +760,7 @@ static WBuffer *serialize_request(uint64_t channel_id, WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, refcount, - free); + xfree); msgpack_sbuffer_clear(sbuffer); api_free_array(args); return rv; @@ -764,7 +779,7 @@ static WBuffer *serialize_response(uint64_t channel_id, WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, 1, // responses only go though 1 channel - free); + xfree); msgpack_sbuffer_clear(sbuffer); api_free_object(arg); return rv; diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 355176aa5f..7d0db9a9b8 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -351,49 +351,86 @@ void msgpack_rpc_serialize_response(uint64_t response_id, } } +static bool msgpack_rpc_is_notification(msgpack_object *req) +{ + return req->via.array.ptr[0].via.u64 == 2; +} + +msgpack_object *msgpack_rpc_method(msgpack_object *req) +{ + msgpack_object *obj = req->via.array.ptr + + (msgpack_rpc_is_notification(req) ? 1 : 2); + return obj->type == MSGPACK_OBJECT_STR || obj->type == MSGPACK_OBJECT_BIN ? + obj : NULL; +} + +msgpack_object *msgpack_rpc_args(msgpack_object *req) +{ + msgpack_object *obj = req->via.array.ptr + + (msgpack_rpc_is_notification(req) ? 2 : 3); + return obj->type == MSGPACK_OBJECT_ARRAY ? obj : NULL; +} + +static msgpack_object *msgpack_rpc_msg_id(msgpack_object *req) +{ + if (msgpack_rpc_is_notification(req)) { + return NULL; + } + msgpack_object *obj = &req->via.array.ptr[1]; + return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ? obj : NULL; +} + void msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req, Error *err) { // response id not known yet - *response_id = 0; + *response_id = NO_RESPONSE; // Validate the basic structure of the msgpack-rpc payload if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Request is not an array")); + api_set_error(err, Validation, _("Message is not an array")); return; } - if (req->via.array.size != 4) { - api_set_error(err, Validation, _("Request array size should be 4")); + if (req->via.array.size == 0) { + api_set_error(err, Validation, _("Message is empty")); return; } - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Id must be a positive integer")); + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Message type must be an integer")); return; } - // Set the response id, which is the same as the request - *response_id = req->via.array.ptr[1].via.u64; - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); + uint64_t type = req->via.array.ptr[0].via.u64; + if (type != kMessageTypeRequest && type != kMessageTypeNotification) { + api_set_error(err, Validation, _("Unknown message type")); return; } - if (req->via.array.ptr[0].via.u64 != 0) { - api_set_error(err, Validation, _("Message type must be 0")); + if ((type == kMessageTypeRequest && req->via.array.size != 4) || + (type == kMessageTypeNotification && req->via.array.size != 3)) { + api_set_error(err, Validation, _("Request array size should be 4 (request) " + "or 3 (notification)")); return; } - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN - && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { + if (type == kMessageTypeRequest) { + msgpack_object *id_obj = msgpack_rpc_msg_id(req); + if (!id_obj) { + api_set_error(err, Validation, _("ID must be a positive integer")); + return; + } + *response_id = id_obj->via.u64; + } + + if (!msgpack_rpc_method(req)) { api_set_error(err, Validation, _("Method must be a string")); return; } - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + if (!msgpack_rpc_args(req)) { api_set_error(err, Validation, _("Parameters must be an array")); return; } diff --git a/src/nvim/msgpack_rpc/remote_ui.c b/src/nvim/msgpack_rpc/remote_ui.c index b554d76bed..07d78dd9d7 100644 --- a/src/nvim/msgpack_rpc/remote_ui.c +++ b/src/nvim/msgpack_rpc/remote_ui.c @@ -48,9 +48,9 @@ void remote_ui_disconnect(uint64_t channel_id) // destroy pending screen updates api_free_array(data->buffer); pmap_del(uint64_t)(connected_uis, channel_id); - free(ui->data); + xfree(ui->data); ui_detach(ui); - free(ui); + xfree(ui); } static Object remote_ui_attach(uint64_t channel_id, uint64_t request_id, diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index 91aca0c37a..8fb2902b0b 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -9,11 +9,11 @@ #include "nvim/msgpack_rpc/server.h" #include "nvim/os/os.h" #include "nvim/ascii.h" +#include "nvim/garray.h" #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/log.h" #include "nvim/tempfile.h" -#include "nvim/map.h" #include "nvim/path.h" #define MAX_CONNECTIONS 32 @@ -27,6 +27,9 @@ typedef enum { } ServerType; typedef struct { + // The address of a pipe, or string value of a tcp address. + char addr[ADDRESS_MAX_SIZE]; + // Type of the union below ServerType type; @@ -38,12 +41,11 @@ typedef struct { } tcp; struct { uv_pipe_t handle; - char addr[ADDRESS_MAX_SIZE]; } pipe; } socket; } Server; -static PMap(cstr_t) *servers = NULL; +static garray_T servers = GA_EMPTY_INIT_VALUE; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/server.c.generated.h" @@ -52,33 +54,40 @@ static PMap(cstr_t) *servers = NULL; /// Initializes the module bool server_init(void) { - servers = pmap_new(cstr_t)(); + ga_init(&servers, sizeof(Server *), 1); - if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) { - char *listen_address = (char *)vim_tempname(); - os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1); - free(listen_address); + bool must_free = false; + const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); + if (listen_address == NULL || *listen_address == NUL) { + must_free = true; + listen_address = (char *)vim_tempname(); } - return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0; + bool ok = (server_start(listen_address) == 0); + if (must_free) { + xfree((char *) listen_address); + } + return ok; } -/// Teardown the server module -void server_teardown(void) +/// Retrieve the file handle from a server. +static uv_handle_t *server_handle(Server *server) { - if (!servers) { - return; - } + return server->type == kServerTypeTcp + ? (uv_handle_t *)&server->socket.tcp.handle + : (uv_handle_t *) &server->socket.pipe.handle; +} - Server *server; +/// Teardown a single server +static void server_close_cb(Server **server) +{ + uv_close(server_handle(*server), free_server); +} - map_foreach_value(servers, server, { - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); - } - }); +/// Teardown the server module +void server_teardown(void) +{ + GA_DEEP_CLEAR(&servers, Server *, server_close_cb); } /// Starts listening on arbitrary tcp/unix addresses specified by @@ -106,9 +115,11 @@ int server_start(const char *endpoint) } // Check if the server already exists - if (pmap_has(cstr_t)(servers, addr)) { - ELOG("Already listening on %s", addr); - return 1; + for (int i = 0; i < servers.ga_len; i++) { + if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) { + ELOG("Already listening on %s", addr); + return 1; + } } ServerType server_type = kServerTypeTcp; @@ -154,6 +165,8 @@ int server_start(const char *endpoint) int result; uv_stream_t *stream = NULL; + xstrlcpy(server->addr, addr, sizeof(server->addr)); + if (server_type == kServerTypeTcp) { // Listen on tcp address/port uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle); @@ -163,10 +176,8 @@ int server_start(const char *endpoint) stream = (uv_stream_t *)&server->socket.tcp.handle; } else { // Listen on named pipe or unix socket - xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr)); uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0); - result = uv_pipe_bind(&server->socket.pipe.handle, - server->socket.pipe.addr); + result = uv_pipe_bind(&server->socket.pipe.handle, server->addr); stream = (uv_stream_t *)&server->socket.pipe.handle; } @@ -193,9 +204,17 @@ int server_start(const char *endpoint) return result; } + // Update $NVIM_LISTEN_ADDRESS, if not set. + const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); + if (listen_address == NULL || *listen_address == NUL) { + os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1); + } + server->type = server_type; - // Add the server to the hash table - pmap_put(cstr_t)(servers, addr, server); + + // Add the server to the list. + ga_grow(&servers, 1); + ((Server **)servers.ga_data)[servers.ga_len++] = server; return 0; } @@ -211,18 +230,49 @@ void server_stop(char *endpoint) // Trim to `ADDRESS_MAX_SIZE` xstrlcpy(addr, endpoint, sizeof(addr)); - if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) { + int i = 0; // The index of the server whose address equals addr. + for (; i < servers.ga_len; i++) { + server = ((Server **)servers.ga_data)[i]; + if (strcmp(addr, server->addr) == 0) { + break; + } + } + + if (i == servers.ga_len) { ELOG("Not listening on %s", addr); return; } - if (server->type == kServerTypeTcp) { - uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server); - } else { - uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server); + // If we are invalidating the listen address, unset it. + const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); + if (listen_address && strcmp(addr, listen_address) == 0) { + os_unsetenv(LISTEN_ADDRESS_ENV_VAR); } - pmap_del(cstr_t)(servers, addr); + uv_close(server_handle(server), free_server); + + // Remove this server from the list by swapping it with the last item. + if (i != servers.ga_len - 1) { + ((Server **)servers.ga_data)[i] = + ((Server **)servers.ga_data)[servers.ga_len - 1]; + } + servers.ga_len--; +} + +/// Returns an allocated array of server addresses. +/// @param[out] size The size of the returned array. +char **server_address_list(size_t *size) + FUNC_ATTR_NONNULL_ALL +{ + if ((*size = (size_t) servers.ga_len) == 0) { + return NULL; + } + + char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char **)); + for (int i = 0; i < servers.ga_len; i++) { + addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr); + } + return addrs; } static void connection_cb(uv_stream_t *server, int status) @@ -256,10 +306,10 @@ static void connection_cb(uv_stream_t *server, int status) static void free_client(uv_handle_t *handle) { - free(handle); + xfree(handle); } static void free_server(uv_handle_t *handle) { - free(handle->data); + xfree(handle->data); } |