aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/nvim/channel.c98
-rw-r--r--src/nvim/channel.h9
-rw-r--r--src/nvim/eval.c107
-rw-r--r--src/nvim/eval.lua3
-rw-r--r--src/nvim/event/process.c34
-rw-r--r--src/nvim/event/stream.c7
-rw-r--r--src/nvim/globals.h3
-rw-r--r--src/nvim/msgpack_rpc/channel.c62
-rw-r--r--src/nvim/os/shell.c4
9 files changed, 172 insertions, 155 deletions
diff --git a/src/nvim/channel.c b/src/nvim/channel.c
index c6db2b7b7a..416e0a1fb6 100644
--- a/src/nvim/channel.c
+++ b/src/nvim/channel.c
@@ -35,10 +35,99 @@ void channel_teardown(void)
Channel *channel;
map_foreach_value(channels, channel, {
- (void)channel; // close_channel(channel);
+ channel_close(channel->id, kChannelPartAll, NULL);
});
}
+/// Closes a channel
+///
+/// @param id The channel id
+/// @return true if successful, false otherwise
+bool channel_close(uint64_t id, ChannelPart part, const char **error)
+{
+ Channel *chan;
+ Process *proc;
+
+ const char *dummy;
+ if (!error) {
+ error = &dummy;
+ }
+
+ if (!(chan = find_channel(id))) {
+ if (id < next_chan_id) {
+ // allow double close, even though we can't say what parts was valid.
+ return true;
+ }
+ *error = (const char *)e_invchan;
+ return false;
+ }
+
+ bool close_main = false;
+ if (part == kChannelPartRpc || part == kChannelPartAll) {
+ close_main = true;
+ if (chan->is_rpc) {
+ rpc_close(chan);
+ } else if (part == kChannelPartRpc) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
+ && chan->is_rpc) {
+ // EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
+ *error = (const char *)e_invstreamrpc;
+ return false;
+ }
+
+ switch (chan->streamtype) {
+ case kChannelStreamSocket:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ stream_may_close(&chan->stream.socket);
+ break;
+
+ case kChannelStreamProc:
+ proc = (Process *)&chan->stream.proc;
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&proc->in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&proc->out);
+ }
+ if (part == kChannelPartStderr || part == kChannelPartAll) {
+ stream_may_close(&proc->err);
+ }
+ if (proc->type == kProcessTypePty && part == kChannelPartAll) {
+ pty_process_close_master(&chan->stream.pty);
+ }
+
+ break;
+
+ case kChannelStreamStdio:
+ if (part == kChannelPartStdin || close_main) {
+ stream_may_close(&chan->stream.stdio.in);
+ }
+ if (part == kChannelPartStdout || close_main) {
+ stream_may_close(&chan->stream.stdio.out);
+ }
+ if (part == kChannelPartStderr) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+
+ case kChannelStreamInternal:
+ if (!close_main) {
+ *error = (const char *)e_invstream;
+ return false;
+ }
+ break;
+ }
+
+ return true;
+}
+
/// Initializes the module
void channel_init(void)
{
@@ -239,7 +328,6 @@ uint64_t channel_connect(bool tcp, const char *address,
return 0;
}
- channel_incref(channel); // close channel only after the stream is closed
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
@@ -264,7 +352,6 @@ void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket);
- channel_incref(channel); // close channel only after the stream is closed
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
@@ -277,7 +364,6 @@ void channel_from_connection(SocketWatcher *watcher)
static uint64_t channel_create_internal_rpc(void)
{
Channel *channel = channel_alloc(kChannelStreamInternal);
- channel_incref(channel); // internal channel lives until process exit
rpc_start(channel);
return channel->id;
}
@@ -427,10 +513,6 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
terminal_close(chan->term, msg);
}
- if (chan->is_rpc) {
- channel_process_exit(chan->id, status);
- }
-
if (chan->status_ptr) {
*chan->status_ptr = status;
}
diff --git a/src/nvim/channel.h b/src/nvim/channel.h
index 8ead6749a6..eaf0fd92d0 100644
--- a/src/nvim/channel.h
+++ b/src/nvim/channel.h
@@ -16,6 +16,15 @@ typedef enum {
kChannelStreamInternal
} ChannelStreamType;
+typedef enum {
+ kChannelPartStdin,
+ kChannelPartStdout,
+ kChannelPartStderr,
+ kChannelPartRpc,
+ kChannelPartAll
+} ChannelPart;
+
+
typedef struct {
Stream in;
Stream out;
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 6899474577..ba356f28b9 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -7322,6 +7322,45 @@ static void f_changenr(typval_T *argvars, typval_T *rettv, FunPtr fptr)
rettv->vval.v_number = curbuf->b_u_seq_cur;
}
+// "chanclose(id[, stream])" function
+static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
+{
+ rettv->v_type = VAR_NUMBER;
+ rettv->vval.v_number = 0;
+
+ if (check_restricted() || check_secure()) {
+ return;
+ }
+
+ if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
+ && argvars[1].v_type != VAR_UNKNOWN)) {
+ EMSG(_(e_invarg));
+ return;
+ }
+
+ ChannelPart part = kChannelPartAll;
+ if (argvars[1].v_type == VAR_STRING) {
+ char *stream = (char *)argvars[1].vval.v_string;
+ if (!strcmp(stream, "stdin")) {
+ part = kChannelPartStdin;
+ } else if (!strcmp(stream, "stdout")) {
+ part = kChannelPartStdout;
+ } else if (!strcmp(stream, "stderr")) {
+ part = kChannelPartStderr;
+ } else if (!strcmp(stream, "rpc")) {
+ part = kChannelPartRpc;
+ } else {
+ EMSG2(_("Invalid channel stream \"%s\""), stream);
+ return;
+ }
+ }
+ const char *error;
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number, part, &error);
+ if (!rettv->vval.v_number) {
+ EMSG(error);
+ }
+}
+
/*
* "char2nr(string)" function
*/
@@ -11391,67 +11430,6 @@ static void f_items(typval_T *argvars, typval_T *rettv, FunPtr fptr)
dict_list(argvars, rettv, 2);
}
-// "jobclose(id[, stream])" function
-static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
-{
- rettv->v_type = VAR_NUMBER;
- rettv->vval.v_number = 0;
-
- if (check_restricted() || check_secure()) {
- return;
- }
-
- if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
- && argvars[1].v_type != VAR_UNKNOWN)) {
- EMSG(_(e_invarg));
- return;
- }
-
- Channel *data = find_job(argvars[0].vval.v_number, true);
- if (!data) {
- return;
- }
-
- Process *proc = (Process *)&data->stream.proc;
-
- if (argvars[1].v_type == VAR_STRING) {
- char *stream = (char *)argvars[1].vval.v_string;
- if (!strcmp(stream, "stdin")) {
- if (data->is_rpc) {
- EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
- } else {
- process_close_in(proc);
- }
- } else if (!strcmp(stream, "stdout")) {
- if (data->is_rpc) {
- EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
- } else {
- process_close_out(proc);
- }
- } else if (!strcmp(stream, "stderr")) {
- process_close_err(proc);
- } else if (!strcmp(stream, "rpc")) {
- if (data->is_rpc) {
- channel_close(data->id);
- } else {
- EMSG(_("Invalid job stream: Not an rpc job"));
- }
- } else {
- EMSG2(_("Invalid job stream \"%s\""), stream);
- }
- } else {
- if (data->is_rpc) {
- channel_close(data->id);
- process_close_err(proc);
- } else {
- process_close_streams(proc);
- if (proc->type == kProcessTypePty) {
- pty_process_close_master(&data->stream.pty);
- }
- }
- }
-}
-
// "jobpid(id)" function
static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
@@ -13933,7 +13911,12 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)
if (find_job(id, false)) {
f_jobstop(argvars, rettv, NULL);
} else {
- rettv->vval.v_number = channel_close(id);
+ const char *error;
+ rettv->vval.v_number = channel_close(argvars[0].vval.v_number,
+ kChannelPartRpc, &error);
+ if (!rettv->vval.v_number) {
+ EMSG(error);
+ }
}
}
diff --git a/src/nvim/eval.lua b/src/nvim/eval.lua
index 3150f26df6..bb03691fd4 100644
--- a/src/nvim/eval.lua
+++ b/src/nvim/eval.lua
@@ -55,6 +55,7 @@ return {
call={args={2, 3}},
ceil={args=1, func="float_op_wrapper", data="&ceil"},
changenr={},
+ chanclose={args={1, 2}},
char2nr={args={1, 2}},
cindent={args=1},
clearmatches={},
@@ -173,7 +174,7 @@ return {
islocked={args=1},
id={args=1},
items={args=1},
- jobclose={args={1, 2}},
+ jobclose={args={1, 2}, func="f_chanclose"},
jobpid={args=1},
jobresize={args=3},
jobsend={args=2},
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 8946f049e2..34be291aef 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -25,13 +25,6 @@
// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
#define KILL_TIMEOUT_MS 2000
-#define CLOSE_PROC_STREAM(proc, stream) \
- do { \
- if (!proc->stream.closed) { \
- stream_close(&proc->stream, NULL, NULL); \
- } \
- } while (0)
-
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
@@ -140,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
-// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
- process_close_in(proc);
- process_close_out(proc);
- process_close_err(proc);
-}
-
-void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, in);
-}
-
-void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, out);
-}
-
-void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
-{
- CLOSE_PROC_STREAM(proc, err);
+ stream_may_close(&proc->in);
+ stream_may_close(&proc->out);
+ stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@@ -237,8 +214,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
- // stdout/stderr, they will be closed when it exits (voluntarily or not).
- process_close_in(proc);
+ // stdout/stderr, they will be closed when it exits(possibly due to being
+ // terminated after a timeout)
+ stream_may_close(&proc->in);
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);
break;
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 7c865bfe1e..ba25b76ec7 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -92,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
+void stream_may_close(Stream *stream)
+{
+ if (!stream->closed) {
+ stream_close(stream, NULL, NULL);
+ }
+}
+
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
diff --git a/src/nvim/globals.h b/src/nvim/globals.h
index d21cfe7ab6..d1b0ad0ed3 100644
--- a/src/nvim/globals.h
+++ b/src/nvim/globals.h
@@ -1082,6 +1082,9 @@ EXTERN char_u e_jobspawn[] INIT(= N_(
EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty"));
EXTERN char_u e_stdiochan2[] INIT(= N_(
"E905: Couldn't open stdio channel: %s"));
+EXTERN char_u e_invstream[] INIT(= N_("E906: invalid stream for channel"));
+EXTERN char_u e_invstreamrpc[] INIT(= N_(
+ "E906: invalid stream for rpc channel, use 'rpc'"));
EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\""));
EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s"));
EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number"));
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 56af4fa791..32781cf4d9 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -54,6 +54,7 @@ void rpc_init(void)
void rpc_start(Channel *channel)
{
+ channel_incref(channel);
channel->is_rpc = true;
RpcState *rpc = &channel->rpc;
rpc->closed = false;
@@ -204,31 +205,6 @@ void rpc_unsubscribe(uint64_t id, char *event)
unsubscribe(channel, event);
}
-/// Closes a channel
-///
-/// @param id The channel id
-/// @return true if successful, false otherwise
-bool channel_close(uint64_t id)
-{
- Channel *channel;
-
- if (!(channel = find_rpc_channel(id))) {
- return false;
- }
-
- close_channel(channel);
- return true;
-}
-
-void channel_process_exit(uint64_t id, int status)
-{
- Channel *channel = pmap_get(uint64_t)(channels, id);
-
- // channel_decref(channel); remove??
- channel->rpc.closed = true;
-}
-
-// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
@@ -236,7 +212,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
channel_incref(channel);
if (eof) {
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
@@ -540,43 +516,25 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
-/// Close the channel streams/process and free the channel resources.
-/// TODO: move to channel.h
-static void close_channel(Channel *channel)
+
+/// Mark rpc state as closed, and release its reference to the channel.
+/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
+void rpc_close(Channel *channel)
{
if (channel->rpc.closed) {
return;
}
channel->rpc.closed = true;
+ channel_decref(channel);
- switch (channel->streamtype) {
- case kChannelStreamSocket:
- stream_close(&channel->stream.socket, NULL, NULL);
- break;
- case kChannelStreamProc:
- // Only close the rpc channel part,
- // there could be an error message on the stderr stream
- process_close_in(&channel->stream.proc);
- process_close_out(&channel->stream.proc);
- break;
- case kChannelStreamStdio:
- stream_close(&channel->stream.stdio.in, NULL, NULL);
- stream_close(&channel->stream.stdio.out, NULL, NULL);
- multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
- return;
- case kChannelStreamInternal:
- // nothing to free.
- break;
+ if (channel->streamtype == kChannelStreamStdio) {
+ multiqueue_put(main_loop.fast_events, exit_event, 0);
}
-
- channel_decref(channel);
}
static void exit_event(void **argv)
{
- channel_decref(argv[0]);
-
if (!exiting) {
mch_exit(0);
}
@@ -642,7 +600,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
frame->result = STRING_OBJ(cstr_to_string(msg));
}
- close_channel(channel);
+ channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index ec2ebb9d19..e32c6e05d2 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -677,10 +677,6 @@ static void shell_write_cb(Stream *stream, void *data, int status)
msg_schedule_emsgf(_("E5677: Error writing input to shell-command: %s"),
uv_err_name(status));
}
- if (stream->closed) { // Process may have exited before this write.
- WLOG("stream was already closed");
- return;
- }
stream_close(stream, NULL, NULL);
}