From a10b32707346b03ddd0b3a160c55dd5f7a333b59 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Sat, 18 Jul 2015 09:27:43 -0300 Subject: events: Add missing function attributes to some APIs --- src/nvim/event/rstream.c | 2 ++ src/nvim/event/socket.c | 1 + src/nvim/event/stream.c | 3 +++ src/nvim/event/wstream.c | 2 ++ 4 files changed, 8 insertions(+) (limited to 'src') diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 9d2439ac2b..7283cca02b 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -49,6 +49,7 @@ void rstream_init(Stream *stream, size_t bufsize) /// /// @param stream The `Stream` instance void rstream_start(Stream *stream, stream_read_cb cb) + FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; if (stream->uvstream) { @@ -62,6 +63,7 @@ void rstream_start(Stream *stream, stream_read_cb cb) /// /// @param stream The `Stream` instance void rstream_stop(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_read_stop(stream->uvstream); diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index bdc632abf0..2a618d290d 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -113,6 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) } int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 959b532146..72dabc3ce7 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -32,6 +32,7 @@ int stream_set_blocking(int fd, bool blocking) void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, void *data) + FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -72,6 +73,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, } void stream_close(Stream *stream, stream_close_cb on_stream_close) + FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); @@ -88,6 +90,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close) } void stream_close_handle(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_close((uv_handle_t *)stream->uvstream, close_cb); diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 5fcb724fe3..8028e35e6b 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -118,6 +118,7 @@ WBuffer *wstream_new_buffer(char *data, size_t size, size_t refcount, wbuffer_data_finalizer cb) + FUNC_ATTR_NONNULL_ARG(1) { WBuffer *rv = xmalloc(sizeof(WBuffer)); rv->size = size; @@ -151,6 +152,7 @@ static void write_cb(uv_write_t *req, int status) } void wstream_release_wbuffer(WBuffer *buffer) + FUNC_ATTR_NONNULL_ALL { if (!--buffer->refcount) { if (buffer->cb) { -- cgit From 024b1f39a31c09f3460a1af13062f1920b323f61 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Sat, 18 Jul 2015 09:40:02 -0300 Subject: eval: Fixes to job control - Ensure TerminalJobData is freed in case of error when spawning pty jobs - Check if job was stopped in every function that receives a job id. --- src/nvim/eval.c | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index aa9e656913..2f7b296103 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -10778,7 +10778,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10819,7 +10819,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10860,7 +10860,7 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -11007,8 +11007,8 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); - if (!data || data->stopped) { + TerminalJobData *data = find_job(argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } @@ -11053,7 +11053,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { list_append_number(rv, -3); } else { // append the list item and set the status pointer so we'll collect the @@ -11077,7 +11077,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } int status = process_wait((Process *)&data->proc, remaining); @@ -11106,7 +11106,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } // remove the status pointer because the list may be freed before the @@ -21096,6 +21096,10 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) Process *proc = (Process *)&data->proc; if (!process_spawn(&loop, proc)) { EMSG(_(e_jobexe)); + if (proc->type == kProcessTypePty) { + xfree(data->proc.pty.term_name); + free_term_job_data(data); + } return false; } @@ -21305,6 +21309,15 @@ end: xfree(ev); } +static TerminalJobData *find_job(uint64_t id) +{ + TerminalJobData *data = pmap_get(uint64_t)(jobs, id); + if (!data || data->stopped) { + return NULL; + } + return data; +} + static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) { if (check_restricted() || check_secure()) { -- cgit From b13011ff47d9af019c74681edb5a1a42736ee7d7 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Sat, 18 Jul 2015 09:46:14 -0300 Subject: pty_process: Simplify cleanup after error during spawn --- src/nvim/event/pty_process.c | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c index 1e24d7c919..0aa0d6fbd4 100644 --- a/src/nvim/event/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -33,9 +33,6 @@ # include "event/pty_process.c.generated.h" #endif -static const unsigned int KILL_RETRIES = 5; -static const unsigned int KILL_TIMEOUT = 2; // seconds - bool pty_process_spawn(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { @@ -83,19 +80,8 @@ bool pty_process_spawn(PtyProcess *ptyproc) error: close(master); - - // terminate spawned process - kill(pid, SIGTERM); - int status, child; - unsigned int try = 0; - while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) { - sleep(KILL_TIMEOUT); - } - if (child != pid) { - kill(pid, SIGKILL); - waitpid(pid, NULL, 0); - } - + kill(pid, SIGKILL); + waitpid(pid, NULL, 0); return false; } -- cgit From ccdeb91e1206f38773664979bf03694213a2ba80 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 09:55:31 -0300 Subject: msgpack: Replace FUNC_ATTR_DEFERRED by FUNC_ATTR_ASYNC API functions exposed via msgpack-rpc now fall into two categories: - async functions, which are executed as soon as the request is parsed - sync functions, which are invoked in nvim main loop when processing the `K_EVENT special key Only a few functions which can be safely executed in any context are marked as async. --- src/nvim/api/buffer.c | 7 ------- src/nvim/api/tabpage.c | 1 - src/nvim/api/vim.c | 16 ++-------------- src/nvim/api/window.c | 5 ----- src/nvim/func_attr.h | 2 +- src/nvim/map.c | 2 +- src/nvim/msgpack_rpc/channel.c | 8 ++++---- src/nvim/msgpack_rpc/defs.h | 5 ++--- src/nvim/msgpack_rpc/remote_ui.c | 2 +- 9 files changed, 11 insertions(+), 37 deletions(-) (limited to 'src') diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c index 915c5f74d7..12c97cfffb 100644 --- a/src/nvim/api/buffer.c +++ b/src/nvim/api/buffer.c @@ -70,7 +70,6 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err) /// @param line The new line. /// @param[out] err Details of an error that may have occurred void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) - FUNC_ATTR_DEFERRED { Object l = STRING_OBJ(line); Array array = {.items = &l, .size = 1}; @@ -83,7 +82,6 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) /// @param index The line index /// @param[out] err Details of an error that may have occurred void buffer_del_line(Buffer buffer, Integer index, Error *err) - FUNC_ATTR_DEFERRED { Array array = ARRAY_DICT_INIT; buffer_set_line_slice(buffer, index, index, true, true, array, err); @@ -171,7 +169,6 @@ void buffer_set_line_slice(Buffer buffer, Boolean include_end, ArrayOf(String) replacement, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -339,7 +336,6 @@ Object buffer_get_var(Buffer buffer, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object buffer_set_var(Buffer buffer, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -375,7 +371,6 @@ Object buffer_get_option(Buffer buffer, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void buffer_set_option(Buffer buffer, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -426,7 +421,6 @@ String buffer_get_name(Buffer buffer, Error *err) /// @param name The buffer name /// @param[out] err Details of an error that may have occurred void buffer_set_name(Buffer buffer, String name, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -472,7 +466,6 @@ void buffer_insert(Buffer buffer, Integer lnum, ArrayOf(String) lines, Error *err) - FUNC_ATTR_DEFERRED { buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err); } diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c index 1c958118e1..126ee4072d 100644 --- a/src/nvim/api/tabpage.c +++ b/src/nvim/api/tabpage.c @@ -62,7 +62,6 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The tab page handle Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { tabpage_T *tab = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index f5dadb00ea..b9900b5d5a 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -37,7 +37,6 @@ /// @param str The command str /// @param[out] err Details of an error that may have occurred void vim_command(String str, Error *err) - FUNC_ATTR_DEFERRED { // Run the command try_start(); @@ -54,7 +53,6 @@ void vim_command(String str, Error *err) /// @see feedkeys() /// @see vim_strsave_escape_csi void vim_feedkeys(String keys, String mode, Boolean escape_csi) - FUNC_ATTR_DEFERRED { bool remap = true; bool insert = false; @@ -100,6 +98,7 @@ void vim_feedkeys(String keys, String mode, Boolean escape_csi) /// @return The number of bytes actually written, which can be lower than /// requested if the buffer becomes full. Integer vim_input(String keys) + FUNC_ATTR_ASYNC { return (Integer)input_enqueue(keys); } @@ -143,7 +142,6 @@ String vim_command_output(String str, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The expanded object Object vim_eval(String str, Error *err) - FUNC_ATTR_DEFERRED { Object rv = OBJECT_INIT; // Evaluate the expression @@ -171,7 +169,6 @@ Object vim_eval(String str, Error *err) /// @param[out] err Details of an error that may have occurred /// @return Result of the function call Object vim_call_function(String fname, Array args, Error *err) - FUNC_ATTR_DEFERRED { Object rv = OBJECT_INIT; if (args.size > MAX_FUNC_ARGS) { @@ -312,7 +309,6 @@ String vim_get_current_line(Error *err) /// @param line The line contents /// @param[out] err Details of an error that may have occurred void vim_set_current_line(String line, Error *err) - FUNC_ATTR_DEFERRED { buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err); } @@ -321,7 +317,6 @@ void vim_set_current_line(String line, Error *err) /// /// @param[out] err Details of an error that may have occurred void vim_del_current_line(Error *err) - FUNC_ATTR_DEFERRED { buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err); } @@ -343,7 +338,6 @@ Object vim_get_var(String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return the old value if any Object vim_set_var(String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { return dict_set_value(&globvardict, name, value, err); } @@ -374,7 +368,6 @@ Object vim_get_option(String name, Error *err) /// @param value The new option value /// @param[out] err Details of an error that may have occurred void vim_set_option(String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { set_option_to(NULL, SREQ_GLOBAL, name, value, err); } @@ -383,7 +376,6 @@ void vim_set_option(String name, Object value, Error *err) /// /// @param str The message void vim_out_write(String str) - FUNC_ATTR_DEFERRED { write_msg(str, false); } @@ -392,7 +384,6 @@ void vim_out_write(String str) /// /// @param str The message void vim_err_write(String str) - FUNC_ATTR_DEFERRED { write_msg(str, true); } @@ -402,7 +393,6 @@ void vim_err_write(String str) /// /// @param str The message void vim_report_error(String str) - FUNC_ATTR_DEFERRED { vim_err_write(str); vim_err_write((String) {.data = "\n", .size = 1}); @@ -442,7 +432,6 @@ Buffer vim_get_current_buffer(void) /// @param id The buffer handle /// @param[out] err Details of an error that may have occurred void vim_set_current_buffer(Buffer buffer, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -493,7 +482,6 @@ Window vim_get_current_window(void) /// /// @param handle The window handle void vim_set_current_window(Window window, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -545,7 +533,6 @@ Tabpage vim_get_current_tabpage(void) /// @param handle The tab page handle /// @param[out] err Details of an error that may have occurred void vim_set_current_tabpage(Tabpage tabpage, Error *err) - FUNC_ATTR_DEFERRED { tabpage_T *tp = find_tab_by_handle(tabpage, err); @@ -609,6 +596,7 @@ Dictionary vim_get_color_map(void) Array vim_get_api_info(uint64_t channel_id) + FUNC_ATTR_ASYNC { Array rv = ARRAY_DICT_INIT; diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c index 5034c26c83..aad616c7bf 100644 --- a/src/nvim/api/window.c +++ b/src/nvim/api/window.c @@ -54,7 +54,6 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err) /// @param pos the (row, col) tuple representing the new position /// @param[out] err Details of an error that may have occurred void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -118,7 +117,6 @@ Integer window_get_height(Window window, Error *err) /// @param height the new height in rows /// @param[out] err Details of an error that may have occurred void window_set_height(Window window, Integer height, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -162,7 +160,6 @@ Integer window_get_width(Window window, Error *err) /// @param width the new width in columns /// @param[out] err Details of an error that may have occurred void window_set_width(Window window, Integer width, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -208,7 +205,6 @@ Object window_get_var(Window window, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object window_set_var(Window window, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -244,7 +240,6 @@ Object window_get_option(Window window, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void window_set_option(Window window, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); diff --git a/src/nvim/func_attr.h b/src/nvim/func_attr.h index 519f61c763..c31d21ec6d 100644 --- a/src/nvim/func_attr.h +++ b/src/nvim/func_attr.h @@ -179,7 +179,7 @@ #endif #ifdef DEFINE_FUNC_ATTRIBUTES - #define FUNC_ATTR_DEFERRED + #define FUNC_ATTR_ASYNC #define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC #define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x) #define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y) diff --git a/src/nvim/map.c b/src/nvim/map.c index 5d83020619..ed7bda4cce 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -116,5 +116,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER) MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER) -#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false} +#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .async = false} MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER) diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index eee662dd9c..ab81e3194c 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -450,16 +450,16 @@ static void handle_request(Channel *channel, msgpack_object *request) method->via.bin.size); } else { handler.fn = msgpack_rpc_handle_missing_method; - handler.defer = false; + handler.async = true; } Array args = ARRAY_DICT_INIT; if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { handler.fn = msgpack_rpc_handle_invalid_arguments; - handler.defer = false; + handler.async = true; } - bool defer = (!kv_size(channel->call_stack) && handler.defer); + bool async = kv_size(channel->call_stack) || handler.async; RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; @@ -469,7 +469,7 @@ static void handle_request(Channel *channel, msgpack_object *request) loop_push_event(&loop, (Event) { .handler = on_request_event, .data = event_data - }, defer); + }, !async); } static void on_request_event(Event event) diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h index 0492a65290..d97cf28ca1 100644 --- a/src/nvim/msgpack_rpc/defs.h +++ b/src/nvim/msgpack_rpc/defs.h @@ -11,9 +11,8 @@ typedef struct { uint64_t request_id, Array args, Error *error); - bool defer; // Should the call be deferred to the main loop? This should - // be true if the function mutates editor data structures such - // as buffers, windows, tabs, or if it executes vimscript code. + bool async; // function is always safe to run immediately instead of being + // put in a request queue for handling when nvim waits for input. } MsgpackRpcRequestHandler; /// Initializes the msgpack-rpc method table diff --git a/src/nvim/msgpack_rpc/remote_ui.c b/src/nvim/msgpack_rpc/remote_ui.c index e582bf9550..3334b0e6af 100644 --- a/src/nvim/msgpack_rpc/remote_ui.c +++ b/src/nvim/msgpack_rpc/remote_ui.c @@ -28,7 +28,7 @@ void remote_ui_init(void) connected_uis = pmap_new(uint64_t)(); // Add handler for "attach_ui" String method = cstr_as_string("ui_attach"); - MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .defer = false}; + MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .async = true}; msgpack_rpc_add_method_handler(method, handler); method = cstr_as_string("ui_detach"); handler.fn = remote_ui_detach; -- cgit From 3f5af6c1c4815c5fb2a492292212b244abe23759 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 10:38:35 -0300 Subject: loop: Simplify loop.c and move some code to input.c - Declare poll timer in Loop structure instead of a loop_poll_events local variable. - Move deferred event management to input.c --- src/nvim/edit.c | 4 ++-- src/nvim/event/loop.c | 58 +++++++-------------------------------------------- src/nvim/event/loop.h | 3 +-- src/nvim/ex_getln.c | 5 +++-- src/nvim/normal.c | 5 +++-- src/nvim/os/input.c | 22 ++++++++++++++++--- src/nvim/terminal.c | 5 +++-- 7 files changed, 39 insertions(+), 63 deletions(-) (limited to 'src') diff --git a/src/nvim/edit.c b/src/nvim/edit.c index 390d62210b..3982e32c2f 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -601,11 +601,11 @@ edit ( * Get a character for Insert mode. Ignore K_IGNORE. */ lastc = c; /* remember previous char for CTRL-D */ - loop_enable_deferred_events(&loop); + input_enable_events(); do { c = safe_vgetc(); } while (c == K_IGNORE); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { c = lastc; diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index d90565002e..67572c4f30 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -20,6 +20,7 @@ void loop_init(Loop *loop, void *data) loop->children_stop_requests = 0; uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); + uv_timer_init(&loop->uv, &loop->poll_timer); } void loop_poll_events(Loop *loop, int ms) @@ -30,52 +31,28 @@ void loop_poll_events(Loop *loop, int ms) abort(); // Should not re-enter uv_run } - bool wait = true; - uv_timer_t timer; + uv_run_mode mode = UV_RUN_ONCE; if (ms > 0) { - uv_timer_init(&loop->uv, &timer); // Use a repeating timeout of ms milliseconds to make sure // we do not block indefinitely for I/O. - uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); + uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms); } else if (ms == 0) { // For ms == 0, we need to do a non-blocking event poll by // setting the run mode to UV_RUN_NOWAIT. - wait = false; + mode = UV_RUN_NOWAIT; } - if (wait) { - loop_run_once(loop); - } else { - loop_run_nowait(loop); - } + uv_run(&loop->uv, mode); if (ms > 0) { - // Ensure the timer handle is closed and run the event loop - // once more to let libuv perform it's cleanup - uv_timer_stop(&timer); - uv_close((uv_handle_t *)&timer, NULL); - loop_run_nowait(loop); + uv_timer_stop(&loop->poll_timer); } recursive--; // Can re-enter uv_run now process_events_from(loop->immediate_events); } -bool loop_has_deferred_events(Loop *loop) -{ - return loop->deferred_events_allowed && !kl_empty(loop->deferred_events); -} - -void loop_enable_deferred_events(Loop *loop) -{ - ++loop->deferred_events_allowed; -} - -void loop_disable_deferred_events(Loop *loop) -{ - --loop->deferred_events_allowed; -} // Queue an event void loop_push_event(Loop *loop, Event event, bool deferred) @@ -85,7 +62,7 @@ void loop_push_event(Loop *loop, Event event, bool deferred) // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. - loop_stop(loop); + uv_stop(&loop->uv); kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, event); } @@ -96,30 +73,11 @@ void loop_process_event(Loop *loop) } -void loop_run(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_DEFAULT); -} - -void loop_run_once(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_ONCE); -} - -void loop_run_nowait(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_NOWAIT); -} - -void loop_stop(Loop *loop) -{ - uv_stop(&loop->uv); -} - void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); + uv_close((uv_handle_t *)&loop->poll_timer, NULL); do { uv_run(&loop->uv, UV_RUN_DEFAULT); } while (uv_loop_close(&loop->uv)); diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 5eb4d32ca8..340e096fec 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -25,10 +25,9 @@ KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; klist_t(Event) *deferred_events, *immediate_events; - int deferred_events_allowed; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; - uv_timer_t children_kill_timer; + uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index 785db1dbd1..f32aee809a 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -62,6 +62,7 @@ #include "nvim/tag.h" #include "nvim/window.h" #include "nvim/ui.h" +#include "nvim/os/input.h" #include "nvim/os/os.h" #include "nvim/event/loop.h" @@ -298,11 +299,11 @@ getcmdline ( /* Get a character. Ignore K_IGNORE, it should not do anything, such * as stop completion. */ - loop_enable_deferred_events(&loop); + input_enable_events(); do { c = safe_vgetc(); } while (c == K_IGNORE); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { loop_process_event(&loop); diff --git a/src/nvim/normal.c b/src/nvim/normal.c index 95e1c3d113..dd9a4230b5 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -63,6 +63,7 @@ #include "nvim/window.h" #include "nvim/event/loop.h" #include "nvim/os/time.h" +#include "nvim/os/input.h" /* * The Visual area is remembered for reselection. @@ -487,9 +488,9 @@ normal_cmd ( /* * Get the command character from the user. */ - loop_enable_deferred_events(&loop); + input_enable_events(); c = safe_vgetc(); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { loop_process_event(&loop); diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index b0e0f57e60..b0263f18d0 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -33,6 +33,7 @@ static Stream read_stream = {.closed = true}; static RBuffer *input_buffer = NULL; static bool input_eof = false; static int global_fd = 0; +static int events_enabled = 0; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/input.c.generated.h" @@ -110,8 +111,8 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } - // If there are deferred events, return the keys directly - if (loop_has_deferred_events(&loop)) { + // If there are events, return the keys directly + if (pending_events()) { return push_event_key(buf, maxlen); } @@ -136,6 +137,16 @@ void os_breakcheck(void) } } +void input_enable_events(void) +{ + events_enabled++; +} + +void input_disable_events(void) +{ + events_enabled--; +} + /// Test whether a file descriptor refers to a terminal. /// /// @param fd File descriptor. @@ -358,7 +369,7 @@ static bool input_ready(void) { return typebuf_was_filled || // API call filled typeahead rbuffer_size(input_buffer) || // Input buffer filled - loop_has_deferred_events(&loop); // Events must be processed + pending_events(); // Events must be processed } // Exit because of an input read error. @@ -369,3 +380,8 @@ static void read_error_exit(void) STRCPY(IObuff, _("Vim: Error reading input, exiting...\n")); preserve_exit(); } + +static bool pending_events(void) +{ + return events_enabled && !kl_empty(loop.deferred_events); +} diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index 47fef692db..0285ce72d4 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -69,6 +69,7 @@ #include "nvim/fileio.h" #include "nvim/event/loop.h" #include "nvim/event/time.h" +#include "nvim/os/input.h" #include "nvim/api/private/helpers.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -354,13 +355,13 @@ void terminal_enter(bool process_deferred) while (term->buf == curbuf) { if (process_deferred) { - loop_enable_deferred_events(&loop); + input_enable_events(); } c = safe_vgetc(); if (process_deferred) { - loop_disable_deferred_events(&loop); + input_disable_events(); } switch (c) { -- cgit From 696f9c2759b078f749625d167f3424915586108d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 11:56:44 -0300 Subject: process: Pass loop reference during initialization Change the API so that it is passed to {uv,pty}_process_init instead of `process_spawn`. --- src/nvim/eval.c | 6 +++--- src/nvim/event/process.c | 11 +++++------ src/nvim/event/process.h | 4 ++-- src/nvim/event/pty_process.h | 4 ++-- src/nvim/event/uv_process.h | 4 ++-- src/nvim/msgpack_rpc/channel.c | 4 ++-- src/nvim/os/shell.c | 4 ++-- 7 files changed, 18 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 2f7b296103..e2f095dbad 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -21054,9 +21054,9 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, data->on_exit = on_exit; data->self = self; if (pty) { - data->proc.pty = pty_process_init(data); + data->proc.pty = pty_process_init(&loop, data); } else { - data->proc.uv = uv_process_init(data); + data->proc.uv = uv_process_init(&loop, data); } Process *proc = (Process *)&data->proc; proc->argv = argv; @@ -21094,7 +21094,7 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) { data->refcount++; Process *proc = (Process *)&data->proc; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { EMSG(_(e_jobexe)); if (proc->type == kProcessTypePty) { xfree(data->proc.pty.term_name); diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 2b1f1ae096..c7360b8614 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -30,19 +30,18 @@ } while (0) -bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL { - proc->loop = loop; if (proc->in) { - uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); } if (proc->out) { - uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); } if (proc->err) { - uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); } bool success; @@ -99,7 +98,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL proc->internal_exit_cb = on_process_exit; proc->internal_close_cb = decref; proc->refcount++; - kl_push(WatcherPtr, loop->children, proc); + kl_push(WatcherPtr, proc->loop->children, proc); return true; } diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c84a7d1d0..7ef2b24b7f 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -28,12 +28,12 @@ struct process { bool closed, term_sent; }; -static inline Process process_init(ProcessType type, void *data) +static inline Process process_init(Loop *loop, ProcessType type, void *data) { return (Process) { .type = type, .data = data, - .loop = NULL, + .loop = loop, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h index a12b5489c5..446d7fd3c8 100644 --- a/src/nvim/event/pty_process.h +++ b/src/nvim/event/pty_process.h @@ -13,10 +13,10 @@ typedef struct pty_process { int tty_fd; } PtyProcess; -static inline PtyProcess pty_process_init(void *data) +static inline PtyProcess pty_process_init(Loop *loop, void *data) { PtyProcess rv; - rv.process = process_init(kProcessTypePty, data); + rv.process = process_init(loop, kProcessTypePty, data); rv.term_name = NULL; rv.width = 80; rv.height = 24; diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h index a17f1446b3..5ee73044b5 100644 --- a/src/nvim/event/uv_process.h +++ b/src/nvim/event/uv_process.h @@ -12,10 +12,10 @@ typedef struct uv_process { uv_stdio_container_t uvstdio[3]; } UvProcess; -static inline UvProcess uv_process_init(void *data) +static inline UvProcess uv_process_init(Loop *loop, void *data) { UvProcess rv; - rv.process = process_init(kProcessTypeUv, data); + rv.process = process_init(loop, kProcessTypeUv, data); return rv; } diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ab81e3194c..ca08af1fe8 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -123,14 +123,14 @@ void channel_teardown(void) uint64_t channel_from_process(char **argv) { Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = uv_process_init(channel); + channel->data.process.uvproc = uv_process_init(&loop, channel); Process *proc = &channel->data.process.uvproc.process; proc->argv = argv; proc->in = &channel->data.process.in; proc->out = &channel->data.process.out; proc->err = &channel->data.process.err; proc->cb = process_exit; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); decref(channel); return 0; diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index b9c5db4261..958b4483e8 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -205,13 +205,13 @@ static int do_os_system(char **argv, xstrlcpy(prog, argv[0], MAXPATHL); Stream in, out, err; - UvProcess uvproc = uv_process_init(&buf); + UvProcess uvproc = uv_process_init(&loop, &buf); Process *proc = &uvproc.process; proc->argv = argv; proc->in = input != NULL ? &in : NULL; proc->out = &out; proc->err = &err; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); // Failed, probably due to `sh` not being executable if (!silent) { -- cgit From bef0c03b250b1cb671ad8f130228e6b4a7ae74d3 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 12:14:31 -0300 Subject: terminal: Ensure terminal buffers are flushed on exit When a terminal closed, make sure it is refreshed before the Terminal structure is freed. Also extract `refresh_terminal` from `on_refresh`. --- src/nvim/terminal.c | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index 0285ce72d4..cf68143ac7 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -428,7 +428,13 @@ void terminal_destroy(Terminal *term) term->buf->terminal = NULL; } term->buf = NULL; - pmap_del(ptr_t)(invalidated_terminals, term); + if (pmap_has(ptr_t)(invalidated_terminals, term)) { + // flush any pending changes to the buffer + block_autocmds(); + refresh_terminal(term); + unblock_autocmds(); + pmap_del(ptr_t)(invalidated_terminals, term); + } for (size_t i = 0 ; i < term->sb_current; i++) { xfree(term->sb_buffer[i]); } @@ -884,6 +890,26 @@ static void invalidate_terminal(Terminal *term, int start_row, int end_row) } } +static void refresh_terminal(Terminal *term) +{ + // TODO(SplinterOfChaos): Find the condition that makes term->buf invalid. + bool valid = true; + if (!term->buf || !(valid = buf_valid(term->buf))) { + // destroyed by `close_buffer`. Dont do anything else + if (!valid) { + term->buf = NULL; + } + return; + } + bool pending_resize = term->pending_resize; + WITH_BUFFER(term->buf, { + refresh_size(term); + refresh_scrollback(term); + refresh_screen(term); + redraw_buf_later(term->buf, NOT_VALID); + }); + adjust_topline(term, pending_resize); +} // libuv timer callback. This will enqueue on_refresh to be processed as an // event. static void refresh_timer_cb(TimeWatcher *watcher, void *data) @@ -905,23 +931,7 @@ static void on_refresh(Event event) // don't process autocommands while updating terminal buffers block_autocmds(); map_foreach(invalidated_terminals, term, stub, { - // TODO(SplinterOfChaos): Find the condition that makes term->buf invalid. - bool valid = true; - if (!term->buf || !(valid = buf_valid(term->buf))) { - // destroyed by `close_buffer`. Dont do anything else - if (!valid) { - term->buf = NULL; - } - continue; - } - bool pending_resize = term->pending_resize; - WITH_BUFFER(term->buf, { - refresh_size(term); - refresh_scrollback(term); - refresh_screen(term); - redraw_buf_later(term->buf, NOT_VALID); - }); - adjust_topline(term, pending_resize); + refresh_terminal(term); }); pmap_clear(ptr_t)(invalidated_terminals); unblock_autocmds(); -- cgit From 1a7a020b68ed50d52dafa4ccc72a12686942e6b7 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 12:24:16 -0300 Subject: lib: Include libuv circularly linked list This is simpler and more efficient than klist.h for implementing queues that support insertion or removal at arbitrary positions. --- src/nvim/lib/queue.h | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 src/nvim/lib/queue.h (limited to 'src') diff --git a/src/nvim/lib/queue.h b/src/nvim/lib/queue.h new file mode 100644 index 0000000000..fe02b454ea --- /dev/null +++ b/src/nvim/lib/queue.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2013, Ben Noordhuis + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef QUEUE_H_ +#define QUEUE_H_ + +typedef void *QUEUE[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) +#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Public macros. */ +#define QUEUE_DATA(ptr, type, field) \ + ((type *) ((char *) (ptr) - ((char *) &((type *) 0)->field))) + +#define QUEUE_FOREACH(q, h) \ + for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) + +#define QUEUE_EMPTY(q) \ + ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) + +#define QUEUE_HEAD(q) \ + (QUEUE_NEXT(q)) + +#define QUEUE_INIT(q) \ + do { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } \ + while (0) + +#define QUEUE_ADD(h, n) \ + do { \ + QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ + QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV(h) = QUEUE_PREV(n); \ + QUEUE_PREV_NEXT(h) = (h); \ + } \ + while (0) + +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } \ + while (0) + +#define QUEUE_INSERT_HEAD(h, q) \ + do { \ + QUEUE_NEXT(q) = QUEUE_NEXT(h); \ + QUEUE_PREV(q) = (h); \ + QUEUE_NEXT_PREV(q) = (q); \ + QUEUE_NEXT(h) = (q); \ + } \ + while (0) + +#define QUEUE_INSERT_TAIL(h, q) \ + do { \ + QUEUE_NEXT(q) = (h); \ + QUEUE_PREV(q) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(q) = (q); \ + QUEUE_PREV(h) = (q); \ + } \ + while (0) + +#define QUEUE_REMOVE(q) \ + do { \ + QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ + QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ + } \ + while (0) + +#endif /* QUEUE_H_ */ -- cgit From a6e0d35d2da3ee4270ddb712410ea0c8c55b0f0f Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 24 Jul 2015 12:24:25 -0300 Subject: queue: Implement a more flexible event queue --- src/nvim/CMakeLists.txt | 2 +- src/nvim/event/defs.h | 39 ++++++++++ src/nvim/event/queue.c | 202 ++++++++++++++++++++++++++++++++++++++++++++++++ src/nvim/event/queue.h | 19 +++++ 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 src/nvim/event/defs.h create mode 100644 src/nvim/event/queue.c create mode 100644 src/nvim/event/queue.h (limited to 'src') diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index d49b8dc416..747b63b1ba 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -217,7 +217,7 @@ install_helper(TARGETS nvim) if(CLANG_ASAN_UBSAN) message(STATUS "Enabling Clang address sanitizer and undefined behavior sanitizer for nvim.") set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-DEXITFREE ") - set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined ") + set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/.asan-blacklist") set_property(TARGET nvim APPEND_STRING PROPERTY LINK_FLAGS "-fsanitize=address -fsanitize=undefined ") elseif(CLANG_MSAN) message(STATUS "Enabling Clang memory sanitizer for nvim.") diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h new file mode 100644 index 0000000000..5126d52241 --- /dev/null +++ b/src/nvim/event/defs.h @@ -0,0 +1,39 @@ +#ifndef NVIM_EVENT_DEFS_H +#define NVIM_EVENT_DEFS_H + +#include +#include + +#define EVENT_HANDLER_MAX_ARGC 4 + +typedef void (*argv_callback)(void **argv); +typedef struct message { + int priority; + argv_callback handler; + void *argv[EVENT_HANDLER_MAX_ARGC]; +} Event; + +#define VA_EVENT_INIT(event, p, h, a) \ + do { \ + assert(a <= EVENT_HANDLER_MAX_ARGC); \ + (event)->priority = p; \ + (event)->handler = h; \ + if (a) { \ + va_list args; \ + va_start(args, a); \ + for (int i = 0; i < a; i++) { \ + (event)->argv[i] = va_arg(args, void *); \ + } \ + va_end(args); \ + } \ + } while (0) + +static inline Event event_create(int priority, argv_callback cb, int argc, ...) +{ + assert(argc <= EVENT_HANDLER_MAX_ARGC); + Event event; + VA_EVENT_INIT(&event, priority, cb, argc); + return event; +} + +#endif // NVIM_EVENT_DEFS_H diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c new file mode 100644 index 0000000000..3f03dd444e --- /dev/null +++ b/src/nvim/event/queue.c @@ -0,0 +1,202 @@ +// Queue for selective async event processing. Instances of this queue support a +// parent/child relationship with the following properties: +// +// - pushing a node to a child queue will push a corresponding link node to the +// parent queue +// - removing a link node from a parent queue will remove the next node +// in the linked child queue +// - removing a node from a child queue will remove the corresponding link node +// in the parent queue +// +// These properties allow neovim to organize and process events from different +// sources with a certain degree of control. Here's how the queue is used: +// +// +----------------+ +// | Main loop | +// +----------------+ +// ^ +// | +// +----------------+ +// +-------------->| Event loop |<------------+ +// | +--+-------------+ | +// | ^ ^ | +// | | | | +// +-----------+ +-----------+ +---------+ +---------+ +// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | +// +-----------+ +-----------+ +---------+ +---------+ +// +// +// In the above diagram, the lower boxes represents event emitters, each with +// it's own private queue that have the event loop queue as the parent. +// +// When idle, the main loop spins the event loop which queues events from many +// sources(channels, jobs, user...). Each event emitter pushes events to its own +// private queue which is propagated to the event loop queue. When the main loop +// consumes an event, the corresponding event is removed from the emitter's +// queue. +// +// The main reason for this queue hierarchy is to allow focusing on a single +// event emitter while blocking the main loop. For example, if the `jobwait` +// vimscript function is called on job1, the main loop will temporarily stop +// polling the event loop queue and poll job1 queue instead. Same with channels, +// when calling `rpcrequest`, we want to temporarily stop processing events from +// other sources and focus on a specific channel. + +#include +#include +#include +#include + + +#include + +#include "nvim/event/queue.h" +#include "nvim/memory.h" +#include "nvim/os/time.h" + +typedef struct queue_item QueueItem; +struct queue_item { + union { + Queue *queue; + struct { + Event event; + QueueItem *parent; + } item; + } data; + bool link; // this is just a link to a node in a child queue + QUEUE node; +}; + +struct queue { + Queue *parent; + QUEUE headtail; + put_callback put_cb; + void *data; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.c.generated.h" +#endif + +static Event NILEVENT = {.handler = NULL, .argv = {NULL}}; + +Queue *queue_new_parent(put_callback put_cb, void *data) +{ + return queue_new(NULL, put_cb, data); +} + +Queue *queue_new_child(Queue *parent) + FUNC_ATTR_NONNULL_ALL +{ + assert(!parent->parent); + return queue_new(parent, NULL, NULL); +} + +static Queue *queue_new(Queue *parent, put_callback put_cb, void *data) +{ + Queue *rv = xmalloc(sizeof(Queue)); + QUEUE_INIT(&rv->headtail); + rv->parent = parent; + rv->put_cb = put_cb; + rv->data = data; + return rv; +} + +void queue_free(Queue *queue) +{ + assert(queue); + if (queue->parent) { + while (!QUEUE_EMPTY(&queue->headtail)) { + QUEUE *q = QUEUE_HEAD(&queue->headtail); + QueueItem *item = queue_node_data(q); + assert(!item->link); + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + QUEUE_REMOVE(q); + xfree(item); + } + } + + xfree(queue); +} + +Event queue_get(Queue *queue) +{ + return queue_empty(queue) ? NILEVENT : queue_remove(queue); +} + +void queue_put_event(Queue *queue, Event event) +{ + assert(queue); + assert(queue->parent); // don't push directly to the parent queue + queue_push(queue, event); + if (queue->parent->put_cb) { + queue->parent->put_cb(queue->parent, queue->parent->data); + } +} + +void queue_process_events(Queue *queue) +{ + assert(queue); + while (!queue_empty(queue)) { + Event event = queue_get(queue); + if (event.handler) { + event.handler(event.argv); + } + } +} + +bool queue_empty(Queue *queue) +{ + assert(queue); + return QUEUE_EMPTY(&queue->headtail); +} + +static Event queue_remove(Queue *queue) +{ + assert(!queue_empty(queue)); + QUEUE *h = QUEUE_HEAD(&queue->headtail); + QUEUE_REMOVE(h); + QueueItem *item = queue_node_data(h); + Event rv; + + if (item->link) { + assert(!queue->parent); + // remove the next node in the linked queue + Queue *linked = item->data.queue; + assert(!queue_empty(linked)); + QueueItem *child = + queue_node_data(QUEUE_HEAD(&linked->headtail)); + QUEUE_REMOVE(&child->node); + rv = child->data.item.event; + xfree(child); + } else { + assert(queue->parent); + assert(!queue_empty(queue->parent)); + // remove the corresponding link node in the parent queue + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + rv = item->data.item.event; + } + + xfree(item); + return rv; +} + +static void queue_push(Queue *queue, Event event) +{ + QueueItem *item = xmalloc(sizeof(QueueItem)); + item->link = false; + item->data.item.event = event; + QUEUE_INSERT_TAIL(&queue->headtail, &item->node); + // push link node to the parent queue + item->data.item.parent = xmalloc(sizeof(QueueItem)); + item->data.item.parent->link = true; + item->data.item.parent->data.queue = queue; + QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node); +} + +static QueueItem *queue_node_data(QUEUE *q) +{ + return QUEUE_DATA(q, QueueItem, node); +} diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h new file mode 100644 index 0000000000..85fc59f8b2 --- /dev/null +++ b/src/nvim/event/queue.h @@ -0,0 +1,19 @@ +#ifndef NVIM_EVENT_QUEUE_H +#define NVIM_EVENT_QUEUE_H + +#include + +#include "nvim/event/defs.h" +#include "nvim/lib/queue.h" + +typedef struct queue Queue; +typedef void (*put_callback)(Queue *queue, void *data); + +#define queue_put(q, h, ...) \ + queue_put_event(q, event_create(1, h, __VA_ARGS__)); + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.h.generated.h" +#endif +#endif // NVIM_EVENT_QUEUE_H -- cgit From 502aee690c980fcb3cfcb3f211dcfad06103db46 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Fri, 7 Aug 2015 22:54:02 -0300 Subject: event: Refactor async event processing - Improve the implementation of deferred/immediate events. - Use the new queue module to change how/when events are queued/processed by giving a private queue to each emitter. - Immediate events(which only exist to break uv_run recursion) are now represented in the `loop->fast_events` queue. - Events pushed to child queues are propagated to the event loop main queue and processed as K_EVENT keys. --- src/nvim/edit.c | 4 +-- src/nvim/eval.c | 78 +++++++++++++++++++----------------------- src/nvim/event/loop.c | 39 ++++++--------------- src/nvim/event/loop.h | 36 ++++++++++++------- src/nvim/event/process.c | 75 ++++++++++++++++++++++++---------------- src/nvim/event/process.h | 2 ++ src/nvim/event/rstream.c | 9 ++++- src/nvim/event/signal.c | 9 ++++- src/nvim/event/signal.h | 1 + src/nvim/event/socket.c | 11 +++++- src/nvim/event/socket.h | 1 + src/nvim/event/stream.c | 10 +++--- src/nvim/event/stream.h | 1 + src/nvim/event/time.c | 9 ++++- src/nvim/event/time.h | 1 + src/nvim/ex_getln.c | 2 +- src/nvim/globals.h | 8 ----- src/nvim/main.c | 4 +-- src/nvim/msgpack_rpc/channel.c | 28 ++++++++------- src/nvim/normal.c | 2 +- src/nvim/os/input.c | 6 ++-- src/nvim/os/shell.c | 13 +++++++ src/nvim/os/signal.c | 11 ------ src/nvim/os/time.c | 2 +- src/nvim/terminal.c | 25 ++++---------- src/nvim/tui/term_input.inl | 9 +++-- src/nvim/tui/tui.c | 15 ++------ src/nvim/ui.c | 14 ++------ 28 files changed, 215 insertions(+), 210 deletions(-) (limited to 'src') diff --git a/src/nvim/edit.c b/src/nvim/edit.c index 3982e32c2f..6bcf5e804a 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -254,7 +254,7 @@ edit ( ) { if (curbuf->terminal) { - terminal_enter(true); + terminal_enter(); return false; } @@ -609,7 +609,7 @@ edit ( if (c == K_EVENT) { c = lastc; - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/eval.c b/src/nvim/eval.c index e2f095dbad..d61cccd41c 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -468,6 +468,7 @@ typedef struct { dict_T *self; int *status_ptr; uint64_t id; + Queue *events; } TerminalJobData; /// Structure representing current VimL to messagepack conversion state @@ -493,6 +494,13 @@ typedef struct { /// Stack used to convert VimL values to messagepack. typedef kvec_t(MPConvStackVal) MPConvStack; +typedef struct { + TerminalJobData *data; + ufunc_T *callback; + const char *type; + list_T *received; + int status; +} JobEvent; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "eval.c.generated.h" @@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack; #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -// Memory pool for reusing JobEvent structures -typedef struct { - TerminalJobData *data; - ufunc_T *callback; - const char *type; - list_T *received; - int status; -} JobEvent; -static int disable_job_defer = 0; static uint64_t current_job_id = 1; static PMap(uint64_t) *jobs = NULL; @@ -11038,15 +11037,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); - // disable breakchecks, which could result in job callbacks being executed - // at unexpected places - disable_breakcheck++; - // disable job event deferring so the callbacks are processed while waiting. - if (!disable_job_defer++) { - // process any pending job events in the deferred queue, but only do this if - // deferred is not disabled(at the top-level `jobwait()` call) - loop_process_event(&loop); - } // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. @@ -11113,8 +11103,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // job exits data->status_ptr = NULL; } - disable_job_defer--; - disable_breakcheck--; ui_busy_stop(); rv->lv_refcount++; @@ -21053,6 +21041,7 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; + data->events = queue_new_child(loop.events); if (pty) { data->proc.pty = pty_process_init(&loop, data); } else { @@ -21064,6 +21053,7 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, proc->out = &data->out; proc->err = &data->err; proc->cb = on_process_exit; + proc->events = data->events; return data; } @@ -21118,7 +21108,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) return true; } -static inline void free_term_job_data(TerminalJobData *data) { +static inline void free_term_job_data_event(void **argv) +{ + TerminalJobData *data = argv[0]; if (data->on_stdout) { user_func_unref(data->on_stdout); } @@ -21133,17 +21125,25 @@ static inline void free_term_job_data(TerminalJobData *data) { data->self->internal_refcount--; dict_unref(data->self); } + queue_free(data->events); xfree(data); } +static inline void free_term_job_data(TerminalJobData *data) +{ + // data->queue may still be used after this function returns(process_wait), so + // only free in the next event loop iteration + queue_put(loop.fast_events, free_term_job_data_event, 1, data); +} + // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +static inline void process_job_event(TerminalJobData *data, ufunc_T *callback, const char *type, char *buf, size_t count, int status) { - JobEvent *event_data = xmalloc(sizeof(JobEvent)); - event_data->received = NULL; + JobEvent event_data; + event_data.received = NULL; if (buf) { - event_data->received = list_alloc(); + event_data.received = list_alloc(); char *ptr = buf; size_t remaining = count; size_t off = 0; @@ -21151,7 +21151,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, while (off < remaining) { // append the line if (ptr[off] == NL) { - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); size_t skip = off + 1; ptr += skip; remaining -= skip; @@ -21164,17 +21164,14 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, } off++; } - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); } else { - event_data->status = status; + event_data.status = status; } - event_data->data = data; - event_data->callback = callback; - event_data->type = type; - loop_push_event(&loop, (Event) { - .handler = on_job_event, - .data = event_data - }, !disable_job_defer); + event_data.data = data; + event_data.callback = callback; + event_data.type = type; + on_job_event(&event_data); } static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) @@ -21198,13 +21195,13 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, RBUFFER_UNTIL_EMPTY(buf, ptr, len) { // The order here matters, the terminal must receive the data first because - // push_job_event will modify the read buffer(convert NULs into NLs) + // process_job_event will modify the read buffer(convert NULs into NLs) if (data->term) { terminal_receive(data->term, ptr, len); } if (callback) { - push_job_event(data, callback, type, ptr, len, 0); + process_job_event(data, callback, type, ptr, len, 0); } rbuffer_consumed(buf, len); @@ -21224,7 +21221,7 @@ static void on_process_exit(Process *proc, int status, void *d) *data->status_ptr = status; } - push_job_event(data, data->on_exit, "exit", NULL, 0, status); + process_job_event(data, data->on_exit, "exit", NULL, 0, status); } static void term_write(char *buf, size_t size, void *d) @@ -21258,10 +21255,8 @@ static void term_job_data_decref(TerminalJobData *data) } } -static void on_job_event(Event event) +static void on_job_event(JobEvent *ev) { - JobEvent *ev = event.data; - if (!ev->callback) { goto end; } @@ -21306,7 +21301,6 @@ end: pmap_del(uint64_t)(jobs, ev->data->id); term_job_data_decref(ev->data); } - xfree(ev); } static TerminalJobData *find_job(uint64_t id) diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 67572c4f30..1a50ec0d9a 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -1,3 +1,4 @@ +#include #include #include @@ -9,15 +10,20 @@ # include "event/loop.c.generated.h" #endif +typedef struct idle_event { + uv_idle_t idle; + Event event; +} IdleEvent; + void loop_init(Loop *loop, void *data) { uv_loop_init(&loop->uv); loop->uv.data = loop; - loop->deferred_events = kl_init(Event); - loop->immediate_events = kl_init(Event); loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; + loop->events = queue_new_parent(on_put, loop); + loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); uv_timer_init(&loop->uv, &loop->poll_timer); @@ -50,29 +56,20 @@ void loop_poll_events(Loop *loop, int ms) } recursive--; // Can re-enter uv_run now - process_events_from(loop->immediate_events); + queue_process_events(loop->fast_events); } - -// Queue an event -void loop_push_event(Loop *loop, Event event, bool deferred) +static void on_put(Queue *queue, void *data) { + Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before // blocking for a poll. If this happens and the callback pushes a event to one // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. uv_stop(&loop->uv); - kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, - event); } -void loop_process_event(Loop *loop) -{ - process_events_from(loop->deferred_events); -} - - void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); @@ -83,20 +80,6 @@ void loop_close(Loop *loop) } while (uv_loop_close(&loop->uv)); } -void loop_process_all_events(Loop *loop) -{ - process_events_from(loop->immediate_events); - process_events_from(loop->deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ - while (!kl_empty(queue)) { - Event event = kl_shift(Event, queue); - event.handler(event); - } -} - static void timer_cb(uv_timer_t *handle) { } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 340e096fec..9212a45aa4 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,37 +7,39 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { - void *data; - event_handler handler; -}; +#include "nvim/event/queue.h" typedef void * WatcherPtr; #define _noop(x) KLIST_INIT(WatcherPtr, WatcherPtr, _noop) -KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; - klist_t(Event) *deferred_events, *immediate_events; + Queue *events, *fast_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; +#define CREATE_EVENT(queue, handler, argc, ...) \ + do { \ + if (queue) { \ + queue_put((queue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = {__VA_ARGS__}; \ + (handler)(argv); \ + } \ + } while (0) + // Poll for events until a condition or timeout -#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ do { \ int remaining = timeout; \ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ while (!(condition)) { \ - loop_poll_events(loop, remaining); \ + LOOP_PROCESS_EVENTS(loop, queue, remaining); \ if (remaining == 0) { \ break; \ } else if (remaining > 0) { \ @@ -51,6 +53,16 @@ typedef struct loop { } \ } while (0) +#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ + do { \ + if (queue && !queue_empty(queue)) { \ + queue_process_events(queue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ + } while (0) + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.h.generated.h" #endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index c7360b8614..54dbc11a03 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -22,7 +22,7 @@ #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ +#define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ stream_close(proc->stream, NULL); \ @@ -76,6 +76,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->in) { stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -83,6 +84,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->out) { stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -90,6 +92,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->err) { stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -112,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL } // Wait until all children exit - LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); pty_process_teardown(loop); } @@ -154,11 +157,15 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; + if (!proc->refcount) { + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return proc->status; + } // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -170,12 +177,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL got_int = false; process_stop(proc); if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - + // We can only return if all streams/handles are closed and the job // exited. - LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1, + proc->refcount == 1); } else { - loop_poll_events(proc->loop, 0); + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); } } @@ -184,6 +191,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); + if (proc->events) { + // the decref call created an exit event, process it now + queue_process_events(proc->events); + } } else { proc->refcount--; } @@ -249,6 +260,18 @@ static void children_kill_cb(uv_timer_t *handle) } } +static void process_close_event(void **argv) +{ + Process *proc = argv[0]; + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + static void decref(Process *proc) { if (--proc->refcount != 0) { @@ -263,16 +286,9 @@ static void decref(Process *proc) break; } } - assert(node); kl_shift_at(WatcherPtr, loop->children, node); - shell_free_argv(proc->argv); - if (proc->type == kProcessTypePty) { - xfree(((PtyProcess *)proc)->term_name); - } - if (proc->cb) { - proc->cb(proc, proc->status, proc->data); - } + CREATE_EVENT(proc->events, process_close_event, 1, proc); } static void process_close(Process *proc) @@ -292,28 +308,27 @@ static void process_close(Process *proc) } } -static void on_process_exit(Process *proc) +static void process_close_handles(void **argv) { - if (exiting) { - on_process_exit_event((Event) {.data = proc}); - } else { - loop_push_event(proc->loop, - (Event) {.handler = on_process_exit_event, .data = proc}, false); - } + Process *proc = argv[0]; + process_close_streams(proc); + process_close(proc); +} +static void on_process_exit(Process *proc) +{ Loop *loop = proc->loop; - if (loop->children_stop_requests && !--loop->children_stop_requests) { + if (proc->stopped_time && loop->children_stop_requests + && !--loop->children_stop_requests) { // Stop the timer if no more stop requests are pending DLOG("Stopping process kill timer"); uv_timer_stop(&loop->children_kill_timer); } -} - -static void on_process_exit_event(Event event) -{ - Process *proc = event.data; - process_close_streams(proc); - process_close(proc); + // Process handles are closed in the next event loop tick. This is done to + // give libuv more time to read data from the OS after the process exits(If + // process_close_streams is called with data still in the OS buffer, we lose + // it) + CREATE_EVENT(proc->events, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 7ef2b24b7f..45edc46b95 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,6 +26,7 @@ struct process { process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent; + Queue *events; }; static inline Process process_init(Loop *loop, ProcessType type, void *data) @@ -34,6 +35,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) .type = type, .data = data, .loop = loop, + .events = NULL, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 7283cca02b..94853f616a 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -169,9 +169,16 @@ static void fread_idle_cb(uv_idle_t *handle) invoke_read_cb(stream, false); } -static void invoke_read_cb(Stream *stream, bool eof) +static void read_event(void **argv) { + Stream *stream = argv[0]; if (stream->read_cb) { + bool eof = (uintptr_t)argv[1]; stream->read_cb(stream, stream->buffer, stream->data, eof); } } + +static void invoke_read_cb(Stream *stream, bool eof) +{ + CREATE_EVENT(stream->events, read_event, 2, stream, (void *)(uintptr_t)eof); +} diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c index 63133b4f57..11ce15a882 100644 --- a/src/nvim/event/signal.c +++ b/src/nvim/event/signal.c @@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->cb = NULL; + watcher->events = loop->fast_events; } void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) @@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void signal_event(void **argv) +{ + SignalWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->uv.signum, watcher->data); +} + static void signal_watcher_cb(uv_signal_t *handle, int signum) { SignalWatcher *watcher = handle->data; - watcher->cb(watcher, signum, watcher->data); + CREATE_EVENT(watcher->events, signal_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index c269fa9d95..e32608acc0 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,6 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index 2a618d290d..347e464d25 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher, watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; + watcher->events = NULL; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) @@ -143,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) uv_close((uv_handle_t *)watcher->stream, close_cb); } +static void connection_event(void **argv) +{ + SocketWatcher *watcher = argv[0]; + int status = (int)(uintptr_t)(argv[1]); + watcher->cb(watcher, status, watcher->data); +} + static void connection_cb(uv_stream_t *handle, int status) { SocketWatcher *watcher = handle->data; - watcher->cb(watcher, status, watcher->data); + CREATE_EVENT(watcher->events, connection_event, 2, watcher, + (void *)(uintptr_t)status); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index 17fd39f33b..ad59fdbe3a 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,6 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 72dabc3ce7..6caad6fdcc 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -56,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, if (stream->uvstream) { stream->uvstream->data = stream; + loop = stream->uvstream->loop->data; } stream->data = data; @@ -70,17 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->internal_close_cb = NULL; stream->closed = false; stream->buffer = NULL; + stream->events = NULL; } void stream_close(Stream *stream, stream_close_cb on_stream_close) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); - - if (stream->buffer) { - rbuffer_free(stream->buffer); - } - stream->closed = true; stream->close_cb = on_stream_close; @@ -102,6 +99,9 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + if (stream->buffer) { + rbuffer_free(stream->buffer); + } if (stream->close_cb) { stream->close_cb(stream, stream->data); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 37410b2036..eaf4b010f5 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -47,6 +47,7 @@ struct stream { size_t pending_reqs; void *data, *internal_data; bool closed; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index ce33cdfc10..7bf333bcea 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) uv_timer_init(&loop->uv, &watcher->uv); watcher->uv.data = watcher; watcher->data = data; + watcher->events = loop->fast_events; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void time_event(void **argv) +{ + TimeWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->data); +} + static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; - watcher->cb(watcher, watcher->data); + CREATE_EVENT(watcher->events, time_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index ee50e53d11..7882b2b627 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,6 +12,7 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index f32aee809a..03116d454f 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -306,7 +306,7 @@ getcmdline ( input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/globals.h b/src/nvim/globals.h index e4dcad9afb..68cb923e42 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -898,14 +898,6 @@ EXTERN FILE *scriptout INIT(= NULL); /* stream to write script to */ /* volatile because it is used in signal handler catch_sigint(). */ EXTERN volatile int got_int INIT(= FALSE); /* set to TRUE when interrupt signal occurred */ -EXTERN int disable_breakcheck INIT(= 0); // > 0 if breakchecks should be - // ignored. FIXME(tarruda): Hacky - // way to run functions that would - // result in *_breakcheck calls - // while events that would normally - // be deferred are being processed - // immediately. Ref: - // neovim/neovim#2371 EXTERN int bangredo INIT(= FALSE); /* set to TRUE with ! command */ EXTERN int searchcmdlen; /* length of previous search cmd */ EXTERN int reg_do_extmatch INIT(= 0); /* Used when compiling regexp: diff --git a/src/nvim/main.c b/src/nvim/main.c index bfaeada6de..e11db16c61 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -157,11 +157,11 @@ void event_init(void) void event_teardown(void) { - if (!loop.deferred_events) { + if (!loop.events) { return; } - loop_process_all_events(&loop); + queue_process_events(loop.events); input_stop(); channel_teardown(); process_teardown(&loop); diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index ca08af1fe8..6674f3c4e4 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -68,6 +68,7 @@ typedef struct { uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; kvec_t(WBuffer *) delayed_notifications; + Queue *events; } Channel; typedef struct { @@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -459,22 +460,22 @@ static void handle_request(Channel *channel, msgpack_object *request) handler.async = true; } - bool async = kv_size(channel->call_stack) || handler.async; RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; event_data->request_id = request_id; incref(channel); - loop_push_event(&loop, (Event) { - .handler = on_request_event, - .data = event_data - }, !async); + if (handler.async) { + on_request_event((void **)&event_data); + } else { + queue_put(channel->events, on_request_event, 1, event_data); + } } -static void on_request_event(Event event) +static void on_request_event(void **argv) { - RequestEvent *e = event.data; + RequestEvent *e = argv[0]; Channel *channel = e->channel; MsgpackRpcRequestHandler handler = e->handler; Array args = e->args; @@ -649,9 +650,8 @@ static void close_channel(Channel *channel) case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.out, NULL); - loop_push_event(&loop, - (Event) { .handler = on_stdio_close, .data = channel }, false); - break; + queue_put(loop.fast_events, exit_event, 1, channel); + return; default: abort(); } @@ -659,9 +659,9 @@ static void close_channel(Channel *channel) decref(channel); } -static void on_stdio_close(Event e) +static void exit_event(void **argv) { - decref(e.data); + decref(argv[0]); if (!exiting) { mch_exit(0); @@ -683,6 +683,7 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); + queue_free(channel->events); xfree(channel); } @@ -694,6 +695,7 @@ static void close_cb(Stream *stream, void *data) static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->events = queue_new_child(loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; diff --git a/src/nvim/normal.c b/src/nvim/normal.c index dd9a4230b5..5b35af9209 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -493,7 +493,7 @@ normal_cmd ( input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); return; } diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index b0263f18d0..8bc713bcff 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -132,7 +132,7 @@ bool os_char_avail(void) // Check for CTRL-C typed by reading all available characters. void os_breakcheck(void) { - if (!disable_breakcheck && !got_int) { + if (!got_int) { loop_poll_events(&loop, 0); } } @@ -292,7 +292,7 @@ static bool input_poll(int ms) prof_inchar_enter(); } - LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, ms, input_ready() || input_eof); if (do_profiling == PROF_YES && ms) { prof_inchar_exit(); @@ -383,5 +383,5 @@ static void read_error_exit(void) static bool pending_events(void) { - return events_enabled && !kl_empty(loop.deferred_events); + return events_enabled && !queue_empty(loop.events); } diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 958b4483e8..8faa46dd63 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -207,6 +207,8 @@ static int do_os_system(char **argv, Stream in, out, err; UvProcess uvproc = uv_process_init(&loop, &buf); Process *proc = &uvproc.process; + Queue *events = queue_new_child(loop.events); + proc->events = events; proc->argv = argv; proc->in = input != NULL ? &in : NULL; proc->out = &out; @@ -219,14 +221,22 @@ static int do_os_system(char **argv, msg_outtrans((char_u *)prog); msg_putchar('\n'); } + queue_free(events); return -1; } + // We want to deal with stream events as fast a possible while queueing + // process events, so reset everything to NULL. It prevents closing the + // streams while there's still data in the OS buffer(due to the process + // exiting before all data is read). if (input != NULL) { + proc->in->events = NULL; wstream_init(proc->in, 0); } + proc->out->events = NULL; rstream_init(proc->out, 0); rstream_start(proc->out, data_cb); + proc->err->events = NULL; rstream_init(proc->err, 0); rstream_start(proc->err, data_cb); @@ -267,6 +277,9 @@ static int do_os_system(char **argv, } } + assert(queue_empty(events)); + queue_free(events); + return status; } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 6de3435c4c..7158721433 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -115,16 +115,6 @@ static void deadly_signal(int signum) static void on_signal(SignalWatcher *handle, int signum, void *data) { assert(signum >= 0); - loop_push_event(&loop, (Event) { - .handler = on_signal_event, - .data = (void *)(uintptr_t)signum - }, false); -} - -static void on_signal_event(Event event) -{ - int signum = (int)(uintptr_t)event.data; - switch (signum) { #ifdef SIGPWR case SIGPWR: @@ -148,4 +138,3 @@ static void on_signal_event(Event event) break; } } - diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index 6b5d4359db..ee17938afc 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput) if (milliseconds > INT_MAX) { milliseconds = INT_MAX; } - LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, (int)milliseconds, got_int); } else { os_microdelay(milliseconds * 1000); } diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index cf68143ac7..b9bc4c6d78 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -325,7 +325,7 @@ void terminal_resize(Terminal *term, uint16_t width, uint16_t height) invalidate_terminal(term, -1, -1); } -void terminal_enter(bool process_deferred) +void terminal_enter(void) { Terminal *term = curbuf->terminal; assert(term && "should only be called when curbuf has a terminal"); @@ -354,15 +354,9 @@ void terminal_enter(bool process_deferred) bool got_bs = false; // True if the last input was while (term->buf == curbuf) { - if (process_deferred) { - input_enable_events(); - } - + input_enable_events(); c = safe_vgetc(); - - if (process_deferred) { - input_disable_events(); - } + input_disable_events(); switch (c) { case K_LEFTMOUSE: @@ -382,7 +376,7 @@ void terminal_enter(bool process_deferred) break; case K_EVENT: - loop_process_event(&loop); + queue_process_events(loop.events); break; case Ctrl_N: @@ -913,18 +907,11 @@ static void refresh_terminal(Terminal *term) // libuv timer callback. This will enqueue on_refresh to be processed as an // event. static void refresh_timer_cb(TimeWatcher *watcher, void *data) -{ - loop_push_event(&loop, (Event) {.handler = on_refresh}, false); - refresh_pending = false; -} - -// Refresh all invalidated terminals -static void on_refresh(Event event) { if (exiting) { // bad things can happen if we redraw when exiting, and there's no need to // update the buffer. - return; + goto end; } Terminal *term; void *stub; (void)(stub); @@ -936,6 +923,8 @@ static void on_refresh(Event event) pmap_clear(ptr_t)(invalidated_terminals); unblock_autocmds(); redraw(true); +end: + refresh_pending = false; } static void refresh_size(Terminal *term) diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index 0a84a3688b..1907d9895e 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -206,7 +206,7 @@ static bool handle_forced_escape(TermInput *input) return false; } -static void restart_reading(Event event); +static void restart_reading(void **argv); static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) { @@ -226,8 +226,7 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) // ls *.md | xargs nvim input->in_fd = 2; stream_close(&input->read_stream, NULL); - loop_push_event(&loop, - (Event) { .data = input, .handler = restart_reading }, false); + queue_put(loop.fast_events, restart_reading, 1, input); } else { input_done(); } @@ -272,9 +271,9 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) rbuffer_reset(input->read_stream.buffer); } -static void restart_reading(Event event) +static void restart_reading(void **argv) { - TermInput *input = event.data; + TermInput *input = argv[0]; rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input); rstream_start(&input->read_stream, read_cb); } diff --git a/src/nvim/tui/tui.c b/src/nvim/tui/tui.c index 28870b5abb..460fdb81a8 100644 --- a/src/nvim/tui/tui.c +++ b/src/nvim/tui/tui.c @@ -205,24 +205,13 @@ static void tui_stop(UI *ui) xfree(ui); } -static void try_resize(Event ev) +static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) { - UI *ui = ev.data; + UI *ui = data; update_size(ui); ui_refresh(); } -static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) -{ - got_winch = true; - // Queue the event because resizing can result in recursive event_poll calls - // FIXME(blueyed): TUI does not resize properly when not deferred. Why? #2322 - loop_push_event(&loop, (Event) { - .data = data, - .handler = try_resize - }, true); -} - static bool attrs_differ(HlAttrs a1, HlAttrs a2) { return a1.foreground != a2.foreground || a1.background != a2.background diff --git a/src/nvim/ui.c b/src/nvim/ui.c index 7e155f9b4f..ad875367c9 100644 --- a/src/nvim/ui.c +++ b/src/nvim/ui.c @@ -214,9 +214,9 @@ void ui_detach(UI *ui) shift_index++; } - ui_count--; - // schedule a refresh - loop_push_event(&loop, (Event) { .handler = refresh }, false); + if (--ui_count) { + ui_refresh(); + } } void ui_clear(void) @@ -485,11 +485,3 @@ static void ui_mode_change(void) UI_CALL(mode_change, mode); conceal_check_cursur_line(); } - -static void refresh(Event event) -{ - if (ui_count) { - ui_refresh(); - } -} - -- cgit From 166d8c799f367e64744cef4d9a6ddd386809ece8 Mon Sep 17 00:00:00 2001 From: oni-link Date: Fri, 7 Aug 2015 23:26:01 -0300 Subject: process: Remove indeterminism that causes reordering of pty events Since pty events are queued, it is possible that the reads will be reordered. Example scenario: In the terminal you have output combined from stdout and stderr. A program generates output, first you have some output on stdout, then output on stderr, output on stdout, output on stderr,... The whole output should be interleaved from both streams. Each output generates a read_event and they are placed in the same queue. If the queue is processed, the first read_event will send the whole stdout output to the terminal (on_job_output() consumes the whole buffer). The next read_event is similar for stderr. The remaining read events do nothing because now both RBuffer are already empty. So the terminal would show first the stdout output and after that the stderr output. This commit fixes the problem by disabling stderr stream in pty processes. That's ok because they all represent the same stream(duplicate file descriptors), plus one stream is simpler to deal with. --- src/nvim/eval.c | 4 +++- src/nvim/event/pty_process.c | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index d61cccd41c..9d838406ac 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -21051,7 +21051,9 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, proc->argv = argv; proc->in = &data->in; proc->out = &data->out; - proc->err = &data->err; + if (!pty) { + proc->err = &data->err; + } proc->cb = on_process_exit; proc->events = data->events; return data; diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c index 0aa0d6fbd4..5a42695ae4 100644 --- a/src/nvim/event/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -37,6 +37,7 @@ bool pty_process_spawn(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { Process *proc = (Process *)ptyproc; + assert(!proc->err); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; struct termios termios; @@ -70,9 +71,6 @@ bool pty_process_spawn(PtyProcess *ptyproc) if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { goto error; } - if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) { - goto error; - } ptyproc->tty_fd = master; proc->pid = pid; -- cgit From 6b3cd381dcd01268479dc56103498a029133644d Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Wed, 12 Aug 2015 19:16:06 -0300 Subject: rstream: Pass read count to read events This is necessary to keep events in the same order received from the OS. --- src/nvim/eval.c | 35 ++++++++++++++++++++--------------- src/nvim/event/rstream.c | 18 ++++++++++-------- src/nvim/event/stream.h | 8 ++++++-- src/nvim/msgpack_rpc/channel.c | 6 ++++-- src/nvim/os/input.c | 3 ++- src/nvim/os/shell.c | 6 ++++-- src/nvim/tui/term_input.inl | 3 ++- 7 files changed, 48 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index 9d838406ac..ab0c7d79bb 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -21176,38 +21176,43 @@ static inline void process_job_event(TerminalJobData *data, ufunc_T *callback, on_job_event(&event_data); } -static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stdout, "stdout"); + on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout"); } -static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stderr, "stderr"); + on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr"); } static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, - bool eof, ufunc_T *callback, const char *type) + size_t count, bool eof, ufunc_T *callback, const char *type) { if (eof) { return; } - RBUFFER_UNTIL_EMPTY(buf, ptr, len) { - // The order here matters, the terminal must receive the data first because - // process_job_event will modify the read buffer(convert NULs into NLs) - if (data->term) { - terminal_receive(data->term, ptr, len); - } + // stub variable, to keep reading consistent with the order of events, only + // consider the count parameter. + size_t r; + char *ptr = rbuffer_read_ptr(buf, &r); - if (callback) { - process_job_event(data, callback, type, ptr, len, 0); - } + // The order here matters, the terminal must receive the data first because + // process_job_event will modify the read buffer(convert NULs into NLs) + if (data->term) { + terminal_receive(data->term, ptr, count); + } - rbuffer_consumed(buf, len); + if (callback) { + process_job_event(data, callback, type, ptr, count, 0); } + + rbuffer_consumed(buf, count); } static void on_process_exit(Process *proc, int status, void *d) diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 94853f616a..0a720bb852 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -114,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); } return; } @@ -124,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -158,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(&stream->uv.idle); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); return; } @@ -166,19 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(stream->buffer, nread); stream->fpos += nread; - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } static void read_event(void **argv) { Stream *stream = argv[0]; if (stream->read_cb) { - bool eof = (uintptr_t)argv[1]; - stream->read_cb(stream, stream->buffer, stream->data, eof); + size_t count = (uintptr_t)argv[1]; + bool eof = (uintptr_t)argv[2]; + stream->read_cb(stream, stream->buffer, count, stream->data, eof); } } -static void invoke_read_cb(Stream *stream, bool eof) +static void invoke_read_cb(Stream *stream, size_t count, bool eof) { - CREATE_EVENT(stream->events, read_event, 2, stream, (void *)(uintptr_t)eof); + CREATE_EVENT(stream->events, read_event, 3, stream, + (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index eaf4b010f5..c6baac0db7 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -14,10 +14,14 @@ typedef struct stream Stream; /// /// @param stream The Stream instance /// @param rbuffer The associated RBuffer instance +/// @param count Number of bytes to read. This must be respected if keeping +/// the order of events is a requirement. This is because events +/// may be queued and only processed later when more data is copied +/// into to the buffer, so one read may starve another. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, - bool eof); +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof); /// Type of function called when the Stream has information about a write /// request. diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 6674f3c4e4..0e3b8200c9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -328,7 +328,8 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, + void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; @@ -343,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data) decref(data); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, + bool eof) { Channel *channel = data; incref(channel); diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 8bc713bcff..09f162f79d 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -316,7 +316,8 @@ static InbufPollResult inbuf_poll(int ms) return input_eof ? kInputEof : kInputNone; } -static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof) +static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, + bool at_eof) { if (at_eof) { input_eof = true; diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 8faa46dd63..77750bb077 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -298,7 +298,8 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired) buf->data = xrealloc(buf->data, buf->cap); } -static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void system_data_cb(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) { DynamicBuffer *dbuf = data; @@ -308,7 +309,8 @@ static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) dbuf->len += nread; } -static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, + bool eof) { size_t cnt; char *ptr = rbuffer_read_ptr(buf, &cnt); diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index 1907d9895e..c396557160 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -208,7 +208,8 @@ static bool handle_forced_escape(TermInput *input) static void restart_reading(void **argv); -static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, + bool eof) { TermInput *input = data; -- cgit From a816c726bbae4361a30c95b1226aaaa1dc76fd24 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Thu, 13 Aug 2015 07:57:20 -0300 Subject: pty_process: Make termios structure a static variable The structure has a constant initializer and is only used for reading. --- src/nvim/event/pty_process.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c index 5a42695ae4..8eef72f12f 100644 --- a/src/nvim/event/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -36,12 +36,15 @@ bool pty_process_spawn(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { + static struct termios termios; + if (!termios.c_cflag) { + init_termios(&termios); + } + Process *proc = (Process *)ptyproc; assert(!proc->err); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; - struct termios termios; - init_termios(&termios); uv_disable_stdio_inheritance(); int master; int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); @@ -136,7 +139,6 @@ static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL { - memset(termios, 0, sizeof(struct termios)); // Taken from pangoterm termios->c_iflag = ICRNL|IXON; termios->c_oflag = OPOST|ONLCR; -- cgit From f1de097dbb236ea400150f80b909407ca9af7441 Mon Sep 17 00:00:00 2001 From: Thiago de Arruda Date: Thu, 13 Aug 2015 11:53:19 -0300 Subject: eval: Fix jobwait() to process multiple jobs concurrently The new event processing architecture changed `jobwait()` semantics: Only one job is processed at time since process_wait only focuses on one queue. This fixes the problem with a few changes: - Allow the event queue polled by `process_wait` to be overriden by a new argument. - Allow the parent queue to be overriden with `queue_replace_parent` - Create a temporary queue that serves as the parent for all jobs passed to `jobwait()` --- src/nvim/eval.c | 25 ++++++++++++++++++++----- src/nvim/event/loop.c | 4 ++-- src/nvim/event/process.c | 16 ++++++++++------ src/nvim/event/queue.c | 6 ++++++ src/nvim/os/shell.c | 2 +- 5 files changed, 39 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/nvim/eval.c b/src/nvim/eval.c index ab0c7d79bb..c7c67cfca4 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -11037,6 +11037,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); + Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. @@ -11050,6 +11051,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // status code when the job exits list_append_number(rv, -1); data->status_ptr = &rv->lv_last->li_tv.vval.v_number; + // Process any pending events for the job because we'll temporarily + // replace the parent queue + queue_process_events(data->events); + queue_replace_parent(data->events, waiting_jobs); } } @@ -11070,7 +11075,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } - int status = process_wait((Process *)&data->proc, remaining); + int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11090,9 +11095,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) } } - // poll to ensure any pending callbacks from the last job are invoked - loop_poll_events(&loop, 0); - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER @@ -11103,8 +11105,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) // job exits data->status_ptr = NULL; } - ui_busy_stop(); + // restore the parent queue for any jobs still alive + for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { + TerminalJobData *data = NULL; + if (arg->li_tv.v_type != VAR_NUMBER + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + continue; + } + // restore the parent queue for the job + queue_process_events(data->events); + queue_replace_parent(data->events, loop.events); + } + + queue_free(waiting_jobs); + ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; rettv->vval.v_list = rv; diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index 1a50ec0d9a..3d3288f858 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -22,7 +22,7 @@ void loop_init(Loop *loop, void *data) loop->uv.data = loop; loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; - loop->events = queue_new_parent(on_put, loop); + loop->events = queue_new_parent(loop_on_put, loop); loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); @@ -59,7 +59,7 @@ void loop_poll_events(Loop *loop, int ms) queue_process_events(loop->fast_events); } -static void on_put(Queue *queue, void *data) +void loop_on_put(Queue *queue, void *data) { Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 54dbc11a03..81d4e690c3 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -152,7 +152,7 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL /// indistinguishable from the process returning -1 by itself. Which /// is possible on some OS. Returns -2 if an user has interruped the /// wait. -int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) { // The default status is -1, which represents a timeout int status = -1; @@ -162,10 +162,14 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL return proc->status; } + if (!events) { + events = proc->events; + } + // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -179,10 +183,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL if (ms == -1) { // We can only return if all streams/handles are closed and the job // exited. - LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, proc->refcount == 1); } else { - LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + LOOP_PROCESS_EVENTS(proc->loop, events, 0); } } @@ -191,9 +195,9 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); - if (proc->events) { + if (events) { // the decref call created an exit event, process it now - queue_process_events(proc->events); + queue_process_events(events); } } else { proc->refcount--; diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c index 3f03dd444e..19eca14144 100644 --- a/src/nvim/event/queue.c +++ b/src/nvim/event/queue.c @@ -152,6 +152,12 @@ bool queue_empty(Queue *queue) return QUEUE_EMPTY(&queue->headtail); } +void queue_replace_parent(Queue *queue, Queue *new_parent) +{ + assert(queue_empty(queue)); + queue->parent = new_parent; +} + static Event queue_remove(Queue *queue) { assert(!queue_empty(queue)); diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 77750bb077..2d97c4bf4f 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -257,7 +257,7 @@ static int do_os_system(char **argv, // the UI ui_busy_start(); ui_flush(); - int status = process_wait(proc, -1); + int status = process_wait(proc, -1, NULL); ui_busy_stop(); // prepare the out parameters if requested -- cgit