diff options
-rw-r--r-- | src/nvim/os/channel.c | 42 |
1 files changed, 20 insertions, 22 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 8a57a489c4..b8cceede6f 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -38,8 +38,8 @@ static Map(uint64_t) *channels = NULL; static Map(cstr_t) *event_strings = NULL; static msgpack_sbuffer msgpack_event_buffer; -static void on_job_stdout(RStream *rstream, void *data, bool eof); -static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void job_out(RStream *rstream, void *data, bool eof); +static void job_err(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof); static void send_event(Channel *channel, char *type, typval_T *data); static void broadcast_event(char *type, typval_T *data); @@ -47,6 +47,7 @@ static void unsubscribe(Channel *channel, char *event); static void close_channel(Channel *channel); static void close_cb(uv_handle_t *handle); static WBuffer *serialize_event(char *type, typval_T *data); +static Channel *register_channel(void); void channel_init() { @@ -70,38 +71,24 @@ void channel_teardown() void channel_from_job(char **argv) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = on_job_stdout; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - - channel->id = next_id++; - channel->subscribed_events = map_new(cstr_t)(); + Channel *channel = register_channel(); channel->is_job = true; - channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); - map_put(uint64_t)(channels, channel->id, channel); + channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL); } void channel_from_stream(uv_stream_t *stream) { - Channel *channel = xmalloc(sizeof(Channel)); - rstream_cb rcb = parse_msgpack; - channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - channel->sbuffer = msgpack_sbuffer_new(); - + Channel *channel = register_channel(); stream->data = NULL; - channel->id = next_id++; - channel->subscribed_events = map_new(cstr_t)(); channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(rcb, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream channel->data.streams.write = wstream_new(1024 * 1024); wstream_set_stream(channel->data.streams.write, stream); channel->data.streams.uv = stream; - map_put(uint64_t)(channels, channel->id, channel); } bool channel_send_event(uint64_t id, char *type, typval_T *data) @@ -149,13 +136,13 @@ void channel_unsubscribe(uint64_t id, char *event) unsubscribe(channel, event); } -static void on_job_stdout(RStream *rstream, void *data, bool eof) +static void job_out(RStream *rstream, void *data, bool eof) { Job *job = data; parse_msgpack(rstream, job_data(job), eof); } -static void on_job_stderr(RStream *rstream, void *data, bool eof) +static void job_err(RStream *rstream, void *data, bool eof) { // TODO(tarruda): plugin error messages should be sent to the error buffer } @@ -288,3 +275,14 @@ static WBuffer *serialize_event(char *type, typval_T *data) return rv; } + +static Channel *register_channel() +{ + Channel *rv = xmalloc(sizeof(Channel)); + rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rv->sbuffer = msgpack_sbuffer_new(); + rv->id = next_id++; + rv->subscribed_events = map_new(cstr_t)(); + map_put(uint64_t)(channels, rv->id, rv); + return rv; +} |