diff options
| author | Jakob Schnitzer <mail@jakobschnitzer.de> | 2017-06-28 16:52:04 +0200 |
|---|---|---|
| committer | Jakob Schnitzer <mail@jakobschnitzer.de> | 2017-06-28 16:52:04 +0200 |
| commit | e8829710bc5f38208499e0ad38402eac24a67ac2 (patch) | |
| tree | 4e1ae954c2e301adadbfa7038b823ea9ea2fb08e /src/nvim/msgpack_rpc | |
| parent | ff8b2eb435c518f0eafd0e509afe1f5ee4a81fd1 (diff) | |
| parent | f0dafa89c2b7602cfedf0bd3409858e4c212b0a2 (diff) | |
| download | rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.tar.gz rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.tar.bz2 rneovim-e8829710bc5f38208499e0ad38402eac24a67ac2.zip | |
Merge branch 'master' into option-fixes
Diffstat (limited to 'src/nvim/msgpack_rpc')
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 133 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 5 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 31 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/helpers.h | 7 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/server.c | 48 | ||||
| -rw-r--r-- | src/nvim/msgpack_rpc/server.h | 2 |
6 files changed, 176 insertions, 50 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 59594357de..68ac35bc4e 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -12,6 +12,7 @@ #include "nvim/api/vim.h" #include "nvim/api/ui.h" #include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -28,7 +29,9 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" +#include "nvim/path.h" #include "nvim/lib/kvec.h" +#include "nvim/os/input.h" #define CHANNEL_BUFFER_SIZE 0xffff @@ -40,7 +43,8 @@ typedef enum { kChannelTypeSocket, kChannelTypeProc, - kChannelTypeStdio + kChannelTypeStdio, + kChannelTypeInternal } ChannelType; typedef struct { @@ -89,6 +93,7 @@ static msgpack_sbuffer out_buffer; /// Initializes the module void channel_init(void) { + ch_before_blocking_events = multiqueue_new_child(main_loop.events); channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); @@ -123,7 +128,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id) wstream_init(proc->in, 0); rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack, channel); + rstream_start(proc->out, receive_msgpack, channel); return channel->id; } @@ -140,7 +145,36 @@ void channel_from_connection(SocketWatcher *watcher) channel->data.stream.internal_data = channel; wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack, channel); + rstream_start(&channel->data.stream, receive_msgpack, channel); +} + +uint64_t channel_connect(bool tcp, const char *address, + int timeout, const char **error) +{ + if (!tcp) { + char *path = fix_fname(address); + if (server_owns_pipe_address(path)) { + // avoid deadlock + xfree(path); + return channel_create_internal(); + } + xfree(path); + } + + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); + if (!socket_connect(&main_loop, &channel->data.stream, + tcp, address, timeout, error)) { + decref(channel); + return 0; + } + + incref(channel); // close channel only after the stream is closed + channel->data.stream.internal_close_cb = close_cb; + channel->data.stream.internal_data = channel; + wstream_init(&channel->data.stream, 0); + rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.stream, receive_msgpack, channel); + return channel->id; } /// Sends event/arguments to channel @@ -303,11 +337,20 @@ void channel_from_stdio(void) incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, parse_msgpack, channel); + rstream_start(&channel->data.std.in, receive_msgpack, channel); // write stream wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); } +/// Creates a loopback channel. This is used to avoid deadlock +/// when an instance connects to its own named pipe. +uint64_t channel_create_internal(void) +{ + Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); + incref(channel); // internal channel lives until process exit + return channel->id; +} + void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); @@ -316,8 +359,8 @@ void channel_process_exit(uint64_t id, int status) decref(channel); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, - bool eof) +static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, + void *data, bool eof) { Channel *channel = data; incref(channel); @@ -327,7 +370,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, WARNING_LOG_LEVEL); goto end; } @@ -339,6 +382,14 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); msgpack_unpacker_buffer_consumed(channel->unpacker, count); + parse_msgpack(channel); + +end: + decref(channel); +} + +static void parse_msgpack(Channel *channel) +{ msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); msgpack_unpack_return result; @@ -358,11 +409,11 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -386,11 +437,9 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - decref(channel); } + static void handle_request(Channel *channel, msgpack_object *request) FUNC_ATTR_NONNULL_ALL { @@ -410,7 +459,7 @@ static void handle_request(Channel *channel, msgpack_object *request) snprintf(buf, sizeof(buf), "ch %" PRIu64 " sent an invalid message, closed.", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } api_clear_error(&error); return; @@ -433,16 +482,24 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); - event_data->channel = channel; - event_data->handler = handler; - event_data->args = args; - event_data->request_id = request_id; + RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); + evdata->channel = channel; + evdata->handler = handler; + evdata->args = args; + evdata->request_id = request_id; incref(channel); if (handler.async) { - on_request_event((void **)&event_data); + bool is_get_mode = handler.fn == handle_nvim_get_mode; + + if (is_get_mode && !input_blocking()) { + // Defer the event to a special queue used by os/input.c. #6247 + multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata); + } else { + // Invoke immediately. + on_request_event((void **)&evdata); + } } else { - multiqueue_put(channel->events, on_request_event, 1, event_data); + multiqueue_put(channel->events, on_request_event, 1, evdata); } } @@ -475,7 +532,7 @@ static void on_request_event(void **argv) static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success; + bool success = false; if (channel->closed) { wstream_release_wbuffer(buffer); @@ -492,8 +549,11 @@ static bool channel_write(Channel *channel, WBuffer *buffer) case kChannelTypeStdio: success = wstream_write(&channel->data.std.out, buffer); break; - default: - abort(); + case kChannelTypeInternal: + incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + break; } if (!success) { @@ -504,12 +564,28 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "Before returning from a RPC call, ch %" PRIu64 " was " "closed due to a failed write", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } return success; } +static void internal_read_event(void **argv) +{ + Channel *channel = argv[0]; + WBuffer *buffer = argv[1]; + + msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->unpacker), + buffer->data, buffer->size); + msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + + parse_msgpack(channel); + + decref(channel); + wstream_release_wbuffer(buffer); +} + static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; @@ -626,8 +702,9 @@ static void close_channel(Channel *channel) stream_close(&channel->data.std.out, NULL, NULL); multiqueue_put(main_loop.fast_events, exit_event, 1, channel); return; - default: - abort(); + case kChannelTypeInternal: + // nothing to free. + break; } decref(channel); @@ -718,9 +795,9 @@ static void complete_call(msgpack_object *obj, Channel *channel) } } -static void call_set_error(Channel *channel, char *msg) +static void call_set_error(Channel *channel, char *msg, int loglevel) { - ELOG("RPC: %s", msg); + LOG(loglevel, "RPC: %s", msg); for (size_t i = 0; i < kv_size(channel->call_stack); i++) { ChannelCallFrame *frame = kv_A(channel->call_stack, i); frame->returned = true; diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index 0d92976d02..f8fe6f129b 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -11,6 +11,11 @@ #define METHOD_MAXLEN 512 +/// HACK: os/input.c drains this queue immediately before blocking for input. +/// Events on this queue are async-safe, but they need the resolved state +/// of os_inchar(), so they are processed "just-in-time". +MultiQueue *ch_before_blocking_events; + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 0228582d37..444c6cc256 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -24,12 +24,12 @@ static msgpack_zone zone; static msgpack_sbuffer sbuffer; #define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \ - Integer *const arg) \ - FUNC_ATTR_NONNULL_ALL \ + static bool msgpack_rpc_to_##lt(const msgpack_object *const obj, \ + Integer *const arg) \ + FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT \ { \ if (obj->type != MSGPACK_OBJECT_EXT \ - || obj->via.ext.type != kObjectType##t) { \ + || obj->via.ext.type + EXT_OBJECT_TYPE_SHIFT != kObjectType##t) { \ return false; \ } \ \ @@ -48,13 +48,14 @@ static msgpack_sbuffer sbuffer; return true; \ } \ \ - void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \ + static void msgpack_rpc_from_##lt(Integer o, msgpack_packer *res) \ FUNC_ATTR_NONNULL_ARG(2) \ { \ msgpack_packer pac; \ msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \ msgpack_pack_int64(&pac, (handle_T)o); \ - msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \ + msgpack_pack_ext(res, sbuffer.size, \ + kObjectType##t - EXT_OBJECT_TYPE_SHIFT); \ msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \ msgpack_sbuffer_clear(&sbuffer); \ } @@ -76,7 +77,7 @@ typedef struct { size_t idx; } MPToAPIObjectStackItem; -/// Convert type used by msgpack parser to Neovim own API type +/// Convert type used by msgpack parser to Nvim API type. /// /// @param[in] obj Msgpack value to convert. /// @param[out] arg Location where result of conversion will be saved. @@ -126,7 +127,7 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) { STATIC_ASSERT(sizeof(Float) == sizeof(cur.mobj->via.f64), "Msgpack floating-point size does not match API integer"); - *cur.aobj = FLOATING_OBJ(cur.mobj->via.f64); + *cur.aobj = FLOAT_OBJ(cur.mobj->via.f64); break; } #define STR_CASE(type, attr, obj, dest, conv) \ @@ -225,7 +226,7 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) break; } case MSGPACK_OBJECT_EXT: { - switch (cur.mobj->via.ext.type) { + switch ((ObjectType)(cur.mobj->via.ext.type + EXT_OBJECT_TYPE_SHIFT)) { case kObjectTypeBuffer: { cur.aobj->type = kObjectTypeBuffer; ret = msgpack_rpc_to_buffer(cur.mobj, &cur.aobj->data.integer); @@ -241,6 +242,15 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg) ret = msgpack_rpc_to_tabpage(cur.mobj, &cur.aobj->data.integer); break; } + case kObjectTypeNil: + case kObjectTypeBoolean: + case kObjectTypeInteger: + case kObjectTypeFloat: + case kObjectTypeString: + case kObjectTypeArray: + case kObjectTypeDictionary: { + break; + } } break; } @@ -364,6 +374,9 @@ void msgpack_rpc_from_object(const Object result, msgpack_packer *const res) kv_push(stack, ((APIToMPObjectStackItem) { &result, false, 0 })); while (kv_size(stack)) { APIToMPObjectStackItem cur = kv_last(stack); + STATIC_ASSERT(kObjectTypeWindow == kObjectTypeBuffer + 1 + && kObjectTypeTabpage == kObjectTypeWindow + 1, + "Buffer, window and tabpage enum items are in order"); switch (cur.aobj->type) { case kObjectTypeNil: { msgpack_pack_nil(res); diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index 7d9f114140..0e4cd1be6d 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -9,6 +9,13 @@ #include "nvim/event/wstream.h" #include "nvim/api/private/defs.h" +/// Value by which objects represented as EXT type are shifted +/// +/// Subtracted when packing, added when unpacking. Used to allow moving +/// buffer/window/tabpage block inside ObjectType enum. This block yet cannot be +/// split or reordered. +#define EXT_OBJECT_TYPE_SHIFT kObjectTypeBuffer + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/helpers.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index b6958088ca..1e0cc27886 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -97,39 +97,61 @@ char *server_address_new(void) #endif } -/// Starts listening for API calls on the TCP address or pipe path `endpoint`. +/// Check if this instance owns a pipe address. +/// The argument must already be resolved to an absolute path! +bool server_owns_pipe_address(const char *path) +{ + for (int i = 0; i < watchers.ga_len; i++) { + if (!strcmp(path, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { + return true; + } + } + return false; +} + +/// Starts listening for API calls. +/// /// The socket type is determined by parsing `endpoint`: If it's a valid IPv4 -/// address in 'ip[:port]' format, then it will be TCP socket. The port is -/// optional and if omitted defaults to NVIM_DEFAULT_TCP_PORT. Otherwise it -/// will be a unix socket or named pipe. +/// or IPv6 address in 'ip:[port]' format, then it will be a TCP socket. +/// Otherwise it will be a Unix socket or named pipe (Windows). +/// +/// If no port is given, a random one will be assigned. /// -/// @param endpoint Address of the server. Either a 'ip[:port]' string or an -/// arbitrary identifier (trimmed to 256 bytes) for the unix socket or -/// named pipe. +/// @param endpoint Address of the server. Either a 'ip:[port]' string or an +/// arbitrary identifier (trimmed to 256 bytes) for the Unix +/// socket or named pipe. /// @returns 0 on success, 1 on a regular error, and negative errno -/// on failure to bind or connect. +/// on failure to bind or listen. int server_start(const char *endpoint) { - if (endpoint == NULL) { - ELOG("Attempting to start server on NULL endpoint"); + if (endpoint == NULL || endpoint[0] == '\0') { + WLOG("Empty or NULL endpoint"); return 1; } SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); - socket_watcher_init(&main_loop, watcher, endpoint, NULL); + + int result = socket_watcher_init(&main_loop, watcher, endpoint); + if (result < 0) { + xfree(watcher); + return result; + } // Check if a watcher for the endpoint already exists for (int i = 0; i < watchers.ga_len; i++) { if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { ELOG("Already listening on %s", watcher->addr); + if (watcher->stream->type == UV_TCP) { + uv_freeaddrinfo(watcher->uv.tcp.addrinfo); + } socket_watcher_close(watcher, free_server); return 1; } } - int result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb); + result = socket_watcher_start(watcher, MAX_CONNECTIONS, connection_cb); if (result < 0) { - ELOG("Failed to start server: %s", uv_strerror(result)); + WLOG("Failed to start server: %s", uv_strerror(result)); socket_watcher_close(watcher, free_server); return result; } diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h index f1a6703938..5446e40e0b 100644 --- a/src/nvim/msgpack_rpc/server.h +++ b/src/nvim/msgpack_rpc/server.h @@ -1,6 +1,8 @@ #ifndef NVIM_MSGPACK_RPC_SERVER_H #define NVIM_MSGPACK_RPC_SERVER_H +#include <stdio.h> + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/server.h.generated.h" #endif |