From b00a37544c2d0662e251fd80033509cb6ed8ffe2 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:36:36 -0300 Subject: event: Bail out of event_poll when any event is processed The loop condition was set to only exit when user input is processed, but we must exit on any event to properly notify `event_poll` callers --- src/nvim/os/event.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 6723b97e0c..8b7bc53978 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -99,13 +99,9 @@ bool event_poll(int32_t ms) // Run one event loop iteration, blocking for events if run_mode is // UV_RUN_ONCE uv_run(uv_default_loop(), run_mode); - // Process immediate events outside uv_run since libuv event loop not - // support recursion(processing events may cause a recursive event_poll - // call) - event_process(false); } while ( // Continue running if ... - !input_ready() && // we have no input + !event_process(false) && // we didn't process any immediate events !event_has_deferred() && // no events are waiting to be processed run_mode != UV_RUN_NOWAIT && // ms != 0 !timer_data.timed_out); // we didn't get a timeout @@ -139,11 +135,13 @@ void event_push(Event event, bool deferred) } // Runs the appropriate action for each queued event -void event_process(bool deferred) +bool event_process(bool deferred) { + bool processed_events = false; Event event; while (kl_shift(Event, get_queue(deferred), &event) == 0) { + processed_events = true; switch (event.type) { case kEventSignal: signal_handle(event); @@ -158,6 +156,8 @@ void event_process(bool deferred) abort(); } } + + return processed_events; } // Set a flag in the `event_poll` loop for signaling of a timeout -- cgit From ef4c5ccb21bda85cea7fa3c825f865c95f6cdbb4 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:36:58 -0300 Subject: event: Decouple user input checks from `event_poll` This was done to generalize the usage of `event_poll`, which will now return `true` only if a event has been processed/deferred before the timeout(if not -1). To do that, the `input_ready` calls have been extracted to the input.c module(the `event_poll` call has been surrounded by `input_ready` calls, resulting in the same behavior). The `input_start`/`input_stop` calls still present in `event_poll` are temporary: When the API becomes the only way to read user input, it will no longer be necessary to start/stop the input stream. --- src/nvim/os/event.c | 12 +++++------- src/nvim/os/input.c | 22 ++++++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 8b7bc53978..a8bd6ca886 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -63,11 +63,6 @@ bool event_poll(int32_t ms) { uv_run_mode run_mode = UV_RUN_ONCE; - if (input_ready()) { - // If there's a pending input event to be consumed, do it now - return true; - } - static int recursive = 0; if (!(recursive++)) { @@ -95,13 +90,16 @@ bool event_poll(int32_t ms) run_mode = UV_RUN_NOWAIT; } + bool events_processed; + do { // Run one event loop iteration, blocking for events if run_mode is // UV_RUN_ONCE uv_run(uv_default_loop(), run_mode); + events_processed = event_process(false); } while ( // Continue running if ... - !event_process(false) && // we didn't process any immediate events + !events_processed && // we didn't process any immediate events !event_has_deferred() && // no events are waiting to be processed run_mode != UV_RUN_NOWAIT && // ms != 0 !timer_data.timed_out); // we didn't get a timeout @@ -120,7 +118,7 @@ bool event_poll(int32_t ms) event_process(false); } - return input_ready() || event_has_deferred(); + return !timer_data.timed_out && (events_processed || event_has_deferred()); } bool event_has_deferred() diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 6e42cba4ad..0f6d2df12f 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -37,12 +37,6 @@ void input_init() rstream_set_file(read_stream, read_cmd_fd); } -// Check if there's pending input -bool input_ready() -{ - return rstream_available(read_stream) > 0 || eof; -} - // Listen for input void input_start() { @@ -119,7 +113,7 @@ bool os_char_avail() // In cooked mode we should get SIGINT, no need to check. void os_breakcheck() { - if (curr_tmode == TMODE_RAW && event_poll(0)) + if (curr_tmode == TMODE_RAW && input_poll(0)) fill_input_buf(false); } @@ -132,6 +126,11 @@ bool os_isatty(int fd) return uv_guess_handle(fd) == UV_TTY; } +static bool input_poll(int32_t ms) +{ + return input_ready() || event_poll(ms) || input_ready(); +} + // This is a replacement for the old `WaitForChar` function in os_unix.c static InbufPollResult inbuf_poll(int32_t ms) { @@ -139,7 +138,7 @@ static InbufPollResult inbuf_poll(int32_t ms) return kInputAvail; } - if (event_poll(ms)) { + if (input_poll(ms)) { return eof && rstream_available(read_stream) == 0 ? kInputEof : kInputAvail; @@ -196,3 +195,10 @@ static int push_event_key(uint8_t *buf, int maxlen) return buf_idx; } + +// Check if there's pending input +bool input_ready() +{ + return rstream_available(read_stream) > 0 || eof; +} + -- cgit From 0fd46ae8f08375dd160aa1566d5678b1e51804ce Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:37:01 -0300 Subject: job: Fix vimscript wrapper by returning when an invalid id is passed --- src/nvim/eval.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 92075e46f8..3213e36f81 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -10535,6 +10535,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv) if (!job) { // Invalid job id EMSG(_(e_invjob)); + return; } WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string), -- cgit From 11916b6b595421ce2ece10f7aa40757cc4937c0c Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:37:04 -0300 Subject: wstream: Refactor buffer memory management - Extract code to release WBuffer instances into `release_wbuffer` - Fix memory leak when wstream_write returns false --- src/nvim/os/wstream.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 9a908a4348..a3037e477b 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -91,11 +91,13 @@ bool wstream_write(WStream *wstream, WBuffer *buffer) // This should not be called after a wstream was freed assert(!wstream->freed); + buffer->refcount++; + if (wstream->curmem > wstream->maxmem) { + release_wbuffer(buffer); return false; } - buffer->refcount++; wstream->curmem += buffer->size; data = xmalloc(sizeof(WriteData)); data->wstream = wstream; @@ -138,10 +140,7 @@ static void write_cb(uv_write_t *req, int status) free(req); data->wstream->curmem -= data->buffer->size; - if (!--data->buffer->refcount) { - data->buffer->cb(data->buffer->data); - free(data->buffer); - } + release_wbuffer(data->buffer); data->wstream->pending_reqs--; if (data->wstream->freed && data->wstream->pending_reqs == 0) { @@ -152,3 +151,10 @@ static void write_cb(uv_write_t *req, int status) free(data); } +static void release_wbuffer(WBuffer *buffer) +{ + if (!--buffer->refcount) { + buffer->cb(buffer->data); + free(buffer); + } +} -- cgit From 30fc6a4fd1b7b317e0f427fbbf981474524f55af Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:37:06 -0300 Subject: wstream: Use a default value of 10mb for `maxmem` when 0 is passed --- src/nvim/os/wstream.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index a3037e477b..5c10401958 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -9,6 +9,8 @@ #include "nvim/vim.h" #include "nvim/memory.h" +#define DEFAULT_MAXMEM 1024 * 1024 * 10 + struct wstream { uv_stream_t *stream; // Memory currently used by pending buffers @@ -43,6 +45,10 @@ typedef struct { /// @return The newly-allocated `WStream` instance WStream * wstream_new(size_t maxmem) { + if (!maxmem) { + maxmem = DEFAULT_MAXMEM; + } + WStream *rv = xmalloc(sizeof(WStream)); rv->maxmem = maxmem; rv->stream = NULL; -- cgit From c722e22ee660f72d6442475a87d77d61c13a9a42 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:52:49 -0300 Subject: wstream: Make wstream_write consider the return value from uv_write --- src/nvim/os/wstream.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 5c10401958..13b8e8d9dc 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -100,8 +100,7 @@ bool wstream_write(WStream *wstream, WBuffer *buffer) buffer->refcount++; if (wstream->curmem > wstream->maxmem) { - release_wbuffer(buffer); - return false; + goto err; } wstream->curmem += buffer->size; @@ -113,9 +112,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer) uvbuf.base = buffer->data; uvbuf.len = buffer->size; wstream->pending_reqs++; - uv_write(req, wstream->stream, &uvbuf, 1, write_cb); + + if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) { + goto err; + } return true; + +err: + release_wbuffer(buffer); + return false; } /// Creates a WBuffer object for holding output data. Instances of this -- cgit From 0dea2682dc3b1132fb86e807d50ac9ac4c063d76 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:52:53 -0300 Subject: job: Add a `maxmem` parameter to job_start The value is forwarded to it's own WStream instance --- src/nvim/eval.c | 1 + src/nvim/os/channel.c | 3 ++- src/nvim/os/job.c | 5 +++-- 3 files changed, 6 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 3213e36f81..7300e60b1a 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -10474,6 +10474,7 @@ static void f_job_start(typval_T *argvars, typval_T *rettv) on_job_stderr, on_job_exit, true, + 0, &rettv->vval.v_number); if (rettv->vval.v_number <= 0) { diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 653f09756a..f859544663 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -80,6 +80,7 @@ bool channel_from_job(char **argv) job_err, job_exit, true, + 0, &status); if (status <= 0) { @@ -104,7 +105,7 @@ void channel_from_stream(uv_stream_t *stream) rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream - channel->data.streams.write = wstream_new(1024 * 1024); + channel->data.streams.write = wstream_new(0); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index b369004e47..dcf50243a9 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -21,7 +21,6 @@ #define EXIT_TIMEOUT 25 #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 1024 -#define JOB_WRITE_MAXMEM 1024 * 1024 struct job { // Job id the index in the job table plus one. @@ -131,6 +130,7 @@ void job_teardown() /// @param exit_cb Callback that will be invoked when the job exits /// @param defer If the job callbacks invocation should be deferred to vim /// main loop +/// @param maxmem Maximum amount of memory used by the job WStream /// @param[out] The job id if the job started successfully, 0 if the job table /// is full, -1 if the program could not be executed. /// @return The job pointer if the job started successfully, NULL otherwise @@ -140,6 +140,7 @@ Job *job_start(char **argv, rstream_cb stderr_cb, job_exit_cb job_exit_cb, bool defer, + size_t maxmem, int *status) { int i; @@ -210,7 +211,7 @@ Job *job_start(char **argv, handle_set_job((uv_handle_t *)&job->proc_stdout, job); handle_set_job((uv_handle_t *)&job->proc_stderr, job); - job->in = wstream_new(JOB_WRITE_MAXMEM); + job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); -- cgit From c8297e462a1f1a842370651ca67fe5abd45e3c1d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:52:56 -0300 Subject: channel: Extract 'channel_write' function --- src/nvim/os/channel.c | 52 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index f859544663..7cc40a67e0 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -14,13 +14,14 @@ #include "nvim/os/msgpack_rpc.h" #include "nvim/vim.h" #include "nvim/memory.h" +#include "nvim/message.h" #include "nvim/map.h" #include "nvim/lib/kvec.h" typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job; + bool is_job, is_alive; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; union { @@ -215,12 +216,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); - + WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free); + if (!channel_write(channel, buffer)) { + return; + } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } @@ -240,19 +242,40 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting", &response); - wstream_write(channel->data.streams.write, - wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free)); + WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free); + if (!channel_write(channel, buffer)) { + return; + } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); } } +static bool channel_write(Channel *channel, WBuffer *buffer) +{ + bool success; + + if (channel->is_job) { + success = job_write(channel->data.job, buffer); + } else { + success = wstream_write(channel->data.streams.write, buffer); + } + + if (!success) { + // If the write failed for whatever reason, mark the channel as not alive so + // it can be freed later + channel->is_alive = false; + } + + return success; +} + static void send_event(Channel *channel, char *type, Object data) { - wstream_write(channel->data.streams.write, serialize_event(type, data)); + channel_write(channel, serialize_event(type, data)); } static void broadcast_event(char *type, Object data) @@ -275,7 +298,7 @@ static void broadcast_event(char *type, Object data) WBuffer *buffer = serialize_event(type, data); for (size_t i = 0; i < kv_size(subscribed); i++) { - wstream_write(kv_A(subscribed, i)->data.streams.write, buffer); + channel_write(kv_A(subscribed, i), buffer); } end: @@ -349,6 +372,7 @@ static WBuffer *serialize_event(char *type, Object data) static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); + rv->is_alive = true; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; -- cgit From c0b0bd07fd1bc92665be2b0e1eb6d5d9c736ddb1 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:52:59 -0300 Subject: channel: Extract function for sending errors that are not responses --- src/nvim/os/channel.c | 47 +++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 7cc40a67e0..a0575de2e9 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -207,12 +207,12 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); UnpackResult result; - msgpack_packer response; // Deserialize everything we can. while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked)) == kUnpackResultOk) { // Each object is a new msgpack-rpc request and requires an empty response + msgpack_packer response; msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call msgpack_rpc_call(channel->id, &unpacked.data, &response); @@ -234,24 +234,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // A not so uncommon cause for this might be deserializing objects with // a high nesting level: msgpack will break when it's internal parse stack // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&response, 4); - msgpack_pack_int(&response, 1); - msgpack_pack_int(&response, 0); - msgpack_rpc_error("Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting", - &response); - WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free); - if (!channel_write(channel, buffer)) { - return; - } - // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->sbuffer); + send_error(channel, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); + } +} + +static void send_error(Channel *channel, char *msg) +{ + msgpack_packer err; + // See src/msgpack/unpack_template.h in msgpack source tree for + // causes for this error(search for 'goto _failed') + // + // A not so uncommon cause for this might be deserializing objects with + // a high nesting level: msgpack will break when it's internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + msgpack_packer_init(&err, channel->sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&err, 4); + msgpack_pack_int(&err, 1); + msgpack_pack_int(&err, 0); + msgpack_rpc_error(msg, &err); + WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, + channel->sbuffer->size), + channel->sbuffer->size, + free); + if (!channel_write(channel, buffer)) { + return; } + // Clear the buffer for future calls + msgpack_sbuffer_clear(channel->sbuffer); } static bool channel_write(Channel *channel, WBuffer *buffer) -- cgit From 09605cec03ea23e87ee285fd950a23ce8d23678d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:53:02 -0300 Subject: channel/msgpack_rpc: Refactor msgpack_rpc_notification/serialize_event - Generalize some argument names(event type -> event name, event data -> event arg) - Rename serialize_event to serialize_message - Rename msgpack_rpc_notification to msgpack_rpc_message - Extract the message type out of msgpack_rpc_message - Add 'id' parameter to msgpack_rpc_message/serialize_message to create messages that are not notifications --- src/nvim/api/vim.c | 10 ++++++---- src/nvim/os/channel.c | 40 ++++++++++++++++++++++++---------------- src/nvim/os/channel.h | 2 +- src/nvim/os/msgpack_rpc.c | 21 +++++++++++++++------ src/nvim/os/msgpack_rpc.h | 17 ++++++++++++----- 5 files changed, 58 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index e7261e1096..fbeb42cf4b 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -424,8 +424,8 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err) /// @param event The event type string void vim_subscribe(uint64_t channel_id, String event) { - size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); - char e[EVENT_MAXLEN + 1]; + size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN); + char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; channel_subscribe(channel_id, e); @@ -437,8 +437,10 @@ void vim_subscribe(uint64_t channel_id, String event) /// @param event The event type string void vim_unsubscribe(uint64_t channel_id, String event) { - size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN); - char e[EVENT_MAXLEN + 1]; + size_t length = (event.size < METHOD_MAXLEN ? + event.size : + METHOD_MAXLEN); + char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; channel_unsubscribe(channel_id, e); diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index a0575de2e9..4299f2a06d 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -115,21 +115,21 @@ void channel_from_stream(uv_stream_t *stream) /// /// @param id The channel id. If 0, the event will be sent to all /// channels that have subscribed to the event type -/// @param type The event type, an arbitrary string -/// @param obj The event data +/// @param name The event name, an arbitrary string +/// @param arg The event arg /// @return True if the data was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *type, Object data) +bool channel_send_event(uint64_t id, char *name, Object arg) { Channel *channel = NULL; if (id > 0) { if (!(channel = pmap_get(uint64_t)(channels, id))) { - msgpack_rpc_free_object(data); + msgpack_rpc_free_object(arg); return false; } - send_event(channel, type, data); + send_message(channel, 2, 0, name, arg); } else { - broadcast_event(type, data); + broadcast_event(name, arg); } return true; @@ -284,29 +284,33 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return success; } -static void send_event(Channel *channel, char *type, Object data) +static void send_message(Channel *channel, + int type, + uint64_t id, + char *name, + Object arg) { - channel_write(channel, serialize_event(type, data)); + channel_write(channel, serialize_message(type, id, name, arg)); } -static void broadcast_event(char *type, Object data) +static void broadcast_event(char *name, Object arg) { kvec_t(Channel *) subscribed; kv_init(subscribed); Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, type)) { + if (pmap_has(cstr_t)(channel->subscribed_events, name)) { kv_push(Channel *, subscribed, channel); } }); if (!kv_size(subscribed)) { - msgpack_rpc_free_object(data); + msgpack_rpc_free_object(arg); goto end; } - WBuffer *buffer = serialize_event(type, data); + WBuffer *buffer = serialize_message(2, 0, name, arg); for (size_t i = 0; i < kv_size(subscribed); i++) { channel_write(kv_A(subscribed, i), buffer); @@ -364,17 +368,20 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static WBuffer *serialize_event(char *type, Object data) +static WBuffer *serialize_message(int type, + uint64_t id, + char *method, + Object arg) { - String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type}; + String method_str = {.size = strnlen(method, METHOD_MAXLEN), .data = method}; msgpack_packer packer; msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_notification(event_type, data, &packer); + msgpack_rpc_message(type, id, method_str, arg, &packer); WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size), msgpack_event_buffer.size, free); - msgpack_rpc_free_object(data); + msgpack_rpc_free_object(arg); msgpack_sbuffer_clear(&msgpack_event_buffer); return rv; @@ -391,3 +398,4 @@ static Channel *register_channel() pmap_put(uint64_t)(channels, rv->id, rv); return rv; } + diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index f12d54cede..ce04abb76d 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -6,7 +6,7 @@ #include "nvim/api/private/defs.h" #include "nvim/vim.h" -#define EVENT_MAXLEN 512 +#define METHOD_MAXLEN 512 #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/channel.h.generated.h" diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c index 63e1245028..0d9a7ae3de 100644 --- a/src/nvim/os/msgpack_rpc.c +++ b/src/nvim/os/msgpack_rpc.c @@ -115,13 +115,22 @@ void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) msgpack_rpc_dispatch(id, req, res); } -void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac) +void msgpack_rpc_message(int type, + uint64_t id, + String method, + Object arg, + msgpack_packer *pac) { - msgpack_pack_array(pac, 3); - msgpack_pack_int(pac, 2); - msgpack_pack_raw(pac, type.size); - msgpack_pack_raw_body(pac, type.data, type.size); - msgpack_rpc_from_object(data, pac); + msgpack_pack_array(pac, id ? 4 : 3); + msgpack_pack_int(pac, type); + + if (id) { + msgpack_pack_uint64(pac, id); + } + + msgpack_pack_raw(pac, method.size); + msgpack_pack_raw_body(pac, method.data, method.size); + msgpack_rpc_from_object(arg, pac); } void msgpack_rpc_error(char *msg, msgpack_packer *res) diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h index baabff20aa..9858eab960 100644 --- a/src/nvim/os/msgpack_rpc.h +++ b/src/nvim/os/msgpack_rpc.h @@ -24,13 +24,20 @@ typedef enum { void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); -/// Packs a notification message +/// Packs a message /// -/// @param type The message type, an arbitrary string -/// @param data The notification data +/// @param type The message type +/// @param id The message id, must be an unique integer > 0 or will be +/// ignored(the message array will have 3 elements instead of 4). +/// @param method The message name, an arbitrary string +/// @param arg The message argument /// @param packer Where the notification will be packed to -void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac) - FUNC_ATTR_NONNULL_ARG(3); +void msgpack_rpc_message(int type, + uint64_t id, + String method, + Object arg, + msgpack_packer *pac) + FUNC_ATTR_NONNULL_ARG(5); /// Dispatches to the actual API function after basic payload validation by /// `msgpack_rpc_call`. It is responsible for validating/converting arguments -- cgit From ea7a389ec77c0031160ce860129101c603d8e0ec Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 20 Jun 2014 10:53:05 -0300 Subject: channel: Implement the 'channel_send_call' function This function is used to send RPC calls to clients. In contrast to `channel_send_event`, this function will block until the client sends a response(But it will continue processing requests from that client). The RPC call stack has a maximum depth of 20. --- src/nvim/api/private/helpers.c | 2 +- src/nvim/eval.c | 43 ++++++++++ src/nvim/os/channel.c | 189 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 226 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c index 30301e9368..d5ebc93f7c 100644 --- a/src/nvim/api/private/helpers.c +++ b/src/nvim/api/private/helpers.c @@ -341,7 +341,7 @@ String cstr_to_string(const char *str) }; } -static bool object_to_vim(Object obj, typval_T *tv, Error *err) +bool object_to_vim(Object obj, typval_T *tv, Error *err) { tv->v_type = VAR_UNKNOWN; tv->v_lock = 0; diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 7300e60b1a..4c39950344 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -71,6 +71,7 @@ #include "nvim/os/time.h" #include "nvim/os/channel.h" #include "nvim/api/private/helpers.h" +#include "nvim/os/msgpack_rpc.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ @@ -6453,6 +6454,7 @@ static struct fst { {"searchpair", 3, 7, f_searchpair}, {"searchpairpos", 3, 7, f_searchpairpos}, {"searchpos", 1, 4, f_searchpos}, + {"send_call", 3, 3, f_send_call}, {"send_event", 3, 3, f_send_event}, {"setbufvar", 3, 3, f_setbufvar}, {"setcmdpos", 1, 1, f_setcmdpos}, @@ -12525,6 +12527,47 @@ do_searchpair ( return retval; } +// "send_call()" function +static void f_send_call(typval_T *argvars, typval_T *rettv) +{ + rettv->v_type = VAR_NUMBER; + rettv->vval.v_number = 0; + + if (check_restricted() || check_secure()) { + return; + } + + if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) { + EMSG2(_(e_invarg2), "Channel id must be a positive integer"); + return; + } + + if (argvars[1].v_type != VAR_STRING) { + EMSG2(_(e_invarg2), "Method name must be a string"); + return; + } + + bool errored; + Object result; + if (!channel_send_call((uint64_t)argvars[0].vval.v_number, + (char *)argvars[1].vval.v_string, + vim_to_object(&argvars[2]), + &result, + &errored)) { + EMSG2(_(e_invarg2), "Channel doesn't exist"); + return; + } + + Error conversion_error = {.set = false}; + if (errored || !object_to_vim(result, rettv, &conversion_error)) { + EMSG(errored ? + result.data.string.data : + _("Error converting the call result")); + } + + msgpack_rpc_free_object(result); +} + // "send_event()" function static void f_send_event(typval_T *argvars, typval_T *rettv) { diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 4299f2a06d..552c962b3a 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -5,6 +5,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/os/channel.h" +#include "nvim/os/event.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/wstream.h" @@ -18,10 +19,16 @@ #include "nvim/map.h" #include "nvim/lib/kvec.h" +typedef struct { + uint64_t request_id; + bool errored; + Object result; +} ChannelCallFrame; + typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, is_alive; + bool is_job, enabled; msgpack_unpacker *unpacker; msgpack_sbuffer *sbuffer; union { @@ -32,6 +39,9 @@ typedef struct { uv_stream_t *uv; } streams; } data; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; + size_t rpc_call_level; } Channel; static uint64_t next_id = 1; @@ -135,6 +145,78 @@ bool channel_send_event(uint64_t id, char *name, Object arg) return true; } +bool channel_send_call(uint64_t id, + char *name, + Object arg, + Object *result, + bool *errored) +{ + Channel *channel = NULL; + + if (!(channel = pmap_get(uint64_t)(channels, id))) { + msgpack_rpc_free_object(arg); + return false; + } + + if (kv_size(channel->call_stack) > 20) { + // 20 stack depth is more than anyone should ever need for RPC calls + *errored = true; + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " was closed due to a high stack depth " + "while processing a RPC call", + channel->id); + *result = STRING_OBJ(buf); + } + + uint64_t request_id = channel->next_request_id++; + // Send the msgpack-rpc request + channel_write(channel, serialize_message(0, request_id, name, arg)); + + if (!kv_size(channel->call_stack)) { + // This is the first frame, we must disable event deferral for this + // channel because we won't be returning until the client sends a + // response + if (channel->is_job) { + job_set_defer(channel->data.job, false); + } else { + rstream_set_defer(channel->data.streams.read, false); + } + } + + // Push the frame + ChannelCallFrame frame = {request_id, false, NIL}; + kv_push(ChannelCallFrame *, channel->call_stack, &frame); + size_t size = kv_size(channel->call_stack); + + do { + event_poll(-1); + } while ( + // Continue running if ... + channel->enabled && // the channel is still enabled + kv_size(channel->call_stack) >= size); // the call didn't return + + if (!kv_size(channel->call_stack)) { + // Popped last frame, restore event deferral + if (channel->is_job) { + job_set_defer(channel->data.job, true); + } else { + rstream_set_defer(channel->data.streams.read, true); + } + if (!channel->enabled && !channel->rpc_call_level) { + // Close the channel if it has been disabled and we have not been called + // by `parse_msgpack`(It would be unsafe to close the channel otherwise) + close_channel(channel); + } + } + + *errored = frame.errored; + *result = frame.result; + + return true; +} + /// Subscribes to event broadcasts /// /// @param id The channel id @@ -193,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - close_channel(channel); + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed by the client", + channel->id); + disable_channel(channel, buf); return; } + channel->rpc_call_level++; uint32_t count = rstream_available(rstream); // Feed the unpacker with data @@ -211,6 +300,24 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Deserialize everything we can. while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked)) == kUnpackResultOk) { + if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + if (is_valid_rpc_response(&unpacked.data, channel)) { + call_stack_pop(&unpacked.data, channel); + } else { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Channel %" PRIu64 " returned a response that doesn't have " + " a matching id for the current RPC call. Ensure the client " + " is properly synchronized", + channel->id); + call_stack_unwind(channel, buf, 1); + } + msgpack_unpacked_destroy(&unpacked); + // Bail out from this event loop iteration + goto end; + } + // Each object is a new msgpack-rpc request and requires an empty response msgpack_packer response; msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); @@ -221,7 +328,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) channel->sbuffer->size, free); if (!channel_write(channel, buffer)) { - return; + goto end; } // Clear the buffer for future calls msgpack_sbuffer_clear(channel->sbuffer); @@ -238,6 +345,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting"); } + +end: + channel->rpc_call_level--; + if (!channel->enabled && !kv_size(channel->call_stack)) { + // Now it's safe to destroy the channel + close_channel(channel); + } } static void send_error(Channel *channel, char *msg) @@ -276,9 +390,14 @@ static bool channel_write(Channel *channel, WBuffer *buffer) } if (!success) { - // If the write failed for whatever reason, mark the channel as not alive so - // it can be freed later - channel->is_alive = false; + // If the write failed for any reason, close the channel + char buf[256]; + snprintf(buf, + sizeof(buf), + "Before returning from a RPC call, channel %" PRIu64 " was " + "closed due to a failed write", + channel->id); + disable_channel(channel, buf); } return success; @@ -359,6 +478,7 @@ static void close_channel(Channel *channel) }); pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); free(channel); } @@ -390,12 +510,67 @@ static WBuffer *serialize_message(int type, static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); - rv->is_alive = true; + rv->enabled = true; + rv->rpc_call_level = 0; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); + rv->next_request_id = 1; + kv_init(rv->call_stack); pmap_put(uint64_t)(channels, rv->id, rv); return rv; } +static bool is_rpc_response(msgpack_object *obj) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 4 + && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.array.ptr[0].via.u64 == 1 + && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; +} + +static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) +{ + uint64_t response_id = obj->via.array.ptr[1].via.u64; + // Must be equal to the frame at the stack's bottom + return response_id == kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1)->request_id; +} + +static void call_stack_pop(msgpack_object *obj, Channel *channel) +{ + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; + (void)kv_pop(channel->call_stack); + + if (frame->errored) { + msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); + } else { + msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); + } +} + +static void call_stack_unwind(Channel *channel, char *msg, int count) +{ + while (kv_size(channel->call_stack) && count--) { + ChannelCallFrame *frame = kv_pop(channel->call_stack); + frame->errored = true; + frame->result = STRING_OBJ(msg); + } +} + +static void disable_channel(Channel *channel, char *msg) +{ + if (kv_size(channel->call_stack)) { + // Channel is currently in the middle of a call, remove all frames and mark + // it as "dead" + channel->enabled = false; + call_stack_unwind(channel, msg, -1); + } else { + // Safe to close it now + close_channel(channel); + } +} -- cgit From bc0380038e0a4ff4f4bfaa939b0cef26c5e53582 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 23 Jun 2014 11:42:29 -0300 Subject: channel/msgpack_rpc: Refactor to better split functions across modules Move validation/conversion functions and to msgpack_rpc_helpers to separate those from the functions that are used from the channel module --- src/nvim/eval.c | 2 +- src/nvim/os/channel.c | 82 +++---- src/nvim/os/msgpack_rpc.c | 453 +++++++++----------------------------- src/nvim/os/msgpack_rpc.h | 158 +------------ src/nvim/os/msgpack_rpc_helpers.c | 331 ++++++++++++++++++++++++++++ src/nvim/os/msgpack_rpc_helpers.h | 124 +++++++++++ 6 files changed, 589 insertions(+), 561 deletions(-) create mode 100644 src/nvim/os/msgpack_rpc_helpers.c create mode 100644 src/nvim/os/msgpack_rpc_helpers.h (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 4c39950344..adc411afc7 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -71,7 +71,7 @@ #include "nvim/os/time.h" #include "nvim/os/channel.h" #include "nvim/api/private/helpers.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 552c962b3a..b44b1d13a4 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -13,6 +13,7 @@ #include "nvim/os/job.h" #include "nvim/os/job_defs.h" #include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/message.h" @@ -47,7 +48,7 @@ typedef struct { static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; -static msgpack_sbuffer msgpack_event_buffer; +static msgpack_sbuffer out_buffer; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/channel.c.generated.h" @@ -58,7 +59,7 @@ void channel_init() { channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); - msgpack_sbuffer_init(&msgpack_event_buffer); + msgpack_sbuffer_init(&out_buffer); } /// Teardown the module @@ -137,7 +138,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg) msgpack_rpc_free_object(arg); return false; } - send_message(channel, 2, 0, name, arg); + send_event(channel, name, arg); } else { broadcast_event(name, arg); } @@ -172,7 +173,7 @@ bool channel_send_call(uint64_t id, uint64_t request_id = channel->next_request_id++; // Send the msgpack-rpc request - channel_write(channel, serialize_message(0, request_id, name, arg)); + send_request(channel, request_id, name, arg); if (!kv_size(channel->call_stack)) { // This is the first frame, we must disable event deferral for this @@ -341,9 +342,9 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // A not so uncommon cause for this might be deserializing objects with // a high nesting level: msgpack will break when it's internal parse stack // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - send_error(channel, "Invalid msgpack payload. " - "This error can also happen when deserializing " - "an object with high level of nesting"); + send_error(channel, 0, "Invalid msgpack payload. " + "This error can also happen when deserializing " + "an object with high level of nesting"); } end: @@ -354,31 +355,6 @@ end: } } -static void send_error(Channel *channel, char *msg) -{ - msgpack_packer err; - // See src/msgpack/unpack_template.h in msgpack source tree for - // causes for this error(search for 'goto _failed') - // - // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) - msgpack_packer_init(&err, channel->sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&err, 4); - msgpack_pack_int(&err, 1); - msgpack_pack_int(&err, 0); - msgpack_rpc_error(msg, &err); - WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free); - if (!channel_write(channel, buffer)) { - return; - } - // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->sbuffer); -} - static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; @@ -403,13 +379,27 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return success; } -static void send_message(Channel *channel, - int type, +static void send_error(Channel *channel, uint64_t id, char *err_msg) +{ + String err = {.size = strlen(err_msg), .data = err_msg}; + channel_write(channel, serialize_response(id, err, NIL, channel->sbuffer)); +} + +static void send_request(Channel *channel, uint64_t id, char *name, Object arg) { - channel_write(channel, serialize_message(type, id, name, arg)); + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(id, method, arg, &out_buffer)); +} + +static void send_event(Channel *channel, + char *name, + Object arg) +{ + String method = {.size = strlen(name), .data = name}; + channel_write(channel, serialize_request(0, method, arg, &out_buffer)); } static void broadcast_event(char *name, Object arg) @@ -429,7 +419,8 @@ static void broadcast_event(char *name, Object arg) goto end; } - WBuffer *buffer = serialize_message(2, 0, name, arg); + String method = {.size = strlen(name), .data = name}; + WBuffer *buffer = serialize_request(0, method, arg, &out_buffer); for (size_t i = 0; i < kv_size(subscribed); i++) { channel_write(kv_A(subscribed, i), buffer); @@ -488,25 +479,6 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static WBuffer *serialize_message(int type, - uint64_t id, - char *method, - Object arg) -{ - String method_str = {.size = strnlen(method, METHOD_MAXLEN), .data = method}; - msgpack_packer packer; - msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); - msgpack_rpc_message(type, id, method_str, arg, &packer); - WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data, - msgpack_event_buffer.size), - msgpack_event_buffer.size, - free); - msgpack_rpc_free_object(arg); - msgpack_sbuffer_clear(&msgpack_event_buffer); - - return rv; -} - static Channel *register_channel() { Channel *rv = xmalloc(sizeof(Channel)); diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c index 0d9a7ae3de..5e2ec6aa10 100644 --- a/src/nvim/os/msgpack_rpc.c +++ b/src/nvim/os/msgpack_rpc.c @@ -3,59 +3,25 @@ #include -#include "nvim/os/msgpack_rpc.h" #include "nvim/vim.h" #include "nvim/memory.h" - -#define REMOTE_FUNCS_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ - { \ - *arg = obj->via.u64; \ - return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \ - } \ - \ - void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \ - { \ - msgpack_pack_uint64(res, result); \ - } - -#define TYPED_ARRAY_IMPL(t, lt) \ - bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \ - { \ - if (obj->type != MSGPACK_OBJECT_ARRAY) { \ - return false; \ - } \ - \ - arg->size = obj->via.array.size; \ - arg->items = xcalloc(obj->via.array.size, sizeof(t)); \ - \ - for (size_t i = 0; i < obj->via.array.size; i++) { \ - if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \ - return false; \ - } \ - } \ - \ - return true; \ - } \ - \ - void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \ - { \ - msgpack_pack_array(res, result.size); \ - \ - for (size_t i = 0; i < result.size; i++) { \ - msgpack_rpc_from_##lt(result.items[i], res); \ - } \ - } \ - \ - void msgpack_rpc_free_##lt##array(t##Array value) { \ - for (size_t i = 0; i < value.size; i++) { \ - msgpack_rpc_free_##lt(value.items[i]); \ - } \ - \ - free(value.items); \ - } - +#include "nvim/os/wstream.h" +#include "nvim/os/msgpack_rpc.h" +#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/func_attr.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/msgpack_rpc.c.generated.h" +#endif + +/// Validates the basic structure of the msgpack-rpc call and fills `res` +/// with the basic response structure. +/// +/// @param id The channel id +/// @param req The parsed request object +/// @param res A packer that contains the response void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) { // The initial response structure is the same no matter what happens, // we set it up here @@ -115,302 +81,19 @@ void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) msgpack_rpc_dispatch(id, req, res); } -void msgpack_rpc_message(int type, - uint64_t id, - String method, - Object arg, - msgpack_packer *pac) -{ - msgpack_pack_array(pac, id ? 4 : 3); - msgpack_pack_int(pac, type); - - if (id) { - msgpack_pack_uint64(pac, id); - } - - msgpack_pack_raw(pac, method.size); - msgpack_pack_raw_body(pac, method.data, method.size); - msgpack_rpc_from_object(arg, pac); -} - -void msgpack_rpc_error(char *msg, msgpack_packer *res) -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_raw(res, len); - msgpack_pack_raw_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) -{ - *arg = obj->via.boolean; - return obj->type == MSGPACK_OBJECT_BOOLEAN; -} - -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) -{ - if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER - && obj->via.u64 <= INT64_MAX) { - *arg = (int64_t)obj->via.u64; - return true; - } - - *arg = obj->via.i64; - return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; -} - -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) -{ - *arg = obj->via.dec; - return obj->type == MSGPACK_OBJECT_DOUBLE; -} - -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) -{ - if (obj->type != MSGPACK_OBJECT_RAW) { - return false; - } - - arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size); - arg->size = obj->via.raw.size; - return true; -} - -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) -{ - switch (obj->type) { - case MSGPACK_OBJECT_NIL: - arg->type = kObjectTypeNil; - return true; - - case MSGPACK_OBJECT_BOOLEAN: - arg->type = kObjectTypeBoolean; - return msgpack_rpc_to_boolean(obj, &arg->data.boolean); - - case MSGPACK_OBJECT_POSITIVE_INTEGER: - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - arg->type = kObjectTypeInteger; - return msgpack_rpc_to_integer(obj, &arg->data.integer); - - case MSGPACK_OBJECT_DOUBLE: - arg->type = kObjectTypeFloat; - return msgpack_rpc_to_float(obj, &arg->data.floating); - - case MSGPACK_OBJECT_RAW: - arg->type = kObjectTypeString; - return msgpack_rpc_to_string(obj, &arg->data.string); - - case MSGPACK_OBJECT_ARRAY: - arg->type = kObjectTypeArray; - return msgpack_rpc_to_array(obj, &arg->data.array); - - case MSGPACK_OBJECT_MAP: - arg->type = kObjectTypeDictionary; - return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); - - default: - return false; - } -} - -bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) -{ - return obj->type == MSGPACK_OBJECT_ARRAY - && obj->via.array.size == 2 - && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row) - && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col); -} - - -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) -{ - if (obj->type != MSGPACK_OBJECT_ARRAY) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.array.size, sizeof(Object)); - - for (uint32_t i = 0; i < obj->via.array.size; i++) { - if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { - return false; - } - } - - return true; -} - -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) -{ - if (obj->type != MSGPACK_OBJECT_MAP) { - return false; - } - - arg->size = obj->via.array.size; - arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); - - - for (uint32_t i = 0; i < obj->via.map.size; i++) { - if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, - &arg->items[i].key)) { - return false; - } - - if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, - &arg->items[i].value)) { - return false; - } - } - - return true; -} - -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) -{ - if (result) { - msgpack_pack_true(res); - } else { - msgpack_pack_false(res); - } -} - -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) -{ - msgpack_pack_int64(res, result); -} - -void msgpack_rpc_from_float(Float result, msgpack_packer *res) -{ - msgpack_pack_double(res, result); -} - -void msgpack_rpc_from_string(String result, msgpack_packer *res) -{ - msgpack_pack_raw(res, result.size); - msgpack_pack_raw_body(res, result.data, result.size); -} - -void msgpack_rpc_from_object(Object result, msgpack_packer *res) -{ - switch (result.type) { - case kObjectTypeNil: - msgpack_pack_nil(res); - break; - - case kObjectTypeBoolean: - msgpack_rpc_from_boolean(result.data.boolean, res); - break; - - case kObjectTypeInteger: - msgpack_rpc_from_integer(result.data.integer, res); - break; - - case kObjectTypeFloat: - msgpack_rpc_from_float(result.data.floating, res); - break; - - case kObjectTypeString: - msgpack_rpc_from_string(result.data.string, res); - break; - - case kObjectTypeArray: - msgpack_rpc_from_array(result.data.array, res); - break; - - case kObjectTypeDictionary: - msgpack_rpc_from_dictionary(result.data.dictionary, res); - break; - - default: - abort(); - } -} - -void msgpack_rpc_from_position(Position result, msgpack_packer *res) -{ - msgpack_pack_array(res, 2);; - msgpack_pack_int64(res, result.row); - msgpack_pack_int64(res, result.col); -} - -void msgpack_rpc_from_array(Array result, msgpack_packer *res) -{ - msgpack_pack_array(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_object(result.items[i], res); - } -} - -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) -{ - msgpack_pack_map(res, result.size); - - for (size_t i = 0; i < result.size; i++) { - msgpack_rpc_from_string(result.items[i].key, res); - msgpack_rpc_from_object(result.items[i].value, res); - } -} - -void msgpack_rpc_free_string(String value) -{ - if (!value.data) { - return; - } - - free(value.data); -} - -void msgpack_rpc_free_object(Object value) -{ - switch (value.type) { - case kObjectTypeNil: - case kObjectTypeBoolean: - case kObjectTypeInteger: - case kObjectTypeFloat: - break; - - case kObjectTypeString: - msgpack_rpc_free_string(value.data.string); - break; - - case kObjectTypeArray: - msgpack_rpc_free_array(value.data.array); - break; - - case kObjectTypeDictionary: - msgpack_rpc_free_dictionary(value.data.dictionary); - break; - - default: - abort(); - } -} - -void msgpack_rpc_free_array(Array value) -{ - for (uint32_t i = 0; i < value.size; i++) { - msgpack_rpc_free_object(value.items[i]); - } - - free(value.items); -} - -void msgpack_rpc_free_dictionary(Dictionary value) -{ - for (uint32_t i = 0; i < value.size; i++) { - msgpack_rpc_free_string(value.items[i].key); - msgpack_rpc_free_object(value.items[i].value); - } - - free(value.items); -} - +/// Try to unpack a msgpack document from the data in the unpacker buffer. This +/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets +/// the called know if the unpacking failed due to bad input or due to missing +/// data. +/// +/// @param unpacker The unpacker containing the parse buffer +/// @param result The result which will contain the parsed object +/// @return kUnpackResultOk : An object was parsed +/// kUnpackResultFail : Got bad input +/// kUnpackResultNeedMore: Need more data UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, msgpack_unpacked* result) + FUNC_ATTR_NONNULL_ALL { if (result->zone != NULL) { msgpack_zone_free(result->zone); @@ -434,12 +117,80 @@ UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, return kUnpackResultNeedMore; } -REMOTE_FUNCS_IMPL(Buffer, buffer) -REMOTE_FUNCS_IMPL(Window, window) -REMOTE_FUNCS_IMPL(Tabpage, tabpage) +/// Finishes the msgpack-rpc call with an error message. +/// +/// @param msg The error message +/// @param res A packer that contains the response +void msgpack_rpc_error(char *msg, msgpack_packer *res) + FUNC_ATTR_NONNULL_ALL +{ + size_t len = strlen(msg); -TYPED_ARRAY_IMPL(Buffer, buffer) -TYPED_ARRAY_IMPL(Window, window) -TYPED_ARRAY_IMPL(Tabpage, tabpage) -TYPED_ARRAY_IMPL(String, string) + // error message + msgpack_pack_raw(res, len); + msgpack_pack_raw_body(res, msg, len); + // Nil result + msgpack_pack_nil(res); +} + +/// Serializes a msgpack-rpc request or notification(id == 0) +WBuffer *serialize_request(uint64_t id, + String method, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ALL +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, id ? 4 : 3); + msgpack_pack_int(&pac, id ? 0 : 2); + + if (id) { + msgpack_pack_uint64(&pac, id); + } + + msgpack_pack_raw(&pac, method.size); + msgpack_pack_raw_body(&pac, method.data, method.size); + msgpack_rpc_from_object(arg, &pac); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_rpc_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +/// Serializes a msgpack-rpc response +WBuffer *serialize_response(uint64_t id, + String err, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ALL +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, id); + + if (err.size) { + // error message + msgpack_pack_raw(&pac, err.size); + msgpack_pack_raw_body(&pac, err.data, err.size); + // Nil result + msgpack_pack_nil(&pac); + } else { + // Nil error + msgpack_pack_nil(&pac); + // Return value + msgpack_rpc_from_object(arg, &pac); + } + + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_rpc_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h index 9858eab960..cbb487b44a 100644 --- a/src/nvim/os/msgpack_rpc.h +++ b/src/nvim/os/msgpack_rpc.h @@ -8,6 +8,7 @@ #include "nvim/func_attr.h" #include "nvim/api/private/defs.h" +#include "nvim/os/wstream.h" typedef enum { kUnpackResultOk, /// Successfully parsed a document @@ -15,30 +16,6 @@ typedef enum { kUnpackResultNeedMore /// Need more data } UnpackResult; -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - -/// Packs a message -/// -/// @param type The message type -/// @param id The message id, must be an unique integer > 0 or will be -/// ignored(the message array will have 3 elements instead of 4). -/// @param method The message name, an arbitrary string -/// @param arg The message argument -/// @param packer Where the notification will be packed to -void msgpack_rpc_message(int type, - uint64_t id, - String method, - Object arg, - msgpack_packer *pac) - FUNC_ATTR_NONNULL_ARG(5); - /// Dispatches to the actual API function after basic payload validation by /// `msgpack_rpc_call`. It is responsible for validating/converting arguments /// to C types, and converting the return value back to msgpack types. @@ -53,136 +30,9 @@ void msgpack_rpc_dispatch(uint64_t id, msgpack_packer *res) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); -/// Try to unpack a msgpack document from the data in the unpacker buffer. This -/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets -/// the called know if the unpacking failed due to bad input or due to missing -/// data. -/// -/// @param unpacker The unpacker containing the parse buffer -/// @param result The result which will contain the parsed object -/// @return kUnpackResultOk : An object was parsed -/// kUnpackResultFail : Got bad input -/// kUnpackResultNeedMore: Need more data -UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker, - msgpack_unpacked* result); - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL; - -/// Functions for validating and converting from msgpack types to C types. -/// These are used by `msgpack_rpc_dispatch` to validate and convert each -/// argument. -/// -/// @param obj The object to convert -/// @param[out] arg A pointer to the avalue -/// @return true if the conversion succeeded, false otherwise -bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) - FUNC_ATTR_NONNULL_ALL; -bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) - FUNC_ATTR_NONNULL_ALL; - -/// Functions for converting from C types to msgpack types. -/// These are used by `msgpack_rpc_dispatch` to convert return values -/// from the API -/// -/// @param result A pointer to the result -/// @param res A packer that contains the response -void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_float(Float result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_position(Position result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_string(String result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_window(Window result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_object(Object result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_array(Array result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); -void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2); - -/// Helpers for initializing types that may be freed later -#define msgpack_rpc_init_boolean -#define msgpack_rpc_init_integer -#define msgpack_rpc_init_float -#define msgpack_rpc_init_position -#define msgpack_rpc_init_string = STRING_INIT -#define msgpack_rpc_init_buffer -#define msgpack_rpc_init_window -#define msgpack_rpc_init_tabpage -#define msgpack_rpc_init_object = {.type = kObjectTypeNil} -#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT -#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT -#define msgpack_rpc_init_array = ARRAY_DICT_INIT -#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT - -/// Helpers for freeing arguments/return value -/// -/// @param value The value to be freed -#define msgpack_rpc_free_boolean(value) -#define msgpack_rpc_free_integer(value) -#define msgpack_rpc_free_float(value) -#define msgpack_rpc_free_position(value) -void msgpack_rpc_free_string(String value); -#define msgpack_rpc_free_buffer(value) -#define msgpack_rpc_free_window(value) -#define msgpack_rpc_free_tabpage(value) -void msgpack_rpc_free_object(Object value); -void msgpack_rpc_free_stringarray(StringArray value); -void msgpack_rpc_free_bufferarray(BufferArray value); -void msgpack_rpc_free_windowarray(WindowArray value); -void msgpack_rpc_free_tabpagearray(TabpageArray value); -void msgpack_rpc_free_array(Array value); -void msgpack_rpc_free_dictionary(Dictionary value); +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/msgpack_rpc.h.generated.h" +#endif #endif // NVIM_OS_MSGPACK_RPC_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c new file mode 100644 index 0000000000..3af6794169 --- /dev/null +++ b/src/nvim/os/msgpack_rpc_helpers.c @@ -0,0 +1,331 @@ +#include +#include + +#include + +#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/vim.h" +#include "nvim/memory.h" + +#define REMOTE_FUNCS_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \ + { \ + *arg = obj->via.u64; \ + return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \ + } \ + \ + void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \ + { \ + msgpack_pack_uint64(res, result); \ + } + +#define TYPED_ARRAY_IMPL(t, lt) \ + bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \ + { \ + if (obj->type != MSGPACK_OBJECT_ARRAY) { \ + return false; \ + } \ + \ + arg->size = obj->via.array.size; \ + arg->items = xcalloc(obj->via.array.size, sizeof(t)); \ + \ + for (size_t i = 0; i < obj->via.array.size; i++) { \ + if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \ + return false; \ + } \ + } \ + \ + return true; \ + } \ + \ + void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \ + { \ + msgpack_pack_array(res, result.size); \ + \ + for (size_t i = 0; i < result.size; i++) { \ + msgpack_rpc_from_##lt(result.items[i], res); \ + } \ + } \ + \ + void msgpack_rpc_free_##lt##array(t##Array value) { \ + for (size_t i = 0; i < value.size; i++) { \ + msgpack_rpc_free_##lt(value.items[i]); \ + } \ + \ + free(value.items); \ + } + +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) +{ + *arg = obj->via.boolean; + return obj->type == MSGPACK_OBJECT_BOOLEAN; +} + +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) +{ + if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER + && obj->via.u64 <= INT64_MAX) { + *arg = (int64_t)obj->via.u64; + return true; + } + + *arg = obj->via.i64; + return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER; +} + +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) +{ + *arg = obj->via.dec; + return obj->type == MSGPACK_OBJECT_DOUBLE; +} + +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) +{ + if (obj->type != MSGPACK_OBJECT_RAW) { + return false; + } + + arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size); + arg->size = obj->via.raw.size; + return true; +} + +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) +{ + switch (obj->type) { + case MSGPACK_OBJECT_NIL: + arg->type = kObjectTypeNil; + return true; + + case MSGPACK_OBJECT_BOOLEAN: + arg->type = kObjectTypeBoolean; + return msgpack_rpc_to_boolean(obj, &arg->data.boolean); + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + arg->type = kObjectTypeInteger; + return msgpack_rpc_to_integer(obj, &arg->data.integer); + + case MSGPACK_OBJECT_DOUBLE: + arg->type = kObjectTypeFloat; + return msgpack_rpc_to_float(obj, &arg->data.floating); + + case MSGPACK_OBJECT_RAW: + arg->type = kObjectTypeString; + return msgpack_rpc_to_string(obj, &arg->data.string); + + case MSGPACK_OBJECT_ARRAY: + arg->type = kObjectTypeArray; + return msgpack_rpc_to_array(obj, &arg->data.array); + + case MSGPACK_OBJECT_MAP: + arg->type = kObjectTypeDictionary; + return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary); + + default: + return false; + } +} + +bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) +{ + return obj->type == MSGPACK_OBJECT_ARRAY + && obj->via.array.size == 2 + && msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row) + && msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col); +} + + +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) +{ + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.array.size, sizeof(Object)); + + for (uint32_t i = 0; i < obj->via.array.size; i++) { + if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) { + return false; + } + } + + return true; +} + +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) +{ + if (obj->type != MSGPACK_OBJECT_MAP) { + return false; + } + + arg->size = obj->via.array.size; + arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair)); + + + for (uint32_t i = 0; i < obj->via.map.size; i++) { + if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key, + &arg->items[i].key)) { + return false; + } + + if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val, + &arg->items[i].value)) { + return false; + } + } + + return true; +} + +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) +{ + if (result) { + msgpack_pack_true(res); + } else { + msgpack_pack_false(res); + } +} + +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) +{ + msgpack_pack_int64(res, result); +} + +void msgpack_rpc_from_float(Float result, msgpack_packer *res) +{ + msgpack_pack_double(res, result); +} + +void msgpack_rpc_from_string(String result, msgpack_packer *res) +{ + msgpack_pack_raw(res, result.size); + msgpack_pack_raw_body(res, result.data, result.size); +} + +void msgpack_rpc_from_object(Object result, msgpack_packer *res) +{ + switch (result.type) { + case kObjectTypeNil: + msgpack_pack_nil(res); + break; + + case kObjectTypeBoolean: + msgpack_rpc_from_boolean(result.data.boolean, res); + break; + + case kObjectTypeInteger: + msgpack_rpc_from_integer(result.data.integer, res); + break; + + case kObjectTypeFloat: + msgpack_rpc_from_float(result.data.floating, res); + break; + + case kObjectTypeString: + msgpack_rpc_from_string(result.data.string, res); + break; + + case kObjectTypeArray: + msgpack_rpc_from_array(result.data.array, res); + break; + + case kObjectTypeDictionary: + msgpack_rpc_from_dictionary(result.data.dictionary, res); + break; + + default: + abort(); + } +} + +void msgpack_rpc_from_position(Position result, msgpack_packer *res) +{ + msgpack_pack_array(res, 2);; + msgpack_pack_int64(res, result.row); + msgpack_pack_int64(res, result.col); +} + +void msgpack_rpc_from_array(Array result, msgpack_packer *res) +{ + msgpack_pack_array(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_object(result.items[i], res); + } +} + +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) +{ + msgpack_pack_map(res, result.size); + + for (size_t i = 0; i < result.size; i++) { + msgpack_rpc_from_string(result.items[i].key, res); + msgpack_rpc_from_object(result.items[i].value, res); + } +} + +void msgpack_rpc_free_string(String value) +{ + if (!value.data) { + return; + } + + free(value.data); +} + +void msgpack_rpc_free_object(Object value) +{ + switch (value.type) { + case kObjectTypeNil: + case kObjectTypeBoolean: + case kObjectTypeInteger: + case kObjectTypeFloat: + break; + + case kObjectTypeString: + msgpack_rpc_free_string(value.data.string); + break; + + case kObjectTypeArray: + msgpack_rpc_free_array(value.data.array); + break; + + case kObjectTypeDictionary: + msgpack_rpc_free_dictionary(value.data.dictionary); + break; + + default: + abort(); + } +} + +void msgpack_rpc_free_array(Array value) +{ + for (uint32_t i = 0; i < value.size; i++) { + msgpack_rpc_free_object(value.items[i]); + } + + free(value.items); +} + +void msgpack_rpc_free_dictionary(Dictionary value) +{ + for (uint32_t i = 0; i < value.size; i++) { + msgpack_rpc_free_string(value.items[i].key); + msgpack_rpc_free_object(value.items[i].value); + } + + free(value.items); +} + +REMOTE_FUNCS_IMPL(Buffer, buffer) +REMOTE_FUNCS_IMPL(Window, window) +REMOTE_FUNCS_IMPL(Tabpage, tabpage) + +TYPED_ARRAY_IMPL(Buffer, buffer) +TYPED_ARRAY_IMPL(Window, window) +TYPED_ARRAY_IMPL(Tabpage, tabpage) +TYPED_ARRAY_IMPL(String, string) + diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h new file mode 100644 index 0000000000..e3d1e756ef --- /dev/null +++ b/src/nvim/os/msgpack_rpc_helpers.h @@ -0,0 +1,124 @@ +#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H +#define NVIM_OS_MSGPACK_RPC_HELPERS_H + +#include +#include + +#include + +#include "nvim/func_attr.h" +#include "nvim/api/private/defs.h" + +/// Functions for validating and converting from msgpack types to C types. +/// These are used by `msgpack_rpc_dispatch` to validate and convert each +/// argument. +/// +/// @param obj The object to convert +/// @param[out] arg A pointer to the avalue +/// @return true if the conversion succeeded, false otherwise +bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_string(msgpack_object *obj, String *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg) + FUNC_ATTR_NONNULL_ALL; +bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg) + FUNC_ATTR_NONNULL_ALL; + +/// Functions for converting from C types to msgpack types. +/// These are used by `msgpack_rpc_dispatch` to convert return values +/// from the API +/// +/// @param result A pointer to the result +/// @param res A packer that contains the response +void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_integer(Integer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_float(Float result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_position(Position result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_string(String result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_window(Window result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_object(Object result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_array(Array result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); +void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) + FUNC_ATTR_NONNULL_ARG(2); + +/// Helpers for initializing types that may be freed later +#define msgpack_rpc_init_boolean +#define msgpack_rpc_init_integer +#define msgpack_rpc_init_float +#define msgpack_rpc_init_position +#define msgpack_rpc_init_string = STRING_INIT +#define msgpack_rpc_init_buffer +#define msgpack_rpc_init_window +#define msgpack_rpc_init_tabpage +#define msgpack_rpc_init_object = {.type = kObjectTypeNil} +#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT +#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT +#define msgpack_rpc_init_array = ARRAY_DICT_INIT +#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT + +/// Helpers for freeing arguments/return value +/// +/// @param value The value to be freed +#define msgpack_rpc_free_boolean(value) +#define msgpack_rpc_free_integer(value) +#define msgpack_rpc_free_float(value) +#define msgpack_rpc_free_position(value) +void msgpack_rpc_free_string(String value); +#define msgpack_rpc_free_buffer(value) +#define msgpack_rpc_free_window(value) +#define msgpack_rpc_free_tabpage(value) +void msgpack_rpc_free_object(Object value); +void msgpack_rpc_free_stringarray(StringArray value); +void msgpack_rpc_free_bufferarray(BufferArray value); +void msgpack_rpc_free_windowarray(WindowArray value); +void msgpack_rpc_free_tabpagearray(TabpageArray value); +void msgpack_rpc_free_array(Array value); +void msgpack_rpc_free_dictionary(Dictionary value); + +#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H + -- cgit From 296da85198a7d5da36dbb2e6f213edb5da511635 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Mon, 23 Jun 2014 11:52:42 -0300 Subject: channel/msgpack_rpc: Refactor API dispatching This is how API dispatching worked before this commit: - The generated `msgpack_rpc_dispatch` function receives a the `msgpack_packer` argument. - The response is incrementally built while validating/calling the API. - Return values/errors are also packed into the `msgpack_packer` while the final response is being calculated. Now the `msgpack_packer` argument is no longer provided, and the `msgpack_rpc_dispatch` function returns `Object`/`Error` values to `msgpack_rpc_call`, which will use those values to build the response in a single pass. This was done because the new `channel_send_call` function created the possibility of having recursive API invocations, and this wasn't possible when sharing a single `msgpack_sbuffer` across call frames(it was shared implicitly through the `msgpack_packer` instance). Since we only start to build the response when the necessary information has been computed, it's now safe to share a single `msgpack_sbuffer` instance across all channels and API invocations. Some other changes also had to be performed: - Handling of the metadata discover was moved to `msgpack_rpc_call` - Expose more types as subtypes of `Object`, this was required to forward the return value from `msgpack_rpc_dispatch` to `msgpack_rpc_call` - Added more helper macros for casting API types to `Object` any --- src/nvim/api/private/defs.h | 18 ++++- src/nvim/api/private/helpers.c | 2 + src/nvim/api/private/helpers.h | 51 ++++++++++-- src/nvim/os/channel.c | 26 ++---- src/nvim/os/msgpack_rpc.c | 162 ++++++++++++++++++++++++-------------- src/nvim/os/msgpack_rpc.h | 13 +-- src/nvim/os/msgpack_rpc_helpers.c | 55 ++++++++++++- 7 files changed, 230 insertions(+), 97 deletions(-) (limited to 'src') diff --git a/src/nvim/api/private/defs.h b/src/nvim/api/private/defs.h index ee0fc02c4d..b049412014 100644 --- a/src/nvim/api/private/defs.h +++ b/src/nvim/api/private/defs.h @@ -65,8 +65,16 @@ typedef enum { kObjectTypeInteger, kObjectTypeFloat, kObjectTypeString, + kObjectTypeBuffer, + kObjectTypeWindow, + kObjectTypeTabpage, kObjectTypeArray, - kObjectTypeDictionary + kObjectTypeDictionary, + kObjectTypePosition, + kObjectTypeStringArray, + kObjectTypeBufferArray, + kObjectTypeWindowArray, + kObjectTypeTabpageArray, } ObjectType; struct object { @@ -76,8 +84,16 @@ struct object { Integer integer; Float floating; String string; + Buffer buffer; + Window window; + Tabpage tabpage; Array array; Dictionary dictionary; + Position position; + StringArray stringarray; + BufferArray bufferarray; + WindowArray windowarray; + TabpageArray tabpagearray; } data; }; diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c index d5ebc93f7c..024f0c2405 100644 --- a/src/nvim/api/private/helpers.c +++ b/src/nvim/api/private/helpers.c @@ -426,6 +426,8 @@ bool object_to_vim(Object obj, typval_T *tv, Error *err) } tv->vval.v_dict->dv_refcount++; break; + default: + abort(); } return true; diff --git a/src/nvim/api/private/helpers.h b/src/nvim/api/private/helpers.h index e1e1a35490..f1b9dc3bc8 100644 --- a/src/nvim/api/private/helpers.h +++ b/src/nvim/api/private/helpers.h @@ -14,7 +14,9 @@ err->set = true; \ } while (0) -#define BOOL_OBJ(b) ((Object) { \ +#define OBJECT_OBJ(o) o + +#define BOOLEAN_OBJ(b) ((Object) { \ .type = kObjectTypeBoolean, \ .data.boolean = b \ }) @@ -26,26 +28,59 @@ #define STRING_OBJ(s) ((Object) { \ .type = kObjectTypeString, \ - .data.string = cstr_to_string(s) \ + .data.string = s \ }) -#define STRINGL_OBJ(d, s) ((Object) { \ - .type = kObjectTypeString, \ - .data.string = (String) { \ - .size = s, \ - .data = xmemdup(d, s) \ - }}) +#define BUFFER_OBJ(s) ((Object) { \ + .type = kObjectTypeBuffer, \ + .data.buffer = s \ + }) + +#define WINDOW_OBJ(s) ((Object) { \ + .type = kObjectTypeWindow, \ + .data.window = s \ + }) + +#define TABPAGE_OBJ(s) ((Object) { \ + .type = kObjectTypeTabpage, \ + .data.tabpage = s \ + }) #define ARRAY_OBJ(a) ((Object) { \ .type = kObjectTypeArray, \ .data.array = a \ }) +#define STRINGARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeStringArray, \ + .data.stringarray = a \ + }) + +#define BUFFERARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeBufferArray, \ + .data.bufferarray = a \ + }) + +#define WINDOWARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeWindowArray, \ + .data.windowarray = a \ + }) + +#define TABPAGEARRAY_OBJ(a) ((Object) { \ + .type = kObjectTypeTabpageArray, \ + .data.tabpagearray = a \ + }) + #define DICTIONARY_OBJ(d) ((Object) { \ .type = kObjectTypeDictionary, \ .data.dictionary = d \ }) +#define POSITION_OBJ(p) ((Object) { \ + .type = kObjectTypePosition, \ + .data.position = p \ + }) + #define NIL ((Object) {.type = kObjectTypeNil}) #define PUT(dict, k, v) \ diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index b44b1d13a4..9bba247a7b 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -31,7 +31,6 @@ typedef struct { PMap(cstr_t) *subscribed_events; bool is_job, enabled; msgpack_unpacker *unpacker; - msgpack_sbuffer *sbuffer; union { Job *job; struct { @@ -168,7 +167,7 @@ bool channel_send_call(uint64_t id, "Channel %" PRIu64 " was closed due to a high stack depth " "while processing a RPC call", channel->id); - *result = STRING_OBJ(buf); + *result = STRING_OBJ(cstr_to_string(buf)); } uint64_t request_id = channel->next_request_id++; @@ -319,20 +318,12 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - // Each object is a new msgpack-rpc request and requires an empty response - msgpack_packer response; - msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write); // Perform the call - msgpack_rpc_call(channel->id, &unpacked.data, &response); - WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data, - channel->sbuffer->size), - channel->sbuffer->size, - free); - if (!channel_write(channel, buffer)) { + WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); + // write the response + if (!channel_write(channel, resp)) { goto end; } - // Clear the buffer for future calls - msgpack_sbuffer_clear(channel->sbuffer); } if (result == kUnpackResultFail) { @@ -379,10 +370,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer) return success; } -static void send_error(Channel *channel, uint64_t id, char *err_msg) +static void send_error(Channel *channel, uint64_t id, char *err) { - String err = {.size = strlen(err_msg), .data = err_msg}; - channel_write(channel, serialize_response(id, err, NIL, channel->sbuffer)); + channel_write(channel, serialize_response(id, err, NIL, &out_buffer)); } static void send_request(Channel *channel, @@ -449,7 +439,6 @@ static void unsubscribe(Channel *channel, char *event) static void close_channel(Channel *channel) { pmap_del(uint64_t)(channels, channel->id); - msgpack_sbuffer_free(channel->sbuffer); msgpack_unpacker_free(channel->unpacker); if (channel->is_job) { @@ -485,7 +474,6 @@ static Channel *register_channel() rv->enabled = true; rv->rpc_call_level = 0; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->sbuffer = msgpack_sbuffer_new(); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); rv->next_request_id = 1; @@ -530,7 +518,7 @@ static void call_stack_unwind(Channel *channel, char *msg, int count) while (kv_size(channel->call_stack) && count--) { ChannelCallFrame *frame = kv_pop(channel->call_stack); frame->errored = true; - frame->result = STRING_OBJ(msg); + frame->result = STRING_OBJ(cstr_to_string(msg)); } } diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c index 5e2ec6aa10..85569372da 100644 --- a/src/nvim/os/msgpack_rpc.c +++ b/src/nvim/os/msgpack_rpc.c @@ -8,77 +8,53 @@ #include "nvim/os/wstream.h" #include "nvim/os/msgpack_rpc.h" #include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/api/private/helpers.h" #include "nvim/func_attr.h" #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/msgpack_rpc.c.generated.h" #endif +extern const uint8_t msgpack_metadata[]; +extern const unsigned int msgpack_metadata_size; + /// Validates the basic structure of the msgpack-rpc call and fills `res` /// with the basic response structure. /// -/// @param id The channel id +/// @param channel_id The channel id /// @param req The parsed request object /// @param res A packer that contains the response -void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res) - FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3) +WBuffer *msgpack_rpc_call(uint64_t channel_id, + msgpack_object *req, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2) + FUNC_ATTR_NONNULL_ARG(3) { - // The initial response structure is the same no matter what happens, - // we set it up here - // Array of size 4 - msgpack_pack_array(res, 4); - // Response type is 1 - msgpack_pack_int(res, 1); - - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - msgpack_pack_int(res, 0); // no message id yet - msgpack_rpc_error("Request is not an array", res); - return; - } - - if (req->via.array.size != 4) { - msgpack_pack_int(res, 0); // no message id yet - char error_msg[256]; - snprintf(error_msg, - sizeof(error_msg), - "Request array size is %u, it should be 4", - req->via.array.size); - msgpack_rpc_error(error_msg, res); - return; - } + uint64_t response_id; + char *err = msgpack_rpc_validate(&response_id, req); - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_pack_int(res, 0); // no message id yet - msgpack_rpc_error("Id must be a positive integer", res); - return; + if (err) { + return serialize_response(response_id, err, NIL, sbuffer); } - // Set the response id, which is the same as the request - msgpack_pack_uint64(res, req->via.array.ptr[1].via.u64); + uint64_t method_id = req->via.array.ptr[2].via.u64; - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_rpc_error("Message type must be an integer", res); - return; + if (method_id == 0) { + return serialize_metadata(response_id, channel_id, sbuffer); } - if (req->via.array.ptr[0].via.u64 != 0) { - msgpack_rpc_error("Message type must be 0", res); - return; - } + // dispatch the call + Error error = { .set = false }; + Object rv = msgpack_rpc_dispatch(channel_id, method_id, req, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - msgpack_rpc_error("Method id must be a positive integer", res); - return; + if (error.set) { + return serialize_response(response_id, error.msg, NIL, sbuffer); } - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - msgpack_rpc_error("Paremeters must be an array", res); - return; - } - - // dispatch the message - msgpack_rpc_dispatch(id, req, res); + return serialize_response(response_id, NULL, rv, sbuffer); } /// Try to unpack a msgpack document from the data in the unpacker buffer. This @@ -134,19 +110,19 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res) } /// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t id, +WBuffer *serialize_request(uint64_t request_id, String method, Object arg, msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ALL + FUNC_ATTR_NONNULL_ARG(4) { msgpack_packer pac; msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, id ? 4 : 3); - msgpack_pack_int(&pac, id ? 0 : 2); + msgpack_pack_array(&pac, request_id ? 4 : 3); + msgpack_pack_int(&pac, request_id ? 0 : 2); - if (id) { - msgpack_pack_uint64(&pac, id); + if (request_id) { + msgpack_pack_uint64(&pac, request_id); } msgpack_pack_raw(&pac, method.size); @@ -161,19 +137,20 @@ WBuffer *serialize_request(uint64_t id, } /// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t id, - String err, +WBuffer *serialize_response(uint64_t response_id, + char *err_msg, Object arg, msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ALL + FUNC_ATTR_NONNULL_ARG(4) { msgpack_packer pac; msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); msgpack_pack_array(&pac, 4); msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, id); + msgpack_pack_uint64(&pac, response_id); - if (err.size) { + if (err_msg) { + String err = {.size = strlen(err_msg), .data = err_msg}; // error message msgpack_pack_raw(&pac, err.size); msgpack_pack_raw_body(&pac, err.data, err.size); @@ -194,3 +171,66 @@ WBuffer *serialize_response(uint64_t id, return rv; } +WBuffer *serialize_metadata(uint64_t id, + uint64_t channel_id, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ALL +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, id); + // Nil error + msgpack_pack_nil(&pac); + // The result is the [channel_id, metadata] array + msgpack_pack_array(&pac, 2); + msgpack_pack_uint64(&pac, channel_id); + msgpack_pack_raw(&pac, msgpack_metadata_size); + msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + free); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +static char *msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req) +{ + // response id not known yet + + *response_id = 0; + // Validate the basic structure of the msgpack-rpc payload + if (req->type != MSGPACK_OBJECT_ARRAY) { + return "Request is not an array"; + } + + if (req->via.array.size != 4) { + return "Request array size should be 4"; + } + + if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Id must be a positive integer"; + } + + // Set the response id, which is the same as the request + *response_id = req->via.array.ptr[1].via.u64; + + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Message type must be an integer"; + } + + if (req->via.array.ptr[0].via.u64 != 0) { + return "Message type must be 0"; + } + + if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + return "Method id must be a positive integer"; + } + + if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + return "Paremeters must be an array"; + } + + return NULL; +} diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h index cbb487b44a..b8b947c0ec 100644 --- a/src/nvim/os/msgpack_rpc.h +++ b/src/nvim/os/msgpack_rpc.h @@ -22,12 +22,15 @@ typedef enum { /// The implementation is generated at compile time with metadata extracted /// from the api/*.h headers, /// -/// @param id The channel id +/// @param channel_id The channel id +/// @param method_id The method id /// @param req The parsed request object -/// @param res A packer that contains the response -void msgpack_rpc_dispatch(uint64_t id, - msgpack_object *req, - msgpack_packer *res) +/// @param err Pointer to error structure +/// @return Some object +Object msgpack_rpc_dispatch(uint64_t channel_id, + uint64_t method_id, + msgpack_object *req, + Error *err) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/os/msgpack_rpc_helpers.c index 3af6794169..e2c277abe4 100644 --- a/src/nvim/os/msgpack_rpc_helpers.c +++ b/src/nvim/os/msgpack_rpc_helpers.c @@ -231,12 +231,41 @@ void msgpack_rpc_from_object(Object result, msgpack_packer *res) msgpack_rpc_from_array(result.data.array, res); break; + case kObjectTypePosition: + msgpack_rpc_from_position(result.data.position, res); + break; + + case kObjectTypeBuffer: + msgpack_rpc_from_buffer(result.data.buffer, res); + break; + + case kObjectTypeWindow: + msgpack_rpc_from_window(result.data.window, res); + break; + + case kObjectTypeTabpage: + msgpack_rpc_from_tabpage(result.data.tabpage, res); + break; + + case kObjectTypeStringArray: + msgpack_rpc_from_stringarray(result.data.stringarray, res); + break; + + case kObjectTypeBufferArray: + msgpack_rpc_from_bufferarray(result.data.bufferarray, res); + break; + + case kObjectTypeWindowArray: + msgpack_rpc_from_windowarray(result.data.windowarray, res); + break; + + case kObjectTypeTabpageArray: + msgpack_rpc_from_tabpagearray(result.data.tabpagearray, res); + break; + case kObjectTypeDictionary: msgpack_rpc_from_dictionary(result.data.dictionary, res); break; - - default: - abort(); } } @@ -282,6 +311,10 @@ void msgpack_rpc_free_object(Object value) case kObjectTypeBoolean: case kObjectTypeInteger: case kObjectTypeFloat: + case kObjectTypePosition: + case kObjectTypeBuffer: + case kObjectTypeWindow: + case kObjectTypeTabpage: break; case kObjectTypeString: @@ -292,6 +325,22 @@ void msgpack_rpc_free_object(Object value) msgpack_rpc_free_array(value.data.array); break; + case kObjectTypeStringArray: + msgpack_rpc_free_stringarray(value.data.stringarray); + break; + + case kObjectTypeBufferArray: + msgpack_rpc_free_bufferarray(value.data.bufferarray); + break; + + case kObjectTypeWindowArray: + msgpack_rpc_free_windowarray(value.data.windowarray); + break; + + case kObjectTypeTabpageArray: + msgpack_rpc_free_tabpagearray(value.data.tabpagearray); + break; + case kObjectTypeDictionary: msgpack_rpc_free_dictionary(value.data.dictionary); break; -- cgit