aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r--src/nvim/msgpack_rpc/channel.c75
-rw-r--r--src/nvim/msgpack_rpc/helpers.c69
-rw-r--r--src/nvim/msgpack_rpc/remote_ui.c4
-rw-r--r--src/nvim/msgpack_rpc/server.c126
4 files changed, 188 insertions, 86 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 35549ce042..b671b8b545 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -5,8 +5,6 @@
#include <uv.h>
#include <msgpack.h>
-#include "nvim/lib/klist.h"
-
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/msgpack_rpc/channel.h"
@@ -69,10 +67,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
-#define _noop(x)
-KMEMPOOL_INIT(RequestEventPool, RequestEvent, _noop)
-static kmempool_t(RequestEventPool) *request_event_pool = NULL;
-
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
@@ -85,7 +79,6 @@ static msgpack_sbuffer out_buffer;
/// Initializes the module
void channel_init(void)
{
- request_event_pool = kmp_init(RequestEventPool);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@@ -232,7 +225,25 @@ Object channel_send_call(uint64_t id,
channel->pending_requests--;
if (frame.errored) {
- api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ if (frame.result.type == kObjectTypeString) {
+ api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ } else if (frame.result.type == kObjectTypeArray) {
+ // Should be an error in the form [type, message]
+ Array array = frame.result.data.array;
+ if (array.size == 2 && array.items[0].type == kObjectTypeInteger
+ && (array.items[0].data.integer == kErrorTypeException
+ || array.items[0].data.integer == kErrorTypeValidation)
+ && array.items[1].type == kObjectTypeString) {
+ err->type = (ErrorType) array.items[0].data.integer;
+ xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg));
+ err->set = true;
+ } else {
+ api_set_error(err, Exception, "%s", "unknown error");
+ }
+ } else {
+ api_set_error(err, Exception, "%s", "unknown error");
+ }
+
api_free_object(frame.result);
}
@@ -442,20 +453,20 @@ static void handle_request(Channel *channel, msgpack_object *request)
// Retrieve the request handler
MsgpackRpcRequestHandler handler;
- msgpack_object method = request->via.array.ptr[2];
+ msgpack_object *method = msgpack_rpc_method(request);
- if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
- handler = msgpack_rpc_get_handler_for(method.via.bin.ptr,
- method.via.bin.size);
+ if (method) {
+ handler = msgpack_rpc_get_handler_for(method->via.bin.ptr,
+ method->via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
handler.defer = false;
}
Array args = ARRAY_DICT_INIT;
- msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
+ msgpack_rpc_to_array(msgpack_rpc_args(request), &args);
bool defer = (!kv_size(channel->call_stack) && handler.defer);
- RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
+ RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
@@ -476,18 +487,22 @@ static void on_request_event(Event event)
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, request_id, args, &error);
- // send the response
- msgpack_packer response;
- msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
- channel_write(channel, serialize_response(channel->id,
- request_id,
- &error,
- result,
- &out_buffer));
+ if (request_id != NO_RESPONSE) {
+ // send the response
+ msgpack_packer response;
+ msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
+ channel_write(channel, serialize_response(channel->id,
+ request_id,
+ &error,
+ result,
+ &out_buffer));
+ } else {
+ api_free_object(result);
+ }
// All arguments were freed already, but we still need to free the array
- free(args.items);
+ xfree(args.items);
decref(channel);
- kmp_free(RequestEventPool, request_event_pool, e);
+ xfree(e);
}
static bool channel_write(Channel *channel, WBuffer *buffer)
@@ -608,7 +623,7 @@ static void unsubscribe(Channel *channel, char *event)
// Since the string is no longer used by other channels, release it's memory
pmap_del(cstr_t)(event_strings, event_string);
- free(event_string);
+ xfree(event_string);
}
/// Close the channel streams/job and free the channel resources.
@@ -662,13 +677,13 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
- free(channel);
+ xfree(channel);
}
static void close_cb(uv_handle_t *handle)
{
- free(handle->data);
- free(handle);
+ xfree(handle->data);
+ xfree(handle);
}
static Channel *register_channel(void)
@@ -745,7 +760,7 @@ static WBuffer *serialize_request(uint64_t channel_id,
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
refcount,
- free);
+ xfree);
msgpack_sbuffer_clear(sbuffer);
api_free_array(args);
return rv;
@@ -764,7 +779,7 @@ static WBuffer *serialize_response(uint64_t channel_id,
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
1, // responses only go though 1 channel
- free);
+ xfree);
msgpack_sbuffer_clear(sbuffer);
api_free_object(arg);
return rv;
diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c
index 355176aa5f..7d0db9a9b8 100644
--- a/src/nvim/msgpack_rpc/helpers.c
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -351,49 +351,86 @@ void msgpack_rpc_serialize_response(uint64_t response_id,
}
}
+static bool msgpack_rpc_is_notification(msgpack_object *req)
+{
+ return req->via.array.ptr[0].via.u64 == 2;
+}
+
+msgpack_object *msgpack_rpc_method(msgpack_object *req)
+{
+ msgpack_object *obj = req->via.array.ptr
+ + (msgpack_rpc_is_notification(req) ? 1 : 2);
+ return obj->type == MSGPACK_OBJECT_STR || obj->type == MSGPACK_OBJECT_BIN ?
+ obj : NULL;
+}
+
+msgpack_object *msgpack_rpc_args(msgpack_object *req)
+{
+ msgpack_object *obj = req->via.array.ptr
+ + (msgpack_rpc_is_notification(req) ? 2 : 3);
+ return obj->type == MSGPACK_OBJECT_ARRAY ? obj : NULL;
+}
+
+static msgpack_object *msgpack_rpc_msg_id(msgpack_object *req)
+{
+ if (msgpack_rpc_is_notification(req)) {
+ return NULL;
+ }
+ msgpack_object *obj = &req->via.array.ptr[1];
+ return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ? obj : NULL;
+}
+
void msgpack_rpc_validate(uint64_t *response_id,
msgpack_object *req,
Error *err)
{
// response id not known yet
- *response_id = 0;
+ *response_id = NO_RESPONSE;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Request is not an array"));
+ api_set_error(err, Validation, _("Message is not an array"));
return;
}
- if (req->via.array.size != 4) {
- api_set_error(err, Validation, _("Request array size should be 4"));
+ if (req->via.array.size == 0) {
+ api_set_error(err, Validation, _("Message is empty"));
return;
}
- if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Id must be a positive integer"));
+ if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ api_set_error(err, Validation, _("Message type must be an integer"));
return;
}
- // Set the response id, which is the same as the request
- *response_id = req->via.array.ptr[1].via.u64;
-
- if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Message type must be an integer"));
+ uint64_t type = req->via.array.ptr[0].via.u64;
+ if (type != kMessageTypeRequest && type != kMessageTypeNotification) {
+ api_set_error(err, Validation, _("Unknown message type"));
return;
}
- if (req->via.array.ptr[0].via.u64 != 0) {
- api_set_error(err, Validation, _("Message type must be 0"));
+ if ((type == kMessageTypeRequest && req->via.array.size != 4) ||
+ (type == kMessageTypeNotification && req->via.array.size != 3)) {
+ api_set_error(err, Validation, _("Request array size should be 4 (request) "
+ "or 3 (notification)"));
return;
}
- if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
- && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
+ if (type == kMessageTypeRequest) {
+ msgpack_object *id_obj = msgpack_rpc_msg_id(req);
+ if (!id_obj) {
+ api_set_error(err, Validation, _("ID must be a positive integer"));
+ return;
+ }
+ *response_id = id_obj->via.u64;
+ }
+
+ if (!msgpack_rpc_method(req)) {
api_set_error(err, Validation, _("Method must be a string"));
return;
}
- if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
+ if (!msgpack_rpc_args(req)) {
api_set_error(err, Validation, _("Parameters must be an array"));
return;
}
diff --git a/src/nvim/msgpack_rpc/remote_ui.c b/src/nvim/msgpack_rpc/remote_ui.c
index b554d76bed..07d78dd9d7 100644
--- a/src/nvim/msgpack_rpc/remote_ui.c
+++ b/src/nvim/msgpack_rpc/remote_ui.c
@@ -48,9 +48,9 @@ void remote_ui_disconnect(uint64_t channel_id)
// destroy pending screen updates
api_free_array(data->buffer);
pmap_del(uint64_t)(connected_uis, channel_id);
- free(ui->data);
+ xfree(ui->data);
ui_detach(ui);
- free(ui);
+ xfree(ui);
}
static Object remote_ui_attach(uint64_t channel_id, uint64_t request_id,
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
index 91aca0c37a..8fb2902b0b 100644
--- a/src/nvim/msgpack_rpc/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -9,11 +9,11 @@
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/os.h"
#include "nvim/ascii.h"
+#include "nvim/garray.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/log.h"
#include "nvim/tempfile.h"
-#include "nvim/map.h"
#include "nvim/path.h"
#define MAX_CONNECTIONS 32
@@ -27,6 +27,9 @@ typedef enum {
} ServerType;
typedef struct {
+ // The address of a pipe, or string value of a tcp address.
+ char addr[ADDRESS_MAX_SIZE];
+
// Type of the union below
ServerType type;
@@ -38,12 +41,11 @@ typedef struct {
} tcp;
struct {
uv_pipe_t handle;
- char addr[ADDRESS_MAX_SIZE];
} pipe;
} socket;
} Server;
-static PMap(cstr_t) *servers = NULL;
+static garray_T servers = GA_EMPTY_INIT_VALUE;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/server.c.generated.h"
@@ -52,33 +54,40 @@ static PMap(cstr_t) *servers = NULL;
/// Initializes the module
bool server_init(void)
{
- servers = pmap_new(cstr_t)();
+ ga_init(&servers, sizeof(Server *), 1);
- if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) {
- char *listen_address = (char *)vim_tempname();
- os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1);
- free(listen_address);
+ bool must_free = false;
+ const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
+ if (listen_address == NULL || *listen_address == NUL) {
+ must_free = true;
+ listen_address = (char *)vim_tempname();
}
- return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0;
+ bool ok = (server_start(listen_address) == 0);
+ if (must_free) {
+ xfree((char *) listen_address);
+ }
+ return ok;
}
-/// Teardown the server module
-void server_teardown(void)
+/// Retrieve the file handle from a server.
+static uv_handle_t *server_handle(Server *server)
{
- if (!servers) {
- return;
- }
+ return server->type == kServerTypeTcp
+ ? (uv_handle_t *)&server->socket.tcp.handle
+ : (uv_handle_t *) &server->socket.pipe.handle;
+}
- Server *server;
+/// Teardown a single server
+static void server_close_cb(Server **server)
+{
+ uv_close(server_handle(*server), free_server);
+}
- map_foreach_value(servers, server, {
- if (server->type == kServerTypeTcp) {
- uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
- } else {
- uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
- }
- });
+/// Teardown the server module
+void server_teardown(void)
+{
+ GA_DEEP_CLEAR(&servers, Server *, server_close_cb);
}
/// Starts listening on arbitrary tcp/unix addresses specified by
@@ -106,9 +115,11 @@ int server_start(const char *endpoint)
}
// Check if the server already exists
- if (pmap_has(cstr_t)(servers, addr)) {
- ELOG("Already listening on %s", addr);
- return 1;
+ for (int i = 0; i < servers.ga_len; i++) {
+ if (strcmp(addr, ((Server **)servers.ga_data)[i]->addr) == 0) {
+ ELOG("Already listening on %s", addr);
+ return 1;
+ }
}
ServerType server_type = kServerTypeTcp;
@@ -154,6 +165,8 @@ int server_start(const char *endpoint)
int result;
uv_stream_t *stream = NULL;
+ xstrlcpy(server->addr, addr, sizeof(server->addr));
+
if (server_type == kServerTypeTcp) {
// Listen on tcp address/port
uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
@@ -163,10 +176,8 @@ int server_start(const char *endpoint)
stream = (uv_stream_t *)&server->socket.tcp.handle;
} else {
// Listen on named pipe or unix socket
- xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
- result = uv_pipe_bind(&server->socket.pipe.handle,
- server->socket.pipe.addr);
+ result = uv_pipe_bind(&server->socket.pipe.handle, server->addr);
stream = (uv_stream_t *)&server->socket.pipe.handle;
}
@@ -193,9 +204,17 @@ int server_start(const char *endpoint)
return result;
}
+ // Update $NVIM_LISTEN_ADDRESS, if not set.
+ const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
+ if (listen_address == NULL || *listen_address == NUL) {
+ os_setenv(LISTEN_ADDRESS_ENV_VAR, addr, 1);
+ }
+
server->type = server_type;
- // Add the server to the hash table
- pmap_put(cstr_t)(servers, addr, server);
+
+ // Add the server to the list.
+ ga_grow(&servers, 1);
+ ((Server **)servers.ga_data)[servers.ga_len++] = server;
return 0;
}
@@ -211,18 +230,49 @@ void server_stop(char *endpoint)
// Trim to `ADDRESS_MAX_SIZE`
xstrlcpy(addr, endpoint, sizeof(addr));
- if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) {
+ int i = 0; // The index of the server whose address equals addr.
+ for (; i < servers.ga_len; i++) {
+ server = ((Server **)servers.ga_data)[i];
+ if (strcmp(addr, server->addr) == 0) {
+ break;
+ }
+ }
+
+ if (i == servers.ga_len) {
ELOG("Not listening on %s", addr);
return;
}
- if (server->type == kServerTypeTcp) {
- uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
- } else {
- uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
+ // If we are invalidating the listen address, unset it.
+ const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR);
+ if (listen_address && strcmp(addr, listen_address) == 0) {
+ os_unsetenv(LISTEN_ADDRESS_ENV_VAR);
}
- pmap_del(cstr_t)(servers, addr);
+ uv_close(server_handle(server), free_server);
+
+ // Remove this server from the list by swapping it with the last item.
+ if (i != servers.ga_len - 1) {
+ ((Server **)servers.ga_data)[i] =
+ ((Server **)servers.ga_data)[servers.ga_len - 1];
+ }
+ servers.ga_len--;
+}
+
+/// Returns an allocated array of server addresses.
+/// @param[out] size The size of the returned array.
+char **server_address_list(size_t *size)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if ((*size = (size_t) servers.ga_len) == 0) {
+ return NULL;
+ }
+
+ char **addrs = xcalloc((size_t) servers.ga_len, sizeof(const char **));
+ for (int i = 0; i < servers.ga_len; i++) {
+ addrs[i] = xstrdup(((Server **)servers.ga_data)[i]->addr);
+ }
+ return addrs;
}
static void connection_cb(uv_stream_t *server, int status)
@@ -256,10 +306,10 @@ static void connection_cb(uv_stream_t *server, int status)
static void free_client(uv_handle_t *handle)
{
- free(handle);
+ xfree(handle);
}
static void free_server(uv_handle_t *handle)
{
- free(handle->data);
+ xfree(handle->data);
}