diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 501 |
1 files changed, 178 insertions, 323 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index de6167c7fc..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -1,3 +1,6 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + #include <stdbool.h> #include <string.h> #include <inttypes.h> @@ -8,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" @@ -26,56 +30,13 @@ #include "nvim/log.h" #include "nvim/misc1.h" #include "nvim/lib/kvec.h" - -#define CHANNEL_BUFFER_SIZE 0xffff +#include "nvim/os/input.h" #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - size_t pending_requests; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - kvec_t(WBuffer *) delayed_notifications; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -83,91 +44,64 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { - channels = pmap_new(uint64_t)(); + ch_before_blocking_events = multiqueue_new_child(main_loop.events); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel_incref(channel); + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack, channel); + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - return channel->id; + rstream_start(out, receive_msgpack, channel); + } } -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) + +static Channel *find_rpc_channel(uint64_t id) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack, channel); + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; } -/// Sends event/arguments to channel +/// Publishes an event to a channel. /// -/// @param id The channel id. If 0, the event will be sent to all -/// channels that have subscribed to the event type -/// @param name The event name, an arbitrary string -/// @param args Array with event arguments +/// @param id Channel id. 0 means "broadcast to all subscribed channels" +/// @param name Event name (application-defined) +/// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } if (channel) { - if (channel->pending_requests) { - // Pending request, queue the notification for later sending. - String method = cstr_as_string(name); - WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); - kv_push(channel->delayed_notifications, buffer); - } else { - send_event(channel, name, args); - } + send_event(channel, name, args); } else { - // TODO(tarruda): Implement event broadcasting in vimscript broadcast_event(name, args); } @@ -181,35 +115,35 @@ bool channel_send_event(uint64_t id, char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); + if (!(channel = find_rpc_channel(id))) { + api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); - channel->pending_requests++; + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); - channel->pending_requests--; + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { - api_set_error(err, Exception, "%s", frame.result.data.string.data); + api_set_error(err, kErrorTypeException, "%s", + frame.result.data.string.data); } else if (frame.result.type == kObjectTypeArray) { // Should be an error in the form [type, message] Array array = frame.result.data.array; @@ -217,24 +151,19 @@ Object channel_send_call(uint64_t id, && (array.items[0].data.integer == kErrorTypeException || array.items[0].data.integer == kErrorTypeValidation) && array.items[1].type == kObjectTypeString) { - err->type = (ErrorType) array.items[0].data.integer; - xstrlcpy(err->msg, array.items[1].data.string.data, sizeof(err->msg)); - err->set = true; + api_set_error(err, (ErrorType)array.items[0].data.integer, "%s", + array.items[1].data.string.data); } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } } else { - api_set_error(err, Exception, "%s", "unknown error"); + api_set_error(err, kErrorTypeException, "%s", "unknown error"); } api_free_object(frame.result); } - if (!channel->pending_requests) { - send_delayed_notifications(channel); - } - - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -243,11 +172,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -258,91 +187,63 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { - return false; - } - - close_channel(channel); - return true; -} - -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, parse_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); -} - -void channel_process_exit(uint64_t id, int status) -{ - Channel *channel = pmap_get(uint64_t)(channels, id); - - channel->closed = true; - decref(channel); -} - -static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, - bool eof) +static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, + void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, WARN_LOG_LEVEL); goto end; } size_t count = rbuffer_size(rbuf); - DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream); + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", + channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); + parse_msgpack(channel); + +end: + channel_decref(channel); +} + +static void parse_msgpack(Channel *channel) +{ msgpack_unpacked unpacked; msgpack_unpacked_init(&unpacked); msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -355,11 +256,11 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, "ch %" PRIu64 " returned a response with an unknown request " "id. Ensure the client is properly synchronized", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -368,7 +269,7 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -377,17 +278,15 @@ static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, // causes for this error(search for 'goto _failed') // // A not so uncommon cause for this might be deserializing objects with - // a high nesting level: msgpack will break when it's internal parse stack - // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default) + // a high nesting level: msgpack will break when its internal parse stack + // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) send_error(channel, 0, "Invalid msgpack payload. " "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - decref(channel); } + static void handle_request(Channel *channel, msgpack_object *request) FUNC_ATTR_NONNULL_ALL { @@ -395,7 +294,7 @@ static void handle_request(Channel *channel, msgpack_object *request) Error error = ERROR_INIT; msgpack_rpc_validate(&request_id, request, &error); - if (error.set) { + if (ERROR_SET(&error)) { // Validation failed, send response with error if (channel_write(channel, serialize_response(channel->id, @@ -407,11 +306,11 @@ static void handle_request(Channel *channel, msgpack_object *request) snprintf(buf, sizeof(buf), "ch %" PRIu64 " sent an invalid message, closed.", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } + api_clear_error(&error); return; } - // Retrieve the request handler MsgpackRpcRequestHandler handler; msgpack_object *method = msgpack_rpc_method(request); @@ -430,16 +329,24 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); - event_data->channel = channel; - event_data->handler = handler; - event_data->args = args; - event_data->request_id = request_id; - incref(channel); + RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); + evdata->channel = channel; + evdata->handler = handler; + evdata->args = args; + evdata->request_id = request_id; + channel_incref(channel); if (handler.async) { - on_request_event((void **)&event_data); + bool is_get_mode = handler.fn == handle_nvim_get_mode; + + if (is_get_mode && !input_blocking()) { + // Defer the event to a special queue used by os/input.c. #6247 + multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata); + } else { + // Invoke immediately. + on_request_event((void **)&evdata); + } } else { - multiqueue_put(channel->events, on_request_event, 1, event_data); + multiqueue_put(channel->events, on_request_event, 1, evdata); } } @@ -465,64 +372,78 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); + api_clear_error(&error); } static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - success = wstream_write(&channel->data.stream, buffer); - break; - case kChannelTypeProc: - success = wstream_write(channel->data.proc->in, buffer); - break; - case kChannelTypeStdio: - success = wstream_write(&channel->data.std.out, buffer); - break; - default: - abort(); + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; snprintf(buf, sizeof(buf), - "Before returning from a RPC call, ch %" PRIu64 " was " - "closed due to a failed write", + "ch %" PRIu64 ": stream write failed. " + "RPC canceled; closing channel", channel->id); - call_set_error(channel, buf); + call_set_error(channel, buf, ERROR_LOG_LEVEL); } return success; } +static void internal_read_event(void **argv) +{ + Channel *channel = argv[0]; + WBuffer *buffer = argv[1]; + + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), + buffer->data, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); + + parse_msgpack(channel); + + channel_decref(channel); + wstream_release_wbuffer(buffer); +} + static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; - api_set_error(&e, Exception, "%s", err); + api_set_error(&e, kErrorTypeException, "%s", err); channel_write(channel, serialize_response(channel->id, id, &e, NIL, &out_buffer)); + api_clear_error(&e); } static void send_request(Channel *channel, uint64_t id, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, id, method, @@ -532,10 +453,10 @@ static void send_request(Channel *channel, } static void send_event(Channel *channel, - char *name, + const char *name, Array args) { - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); channel_write(channel, serialize_request(channel->id, 0, method, @@ -544,13 +465,14 @@ static void send_event(Channel *channel, 1)); } -static void broadcast_event(char *name, Array args) +static void broadcast_event(const char *name, Array args) { kvec_t(Channel *) subscribed = KV_INITIAL_VALUE; Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -560,7 +482,7 @@ static void broadcast_event(char *name, Array args) goto end; } - String method = {.size = strlen(name), .data = name}; + const String method = cstr_as_string((char *)name); WBuffer *buffer = serialize_request(0, 0, method, @@ -570,11 +492,7 @@ static void broadcast_event(char *name, Array args) for (size_t i = 0; i < kv_size(subscribed); i++) { Channel *channel = kv_A(subscribed, i); - if (channel->pending_requests) { - kv_push(channel->delayed_notifications, buffer); - } else { - channel_write(channel, buffer); - } + channel_write(channel, buffer); } end: @@ -584,10 +502,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -597,89 +516,43 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; + channel_decref(channel); - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); - break; - case kChannelTypeProc: - // Only close the rpc channel part, - // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); - break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); - multiqueue_put(main_loop.fast_events, exit_event, 1, channel); - return; - default: - abort(); + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); - if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - kv_destroy(channel->delayed_notifications); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); -} - -static void close_cb(Stream *stream, void *data) -{ - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->pending_requests = 0; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - kv_init(rv->delayed_notifications); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static bool is_rpc_response(msgpack_object *obj) @@ -694,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -713,22 +589,23 @@ static void complete_call(msgpack_object *obj, Channel *channel) } } -static void call_set_error(Channel *channel, char *msg) +static void call_set_error(Channel *channel, char *msg, int loglevel) { - ELOG("RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + LOG(loglevel, "RPC: %s", msg); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; + api_free_object(frame->result); frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, uint64_t request_id, - String method, + const String method, Array args, msgpack_sbuffer *sbuffer, size_t refcount) @@ -765,28 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void send_delayed_notifications(Channel* channel) -{ - for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { - WBuffer *buffer = kv_A(channel->delayed_notifications, i); - channel_write(channel, buffer); - } - - kv_size(channel->delayed_notifications) = 0; -} - -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " |