diff options
| -rw-r--r-- | src/nvim/os/channel.c | 110 | ||||
| -rw-r--r-- | src/nvim/os/channel.h | 7 | ||||
| -rw-r--r-- | src/nvim/os/channel_defs.h | 8 | ||||
| -rw-r--r-- | src/nvim/os/server.c | 12 | ||||
| -rw-r--r-- | src/nvim/os/server.h | 5 | 
5 files changed, 35 insertions, 107 deletions
| diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index f275c70805..2923ab0912 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -5,7 +5,6 @@  #include "nvim/api/private/helpers.h"  #include "nvim/os/channel.h" -#include "nvim/os/channel_defs.h"  #include "nvim/os/rstream.h"  #include "nvim/os/rstream_defs.h"  #include "nvim/os/wstream.h" @@ -19,14 +18,9 @@  typedef struct {    uint64_t id; -  ChannelProtocol protocol;    bool is_job; -  union { -    struct { -      msgpack_unpacker *unpacker; -      msgpack_sbuffer *sbuffer; -    } msgpack; -  } proto; +  msgpack_unpacker *unpacker; +  msgpack_sbuffer *sbuffer;    union {      int job_id;      struct { @@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer;  static void on_job_stdout(RStream *rstream, void *data, bool eof);  static void on_job_stderr(RStream *rstream, void *data, bool eof);  static void parse_msgpack(RStream *rstream, void *data, bool eof); -static void send_msgpack(Channel *channel, String type, Object data);  static void close_channel(Channel *channel);  static void close_cb(uv_handle_t *handle); @@ -67,48 +60,28 @@ void channel_teardown()    });  } -void channel_from_job(char **argv, ChannelProtocol prot) +void channel_from_job(char **argv)  {    Channel *channel = xmalloc(sizeof(Channel)); -  rstream_cb rcb = NULL; - -  switch (prot) { -    case kChannelProtocolMsgpack: -      rcb = on_job_stdout; -      channel->proto.msgpack.unpacker = -        msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); -      channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); -      break; -    default: -      abort(); -  } +  rstream_cb rcb = on_job_stdout; +  channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); +  channel->sbuffer = msgpack_sbuffer_new();    channel->id = next_id++; -  channel->protocol = prot;    channel->is_job = true;    channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);    map_put(uint64_t)(channels, channel->id, channel);  } -void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) +void channel_from_stream(uv_stream_t *stream)  {    Channel *channel = xmalloc(sizeof(Channel)); -  rstream_cb rcb = NULL; - -  switch (prot) { -    case kChannelProtocolMsgpack: -      rcb = parse_msgpack; -      channel->proto.msgpack.unpacker = -        msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); -      channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); -      break; -    default: -      abort(); -  } +  rstream_cb rcb = parse_msgpack; +  channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); +  channel->sbuffer = msgpack_sbuffer_new();    stream->data = NULL;    channel->id = next_id++; -  channel->protocol = prot;    channel->is_job = false;    // read stream    channel->data.streams.read = rstream_new(rcb, 1024, channel, true); @@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data)    String event_type = {.size = strnlen(type, 1024), .data = type};    Object event_data = vim_to_object(data); +  msgpack_packer packer; +  msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); +  msgpack_rpc_notification(event_type, event_data, &packer); +  char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); -  switch (channel->protocol) { -    case kChannelProtocolMsgpack: -      send_msgpack(channel, event_type, event_data); -      break; -    default: -      abort(); -  } +  wstream_write(channel->data.streams.write, +                bytes, +                msgpack_event_buffer.size, +                true);    msgpack_rpc_free_object(event_data); +  msgpack_sbuffer_clear(&msgpack_event_buffer);    return true;  } @@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)    uint32_t count = rstream_available(rstream);    // Feed the unpacker with data -  msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count); -  rstream_read(rstream, -               msgpack_unpacker_buffer(channel->proto.msgpack.unpacker), -               count); -  msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count); +  msgpack_unpacker_reserve_buffer(channel->unpacker, count); +  rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count); +  msgpack_unpacker_buffer_consumed(channel->unpacker, count);    msgpack_unpacked unpacked;    msgpack_unpacked_init(&unpacked);    // Deserialize everything we can. -  while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) { +  while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {      // Each object is a new msgpack-rpc request and requires an empty response      msgpack_packer response; -    msgpack_packer_init(&response, -                        channel->proto.msgpack.sbuffer, -                        msgpack_sbuffer_write); +    msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);      // Perform the call      msgpack_rpc_call(channel->id, &unpacked.data, &response);      wstream_write(channel->data.streams.write, -                  xmemdup(channel->proto.msgpack.sbuffer->data, -                          channel->proto.msgpack.sbuffer->size), -                  channel->proto.msgpack.sbuffer->size, +                  xmemdup(channel->sbuffer->data, channel->sbuffer->size), +                  channel->sbuffer->size,                    true);      // Clear the buffer for future calls -    msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer); +    msgpack_sbuffer_clear(channel->sbuffer);    }  } -static void send_msgpack(Channel *channel, String type, Object data) -{ -  msgpack_packer packer; -  msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write); -  msgpack_rpc_notification(type, data, &packer); -  char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size); - -  wstream_write(channel->data.streams.write, -      bytes, -      msgpack_event_buffer.size, -      true); - -  msgpack_sbuffer_clear(&msgpack_event_buffer); -} -  static void close_channel(Channel *channel)  {    map_del(uint64_t)(channels, channel->id); - -  switch (channel->protocol) { -    case kChannelProtocolMsgpack: -      msgpack_sbuffer_free(channel->proto.msgpack.sbuffer); -      msgpack_unpacker_free(channel->proto.msgpack.unpacker); -      break; -    default: -      abort(); -  } +  msgpack_sbuffer_free(channel->sbuffer); +  msgpack_unpacker_free(channel->unpacker);    if (channel->is_job) {      job_stop(channel->data.job_id); diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h index 543b91dd89..a1c650a378 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/os/channel.h @@ -4,7 +4,6 @@  #include <uv.h>  #include "nvim/vim.h" -#include "nvim/os/channel_defs.h"  /// Initializes the module  void channel_init(void); @@ -16,15 +15,13 @@ void channel_teardown(void);  /// pipe/socket client connection  ///  /// @param stream The established connection -/// @param prot The rpc protocol used -void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot); +void channel_from_stream(uv_stream_t *stream);  /// Creates an API channel by starting a job and connecting to its  /// stdin/stdout. stderr is forwarded to the editor error stream.  ///  /// @param argv The argument vector for the process -/// @param prot The rpc protocol used -void channel_from_job(char **argv, ChannelProtocol prot); +void channel_from_job(char **argv);  /// Sends event/data to channel  /// diff --git a/src/nvim/os/channel_defs.h b/src/nvim/os/channel_defs.h deleted file mode 100644 index 9763e0f284..0000000000 --- a/src/nvim/os/channel_defs.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef NVIM_OS_CHANNEL_DEFS_H -#define NVIM_OS_CHANNEL_DEFS_H - -typedef enum { -  kChannelProtocolMsgpack -} ChannelProtocol; - -#endif  // NVIM_OS_CHANNEL_DEFS_H diff --git a/src/nvim/os/server.c b/src/nvim/os/server.c index 7b2326556c..ee9a2e592c 100644 --- a/src/nvim/os/server.c +++ b/src/nvim/os/server.c @@ -5,7 +5,6 @@  #include <uv.h> -#include "nvim/os/channel_defs.h"  #include "nvim/os/channel.h"  #include "nvim/os/server.h"  #include "nvim/os/os.h" @@ -25,8 +24,6 @@ typedef enum {  } ServerType;  typedef struct { -  // Protocol for channels established through this server -  ChannelProtocol protocol;    // Type of the union below    ServerType type; @@ -59,8 +56,7 @@ void server_init()      free(listen_address);    } -  server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"), -               kChannelProtocolMsgpack); +  server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"));  }  void server_teardown() @@ -80,7 +76,7 @@ void server_teardown()    });  } -void server_start(char *endpoint, ChannelProtocol prot) +void server_start(char *endpoint)  {    char addr[ADDRESS_MAX_SIZE]; @@ -101,8 +97,6 @@ void server_start(char *endpoint, ChannelProtocol prot)    Server *server = xmalloc(sizeof(Server));    char ip[16], *ip_end = strrchr(addr, ':'); -  server->protocol = prot; -    if (!ip_end) {      ip_end = strchr(addr, NUL);    } @@ -229,7 +223,7 @@ static void connection_cb(uv_stream_t *server, int status)      return;    } -  channel_from_stream(client, srv->protocol); +  channel_from_stream(client);  }  static void free_client(uv_handle_t *handle) diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h index 73c6bd1fea..f6270b42e9 100644 --- a/src/nvim/os/server.h +++ b/src/nvim/os/server.h @@ -1,8 +1,6 @@  #ifndef NVIM_OS_SERVER_H  #define NVIM_OS_SERVER_H -#include "nvim/os/channel_defs.h" -  /// Initializes the module  void server_init(); @@ -18,8 +16,7 @@ void server_teardown();  /// @param endpoint Address of the server. Either a 'ip:port' string or an  ///        arbitrary identifier(trimmed to 256 bytes) for the unix socket or  ///        named pipe. -/// @param prot The rpc protocol to be used -void server_start(char *endpoint, ChannelProtocol prot); +void server_start(char *endpoint);  /// Stops listening on the address specified by `endpoint`.  /// | 
