diff options
| -rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 51 | ||||
| -rw-r--r-- | src/nvim/os/wstream.c | 6 | 
2 files changed, 25 insertions, 32 deletions
| diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index b1f0798528..35549ce042 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -59,6 +59,7 @@ typedef struct {    } data;    uint64_t next_request_id;    kvec_t(ChannelCallFrame *) call_stack; +  kvec_t(WBuffer *) delayed_notifications;  } Channel;  typedef struct { @@ -68,18 +69,10 @@ typedef struct {    uint64_t request_id;  } RequestEvent; -typedef struct { -  Channel *channel; -  String method; -  Array args; -} DelayedNotification; -  #define _noop(x)  KMEMPOOL_INIT(RequestEventPool, RequestEvent, _noop) -KLIST_INIT(DelayedNotification, DelayedNotification, _noop) -  static kmempool_t(RequestEventPool) *request_event_pool = NULL; -static klist_t(DelayedNotification) *delayed_notifications = NULL; +  static uint64_t next_id = 1;  static PMap(uint64_t) *channels = NULL;  static PMap(cstr_t) *event_strings = NULL; @@ -93,7 +86,6 @@ static msgpack_sbuffer out_buffer;  void channel_init(void)  {    request_event_pool = kmp_init(RequestEventPool); -  delayed_notifications = kl_init(DelayedNotification);    channels = pmap_new(uint64_t)();    event_strings = pmap_new(cstr_t)();    msgpack_sbuffer_init(&out_buffer); @@ -191,13 +183,10 @@ bool channel_send_event(uint64_t id, char *name, Array args)    if (channel) {      if (channel->pending_requests) { -      DelayedNotification p = { -        .channel = channel, -        .method = cstr_to_string(name), -        .args = args -      }; -      // Pending request, queue the notification for sending later -      *kl_pushp(DelayedNotification, delayed_notifications) = p; +      // Pending request, queue the notification for later sending. +      String method = cstr_as_string(name); +      WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1); +      kv_push(WBuffer *, channel->delayed_notifications, buffer);      } else {        send_event(channel, name, args);      } @@ -248,7 +237,7 @@ Object channel_send_call(uint64_t id,    }    if (!channel->pending_requests) { -    send_delayed_notifications(); +    send_delayed_notifications(channel);    }    decref(channel); @@ -506,6 +495,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)    bool success;    if (channel->closed) { +    wstream_release_wbuffer(buffer);      return false;    } @@ -593,7 +583,12 @@ static void broadcast_event(char *name, Array args)                                        kv_size(subscribed));    for (size_t i = 0; i < kv_size(subscribed); i++) { -    channel_write(kv_A(subscribed, i), buffer); +    Channel *channel = kv_A(subscribed, i); +    if (channel->pending_requests) { +      kv_push(WBuffer *, channel->delayed_notifications, buffer); +    } else { +      channel_write(channel, buffer); +    }    }  end: @@ -666,6 +661,7 @@ static void free_channel(Channel *channel)    pmap_free(cstr_t)(channel->subscribed_events);    kv_destroy(channel->call_stack); +  kv_destroy(channel->delayed_notifications);    free(channel);  } @@ -686,6 +682,7 @@ static Channel *register_channel(void)    rv->subscribed_events = pmap_new(cstr_t)();    rv->next_request_id = 1;    kv_init(rv->call_stack); +  kv_init(rv->delayed_notifications);    pmap_put(uint64_t)(channels, rv->id, rv);    return rv;  } @@ -773,18 +770,14 @@ static WBuffer *serialize_response(uint64_t channel_id,    return rv;  } -static void send_delayed_notifications(void) +static void send_delayed_notifications(Channel* channel)  { -  DelayedNotification p; - -  while (kl_shift(DelayedNotification, delayed_notifications, &p) == 0) { -    if (p.channel) { -      send_event(p.channel, p.method.data, p.args); -    } else { -      broadcast_event(p.method.data, p.args); -    } -    free(p.method.data); +  for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) { +    WBuffer *buffer = kv_A(channel->delayed_notifications, i); +    channel_write(channel, buffer);    } + +  kv_size(channel->delayed_notifications) = 0;  }  static void incref(Channel *channel) diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 90d4ebeec8..13c6c0429f 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -181,7 +181,7 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)    return true;  err: -  release_wbuffer(buffer); +  wstream_release_wbuffer(buffer);    return false;  } @@ -217,7 +217,7 @@ static void write_cb(uv_write_t *req, int status)    data->wstream->curmem -= data->buffer->size; -  release_wbuffer(data->buffer); +  wstream_release_wbuffer(data->buffer);    if (data->wstream->cb) {      data->wstream->cb(data->wstream, @@ -239,7 +239,7 @@ static void write_cb(uv_write_t *req, int status)    kmp_free(WRequestPool, wrequest_pool, data);  } -static void release_wbuffer(WBuffer *buffer) +void wstream_release_wbuffer(WBuffer *buffer)  {    if (!--buffer->refcount) {      if (buffer->cb) { | 
