aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os')
-rw-r--r--src/nvim/os/channel.c131
-rw-r--r--src/nvim/os/channel.h9
-rw-r--r--src/nvim/os/msgpack_rpc.c33
-rw-r--r--src/nvim/os/msgpack_rpc.h118
-rw-r--r--src/nvim/os/server.c4
-rw-r--r--src/nvim/os/server.h2
6 files changed, 222 insertions, 75 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 10766ca76e..f275c70805 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -3,7 +3,7 @@
#include <uv.h>
#include <msgpack.h>
-#include "nvim/lib/klist.h"
+#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h"
@@ -15,8 +15,10 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
+#include "nvim/map.h"
typedef struct {
+ uint64_t id;
ChannelProtocol protocol;
bool is_job;
union {
@@ -30,22 +32,26 @@ typedef struct {
struct {
RStream *read;
WStream *write;
+ uv_stream_t *uv;
} streams;
} data;
} Channel;
-#define _destroy_channel(x)
+static uint64_t next_id = 1;
+static Map(uint64_t) *channels = NULL;
+static msgpack_sbuffer msgpack_event_buffer;
-KLIST_INIT(Channel, Channel *, _destroy_channel)
-
-static klist_t(Channel) *channels = NULL;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
+static void send_msgpack(Channel *channel, String type, Object data);
+static void close_channel(Channel *channel);
+static void close_cb(uv_handle_t *handle);
void channel_init()
{
- channels = kl_init(Channel);
+ channels = map_new(uint64_t)();
+ msgpack_sbuffer_init(&msgpack_event_buffer);
}
void channel_teardown()
@@ -56,24 +62,9 @@ void channel_teardown()
Channel *channel;
- while (kl_shift(Channel, channels, &channel) == 0) {
-
- switch (channel->protocol) {
- case kChannelProtocolMsgpack:
- msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
- msgpack_unpacker_free(channel->proto.msgpack.unpacker);
- break;
- default:
- abort();
- }
-
- if (channel->is_job) {
- job_stop(channel->data.job_id);
- } else {
- rstream_free(channel->data.streams.read);
- wstream_free(channel->data.streams.write);
- }
- }
+ map_foreach_value(channels, channel, {
+ close_channel(channel);
+ });
}
void channel_from_job(char **argv, ChannelProtocol prot)
@@ -92,10 +83,11 @@ void channel_from_job(char **argv, ChannelProtocol prot)
abort();
}
+ channel->id = next_id++;
channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
- *kl_pushp(Channel, channels) = channel;
+ map_put(uint64_t)(channels, channel->id, channel);
}
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
@@ -115,6 +107,7 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
}
stream->data = NULL;
+ channel->id = next_id++;
channel->protocol = prot;
channel->is_job = false;
// read stream
@@ -124,8 +117,32 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
wstream_set_stream(channel->data.streams.write, stream);
- // push to channel list
- *kl_pushp(Channel, channels) = channel;
+ channel->data.streams.uv = stream;
+ map_put(uint64_t)(channels, channel->id, channel);
+}
+
+bool channel_send_event(uint64_t id, char *type, typval_T *data)
+{
+ Channel *channel = map_get(uint64_t)(channels, id);
+
+ if (!channel) {
+ return false;
+ }
+
+ String event_type = {.size = strnlen(type, 1024), .data = type};
+ Object event_data = vim_to_object(data);
+
+ switch (channel->protocol) {
+ case kChannelProtocolMsgpack:
+ send_msgpack(channel, event_type, event_data);
+ break;
+ default:
+ abort();
+ }
+
+ msgpack_rpc_free_object(event_data);
+
+ return true;
}
static void on_job_stdout(RStream *rstream, void *data, bool eof)
@@ -141,8 +158,13 @@ static void on_job_stderr(RStream *rstream, void *data, bool eof)
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
- msgpack_unpacked unpacked;
Channel *channel = data;
+
+ if (eof) {
+ close_channel(channel);
+ return;
+ }
+
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
@@ -152,17 +174,18 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
count);
msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
+ msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
- // Deserialize everything we can.
+ // Deserialize everything we can.
while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
- // Each object is a new msgpack-rpc request and requires an empty response
+ // Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response,
channel->proto.msgpack.sbuffer,
msgpack_sbuffer_write);
// Perform the call
- msgpack_rpc_call(&unpacked.data, &response);
+ msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
xmemdup(channel->proto.msgpack.sbuffer->data,
channel->proto.msgpack.sbuffer->size),
@@ -173,3 +196,49 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
}
}
+
+static void send_msgpack(Channel *channel, String type, Object data)
+{
+ msgpack_packer packer;
+ msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
+ msgpack_rpc_notification(type, data, &packer);
+ char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
+
+ wstream_write(channel->data.streams.write,
+ bytes,
+ msgpack_event_buffer.size,
+ true);
+
+ msgpack_sbuffer_clear(&msgpack_event_buffer);
+}
+
+static void close_channel(Channel *channel)
+{
+ map_del(uint64_t)(channels, channel->id);
+
+ switch (channel->protocol) {
+ case kChannelProtocolMsgpack:
+ msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
+ msgpack_unpacker_free(channel->proto.msgpack.unpacker);
+ break;
+ default:
+ abort();
+ }
+
+ if (channel->is_job) {
+ job_stop(channel->data.job_id);
+ } else {
+ rstream_free(channel->data.streams.read);
+ wstream_free(channel->data.streams.write);
+ uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+ }
+
+ free(channel);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+ free(handle->data);
+ free(handle);
+}
+
diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h
index 4a3962575d..543b91dd89 100644
--- a/src/nvim/os/channel.h
+++ b/src/nvim/os/channel.h
@@ -3,6 +3,7 @@
#include <uv.h>
+#include "nvim/vim.h"
#include "nvim/os/channel_defs.h"
/// Initializes the module
@@ -25,5 +26,13 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot);
/// @param prot The rpc protocol used
void channel_from_job(char **argv, ChannelProtocol prot);
+/// Sends event/data to channel
+///
+/// @param id The channel id
+/// @param type The event type, an arbitrary string
+/// @param obj The event data
+/// @return True if the data was sent successfully, false otherwise.
+bool channel_send_event(uint64_t id, char *type, typval_T *data);
+
#endif // NVIM_OS_CHANNEL_H
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
index d7ffa6f559..423c5d584d 100644
--- a/src/nvim/os/msgpack_rpc.c
+++ b/src/nvim/os/msgpack_rpc.c
@@ -1,3 +1,6 @@
+#include <stdint.h>
+#include <stdbool.h>
+
#include <msgpack.h>
#include "nvim/os/msgpack_rpc.h"
@@ -52,7 +55,7 @@
free(value.items); \
}
-void msgpack_rpc_call(msgpack_object *req, msgpack_packer *res)
+void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
{
// The initial response structure is the same no matter what happens,
// we set it up here
@@ -107,7 +110,16 @@ void msgpack_rpc_call(msgpack_object *req, msgpack_packer *res)
}
// dispatch the message
- msgpack_rpc_dispatch(req, res);
+ msgpack_rpc_dispatch(id, req, res);
+}
+
+void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
+{
+ msgpack_pack_array(pac, 3);
+ msgpack_pack_int(pac, 2);
+ msgpack_pack_raw(pac, type.size);
+ msgpack_pack_raw_body(pac, type.data, type.size);
+ msgpack_rpc_from_object(data, pac);
}
void msgpack_rpc_error(char *msg, msgpack_packer *res)
@@ -147,9 +159,13 @@ bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
{
- arg->data = (char *)obj->via.raw.ptr;
+ if (obj->type != MSGPACK_OBJECT_RAW) {
+ return false;
+ }
+
+ arg->data = xmemdup(obj->via.raw.ptr, obj->via.raw.size);
arg->size = obj->via.raw.size;
- return obj->type == MSGPACK_OBJECT_RAW;
+ return true;
}
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
@@ -328,6 +344,15 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
}
}
+void msgpack_rpc_free_string(String value)
+{
+ if (!value.data) {
+ return;
+ }
+
+ free(value.data);
+}
+
void msgpack_rpc_free_object(Object value)
{
switch (value.type) {
diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h
index a6a909ac1f..4d8d51699b 100644
--- a/src/nvim/os/msgpack_rpc.h
+++ b/src/nvim/os/msgpack_rpc.h
@@ -6,14 +6,25 @@
#include <msgpack.h>
+#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
+/// @param id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
-void msgpack_rpc_call(msgpack_object *req, msgpack_packer *res);
+void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
+
+/// Packs a notification message
+///
+/// @param type The message type, an arbitrary string
+/// @param data The notification data
+/// @param packer Where the notification will be packed to
+void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
+ FUNC_ATTR_NONNULL_ARG(3);
/// Dispatches to the actual API function after basic payload validation by
/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
@@ -21,15 +32,20 @@ void msgpack_rpc_call(msgpack_object *req, msgpack_packer *res);
/// The implementation is generated at compile time with metadata extracted
/// from the api/*.h headers,
///
+/// @param id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
-void msgpack_rpc_dispatch(msgpack_object *req, msgpack_packer *res);
+void msgpack_rpc_dispatch(uint64_t id,
+ msgpack_object *req,
+ msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
/// @param res A packer that contains the response
-void msgpack_rpc_error(char *msg, msgpack_packer *res);
+void msgpack_rpc_error(char *msg, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ALL;
/// Functions for validating and converting from msgpack types to C types.
/// These are used by `msgpack_rpc_dispatch` to validate and convert each
@@ -38,21 +54,36 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res);
/// @param obj The object to convert
/// @param[out] arg A pointer to the avalue
/// @return true if the convertion succeeded, false otherwise
-bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg);
-bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg);
-bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg);
-bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg);
-bool msgpack_rpc_to_string(msgpack_object *obj, String *arg);
-bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg);
-bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg);
-bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg);
-bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg);
-bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg);
-bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg);
-bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg);
-bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg);
-bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg);
-bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg);
+bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
+ FUNC_ATTR_NONNULL_ALL;
+bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
+ FUNC_ATTR_NONNULL_ALL;
/// Functions for converting from C types to msgpack types.
/// These are used by `msgpack_rpc_dispatch` to convert return values
@@ -60,28 +91,43 @@ bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg);
///
/// @param result A pointer to the result
/// @param res A packer that contains the response
-void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res);
-void msgpack_rpc_from_integer(Integer result, msgpack_packer *res);
-void msgpack_rpc_from_float(Float result, msgpack_packer *res);
-void msgpack_rpc_from_position(Position result, msgpack_packer *res);
-void msgpack_rpc_from_string(String result, msgpack_packer *res);
-void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res);
-void msgpack_rpc_from_window(Window result, msgpack_packer *res);
-void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res);
-void msgpack_rpc_from_object(Object result, msgpack_packer *res);
-void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res);
-void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res);
-void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res);
-void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res);
-void msgpack_rpc_from_array(Array result, msgpack_packer *res);
-void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res);
+void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_float(Float result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_position(Position result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_string(String result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_window(Window result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_object(Object result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_array(Array result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
+void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ARG(2);
/// Helpers for initializing types that may be freed later
#define msgpack_rpc_init_boolean
#define msgpack_rpc_init_integer
#define msgpack_rpc_init_float
#define msgpack_rpc_init_position
-#define msgpack_rpc_init_string
+#define msgpack_rpc_init_string = STRING_INIT
#define msgpack_rpc_init_buffer
#define msgpack_rpc_init_window
#define msgpack_rpc_init_tabpage
@@ -100,9 +146,7 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res);
#define msgpack_rpc_free_integer(value)
#define msgpack_rpc_free_float(value)
#define msgpack_rpc_free_position(value)
-// Strings are not copied from msgpack and so don't need to be freed(they
-// probably "live" in the msgpack streaming buffer)
-#define msgpack_rpc_free_string(value)
+void msgpack_rpc_free_string(String value);
#define msgpack_rpc_free_buffer(value)
#define msgpack_rpc_free_window(value)
#define msgpack_rpc_free_tabpage(value)
diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c
index b2faa49a86..7b2326556c 100644
--- a/src/nvim/os/server.c
+++ b/src/nvim/os/server.c
@@ -123,8 +123,8 @@ void server_start(char *endpoint, ChannelProtocol prot)
char *port_end;
// Extract the port
port = strtol(ip_end + 1, &port_end, 10);
-
errno = 0;
+
if (errno != 0 || port == 0 || port > 0xffff) {
// Invalid port, treat as named pipe or unix socket
server_type = kServerTypePipe;
@@ -156,7 +156,7 @@ void server_start(char *endpoint, ChannelProtocol prot)
}
} else {
// Listen on named pipe or unix socket
- strcpy(server->socket.pipe.addr, addr);
+ xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
server->socket.pipe.handle.data = server;
uv_pipe_bind(&server->socket.pipe.handle, server->socket.pipe.addr);
diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h
index 541746eb5f..73c6bd1fea 100644
--- a/src/nvim/os/server.h
+++ b/src/nvim/os/server.h
@@ -10,7 +10,7 @@ void server_init();
void server_teardown();
/// Starts listening on arbitrary tcp/unix addresses specified by
-/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will
+/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will
/// be determined by parsing `endpoint`: If it's a valid tcp address in the
/// 'ip:port' format, then it will be tcp socket, else it will be a unix
/// socket or named pipe.