// This is an open source non-commercial project. Dear PVS-Studio, please check // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com #include "nvim/api/private/helpers.h" #include "nvim/api/ui.h" #include "nvim/channel.h" #include "nvim/eval.h" #include "nvim/eval/encode.h" #include "nvim/event/socket.h" #include "nvim/fileio.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" #include "nvim/os/shell.h" #include "nvim/path.h" #include "nvim/ascii.h" static bool did_stdio = false; PMap(uint64_t) *channels = NULL; /// next free id for a job or rpc channel /// 1 is reserved for stdio channel /// 2 is reserved for stderr channel static uint64_t next_chan_id = CHAN_STDERR+1; typedef struct { Channel *chan; Callback *callback; const char *type; // if reader is set, status is ignored. CallbackReader *reader; int status; } ChannelEvent; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "channel.c.generated.h" #endif /// Teardown the module void channel_teardown(void) { if (!channels) { return; } Channel *channel; map_foreach_value(channels, channel, { channel_close(channel->id, kChannelPartAll, NULL); }); } /// Closes a channel /// /// @param id The channel id /// @return true if successful, false otherwise bool channel_close(uint64_t id, ChannelPart part, const char **error) { Channel *chan; Process *proc; const char *dummy; if (!error) { error = &dummy; } if (!(chan = find_channel(id))) { if (id < next_chan_id) { // allow double close, even though we can't say what parts was valid. return true; } *error = (const char *)e_invchan; return false; } bool close_main = false; if (part == kChannelPartRpc || part == kChannelPartAll) { close_main = true; if (chan->is_rpc) { rpc_close(chan); } else if (part == kChannelPartRpc) { *error = (const char *)e_invstream; return false; } } else if ((part == kChannelPartStdin || part == kChannelPartStdout) && chan->is_rpc) { *error = (const char *)e_invstreamrpc; return false; } switch (chan->streamtype) { case kChannelStreamSocket: if (!close_main) { *error = (const char *)e_invstream; return false; } stream_may_close(&chan->stream.socket); break; case kChannelStreamProc: proc = (Process *)&chan->stream.proc; if (part == kChannelPartStdin || close_main) { stream_may_close(&proc->in); } if (part == kChannelPartStdout || close_main) { stream_may_close(&proc->out); } if (part == kChannelPartStderr || part == kChannelPartAll) { stream_may_close(&proc->err); } if (proc->type == kProcessTypePty && part == kChannelPartAll) { pty_process_close_master(&chan->stream.pty); } break; case kChannelStreamStdio: if (part == kChannelPartStdin || close_main) { stream_may_close(&chan->stream.stdio.in); } if (part == kChannelPartStdout || close_main) { stream_may_close(&chan->stream.stdio.out); } if (part == kChannelPartStderr) { *error = (const char *)e_invstream; return false; } break; case kChannelStreamStderr: if (part != kChannelPartAll && part != kChannelPartStderr) { *error = (const char *)e_invstream; return false; } if (!chan->stream.err.closed) { chan->stream.err.closed = true; // Don't close on exit, in case late error messages if (!exiting) { fclose(stderr); } channel_decref(chan); } break; case kChannelStreamInternal: if (!close_main) { *error = (const char *)e_invstream; return false; } break; default: abort(); } return true; } /// Initializes the module void channel_init(void) { channels = pmap_new(uint64_t)(); channel_alloc(kChannelStreamStderr); rpc_init(); } /// Allocates a channel. /// /// Channel is allocated with refcount 1, which should be decreased /// when the underlying stream closes. static Channel *channel_alloc(ChannelStreamType type) { Channel *chan = xcalloc(1, sizeof(*chan)); if (type == kChannelStreamStdio) { chan->id = CHAN_STDIO; } else if (type == kChannelStreamStderr) { chan->id = CHAN_STDERR; } else { chan->id = next_chan_id++; } chan->events = multiqueue_new_child(main_loop.events); chan->refcount = 1; chan->streamtype = type; pmap_put(uint64_t)(channels, chan->id, chan); return chan; } void channel_create_event(Channel *chan, const char *ext_source) { #if MIN_LOG_LEVEL <= INFO_LOG_LEVEL const char *source; if (ext_source) { // TODO(bfredl): in a future improved traceback solution, // external events should be included. source = ext_source; } else { eval_fmt_source_name_line((char *)IObuff, sizeof(IObuff)); source = (const char *)IObuff; } Dictionary info = channel_info(chan->id); typval_T tv = TV_INITIAL_VALUE; // TODO(bfredl): do the conversion in one step. Also would be nice // to pretty print top level dict in defined order (void)object_to_vim(DICTIONARY_OBJ(info), &tv, NULL); char *str = encode_tv2json(&tv, NULL); ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str); xfree(str); api_free_dictionary(info); #else (void)ext_source; #endif channel_info_changed(chan, true); } void channel_incref(Channel *chan) { chan->refcount++; } void channel_decref(Channel *chan) { if (!(--chan->refcount)) { // delay free, so that libuv is done with the handles multiqueue_put(main_loop.events, free_channel_event, 1, chan); } } void callback_reader_free(CallbackReader *reader) { callback_free(&reader->cb); ga_clear(&reader->buffer); } void callback_reader_start(CallbackReader *reader) { ga_init(&reader->buffer, sizeof(char *), 32); } static void free_channel_event(void **argv) { Channel *chan = argv[0]; if (chan->is_rpc) { rpc_free(chan); } callback_reader_free(&chan->on_stdout); callback_reader_free(&chan->on_stderr); callback_free(&chan->on_exit); pmap_del(uint64_t)(channels, chan->id); multiqueue_free(chan->events); xfree(chan); } static void channel_destroy_early(Channel *chan) { if ((chan->id != --next_chan_id)) { abort(); } pmap_del(uint64_t)(channels, chan->id); chan->id = 0; if ((--chan->refcount != 0)) { abort(); } // uv will keep a reference to handles until next loop tick, so delay free multiqueue_put(main_loop.events, free_channel_event, 1, chan); } static void close_cb(Stream *stream, void *data) { channel_decref(data); } Channel *channel_job_start(char **argv, CallbackReader on_stdout, CallbackReader on_stderr, Callback on_exit, bool pty, bool rpc, bool detach, const char *cwd, uint16_t pty_width, uint16_t pty_height, char *term_name, varnumber_T *status_out) { assert(cwd == NULL || os_isdir_executable(cwd)); Channel *chan = channel_alloc(kChannelStreamProc); chan->on_stdout = on_stdout; chan->on_stderr = on_stderr; chan->on_exit = on_exit; if (pty) { if (detach) { EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); shell_free_argv(argv); xfree(term_name); channel_destroy_early(chan); *status_out = 0; return NULL; } chan->stream.pty = pty_process_init(&main_loop, chan); if (pty_width > 0) { chan->stream.pty.width = pty_width; } if (pty_height > 0) { chan->stream.pty.height = pty_height; } if (term_name) { chan->stream.pty.term_name = term_name; } } else { chan->stream.uv = libuv_process_init(&main_loop, chan); } Process *proc = (Process *)&chan->stream.proc; proc->argv = argv; proc->cb = channel_process_exit_cb; proc->events = chan->events; proc->detach = detach; proc->cwd = cwd; char *cmd = xstrdup(proc->argv[0]); bool has_out, has_err; if (proc->type == kProcessTypePty) { has_out = true; has_err = false; } else { has_out = rpc || callback_reader_set(chan->on_stdout); has_err = callback_reader_set(chan->on_stderr); } int status = process_spawn(proc, true, has_out, has_err); if (status) { EMSG3(_(e_jobspawn), os_strerror(status), cmd); xfree(cmd); if (proc->type == kProcessTypePty) { xfree(chan->stream.pty.term_name); } channel_destroy_early(chan); *status_out = proc->status; return NULL; } xfree(cmd); wstream_init(&proc->in, 0); if (has_out) { rstream_init(&proc->out, 0); } if (rpc) { // the rpc takes over the in and out streams rpc_start(chan); } else { if (has_out) { callback_reader_start(&chan->on_stdout); rstream_start(&proc->out, on_job_stdout, chan); } } if (has_err) { callback_reader_start(&chan->on_stderr); rstream_init(&proc->err, 0); rstream_start(&proc->err, on_job_stderr, chan); } *status_out = (varnumber_T)chan->id; return chan; } uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader on_output, int timeout, const char **error) { Channel *channel; if (!tcp && rpc) { char *path = fix_fname(address); bool loopback = server_owns_pipe_address(path); xfree(path); if (loopback) { // Create a loopback channel. This avoids deadlock if nvim connects to // its own named pipe. channel = channel_alloc(kChannelStreamInternal); rpc_start(channel); goto end; } } channel = channel_alloc(kChannelStreamSocket); if (!socket_connect(&main_loop, &channel->stream.socket, tcp, address, timeout, error)) { channel_destroy_early(channel); return 0; } channel->stream.socket.internal_close_cb = close_cb; channel->stream.socket.internal_data = channel; wstream_init(&channel->stream.socket, 0); rstream_init(&channel->stream.socket, 0); if (rpc) { rpc_start(channel); } else { channel->on_stdout = on_output; callback_reader_start(&channel->on_stdout); rstream_start(&channel->stream.socket, on_socket_output, channel); } end: channel_create_event(channel, address); return channel->id; } /// Creates an RPC channel from a tcp/pipe socket connection /// /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { Channel *channel = channel_alloc(kChannelStreamSocket); socket_watcher_accept(watcher, &channel->stream.socket); channel->stream.socket.internal_close_cb = close_cb; channel->stream.socket.internal_data = channel; wstream_init(&channel->stream.socket, 0); rstream_init(&channel->stream.socket, 0); rpc_start(channel); channel_create_event(channel, watcher->addr); } /// Creates an API channel from stdin/stdout. This is used when embedding /// Neovim uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **error) FUNC_ATTR_NONNULL_ALL { if (!headless_mode && !embedded_mode) { *error = _("can only be opened in headless mode"); return 0; } if (did_stdio) { *error = _("channel was already open"); return 0; } did_stdio = true; Channel *channel = channel_alloc(kChannelStreamStdio); rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0); wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); if (rpc) { rpc_start(channel); } else { channel->on_stdout = on_output; callback_reader_start(&channel->on_stdout); rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); } return channel->id; } /// @param data will be consumed size_t channel_send(uint64_t id, char *data, size_t len, const char **error) { Channel *chan = find_channel(id); if (!chan) { EMSG(_(e_invchan)); goto err; } if (chan->streamtype == kChannelStreamStderr) { if (chan->stream.err.closed) { *error = _("Can't send data to closed stream"); goto err; } // unbuffered write size_t written = fwrite(data, len, 1, stderr); xfree(data); return len * written; } Stream *in = channel_instream(chan); if (in->closed) { *error = _("Can't send data to closed stream"); goto err; } if (chan->is_rpc) { *error = _("Can't send raw data to rpc channel"); goto err; } WBuffer *buf = wstream_new_buffer(data, len, 1, xfree); return wstream_write(in, buf) ? len : 0; err: xfree(data); return 0; } /// Convert binary byte array to a readfile()-style list /// /// @param[in] buf Array to convert. /// @param[in] len Array length. /// /// @return [allocated] Converted list. static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE { list_T *const l = tv_list_alloc(kListLenMayKnow); // Empty buffer should be represented by [''], encode_list_write() thinks // empty list is fine for the case. tv_list_append_string(l, "", 0); if (len > 0) { encode_list_write(l, buf, len); } return l; } // vimscript job callbacks must be executed on Nvim main loop static inline void process_channel_event(Channel *chan, Callback *callback, const char *type, CallbackReader *reader, int status) { assert(callback); ChannelEvent *event_data = xmalloc(sizeof(*event_data)); event_data->reader = reader; event_data->status = status; channel_incref(chan); // Hold on ref to callback event_data->chan = chan; event_data->callback = callback; event_data->type = type; multiqueue_put(chan->events, on_channel_event, 1, event_data); } void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout"); } void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr"); } static void on_socket_output(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data"); } static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) { Channel *chan = data; on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin"); } /// @param type must have static lifetime static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, size_t count, bool eof, CallbackReader *reader, const char *type) { // stub variable, to keep reading consistent with the order of events, only // consider the count parameter. size_t r; char *ptr = rbuffer_read_ptr(buf, &r); if (eof) { if (reader->buffered) { if (reader->cb.type != kCallbackNone) { process_channel_event(chan, &reader->cb, type, reader, 0); } else if (reader->self) { if (tv_dict_find(reader->self, type, -1) == NULL) { list_T *data = buffer_to_tv_list(reader->buffer.ga_data, (size_t)reader->buffer.ga_len); tv_dict_add_list(reader->self, type, strlen(type), data); } else { // can't display error message now, defer it. channel_incref(chan); multiqueue_put(chan->events, on_buffered_error, 2, chan, type); } ga_clear(&reader->buffer); } else { abort(); } } else if (reader->cb.type != kCallbackNone) { process_channel_event(chan, &reader->cb, type, reader, 0); } return; } // The order here matters, the terminal must receive the data first because // process_channel_event will modify the read buffer(convert NULs into NLs) if (chan->term) { terminal_receive(chan->term, ptr, count); terminal_flush_output(chan->term); } rbuffer_consumed(buf, count); if (callback_reader_set(*reader) || reader->buffered) { // if buffer wasn't consumed, a pending callback is stalled. Aggregate the // received data and avoid a "burst" of multiple callbacks. bool buffer_set = reader->buffer.ga_len > 0; ga_concat_len(&reader->buffer, ptr, count); if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) { process_channel_event(chan, &reader->cb, type, reader, 0); } } } static void on_buffered_error(void **args) { Channel *chan = (Channel *)args[0]; const char *stream = (const char *)args[1]; EMSG3(_(e_streamkey), stream, chan->id); channel_decref(chan); } static void channel_process_exit_cb(Process *proc, int status, void *data) { Channel *chan = data; if (chan->term) { char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); terminal_close(chan->term, msg); } // If process did not exit, we only closed the handle of a detached process. bool exited = (status >= 0); if (exited) { process_channel_event(chan, &chan->on_exit, "exit", NULL, status); } channel_decref(chan); } static void on_channel_event(void **args) { ChannelEvent *ev = (ChannelEvent *)args[0]; typval_T argv[4]; argv[0].v_type = VAR_NUMBER; argv[0].v_lock = VAR_UNLOCKED; argv[0].vval.v_number = (varnumber_T)ev->chan->id; if (ev->reader) { argv[1].v_type = VAR_LIST; argv[1].v_lock = VAR_UNLOCKED; argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data, (size_t)ev->reader->buffer.ga_len); tv_list_ref(argv[1].vval.v_list); ga_clear(&ev->reader->buffer); } else { argv[1].v_type = VAR_NUMBER; argv[1].v_lock = VAR_UNLOCKED; argv[1].vval.v_number = ev->status; } argv[2].v_type = VAR_STRING; argv[2].v_lock = VAR_UNLOCKED; argv[2].vval.v_string = (uint8_t *)ev->type; typval_T rettv = TV_INITIAL_VALUE; callback_call(ev->callback, 3, argv, &rettv); tv_clear(&rettv); channel_decref(ev->chan); xfree(ev); } /// Open terminal for channel /// /// Channel `chan` is assumed to be an open pty channel, /// and curbuf is assumed to be a new, unmodified buffer. void channel_terminal_open(Channel *chan) { TerminalOptions topts; topts.data = chan; topts.width = chan->stream.pty.width; topts.height = chan->stream.pty.height; topts.write_cb = term_write; topts.resize_cb = term_resize; topts.close_cb = term_close; curbuf->b_p_channel = (long)chan->id; // 'channel' option Terminal *term = terminal_open(topts); chan->term = term; channel_incref(chan); } static void term_write(char *buf, size_t size, void *data) { Channel *chan = data; if (chan->stream.proc.in.closed) { // If the backing stream was closed abruptly, there may be write events // ahead of the terminal close event. Just ignore the writes. ILOG("write failed: stream is closed"); return; } WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); wstream_write(&chan->stream.proc.in, wbuf); } static void term_resize(uint16_t width, uint16_t height, void *data) { Channel *chan = data; pty_process_resize(&chan->stream.pty, width, height); } static inline void term_delayed_free(void **argv) { Channel *chan = argv[0]; if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { multiqueue_put(chan->events, term_delayed_free, 1, chan); return; } terminal_destroy(chan->term); chan->term = NULL; channel_decref(chan); } static void term_close(void *data) { Channel *chan = data; process_stop(&chan->stream.proc); multiqueue_put(chan->events, term_delayed_free, 1, data); } void channel_info_changed(Channel *chan, bool new) { event_T event = new ? EVENT_CHANOPEN : EVENT_CHANINFO; if (has_event(event)) { channel_incref(chan); multiqueue_put(main_loop.events, set_info_event, 2, chan, event); } } static void set_info_event(void **argv) { Channel *chan = argv[0]; event_T event = (event_T)(ptrdiff_t)argv[1]; dict_T *dict = get_vim_var_dict(VV_EVENT); Dictionary info = channel_info(chan->id); typval_T retval; (void)object_to_vim(DICTIONARY_OBJ(info), &retval, NULL); tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict); apply_autocmds(event, NULL, NULL, false, curbuf); tv_dict_clear(dict); api_free_dictionary(info); channel_decref(chan); } Dictionary channel_info(uint64_t id) { Channel *chan = find_channel(id); if (!chan) { return (Dictionary)ARRAY_DICT_INIT; } Dictionary info = ARRAY_DICT_INIT; PUT(info, "id", INTEGER_OBJ((Integer)chan->id)); const char *stream_desc, *mode_desc; switch (chan->streamtype) { case kChannelStreamProc: stream_desc = "job"; if (chan->stream.proc.type == kProcessTypePty) { const char *name = pty_process_tty_name(&chan->stream.pty); PUT(info, "pty", STRING_OBJ(cstr_to_string(name))); } break; case kChannelStreamStdio: stream_desc = "stdio"; break; case kChannelStreamStderr: stream_desc = "stderr"; break; case kChannelStreamInternal: PUT(info, "internal", BOOLEAN_OBJ(true)); FALLTHROUGH; case kChannelStreamSocket: stream_desc = "socket"; break; default: abort(); } PUT(info, "stream", STRING_OBJ(cstr_to_string(stream_desc))); if (chan->is_rpc) { mode_desc = "rpc"; PUT(info, "client", DICTIONARY_OBJ(rpc_client_info(chan))); } else if (chan->term) { mode_desc = "terminal"; PUT(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term))); } else { mode_desc = "bytes"; } PUT(info, "mode", STRING_OBJ(cstr_to_string(mode_desc))); return info; } Array channel_all_info(void) { Channel *channel; Array ret = ARRAY_DICT_INIT; map_foreach_value(channels, channel, { ADD(ret, DICTIONARY_OBJ(channel_info(channel->id))); }); return ret; }