diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 247 |
1 files changed, 104 insertions, 143 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ee7359c476..a8fde5a652 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -1,7 +1,6 @@ #include <assert.h> #include <inttypes.h> #include <msgpack/object.h> -#include <msgpack/pack.h> #include <msgpack/sbuffer.h> #include <msgpack/unpack.h> #include <stdbool.h> @@ -29,7 +28,7 @@ #include "nvim/message.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/channel_defs.h" -#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/packer.h" #include "nvim/msgpack_rpc/unpacker.h" #include "nvim/os/input.h" #include "nvim/rbuffer.h" @@ -44,73 +43,31 @@ # define NOT "[notify] " # define ERR "[error] " -// Cannot define array with negative offsets, so this one is needed to be added -// to MSGPACK_UNPACK_\* values. -# define MUR_OFF 2 +# define SEND "->" +# define RECV "<-" -static const char *const msgpack_error_messages[] = { - [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found", - [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string", - [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error", - [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory", -}; - -static void log_close(FILE *f) +static void log_request(char *dir, uint64_t channel_id, uint32_t req_id, const char *name) { - fputc('\n', f); - fflush(f); - fclose(f); - log_unlock(); + DLOGN("RPC %s %" PRIu64 ": %s id=%u: %s\n", dir, channel_id, REQ, req_id, name); } -static void log_server_msg(uint64_t channel_id, msgpack_sbuffer *packed) +static void log_response(char *dir, uint64_t channel_id, char *kind, uint32_t req_id) { - msgpack_unpacked unpacked; - msgpack_unpacked_init(&unpacked); - DLOGN("RPC ->ch %" PRIu64 ": ", channel_id); - const msgpack_unpack_return result = - msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); - switch (result) { - case MSGPACK_UNPACK_SUCCESS: { - uint64_t type = unpacked.data.via.array.ptr[0].via.u64; - log_lock(); - FILE *f = open_log_file(); - fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); - msgpack_object_print(f, unpacked.data); - log_close(f); - msgpack_unpacked_destroy(&unpacked); - break; - } - case MSGPACK_UNPACK_EXTRA_BYTES: - case MSGPACK_UNPACK_CONTINUE: - case MSGPACK_UNPACK_PARSE_ERROR: - case MSGPACK_UNPACK_NOMEM_ERROR: { - log_lock(); - FILE *f = open_log_file(); - fprintf(f, ERR); - fprintf(f, "%s", msgpack_error_messages[result + MUR_OFF]); - log_close(f); - break; - } - } + DLOGN("RPC %s %" PRIu64 ": %s id=%u\n", dir, channel_id, kind, req_id); } -static void log_client_msg(uint64_t channel_id, bool is_request, const char *name) +static void log_notify(char *dir, uint64_t channel_id, const char *name) { - DLOGN("RPC <-ch %" PRIu64 ": ", channel_id); - log_lock(); - FILE *f = open_log_file(); - fprintf(f, "%s: %s", is_request ? REQ : RES, name); - log_close(f); + DLOGN("RPC %s %" PRIu64 ": %s %s\n", dir, channel_id, NOT, name); } #else -# define log_client_msg(...) -# define log_server_msg(...) +# define log_request(...) +# define log_response(...) +# define log_notify(...) #endif static Set(cstr_t) event_strings = SET_INIT; -static msgpack_sbuffer out_buffer; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.c.generated.h" @@ -119,7 +76,6 @@ static msgpack_sbuffer out_buffer; void rpc_init(void) { ch_before_blocking_events = multiqueue_new_child(main_loop.events); - msgpack_sbuffer_init(&out_buffer); } void rpc_start(Channel *channel) @@ -169,8 +125,9 @@ bool rpc_send_event(uint64_t id, const char *name, Array args) return false; } + log_notify(SEND, channel ? channel->id : 0, name); if (channel) { - send_event(channel, name, args); + serialize_request(&channel, 1, 0, name, args); } else { broadcast_event(name, args); } @@ -199,7 +156,9 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem RpcState *rpc = &channel->rpc; uint32_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request - send_request(channel, request_id, method_name, args); + serialize_request(&channel, 1, request_id, method_name, args); + + log_request(SEND, channel->id, request_id, method_name); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL, NULL }; @@ -361,8 +320,13 @@ static void parse_msgpack(Channel *channel) frame->result = p->result; } frame->result_mem = arena_finish(&p->arena); + log_response(RECV, channel->id, frame->errored ? ERR : RES, p->request_id); } else { - log_client_msg(channel->id, p->type == kMessageTypeRequest, p->handler.name); + if (p->type == kMessageTypeNotification) { + log_notify(RECV, channel->id, p->handler.name); + } else { + log_request(RECV, channel->id, p->request_id, p->handler.name); + } Object res = p->result; if (p->result.type != kObjectTypeArray) { @@ -442,15 +406,7 @@ static void request_event(void **argv) Object result = handler.fn(channel->id, e->args, &e->used_mem, &error); if (e->type == kMessageTypeRequest || ERROR_SET(&error)) { // Send the response. - msgpack_packer response; - msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); - channel_write(channel, serialize_response(channel->id, - e->handler, - e->type, - e->request_id, - &error, - &result, - &out_buffer)); + serialize_response(channel, e->handler, e->type, e->request_id, &error, &result); } if (handler.ret_alloc) { api_free_object(result); @@ -533,41 +489,14 @@ static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageT { Error e = ERROR_INIT; api_set_error(&e, kErrorTypeException, "%s", err); - channel_write(chan, serialize_response(chan->id, - handler, - type, - id, - &e, - &NIL, - &out_buffer)); + serialize_response(chan, handler, type, id, &e, &NIL); api_clear_error(&e); } -static void send_request(Channel *channel, uint32_t id, const char *name, Array args) -{ - const String method = cstr_as_string(name); - channel_write(channel, serialize_request(channel->id, - id, - method, - args, - &out_buffer, - 1)); -} - -static void send_event(Channel *channel, const char *name, Array args) -{ - const String method = cstr_as_string(name); - channel_write(channel, serialize_request(channel->id, - 0, - method, - args, - &out_buffer, - 1)); -} - static void broadcast_event(const char *name, Array args) { - kvec_t(Channel *) subscribed = KV_INITIAL_VALUE; + kvec_withinit_t(Channel *, 4) subscribed = KV_INITIAL_VALUE; + kvi_init(subscribed); Channel *channel; map_foreach_value(&channels, channel, { @@ -577,25 +506,11 @@ static void broadcast_event(const char *name, Array args) } }); - if (!kv_size(subscribed)) { - goto end; - } - - const String method = cstr_as_string(name); - WBuffer *buffer = serialize_request(0, - 0, - method, - args, - &out_buffer, - kv_size(subscribed)); - - for (size_t i = 0; i < kv_size(subscribed); i++) { - Channel *c = kv_A(subscribed, i); - channel_write(c, buffer); + if (kv_size(subscribed)) { + serialize_request(subscribed.items, kv_size(subscribed), 0, name, args); } -end: - kv_destroy(subscribed); + kvi_destroy(subscribed); } static void unsubscribe(Channel *channel, char *event) @@ -653,27 +568,28 @@ static void chan_close_with_error(Channel *channel, char *msg, int loglevel) channel_close(channel->id, kChannelPartRpc, NULL); } -static WBuffer *serialize_request(uint64_t channel_id, uint32_t request_id, const String method, - Array args, msgpack_sbuffer *sbuffer, size_t refcount) +static void serialize_request(Channel **chans, size_t nchans, uint32_t request_id, + const char *method, Array args) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_rpc_serialize_request(request_id, method, args, &pac); - log_server_msg(channel_id, sbuffer); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - xfree); - msgpack_sbuffer_clear(sbuffer); - return rv; + PackerBuffer packer; + packer_buffer_init_channels(chans, nchans, &packer); + + mpack_array(&packer.ptr, request_id ? 4 : 3); + mpack_w(&packer.ptr, request_id ? 0 : 2); + + if (request_id) { + mpack_uint(&packer.ptr, request_id); + } + + mpack_str(cstr_as_string(method), &packer); + mpack_object_array(args, &packer); + + packer_buffer_finish_channels(&packer); } -static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler handler, - MessageType type, uint32_t response_id, Error *err, Object *arg, - msgpack_sbuffer *sbuffer) +void serialize_response(Channel *channel, MsgpackRpcRequestHandler handler, MessageType type, + uint32_t response_id, Error *err, Object *arg) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); if (ERROR_SET(err) && type == kMessageTypeNotification) { if (handler.fn == handle_nvim_paste) { // TODO(bfredl): this is pretty much ad-hoc. maybe TUI and UI:s should be @@ -685,19 +601,65 @@ static WBuffer *serialize_response(uint64_t channel_id, MsgpackRpcRequestHandler MAXSIZE_TEMP_ARRAY(args, 2); ADD_C(args, INTEGER_OBJ(err->type)); ADD_C(args, CSTR_AS_OBJ(err->msg)); - msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"), - args, &pac); + serialize_request(&channel, 1, 0, "nvim_error_event", args); } + return; + } + + PackerBuffer packer; + packer_buffer_init_channels(&channel, 1, &packer); + + mpack_array(&packer.ptr, 4); + mpack_w(&packer.ptr, 1); + mpack_uint(&packer.ptr, response_id); + + if (ERROR_SET(err)) { + // error represented by a [type, message] array + mpack_array(&packer.ptr, 2); + mpack_integer(&packer.ptr, err->type); + mpack_str(cstr_as_string(err->msg), &packer); + // Nil result + mpack_nil(&packer.ptr); } else { - msgpack_rpc_serialize_response(response_id, err, arg, &pac); + // Nil error + mpack_nil(&packer.ptr); + // Return value + mpack_object(arg, &packer); } - log_server_msg(channel_id, sbuffer); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - xfree); - msgpack_sbuffer_clear(sbuffer); - return rv; + + packer_buffer_finish_channels(&packer); + + log_response(SEND, channel->id, ERROR_SET(err) ? ERR : RES, response_id); +} + +static void packer_buffer_init_channels(Channel **chans, size_t nchans, PackerBuffer *packer) +{ + packer->startptr = alloc_block(); + packer->ptr = packer->startptr; + packer->endptr = packer->startptr + ARENA_BLOCK_SIZE; + packer->packer_flush = channel_flush_callback; + packer->anydata = chans; + packer->anylen = nchans; +} + +static void packer_buffer_finish_channels(PackerBuffer *packer) +{ + size_t len = (size_t)(packer->ptr - packer->startptr); + if (len > 0) { + WBuffer *buf = wstream_new_buffer(packer->startptr, len, packer->anylen, free_block); + Channel **chans = packer->anydata; + for (size_t i = 0; i < packer->anylen; i++) { + channel_write(chans[i], buf); + } + } else { + free_block(packer->startptr); + } +} + +static void channel_flush_callback(PackerBuffer *packer) +{ + packer_buffer_finish_channels(packer); + packer_buffer_init_channels(packer->anydata, packer->anylen, packer); } void rpc_set_client_info(uint64_t id, Dictionary info) @@ -762,7 +724,6 @@ void rpc_free_all_mem(void) }); set_destroy(cstr_t, &event_strings); - msgpack_sbuffer_destroy(&out_buffer); multiqueue_free(ch_before_blocking_events); } #endif |