diff options
Diffstat (limited to 'src/nvim/msgpack_rpc/channel.c')
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 454 | 
1 files changed, 97 insertions, 357 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5efdb9a194..32781cf4d9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -11,8 +11,8 @@  #include "nvim/api/private/helpers.h"  #include "nvim/api/vim.h"  #include "nvim/api/ui.h" +#include "nvim/channel.h"  #include "nvim/msgpack_rpc/channel.h" -#include "nvim/msgpack_rpc/server.h"  #include "nvim/event/loop.h"  #include "nvim/event/libuv_process.h"  #include "nvim/event/rstream.h" @@ -29,58 +29,14 @@  #include "nvim/map.h"  #include "nvim/log.h"  #include "nvim/misc1.h" -#include "nvim/path.h"  #include "nvim/lib/kvec.h"  #include "nvim/os/input.h" -#define CHANNEL_BUFFER_SIZE 0xffff -  #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL  #define log_client_msg(...)  #define log_server_msg(...)  #endif -typedef enum { -  kChannelTypeSocket, -  kChannelTypeProc, -  kChannelTypeStdio, -  kChannelTypeInternal -} ChannelType; - -typedef struct { -  uint64_t request_id; -  bool returned, errored; -  Object result; -} ChannelCallFrame; - -typedef struct { -  uint64_t id; -  size_t refcount; -  PMap(cstr_t) *subscribed_events; -  bool closed; -  ChannelType type; -  msgpack_unpacker *unpacker; -  union { -    Stream stream;  // bidirectional (socket) -    Process *proc; -    struct { -      Stream in; -      Stream out; -    } std; -  } data; -  uint64_t next_request_id; -  kvec_t(ChannelCallFrame *) call_stack; -  MultiQueue *events; -} Channel; - -typedef struct { -  Channel *channel; -  MsgpackRpcRequestHandler handler; -  Array args; -  uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL;  static PMap(cstr_t) *event_strings = NULL;  static msgpack_sbuffer out_buffer; @@ -88,102 +44,44 @@ static msgpack_sbuffer out_buffer;  # include "msgpack_rpc/channel.c.generated.h"  #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void)  {    ch_before_blocking_events = multiqueue_new_child(main_loop.events); -  channels = pmap_new(uint64_t)();    event_strings = pmap_new(cstr_t)();    msgpack_sbuffer_init(&out_buffer); -  remote_ui_init();  } -/// Teardown the module -void channel_teardown(void) -{ -  if (!channels) { -    return; -  } - -  Channel *channel; - -  map_foreach_value(channels, channel, { -    close_channel(channel); -  }); -} -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param proc   process object -/// @param id     (optional) channel id -/// @param source description of source function, rplugin name, TCP addr, etc -/// -/// @return Channel id (> 0), on success. 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id, char *source) +void rpc_start(Channel *channel)  { -  Channel *channel = register_channel(kChannelTypeProc, id, proc->events, -                                      source); -  incref(channel);  // process channels are only closed by the exit_cb -  channel->data.proc = proc; +  channel_incref(channel); +  channel->is_rpc = true; +  RpcState *rpc = &channel->rpc; +  rpc->closed = false; +  rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); +  rpc->subscribed_events = pmap_new(cstr_t)(); +  rpc->next_request_id = 1; +  kv_init(rpc->call_stack); -  wstream_init(proc->in, 0); -  rstream_init(proc->out, 0); -  rstream_start(proc->out, receive_msgpack, channel); - -  DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, -       proc->out); +  if (channel->streamtype != kChannelStreamInternal) { +    Stream *out = channel_outstream(channel); +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +    Stream *in = channel_instream(channel); +    DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); +#endif -  return channel->id; +    rstream_start(out, receive_msgpack, channel); +  }  } -/// Creates an API channel from a tcp/pipe socket connection -/// -/// @param watcher The SocketWatcher ready to accept the connection -void channel_from_connection(SocketWatcher *watcher) -{ -  Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, -                                      watcher->addr); -  socket_watcher_accept(watcher, &channel->data.stream); -  incref(channel);  // close channel only after the stream is closed -  channel->data.stream.internal_close_cb = close_cb; -  channel->data.stream.internal_data = channel; -  wstream_init(&channel->data.stream, 0); -  rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); -  rstream_start(&channel->data.stream, receive_msgpack, channel); - -  DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, -       &channel->data.stream); -} -/// @param source description of source function, rplugin name, TCP addr, etc -uint64_t channel_connect(bool tcp, const char *address, int timeout, -                         char *source, const char **error) +static Channel *find_rpc_channel(uint64_t id)  { -  if (!tcp) { -    char *path = fix_fname(address); -    if (server_owns_pipe_address(path)) { -      // avoid deadlock -      xfree(path); -      return channel_create_internal(); -    } -    xfree(path); -  } - -  Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source); -  if (!socket_connect(&main_loop, &channel->data.stream, -                      tcp, address, timeout, error)) { -    decref(channel); -    return 0; +  Channel *chan = find_channel(id); +  if (!chan || !chan->is_rpc || chan->rpc.closed) { +    return NULL;    } - -  incref(channel);  // close channel only after the stream is closed -  channel->data.stream.internal_close_cb = close_cb; -  channel->data.stream.internal_data = channel; -  wstream_init(&channel->data.stream, 0); -  rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); -  rstream_start(&channel->data.stream, receive_msgpack, channel); -  return channel->id; +  return chan;  }  /// Publishes an event to a channel. @@ -192,12 +90,11 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout,  /// @param name Event name (application-defined)  /// @param args Array of event arguments  /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, const char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args)  {    Channel *channel = NULL; -  if (id && (!(channel = pmap_get(uint64_t)(channels, id)) -            || channel->closed)) { +  if (id && (!(channel = find_rpc_channel(id)))) {      api_free_array(args);      return false;    } @@ -218,29 +115,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args)  /// @param args Array with method arguments  /// @param[out] error True if the return value is an error  /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, -                         const char *method_name, -                         Array args, -                         Error *err) +Object rpc_send_call(uint64_t id, +                     const char *method_name, +                     Array args, +                     Error *err)  {    Channel *channel = NULL; -  if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { +  if (!(channel = find_rpc_channel(id))) {      api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);      api_free_array(args);      return NIL;    } -  incref(channel); -  uint64_t request_id = channel->next_request_id++; +  channel_incref(channel); +  RpcState *rpc = &channel->rpc; +  uint64_t request_id = rpc->next_request_id++;    // Send the msgpack-rpc request    send_request(channel, request_id, method_name, args);    // Push the frame    ChannelCallFrame frame = { request_id, false, false, NIL }; -  kv_push(channel->call_stack, &frame); +  kv_push(rpc->call_stack, &frame);    LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); -  (void)kv_pop(channel->call_stack); +  (void)kv_pop(rpc->call_stack);    if (frame.errored) {      if (frame.result.type == kObjectTypeString) { @@ -265,7 +163,7 @@ Object channel_send_call(uint64_t id,      api_free_object(frame.result);    } -  decref(channel); +  channel_decref(channel);    return frame.errored ? NIL : frame.result;  } @@ -274,11 +172,11 @@ Object channel_send_call(uint64_t id,  ///  /// @param id The channel id  /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event)  {    Channel *channel; -  if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { +  if (!(channel = find_rpc_channel(id))) {      abort();    } @@ -289,81 +187,32 @@ void channel_subscribe(uint64_t id, char *event)      pmap_put(cstr_t)(event_strings, event_string, event_string);    } -  pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); +  pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);  }  /// Unsubscribes to event broadcasts  ///  /// @param id The channel id  /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event)  {    Channel *channel; -  if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { +  if (!(channel = find_rpc_channel(id))) {      abort();    }    unsubscribe(channel, event);  } -/// Closes a channel -/// -/// @param id The channel id -/// @return true if successful, false otherwise -bool channel_close(uint64_t id) -{ -  Channel *channel; - -  if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { -    return false; -  } - -  close_channel(channel); -  return true; -} - -/// Creates an API channel from stdin/stdout. Used to embed Nvim. -void channel_from_stdio(void) -{ -  Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL); -  incref(channel);  // stdio channels are only closed on exit -  // read stream -  rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); -  rstream_start(&channel->data.std.in, receive_msgpack, channel); -  // write stream -  wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); - -  DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, -       &channel->data.std.in, &channel->data.std.out); -} - -/// Creates a loopback channel. This is used to avoid deadlock -/// when an instance connects to its own named pipe. -uint64_t channel_create_internal(void) -{ -  Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL); -  incref(channel);  // internal channel lives until process exit -  return channel->id; -} - -void channel_process_exit(uint64_t id, int status) -{ -  Channel *channel = pmap_get(uint64_t)(channels, id); - -  channel->closed = true; -  decref(channel); -} - -// rstream.c:read_event() invokes this as stream->read_cb().  static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,                              void *data, bool eof)  {    Channel *channel = data; -  incref(channel); +  channel_incref(channel);    if (eof) { -    close_channel(channel); +    channel_close(channel->id, kChannelPartRpc, NULL);      char buf[256];      snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",               channel->id); @@ -371,30 +220,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,      goto end;    } -  if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) -      || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { -    char buf[256]; -    snprintf(buf, sizeof(buf), -             "ch %" PRIu64 ": stream closed unexpectedly. " -             "closing channel", -             channel->id); -    call_set_error(channel, buf, WARN_LOG_LEVEL); -    goto end; -  } -    size_t count = rbuffer_size(rbuf); -  DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", +  DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",         channel->id, count, stream);    // Feed the unpacker with data -  msgpack_unpacker_reserve_buffer(channel->unpacker, count); -  rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); -  msgpack_unpacker_buffer_consumed(channel->unpacker, count); +  msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); +  rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); +  msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);    parse_msgpack(channel);  end: -  decref(channel); +  channel_decref(channel);  }  static void parse_msgpack(Channel *channel) @@ -404,8 +242,8 @@ static void parse_msgpack(Channel *channel)    msgpack_unpack_return result;    // Deserialize everything we can. -  while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == -      MSGPACK_UNPACK_SUCCESS) { +  while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == +         MSGPACK_UNPACK_SUCCESS) {      bool is_response = is_rpc_response(&unpacked.data);      log_client_msg(channel->id, !is_response, unpacked.data); @@ -431,7 +269,7 @@ static void parse_msgpack(Channel *channel)    if (result == MSGPACK_UNPACK_NOMEM_ERROR) {      mch_errmsg(e_outofmem);      mch_errmsg("\n"); -    decref(channel); +    channel_decref(channel);      preserve_exit();    } @@ -496,7 +334,7 @@ static void handle_request(Channel *channel, msgpack_object *request)    evdata->handler = handler;    evdata->args = args;    evdata->request_id = request_id; -  incref(channel); +  channel_incref(channel);    if (handler.async) {      bool is_get_mode = handler.fn == handle_nvim_get_mode; @@ -534,66 +372,30 @@ static void on_request_event(void **argv)      api_free_object(result);    }    api_free_array(args); -  decref(channel); +  channel_decref(channel);    xfree(e);    api_clear_error(&error);  } -/// Returns the Stream that a Channel writes to. -static Stream *chan_wstream(Channel *chan) -{ -  switch (chan->type) { -    case kChannelTypeSocket: -      return &chan->data.stream; -    case kChannelTypeProc: -      return chan->data.proc->in; -    case kChannelTypeStdio: -      return &chan->data.std.out; -    case kChannelTypeInternal: -      return NULL; -  } -  abort(); -} - -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ -  switch (chan->type) { -    case kChannelTypeSocket: -      return &chan->data.stream; -    case kChannelTypeProc: -      return chan->data.proc->out; -    case kChannelTypeStdio: -      return &chan->data.std.in; -    case kChannelTypeInternal: -      return NULL; -  } -  abort(); -} - -  static bool channel_write(Channel *channel, WBuffer *buffer)  { -  bool success = false; +  bool success; -  if (channel->closed) { +  if (channel->rpc.closed) {      wstream_release_wbuffer(buffer);      return false;    } -  switch (channel->type) { -    case kChannelTypeSocket: -    case kChannelTypeProc: -    case kChannelTypeStdio: -      success = wstream_write(chan_wstream(channel), buffer); -      break; -    case kChannelTypeInternal: -      incref(channel); -      CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); -      success = true; -      break; +  if (channel->streamtype == kChannelStreamInternal) { +    channel_incref(channel); +    CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); +    success = true; +  } else { +    Stream *in = channel_instream(channel); +    success = wstream_write(in, buffer);    } +    if (!success) {      // If the write failed for any reason, close the channel      char buf[256]; @@ -613,14 +415,14 @@ static void internal_read_event(void **argv)    Channel *channel = argv[0];    WBuffer *buffer = argv[1]; -  msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); -  memcpy(msgpack_unpacker_buffer(channel->unpacker), +  msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); +  memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),           buffer->data, buffer->size); -  msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); +  msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);    parse_msgpack(channel); -  decref(channel); +  channel_decref(channel);    wstream_release_wbuffer(buffer);  } @@ -669,7 +471,8 @@ static void broadcast_event(const char *name, Array args)    Channel *channel;    map_foreach_value(channels, channel, { -    if (pmap_has(cstr_t)(channel->subscribed_events, name)) { +    if (channel->is_rpc +        && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {        kv_push(subscribed, channel);      }    }); @@ -699,10 +502,11 @@ end:  static void unsubscribe(Channel *channel, char *event)  {    char *event_string = pmap_get(cstr_t)(event_strings, event); -  pmap_del(cstr_t)(channel->subscribed_events, event_string); +  pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);    map_foreach_value(channels, channel, { -    if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { +    if (channel->is_rpc +        && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {        return;      }    }); @@ -712,98 +516,43 @@ static void unsubscribe(Channel *channel, char *event)    xfree(event_string);  } -/// Close the channel streams/process and free the channel resources. -static void close_channel(Channel *channel) + +/// Mark rpc state as closed, and release its reference to the channel. +/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) +void rpc_close(Channel *channel)  { -  if (channel->closed) { +  if (channel->rpc.closed) {      return;    } -  channel->closed = true; +  channel->rpc.closed = true; +  channel_decref(channel); -  switch (channel->type) { -    case kChannelTypeSocket: -      stream_close(&channel->data.stream, NULL, NULL); -      break; -    case kChannelTypeProc: -      // Only close the rpc channel part, -      // there could be an error message on the stderr stream -      process_close_in(channel->data.proc); -      process_close_out(channel->data.proc); -      break; -    case kChannelTypeStdio: -      stream_close(&channel->data.std.in, NULL, NULL); -      stream_close(&channel->data.std.out, NULL, NULL); -      multiqueue_put(main_loop.fast_events, exit_event, 1, channel); -      return; -    case kChannelTypeInternal: -      // nothing to free. -      break; +  if (channel->streamtype == kChannelStreamStdio) { +    multiqueue_put(main_loop.fast_events, exit_event, 0);    } - -  decref(channel);  }  static void exit_event(void **argv)  { -  decref(argv[0]); -    if (!exiting) {      mch_exit(0);    }  } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel)  {    remote_ui_disconnect(channel->id); -  pmap_del(uint64_t)(channels, channel->id); -  msgpack_unpacker_free(channel->unpacker); +  msgpack_unpacker_free(channel->rpc.unpacker);    // Unsubscribe from all events    char *event_string; -  map_foreach_value(channel->subscribed_events, event_string, { +  map_foreach_value(channel->rpc.subscribed_events, event_string, {      unsubscribe(channel, event_string);    }); -  pmap_free(cstr_t)(channel->subscribed_events); -  kv_destroy(channel->call_stack); -  if (channel->type != kChannelTypeProc) { -    multiqueue_free(channel->events); -  } -  xfree(channel); -} - -static void close_cb(Stream *stream, void *data) -{ -  decref(data); -} - -/// @param source description of source function, rplugin name, TCP addr, etc -static Channel *register_channel(ChannelType type, uint64_t id, -                                 MultiQueue *events, char *source) -{ -  // Jobs and channels share the same id namespace. -  assert(id == 0 || !pmap_get(uint64_t)(channels, id)); -  Channel *rv = xmalloc(sizeof(Channel)); -  rv->events = events ? events : multiqueue_new_child(main_loop.events); -  rv->type = type; -  rv->refcount = 1; -  rv->closed = false; -  rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); -  rv->id = id > 0 ? id : next_chan_id++; -  rv->subscribed_events = pmap_new(cstr_t)(); -  rv->next_request_id = 1; -  kv_init(rv->call_stack); -  pmap_put(uint64_t)(channels, rv->id, rv); - -  ILOG("new channel %" PRIu64 " (%s): %s", rv->id, -       (type == kChannelTypeProc ? "proc" -        : (type == kChannelTypeSocket ? "socket" -           : (type == kChannelTypeStdio ? "stdio" -              : (type == kChannelTypeInternal ? "internal" : "?")))), -       (source ? source : "?")); - -  return rv; +  pmap_free(cstr_t)(channel->rpc.subscribed_events); +  kv_destroy(channel->rpc.call_stack);  }  static bool is_rpc_response(msgpack_object *obj) @@ -818,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj)  static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)  {    uint64_t response_id = obj->via.array.ptr[1].via.u64; +  if (kv_size(channel->rpc.call_stack) == 0) { +    return false; +  } +    // Must be equal to the frame at the stack's bottom -  return kv_size(channel->call_stack) && response_id -    == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; +  ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); +  return response_id == frame->request_id;  }  static void complete_call(msgpack_object *obj, Channel *channel)  { -  ChannelCallFrame *frame = kv_A(channel->call_stack, -                             kv_size(channel->call_stack) - 1); +  ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);    frame->returned = true;    frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -840,15 +592,15 @@ static void complete_call(msgpack_object *obj, Channel *channel)  static void call_set_error(Channel *channel, char *msg, int loglevel)  {    LOG(loglevel, "RPC: %s", msg); -  for (size_t i = 0; i < kv_size(channel->call_stack); i++) { -    ChannelCallFrame *frame = kv_A(channel->call_stack, i); +  for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { +    ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);      frame->returned = true;      frame->errored = true;      api_free_object(frame->result);      frame->result = STRING_OBJ(cstr_to_string(msg));    } -  close_channel(channel); +  channel_close(channel->id, kChannelPartRpc, NULL);  }  static WBuffer *serialize_request(uint64_t channel_id, @@ -890,18 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id,    return rv;  } -static void incref(Channel *channel) -{ -  channel->refcount++; -} - -static void decref(Channel *channel) -{ -  if (!(--channel->refcount)) { -    free_channel(channel); -  } -} -  #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL  #define REQ "[request]  "  #define RES "[response] "  | 
