From 3717e2157f2d45ce23dbe4ac03085fea2d956dc4 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 23 Jul 2017 18:04:14 +0200 Subject: Revert channel logging, rebased on new code below --- src/nvim/msgpack_rpc/channel.c | 43 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) (limited to 'src/nvim/msgpack_rpc') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5efdb9a194..18f6334780 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -115,15 +115,12 @@ void channel_teardown(void) /// Creates an API channel by starting a process and connecting to its /// stdin/stdout. stderr is handled by the job infrastructure. /// -/// @param proc process object -/// @param id (optional) channel id -/// @param source description of source function, rplugin name, TCP addr, etc -/// -/// @return Channel id (> 0), on success. 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id, char *source) +/// @param argv The argument vector for the process. [consumed] +/// @return The channel id (> 0), on success. +/// 0, on error. +uint64_t channel_from_process(Process *proc, uint64_t id) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events, - source); + Channel *channel = register_channel(kChannelTypeProc, id, proc->events); incref(channel); // process channels are only closed by the exit_cb channel->data.proc = proc; @@ -142,8 +139,7 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, - watcher->addr); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; @@ -156,9 +152,8 @@ void channel_from_connection(SocketWatcher *watcher) &channel->data.stream); } -/// @param source description of source function, rplugin name, TCP addr, etc -uint64_t channel_connect(bool tcp, const char *address, int timeout, - char *source, const char **error) +uint64_t channel_connect(bool tcp, const char *address, + int timeout, const char **error) { if (!tcp) { char *path = fix_fname(address); @@ -170,7 +165,7 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source); + Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); if (!socket_connect(&main_loop, &channel->data.stream, tcp, address, timeout, error)) { decref(channel); @@ -323,10 +318,11 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. Used to embed Nvim. +/// Creates an API channel from stdin/stdout. This is used when embedding +/// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); incref(channel); // stdio channels are only closed on exit // read stream rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); @@ -342,7 +338,7 @@ void channel_from_stdio(void) /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL); + Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); incref(channel); // internal channel lives until process exit return channel->id; } @@ -778,12 +774,9 @@ static void close_cb(Stream *stream, void *data) decref(data); } -/// @param source description of source function, rplugin name, TCP addr, etc static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events, char *source) + MultiQueue *events) { - // Jobs and channels share the same id namespace. - assert(id == 0 || !pmap_get(uint64_t)(channels, id)); Channel *rv = xmalloc(sizeof(Channel)); rv->events = events ? events : multiqueue_new_child(main_loop.events); rv->type = type; @@ -795,14 +788,6 @@ static Channel *register_channel(ChannelType type, uint64_t id, rv->next_request_id = 1; kv_init(rv->call_stack); pmap_put(uint64_t)(channels, rv->id, rv); - - ILOG("new channel %" PRIu64 " (%s): %s", rv->id, - (type == kChannelTypeProc ? "proc" - : (type == kChannelTypeSocket ? "socket" - : (type == kChannelTypeStdio ? "stdio" - : (type == kChannelTypeInternal ? "internal" : "?")))), - (source ? source : "?")); - return rv; } -- cgit From 5215e3205a07b85e4e4cf1f8a8ca6be2b9556459 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Sun, 27 Aug 2017 11:59:33 +0200 Subject: channels: refactor --- src/nvim/msgpack_rpc/channel.c | 388 ++++++++++++------------------------ src/nvim/msgpack_rpc/channel.h | 2 + src/nvim/msgpack_rpc/channel_defs.h | 36 ++++ 3 files changed, 166 insertions(+), 260 deletions(-) create mode 100644 src/nvim/msgpack_rpc/channel_defs.h (limited to 'src/nvim/msgpack_rpc') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 18f6334780..42b1f63830 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -11,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" @@ -40,47 +41,6 @@ #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio, - kChannelTypeInternal -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; // bidirectional (socket) - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { ch_before_blocking_events = multiqueue_new_child(main_loop.events); - channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, receive_msgpack, channel); + Stream *in = channel_instream(channel); + Stream *out = channel_outstream(channel); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, - proc->out); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - return channel->id; + wstream_init(in, 0); + rstream_init(out, CHANNEL_BUFFER_SIZE); + rstream_start(out, receive_msgpack, channel); } /// Creates an API channel from a tcp/pipe socket connection @@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); - - DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, - &channel->data.stream); + Channel *channel = channel_alloc(kChannelStreamSocket); + socket_watcher_accept(watcher, &channel->stream.socket); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); } +/// TODO: move to eval.c, also support bytes uint64_t channel_connect(bool tcp, const char *address, int timeout, const char **error) { @@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - if (!socket_connect(&main_loop, &channel->data.stream, + Channel *channel = channel_alloc(kChannelStreamSocket); + if (!socket_connect(&main_loop, &channel->stream.socket, tcp, address, timeout, error)) { - decref(channel); + channel_decref(channel); return 0; } - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); return channel->id; } +static Channel *find_rpc_channel(uint64_t id) +{ + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; +} + /// Publishes an event to a channel. /// /// @param id Channel id. 0 means "broadcast to all subscribed channels" /// @param name Event name (application-defined) /// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, const char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } @@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - const char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { @@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id, api_free_object(frame.result); } - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -310,7 +255,7 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { return false; } @@ -322,24 +267,22 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit + Channel *channel = channel_alloc(kChannelStreamStdio); + channel_incref(channel); // stdio channels are only closed on exit // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, receive_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); + wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, - &channel->data.std.in, &channel->data.std.out); + rpc_start(channel); } /// Creates a loopback channel. This is used to avoid deadlock /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); - incref(channel); // internal channel lives until process exit + Channel *channel = channel_alloc(kChannelStreamInternal); + channel_incref(channel); // internal channel lives until process exit + rpc_start(channel); return channel->id; } @@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); - channel->closed = true; - decref(channel); + // channel_decref(channel); remove?? + channel->rpc.closed = true; } // rstream.c:read_event() invokes this as stream->read_cb(). @@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { close_channel(channel); @@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); - DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); parse_msgpack(channel); end: - decref(channel); + channel_decref(channel); } static void parse_msgpack(Channel *channel) @@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel) msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel) if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request) evdata->handler = handler; evdata->args = args; evdata->request_id = request_id; - incref(channel); + channel_incref(channel); if (handler.async) { bool is_get_mode = handler.fn == handle_nvim_get_mode; @@ -530,66 +462,30 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); api_clear_error(&error); } -/// Returns the Stream that a Channel writes to. -static Stream *chan_wstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->in; - case kChannelTypeStdio: - return &chan->data.std.out; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success = false; + bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - case kChannelTypeProc: - case kChannelTypeStdio: - success = wstream_write(chan_wstream(channel), buffer); - break; - case kChannelTypeInternal: - incref(channel); - CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); - success = true; - break; + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; @@ -609,14 +505,14 @@ static void internal_read_event(void **argv) Channel *channel = argv[0]; WBuffer *buffer = argv[1]; - msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->unpacker), + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); parse_msgpack(channel); - decref(channel); + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args) Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -695,10 +592,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event) } /// Close the channel streams/process and free the channel resources. +/// TODO: move to channel.h static void close_channel(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); + switch (channel->streamtype) { + case kChannelStreamSocket: + stream_close(&channel->stream.socket, NULL, NULL); break; - case kChannelTypeProc: + case kChannelStreamProc: // Only close the rpc channel part, // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); + process_close_in(&channel->stream.proc); + process_close_out(&channel->stream.proc); break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); + case kChannelStreamStdio: + stream_close(&channel->stream.stdio.in, NULL, NULL); + stream_close(&channel->stream.stdio.out, NULL, NULL); multiqueue_put(main_loop.fast_events, exit_event, 1, channel); return; - case kChannelTypeInternal: + case kChannelStreamInternal: // nothing to free. break; } - decref(channel); + channel_decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); + channel_decref(argv[0]); if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static void close_cb(Stream *stream, void *data) { - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + channel_decref(data); } static bool is_rpc_response(msgpack_object *obj) @@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; api_free_object(frame->result); @@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index f8fe6f129b..9ff5abdc5f 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -8,6 +8,7 @@ #include "nvim/event/socket.h" #include "nvim/event/process.h" #include "nvim/vim.h" +#include "nvim/channel.h" #define METHOD_MAXLEN 512 @@ -16,6 +17,7 @@ /// of os_inchar(), so they are processed "just-in-time". MultiQueue *ch_before_blocking_events; + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h new file mode 100644 index 0000000000..6d8362e8b7 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel_defs.h @@ -0,0 +1,36 @@ +#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H +#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H + +#include +#include +#include + +#include "nvim/api/private/defs.h" +#include "nvim/event/socket.h" +#include "nvim/event/process.h" +#include "nvim/vim.h" + +typedef struct Channel Channel; + +typedef struct { + uint64_t request_id; + bool returned, errored; + Object result; +} ChannelCallFrame; + +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +typedef struct { + PMap(cstr_t) *subscribed_events; + bool closed; + msgpack_unpacker *unpacker; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; +} RpcState; + +#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H -- cgit From 1ebc96fe10fbdbec22caa26d5d52a9f095da9687 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Mon, 5 Jun 2017 08:29:10 +0200 Subject: channels: allow bytes sockets and stdio, and buffered bytes output --- src/nvim/msgpack_rpc/channel.c | 87 ++++-------------------------------------- 1 file changed, 8 insertions(+), 79 deletions(-) (limited to 'src/nvim/msgpack_rpc') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 42b1f63830..56af4fa791 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -13,7 +13,6 @@ #include "nvim/api/ui.h" #include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" #include "nvim/event/libuv_process.h" #include "nvim/event/rstream.h" @@ -30,12 +29,9 @@ #include "nvim/map.h" #include "nvim/log.h" #include "nvim/misc1.h" -#include "nvim/path.h" #include "nvim/lib/kvec.h" #include "nvim/os/input.h" -#define CHANNEL_BUFFER_SIZE 0xffff - #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL #define log_client_msg(...) #define log_server_msg(...) @@ -66,57 +62,18 @@ void rpc_start(Channel *channel) rpc->next_request_id = 1; kv_init(rpc->call_stack); - Stream *in = channel_instream(channel); - Stream *out = channel_outstream(channel); - - DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - - wstream_init(in, 0); - rstream_init(out, CHANNEL_BUFFER_SIZE); - rstream_start(out, receive_msgpack, channel); -} - -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) -{ - Channel *channel = channel_alloc(kChannelStreamSocket); - socket_watcher_accept(watcher, &channel->stream.socket); - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); -} - -/// TODO: move to eval.c, also support bytes -uint64_t channel_connect(bool tcp, const char *address, - int timeout, const char **error) -{ - if (!tcp) { - char *path = fix_fname(address); - if (server_owns_pipe_address(path)) { - // avoid deadlock - xfree(path); - return channel_create_internal(); - } - xfree(path); - } + if (channel->streamtype != kChannelStreamInternal) { + Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL + Stream *in = channel_instream(channel); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif - Channel *channel = channel_alloc(kChannelStreamSocket); - if (!socket_connect(&main_loop, &channel->stream.socket, - tcp, address, timeout, error)) { - channel_decref(channel); - return 0; + rstream_start(out, receive_msgpack, channel); } - - channel_incref(channel); // close channel only after the stream is closed - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - rpc_start(channel); - return channel->id; } + static Channel *find_rpc_channel(uint64_t id) { Channel *chan = find_channel(id); @@ -263,29 +220,6 @@ bool channel_close(uint64_t id) return true; } -/// Creates an API channel from stdin/stdout. This is used when embedding -/// Neovim -void channel_from_stdio(void) -{ - Channel *channel = channel_alloc(kChannelStreamStdio); - channel_incref(channel); // stdio channels are only closed on exit - // read stream - rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); - wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - - rpc_start(channel); -} - -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -uint64_t channel_create_internal(void) -{ - Channel *channel = channel_alloc(kChannelStreamInternal); - channel_incref(channel); // internal channel lives until process exit - rpc_start(channel); - return channel->id; -} - void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); @@ -663,11 +597,6 @@ void rpc_free(Channel *channel) kv_destroy(channel->rpc.call_stack); } -static void close_cb(Stream *stream, void *data) -{ - channel_decref(data); -} - static bool is_rpc_response(msgpack_object *obj) { return obj->type == MSGPACK_OBJECT_ARRAY -- cgit From 90e5cc5484ceeb410ae2a2706e09ed475cade4a5 Mon Sep 17 00:00:00 2001 From: Björn Linse Date: Thu, 8 Jun 2017 17:15:53 +0200 Subject: channels: generalize jobclose() --- src/nvim/msgpack_rpc/channel.c | 62 +++++++----------------------------------- 1 file changed, 10 insertions(+), 52 deletions(-) (limited to 'src/nvim/msgpack_rpc') diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 56af4fa791..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -54,6 +54,7 @@ void rpc_init(void) void rpc_start(Channel *channel) { + channel_incref(channel); channel->is_rpc = true; RpcState *rpc = &channel->rpc; rpc->closed = false; @@ -204,31 +205,6 @@ void rpc_unsubscribe(uint64_t id, char *event) unsubscribe(channel, event); } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ - Channel *channel; - - if (!(channel = find_rpc_channel(id))) { - return false; - } - - close_channel(channel); - return true; -} - -void channel_process_exit(uint64_t id, int status) -{ - Channel *channel = pmap_get(uint64_t)(channels, id); - - // channel_decref(channel); remove?? - channel->rpc.closed = true; -} - -// rstream.c:read_event() invokes this as stream->read_cb(). static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { @@ -236,7 +212,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, channel_incref(channel); if (eof) { - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); char buf[256]; snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client", channel->id); @@ -540,43 +516,25 @@ static void unsubscribe(Channel *channel, char *event) xfree(event_string); } -/// Close the channel streams/process and free the channel resources. -/// TODO: move to channel.h -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel) { if (channel->rpc.closed) { return; } channel->rpc.closed = true; + channel_decref(channel); - switch (channel->streamtype) { - case kChannelStreamSocket: - stream_close(&channel->stream.socket, NULL, NULL); - break; - case kChannelStreamProc: - // Only close the rpc channel part, - // there could be an error message on the stderr stream - process_close_in(&channel->stream.proc); - process_close_out(&channel->stream.proc); - break; - case kChannelStreamStdio: - stream_close(&channel->stream.stdio.in, NULL, NULL); - stream_close(&channel->stream.stdio.out, NULL, NULL); - multiqueue_put(main_loop.fast_events, exit_event, 1, channel); - return; - case kChannelStreamInternal: - // nothing to free. - break; + if (channel->streamtype == kChannelStreamStdio) { + multiqueue_put(main_loop.fast_events, exit_event, 0); } - - channel_decref(channel); } static void exit_event(void **argv) { - channel_decref(argv[0]); - if (!exiting) { mch_exit(0); } @@ -642,7 +600,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel) frame->result = STRING_OBJ(cstr_to_string(msg)); } - close_channel(channel); + channel_close(channel->id, kChannelPartRpc, NULL); } static WBuffer *serialize_request(uint64_t channel_id, -- cgit