aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/msgpack_rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r--src/nvim/msgpack_rpc/channel.c314
-rw-r--r--src/nvim/msgpack_rpc/channel.h5
-rw-r--r--src/nvim/msgpack_rpc/helpers.c86
-rw-r--r--src/nvim/msgpack_rpc/helpers.h7
-rw-r--r--src/nvim/msgpack_rpc/server.c51
-rw-r--r--src/nvim/msgpack_rpc/server.h2
6 files changed, 341 insertions, 124 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 98636263b9..02f3854f47 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdbool.h>
#include <string.h>
#include <inttypes.h>
@@ -9,6 +12,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@@ -25,7 +29,9 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
+#include "nvim/path.h"
#include "nvim/lib/kvec.h"
+#include "nvim/os/input.h"
#define CHANNEL_BUFFER_SIZE 0xffff
@@ -37,7 +43,8 @@
typedef enum {
kChannelTypeSocket,
kChannelTypeProc,
- kChannelTypeStdio
+ kChannelTypeStdio,
+ kChannelTypeInternal
} ChannelType;
typedef struct {
@@ -55,7 +62,7 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
- Stream stream;
+ Stream stream; // bidirectional (socket)
Process *proc;
struct {
Stream in;
@@ -86,6 +93,7 @@ static msgpack_sbuffer out_buffer;
/// Initializes the module
void channel_init(void)
{
+ ch_before_blocking_events = multiqueue_new_child(main_loop.events);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@@ -109,18 +117,24 @@ void channel_teardown(void)
/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is handled by the job infrastructure.
///
-/// @param argv The argument vector for the process. [consumed]
-/// @return The channel id (> 0), on success.
-/// 0, on error.
-uint64_t channel_from_process(Process *proc, uint64_t id)
+/// @param proc process object
+/// @param id (optional) channel id
+/// @param source description of source function, rplugin name, TCP addr, etc
+///
+/// @return Channel id (> 0), on success. 0, on error.
+uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
{
- Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
+ Channel *channel = register_channel(kChannelTypeProc, id, proc->events,
+ source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack, channel);
+ rstream_start(proc->out, receive_msgpack, channel);
+
+ DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
+ proc->out);
return channel->id;
}
@@ -130,14 +144,48 @@ uint64_t channel_from_process(Process *proc, uint64_t id)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
- Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL,
+ watcher->addr);
socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack, channel);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+
+ DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
+ &channel->data.stream);
+}
+
+/// @param source description of source function, rplugin name, TCP addr, etc
+uint64_t channel_connect(bool tcp, const char *address, int timeout,
+ char *source, const char **error)
+{
+ if (!tcp) {
+ char *path = fix_fname(address);
+ if (server_owns_pipe_address(path)) {
+ // avoid deadlock
+ xfree(path);
+ return channel_create_internal();
+ }
+ xfree(path);
+ }
+
+ Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source);
+ if (!socket_connect(&main_loop, &channel->data.stream,
+ tcp, address, timeout, error)) {
+ decref(channel);
+ return 0;
+ }
+
+ incref(channel); // close channel only after the stream is closed
+ channel->data.stream.internal_close_cb = close_cb;
+ channel->data.stream.internal_data = channel;
+ wstream_init(&channel->data.stream, 0);
+ rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.stream, receive_msgpack, channel);
+ return channel->id;
}
/// Sends event/arguments to channel
@@ -147,7 +195,7 @@ void channel_from_connection(SocketWatcher *watcher)
/// @param name The event name, an arbitrary string
/// @param args Array with event arguments
/// @return True if the event was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *name, Array args)
+bool channel_send_event(uint64_t id, const char *name, Array args)
{
Channel *channel = NULL;
@@ -160,7 +208,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
if (channel) {
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
- String method = cstr_as_string(name);
+ const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
@@ -182,14 +230,14 @@ bool channel_send_event(uint64_t id, char *name, Array args)
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
Object channel_send_call(uint64_t id,
- char *method_name,
+ const char *method_name,
Array args,
Error *err)
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
- api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
+ api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
api_free_array(args);
return NIL;
}
@@ -209,7 +257,8 @@ Object channel_send_call(uint64_t id,
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
- api_set_error(err, Exception, "%s", frame.result.data.string.data);
+ api_set_error(err, kErrorTypeException, "%s",
+ frame.result.data.string.data);
} else if (frame.result.type == kObjectTypeArray) {
// Should be an error in the form [type, message]
Array array = frame.result.data.array;
@@ -217,14 +266,13 @@ Object channel_send_call(uint64_t id,
&& (array.items[0].data.integer == kErrorTypeException
|| array.items[0].data.integer == kErrorTypeValidation)
&& array.items[1].type == kObjectTypeString) {
- err->type = (ErrorType) array.items[0].data.integer;
- xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg));
- err->set = true;
+ api_set_error(err, (ErrorType)array.items[0].data.integer, "%s",
+ array.items[1].data.string.data);
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
} else {
- api_set_error(err, Exception, "%s", "unknown error");
+ api_set_error(err, kErrorTypeException, "%s", "unknown error");
}
api_free_object(frame.result);
@@ -292,17 +340,28 @@ bool channel_close(uint64_t id)
return true;
}
-/// Creates an API channel from stdin/stdout. This is used when embedding
-/// Neovim
+/// Creates an API channel from stdin/stdout. Used to embed Nvim.
void channel_from_stdio(void)
{
- Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
+ Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.std.in, parse_msgpack, channel);
+ rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
+
+ DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
+ &channel->data.std.in, &channel->data.std.out);
+}
+
+/// Creates a loopback channel. This is used to avoid deadlock
+/// when an instance connects to its own named pipe.
+uint64_t channel_create_internal(void)
+{
+ Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL);
+ incref(channel); // internal channel lives until process exit
+ return channel->id;
}
void channel_process_exit(uint64_t id, int status)
@@ -313,28 +372,50 @@ void channel_process_exit(uint64_t id, int status)
decref(channel);
}
-static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
- bool eof)
+// rstream.c:read_event() invokes this as stream->read_cb().
+static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
+ void *data, bool eof)
{
Channel *channel = data;
incref(channel);
if (eof) {
close_channel(channel);
- call_set_error(channel, "Channel was closed by the client");
+ char buf[256];
+ snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
+ channel->id);
+ call_set_error(channel, buf, WARN_LOG_LEVEL);
+ goto end;
+ }
+
+ if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
+ || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
+ char buf[256];
+ snprintf(buf, sizeof(buf),
+ "ch %" PRIu64 ": stream closed unexpectedly. "
+ "closing channel",
+ channel->id);
+ call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
size_t count = rbuffer_size(rbuf);
- DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
- count,
- stream);
+ DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
+ channel->id, count, stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
+ parse_msgpack(channel);
+
+end:
+ decref(channel);
+}
+
+static void parse_msgpack(Channel *channel)
+{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return result;
@@ -350,17 +431,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
complete_call(&unpacked.data, channel);
} else {
char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Channel %" PRIu64 " returned a response that doesn't have "
- "a matching request id. Ensure the client is properly "
- "synchronized",
+ snprintf(buf, sizeof(buf),
+ "ch %" PRIu64 " returned a response with an unknown request "
+ "id. Ensure the client is properly synchronized",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -378,17 +457,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
// causes for this error(search for 'goto _failed')
//
// A not so uncommon cause for this might be deserializing objects with
- // a high nesting level: msgpack will break when it's internal parse stack
- // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ // a high nesting level: msgpack will break when its internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- decref(channel);
}
+
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
@@ -396,7 +473,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
- if (error.set) {
+ if (ERROR_SET(&error)) {
// Validation failed, send response with error
if (channel_write(channel,
serialize_response(channel->id,
@@ -406,13 +483,13 @@ static void handle_request(Channel *channel, msgpack_object *request)
&out_buffer))) {
char buf[256];
snprintf(buf, sizeof(buf),
- "Channel %" PRIu64 " sent an invalid message, closed.",
+ "ch %" PRIu64 " sent an invalid message, closed.",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
+ api_clear_error(&error);
return;
}
-
// Retrieve the request handler
MsgpackRpcRequestHandler handler;
msgpack_object *method = msgpack_rpc_method(request);
@@ -431,16 +508,24 @@ static void handle_request(Channel *channel, msgpack_object *request)
handler.async = true;
}
- RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
- event_data->channel = channel;
- event_data->handler = handler;
- event_data->args = args;
- event_data->request_id = request_id;
+ RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
+ evdata->channel = channel;
+ evdata->handler = handler;
+ evdata->args = args;
+ evdata->request_id = request_id;
incref(channel);
if (handler.async) {
- on_request_event((void **)&event_data);
+ bool is_get_mode = handler.fn == handle_nvim_get_mode;
+
+ if (is_get_mode && !input_blocking()) {
+ // Defer the event to a special queue used by os/input.c. #6247
+ multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata);
+ } else {
+ // Invoke immediately.
+ on_request_event((void **)&evdata);
+ }
} else {
- multiqueue_put(channel->events, on_request_event, 1, event_data);
+ multiqueue_put(channel->events, on_request_event, 1, evdata);
}
}
@@ -468,11 +553,45 @@ static void on_request_event(void **argv)
api_free_array(args);
decref(channel);
xfree(e);
+ api_clear_error(&error);
}
+/// Returns the Stream that a Channel writes to.
+static Stream *chan_wstream(Channel *chan)
+{
+ switch (chan->type) {
+ case kChannelTypeSocket:
+ return &chan->data.stream;
+ case kChannelTypeProc:
+ return chan->data.proc->in;
+ case kChannelTypeStdio:
+ return &chan->data.std.out;
+ case kChannelTypeInternal:
+ return NULL;
+ }
+ abort();
+}
+
+/// Returns the Stream that a Channel reads from.
+static Stream *chan_rstream(Channel *chan)
+{
+ switch (chan->type) {
+ case kChannelTypeSocket:
+ return &chan->data.stream;
+ case kChannelTypeProc:
+ return chan->data.proc->out;
+ case kChannelTypeStdio:
+ return &chan->data.std.in;
+ case kChannelTypeInternal:
+ return NULL;
+ }
+ abort();
+}
+
+
static bool channel_write(Channel *channel, WBuffer *buffer)
{
- bool success;
+ bool success = false;
if (channel->closed) {
wstream_release_wbuffer(buffer);
@@ -481,16 +600,15 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
switch (channel->type) {
case kChannelTypeSocket:
- success = wstream_write(&channel->data.stream, buffer);
- break;
case kChannelTypeProc:
- success = wstream_write(channel->data.proc->in, buffer);
- break;
case kChannelTypeStdio:
- success = wstream_write(&channel->data.std.out, buffer);
+ success = wstream_write(chan_wstream(channel), buffer);
+ break;
+ case kChannelTypeInternal:
+ incref(channel);
+ CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
+ success = true;
break;
- default:
- abort();
}
if (!success) {
@@ -498,32 +616,49 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
char buf[256];
snprintf(buf,
sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed due to a failed write",
+ "ch %" PRIu64 ": stream write failed. "
+ "RPC canceled; closing channel",
channel->id);
- call_set_error(channel, buf);
+ call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
return success;
}
+static void internal_read_event(void **argv)
+{
+ Channel *channel = argv[0];
+ WBuffer *buffer = argv[1];
+
+ msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
+ memcpy(msgpack_unpacker_buffer(channel->unpacker),
+ buffer->data, buffer->size);
+ msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
+
+ parse_msgpack(channel);
+
+ decref(channel);
+ wstream_release_wbuffer(buffer);
+}
+
static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
- api_set_error(&e, Exception, "%s", err);
+ api_set_error(&e, kErrorTypeException, "%s", err);
channel_write(channel, serialize_response(channel->id,
id,
&e,
NIL,
&out_buffer));
+ api_clear_error(&e);
}
static void send_request(Channel *channel,
uint64_t id,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
id,
method,
@@ -533,10 +668,10 @@ static void send_request(Channel *channel,
}
static void send_event(Channel *channel,
- char *name,
+ const char *name,
Array args)
{
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
channel_write(channel, serialize_request(channel->id,
0,
method,
@@ -545,7 +680,7 @@ static void send_event(Channel *channel,
1));
}
-static void broadcast_event(char *name, Array args)
+static void broadcast_event(const char *name, Array args)
{
kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
Channel *channel;
@@ -561,7 +696,7 @@ static void broadcast_event(char *name, Array args)
goto end;
}
- String method = {.size = strlen(name), .data = name};
+ const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(0,
0,
method,
@@ -622,8 +757,9 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.std.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
- default:
- abort();
+ case kChannelTypeInternal:
+ // nothing to free.
+ break;
}
decref(channel);
@@ -664,9 +800,12 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
+/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
- MultiQueue *events)
+ MultiQueue *events, char *source)
{
+ // Jobs and channels share the same id namespace.
+ assert(id == 0 || !pmap_get(uint64_t)(channels, id));
Channel *rv = xmalloc(sizeof(Channel));
rv->events = events ? events : multiqueue_new_child(main_loop.events);
rv->type = type;
@@ -680,6 +819,14 @@ static Channel *register_channel(ChannelType type, uint64_t id,
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);
+
+ ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
+ (type == kChannelTypeProc ? "proc"
+ : (type == kChannelTypeSocket ? "socket"
+ : (type == kChannelTypeStdio ? "stdio"
+ : (type == kChannelTypeInternal ? "internal" : "?")))),
+ (source ? source : "?"));
+
return rv;
}
@@ -714,13 +861,14 @@ static void complete_call(msgpack_object *obj, Channel *channel)
}
}
-static void call_set_error(Channel *channel, char *msg)
+static void call_set_error(Channel *channel, char *msg, int loglevel)
{
- ELOG("msgpack-rpc: %s", msg);
+ LOG(loglevel, "RPC: %s", msg);
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;
frame->errored = true;
+ api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
@@ -729,7 +877,7 @@ static void call_set_error(Channel *channel, char *msg)
static WBuffer *serialize_request(uint64_t channel_id,
uint64_t request_id,
- String method,
+ const String method,
Array args,
msgpack_sbuffer *sbuffer,
size_t refcount)
@@ -789,10 +937,10 @@ static void decref(Channel *channel)
}
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
-#define REQ "[request] "
-#define RES "[response] "
-#define NOT "[notification] "
-#define ERR "[error] "
+#define REQ "[request] "
+#define RES "[response] "
+#define NOT "[notify] "
+#define ERR "[error] "
// Cannot define array with negative offsets, so this one is needed to be added
// to MSGPACK_UNPACK_\* values.
@@ -810,7 +958,7 @@ static void log_server_msg(uint64_t channel_id,
{
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
- DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
+ DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
const msgpack_unpack_return result =
msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
switch (result) {
@@ -847,7 +995,7 @@ static void log_client_msg(uint64_t channel_id,
bool is_request,
msgpack_object msg)
{
- DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id);
+ DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
log_lock();
FILE *f = open_log_file();
fprintf(f, is_request ? REQ : RES);
diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h
index 0d92976d02..f8fe6f129b 100644
--- a/src/nvim/msgpack_rpc/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -11,6 +11,11 @@
#define METHOD_MAXLEN 512
+/// HACK: os/input.c drains this queue immediately before blocking for input.
+/// Events on this queue are async-safe, but they need the resolved state
+/// of os_inchar(), so they are processed "just-in-time".
+MultiQueue *ch_before_blocking_events;
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/channel.h.generated.h"
#endif
diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c
index 5137b375f0..fecae11d45 100644
--- a/src/nvim/msgpack_rpc/helpers.c
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
@@ -21,12 +24,12 @@ static msgpack_zone zone;
static msgpack_sbuffer sbuffer;
#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \
- bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \
- Integer *const arg) \
- FUNC_ATTR_NONNULL_ALL \
+ static bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \
+ Integer *const arg) \
+ FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT \
{ \
if (obj->type != MSGPACK_OBJECT_EXT \
- || obj->via.ext.type != kObjectType##t) { \
+ || obj->via.ext.type + EXT_OBJECT_TYPE_SHIFT != kObjectType##t) { \
return false; \
} \
\
@@ -45,13 +48,14 @@ static msgpack_sbuffer sbuffer;
return true; \
} \
\
- void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \
+ static void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \
FUNC_ATTR_NONNULL_ARG(2) \
{ \
msgpack_packer pac; \
msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \
msgpack_pack_int64(&pac, (handle_T)o); \
- msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \
+ msgpack_pack_ext(res, sbuffer.size, \
+ kObjectType##t - EXT_OBJECT_TYPE_SHIFT); \
msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \
msgpack_sbuffer_clear(&sbuffer); \
}
@@ -73,7 +77,7 @@ typedef struct {
size_t idx;
} MPToAPIObjectStackItem;
-/// Convert type used by msgpack parser to Neovim own API type
+/// Convert type used by msgpack parser to Nvim API type.
///
/// @param[in] obj Msgpack value to convert.
/// @param[out] arg Location where result of conversion will be saved.
@@ -84,7 +88,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
{
bool ret = true;
kvec_t(MPToAPIObjectStackItem) stack = KV_INITIAL_VALUE;
- kv_push(stack, ((MPToAPIObjectStackItem) { obj, arg, false, 0 }));
+ kv_push(stack, ((MPToAPIObjectStackItem) {
+ .mobj = obj,
+ .aobj = arg,
+ .container = false,
+ .idx = 0,
+ }));
while (ret && kv_size(stack)) {
MPToAPIObjectStackItem cur = kv_last(stack);
if (!cur.container) {
@@ -114,10 +123,16 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
}
break;
}
- case MSGPACK_OBJECT_FLOAT: {
+#ifdef NVIM_MSGPACK_HAS_FLOAT32
+ case MSGPACK_OBJECT_FLOAT32:
+ case MSGPACK_OBJECT_FLOAT64:
+#else
+ case MSGPACK_OBJECT_FLOAT:
+#endif
+ {
STATIC_ASSERT(sizeof(Float) == sizeof(cur.mobj->via.f64),
"Msgpack floating-point size does not match API integer");
- *cur.aobj = FLOATING_OBJ(cur.mobj->via.f64);
+ *cur.aobj = FLOAT_OBJ(cur.mobj->via.f64);
break;
}
#define STR_CASE(type, attr, obj, dest, conv) \
@@ -181,7 +196,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
case MSGPACK_OBJECT_BOOLEAN:
case MSGPACK_OBJECT_POSITIVE_INTEGER:
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+#ifdef NVIM_MSGPACK_HAS_FLOAT32
+ case MSGPACK_OBJECT_FLOAT32:
+ case MSGPACK_OBJECT_FLOAT64:
+#else
case MSGPACK_OBJECT_FLOAT:
+#endif
case MSGPACK_OBJECT_EXT:
case MSGPACK_OBJECT_MAP:
case MSGPACK_OBJECT_ARRAY: {
@@ -211,7 +231,7 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
break;
}
case MSGPACK_OBJECT_EXT: {
- switch (cur.mobj->via.ext.type) {
+ switch ((ObjectType)(cur.mobj->via.ext.type + EXT_OBJECT_TYPE_SHIFT)) {
case kObjectTypeBuffer: {
cur.aobj->type = kObjectTypeBuffer;
ret = msgpack_rpc_to_buffer(cur.mobj, &cur.aobj->data.integer);
@@ -227,6 +247,15 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
ret = msgpack_rpc_to_tabpage(cur.mobj, &cur.aobj->data.integer);
break;
}
+ case kObjectTypeNil:
+ case kObjectTypeBoolean:
+ case kObjectTypeInteger:
+ case kObjectTypeFloat:
+ case kObjectTypeString:
+ case kObjectTypeArray:
+ case kObjectTypeDictionary: {
+ break;
+ }
}
break;
}
@@ -322,7 +351,7 @@ void msgpack_rpc_from_float(Float result, msgpack_packer *res)
msgpack_pack_double(res, result);
}
-void msgpack_rpc_from_string(String result, msgpack_packer *res)
+void msgpack_rpc_from_string(const String result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2)
{
msgpack_pack_str(res, result.size);
@@ -337,7 +366,7 @@ typedef struct {
size_t idx;
} APIToMPObjectStackItem;
-/// Convert type used by Neovim API to msgpack
+/// Convert type used by Nvim API to msgpack type.
///
/// @param[in] result Object to convert.
/// @param[out] res Structure that defines where conversion results are saved.
@@ -350,6 +379,9 @@ void msgpack_rpc_from_object(const Object result, msgpack_packer *const res)
kv_push(stack, ((APIToMPObjectStackItem) { &result, false, 0 }));
while (kv_size(stack)) {
APIToMPObjectStackItem cur = kv_last(stack);
+ STATIC_ASSERT(kObjectTypeWindow == kObjectTypeBuffer + 1
+ && kObjectTypeTabpage == kObjectTypeWindow + 1,
+ "Buffer, window and tabpage enum items are in order");
switch (cur.aobj->type) {
case kObjectTypeNil: {
msgpack_pack_nil(res);
@@ -461,8 +493,7 @@ Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
Array args,
Error *error)
{
- snprintf(error->msg, sizeof(error->msg), "Invalid method name");
- error->set = true;
+ api_set_error(error, kErrorTypeException, "Invalid method name");
return NIL;
}
@@ -471,14 +502,13 @@ Object msgpack_rpc_handle_invalid_arguments(uint64_t channel_id,
Array args,
Error *error)
{
- snprintf(error->msg, sizeof(error->msg), "Invalid method arguments");
- error->set = true;
+ api_set_error(error, kErrorTypeException, "Invalid method arguments");
return NIL;
}
/// Serializes a msgpack-rpc request or notification(id == 0)
void msgpack_rpc_serialize_request(uint64_t request_id,
- String method,
+ const String method,
Array args,
msgpack_packer *pac)
FUNC_ATTR_NONNULL_ARG(4)
@@ -505,7 +535,7 @@ void msgpack_rpc_serialize_response(uint64_t response_id,
msgpack_pack_int(pac, 1);
msgpack_pack_uint64(pac, response_id);
- if (err->set) {
+ if (ERROR_SET(err)) {
// error represented by a [type, message] array
msgpack_pack_array(pac, 2);
msgpack_rpc_from_integer(err->type, pac);
@@ -558,49 +588,49 @@ void msgpack_rpc_validate(uint64_t *response_id,
*response_id = NO_RESPONSE;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Message is not an array"));
+ api_set_error(err, kErrorTypeValidation, "Message is not an array");
return;
}
if (req->via.array.size == 0) {
- api_set_error(err, Validation, _("Message is empty"));
+ api_set_error(err, kErrorTypeValidation, "Message is empty");
return;
}
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Message type must be an integer"));
+ api_set_error(err, kErrorTypeValidation, "Message type must be an integer");
return;
}
uint64_t type = req->via.array.ptr[0].via.u64;
if (type != kMessageTypeRequest && type != kMessageTypeNotification) {
- api_set_error(err, Validation, _("Unknown message type"));
+ api_set_error(err, kErrorTypeValidation, "Unknown message type");
return;
}
if ((type == kMessageTypeRequest && req->via.array.size != 4)
|| (type == kMessageTypeNotification && req->via.array.size != 3)) {
- api_set_error(err, Validation, _("Request array size should be 4 (request) "
- "or 3 (notification)"));
+ api_set_error(err, kErrorTypeValidation,
+ "Request array size must be 4 (request) or 3 (notification)");
return;
}
if (type == kMessageTypeRequest) {
msgpack_object *id_obj = msgpack_rpc_msg_id(req);
if (!id_obj) {
- api_set_error(err, Validation, _("ID must be a positive integer"));
+ api_set_error(err, kErrorTypeValidation, "ID must be a positive integer");
return;
}
*response_id = id_obj->via.u64;
}
if (!msgpack_rpc_method(req)) {
- api_set_error(err, Validation, _("Method must be a string"));
+ api_set_error(err, kErrorTypeValidation, "Method must be a string");
return;
}
if (!msgpack_rpc_args(req)) {
- api_set_error(err, Validation, _("Parameters must be an array"));
+ api_set_error(err, kErrorTypeValidation, "Parameters must be an array");
return;
}
}
diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h
index 7d9f114140..0e4cd1be6d 100644
--- a/src/nvim/msgpack_rpc/helpers.h
+++ b/src/nvim/msgpack_rpc/helpers.h
@@ -9,6 +9,13 @@
#include "nvim/event/wstream.h"
#include "nvim/api/private/defs.h"
+/// Value by which objects represented as EXT type are shifted
+///
+/// Subtracted when packing, added when unpacking. Used to allow moving
+/// buffer/window/tabpage block inside ObjectType enum. This block yet cannot be
+/// split or reordered.
+#define EXT_OBJECT_TYPE_SHIFT kObjectTypeBuffer
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/helpers.h.generated.h"
#endif
diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c
index d7c2926a0f..1e0cc27886 100644
--- a/src/nvim/msgpack_rpc/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -1,3 +1,6 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
#include <assert.h>
#include <stdlib.h>
#include <string.h>
@@ -94,39 +97,61 @@ char *server_address_new(void)
#endif
}
-/// Starts listening for API calls on the TCP address or pipe path `endpoint`.
+/// Check if this instance owns a pipe address.
+/// The argument must already be resolved to an absolute path!
+bool server_owns_pipe_address(const char *path)
+{
+ for (int i = 0; i < watchers.ga_len; i++) {
+ if (!strcmp(path, ((SocketWatcher **)watchers.ga_data)[i]->addr)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+/// Starts listening for API calls.
+///
/// The socket type is determined by parsing `endpoint`: If it's a valid IPv4
-/// address in 'ip[:port]' format, then it will be TCP socket. The port is
-/// optional and if omitted defaults to NVIM_DEFAULT_TCP_PORT. Otherwise it
-/// will be a unix socket or named pipe.
+/// or IPv6 address in 'ip:[port]' format, then it will be a TCP socket.
+/// Otherwise it will be a Unix socket or named pipe (Windows).
+///
+/// If no port is given, a random one will be assigned.
///
-/// @param endpoint Address of the server. Either a 'ip[:port]' string or an
-/// arbitrary identifier (trimmed to 256 bytes) for the unix socket or
-/// named pipe.
+/// @param endpoint Address of the server. Either a 'ip:[port]' string or an
+/// arbitrary identifier (trimmed to 256 bytes) for the Unix
+/// socket or named pipe.
/// @returns 0 on success, 1 on a regular error, and negative errno
-/// on failure to bind or connect.
+/// on failure to bind or listen.
int server_start(const char *endpoint)
{
- if (endpoint == NULL) {
- ELOG("Attempting to start server on NULL endpoint");
+ if (endpoint == NULL || endpoint[0] == '\0') {
+ WLOG("Empty or NULL endpoint");
return 1;
}
SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher));
- socket_watcher_init(&main_loop, watcher, endpoint, NULL);
+
+ int result = socket_watcher_init(&main_loop, watcher, endpoint);
+ if (result < 0) {
+ xfree(watcher);
+ return result;
+ }
// Check if a watcher for the endpoint already exists
for (int i = 0; i < watchers.ga_len; i++) {
if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) {
ELOG("Already listening on %s", watcher->addr);
+ if (watcher->stream->type == UV_TCP) {
+ uv_freeaddrinfo(watcher->uv.tcp.addrinfo);
+ }
socket_watcher_close(watcher, free_server);
return 1;
}
}
- int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);
+ result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb);
if (result < 0) {
- ELOG("Failed to start server: %s", uv_strerror(result));
+ WLOG("Failed to start server: %s", uv_strerror(result));
socket_watcher_close(watcher, free_server);
return result;
}
diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h
index f1a6703938..5446e40e0b 100644
--- a/src/nvim/msgpack_rpc/server.h
+++ b/src/nvim/msgpack_rpc/server.h
@@ -1,6 +1,8 @@
#ifndef NVIM_MSGPACK_RPC_SERVER_H
#define NVIM_MSGPACK_RPC_SERVER_H
+#include <stdio.h>
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/server.h.generated.h"
#endif