aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/channel.c')
-rw-r--r--src/nvim/channel.c131
1 files changed, 60 insertions, 71 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index 41635747f8..021fdd4b79 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -13,12 +13,13 @@
#include "nvim/autocmd_defs.h"
#include "nvim/buffer_defs.h"
#include "nvim/channel.h"
+#include "nvim/errors.h"
#include "nvim/eval.h"
#include "nvim/eval/encode.h"
#include "nvim/eval/typval.h"
#include "nvim/event/loop.h"
#include "nvim/event/multiqueue.h"
-#include "nvim/event/process.h"
+#include "nvim/event/proc.h"
#include "nvim/event/rstream.h"
#include "nvim/event/socket.h"
#include "nvim/event/stream.h"
@@ -38,8 +39,6 @@
#include "nvim/os/os_defs.h"
#include "nvim/os/shell.h"
#include "nvim/path.h"
-#include "nvim/rbuffer.h"
-#include "nvim/rbuffer_defs.h"
#include "nvim/terminal.h"
#include "nvim/types_defs.h"
@@ -89,7 +88,7 @@ void channel_free_all_mem(void)
bool channel_close(uint64_t id, ChannelPart part, const char **error)
{
Channel *chan;
- Process *proc;
+ Proc *proc;
const char *dummy;
if (!error) {
@@ -126,32 +125,32 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
*error = e_invstream;
return false;
}
- stream_may_close(&chan->stream.socket);
+ rstream_may_close(&chan->stream.socket);
break;
case kChannelStreamProc:
proc = &chan->stream.proc;
if (part == kChannelPartStdin || close_main) {
- stream_may_close(&proc->in);
+ wstream_may_close(&proc->in);
}
if (part == kChannelPartStdout || close_main) {
- stream_may_close(&proc->out);
+ rstream_may_close(&proc->out);
}
if (part == kChannelPartStderr || part == kChannelPartAll) {
- stream_may_close(&proc->err);
+ rstream_may_close(&proc->err);
}
- if (proc->type == kProcessTypePty && part == kChannelPartAll) {
- pty_process_close_master(&chan->stream.pty);
+ if (proc->type == kProcTypePty && part == kChannelPartAll) {
+ pty_proc_close_master(&chan->stream.pty);
}
break;
case kChannelStreamStdio:
if (part == kChannelPartStdin || close_main) {
- stream_may_close(&chan->stream.stdio.in);
+ rstream_may_close(&chan->stream.stdio.in);
}
if (part == kChannelPartStdout || close_main) {
- stream_may_close(&chan->stream.stdio.out);
+ wstream_may_close(&chan->stream.stdio.out);
}
if (part == kChannelPartStderr) {
*error = e_invstream;
@@ -240,11 +239,11 @@ void channel_create_event(Channel *chan, const char *ext_source)
assert(chan->id <= VARNUMBER_MAX);
Arena arena = ARENA_EMPTY;
- Dictionary info = channel_info(chan->id, &arena);
+ Dict info = channel_info(chan->id, &arena);
typval_T tv = TV_INITIAL_VALUE;
// TODO(bfredl): do the conversion in one step. Also would be nice
// to pretty print top level dict in defined order
- object_to_vim(DICTIONARY_OBJ(info), &tv, NULL);
+ object_to_vim(DICT_OBJ(info), &tv, NULL);
assert(tv.v_type == VAR_DICT);
char *str = encode_tv2json(&tv, NULL);
ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str);
@@ -290,7 +289,7 @@ static void channel_destroy(Channel *chan)
}
if (chan->streamtype == kChannelStreamProc) {
- process_free(&chan->stream.proc);
+ proc_free(&chan->stream.proc);
}
callback_reader_free(&chan->on_data);
@@ -377,7 +376,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
*status_out = 0;
return NULL;
}
- chan->stream.pty = pty_process_init(&main_loop, chan);
+ chan->stream.pty = pty_proc_init(&main_loop, chan);
if (pty_width > 0) {
chan->stream.pty.width = pty_width;
}
@@ -385,22 +384,22 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
chan->stream.pty.height = pty_height;
}
} else {
- chan->stream.uv = libuv_process_init(&main_loop, chan);
+ chan->stream.uv = libuv_proc_init(&main_loop, chan);
}
- Process *proc = &chan->stream.proc;
+ Proc *proc = &chan->stream.proc;
proc->argv = argv;
proc->exepath = exepath;
- proc->cb = channel_process_exit_cb;
+ proc->cb = channel_proc_exit_cb;
proc->events = chan->events;
proc->detach = detach;
proc->cwd = cwd;
proc->env = env;
proc->overlapped = overlapped;
- char *cmd = xstrdup(process_get_exepath(proc));
+ char *cmd = xstrdup(proc_get_exepath(proc));
bool has_out, has_err;
- if (proc->type == kProcessTypePty) {
+ if (proc->type == kProcTypePty) {
has_out = true;
has_err = false;
} else {
@@ -411,7 +410,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
bool has_in = stdin_mode == kChannelStdinPipe;
- int status = process_spawn(proc, has_in, has_out, has_err);
+ int status = proc_spawn(proc, has_in, has_out, has_err);
if (status) {
semsg(_(e_jobspawn), os_strerror(status), cmd);
xfree(cmd);
@@ -431,7 +430,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
wstream_init(&proc->in, 0);
}
if (has_out) {
- rstream_init(&proc->out, 0);
+ rstream_init(&proc->out);
}
if (rpc) {
@@ -446,7 +445,7 @@ Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_s
if (has_err) {
callback_reader_start(&chan->on_stderr, "stderr");
- rstream_init(&proc->err, 0);
+ rstream_init(&proc->err);
rstream_start(&proc->err, on_job_stderr, chan);
}
@@ -480,10 +479,10 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader
return 0;
}
- channel->stream.socket.internal_close_cb = close_cb;
- channel->stream.socket.internal_data = channel;
- wstream_init(&channel->stream.socket, 0);
- rstream_init(&channel->stream.socket, 0);
+ channel->stream.socket.s.internal_close_cb = close_cb;
+ channel->stream.socket.s.internal_data = channel;
+ wstream_init(&channel->stream.socket.s, 0);
+ rstream_init(&channel->stream.socket);
if (rpc) {
rpc_start(channel);
@@ -505,10 +504,10 @@ void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket);
- channel->stream.socket.internal_close_cb = close_cb;
- channel->stream.socket.internal_data = channel;
- wstream_init(&channel->stream.socket, 0);
- rstream_init(&channel->stream.socket, 0);
+ channel->stream.socket.s.internal_close_cb = close_cb;
+ channel->stream.socket.s.internal_data = channel;
+ wstream_init(&channel->stream.socket.s, 0);
+ rstream_init(&channel->stream.socket);
rpc_start(channel);
channel_create_event(channel, watcher->addr);
}
@@ -553,7 +552,7 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **err
dup2(STDERR_FILENO, STDIN_FILENO);
}
#endif
- rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd, 0);
+ rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd);
wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0);
if (rpc) {
@@ -647,51 +646,38 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
return l;
}
-void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof)
+size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, eof, &chan->on_data);
+ return on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
}
-void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof)
+size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
Channel *chan = data;
- on_channel_output(stream, chan, buf, eof, &chan->on_stderr);
+ return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
}
-static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof,
- CallbackReader *reader)
+static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count,
+ bool eof, CallbackReader *reader)
{
- size_t count;
- char *output = rbuffer_read_ptr(buf, &count);
-
if (chan->term) {
- if (!eof) {
- char *p = output;
- char *end = output + count;
+ if (count) {
+ const char *p = buf;
+ const char *end = buf + count;
while (p < end) {
// Don't pass incomplete UTF-8 sequences to libvterm. #16245
// Composing chars can be passed separately, so utf_ptr2len_len() is enough.
int clen = utf_ptr2len_len(p, (int)(end - p));
if (clen > end - p) {
- count = (size_t)(p - output);
+ count = (size_t)(p - buf);
break;
}
p += clen;
}
}
- terminal_receive(chan->term, output, count);
- }
-
- if (count) {
- rbuffer_consumed(buf, count);
- }
- // Move remaining data to start of buffer, so the buffer can never wrap around.
- rbuffer_reset(buf);
-
- if (callback_reader_set(*reader)) {
- ga_concat_len(&reader->buffer, output, count);
+ terminal_receive(chan->term, buf, count);
}
if (eof) {
@@ -699,8 +685,11 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool
}
if (callback_reader_set(*reader)) {
+ ga_concat_len(&reader->buffer, buf, count);
schedule_channel_event(chan);
}
+
+ return count;
}
/// schedule the necessary callbacks to be invoked as a deferred event
@@ -771,7 +760,7 @@ void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
}
}
-static void channel_process_exit_cb(Process *proc, int status, void *data)
+static void channel_proc_exit_cb(Proc *proc, int status, void *data)
{
Channel *chan = data;
if (chan->term) {
@@ -858,13 +847,13 @@ static void term_write(const char *buf, size_t size, void *data)
static void term_resize(uint16_t width, uint16_t height, void *data)
{
Channel *chan = data;
- pty_process_resize(&chan->stream.pty, width, height);
+ pty_proc_resize(&chan->stream.pty, width, height);
}
static inline void term_delayed_free(void **argv)
{
Channel *chan = argv[0];
- if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
+ if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) {
multiqueue_put(chan->events, term_delayed_free, chan);
return;
}
@@ -878,7 +867,7 @@ static inline void term_delayed_free(void **argv)
static void term_close(void *data)
{
Channel *chan = data;
- process_stop(&chan->stream.proc);
+ proc_stop(&chan->stream.proc);
multiqueue_put(chan->events, term_delayed_free, data);
}
@@ -899,9 +888,9 @@ static void set_info_event(void **argv)
save_v_event_T save_v_event;
dict_T *dict = get_v_event(&save_v_event);
Arena arena = ARENA_EMPTY;
- Dictionary info = channel_info(chan->id, &arena);
+ Dict info = channel_info(chan->id, &arena);
typval_T retval;
- object_to_vim(DICTIONARY_OBJ(info), &retval, NULL);
+ object_to_vim(DICT_OBJ(info), &retval, NULL);
assert(retval.v_type == VAR_DICT);
tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict);
tv_dict_set_keys_readonly(dict);
@@ -918,25 +907,25 @@ bool channel_job_running(uint64_t id)
Channel *chan = find_channel(id);
return (chan
&& chan->streamtype == kChannelStreamProc
- && !process_is_stopped(&chan->stream.proc));
+ && !proc_is_stopped(&chan->stream.proc));
}
-Dictionary channel_info(uint64_t id, Arena *arena)
+Dict channel_info(uint64_t id, Arena *arena)
{
Channel *chan = find_channel(id);
if (!chan) {
- return (Dictionary)ARRAY_DICT_INIT;
+ return (Dict)ARRAY_DICT_INIT;
}
- Dictionary info = arena_dict(arena, 8);
+ Dict info = arena_dict(arena, 8);
PUT_C(info, "id", INTEGER_OBJ((Integer)chan->id));
const char *stream_desc, *mode_desc;
switch (chan->streamtype) {
case kChannelStreamProc: {
stream_desc = "job";
- if (chan->stream.proc.type == kProcessTypePty) {
- const char *name = pty_process_tty_name(&chan->stream.pty);
+ if (chan->stream.proc.type == kProcTypePty) {
+ const char *name = pty_proc_tty_name(&chan->stream.pty);
PUT_C(info, "pty", CSTR_TO_ARENA_OBJ(arena, name));
}
@@ -974,7 +963,7 @@ Dictionary channel_info(uint64_t id, Arena *arena)
if (chan->is_rpc) {
mode_desc = "rpc";
- PUT_C(info, "client", DICTIONARY_OBJ(chan->rpc.info));
+ PUT_C(info, "client", DICT_OBJ(chan->rpc.info));
} else if (chan->term) {
mode_desc = "terminal";
PUT_C(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term)));
@@ -1007,7 +996,7 @@ Array channel_all_info(Arena *arena)
Array ret = arena_array(arena, ids.size);
for (size_t i = 0; i < ids.size; i++) {
- ADD_C(ret, DICTIONARY_OBJ(channel_info((uint64_t)ids.items[i], arena)));
+ ADD_C(ret, DICT_OBJ(channel_info((uint64_t)ids.items[i], arena)));
}
return ret;
}