aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r--src/nvim/channel.c79
1 files changed, 34 insertions, 45 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 41635747f8..e3df12abbe 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -13,6 +13,7 @@
#include "nvim/autocmd_defs.h"
#include "nvim/buffer_defs.h"
#include "nvim/channel.h"
+#include "nvim/errors.h"
#include "nvim/eval.h"
#include "nvim/eval/encode.h"
#include "nvim/eval/typval.h"
@@ -38,8 +39,6 @@
#include "nvim/os/os_defs.h"
#include "nvim/os/shell.h"
#include "nvim/path.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/terminal.h"
#include "nvim/types_defs.h"
@@ -126,19 +125,19 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
*error = e_invstream;
return false;
}
- stream_may_close(&chan->stream.socket);
+ rstream_may_close(&chan->stream.socket);
break;
case kChannelStreamProc:
proc = &chan->stream.proc;
if (part == kChannelPartStdin || close_main) {
- stream_may_close(&proc->in);
+ wstream_may_close(&proc->in);
}
if (part == kChannelPartStdout || close_main) {
- stream_may_close(&proc->out);
+ rstream_may_close(&proc->out);
}
if (part == kChannelPartStderr || part == kChannelPartAll) {
- stream_may_close(&proc->err);
+ rstream_may_close(&proc->err);
}
if (proc->type == kProcessTypePty && part == kChannelPartAll) {
pty_process_close_master(&chan->stream.pty);
@@ -148,10 +147,10 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
case kChannelStreamStdio:
if (part == kChannelPartStdin || close_main) {
- stream_may_close(&chan->stream.stdio.in);
+ rstream_may_close(&chan->stream.stdio.in);
}
if (part == kChannelPartStdout || close_main) {
- stream_may_close(&chan->stream.stdio.out);
+ wstream_may_close(&chan->stream.stdio.out);
}
if (part == kChannelPartStderr) {
*error = e_invstream;
@@ -431,7 +430,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
wstream_init(&proc->in, 0);
}
if (has_out) {
- rstream_init(&proc->out, 0);
+ rstream_init(&proc->out);
}
if (rpc) {
@@ -446,7 +445,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
if (has_err) {
callback_reader_start(&chan->on_stderr, "stderr");
- rstream_init(&proc->err, 0);
+ rstream_init(&proc->err);
rstream_start(&proc->err, on_job_stderr, chan);
}
@@ -480,10 +479,10 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader
return 0;
}
- channel->stream.socket.internal_close_cb = close_cb;
- channel->stream.socket.internal_data = channel;
- wstream_init(&channel->stream.socket, 0);
- rstream_init(&channel->stream.socket, 0);
+ channel->stream.socket.s.internal_close_cb = close_cb;
+ channel->stream.socket.s.internal_data = channel;
+ wstream_init(&channel->stream.socket.s, 0);
+ rstream_init(&channel->stream.socket);
if (rpc) {
rpc_start(channel);
@@ -505,10 +504,10 @@ void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket);
- channel->stream.socket.internal_close_cb = close_cb;
- channel->stream.socket.internal_data = channel;
- wstream_init(&channel->stream.socket, 0);
- rstream_init(&channel->stream.socket, 0);
+ channel->stream.socket.s.internal_close_cb = close_cb;
+ channel->stream.socket.s.internal_data = channel;
+ wstream_init(&channel->stream.socket.s, 0);
+ rstream_init(&channel->stream.socket);
rpc_start(channel);
channel_create_event(channel, watcher->addr);
}
@@ -553,7 +552,7 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **err
dup2(STDERR_FILENO, STDIN_FILENO);
}
#endif
- rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd, 0);
+ rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd);
wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0);
if (rpc) {
@@ -640,58 +639,45 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
list_T *const l = tv_list_alloc(kListLenMayKnow);
// Empty buffer should be represented by [''], encode_list_write() thinks
// empty list is fine for the case.
- tv_list_append_string(l, "", 0);
+ tv_list_append_string(l, S_LEN(""));
if (len > 0) {
encode_list_write(l, buf, len);
}
return l;
}
-void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof)
+size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, eof, &chan->on_data);
+ return on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
}
-void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof)
+size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, eof, &chan->on_stderr);
+ return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
}
-static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof,
- CallbackReader *reader)
+static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count,
+ bool eof, CallbackReader *reader)
{
- size_t count;
- char *output = rbuffer_read_ptr(buf, &count);
-
if (chan->term) {
- if (!eof) {
- char *p = output;
- char *end = output + count;
+ if (count) {
+ const char *p = buf;
+ const char *end = buf + count;
while (p < end) {
// Don't pass incomplete UTF-8 sequences to libvterm. #16245
// Composing chars can be passed separately, so utf_ptr2len_len() is enough.
int clen = utf_ptr2len_len(p, (int)(end - p));
if (clen > end - p) {
- count = (size_t)(p - output);
+ count = (size_t)(p - buf);
break;
}
p += clen;
}
}
- terminal_receive(chan->term, output, count);
- }
-
- if (count) {
- rbuffer_consumed(buf, count);
- }
- // Move remaining data to start of buffer, so the buffer can never wrap around.
- rbuffer_reset(buf);
-
- if (callback_reader_set(*reader)) {
- ga_concat_len(&reader->buffer, output, count);
+ terminal_receive(chan->term, buf, count);
}
if (eof) {
@@ -699,8 +685,11 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool
}
if (callback_reader_set(*reader)) {
+ ga_concat_len(&reader->buffer, buf, count);
schedule_channel_event(chan);
}
+
+ return count;
}
/// schedule the necessary callbacks to be invoked as a deferred event
@@ -864,7 +853,7 @@ static void term_resize(uint16_t width, uint16_t height, void *data)
static inline void term_delayed_free(void **argv)
{
Channel *chan = argv[0];
- if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
+ if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) {
multiqueue_put(chan->events, term_delayed_free, chan);
return;
}