diff options
author | Björn Linse <bjorn.linse@gmail.com> | 2017-08-27 11:59:33 +0200 |
---|---|---|
committer | Björn Linse <bjorn.linse@gmail.com> | 2017-11-24 14:50:00 +0100 |
commit | 5215e3205a07b85e4e4cf1f8a8ca6be2b9556459 (patch) | |
tree | 515ccc2d01935465edd71f1d66f526be7dc66cfc | |
parent | 3717e2157f2d45ce23dbe4ac03085fea2d956dc4 (diff) | |
download | rneovim-5215e3205a07b85e4e4cf1f8a8ca6be2b9556459.tar.gz rneovim-5215e3205a07b85e4e4cf1f8a8ca6be2b9556459.tar.bz2 rneovim-5215e3205a07b85e4e4cf1f8a8ca6be2b9556459.zip |
channels: refactor
-rw-r--r-- | src/nvim/api/ui.c | 2 | ||||
-rw-r--r-- | src/nvim/api/vim.c | 4 | ||||
-rw-r--r-- | src/nvim/channel.c | 61 | ||||
-rw-r--r-- | src/nvim/channel.h | 120 | ||||
-rw-r--r-- | src/nvim/eval.c | 314 | ||||
-rw-r--r-- | src/nvim/eval/typval.c | 24 | ||||
-rw-r--r-- | src/nvim/event/libuv_process.c | 12 | ||||
-rw-r--r-- | src/nvim/event/process.c | 77 | ||||
-rw-r--r-- | src/nvim/event/process.h | 15 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 2 | ||||
-rw-r--r-- | src/nvim/globals.h | 7 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 388 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.h | 2 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel_defs.h | 36 | ||||
-rw-r--r-- | src/nvim/os/pty_process_unix.c | 10 | ||||
-rw-r--r-- | src/nvim/os/pty_process_win.c | 12 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 26 |
17 files changed, 580 insertions, 532 deletions
diff --git a/src/nvim/api/ui.c b/src/nvim/api/ui.c index afbee09c1c..a9eaccfac5 100644 --- a/src/nvim/api/ui.c +++ b/src/nvim/api/ui.c @@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui) { UIData *data = ui->data; if (data->buffer.size > 0) { - channel_send_event(data->channel_id, "redraw", data->buffer); + rpc_send_event(data->channel_id, "redraw", data->buffer); data->buffer = (Array)ARRAY_DICT_INIT; } } diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index d2b0e329c9..f4ccf07bec 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event) char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; - channel_subscribe(channel_id, e); + rpc_subscribe(channel_id, e); } /// Unsubscribes to event broadcasts @@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event) char e[METHOD_MAXLEN + 1]; memcpy(e, event.data, length); e[length] = NUL; - channel_unsubscribe(channel_id, e); + rpc_unsubscribe(channel_id, e); } Integer nvim_get_color_by_name(String name) diff --git a/src/nvim/channel.c b/src/nvim/channel.c new file mode 100644 index 0000000000..e61ec9c19b --- /dev/null +++ b/src/nvim/channel.c @@ -0,0 +1,61 @@ +// This is an open source non-commercial project. Dear PVS-Studio, please check +// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + +#include "nvim/api/ui.h" +#include "nvim/channel.h" +#include "nvim/msgpack_rpc/channel.h" + +PMap(uint64_t) *channels = NULL; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "channel.c.generated.h" +#endif +/// Teardown the module +void channel_teardown(void) +{ + if (!channels) { + return; + } + + Channel *channel; + + map_foreach_value(channels, channel, { + (void)channel; // close_channel(channel); + }); +} + +/// Initializes the module +void channel_init(void) +{ + channels = pmap_new(uint64_t)(); + rpc_init(); + remote_ui_init(); +} + +void channel_incref(Channel *channel) +{ + channel->refcount++; +} + +void channel_decref(Channel *channel) +{ + if (!(--channel->refcount)) { + multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel); + } +} + +static void free_channel_event(void **argv) +{ + Channel *channel = argv[0]; + if (channel->is_rpc) { + rpc_free(channel); + } + + callback_free(&channel->on_stdout); + callback_free(&channel->on_stderr); + callback_free(&channel->on_exit); + + pmap_del(uint64_t)(channels, channel->id); + multiqueue_free(channel->events); + xfree(channel); +} diff --git a/src/nvim/channel.h b/src/nvim/channel.h new file mode 100644 index 0000000000..b48f508722 --- /dev/null +++ b/src/nvim/channel.h @@ -0,0 +1,120 @@ +#ifndef NVIM_CHANNEL_H +#define NVIM_CHANNEL_H + +#include "nvim/main.h" +#include "nvim/event/socket.h" +#include "nvim/event/process.h" +#include "nvim/os/pty_process.h" +#include "nvim/event/libuv_process.h" +#include "nvim/eval/typval.h" +#include "nvim/msgpack_rpc/channel_defs.h" + +typedef enum { + kChannelStreamProc, + kChannelStreamSocket, + kChannelStreamStdio, + kChannelStreamInternal +} ChannelStreamType; + +typedef struct { + Stream in; + Stream out; +} StdioPair; + +// typedef struct { +// Callback on_out; +// Callback on_close; +// Garray buffer; +// bool buffering; +// } CallbackReader + +#define CallbackReader Callback + +struct Channel { + uint64_t id; + size_t refcount; + MultiQueue *events; + + ChannelStreamType streamtype; + union { + Process proc; + LibuvProcess uv; + PtyProcess pty; + Stream socket; + StdioPair stdio; + } stream; + + bool is_rpc; + RpcState rpc; + Terminal *term; + + CallbackReader on_stdout; + CallbackReader on_stderr; + Callback on_exit; + + varnumber_T *status_ptr; // TODO: refactor? +}; + +EXTERN PMap(uint64_t) *channels; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "channel.h.generated.h" +#endif + +static inline Channel *channel_alloc(ChannelStreamType type) +{ + Channel *chan = xcalloc(1, sizeof(*chan)); + chan->id = next_chan_id++; + chan->events = multiqueue_new_child(main_loop.events); + chan->refcount = 1; + chan->streamtype = type; + pmap_put(uint64_t)(channels, chan->id, chan); + return chan; +} + +/// @returns Channel with the id or NULL if not found +static inline Channel *find_channel(uint64_t id) +{ + return pmap_get(uint64_t)(channels, id); +} + +static inline Stream *channel_instream(Channel *chan) + FUNC_ATTR_NONNULL_ALL +{ + switch (chan->streamtype) { + case kChannelStreamProc: + return &chan->stream.proc.in; + + case kChannelStreamSocket: + return &chan->stream.socket; + + case kChannelStreamStdio: + return &chan->stream.stdio.in; + + case kChannelStreamInternal: + abort(); + } + abort(); +} + +static inline Stream *channel_outstream(Channel *chan) + FUNC_ATTR_NONNULL_ALL +{ + switch (chan->streamtype) { + case kChannelStreamProc: + return &chan->stream.proc.out; + + case kChannelStreamSocket: + return &chan->stream.socket; + + case kChannelStreamStdio: + return &chan->stream.stdio.out; + + case kChannelStreamInternal: + abort(); + } + abort(); +} + + +#endif // NVIM_CHANNEL_H diff --git a/src/nvim/eval.c b/src/nvim/eval.c index b8bae3e293..013cfce78d 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -24,6 +24,7 @@ #endif #include "nvim/eval.h" #include "nvim/buffer.h" +#include "nvim/channel.h" #include "nvim/charset.h" #include "nvim/cursor.h" #include "nvim/diff.h" @@ -437,29 +438,12 @@ static ScopeDictDictItem vimvars_var; #define vimvarht vimvardict.dv_hashtab typedef struct { - union { - LibuvProcess uv; - PtyProcess pty; - } proc; - Stream in, out, err; // Initialized in common_job_start(). - Terminal *term; - bool stopped; - bool exited; - bool rpc; - int refcount; - Callback on_stdout, on_stderr, on_exit; - varnumber_T *status_ptr; - uint64_t id; - MultiQueue *events; -} TerminalJobData; - -typedef struct { - TerminalJobData *data; + Channel *data; Callback *callback; const char *type; list_T *received; int status; -} JobEvent; +} ChannelEvent; typedef struct { TimeWatcher tw; @@ -513,7 +497,6 @@ typedef enum { #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -static PMap(uint64_t) *jobs = NULL; static uint64_t last_timer_id = 0; static PMap(uint64_t) *timers = NULL; @@ -556,7 +539,6 @@ void eval_init(void) { vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; - jobs = pmap_new(uint64_t)(); timers = pmap_new(uint64_t)(); struct vimvar *p; @@ -5141,8 +5123,8 @@ bool garbage_collect(bool testing) // Jobs { - TerminalJobData *data; - map_foreach_value(jobs, data, { + Channel *data; + map_foreach_value(channels, data, { set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); @@ -11433,24 +11415,23 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; if (argvars[1].v_type == VAR_STRING) { char *stream = (char *)argvars[1].vval.v_string; if (!strcmp(stream, "stdin")) { - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); } else { process_close_in(proc); } } else if (!strcmp(stream, "stdout")) { - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); } else { process_close_out(proc); @@ -11458,7 +11439,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) } else if (!strcmp(stream, "stderr")) { process_close_err(proc); } else if (!strcmp(stream, "rpc")) { - if (data->rpc) { + if (data->is_rpc) { channel_close(data->id); } else { EMSG(_("Invalid job stream: Not an rpc job")); @@ -11467,13 +11448,13 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) EMSG2(_("Invalid job stream \"%s\""), stream); } } else { - if (data->rpc) { + if (data->is_rpc) { channel_close(data->id); process_close_err(proc); } else { process_close_streams(proc); if (proc->type == kProcessTypePty) { - pty_process_close_master(&data->proc.pty); + pty_process_close_master(&data->stream.pty); } } } @@ -11494,13 +11475,12 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; rettv->vval.v_number = proc->pid; } @@ -11521,18 +11501,19 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) return; } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_channel(argvars[0].vval.v_number); if (!data) { - EMSG(_(e_invjob)); + EMSG(_(e_invchan)); return; } - if (((Process *)&data->proc)->in->closed) { + Stream *in = channel_instream(data); + if (in->closed) { EMSG(_("Can't send data to the job: stdin is closed")); return; } - if (data->rpc) { + if (data->is_rpc) { EMSG(_("Can't send raw data to rpc channel")); return; } @@ -11546,7 +11527,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) } WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); - rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); + rettv->vval.v_number = wstream_write(in, buf); } // "jobresize(job, width, height)" function @@ -11567,19 +11548,17 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - if (data->proc.uv.process.type != kProcessTypePty) { - EMSG(_(e_jobnotpty)); + if (data->stream.proc.type != kProcessTypePty) { + EMSG(_(e_channotpty)); return; } - pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, - argvars[2].vval.v_number); + pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, argvars[2].vval.v_number); rettv->vval.v_number = 1; } @@ -11697,31 +11676,25 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) } } - TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, pty, rpc, detach, cwd); - Process *proc = (Process *)&data->proc; if (pty) { + PtyProcess *pty = &data->stream.pty; uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); if (width > 0) { - data->proc.pty.width = width; + pty->width = width; } uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); if (height > 0) { - data->proc.pty.height = height; + pty->height = height; } char *term = tv_dict_get_string(job_opts, "TERM", true); if (term) { - data->proc.pty.term_name = term; + pty->term_name = term; } } - if (!rpc && on_stdout.type == kCallbackNone) { - proc->out = NULL; - } - if (on_stderr.type == kCallbackNone) { - proc->err = NULL; - } common_job_start(data, rettv); } @@ -11742,14 +11715,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) } - TerminalJobData *data = find_job(argvars[0].vval.v_number); + Channel *data = find_job(argvars[0].vval.v_number, true); if (!data) { - EMSG(_(e_invjob)); return; } - process_stop((Process *)&data->proc); - data->stopped = true; + process_stop((Process *)&data->stream.proc); rettv->vval.v_number = 1; } @@ -11778,9 +11749,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { tv_list_append_number(rv, -3); } else { // append the list item and set the status pointer so we'll collect the @@ -11802,16 +11773,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) } for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (remaining == 0) { // timed out break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } - int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); + int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11832,9 +11803,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) } for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = find_job(arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } // remove the status pointer because the list may be freed before the @@ -11844,9 +11815,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) // restore the parent queue for any jobs still alive for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { - TerminalJobData *data = NULL; + Channel *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number, false))) { continue; } // restore the parent queue for the job @@ -13803,9 +13774,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr) ADD(args, vim_to_object(tv)); } - if (!channel_send_event((uint64_t)argvars[0].vval.v_number, - tv_get_string(&argvars[1]), - args)) { + if (!rpc_send_event((uint64_t)argvars[0].vval.v_number, + tv_get_string(&argvars[1]), args)) { EMSG2(_(e_invarg2), "Channel doesn't exist"); return; } @@ -13870,10 +13840,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr) Error err = ERROR_INIT; - Object result = channel_send_call((uint64_t)argvars[0].vval.v_number, - tv_get_string(&argvars[1]), - args, - &err); + Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number, + tv_get_string(&argvars[1]), args, &err); if (l_provider_call_nesting) { current_SID = save_current_SID; @@ -13954,7 +13922,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) // The last item of argv must be NULL argv[i] = NULL; - TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, + Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, CALLBACK_NONE, false, true, false, NULL); common_job_start(data, rettv); @@ -13977,10 +13945,11 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) } // if called with a job, stop it, else closes the channel - if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { + uint64_t id = argvars[0].vval.v_number; + if (find_job(id, false)) { // FIXME f_jobstop(argvars, rettv, NULL); } else { - rettv->vval.v_number = channel_close(argvars[0].vval.v_number); + rettv->vval.v_number = channel_close(id); } } @@ -16689,11 +16658,11 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) } uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); - TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, + Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, true, false, false, cwd); - data->proc.pty.width = term_width; - data->proc.pty.height = curwin->w_height; - data->proc.pty.term_name = xstrdup("xterm-256color"); + data->stream.pty.width = term_width; + data->stream.pty.height = curwin->w_height; + data->stream.pty.term_name = xstrdup("xterm-256color"); if (!common_job_start(data, rettv)) { return; } @@ -16705,7 +16674,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) topts.resize_cb = term_resize; topts.close_cb = term_close; - int pid = data->proc.pty.process.pid; + int pid = data->stream.pty.process.pid; char buf[1024]; // format the title with the pid to conform with the term:// URI @@ -16725,7 +16694,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) Terminal *term = terminal_open(topts); data->term = term; - data->refcount++; + channel_incref(data); return; } @@ -16760,30 +16729,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg) return true; } -/// Unref/free callback -void callback_free(Callback *const callback) - FUNC_ATTR_NONNULL_ALL -{ - switch (callback->type) { - case kCallbackFuncref: { - func_unref(callback->data.funcref); - xfree(callback->data.funcref); - break; - } - case kCallbackPartial: { - partial_unref(callback->data.partial); - break; - } - case kCallbackNone: { - break; - } - default: { - abort(); - } - } - callback->type = kCallbackNone; -} - bool callback_call(Callback *const callback, const int argcount_in, typval_T *const argvars_in, typval_T *const rettv) FUNC_ATTR_NONNULL_ALL @@ -22402,7 +22347,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, return ret; } -static inline TerminalJobData *common_job_init(char **argv, +static inline Channel *common_job_init(char **argv, Callback on_stdout, Callback on_stderr, Callback on_exit, @@ -22411,25 +22356,18 @@ static inline TerminalJobData *common_job_init(char **argv, bool detach, const char *cwd) { - TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); - data->stopped = false; + Channel *data = channel_alloc(kChannelStreamProc); data->on_stdout = on_stdout; data->on_stderr = on_stderr; data->on_exit = on_exit; - data->events = multiqueue_new_child(main_loop.events); - data->rpc = rpc; + data->is_rpc = rpc; if (pty) { - data->proc.pty = pty_process_init(&main_loop, data); + data->stream.pty = pty_process_init(&main_loop, data); } else { - data->proc.uv = libuv_process_init(&main_loop, data); + data->stream.uv = libuv_process_init(&main_loop, data); } - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; proc->argv = argv; - proc->in = &data->in; - proc->out = &data->out; - if (!pty) { - proc->err = &data->err; - } proc->cb = eval_job_process_exit_cb; proc->events = data->events; proc->detach = detach; @@ -22456,80 +22394,66 @@ static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, return false; } -static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) +static inline bool common_job_start(Channel *data, typval_T *rettv) { - Process *proc = (Process *)&data->proc; + Process *proc = (Process *)&data->stream.proc; if (proc->type == kProcessTypePty && proc->detach) { EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); - xfree(data->proc.pty.term_name); + xfree(data->stream.pty.term_name); shell_free_argv(proc->argv); - free_term_job_data_event((void **)&data); + channel_decref(data); return false; } - data->id = next_chan_id++; - pmap_put(uint64_t)(jobs, data->id, data); - data->refcount++; char *cmd = xstrdup(proc->argv[0]); - int status = process_spawn(proc); + bool has_out, has_err; + if (proc->type == kProcessTypePty) { + has_out = true; + has_err = false; + } else { + has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; + has_err = data->on_stderr.type != kCallbackNone; + } + int status = process_spawn(proc, true, has_out, has_err); if (status) { EMSG3(_(e_jobspawn), os_strerror(status), cmd); xfree(cmd); if (proc->type == kProcessTypePty) { - xfree(data->proc.pty.term_name); + xfree(data->stream.pty.term_name); } rettv->vval.v_number = proc->status; - term_job_data_decref(data); + channel_decref(data); return false; } xfree(cmd); - if (data->rpc) { - // the rpc channel takes over the in and out streams - channel_from_process(proc, data->id); + if (data->is_rpc) { + // the rpc takes over the in and out streams + rpc_start(data); } else { - wstream_init(proc->in, 0); - if (proc->out) { - rstream_init(proc->out, 0); - rstream_start(proc->out, on_job_stdout, data); + wstream_init(&proc->in, 0); + if (has_out) { + rstream_init(&proc->out, 0); + rstream_start(&proc->out, on_job_stdout, data); } } - if (proc->err) { - rstream_init(proc->err, 0); - rstream_start(proc->err, on_job_stderr, data); + if (has_err) { + rstream_init(&proc->err, 0); + rstream_start(&proc->err, on_job_stderr, data); } rettv->vval.v_number = data->id; return true; } -static inline void free_term_job_data_event(void **argv) -{ - TerminalJobData *data = argv[0]; - callback_free(&data->on_stdout); - callback_free(&data->on_stderr); - callback_free(&data->on_exit); - - multiqueue_free(data->events); - pmap_del(uint64_t)(jobs, data->id); - xfree(data); -} - -static inline void free_term_job_data(TerminalJobData *data) -{ - // data->queue may still be used after this function returns(process_wait), so - // only free in the next event loop iteration - multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); -} - // vimscript job callbacks must be executed on Nvim main loop -static inline void process_job_event(TerminalJobData *data, Callback *callback, +static inline void process_job_event(Channel *data, Callback *callback, const char *type, char *buf, size_t count, int status) { - JobEvent event_data; + ChannelEvent event_data; event_data.received = NULL; if (buf) { event_data.received = tv_list_alloc(); @@ -22566,18 +22490,18 @@ static inline void process_job_event(TerminalJobData *data, Callback *callback, static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, void *job, bool eof) { - TerminalJobData *data = job; + Channel *data = job; on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); } static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *job, bool eof) { - TerminalJobData *data = job; + Channel *data = job; on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); } -static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, +static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, size_t count, bool eof, Callback *callback, const char *type) { @@ -22604,14 +22528,14 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, static void eval_job_process_exit_cb(Process *proc, int status, void *d) { - TerminalJobData *data = d; - if (data->term && !data->exited) { - data->exited = true; + Channel *data = d; + if (data->term && !data->stream.proc.exited) { + data->stream.proc.exited = true; char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); terminal_close(data->term, msg); } - if (data->rpc) { + if (data->is_rpc) { channel_process_exit(data->id, status); } @@ -22621,58 +22545,51 @@ static void eval_job_process_exit_cb(Process *proc, int status, void *d) process_job_event(data, &data->on_exit, "exit", NULL, 0, status); - term_job_data_decref(data); + channel_decref(data); } static void term_write(char *buf, size_t size, void *d) { - TerminalJobData *job = d; - if (job->in.closed) { + Channel *job = d; + if (job->stream.proc.in.closed) { // If the backing stream was closed abruptly, there may be write events // ahead of the terminal close event. Just ignore the writes. ILOG("write failed: stream is closed"); return; } WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); - wstream_write(&job->in, wbuf); + wstream_write(&job->stream.proc.in, wbuf); } static void term_resize(uint16_t width, uint16_t height, void *d) { - TerminalJobData *data = d; - pty_process_resize(&data->proc.pty, width, height); + Channel *data = d; + pty_process_resize(&data->stream.pty, width, height); } static inline void term_delayed_free(void **argv) { - TerminalJobData *j = argv[0]; - if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { + Channel *j = argv[0]; + if (j->stream.proc.in.pending_reqs || j->stream.proc.out.pending_reqs) { multiqueue_put(j->events, term_delayed_free, 1, j); return; } terminal_destroy(j->term); - term_job_data_decref(j); + channel_decref(j); } static void term_close(void *d) { - TerminalJobData *data = d; - if (!data->exited) { - data->exited = true; - process_stop((Process *)&data->proc); + Channel *data = d; + if (!data->stream.proc.exited) { + data->stream.proc.exited = true; + process_stop((Process *)&data->stream.proc); } multiqueue_put(data->events, term_delayed_free, 1, data); } -static void term_job_data_decref(TerminalJobData *data) -{ - if (!(--data->refcount)) { - free_term_job_data(data); - } -} - -static void on_job_event(JobEvent *ev) +static void on_job_event(ChannelEvent *ev) { if (!ev->callback) { return; @@ -22704,15 +22621,24 @@ static void on_job_event(JobEvent *ev) tv_clear(&rettv); } -static TerminalJobData *find_job(uint64_t id) +static Channel *find_job(uint64_t id, bool show_error) { - TerminalJobData *data = pmap_get(uint64_t)(jobs, id); - if (!data || data->stopped) { + Channel *data = find_channel(id); + if (!data || data->streamtype != kChannelStreamProc + || process_is_stopped(&data->stream.proc)) { + if (show_error) { + if (data && data->streamtype != kChannelStreamProc) { + EMSG(_(e_invchanjob)); + } else { + EMSG(_(e_invchan)); + } + } return NULL; } return data; } + static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) { if (check_restricted() || check_secure()) { diff --git a/src/nvim/eval/typval.c b/src/nvim/eval/typval.c index 262ea922ef..99382d2a24 100644 --- a/src/nvim/eval/typval.c +++ b/src/nvim/eval/typval.c @@ -847,6 +847,30 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2) return false; } +/// Unref/free callback +void callback_free(Callback *const callback) + FUNC_ATTR_NONNULL_ALL +{ + switch (callback->type) { + case kCallbackFuncref: { + func_unref(callback->data.funcref); + xfree(callback->data.funcref); + break; + } + case kCallbackPartial: { + partial_unref(callback->data.partial); + break; + } + case kCallbackNone: { + break; + } + default: { + abort(); + } + } + callback->type = kCallbackNone; +} + /// Remove watcher from a dictionary /// /// @param dict Dictionary to remove watcher from. diff --git a/src/nvim/event/libuv_process.c b/src/nvim/event/libuv_process.c index 758b35796e..c101cb1bb9 100644 --- a/src/nvim/event/libuv_process.c +++ b/src/nvim/event/libuv_process.c @@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc) uvproc->uvstdio[2].flags = UV_IGNORE; uvproc->uv.data = proc; - if (proc->in) { + if (!proc->in.closed) { uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t, - &proc->in->uv.pipe); + &proc->in.uv.pipe); } - if (proc->out) { + if (!proc->out.closed) { uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t, - &proc->out->uv.pipe); + &proc->out.uv.pipe); } - if (proc->err) { + if (!proc->err.closed) { uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t, - &proc->err->uv.pipe); + &proc->err.uv.pipe); } int status; diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 41e793500a..1bbe4d9cad 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -27,26 +27,33 @@ #define CLOSE_PROC_STREAM(proc, stream) \ do { \ - if (proc->stream && !proc->stream->closed) { \ - stream_close(proc->stream, NULL, NULL); \ + if (!proc->stream.closed) { \ + stream_close(&proc->stream, NULL, NULL); \ } \ } while (0) static bool process_is_tearing_down = false; /// @returns zero on success, or negative error code -int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL +int process_spawn(Process *proc, bool in, bool out, bool err) + FUNC_ATTR_NONNULL_ALL { - if (proc->in) { - uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); + if (in) { + uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0); + } else { + proc->in.closed = true; } - if (proc->out) { - uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); + if (out) { + uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); + } else { + proc->out.closed = true; } - if (proc->err) { - uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); + if (err) { + uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); + } else { + proc->err.closed = true; } int status; @@ -62,14 +69,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (status) { - if (proc->in) { - uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); + if (in) { + uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); } - if (proc->out) { - uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); + if (out) { + uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); } - if (proc->err) { - uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); + if (err) { + uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); } if (proc->type == kProcessTypeUv) { @@ -82,30 +89,30 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL return status; } - if (proc->in) { - stream_init(NULL, proc->in, -1, - STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe)); - proc->in->events = proc->events; - proc->in->internal_data = proc; - proc->in->internal_close_cb = on_process_stream_close; + if (in) { + stream_init(NULL, &proc->in, -1, + STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe)); + proc->in.events = proc->events; + proc->in.internal_data = proc; + proc->in.internal_close_cb = on_process_stream_close; proc->refcount++; } - if (proc->out) { - stream_init(NULL, proc->out, -1, - STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe)); - proc->out->events = proc->events; - proc->out->internal_data = proc; - proc->out->internal_close_cb = on_process_stream_close; + if (out) { + stream_init(NULL, &proc->out, -1, + STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe)); + proc->out.events = proc->events; + proc->out.internal_data = proc; + proc->out.internal_close_cb = on_process_stream_close; proc->refcount++; } - if (proc->err) { - stream_init(NULL, proc->err, -1, - STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe)); - proc->err->events = proc->events; - proc->err->internal_data = proc; - proc->err->internal_close_cb = on_process_stream_close; + if (err) { + stream_init(NULL, &proc->err, -1, + STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe)); + proc->err.events = proc->events; + proc->err.internal_data = proc; + proc->err.internal_close_cb = on_process_stream_close; proc->refcount++; } @@ -395,8 +402,8 @@ static void process_close_handles(void **argv) { Process *proc = argv[0]; - flush_stream(proc, proc->out); - flush_stream(proc, proc->err); + flush_stream(proc, &proc->out); + flush_stream(proc, &proc->err); process_close_streams(proc); process_close(proc); diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c00e8e7ec..0897563c83 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -23,13 +23,15 @@ struct process { uint64_t stopped_time; const char *cwd; char **argv; - Stream *in, *out, *err; + Stream in, out, err; process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; + bool exited; // TODO: redundant bool closed, detach; MultiQueue *events; }; + static inline Process process_init(Loop *loop, ProcessType type, void *data) { return (Process) { @@ -43,9 +45,9 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .stopped_time = 0, .cwd = NULL, .argv = NULL, - .in = NULL, - .out = NULL, - .err = NULL, + .in = { .closed = false }, + .out = { .closed = false }, + .err = { .closed = false }, .cb = NULL, .closed = false, .internal_close_cb = NULL, @@ -54,6 +56,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) }; } +static inline bool process_is_stopped(Process *proc) +{ + return proc->stopped_time != 0; +} + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/process.h.generated.h" #endif diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index d27497e4a4..70a708ff4d 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -33,6 +33,7 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status); typedef void (*stream_close_cb)(Stream *stream, void *data); struct stream { + bool closed; union { uv_pipe_t pipe; uv_tcp_t tcp; @@ -52,7 +53,6 @@ struct stream { size_t maxmem; size_t pending_reqs; size_t num_bytes; - bool closed; MultiQueue *events; }; diff --git a/src/nvim/globals.h b/src/nvim/globals.h index 1ff0f7eb89..c639dbcf83 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -1074,11 +1074,12 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s")); EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range")); EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command")); EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory")); -EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id")); +EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id")); +EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job")); EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full")); EXTERN char_u e_jobspawn[] INIT(= N_( - "E903: Process failed to start: %s: \"%s\"")); -EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty")); + "E903: Process failed to start: %s: \"%s\"")); +EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty")); EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\"")); EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s")); EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number")); diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 18f6334780..42b1f63830 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -11,6 +11,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/api/ui.h" +#include "nvim/channel.h" #include "nvim/msgpack_rpc/channel.h" #include "nvim/msgpack_rpc/server.h" #include "nvim/event/loop.h" @@ -40,47 +41,6 @@ #define log_server_msg(...) #endif -typedef enum { - kChannelTypeSocket, - kChannelTypeProc, - kChannelTypeStdio, - kChannelTypeInternal -} ChannelType; - -typedef struct { - uint64_t request_id; - bool returned, errored; - Object result; -} ChannelCallFrame; - -typedef struct { - uint64_t id; - size_t refcount; - PMap(cstr_t) *subscribed_events; - bool closed; - ChannelType type; - msgpack_unpacker *unpacker; - union { - Stream stream; // bidirectional (socket) - Process *proc; - struct { - Stream in; - Stream out; - } std; - } data; - uint64_t next_request_id; - kvec_t(ChannelCallFrame *) call_stack; - MultiQueue *events; -} Channel; - -typedef struct { - Channel *channel; - MsgpackRpcRequestHandler handler; - Array args; - uint64_t request_id; -} RequestEvent; - -static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; @@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer; # include "msgpack_rpc/channel.c.generated.h" #endif -/// Initializes the module -void channel_init(void) +void rpc_init(void) { ch_before_blocking_events = multiqueue_new_child(main_loop.events); - channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); - remote_ui_init(); } -/// Teardown the module -void channel_teardown(void) -{ - if (!channels) { - return; - } - - Channel *channel; - map_foreach_value(channels, channel, { - close_channel(channel); - }); -} - -/// Creates an API channel by starting a process and connecting to its -/// stdin/stdout. stderr is handled by the job infrastructure. -/// -/// @param argv The argument vector for the process. [consumed] -/// @return The channel id (> 0), on success. -/// 0, on error. -uint64_t channel_from_process(Process *proc, uint64_t id) +void rpc_start(Channel *channel) { - Channel *channel = register_channel(kChannelTypeProc, id, proc->events); - incref(channel); // process channels are only closed by the exit_cb - channel->data.proc = proc; + channel->is_rpc = true; + RpcState *rpc = &channel->rpc; + rpc->closed = false; + rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + rpc->subscribed_events = pmap_new(cstr_t)(); + rpc->next_request_id = 1; + kv_init(rpc->call_stack); - wstream_init(proc->in, 0); - rstream_init(proc->out, 0); - rstream_start(proc->out, receive_msgpack, channel); + Stream *in = channel_instream(channel); + Stream *out = channel_outstream(channel); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, - proc->out); + DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); - return channel->id; + wstream_init(in, 0); + rstream_init(out, CHANNEL_BUFFER_SIZE); + rstream_start(out, receive_msgpack, channel); } /// Creates an API channel from a tcp/pipe socket connection @@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id) /// @param watcher The SocketWatcher ready to accept the connection void channel_from_connection(SocketWatcher *watcher) { - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - socket_watcher_accept(watcher, &channel->data.stream); - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); - - DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, - &channel->data.stream); + Channel *channel = channel_alloc(kChannelStreamSocket); + socket_watcher_accept(watcher, &channel->stream.socket); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); } +/// TODO: move to eval.c, also support bytes uint64_t channel_connect(bool tcp, const char *address, int timeout, const char **error) { @@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address, xfree(path); } - Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); - if (!socket_connect(&main_loop, &channel->data.stream, + Channel *channel = channel_alloc(kChannelStreamSocket); + if (!socket_connect(&main_loop, &channel->stream.socket, tcp, address, timeout, error)) { - decref(channel); + channel_decref(channel); return 0; } - incref(channel); // close channel only after the stream is closed - channel->data.stream.internal_close_cb = close_cb; - channel->data.stream.internal_data = channel; - wstream_init(&channel->data.stream, 0); - rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, receive_msgpack, channel); + channel_incref(channel); // close channel only after the stream is closed + channel->stream.socket.internal_close_cb = close_cb; + channel->stream.socket.internal_data = channel; + rpc_start(channel); return channel->id; } +static Channel *find_rpc_channel(uint64_t id) +{ + Channel *chan = find_channel(id); + if (!chan || !chan->is_rpc || chan->rpc.closed) { + return NULL; + } + return chan; +} + /// Publishes an event to a channel. /// /// @param id Channel id. 0 means "broadcast to all subscribed channels" /// @param name Event name (application-defined) /// @param args Array of event arguments /// @return True if the event was sent successfully, false otherwise. -bool channel_send_event(uint64_t id, const char *name, Array args) +bool rpc_send_event(uint64_t id, const char *name, Array args) { Channel *channel = NULL; - if (id && (!(channel = pmap_get(uint64_t)(channels, id)) - || channel->closed)) { + if (id && (!(channel = find_rpc_channel(id)))) { api_free_array(args); return false; } @@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) /// @param args Array with method arguments /// @param[out] error True if the return value is an error /// @return Whatever the remote method returned -Object channel_send_call(uint64_t id, - const char *method_name, - Array args, - Error *err) +Object rpc_send_call(uint64_t id, + const char *method_name, + Array args, + Error *err) { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); api_free_array(args); return NIL; } - incref(channel); - uint64_t request_id = channel->next_request_id++; + channel_incref(channel); + RpcState *rpc = &channel->rpc; + uint64_t request_id = rpc->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); // Push the frame ChannelCallFrame frame = { request_id, false, false, NIL }; - kv_push(channel->call_stack, &frame); + kv_push(rpc->call_stack, &frame); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); - (void)kv_pop(channel->call_stack); + (void)kv_pop(rpc->call_stack); if (frame.errored) { if (frame.result.type == kObjectTypeString) { @@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id, api_free_object(frame.result); } - decref(channel); + channel_decref(channel); return frame.errored ? NIL : frame.result; } @@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id, /// /// @param id The channel id /// @param event The event type string -void channel_subscribe(uint64_t id, char *event) +void rpc_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event) pmap_put(cstr_t)(event_strings, event_string, event_string); } - pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); + pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); } /// Unsubscribes to event broadcasts /// /// @param id The channel id /// @param event The event type string -void channel_unsubscribe(uint64_t id, char *event) +void rpc_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { abort(); } @@ -310,7 +255,7 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { + if (!(channel = find_rpc_channel(id))) { return false; } @@ -322,24 +267,22 @@ bool channel_close(uint64_t id) /// Neovim void channel_from_stdio(void) { - Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); - incref(channel); // stdio channels are only closed on exit + Channel *channel = channel_alloc(kChannelStreamStdio); + channel_incref(channel); // stdio channels are only closed on exit // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.std.in, receive_msgpack, channel); - // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); + rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); + wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); - DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, - &channel->data.std.in, &channel->data.std.out); + rpc_start(channel); } /// Creates a loopback channel. This is used to avoid deadlock /// when an instance connects to its own named pipe. uint64_t channel_create_internal(void) { - Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); - incref(channel); // internal channel lives until process exit + Channel *channel = channel_alloc(kChannelStreamInternal); + channel_incref(channel); // internal channel lives until process exit + rpc_start(channel); return channel->id; } @@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status) { Channel *channel = pmap_get(uint64_t)(channels, id); - channel->closed = true; - decref(channel); + // channel_decref(channel); remove?? + channel->rpc.closed = true; } // rstream.c:read_event() invokes this as stream->read_cb(). @@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) { Channel *channel = data; - incref(channel); + channel_incref(channel); if (eof) { close_channel(channel); @@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, goto end; } - if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) - || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { - char buf[256]; - snprintf(buf, sizeof(buf), - "ch %" PRIu64 ": stream closed unexpectedly. " - "closing channel", - channel->id); - call_set_error(channel, buf, WARN_LOG_LEVEL); - goto end; - } - size_t count = rbuffer_size(rbuf); - DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", + DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", channel->id, count, stream); // Feed the unpacker with data - msgpack_unpacker_reserve_buffer(channel->unpacker, count); - rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); - msgpack_unpacker_buffer_consumed(channel->unpacker, count); + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); + rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); parse_msgpack(channel); end: - decref(channel); + channel_decref(channel); } static void parse_msgpack(Channel *channel) @@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel) msgpack_unpack_return result; // Deserialize everything we can. - while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == - MSGPACK_UNPACK_SUCCESS) { + while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == + MSGPACK_UNPACK_SUCCESS) { bool is_response = is_rpc_response(&unpacked.data); log_client_msg(channel->id, !is_response, unpacked.data); @@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel) if (result == MSGPACK_UNPACK_NOMEM_ERROR) { mch_errmsg(e_outofmem); mch_errmsg("\n"); - decref(channel); + channel_decref(channel); preserve_exit(); } @@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request) evdata->handler = handler; evdata->args = args; evdata->request_id = request_id; - incref(channel); + channel_incref(channel); if (handler.async) { bool is_get_mode = handler.fn == handle_nvim_get_mode; @@ -530,66 +462,30 @@ static void on_request_event(void **argv) api_free_object(result); } api_free_array(args); - decref(channel); + channel_decref(channel); xfree(e); api_clear_error(&error); } -/// Returns the Stream that a Channel writes to. -static Stream *chan_wstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->in; - case kChannelTypeStdio: - return &chan->data.std.out; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - -/// Returns the Stream that a Channel reads from. -static Stream *chan_rstream(Channel *chan) -{ - switch (chan->type) { - case kChannelTypeSocket: - return &chan->data.stream; - case kChannelTypeProc: - return chan->data.proc->out; - case kChannelTypeStdio: - return &chan->data.std.in; - case kChannelTypeInternal: - return NULL; - } - abort(); -} - - static bool channel_write(Channel *channel, WBuffer *buffer) { - bool success = false; + bool success; - if (channel->closed) { + if (channel->rpc.closed) { wstream_release_wbuffer(buffer); return false; } - switch (channel->type) { - case kChannelTypeSocket: - case kChannelTypeProc: - case kChannelTypeStdio: - success = wstream_write(chan_wstream(channel), buffer); - break; - case kChannelTypeInternal: - incref(channel); - CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); - success = true; - break; + if (channel->streamtype == kChannelStreamInternal) { + channel_incref(channel); + CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); + success = true; + } else { + Stream *in = channel_instream(channel); + success = wstream_write(in, buffer); } + if (!success) { // If the write failed for any reason, close the channel char buf[256]; @@ -609,14 +505,14 @@ static void internal_read_event(void **argv) Channel *channel = argv[0]; WBuffer *buffer = argv[1]; - msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); - memcpy(msgpack_unpacker_buffer(channel->unpacker), + msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); + memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), buffer->data, buffer->size); - msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); + msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); parse_msgpack(channel); - decref(channel); + channel_decref(channel); wstream_release_wbuffer(buffer); } @@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args) Channel *channel; map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, name)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { kv_push(subscribed, channel); } }); @@ -695,10 +592,11 @@ end: static void unsubscribe(Channel *channel, char *event) { char *event_string = pmap_get(cstr_t)(event_strings, event); - pmap_del(cstr_t)(channel->subscribed_events, event_string); + pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); map_foreach_value(channels, channel, { - if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { + if (channel->is_rpc + && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { return; } }); @@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event) } /// Close the channel streams/process and free the channel resources. +/// TODO: move to channel.h static void close_channel(Channel *channel) { - if (channel->closed) { + if (channel->rpc.closed) { return; } - channel->closed = true; + channel->rpc.closed = true; - switch (channel->type) { - case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL, NULL); + switch (channel->streamtype) { + case kChannelStreamSocket: + stream_close(&channel->stream.socket, NULL, NULL); break; - case kChannelTypeProc: + case kChannelStreamProc: // Only close the rpc channel part, // there could be an error message on the stderr stream - process_close_in(channel->data.proc); - process_close_out(channel->data.proc); + process_close_in(&channel->stream.proc); + process_close_out(&channel->stream.proc); break; - case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL, NULL); - stream_close(&channel->data.std.out, NULL, NULL); + case kChannelStreamStdio: + stream_close(&channel->stream.stdio.in, NULL, NULL); + stream_close(&channel->stream.stdio.out, NULL, NULL); multiqueue_put(main_loop.fast_events, exit_event, 1, channel); return; - case kChannelTypeInternal: + case kChannelStreamInternal: // nothing to free. break; } - decref(channel); + channel_decref(channel); } static void exit_event(void **argv) { - decref(argv[0]); + channel_decref(argv[0]); if (!exiting) { mch_exit(0); } } -static void free_channel(Channel *channel) +void rpc_free(Channel *channel) { remote_ui_disconnect(channel->id); - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); + msgpack_unpacker_free(channel->rpc.unpacker); // Unsubscribe from all events char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { + map_foreach_value(channel->rpc.subscribed_events, event_string, { unsubscribe(channel, event_string); }); - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - if (channel->type != kChannelTypeProc) { - multiqueue_free(channel->events); - } - xfree(channel); + pmap_free(cstr_t)(channel->rpc.subscribed_events); + kv_destroy(channel->rpc.call_stack); } static void close_cb(Stream *stream, void *data) { - decref(data); -} - -static Channel *register_channel(ChannelType type, uint64_t id, - MultiQueue *events) -{ - Channel *rv = xmalloc(sizeof(Channel)); - rv->events = events ? events : multiqueue_new_child(main_loop.events); - rv->type = type; - rv->refcount = 1; - rv->closed = false; - rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); - rv->id = id > 0 ? id : next_chan_id++; - rv->subscribed_events = pmap_new(cstr_t)(); - rv->next_request_id = 1; - kv_init(rv->call_stack); - pmap_put(uint64_t)(channels, rv->id, rv); - return rv; + channel_decref(data); } static bool is_rpc_response(msgpack_object *obj) @@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) { uint64_t response_id = obj->via.array.ptr[1].via.u64; + if (kv_size(channel->rpc.call_stack) == 0) { + return false; + } + // Must be equal to the frame at the stack's bottom - return kv_size(channel->call_stack) && response_id - == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); + return response_id == frame->request_id; } static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; @@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg, int loglevel) { LOG(loglevel, "RPC: %s", msg); - for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_A(channel->call_stack, i); + for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { + ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); frame->returned = true; frame->errored = true; api_free_object(frame->result); @@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id, return rv; } -static void incref(Channel *channel) -{ - channel->refcount++; -} - -static void decref(Channel *channel) -{ - if (!(--channel->refcount)) { - free_channel(channel); - } -} - #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL #define REQ "[request] " #define RES "[response] " diff --git a/src/nvim/msgpack_rpc/channel.h b/src/nvim/msgpack_rpc/channel.h index f8fe6f129b..9ff5abdc5f 100644 --- a/src/nvim/msgpack_rpc/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -8,6 +8,7 @@ #include "nvim/event/socket.h" #include "nvim/event/process.h" #include "nvim/vim.h" +#include "nvim/channel.h" #define METHOD_MAXLEN 512 @@ -16,6 +17,7 @@ /// of os_inchar(), so they are processed "just-in-time". MultiQueue *ch_before_blocking_events; + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "msgpack_rpc/channel.h.generated.h" #endif diff --git a/src/nvim/msgpack_rpc/channel_defs.h b/src/nvim/msgpack_rpc/channel_defs.h new file mode 100644 index 0000000000..6d8362e8b7 --- /dev/null +++ b/src/nvim/msgpack_rpc/channel_defs.h @@ -0,0 +1,36 @@ +#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H +#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H + +#include <stdbool.h> +#include <uv.h> +#include <msgpack.h> + +#include "nvim/api/private/defs.h" +#include "nvim/event/socket.h" +#include "nvim/event/process.h" +#include "nvim/vim.h" + +typedef struct Channel Channel; + +typedef struct { + uint64_t request_id; + bool returned, errored; + Object result; +} ChannelCallFrame; + +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +typedef struct { + PMap(cstr_t) *subscribed_events; + bool closed; + msgpack_unpacker *unpacker; + uint64_t next_request_id; + kvec_t(ChannelCallFrame *) call_stack; +} RpcState; + +#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H diff --git a/src/nvim/os/pty_process_unix.c b/src/nvim/os/pty_process_unix.c index ee3ab96a83..e39f837c1a 100644 --- a/src/nvim/os/pty_process_unix.c +++ b/src/nvim/os/pty_process_unix.c @@ -47,7 +47,7 @@ int pty_process_spawn(PtyProcess *ptyproc) int status = 0; // zero or negative error code (libuv convention) Process *proc = (Process *)ptyproc; - assert(!proc->err); + assert(proc->err.closed); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; uv_disable_stdio_inheritance(); @@ -83,12 +83,12 @@ int pty_process_spawn(PtyProcess *ptyproc) goto error; } - if (proc->in - && (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) { + if (!proc->in.closed + && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) { goto error; } - if (proc->out - && (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) { + if (!proc->out.closed + && (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) { goto error; } diff --git a/src/nvim/os/pty_process_win.c b/src/nvim/os/pty_process_win.c index ef8a699c56..3c4839a076 100644 --- a/src/nvim/os/pty_process_win.c +++ b/src/nvim/os/pty_process_win.c @@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc) wchar_t *cwd = NULL; const char *emsg = NULL; - assert(!proc->err); + assert(proc->err.closed); cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err); if (cfg == NULL) { @@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc) goto cleanup; } - if (proc->in != NULL) { + if (!proc->in.closed) { in_req = xmalloc(sizeof(uv_connect_t)); uv_pipe_connect( in_req, - &proc->in->uv.pipe, + &proc->in.uv.pipe, in_name, pty_process_connect_cb); } - if (proc->out != NULL) { + if (!proc->out.closed) { out_req = xmalloc(sizeof(uv_connect_t)); uv_pipe_connect( out_req, - &proc->out->uv.pipe, + &proc->out.uv.pipe, out_name, pty_process_connect_cb); } @@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer) PtyProcess *ptyproc = wait_eof_timer->data; Process *proc = (Process *)ptyproc; - if (!proc->out || !uv_is_readable(proc->out->uvstream)) { + if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) { uv_timer_stop(&ptyproc->wait_eof_timer); pty_process_finish2(ptyproc); } diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 32e9a70e57..f54fe412ba 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -207,16 +207,12 @@ static int do_os_system(char **argv, char prog[MAXPATHL]; xstrlcpy(prog, argv[0], MAXPATHL); - Stream in, out, err; LibuvProcess uvproc = libuv_process_init(&main_loop, &buf); Process *proc = &uvproc.process; MultiQueue *events = multiqueue_new_child(main_loop.events); proc->events = events; proc->argv = argv; - proc->in = input != NULL ? &in : NULL; - proc->out = &out; - proc->err = &err; - int status = process_spawn(proc); + int status = process_spawn(proc, input != NULL, true, true); if (status) { loop_poll_events(&main_loop, 0); // Failed, probably 'shell' is not executable. @@ -236,27 +232,27 @@ static int do_os_system(char **argv, // streams while there's still data in the OS buffer (due to the process // exiting before all data is read). if (input != NULL) { - proc->in->events = NULL; - wstream_init(proc->in, 0); + proc->in.events = NULL; + wstream_init(&proc->in, 0); } - proc->out->events = NULL; - rstream_init(proc->out, 0); - rstream_start(proc->out, data_cb, &buf); - proc->err->events = NULL; - rstream_init(proc->err, 0); - rstream_start(proc->err, data_cb, &buf); + proc->out.events = NULL; + rstream_init(&proc->out, 0); + rstream_start(&proc->out, data_cb, &buf); + proc->err.events = NULL; + rstream_init(&proc->err, 0); + rstream_start(&proc->err, data_cb, &buf); // write the input, if any if (input) { WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); - if (!wstream_write(&in, input_buffer)) { + if (!wstream_write(&proc->in, input_buffer)) { // couldn't write, stop the process and tell the user about it process_stop(proc); return -1; } // close the input stream after everything is written - wstream_set_write_cb(&in, shell_write_cb, NULL); + wstream_set_write_cb(&proc->in, shell_write_cb, NULL); } // Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the |