diff options
Diffstat (limited to 'src/nvim/msgpack_rpc')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 274 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 1 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel_defs.h | 6 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 2 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.h | 1 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/server.c | 116 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/unpacker.c | 322 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/unpacker.h | 46 |
8 files changed, 544 insertions, 224 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 299651ee97..de01443313 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -26,12 +26,13 @@ #include "nvim/message.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/unpacker.h" #include "nvim/os/input.h" #include "nvim/os_unix.h" #include "nvim/ui.h" #include "nvim/vim.h" -#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL > LOGLVL_DBG # define log_client_msg(...) # define log_server_msg(...) #endif @@ -49,21 +50,21 @@ void rpc_init(void) msgpack_sbuffer_init(&out_buffer); } - void rpc_start(Channel *channel) { channel_incref(channel); channel->is_rpc = true; RpcState *rpc = &channel->rpc; rpc->closed = false; - rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker); + unpacker_init(rpc->unpacker); rpc->next_request_id = 1; rpc->info = (Dictionary)ARRAY_DICT_INIT; kv_init(rpc->call_stack); if (channel->streamtype != kChannelStreamInternal) { Stream *out = channel_outstream(channel); -#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL <= LOGLVL_DBG Stream *in = channel_instream(channel); DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, (void *)in, (void *)out); @@ -73,7 +74,6 @@ void rpc_start(Channel *channel) } } - static Channel *find_rpc_channel(uint64_t id) { Channel *chan = find_channel(id); @@ -114,7 +114,8 @@ bool rpc_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *err) +Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem *result_mem, + Error *err) { Channel *channel = NULL; @@ -131,7 +132,7 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *er send_request(channel, request_id, method_name, args); // Push the frame - ChannelCallFrame frame = { request_id, false, false, NIL }; + ChannelCallFrame frame = { request_id, false, false, NIL, NULL }; kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); (void)kv_pop(rpc->call_stack); @@ -156,11 +157,15 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, Error *er api_set_error(err, kErrorTypeException, "%s", "unknown error"); } - api_free_object(frame.result); + // frame.result was allocated in an arena + arena_mem_free(frame.result_mem, &rpc->unpacker->reuse_blk); + frame.result_mem = NULL; } channel_decref(channel); + *result_mem = frame.result_mem; + return frame.errored ? NIL : frame.result; } @@ -211,20 +216,20 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf, INFO_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_INF); goto end; } - size_t count = rbuffer_size(rbuf); DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", - channel->id, count, (void *)stream); - - // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + channel->id, rbuffer_size(rbuf), (void *)stream); + Unpacker *p = channel->rpc.unpacker; + size_t size = 0; + p->read_ptr = rbuffer_read_ptr(rbuf, &size); + p->read_size = size; parse_msgpack(channel); + size_t consumed = size - p->read_size; + rbuffer_consumed_compact(rbuf, consumed); end: channel_decref(channel); @@ -232,111 +237,71 @@ end: static void parse_msgpack(Channel *channel) { - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - msgpack_unpack_return result; - - // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { - bool is_response = is_rpc_response(&unpacked.data); - log_client_msg(channel->id, !is_response, unpacked.data); - - if (is_response) { - if (is_valid_rpc_response(&unpacked.data, channel)) { - complete_call(&unpacked.data, channel); - } else { + Unpacker *p = channel->rpc.unpacker; + while (unpacker_advance(p)) { + if (p->type == kMessageTypeResponse) { + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + if (p->request_id != frame->request_id) { char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_ERR); + } + frame->returned = true; + frame->errored = (p->error.type != kObjectTypeNil); + + if (frame->errored) { + frame->result = p->error; + // TODO(bfredl): p->result should not even be decoded + // api_free_object(p->result); + } else { + frame->result = p->result; } - msgpack_unpacked_destroy(&unpacked); + frame->result_mem = arena_finish(&p->arena); } else { - handle_request(channel, &unpacked.data); - } - } + log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name); - if (result == MSGPACK_UNPACK_NOMEM_ERROR) { - mch_errmsg(e_outofmem); - mch_errmsg("\n"); - channel_decref(channel); - preserve_exit(); + Object res = p->result; + if (p->result.type != kObjectTypeArray) { + chan_close_with_error(channel, "msgpack-rpc request args has to be an array", LOGLVL_ERR); + return; + } + Array arg = res.data.array; + handle_request(channel, p, arg); + } } - if (result == MSGPACK_UNPACK_PARSE_ERROR) { - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when its internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) - send_error(channel, kMessageTypeRequest, 0, - "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); + if (unpacker_closed(p)) { + chan_close_with_error(channel, p->unpack_error.msg, LOGLVL_ERR); + api_clear_error(&p->unpack_error); } } /// Handles requests and notifications received on the channel. -static void handle_request(Channel *channel, msgpack_object *request) +static void handle_request(Channel *channel, Unpacker *p, Array args) FUNC_ATTR_NONNULL_ALL { - uint32_t request_id; - Error error = ERROR_INIT; - MessageType type = msgpack_rpc_validate(&request_id, request, &error); + assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification); - if (ERROR_SET(&error)) { - // Validation failed, send response with error - if (channel_write(channel, - serialize_response(channel->id, - type, - request_id, - &error, - NIL, - &out_buffer))) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 " sent an invalid message, closed.", - channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); - } - api_clear_error(&error); - return; - } - assert(type == kMessageTypeRequest || type == kMessageTypeNotification); - - MsgpackRpcRequestHandler handler; - msgpack_object *method = msgpack_rpc_method(request); - handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, - method->via.bin.size, - &error); - - // check method arguments - Array args = ARRAY_DICT_INIT; - if (!ERROR_SET(&error) - && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { - api_set_error(&error, kErrorTypeException, "Invalid method arguments"); - } - - if (ERROR_SET(&error)) { - send_error(channel, type, request_id, error.msg); - api_clear_error(&error); - api_free_array(args); + if (!p->handler.fn) { + send_error(channel, p->type, p->request_id, p->unpack_error.msg); + api_clear_error(&p->unpack_error); + arena_mem_free(arena_finish(&p->arena), &p->reuse_blk); return; } RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); - evdata->type = type; + evdata->type = p->type; evdata->channel = channel; - evdata->handler = handler; + evdata->handler = p->handler; evdata->args = args; - evdata->request_id = request_id; + evdata->used_mem = arena_finish(&p->arena); + evdata->request_id = p->request_id; channel_incref(channel); - if (handler.fast) { - bool is_get_mode = handler.fn == handle_nvim_get_mode; + if (p->handler.fast) { + bool is_get_mode = p->handler.fn == handle_nvim_get_mode; if (is_get_mode && !input_blocking()) { // Defer the event to a special queue used by os/input.c. #6247 @@ -346,7 +311,7 @@ static void handle_request(Channel *channel, msgpack_object *request) request_event((void **)&evdata); } } else { - bool is_resize = handler.fn == handle_nvim_ui_try_resize; + bool is_resize = p->handler.fn == handle_nvim_ui_try_resize; if (is_resize) { Event ev = event_create_oneshot(event_create(request_event, 1, evdata), 2); @@ -354,12 +319,11 @@ static void handle_request(Channel *channel, msgpack_object *request) multiqueue_put_event(resize_events, ev); } else { multiqueue_put(channel->events, request_event, 1, evdata); - DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr); + DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name); } } } - /// Handles a message, depending on the type: /// - Request: invokes method and writes the response (or error). /// - Notification: invokes method (emits `nvim_error_event` on error). @@ -389,12 +353,24 @@ static void request_event(void **argv) } free_ret: - api_free_array(e->args); + // e->args is allocated in an arena + arena_mem_free(e->used_mem, &channel->rpc.unpacker->reuse_blk); channel_decref(channel); xfree(e); api_clear_error(&error); } +bool rpc_write_raw(uint64_t id, WBuffer *buffer) +{ + Channel *channel = find_rpc_channel(id); + if (!channel) { + wstream_release_wbuffer(buffer); + return false; + } + + return channel_write(channel, buffer); +} + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; @@ -413,7 +389,6 @@ static bool channel_write(Channel *channel, WBuffer *buffer) success = wstream_write(in, buffer); } - if (!success) { // If the write failed for any reason, close the channel char buf[256]; @@ -422,7 +397,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "ch %" PRIu64 ": stream write failed. " "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf, ERROR_LOG_LEVEL); + chan_close_with_error(channel, buf, LOGLVL_ERR); } return success; @@ -432,14 +407,19 @@ static void internal_read_event(void **argv) { Channel *channel = argv[0]; WBuffer *buffer = argv[1]; + Unpacker *p = channel->rpc.unpacker; - msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), - buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); - + p->read_ptr = buffer->data; + p->read_size = buffer->size; parse_msgpack(channel); + if (p->read_size) { + // This should not happen, as WBuffer is one single serialized message. + if (!channel->rpc.closed) { + chan_close_with_error(channel, "internal channel: internal error", LOGLVL_ERR); + } + } + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -535,7 +515,6 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } - /// Mark rpc state as closed, and release its reference to the channel. /// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) void rpc_close(Channel *channel) @@ -547,13 +526,25 @@ void rpc_close(Channel *channel) channel->rpc.closed = true; channel_decref(channel); - if (channel->streamtype == kChannelStreamStdio) { + if (channel->streamtype == kChannelStreamStdio + || channel->id == ui_client_channel_id) { multiqueue_put(main_loop.fast_events, exit_event, 0); } } +static void exit_delay_cb(uv_timer_t *handle) +{ + uv_timer_stop(&main_loop.exit_delay_timer); + multiqueue_put(main_loop.fast_events, exit_event, 0); +} + static void exit_event(void **argv) { + if (exit_need_delay) { + uv_timer_start(&main_loop.exit_delay_timer, exit_delay_cb, 0, 0); + return; + } + if (!exiting) { os_exit(0); } @@ -562,7 +553,8 @@ static void exit_event(void **argv) void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - msgpack_unpacker_free(channel->rpc.unpacker); + unpacker_teardown(channel->rpc.unpacker); + xfree(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; @@ -575,48 +567,13 @@ void rpc_free(Channel *channel) api_free_dictionary(channel->rpc.info); } -static bool is_rpc_response(msgpack_object *obj) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 4 - && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.array.ptr[0].via.u64 == 1 - && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; -} - -static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) -{ - uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64; - if (kv_size(channel->rpc.call_stack) == 0) { - return false; - } - - // Must be equal to the frame at the stack's bottom - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - return response_id == frame->request_id; -} - -static void complete_call(msgpack_object *obj, Channel *channel) -{ - ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); - frame->returned = true; - frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - - if (frame->errored) { - msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); - } else { - msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); - } -} - -static void call_set_error(Channel *channel, char *msg, int loglevel) +static void chan_close_with_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; - api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } @@ -697,7 +654,7 @@ const char *rpc_client_name(Channel *chan) return NULL; } -#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#if MIN_LOG_LEVEL <= LOGLVL_DBG # define REQ "[request] " # define RES "[response] " # define NOT "[notify] " @@ -727,7 +684,8 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); - log_msg_close(f, unpacked.data); + msgpack_object_print(f, unpacked.data); + log_close(f); msgpack_unpacked_destroy(&unpacked); break; } @@ -738,30 +696,24 @@ static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) log_lock(); FILE *f = open_log_file(); fprintf(f, ERR); - log_msg_close(f, (msgpack_object) { - .type = MSGPACK_OBJECT_STR, - .via.str = { - .ptr = (char *)msgpack_error_messages[result + MUR_OFF], - .size = (uint32_t)strlen(msgpack_error_messages[result + MUR_OFF]), - }, - }); + fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); + log_close(f); break; } } } -static void log_client_msg(uint64_t channel_id, bool is_request, msgpack_object msg) +static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) { DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); log_lock(); FILE *f = open_log_file(); - fprintf(f, is_request ? REQ : RES); - log_msg_close(f, msg); + fprintf(f, "%s: %s", is_request ? REQ : RES, name); + log_close(f); } -static void log_msg_close(FILE *f, msgpack_object msg) +static void log_close(FILE *f) { - msgpack_object_print(f, msg); fputc('\n', f); fflush(f); fclose(f); diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index eb0de47437..ac7911bb2c 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -17,7 +17,6 @@ /// of os_inchar(), so they are processed "just-in-time". EXTERN MultiQueue *ch_before_blocking_events INIT(= NULL); - #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h index 6647779db9..e622ebddf5 100644 --- a/src/nvim/msgpack_rpc/channel_defs.h +++ b/src/nvim/msgpack_rpc/channel_defs.h @@ -6,16 +6,19 @@ #include <uv.h> #include "nvim/api/private/defs.h" +#include "nvim/api/private/dispatch.h" #include "nvim/event/process.h" #include "nvim/event/socket.h" #include "nvim/vim.h" typedef struct Channel Channel; +typedef struct Unpacker Unpacker; typedef struct { uint32_t request_id; bool returned, errored; Object result; + ArenaMem result_mem; } ChannelCallFrame; typedef struct { @@ -24,12 +27,13 @@ typedef struct { MsgpackRpcRequestHandler handler; Array args; uint32_t request_id; + ArenaMem used_mem; } RequestEvent; typedef struct { PMap(cstr_t) subscribed_events[1]; bool closed; - msgpack_unpacker *unpacker; + Unpacker *unpacker; uint32_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; Dictionary info; diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 32014fcf2b..488321be42 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -22,7 +22,6 @@ static msgpack_zone zone; static msgpack_sbuffer sbuffer; - void msgpack_rpc_helpers_init(void) { msgpack_zone_init(&zone, 0xfff); @@ -252,7 +251,6 @@ bool msgpack_rpc_to_dictionary(const msgpack_object *const obj, Dictionary *cons arg->size = obj->via.array.size; arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - for (uint32_t i = 0; i < obj->via.map.size; i++) { if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, &arg->items[i].key)) { diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h index e5fd92374d..dab8a16b6b 100644 --- a/src/nvim/msgpack_rpc/helpers.h +++ b/src/nvim/msgpack_rpc/helpers.h @@ -21,4 +21,3 @@ #endif #endif // NVIM_MSGPACK_RPC_HELPERS_H - diff --git a/src/nvim/msgpack_rpc/server.c b/src/nvim/msgpack_rpc/server.c index e954e4b3a3..b252f0998e 100644 --- a/src/nvim/msgpack_rpc/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -22,7 +22,7 @@ #include "nvim/vim.h" #define MAX_CONNECTIONS 32 -#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS" +#define ENV_LISTEN "NVIM_LISTEN_ADDRESS" // deprecated static garray_T watchers = GA_EMPTY_INIT_VALUE; @@ -35,20 +35,29 @@ bool server_init(const char *listen_addr) { ga_init(&watchers, sizeof(SocketWatcher *), 1); - // $NVIM_LISTEN_ADDRESS - const char *env_addr = os_getenv(LISTEN_ADDRESS_ENV_VAR); - int rv = listen_addr == NULL ? 1 : server_start(listen_addr); + // $NVIM_LISTEN_ADDRESS (deprecated) + if (!listen_addr && os_env_exists(ENV_LISTEN)) { + listen_addr = os_getenv(ENV_LISTEN); + } + int rv = listen_addr ? server_start(listen_addr) : 1; if (0 != rv) { - rv = env_addr == NULL ? 1 : server_start(env_addr); - if (0 != rv) { - listen_addr = server_address_new(); - if (listen_addr == NULL) { - return false; - } - rv = server_start(listen_addr); - xfree((char *)listen_addr); + listen_addr = server_address_new(NULL); + if (!listen_addr) { + return false; } + rv = server_start(listen_addr); + xfree((char *)listen_addr); + } + + if (os_env_exists(ENV_LISTEN)) { + // Unset $NVIM_LISTEN_ADDRESS, it's a liability hereafter. + os_unsetenv(ENV_LISTEN); + } + + // TODO(justinmk): this is for logging_spec. Can remove this after nvim_log #7062 is merged. + if (os_env_exists("__NVIM_TEST_LOG")) { + ELOG("test log message"); } return rv == 0; @@ -60,8 +69,8 @@ static void close_socket_watcher(SocketWatcher **watcher) socket_watcher_close(*watcher, free_server); } -/// Set v:servername to the first server in the server list, or unset it if no -/// servers are known. +/// Sets the "primary address" (v:servername and $NVIM) to the first server in +/// the server list, or unsets if no servers are known. static void set_vservername(garray_T *srvs) { char *default_server = (srvs->ga_len > 0) @@ -78,23 +87,26 @@ void server_teardown(void) /// Generates unique address for local server. /// -/// In Windows this is a named pipe in the format -/// \\.\pipe\nvim-<PID>-<COUNTER>. -/// -/// For other systems it is a path returned by vim_tempname(). -/// -/// This function is NOT thread safe -char *server_address_new(void) +/// Named pipe format: +/// - Windows: "\\.\pipe\<name>.<pid>.<counter>" +/// - Other: "/tmp/nvim.user/xxx/<name>.<pid>.<counter>" +char *server_address_new(const char *name) { -#ifdef WIN32 static uint32_t count = 0; - char template[ADDRESS_MAX_SIZE]; - snprintf(template, ADDRESS_MAX_SIZE, - "\\\\.\\pipe\\nvim-%" PRIu64 "-%" PRIu32, os_get_pid(), count++); - return xstrdup(template); + char fmt[ADDRESS_MAX_SIZE]; +#ifdef WIN32 + int r = snprintf(fmt, sizeof(fmt), "\\\\.\\pipe\\%s.%" PRIu64 ".%" PRIu32, + name ? name : "nvim", os_get_pid(), count++); #else - return (char *)vim_tempname(); + char *dir = stdpaths_get_xdg_var(kXDGRuntimeDir); + int r = snprintf(fmt, sizeof(fmt), "%s/%s.%" PRIu64 ".%" PRIu32, + dir, name ? name : "nvim", os_get_pid(), count++); + xfree(dir); #endif + if ((size_t)r >= sizeof(fmt)) { + ELOG("truncated server address"); + } + return xstrdup(fmt); } /// Check if this instance owns a pipe address. @@ -109,35 +121,35 @@ bool server_owns_pipe_address(const char *path) return false; } -/// Starts listening for API calls. +/// Starts listening for RPC calls. /// -/// The socket type is determined by parsing `endpoint`: If it's a valid IPv4 -/// or IPv6 address in 'ip:[port]' format, then it will be a TCP socket. -/// Otherwise it will be a Unix socket or named pipe (Windows). +/// Socket type is decided by the format of `addr`: +/// - TCP socket if it looks like an IPv4/6 address ("ip:[port]"). +/// - If [port] is omitted, a random one is assigned. +/// - Unix socket (or named pipe on Windows) otherwise. +/// - If the name doesn't contain slashes it is appended to a generated path. #8519 /// -/// If no port is given, a random one will be assigned. -/// -/// @param endpoint Address of the server. Either a 'ip:[port]' string or an -/// arbitrary identifier (trimmed to 256 bytes) for the Unix -/// socket or named pipe. -/// @returns 0: success, 1: validation error, 2: already listening, -/// -errno: failed to bind or listen. -int server_start(const char *endpoint) +/// @param addr Server address: a "ip:[port]" string or arbitrary name or filepath (max 256 bytes) +/// for the Unix socket or named pipe. +/// @returns 0: success, 1: validation error, 2: already listening, -errno: failed to bind/listen. +int server_start(const char *addr) { - if (endpoint == NULL || endpoint[0] == '\0') { - WLOG("Empty or NULL endpoint"); + if (addr == NULL || addr[0] == '\0') { + WLOG("Empty or NULL address"); return 1; } + bool isname = !strstr(addr, ":") && !strstr(addr, "/") && !strstr(addr, "\\"); + char *addr_gen = isname ? server_address_new(addr) : NULL; SocketWatcher *watcher = xmalloc(sizeof(SocketWatcher)); - - int result = socket_watcher_init(&main_loop, watcher, endpoint); + int result = socket_watcher_init(&main_loop, watcher, isname ? addr_gen : addr); + xfree(addr_gen); if (result < 0) { xfree(watcher); return result; } - // Check if a watcher for the endpoint already exists + // Check if a watcher for the address already exists. for (int i = 0; i < watchers.ga_len; i++) { if (!strcmp(watcher->addr, ((SocketWatcher **)watchers.ga_data)[i]->addr)) { ELOG("Already listening on %s", watcher->addr); @@ -156,12 +168,6 @@ int server_start(const char *endpoint) return result; } - // Update $NVIM_LISTEN_ADDRESS, if not set. - const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); - if (listen_address == NULL) { - os_setenv(LISTEN_ADDRESS_ENV_VAR, watcher->addr, 1); - } - // Add the watcher to the list. ga_grow(&watchers, 1); ((SocketWatcher **)watchers.ga_data)[watchers.ga_len++] = watcher; @@ -200,12 +206,6 @@ bool server_stop(char *endpoint) return false; } - // Unset $NVIM_LISTEN_ADDRESS if it is the stopped address. - const char *listen_address = os_getenv(LISTEN_ADDRESS_ENV_VAR); - if (listen_address && STRCMP(addr, listen_address) == 0) { - os_unsetenv(LISTEN_ADDRESS_ENV_VAR); - } - socket_watcher_close(watcher, free_server); // Remove this server from the list by swapping it with the last item. @@ -215,8 +215,8 @@ bool server_stop(char *endpoint) } watchers.ga_len--; - // If v:servername is the stopped address, re-initialize it. - if (STRCMP(addr, get_vim_var_str(VV_SEND_SERVER)) == 0) { + // Bump v:servername to the next available server, if any. + if (strequal(addr, (char *)get_vim_var_str(VV_SEND_SERVER))) { set_vservername(&watchers); } diff --git a/src/nvim/msgpack_rpc/unpacker.c b/src/nvim/msgpack_rpc/unpacker.c new file mode 100644 index 0000000000..26c1843026 --- /dev/null +++ b/src/nvim/msgpack_rpc/unpacker.c @@ -0,0 +1,322 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + +#include "nvim/api/private/helpers.h" +#include "nvim/log.h" +#include "nvim/memory.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/unpacker.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/unpacker.c.generated.h" +#endif + +Object unpack(const char *data, size_t size, Error *err) +{ + Unpacker unpacker; + mpack_parser_init(&unpacker.parser, 0); + unpacker.parser.data.p = &unpacker; + + int result = mpack_parse(&unpacker.parser, &data, &size, + api_parse_enter, api_parse_exit); + + if (result == MPACK_NOMEM) { + api_set_error(err, kErrorTypeException, "object was too deep to unpack"); + } else if (result == MPACK_EOF) { + api_set_error(err, kErrorTypeException, "incomplete msgpack string"); + } else if (result == MPACK_ERROR) { + api_set_error(err, kErrorTypeException, "invalid msgpack string"); + } else if (result == MPACK_OK && size) { + api_set_error(err, kErrorTypeException, "trailing data in msgpack string"); + } + + return unpacker.result; +} + +static void api_parse_enter(mpack_parser_t *parser, mpack_node_t *node) +{ + Unpacker *p = parser->data.p; + Object *result = NULL; + String *key_location = NULL; + + mpack_node_t *parent = MPACK_PARENT_NODE(node); + if (parent) { + switch (parent->tok.type) { + case MPACK_TOKEN_ARRAY: { + Object *obj = parent->data[0].p; + result = &kv_A(obj->data.array, parent->pos); + break; + } + case MPACK_TOKEN_MAP: { + Object *obj = parent->data[0].p; + KeyValuePair *kv = &kv_A(obj->data.dictionary, parent->pos); + if (!parent->key_visited) { + // TODO(bfredl): when implementing interrupt parse on error, + // stop parsing here when node is not a STR/BIN + kv->key = (String)STRING_INIT; + key_location = &kv->key; + } + result = &kv->value; + break; + } + + case MPACK_TOKEN_STR: + case MPACK_TOKEN_BIN: + case MPACK_TOKEN_EXT: + assert(node->tok.type == MPACK_TOKEN_CHUNK); + break; + + default: + abort(); + } + } else { + result = &p->result; + } + + switch (node->tok.type) { + case MPACK_TOKEN_NIL: + *result = NIL; + break; + case MPACK_TOKEN_BOOLEAN: + *result = BOOL(mpack_unpack_boolean(node->tok)); + break; + case MPACK_TOKEN_SINT: + *result = INTEGER_OBJ(mpack_unpack_sint(node->tok)); + break; + case MPACK_TOKEN_UINT: + *result = INTEGER_OBJ((Integer)mpack_unpack_uint(node->tok)); + break; + case MPACK_TOKEN_FLOAT: + *result = FLOAT_OBJ(mpack_unpack_float(node->tok)); + break; + + case MPACK_TOKEN_BIN: + case MPACK_TOKEN_STR: { + char *mem = arena_alloc(&p->arena, node->tok.length + 1, false); + mem[node->tok.length] = NUL; + String str = { .data = mem, .size = node->tok.length }; + if (key_location) { + *key_location = str; + } else { + *result = STRING_OBJ(str); + } + node->data[0].p = str.data; + break; + } + case MPACK_TOKEN_EXT: + // handled in chunk; but save result location + node->data[0].p = result; + break; + case MPACK_TOKEN_CHUNK: + assert(parent); + if (parent->tok.type == MPACK_TOKEN_STR || parent->tok.type == MPACK_TOKEN_BIN) { + char *data = parent->data[0].p; + memcpy(data + parent->pos, + node->tok.data.chunk_ptr, node->tok.length); + } else { + Object *res = parent->data[0].p; + + size_t endlen = parent->pos + node->tok.length; + if (endlen > MAX_EXT_LEN) { + *res = NIL; + break; + } + memcpy(p->ext_buf + parent->pos, + node->tok.data.chunk_ptr, node->tok.length); + if (parent->pos + node->tok.length < parent->tok.length) { + break; // EOF, let's get back to it later + } + const char *buf = p->ext_buf; + size_t size = parent->tok.length; + mpack_token_t ext_tok; + int status = mpack_rtoken(&buf, &size, &ext_tok); + if (status || ext_tok.type != MPACK_TOKEN_UINT) { + // TODO(bfredl): once we fixed memory management, we can set + // p->unpack_error and a flag like p->interrupted + *res = NIL; + break; + } + int ext_type = parent->tok.data.ext_type; + if (0 <= ext_type && ext_type <= EXT_OBJECT_TYPE_MAX) { + res->type = (ObjectType)(ext_type + EXT_OBJECT_TYPE_SHIFT); + res->data.integer = (int64_t)mpack_unpack_uint(ext_tok); + } else { + *res = NIL; + break; + } + } + break; + + case MPACK_TOKEN_ARRAY: { + Array arr = KV_INITIAL_VALUE; + kv_fixsize_arena(&p->arena, arr, node->tok.length); + kv_size(arr) = node->tok.length; + *result = ARRAY_OBJ(arr); + node->data[0].p = result; + break; + } + case MPACK_TOKEN_MAP: { + Dictionary dict = KV_INITIAL_VALUE; + kv_fixsize_arena(&p->arena, dict, node->tok.length); + kv_size(dict) = node->tok.length; + *result = DICTIONARY_OBJ(dict); + node->data[0].p = result; + break; + } + + default: + abort(); + } +} + +static void api_parse_exit(mpack_parser_t *parser, mpack_node_t *node) +{} + +void unpacker_init(Unpacker *p) +{ + mpack_parser_init(&p->parser, 0); + p->parser.data.p = p; + mpack_tokbuf_init(&p->reader); + p->unpack_error = (Error)ERROR_INIT; + + p->arena = (Arena)ARENA_EMPTY; + p->reuse_blk = NULL; +} + +void unpacker_teardown(Unpacker *p) +{ + arena_mem_free(p->reuse_blk, NULL); + arena_mem_free(arena_finish(&p->arena), NULL); +} + +bool unpacker_parse_header(Unpacker *p) +{ + mpack_token_t tok; + int result; + + const char *data = p->read_ptr; + size_t size = p->read_size; + + assert(!ERROR_SET(&p->unpack_error)); + +#define NEXT(tok) \ + result = mpack_read(&p->reader, &data, &size, &tok); \ + if (result) { goto error; } + + NEXT(tok); + if (tok.type != MPACK_TOKEN_ARRAY || tok.length < 3 || tok.length > 4) { + goto error; + } + size_t array_length = tok.length; + + NEXT(tok); + if (tok.type != MPACK_TOKEN_UINT) { + goto error; + } + uint32_t type = (uint32_t)mpack_unpack_uint(tok); + if ((array_length == 3) ? type != 2 : (type >= 2)) { + goto error; + } + p->type = (MessageType)type; + p->request_id = 0; + + if (p->type != kMessageTypeNotification) { + NEXT(tok); + if (tok.type != MPACK_TOKEN_UINT) { + goto error; + } + p->request_id = (uint32_t)mpack_unpack_uint(tok); + } + + if (p->type != kMessageTypeResponse) { + NEXT(tok); + if ((tok.type != MPACK_TOKEN_STR && tok.type != MPACK_TOKEN_BIN) + || tok.length > 100) { + goto error; + } + p->method_name_len = tok.length; + + if (p->method_name_len > 0) { + NEXT(tok); + assert(tok.type == MPACK_TOKEN_CHUNK); + } + if (tok.length < p->method_name_len) { + result = MPACK_EOF; + goto error; + } + // if this fails, p->handler.fn will be NULL + p->handler = msgpack_rpc_get_handler_for(tok.length ? tok.data.chunk_ptr : "", + tok.length, &p->unpack_error); + } + + p->read_ptr = data; + p->read_size = size; + return true; +#undef NEXT + +error: + if (result == MPACK_EOF) { + // recover later by retrying from scratch + // when more data is available. + mpack_tokbuf_init(&p->reader); + } else { + api_set_error(&p->unpack_error, kErrorTypeValidation, "failed to decode msgpack"); + p->state = -1; + } + return false; +} + +// BASIC BITCH STATE MACHINE +// +// With some basic assumptions, we can parse the overall structure of msgpack-rpc +// messages with a hand-rolled FSM of just 3 states (<x> = p->state): +// +// <0>[0, request_id, method_name, <2>args] +// <0>[1, request_id, <1>err, <2>result] +// <0>[2, method_name, <2>args] +// +// The assumption here is that the header of the message, which we define as the +// initial array head, the kind integer, request_id and/or method name (when needed), +// is relatively small, just ~10 bytes + the method name. Thus we can simply refuse +// to advance the stream beyond the header until it can be parsed in its entirety. +// +// Of course, later on, we want to specialize state 2 into sub-states depending +// on the specific method. "nvim_exec_lua" should just decode direct into lua +// objects, and "redraw/grid_line" should use a hand-rolled decoder to avoid +// a blizzard of small objects for each screen cell. + +bool unpacker_advance(Unpacker *p) +{ + assert(p->state >= 0); + if (p->state == 0) { + if (!unpacker_parse_header(p)) { + return false; + } + p->state = p->type == kMessageTypeResponse ? 1 : 2; + arena_start(&p->arena, &p->reuse_blk); + } + + int result; + +rerun: + result = mpack_parse(&p->parser, &p->read_ptr, &p->read_size, + api_parse_enter, api_parse_exit); + + if (result == MPACK_EOF) { + return false; + } else if (result != MPACK_OK) { + api_set_error(&p->unpack_error, kErrorTypeValidation, "failed to parse msgpack"); + p->state = -1; + return false; + } + + if (p->state == 1) { + p->error = p->result; + p->state = 2; + goto rerun; + } else { + assert(p->state == 2); + p->state = 0; + } + return true; +} diff --git a/src/nvim/msgpack_rpc/unpacker.h b/src/nvim/msgpack_rpc/unpacker.h new file mode 100644 index 0000000000..e0dc6f0a68 --- /dev/null +++ b/src/nvim/msgpack_rpc/unpacker.h @@ -0,0 +1,46 @@ +#ifndef NVIM_MSGPACK_RPC_UNPACKER_H +#define NVIM_MSGPACK_RPC_UNPACKER_H + +#include <inttypes.h> +#include <stdbool.h> +#include <string.h> + +#include "mpack/mpack_core.h" +#include "mpack/object.h" +#include "nvim/api/private/dispatch.h" +#include "nvim/api/private/helpers.h" +#include "nvim/memory.h" +#include "nvim/msgpack_rpc/channel_defs.h" + +struct Unpacker { + mpack_parser_t parser; + mpack_tokbuf_t reader; + + const char *read_ptr; + size_t read_size; + +#define MAX_EXT_LEN 9 // byte + 8-byte integer + char ext_buf[MAX_EXT_LEN]; + + int state; + MessageType type; + uint32_t request_id; + size_t method_name_len; + MsgpackRpcRequestHandler handler; + Object error; // error return + Object result; // arg list or result + Error unpack_error; + + Arena arena; + // one lenght free-list of reusable blocks + ArenaMem reuse_blk; +}; + +// unrecovareble error. unpack_error should be set! +#define unpacker_closed(p) ((p)->state < 0) + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/unpacker.h.generated.h" +#endif + +#endif // NVIM_MSGPACK_RPC_UNPACKER_H |