diff options
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r-- | src/nvim/channel.c | 131 |
1 files changed, 60 insertions, 71 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 41635747f8..021fdd4b79 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -13,12 +13,13 @@ #include "nvim/autocmd_defs.h" #include "nvim/buffer_defs.h" #include "nvim/channel.h" +#include "nvim/errors.h" #include "nvim/eval.h" #include "nvim/eval/encode.h" #include "nvim/eval/typval.h" #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" -#include "nvim/event/process.h" +#include "nvim/event/proc.h" #include "nvim/event/rstream.h" #include "nvim/event/socket.h" #include "nvim/event/stream.h" @@ -38,8 +39,6 @@ #include "nvim/os/os_defs.h" #include "nvim/os/shell.h" #include "nvim/path.h" -#include "nvim/rbuffer.h" -#include "nvim/rbuffer_defs.h" #include "nvim/terminal.h" #include "nvim/types_defs.h" @@ -89,7 +88,7 @@ void channel_free_all_mem(void) bool channel_close(uint64_t id, ChannelPart part, const char **error) { Channel *chan; - Process *proc; + Proc *proc; const char *dummy; if (!error) { @@ -126,32 +125,32 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error) *error = e_invstream; return false; } - stream_may_close(&chan->stream.socket); + rstream_may_close(&chan->stream.socket); break; case kChannelStreamProc: proc = &chan->stream.proc; if (part == kChannelPartStdin || close_main) { - stream_may_close(&proc->in); + wstream_may_close(&proc->in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&proc->out); + rstream_may_close(&proc->out); } if (part == kChannelPartStderr || part == kChannelPartAll) { - stream_may_close(&proc->err); + rstream_may_close(&proc->err); } - if (proc->type == kProcessTypePty && part == kChannelPartAll) { - pty_process_close_master(&chan->stream.pty); + if (proc->type == kProcTypePty && part == kChannelPartAll) { + pty_proc_close_master(&chan->stream.pty); } break; case kChannelStreamStdio: if (part == kChannelPartStdin || close_main) { - stream_may_close(&chan->stream.stdio.in); + rstream_may_close(&chan->stream.stdio.in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&chan->stream.stdio.out); + wstream_may_close(&chan->stream.stdio.out); } if (part == kChannelPartStderr) { *error = e_invstream; @@ -240,11 +239,11 @@ void channel_create_event(Channel *chan, const char *ext_source) assert(chan->id <= VARNUMBER_MAX); Arena arena = ARENA_EMPTY; - Dictionary info = channel_info(chan->id, &arena); + Dict info = channel_info(chan->id, &arena); typval_T tv = TV_INITIAL_VALUE; // TODO(bfredl): do the conversion in one step. Also would be nice // to pretty print top level dict in defined order - object_to_vim(DICTIONARY_OBJ(info), &tv, NULL); + object_to_vim(DICT_OBJ(info), &tv, NULL); assert(tv.v_type == VAR_DICT); char *str = encode_tv2json(&tv, NULL); ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str); @@ -290,7 +289,7 @@ static void channel_destroy(Channel *chan) } if (chan->streamtype == kChannelStreamProc) { - process_free(&chan->stream.proc); + proc_free(&chan->stream.proc); } callback_reader_free(&chan->on_data); @@ -377,7 +376,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s *status_out = 0; return NULL; } - chan->stream.pty = pty_process_init(&main_loop, chan); + chan->stream.pty = pty_proc_init(&main_loop, chan); if (pty_width > 0) { chan->stream.pty.width = pty_width; } @@ -385,22 +384,22 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s chan->stream.pty.height = pty_height; } } else { - chan->stream.uv = libuv_process_init(&main_loop, chan); + chan->stream.uv = libuv_proc_init(&main_loop, chan); } - Process *proc = &chan->stream.proc; + Proc *proc = &chan->stream.proc; proc->argv = argv; proc->exepath = exepath; - proc->cb = channel_process_exit_cb; + proc->cb = channel_proc_exit_cb; proc->events = chan->events; proc->detach = detach; proc->cwd = cwd; proc->env = env; proc->overlapped = overlapped; - char *cmd = xstrdup(process_get_exepath(proc)); + char *cmd = xstrdup(proc_get_exepath(proc)); bool has_out, has_err; - if (proc->type == kProcessTypePty) { + if (proc->type == kProcTypePty) { has_out = true; has_err = false; } else { @@ -411,7 +410,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s bool has_in = stdin_mode == kChannelStdinPipe; - int status = process_spawn(proc, has_in, has_out, has_err); + int status = proc_spawn(proc, has_in, has_out, has_err); if (status) { semsg(_(e_jobspawn), os_strerror(status), cmd); xfree(cmd); @@ -431,7 +430,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s wstream_init(&proc->in, 0); } if (has_out) { - rstream_init(&proc->out, 0); + rstream_init(&proc->out); } if (rpc) { @@ -446,7 +445,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s if (has_err) { callback_reader_start(&chan->on_stderr, "stderr"); - rstream_init(&proc->err, 0); + rstream_init(&proc->err); rstream_start(&proc->err, on_job_stderr, chan); } @@ -480,10 +479,10 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader return 0; } - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); - rstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); + rstream_init(&channel->stream.socket); if (rpc) { rpc_start(channel); @@ -505,10 +504,10 @@ void channel_from_connection(SocketWatcher *watcher) { Channel *channel = channel_alloc(kChannelStreamSocket); socket_watcher_accept(watcher, &channel->stream.socket); - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); - rstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); + rstream_init(&channel->stream.socket); rpc_start(channel); channel_create_event(channel, watcher->addr); } @@ -553,7 +552,7 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **err dup2(STDERR_FILENO, STDIN_FILENO); } #endif - rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd, 0); + rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd); wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0); if (rpc) { @@ -647,51 +646,38 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) return l; } -void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, eof, &chan->on_data); + return on_channel_output(stream, chan, buf, count, eof, &chan->on_data); } -void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, eof, &chan->on_stderr); + return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr); } -static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof, - CallbackReader *reader) +static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count, + bool eof, CallbackReader *reader) { - size_t count; - char *output = rbuffer_read_ptr(buf, &count); - if (chan->term) { - if (!eof) { - char *p = output; - char *end = output + count; + if (count) { + const char *p = buf; + const char *end = buf + count; while (p < end) { // Don't pass incomplete UTF-8 sequences to libvterm. #16245 // Composing chars can be passed separately, so utf_ptr2len_len() is enough. int clen = utf_ptr2len_len(p, (int)(end - p)); if (clen > end - p) { - count = (size_t)(p - output); + count = (size_t)(p - buf); break; } p += clen; } } - terminal_receive(chan->term, output, count); - } - - if (count) { - rbuffer_consumed(buf, count); - } - // Move remaining data to start of buffer, so the buffer can never wrap around. - rbuffer_reset(buf); - - if (callback_reader_set(*reader)) { - ga_concat_len(&reader->buffer, output, count); + terminal_receive(chan->term, buf, count); } if (eof) { @@ -699,8 +685,11 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool } if (callback_reader_set(*reader)) { + ga_concat_len(&reader->buffer, buf, count); schedule_channel_event(chan); } + + return count; } /// schedule the necessary callbacks to be invoked as a deferred event @@ -771,7 +760,7 @@ void channel_reader_callbacks(Channel *chan, CallbackReader *reader) } } -static void channel_process_exit_cb(Process *proc, int status, void *data) +static void channel_proc_exit_cb(Proc *proc, int status, void *data) { Channel *chan = data; if (chan->term) { @@ -858,13 +847,13 @@ static void term_write(const char *buf, size_t size, void *data) static void term_resize(uint16_t width, uint16_t height, void *data) { Channel *chan = data; - pty_process_resize(&chan->stream.pty, width, height); + pty_proc_resize(&chan->stream.pty, width, height); } static inline void term_delayed_free(void **argv) { Channel *chan = argv[0]; - if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { + if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) { multiqueue_put(chan->events, term_delayed_free, chan); return; } @@ -878,7 +867,7 @@ static inline void term_delayed_free(void **argv) static void term_close(void *data) { Channel *chan = data; - process_stop(&chan->stream.proc); + proc_stop(&chan->stream.proc); multiqueue_put(chan->events, term_delayed_free, data); } @@ -899,9 +888,9 @@ static void set_info_event(void **argv) save_v_event_T save_v_event; dict_T *dict = get_v_event(&save_v_event); Arena arena = ARENA_EMPTY; - Dictionary info = channel_info(chan->id, &arena); + Dict info = channel_info(chan->id, &arena); typval_T retval; - object_to_vim(DICTIONARY_OBJ(info), &retval, NULL); + object_to_vim(DICT_OBJ(info), &retval, NULL); assert(retval.v_type == VAR_DICT); tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict); tv_dict_set_keys_readonly(dict); @@ -918,25 +907,25 @@ bool channel_job_running(uint64_t id) Channel *chan = find_channel(id); return (chan && chan->streamtype == kChannelStreamProc - && !process_is_stopped(&chan->stream.proc)); + && !proc_is_stopped(&chan->stream.proc)); } -Dictionary channel_info(uint64_t id, Arena *arena) +Dict channel_info(uint64_t id, Arena *arena) { Channel *chan = find_channel(id); if (!chan) { - return (Dictionary)ARRAY_DICT_INIT; + return (Dict)ARRAY_DICT_INIT; } - Dictionary info = arena_dict(arena, 8); + Dict info = arena_dict(arena, 8); PUT_C(info, "id", INTEGER_OBJ((Integer)chan->id)); const char *stream_desc, *mode_desc; switch (chan->streamtype) { case kChannelStreamProc: { stream_desc = "job"; - if (chan->stream.proc.type == kProcessTypePty) { - const char *name = pty_process_tty_name(&chan->stream.pty); + if (chan->stream.proc.type == kProcTypePty) { + const char *name = pty_proc_tty_name(&chan->stream.pty); PUT_C(info, "pty", CSTR_TO_ARENA_OBJ(arena, name)); } @@ -974,7 +963,7 @@ Dictionary channel_info(uint64_t id, Arena *arena) if (chan->is_rpc) { mode_desc = "rpc"; - PUT_C(info, "client", DICTIONARY_OBJ(chan->rpc.info)); + PUT_C(info, "client", DICT_OBJ(chan->rpc.info)); } else if (chan->term) { mode_desc = "terminal"; PUT_C(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term))); @@ -1007,7 +996,7 @@ Array channel_all_info(Arena *arena) Array ret = arena_array(arena, ids.size); for (size_t i = 0; i < ids.size; i++) { - ADD_C(ret, DICTIONARY_OBJ(channel_info((uint64_t)ids.items[i], arena))); + ADD_C(ret, DICT_OBJ(channel_info((uint64_t)ids.items[i], arena))); } return ret; } |