aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r--src/nvim/channel.c831
1 files changed, 831 insertions, 0 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
new file mode 100644
index 0000000000..8b8d27affd
--- /dev/null
+++ b/src/nvim/channel.c
@@ -0,0 +1,831 @@
+// This is an open source non-commercial project. Dear PVS-Studio, please check
+// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+
+#include "nvim/api/private/helpers.h"
+#include "nvim/api/ui.h"
+#include "nvim/channel.h"
+#include "nvim/eval.h"
+#include "nvim/eval/encode.h"
+#include "nvim/event/socket.h"
+#include "nvim/fileio.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
+#include "nvim/os/shell.h"
+#include "nvim/path.h"
+#include "nvim/ascii.h"
+
+static bool did_stdio = false;
+PMap(uint64_t) *channels = NULL;
+
+/// next free id for a job or rpc channel
+/// 1 is reserved for stdio channel
+/// 2 is reserved for stderr channel
+static uint64_t next_chan_id = CHAN_STDERR+1;
+
+
+typedef struct {
+ Channel *chan;
+ Callback *callback;
+ const char *type;
+ // if reader is set, status is ignored.
+ CallbackReader *reader;
+ int status;
+} ChannelEvent;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "channel.c.generated.h"
+#endif
+/// Teardown the module
+void channel_teardown(void)
+{
+ if (!channels) {
+ return;
+ }
+
+ Channel *channel;
+
+ map_foreach_value(channels, channel, {
+ channel_close(channel->id, kChannelPartAll, NULL);
+ });
+}
+
+/// Closes a channel
+///
+/// @param id The channel id
+/// @return true if successful, false otherwise
+bool channel_close(uint64_t id, ChannelPart part, const char **error)
+{
+ Channel *chan;
+ Process *proc;
+
+ const char *dummy;
+ if (!error) {
+ error = &dummy;
+ }
+
+ if (!(chan = find_channel(id))) {
+ if (id < next_chan_id) {
+ // allow double close, even though we can't say what parts was valid.
+ return true;
+ }
+ *error = (const char *)e_invchan;
+ return false;
+ }
+
+ bool close_main = false;
+ if (part == kChannelPartRpc || part == kChannelPartAll) {
+ close_main = true;
+ if (chan->is_rpc) {
+ rpc_close(chan);
+ } else if (part == kChannelPartRpc) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
+ && chan->is_rpc) {
+ *error = (const char *)e_invstreamrpc;
+ return false;
+ }
+
+ switch (chan->streamtype) {
+ case kChannelStreamSocket:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ stream_may_close(&chan->stream.socket);
+ break;
+
+ case kChannelStreamProc:
+ proc = (Process *)&chan->stream.proc;
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&proc->in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&proc->out);
+ }
+ if (part == kChannelPartStderr || part == kChannelPartAll) {
+ stream_may_close(&proc->err);
+ }
+ if (proc->type == kProcessTypePty && part == kChannelPartAll) {
+ pty_process_close_master(&chan->stream.pty);
+ }
+
+ break;
+
+ case kChannelStreamStdio:
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&chan->stream.stdio.in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&chan->stream.stdio.out);
+ }
+ if (part == kChannelPartStderr) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+
+ case kChannelStreamStderr:
+ if (part != kChannelPartAll && part != kChannelPartStderr) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ if (!chan->stream.err.closed) {
+ chan->stream.err.closed = true;
+ // Don't close on exit, in case late error messages
+ if (!exiting) {
+ fclose(stderr);
+ }
+ channel_decref(chan);
+ }
+ break;
+
+ case kChannelStreamInternal:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+
+ default:
+ abort();
+ }
+
+ return true;
+}
+
+/// Initializes the module
+void channel_init(void)
+{
+ channels = pmap_new(uint64_t)();
+ channel_alloc(kChannelStreamStderr);
+ rpc_init();
+}
+
+/// Allocates a channel.
+///
+/// Channel is allocated with refcount 1, which should be decreased
+/// when the underlying stream closes.
+static Channel *channel_alloc(ChannelStreamType type)
+{
+ Channel *chan = xcalloc(1, sizeof(*chan));
+ if (type == kChannelStreamStdio) {
+ chan->id = CHAN_STDIO;
+ } else if (type == kChannelStreamStderr) {
+ chan->id = CHAN_STDERR;
+ } else {
+ chan->id = next_chan_id++;
+ }
+ chan->events = multiqueue_new_child(main_loop.events);
+ chan->refcount = 1;
+ chan->streamtype = type;
+ pmap_put(uint64_t)(channels, chan->id, chan);
+ return chan;
+}
+
+void channel_create_event(Channel *chan, const char *ext_source)
+{
+#if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
+ const char *source;
+
+ if (ext_source) {
+ // TODO(bfredl): in a future improved traceback solution,
+ // external events should be included.
+ source = ext_source;
+ } else {
+ eval_fmt_source_name_line((char *)IObuff, sizeof(IObuff));
+ source = (const char *)IObuff;
+ }
+
+ Dictionary info = channel_info(chan->id);
+ typval_T tv = TV_INITIAL_VALUE;
+ // TODO(bfredl): do the conversion in one step. Also would be nice
+ // to pretty print top level dict in defined order
+ (void)object_to_vim(DICTIONARY_OBJ(info), &tv, NULL);
+ char *str = encode_tv2json(&tv, NULL);
+ ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str);
+ xfree(str);
+ api_free_dictionary(info);
+
+#else
+ (void)ext_source;
+#endif
+
+ channel_info_changed(chan, true);
+}
+
+void channel_incref(Channel *chan)
+{
+ chan->refcount++;
+}
+
+void channel_decref(Channel *chan)
+{
+ if (!(--chan->refcount)) {
+ // delay free, so that libuv is done with the handles
+ multiqueue_put(main_loop.events, free_channel_event, 1, chan);
+ }
+}
+
+void callback_reader_free(CallbackReader *reader)
+{
+ callback_free(&reader->cb);
+ ga_clear(&reader->buffer);
+}
+
+void callback_reader_start(CallbackReader *reader)
+{
+ ga_init(&reader->buffer, sizeof(char *), 32);
+}
+
+static void free_channel_event(void **argv)
+{
+ Channel *chan = argv[0];
+ if (chan->is_rpc) {
+ rpc_free(chan);
+ }
+
+ callback_reader_free(&chan->on_stdout);
+ callback_reader_free(&chan->on_stderr);
+ callback_free(&chan->on_exit);
+
+ pmap_del(uint64_t)(channels, chan->id);
+ multiqueue_free(chan->events);
+ xfree(chan);
+}
+
+static void channel_destroy_early(Channel *chan)
+{
+ if ((chan->id != --next_chan_id)) {
+ abort();
+ }
+ pmap_del(uint64_t)(channels, chan->id);
+ chan->id = 0;
+
+ if ((--chan->refcount != 0)) {
+ abort();
+ }
+
+ // uv will keep a reference to handles until next loop tick, so delay free
+ multiqueue_put(main_loop.events, free_channel_event, 1, chan);
+}
+
+
+static void close_cb(Stream *stream, void *data)
+{
+ channel_decref(data);
+}
+
+Channel *channel_job_start(char **argv, CallbackReader on_stdout,
+ CallbackReader on_stderr, Callback on_exit,
+ bool pty, bool rpc, bool detach, const char *cwd,
+ uint16_t pty_width, uint16_t pty_height,
+ char *term_name, varnumber_T *status_out)
+{
+ assert(cwd == NULL || os_isdir_executable(cwd));
+
+ Channel *chan = channel_alloc(kChannelStreamProc);
+ chan->on_stdout = on_stdout;
+ chan->on_stderr = on_stderr;
+ chan->on_exit = on_exit;
+
+ if (pty) {
+ if (detach) {
+ EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
+ shell_free_argv(argv);
+ xfree(term_name);
+ channel_destroy_early(chan);
+ *status_out = 0;
+ return NULL;
+ }
+ chan->stream.pty = pty_process_init(&main_loop, chan);
+ if (pty_width > 0) {
+ chan->stream.pty.width = pty_width;
+ }
+ if (pty_height > 0) {
+ chan->stream.pty.height = pty_height;
+ }
+ if (term_name) {
+ chan->stream.pty.term_name = term_name;
+ }
+ } else {
+ chan->stream.uv = libuv_process_init(&main_loop, chan);
+ }
+
+ Process *proc = (Process *)&chan->stream.proc;
+ proc->argv = argv;
+ proc->cb = channel_process_exit_cb;
+ proc->events = chan->events;
+ proc->detach = detach;
+ proc->cwd = cwd;
+
+ char *cmd = xstrdup(proc->argv[0]);
+ bool has_out, has_err;
+ if (proc->type == kProcessTypePty) {
+ has_out = true;
+ has_err = false;
+ } else {
+ has_out = rpc || callback_reader_set(chan->on_stdout);
+ has_err = callback_reader_set(chan->on_stderr);
+ }
+ int status = process_spawn(proc, true, has_out, has_err);
+ if (status) {
+ EMSG3(_(e_jobspawn), os_strerror(status), cmd);
+ xfree(cmd);
+ if (proc->type == kProcessTypePty) {
+ xfree(chan->stream.pty.term_name);
+ }
+ channel_destroy_early(chan);
+ *status_out = proc->status;
+ return NULL;
+ }
+ xfree(cmd);
+
+ wstream_init(&proc->in, 0);
+ if (has_out) {
+ rstream_init(&proc->out, 0);
+ }
+
+ if (rpc) {
+ // the rpc takes over the in and out streams
+ rpc_start(chan);
+ } else {
+ if (has_out) {
+ callback_reader_start(&chan->on_stdout);
+ rstream_start(&proc->out, on_job_stdout, chan);
+ }
+ }
+
+ if (has_err) {
+ callback_reader_start(&chan->on_stderr);
+ rstream_init(&proc->err, 0);
+ rstream_start(&proc->err, on_job_stderr, chan);
+ }
+
+ *status_out = (varnumber_T)chan->id;
+ return chan;
+}
+
+
+uint64_t channel_connect(bool tcp, const char *address,
+ bool rpc, CallbackReader on_output,
+ int timeout, const char **error)
+{
+ Channel *channel;
+
+ if (!tcp && rpc) {
+ char *path = fix_fname(address);
+ bool loopback = server_owns_pipe_address(path);
+ xfree(path);
+ if (loopback) {
+ // Create a loopback channel. This avoids deadlock if nvim connects to
+ // its own named pipe.
+ channel = channel_alloc(kChannelStreamInternal);
+ rpc_start(channel);
+ goto end;
+ }
+ }
+
+ channel = channel_alloc(kChannelStreamSocket);
+ if (!socket_connect(&main_loop, &channel->stream.socket,
+ tcp, address, timeout, error)) {
+ channel_destroy_early(channel);
+ return 0;
+ }
+
+ channel->stream.socket.internal_close_cb = close_cb;
+ channel->stream.socket.internal_data = channel;
+ wstream_init(&channel->stream.socket, 0);
+ rstream_init(&channel->stream.socket, 0);
+
+ if (rpc) {
+ rpc_start(channel);
+ } else {
+ channel->on_stdout = on_output;
+ callback_reader_start(&channel->on_stdout);
+ rstream_start(&channel->stream.socket, on_socket_output, channel);
+ }
+
+end:
+ channel_create_event(channel, address);
+ return channel->id;
+}
+
+/// Creates an RPC channel from a tcp/pipe socket connection
+///
+/// @param watcher The SocketWatcher ready to accept the connection
+void channel_from_connection(SocketWatcher *watcher)
+{
+ Channel *channel = channel_alloc(kChannelStreamSocket);
+ socket_watcher_accept(watcher, &channel->stream.socket);
+ channel->stream.socket.internal_close_cb = close_cb;
+ channel->stream.socket.internal_data = channel;
+ wstream_init(&channel->stream.socket, 0);
+ rstream_init(&channel->stream.socket, 0);
+ rpc_start(channel);
+ channel_create_event(channel, watcher->addr);
+}
+
+/// Creates an API channel from stdin/stdout. This is used when embedding
+/// Neovim
+uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
+ const char **error)
+ FUNC_ATTR_NONNULL_ALL
+{
+ if (!headless_mode && !embedded_mode) {
+ *error = _("can only be opened in headless mode");
+ return 0;
+ }
+
+ if (did_stdio) {
+ *error = _("channel was already open");
+ return 0;
+ }
+ did_stdio = true;
+
+ Channel *channel = channel_alloc(kChannelStreamStdio);
+
+ rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0);
+ wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
+
+ if (rpc) {
+ rpc_start(channel);
+ } else {
+ channel->on_stdout = on_output;
+ callback_reader_start(&channel->on_stdout);
+ rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
+ }
+
+ return channel->id;
+}
+
+/// @param data will be consumed
+size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
+{
+ Channel *chan = find_channel(id);
+ if (!chan) {
+ EMSG(_(e_invchan));
+ goto err;
+ }
+
+ if (chan->streamtype == kChannelStreamStderr) {
+ if (chan->stream.err.closed) {
+ *error = _("Can't send data to closed stream");
+ goto err;
+ }
+ // unbuffered write
+ size_t written = fwrite(data, len, 1, stderr);
+ xfree(data);
+ return len * written;
+ }
+
+
+ Stream *in = channel_instream(chan);
+ if (in->closed) {
+ *error = _("Can't send data to closed stream");
+ goto err;
+ }
+
+ if (chan->is_rpc) {
+ *error = _("Can't send raw data to rpc channel");
+ goto err;
+ }
+
+ WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
+ return wstream_write(in, buf) ? len : 0;
+
+err:
+ xfree(data);
+ return 0;
+}
+
+/// Convert binary byte array to a readfile()-style list
+///
+/// @param[in] buf Array to convert.
+/// @param[in] len Array length.
+///
+/// @return [allocated] Converted list.
+static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
+ FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
+{
+ list_T *const l = tv_list_alloc(kListLenMayKnow);
+ // Empty buffer should be represented by [''], encode_list_write() thinks
+ // empty list is fine for the case.
+ tv_list_append_string(l, "", 0);
+ if (len > 0) {
+ encode_list_write(l, buf, len);
+ }
+ return l;
+}
+
+// vimscript job callbacks must be executed on Nvim main loop
+static inline void process_channel_event(Channel *chan, Callback *callback,
+ const char *type,
+ CallbackReader *reader, int status)
+{
+ assert(callback);
+ ChannelEvent *event_data = xmalloc(sizeof(*event_data));
+ event_data->reader = reader;
+ event_data->status = status;
+ channel_incref(chan); // Hold on ref to callback
+ event_data->chan = chan;
+ event_data->callback = callback;
+ event_data->type = type;
+
+ multiqueue_put(chan->events, on_channel_event, 1, event_data);
+}
+
+void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
+}
+
+void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
+}
+
+static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
+}
+
+static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
+ void *data, bool eof)
+{
+ Channel *chan = data;
+ on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
+}
+
+/// @param type must have static lifetime
+static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
+ size_t count, bool eof, CallbackReader *reader,
+ const char *type)
+{
+ // stub variable, to keep reading consistent with the order of events, only
+ // consider the count parameter.
+ size_t r;
+ char *ptr = rbuffer_read_ptr(buf, &r);
+
+ if (eof) {
+ if (reader->buffered) {
+ if (reader->cb.type != kCallbackNone) {
+ process_channel_event(chan, &reader->cb, type, reader, 0);
+ } else if (reader->self) {
+ if (tv_dict_find(reader->self, type, -1) == NULL) {
+ list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
+ (size_t)reader->buffer.ga_len);
+ tv_dict_add_list(reader->self, type, strlen(type), data);
+ } else {
+ // can't display error message now, defer it.
+ channel_incref(chan);
+ multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
+ }
+ ga_clear(&reader->buffer);
+ } else {
+ abort();
+ }
+ } else if (reader->cb.type != kCallbackNone) {
+ process_channel_event(chan, &reader->cb, type, reader, 0);
+ }
+ return;
+ }
+
+ // The order here matters, the terminal must receive the data first because
+ // process_channel_event will modify the read buffer(convert NULs into NLs)
+ if (chan->term) {
+ terminal_receive(chan->term, ptr, count);
+ terminal_flush_output(chan->term);
+ }
+
+ rbuffer_consumed(buf, count);
+
+ if (callback_reader_set(*reader) || reader->buffered) {
+ // if buffer wasn't consumed, a pending callback is stalled. Aggregate the
+ // received data and avoid a "burst" of multiple callbacks.
+ bool buffer_set = reader->buffer.ga_len > 0;
+ ga_concat_len(&reader->buffer, ptr, count);
+ if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) {
+ process_channel_event(chan, &reader->cb, type, reader, 0);
+ }
+ }
+}
+
+static void on_buffered_error(void **args)
+{
+ Channel *chan = (Channel *)args[0];
+ const char *stream = (const char *)args[1];
+ EMSG3(_(e_streamkey), stream, chan->id);
+ channel_decref(chan);
+}
+
+static void channel_process_exit_cb(Process *proc, int status, void *data)
+{
+ Channel *chan = data;
+ if (chan->term) {
+ char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
+ snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
+ terminal_close(chan->term, msg);
+ }
+
+ // If process did not exit, we only closed the handle of a detached process.
+ bool exited = (status >= 0);
+ if (exited) {
+ process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
+ }
+
+ channel_decref(chan);
+}
+
+static void on_channel_event(void **args)
+{
+ ChannelEvent *ev = (ChannelEvent *)args[0];
+
+ typval_T argv[4];
+
+ argv[0].v_type = VAR_NUMBER;
+ argv[0].v_lock = VAR_UNLOCKED;
+ argv[0].vval.v_number = (varnumber_T)ev->chan->id;
+
+ if (ev->reader) {
+ argv[1].v_type = VAR_LIST;
+ argv[1].v_lock = VAR_UNLOCKED;
+ argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
+ (size_t)ev->reader->buffer.ga_len);
+ tv_list_ref(argv[1].vval.v_list);
+ ga_clear(&ev->reader->buffer);
+ } else {
+ argv[1].v_type = VAR_NUMBER;
+ argv[1].v_lock = VAR_UNLOCKED;
+ argv[1].vval.v_number = ev->status;
+ }
+
+ argv[2].v_type = VAR_STRING;
+ argv[2].v_lock = VAR_UNLOCKED;
+ argv[2].vval.v_string = (uint8_t *)ev->type;
+
+ typval_T rettv = TV_INITIAL_VALUE;
+ callback_call(ev->callback, 3, argv, &rettv);
+ tv_clear(&rettv);
+ channel_decref(ev->chan);
+ xfree(ev);
+}
+
+
+/// Open terminal for channel
+///
+/// Channel `chan` is assumed to be an open pty channel,
+/// and curbuf is assumed to be a new, unmodified buffer.
+void channel_terminal_open(Channel *chan)
+{
+ TerminalOptions topts;
+ topts.data = chan;
+ topts.width = chan->stream.pty.width;
+ topts.height = chan->stream.pty.height;
+ topts.write_cb = term_write;
+ topts.resize_cb = term_resize;
+ topts.close_cb = term_close;
+ curbuf->b_p_channel = (long)chan->id; // 'channel' option
+ Terminal *term = terminal_open(topts);
+ chan->term = term;
+ channel_incref(chan);
+}
+
+static void term_write(char *buf, size_t size, void *data)
+{
+ Channel *chan = data;
+ if (chan->stream.proc.in.closed) {
+ // If the backing stream was closed abruptly, there may be write events
+ // ahead of the terminal close event. Just ignore the writes.
+ ILOG("write failed: stream is closed");
+ return;
+ }
+ WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
+ wstream_write(&chan->stream.proc.in, wbuf);
+}
+
+static void term_resize(uint16_t width, uint16_t height, void *data)
+{
+ Channel *chan = data;
+ pty_process_resize(&chan->stream.pty, width, height);
+}
+
+static inline void term_delayed_free(void **argv)
+{
+ Channel *chan = argv[0];
+ if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
+ multiqueue_put(chan->events, term_delayed_free, 1, chan);
+ return;
+ }
+
+ terminal_destroy(chan->term);
+ chan->term = NULL;
+ channel_decref(chan);
+}
+
+static void term_close(void *data)
+{
+ Channel *chan = data;
+ process_stop(&chan->stream.proc);
+ multiqueue_put(chan->events, term_delayed_free, 1, data);
+}
+
+void channel_info_changed(Channel *chan, bool new)
+{
+ event_T event = new ? EVENT_CHANOPEN : EVENT_CHANINFO;
+ if (has_event(event)) {
+ channel_incref(chan);
+ multiqueue_put(main_loop.events, set_info_event,
+ 2, chan, event);
+ }
+}
+
+static void set_info_event(void **argv)
+{
+ Channel *chan = argv[0];
+ event_T event = (event_T)(ptrdiff_t)argv[1];
+
+ dict_T *dict = get_vim_var_dict(VV_EVENT);
+ Dictionary info = channel_info(chan->id);
+ typval_T retval;
+ (void)object_to_vim(DICTIONARY_OBJ(info), &retval, NULL);
+ tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict);
+
+ apply_autocmds(event, NULL, NULL, false, curbuf);
+
+ tv_dict_clear(dict);
+ api_free_dictionary(info);
+ channel_decref(chan);
+}
+
+Dictionary channel_info(uint64_t id)
+{
+ Channel *chan = find_channel(id);
+ if (!chan) {
+ return (Dictionary)ARRAY_DICT_INIT;
+ }
+
+ Dictionary info = ARRAY_DICT_INIT;
+ PUT(info, "id", INTEGER_OBJ((Integer)chan->id));
+
+ const char *stream_desc, *mode_desc;
+ switch (chan->streamtype) {
+ case kChannelStreamProc:
+ stream_desc = "job";
+ if (chan->stream.proc.type == kProcessTypePty) {
+ const char *name = pty_process_tty_name(&chan->stream.pty);
+ PUT(info, "pty", STRING_OBJ(cstr_to_string(name)));
+ }
+ break;
+
+ case kChannelStreamStdio:
+ stream_desc = "stdio";
+ break;
+
+ case kChannelStreamStderr:
+ stream_desc = "stderr";
+ break;
+
+ case kChannelStreamInternal:
+ PUT(info, "internal", BOOLEAN_OBJ(true));
+ FALLTHROUGH;
+
+ case kChannelStreamSocket:
+ stream_desc = "socket";
+ break;
+
+ default:
+ abort();
+ }
+ PUT(info, "stream", STRING_OBJ(cstr_to_string(stream_desc)));
+
+ if (chan->is_rpc) {
+ mode_desc = "rpc";
+ PUT(info, "client", DICTIONARY_OBJ(rpc_client_info(chan)));
+ } else if (chan->term) {
+ mode_desc = "terminal";
+ PUT(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term)));
+ } else {
+ mode_desc = "bytes";
+ }
+ PUT(info, "mode", STRING_OBJ(cstr_to_string(mode_desc)));
+
+ return info;
+}
+
+Array channel_all_info(void)
+{
+ Channel *channel;
+ Array ret = ARRAY_DICT_INIT;
+ map_foreach_value(channels, channel, {
+ ADD(ret, DICTIONARY_OBJ(channel_info(channel->id)));
+ });
+ return ret;
+}