diff options
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r-- | src/nvim/channel.c | 79 |
1 files changed, 34 insertions, 45 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c index 41635747f8..e3df12abbe 100644 --- a/src/nvim/channel.c +++ b/src/nvim/channel.c @@ -13,6 +13,7 @@ #include "nvim/autocmd_defs.h" #include "nvim/buffer_defs.h" #include "nvim/channel.h" +#include "nvim/errors.h" #include "nvim/eval.h" #include "nvim/eval/encode.h" #include "nvim/eval/typval.h" @@ -38,8 +39,6 @@ #include "nvim/os/os_defs.h" #include "nvim/os/shell.h" #include "nvim/path.h" -#include "nvim/rbuffer.h" -#include "nvim/rbuffer_defs.h" #include "nvim/terminal.h" #include "nvim/types_defs.h" @@ -126,19 +125,19 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error) *error = e_invstream; return false; } - stream_may_close(&chan->stream.socket); + rstream_may_close(&chan->stream.socket); break; case kChannelStreamProc: proc = &chan->stream.proc; if (part == kChannelPartStdin || close_main) { - stream_may_close(&proc->in); + wstream_may_close(&proc->in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&proc->out); + rstream_may_close(&proc->out); } if (part == kChannelPartStderr || part == kChannelPartAll) { - stream_may_close(&proc->err); + rstream_may_close(&proc->err); } if (proc->type == kProcessTypePty && part == kChannelPartAll) { pty_process_close_master(&chan->stream.pty); @@ -148,10 +147,10 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error) case kChannelStreamStdio: if (part == kChannelPartStdin || close_main) { - stream_may_close(&chan->stream.stdio.in); + rstream_may_close(&chan->stream.stdio.in); } if (part == kChannelPartStdout || close_main) { - stream_may_close(&chan->stream.stdio.out); + wstream_may_close(&chan->stream.stdio.out); } if (part == kChannelPartStderr) { *error = e_invstream; @@ -431,7 +430,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s wstream_init(&proc->in, 0); } if (has_out) { - rstream_init(&proc->out, 0); + rstream_init(&proc->out); } if (rpc) { @@ -446,7 +445,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s if (has_err) { callback_reader_start(&chan->on_stderr, "stderr"); - rstream_init(&proc->err, 0); + rstream_init(&proc->err); rstream_start(&proc->err, on_job_stderr, chan); } @@ -480,10 +479,10 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader return 0; } - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); - rstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); + rstream_init(&channel->stream.socket); if (rpc) { rpc_start(channel); @@ -505,10 +504,10 @@ void channel_from_connection(SocketWatcher *watcher) { Channel *channel = channel_alloc(kChannelStreamSocket); socket_watcher_accept(watcher, &channel->stream.socket); - channel->stream.socket.internal_close_cb = close_cb; - channel->stream.socket.internal_data = channel; - wstream_init(&channel->stream.socket, 0); - rstream_init(&channel->stream.socket, 0); + channel->stream.socket.s.internal_close_cb = close_cb; + channel->stream.socket.s.internal_data = channel; + wstream_init(&channel->stream.socket.s, 0); + rstream_init(&channel->stream.socket); rpc_start(channel); channel_create_event(channel, watcher->addr); } @@ -553,7 +552,7 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **err dup2(STDERR_FILENO, STDIN_FILENO); } #endif - rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd, 0); + rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd); wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0); if (rpc) { @@ -640,58 +639,45 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) list_T *const l = tv_list_alloc(kListLenMayKnow); // Empty buffer should be represented by [''], encode_list_write() thinks // empty list is fine for the case. - tv_list_append_string(l, "", 0); + tv_list_append_string(l, S_LEN("")); if (len > 0) { encode_list_write(l, buf, len); } return l; } -void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, eof, &chan->on_data); + return on_channel_output(stream, chan, buf, count, eof, &chan->on_data); } -void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) +size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof) { Channel *chan = data; - on_channel_output(stream, chan, buf, eof, &chan->on_stderr); + return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr); } -static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof, - CallbackReader *reader) +static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count, + bool eof, CallbackReader *reader) { - size_t count; - char *output = rbuffer_read_ptr(buf, &count); - if (chan->term) { - if (!eof) { - char *p = output; - char *end = output + count; + if (count) { + const char *p = buf; + const char *end = buf + count; while (p < end) { // Don't pass incomplete UTF-8 sequences to libvterm. #16245 // Composing chars can be passed separately, so utf_ptr2len_len() is enough. int clen = utf_ptr2len_len(p, (int)(end - p)); if (clen > end - p) { - count = (size_t)(p - output); + count = (size_t)(p - buf); break; } p += clen; } } - terminal_receive(chan->term, output, count); - } - - if (count) { - rbuffer_consumed(buf, count); - } - // Move remaining data to start of buffer, so the buffer can never wrap around. - rbuffer_reset(buf); - - if (callback_reader_set(*reader)) { - ga_concat_len(&reader->buffer, output, count); + terminal_receive(chan->term, buf, count); } if (eof) { @@ -699,8 +685,11 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool } if (callback_reader_set(*reader)) { + ga_concat_len(&reader->buffer, buf, count); schedule_channel_event(chan); } + + return count; } /// schedule the necessary callbacks to be invoked as a deferred event @@ -864,7 +853,7 @@ static void term_resize(uint16_t width, uint16_t height, void *data) static inline void term_delayed_free(void **argv) { Channel *chan = argv[0]; - if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { + if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) { multiqueue_put(chan->events, term_delayed_free, chan); return; } |