diff options
52 files changed, 923 insertions, 409 deletions
diff --git a/.asan-blacklist b/.asan-blacklist new file mode 100644 index 0000000000..bd977dfe17 --- /dev/null +++ b/.asan-blacklist @@ -0,0 +1,2 @@ +# libuv queue.h pointer arithmetic is not accepted by asan +fun:queue_node_data diff --git a/clint-ignored-files.txt b/clint-ignored-files.txt index 2988b15fd2..98a26b7fbc 100644 --- a/clint-ignored-files.txt +++ b/clint-ignored-files.txt @@ -57,6 +57,7 @@ src/nvim/keymap.h src/nvim/lib/khash.h src/nvim/lib/klist.h src/nvim/lib/kvec.h +src/nvim/lib/queue.h src/nvim/macros.h src/nvim/main.c src/nvim/main.h diff --git a/scripts/msgpack-gen.lua b/scripts/msgpack-gen.lua index bb37ae94da..581641ae06 100644 --- a/scripts/msgpack-gen.lua +++ b/scripts/msgpack-gen.lua @@ -34,8 +34,8 @@ c_params = Ct(c_void + c_param_list) c_proto = Ct( Cg(c_type, 'return_type') * Cg(c_id, 'name') * fill * P('(') * fill * Cg(c_params, 'parameters') * fill * P(')') * - Cg(Cc(false), 'deferred') * - (fill * Cg((P('FUNC_ATTR_DEFERRED') * Cc(true)), 'deferred') ^ -1) * + Cg(Cc(false), 'async') * + (fill * Cg((P('FUNC_ATTR_ASYNC') * Cc(true)), 'async') ^ -1) * fill * P(';') ) grammar = Ct((c_proto + c_comment + c_preproc + ws) ^ 1) @@ -279,7 +279,7 @@ for i = 1, #functions do '(String) {.data = "'..fn.name..'", '.. '.size = sizeof("'..fn.name..'") - 1}, '.. '(MsgpackRpcRequestHandler) {.fn = handle_'.. fn.name.. - ', .defer = '..tostring(fn.deferred)..'});\n') + ', .async = '..tostring(fn.async)..'});\n') if #fn.name > max_fname_len then max_fname_len = #fn.name diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index d49b8dc416..747b63b1ba 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -217,7 +217,7 @@ install_helper(TARGETS nvim) if(CLANG_ASAN_UBSAN) message(STATUS "Enabling Clang address sanitizer and undefined behavior sanitizer for nvim.") set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-DEXITFREE ") - set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined ") + set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/.asan-blacklist") set_property(TARGET nvim APPEND_STRING PROPERTY LINK_FLAGS "-fsanitize=address -fsanitize=undefined ") elseif(CLANG_MSAN) message(STATUS "Enabling Clang memory sanitizer for nvim.") diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c index 915c5f74d7..12c97cfffb 100644 --- a/src/nvim/api/buffer.c +++ b/src/nvim/api/buffer.c @@ -70,7 +70,6 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err) /// @param line The new line. /// @param[out] err Details of an error that may have occurred void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) - FUNC_ATTR_DEFERRED { Object l = STRING_OBJ(line); Array array = {.items = &l, .size = 1}; @@ -83,7 +82,6 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) /// @param index The line index /// @param[out] err Details of an error that may have occurred void buffer_del_line(Buffer buffer, Integer index, Error *err) - FUNC_ATTR_DEFERRED { Array array = ARRAY_DICT_INIT; buffer_set_line_slice(buffer, index, index, true, true, array, err); @@ -171,7 +169,6 @@ void buffer_set_line_slice(Buffer buffer, Boolean include_end, ArrayOf(String) replacement, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -339,7 +336,6 @@ Object buffer_get_var(Buffer buffer, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object buffer_set_var(Buffer buffer, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -375,7 +371,6 @@ Object buffer_get_option(Buffer buffer, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void buffer_set_option(Buffer buffer, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -426,7 +421,6 @@ String buffer_get_name(Buffer buffer, Error *err) /// @param name The buffer name /// @param[out] err Details of an error that may have occurred void buffer_set_name(Buffer buffer, String name, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -472,7 +466,6 @@ void buffer_insert(Buffer buffer, Integer lnum, ArrayOf(String) lines, Error *err) - FUNC_ATTR_DEFERRED { buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err); } diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c index 1c958118e1..126ee4072d 100644 --- a/src/nvim/api/tabpage.c +++ b/src/nvim/api/tabpage.c @@ -62,7 +62,6 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The tab page handle Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { tabpage_T *tab = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index f5dadb00ea..b9900b5d5a 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -37,7 +37,6 @@ /// @param str The command str /// @param[out] err Details of an error that may have occurred void vim_command(String str, Error *err) - FUNC_ATTR_DEFERRED { // Run the command try_start(); @@ -54,7 +53,6 @@ void vim_command(String str, Error *err) /// @see feedkeys() /// @see vim_strsave_escape_csi void vim_feedkeys(String keys, String mode, Boolean escape_csi) - FUNC_ATTR_DEFERRED { bool remap = true; bool insert = false; @@ -100,6 +98,7 @@ void vim_feedkeys(String keys, String mode, Boolean escape_csi) /// @return The number of bytes actually written, which can be lower than /// requested if the buffer becomes full. Integer vim_input(String keys) + FUNC_ATTR_ASYNC { return (Integer)input_enqueue(keys); } @@ -143,7 +142,6 @@ String vim_command_output(String str, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The expanded object Object vim_eval(String str, Error *err) - FUNC_ATTR_DEFERRED { Object rv = OBJECT_INIT; // Evaluate the expression @@ -171,7 +169,6 @@ Object vim_eval(String str, Error *err) /// @param[out] err Details of an error that may have occurred /// @return Result of the function call Object vim_call_function(String fname, Array args, Error *err) - FUNC_ATTR_DEFERRED { Object rv = OBJECT_INIT; if (args.size > MAX_FUNC_ARGS) { @@ -312,7 +309,6 @@ String vim_get_current_line(Error *err) /// @param line The line contents /// @param[out] err Details of an error that may have occurred void vim_set_current_line(String line, Error *err) - FUNC_ATTR_DEFERRED { buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err); } @@ -321,7 +317,6 @@ void vim_set_current_line(String line, Error *err) /// /// @param[out] err Details of an error that may have occurred void vim_del_current_line(Error *err) - FUNC_ATTR_DEFERRED { buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err); } @@ -343,7 +338,6 @@ Object vim_get_var(String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return the old value if any Object vim_set_var(String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { return dict_set_value(&globvardict, name, value, err); } @@ -374,7 +368,6 @@ Object vim_get_option(String name, Error *err) /// @param value The new option value /// @param[out] err Details of an error that may have occurred void vim_set_option(String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { set_option_to(NULL, SREQ_GLOBAL, name, value, err); } @@ -383,7 +376,6 @@ void vim_set_option(String name, Object value, Error *err) /// /// @param str The message void vim_out_write(String str) - FUNC_ATTR_DEFERRED { write_msg(str, false); } @@ -392,7 +384,6 @@ void vim_out_write(String str) /// /// @param str The message void vim_err_write(String str) - FUNC_ATTR_DEFERRED { write_msg(str, true); } @@ -402,7 +393,6 @@ void vim_err_write(String str) /// /// @param str The message void vim_report_error(String str) - FUNC_ATTR_DEFERRED { vim_err_write(str); vim_err_write((String) {.data = "\n", .size = 1}); @@ -442,7 +432,6 @@ Buffer vim_get_current_buffer(void) /// @param id The buffer handle /// @param[out] err Details of an error that may have occurred void vim_set_current_buffer(Buffer buffer, Error *err) - FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -493,7 +482,6 @@ Window vim_get_current_window(void) /// /// @param handle The window handle void vim_set_current_window(Window window, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -545,7 +533,6 @@ Tabpage vim_get_current_tabpage(void) /// @param handle The tab page handle /// @param[out] err Details of an error that may have occurred void vim_set_current_tabpage(Tabpage tabpage, Error *err) - FUNC_ATTR_DEFERRED { tabpage_T *tp = find_tab_by_handle(tabpage, err); @@ -609,6 +596,7 @@ Dictionary vim_get_color_map(void) Array vim_get_api_info(uint64_t channel_id) + FUNC_ATTR_ASYNC { Array rv = ARRAY_DICT_INIT; diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c index 5034c26c83..aad616c7bf 100644 --- a/src/nvim/api/window.c +++ b/src/nvim/api/window.c @@ -54,7 +54,6 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err) /// @param pos the (row, col) tuple representing the new position /// @param[out] err Details of an error that may have occurred void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -118,7 +117,6 @@ Integer window_get_height(Window window, Error *err) /// @param height the new height in rows /// @param[out] err Details of an error that may have occurred void window_set_height(Window window, Integer height, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -162,7 +160,6 @@ Integer window_get_width(Window window, Error *err) /// @param width the new width in columns /// @param[out] err Details of an error that may have occurred void window_set_width(Window window, Integer width, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -208,7 +205,6 @@ Object window_get_var(Window window, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object window_set_var(Window window, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -244,7 +240,6 @@ Object window_get_option(Window window, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void window_set_option(Window window, String name, Object value, Error *err) - FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); diff --git a/src/nvim/edit.c b/src/nvim/edit.c index 390d62210b..6bcf5e804a 100644 --- a/src/nvim/edit.c +++ b/src/nvim/edit.c @@ -254,7 +254,7 @@ edit ( ) { if (curbuf->terminal) { - terminal_enter(true); + terminal_enter(); return false; } @@ -601,15 +601,15 @@ edit ( * Get a character for Insert mode. Ignore K_IGNORE. */ lastc = c; /* remember previous char for CTRL-D */ - loop_enable_deferred_events(&loop); + input_enable_events(); do { c = safe_vgetc(); } while (c == K_IGNORE); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { c = lastc; - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/eval.c b/src/nvim/eval.c index aa9e656913..c7c67cfca4 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -468,6 +468,7 @@ typedef struct { dict_T *self; int *status_ptr; uint64_t id; + Queue *events; } TerminalJobData; /// Structure representing current VimL to messagepack conversion state @@ -493,6 +494,13 @@ typedef struct { /// Stack used to convert VimL values to messagepack. typedef kvec_t(MPConvStackVal) MPConvStack; +typedef struct { + TerminalJobData *data; + ufunc_T *callback; + const char *type; + list_T *received; + int status; +} JobEvent; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "eval.c.generated.h" @@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack; #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ -// Memory pool for reusing JobEvent structures -typedef struct { - TerminalJobData *data; - ufunc_T *callback; - const char *type; - list_T *received; - int status; -} JobEvent; -static int disable_job_defer = 0; static uint64_t current_job_id = 1; static PMap(uint64_t) *jobs = NULL; @@ -10778,7 +10777,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10819,7 +10818,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv) return; } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -10860,7 +10859,7 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); + TerminalJobData *data = find_job(argvars[0].vval.v_number); if (!data) { EMSG(_(e_invjob)); return; @@ -11007,8 +11006,8 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv) } - TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number); - if (!data || data->stopped) { + TerminalJobData *data = find_job(argvars[0].vval.v_number); + if (!data) { EMSG(_(e_invjob)); return; } @@ -11038,28 +11037,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) list_T *rv = list_alloc(); ui_busy_start(); - // disable breakchecks, which could result in job callbacks being executed - // at unexpected places - disable_breakcheck++; - // disable job event deferring so the callbacks are processed while waiting. - if (!disable_job_defer++) { - // process any pending job events in the deferred queue, but only do this if - // deferred is not disabled(at the top-level `jobwait()` call) - loop_process_event(&loop); - } + Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop); // For each item in the input list append an integer to the output list. -3 // is used to represent an invalid job id, -2 is for a interrupted job and // -1 for jobs that were skipped or timed out. for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { list_append_number(rv, -3); } else { // append the list item and set the status pointer so we'll collect the // status code when the job exits list_append_number(rv, -1); data->status_ptr = &rv->lv_last->li_tv.vval.v_number; + // Process any pending events for the job because we'll temporarily + // replace the parent queue + queue_process_events(data->events); + queue_replace_parent(data->events, waiting_jobs); } } @@ -11077,10 +11072,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) break; } if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } - int status = process_wait((Process *)&data->proc, remaining); + int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); if (status < 0) { // interrupted or timed out, skip remaining jobs. if (status == -2) { @@ -11100,23 +11095,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv) } } - // poll to ensure any pending callbacks from the last job are invoked - loop_poll_events(&loop, 0); - for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { TerminalJobData *data = NULL; if (arg->li_tv.v_type != VAR_NUMBER - || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + || !(data = find_job(arg->li_tv.vval.v_number))) { continue; } // remove the status pointer because the list may be freed before the // job exits data->status_ptr = NULL; } - disable_job_defer--; - disable_breakcheck--; - ui_busy_stop(); + // restore the parent queue for any jobs still alive + for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { + TerminalJobData *data = NULL; + if (arg->li_tv.v_type != VAR_NUMBER + || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { + continue; + } + // restore the parent queue for the job + queue_process_events(data->events); + queue_replace_parent(data->events, loop.events); + } + + queue_free(waiting_jobs); + ui_busy_stop(); rv->lv_refcount++; rettv->v_type = VAR_LIST; rettv->vval.v_list = rv; @@ -21053,17 +21056,21 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout, data->on_stderr = on_stderr; data->on_exit = on_exit; data->self = self; + data->events = queue_new_child(loop.events); if (pty) { - data->proc.pty = pty_process_init(data); + data->proc.pty = pty_process_init(&loop, data); } else { - data->proc.uv = uv_process_init(data); + data->proc.uv = uv_process_init(&loop, data); } Process *proc = (Process *)&data->proc; proc->argv = argv; proc->in = &data->in; proc->out = &data->out; - proc->err = &data->err; + if (!pty) { + proc->err = &data->err; + } proc->cb = on_process_exit; + proc->events = data->events; return data; } @@ -21094,8 +21101,12 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) { data->refcount++; Process *proc = (Process *)&data->proc; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { EMSG(_(e_jobexe)); + if (proc->type == kProcessTypePty) { + xfree(data->proc.pty.term_name); + free_term_job_data(data); + } return false; } @@ -21114,7 +21125,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) return true; } -static inline void free_term_job_data(TerminalJobData *data) { +static inline void free_term_job_data_event(void **argv) +{ + TerminalJobData *data = argv[0]; if (data->on_stdout) { user_func_unref(data->on_stdout); } @@ -21129,17 +21142,25 @@ static inline void free_term_job_data(TerminalJobData *data) { data->self->internal_refcount--; dict_unref(data->self); } + queue_free(data->events); xfree(data); } +static inline void free_term_job_data(TerminalJobData *data) +{ + // data->queue may still be used after this function returns(process_wait), so + // only free in the next event loop iteration + queue_put(loop.fast_events, free_term_job_data_event, 1, data); +} + // vimscript job callbacks must be executed on Nvim main loop -static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, +static inline void process_job_event(TerminalJobData *data, ufunc_T *callback, const char *type, char *buf, size_t count, int status) { - JobEvent *event_data = xmalloc(sizeof(JobEvent)); - event_data->received = NULL; + JobEvent event_data; + event_data.received = NULL; if (buf) { - event_data->received = list_alloc(); + event_data.received = list_alloc(); char *ptr = buf; size_t remaining = count; size_t off = 0; @@ -21147,7 +21168,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, while (off < remaining) { // append the line if (ptr[off] == NL) { - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); size_t skip = off + 1; ptr += skip; remaining -= skip; @@ -21160,51 +21181,53 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback, } off++; } - list_append_string(event_data->received, (uint8_t *)ptr, off); + list_append_string(event_data.received, (uint8_t *)ptr, off); } else { - event_data->status = status; + event_data.status = status; } - event_data->data = data; - event_data->callback = callback; - event_data->type = type; - loop_push_event(&loop, (Event) { - .handler = on_job_event, - .data = event_data - }, !disable_job_defer); + event_data.data = data; + event_data.callback = callback; + event_data.type = type; + on_job_event(&event_data); } -static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stdout, "stdout"); + on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout"); } -static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof) +static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, + void *job, bool eof) { TerminalJobData *data = job; - on_job_output(stream, job, buf, eof, data->on_stderr, "stderr"); + on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr"); } static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, - bool eof, ufunc_T *callback, const char *type) + size_t count, bool eof, ufunc_T *callback, const char *type) { if (eof) { return; } - RBUFFER_UNTIL_EMPTY(buf, ptr, len) { - // The order here matters, the terminal must receive the data first because - // push_job_event will modify the read buffer(convert NULs into NLs) - if (data->term) { - terminal_receive(data->term, ptr, len); - } + // stub variable, to keep reading consistent with the order of events, only + // consider the count parameter. + size_t r; + char *ptr = rbuffer_read_ptr(buf, &r); - if (callback) { - push_job_event(data, callback, type, ptr, len, 0); - } + // The order here matters, the terminal must receive the data first because + // process_job_event will modify the read buffer(convert NULs into NLs) + if (data->term) { + terminal_receive(data->term, ptr, count); + } - rbuffer_consumed(buf, len); + if (callback) { + process_job_event(data, callback, type, ptr, count, 0); } + + rbuffer_consumed(buf, count); } static void on_process_exit(Process *proc, int status, void *d) @@ -21220,7 +21243,7 @@ static void on_process_exit(Process *proc, int status, void *d) *data->status_ptr = status; } - push_job_event(data, data->on_exit, "exit", NULL, 0, status); + process_job_event(data, data->on_exit, "exit", NULL, 0, status); } static void term_write(char *buf, size_t size, void *d) @@ -21254,10 +21277,8 @@ static void term_job_data_decref(TerminalJobData *data) } } -static void on_job_event(Event event) +static void on_job_event(JobEvent *ev) { - JobEvent *ev = event.data; - if (!ev->callback) { goto end; } @@ -21302,7 +21323,15 @@ end: pmap_del(uint64_t)(jobs, ev->data->id); term_job_data_decref(ev->data); } - xfree(ev); +} + +static TerminalJobData *find_job(uint64_t id) +{ + TerminalJobData *data = pmap_get(uint64_t)(jobs, id); + if (!data || data->stopped) { + return NULL; + } + return data; } static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h new file mode 100644 index 0000000000..5126d52241 --- /dev/null +++ b/src/nvim/event/defs.h @@ -0,0 +1,39 @@ +#ifndef NVIM_EVENT_DEFS_H +#define NVIM_EVENT_DEFS_H + +#include <assert.h> +#include <stdarg.h> + +#define EVENT_HANDLER_MAX_ARGC 4 + +typedef void (*argv_callback)(void **argv); +typedef struct message { + int priority; + argv_callback handler; + void *argv[EVENT_HANDLER_MAX_ARGC]; +} Event; + +#define VA_EVENT_INIT(event, p, h, a) \ + do { \ + assert(a <= EVENT_HANDLER_MAX_ARGC); \ + (event)->priority = p; \ + (event)->handler = h; \ + if (a) { \ + va_list args; \ + va_start(args, a); \ + for (int i = 0; i < a; i++) { \ + (event)->argv[i] = va_arg(args, void *); \ + } \ + va_end(args); \ + } \ + } while (0) + +static inline Event event_create(int priority, argv_callback cb, int argc, ...) +{ + assert(argc <= EVENT_HANDLER_MAX_ARGC); + Event event; + VA_EVENT_INIT(&event, priority, cb, argc); + return event; +} + +#endif // NVIM_EVENT_DEFS_H diff --git a/src/nvim/event/loop.c b/src/nvim/event/loop.c index d90565002e..3d3288f858 100644 --- a/src/nvim/event/loop.c +++ b/src/nvim/event/loop.c @@ -1,3 +1,4 @@ +#include <stdarg.h> #include <stdint.h> #include <uv.h> @@ -9,17 +10,23 @@ # include "event/loop.c.generated.h" #endif +typedef struct idle_event { + uv_idle_t idle; + Event event; +} IdleEvent; + void loop_init(Loop *loop, void *data) { uv_loop_init(&loop->uv); loop->uv.data = loop; - loop->deferred_events = kl_init(Event); - loop->immediate_events = kl_init(Event); loop->children = kl_init(WatcherPtr); loop->children_stop_requests = 0; + loop->events = queue_new_parent(loop_on_put, loop); + loop->fast_events = queue_new_child(loop->events); uv_signal_init(&loop->uv, &loop->children_watcher); uv_timer_init(&loop->uv, &loop->children_kill_timer); + uv_timer_init(&loop->uv, &loop->poll_timer); } void loop_poll_events(Loop *loop, int ms) @@ -30,89 +37,36 @@ void loop_poll_events(Loop *loop, int ms) abort(); // Should not re-enter uv_run } - bool wait = true; - uv_timer_t timer; + uv_run_mode mode = UV_RUN_ONCE; if (ms > 0) { - uv_timer_init(&loop->uv, &timer); // Use a repeating timeout of ms milliseconds to make sure // we do not block indefinitely for I/O. - uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms); + uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms); } else if (ms == 0) { // For ms == 0, we need to do a non-blocking event poll by // setting the run mode to UV_RUN_NOWAIT. - wait = false; + mode = UV_RUN_NOWAIT; } - if (wait) { - loop_run_once(loop); - } else { - loop_run_nowait(loop); - } + uv_run(&loop->uv, mode); if (ms > 0) { - // Ensure the timer handle is closed and run the event loop - // once more to let libuv perform it's cleanup - uv_timer_stop(&timer); - uv_close((uv_handle_t *)&timer, NULL); - loop_run_nowait(loop); + uv_timer_stop(&loop->poll_timer); } recursive--; // Can re-enter uv_run now - process_events_from(loop->immediate_events); -} - -bool loop_has_deferred_events(Loop *loop) -{ - return loop->deferred_events_allowed && !kl_empty(loop->deferred_events); + queue_process_events(loop->fast_events); } -void loop_enable_deferred_events(Loop *loop) -{ - ++loop->deferred_events_allowed; -} - -void loop_disable_deferred_events(Loop *loop) -{ - --loop->deferred_events_allowed; -} - -// Queue an event -void loop_push_event(Loop *loop, Event event, bool deferred) +void loop_on_put(Queue *queue, void *data) { + Loop *loop = data; // Sometimes libuv will run pending callbacks(timer for example) before // blocking for a poll. If this happens and the callback pushes a event to one // of the queues, the event would only be processed after the poll // returns(user hits a key for example). To avoid this scenario, we call // uv_stop when a event is enqueued. - loop_stop(loop); - kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events, - event); -} - -void loop_process_event(Loop *loop) -{ - process_events_from(loop->deferred_events); -} - - -void loop_run(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_DEFAULT); -} - -void loop_run_once(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_ONCE); -} - -void loop_run_nowait(Loop *loop) -{ - uv_run(&loop->uv, UV_RUN_NOWAIT); -} - -void loop_stop(Loop *loop) -{ uv_stop(&loop->uv); } @@ -120,25 +74,12 @@ void loop_close(Loop *loop) { uv_close((uv_handle_t *)&loop->children_watcher, NULL); uv_close((uv_handle_t *)&loop->children_kill_timer, NULL); + uv_close((uv_handle_t *)&loop->poll_timer, NULL); do { uv_run(&loop->uv, UV_RUN_DEFAULT); } while (uv_loop_close(&loop->uv)); } -void loop_process_all_events(Loop *loop) -{ - process_events_from(loop->immediate_events); - process_events_from(loop->deferred_events); -} - -static void process_events_from(klist_t(Event) *queue) -{ - while (!kl_empty(queue)) { - Event event = kl_shift(Event, queue); - event.handler(event); - } -} - static void timer_cb(uv_timer_t *handle) { } diff --git a/src/nvim/event/loop.h b/src/nvim/event/loop.h index 5eb4d32ca8..9212a45aa4 100644 --- a/src/nvim/event/loop.h +++ b/src/nvim/event/loop.h @@ -7,38 +7,39 @@ #include "nvim/lib/klist.h" #include "nvim/os/time.h" - -typedef struct event Event; -typedef void (*event_handler)(Event event); - -struct event { - void *data; - event_handler handler; -}; +#include "nvim/event/queue.h" typedef void * WatcherPtr; #define _noop(x) KLIST_INIT(WatcherPtr, WatcherPtr, _noop) -KLIST_INIT(Event, Event, _noop) typedef struct loop { uv_loop_t uv; - klist_t(Event) *deferred_events, *immediate_events; - int deferred_events_allowed; + Queue *events, *fast_events; klist_t(WatcherPtr) *children; uv_signal_t children_watcher; - uv_timer_t children_kill_timer; + uv_timer_t children_kill_timer, poll_timer; size_t children_stop_requests; } Loop; +#define CREATE_EVENT(queue, handler, argc, ...) \ + do { \ + if (queue) { \ + queue_put((queue), (handler), argc, __VA_ARGS__); \ + } else { \ + void *argv[argc] = {__VA_ARGS__}; \ + (handler)(argv); \ + } \ + } while (0) + // Poll for events until a condition or timeout -#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \ +#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \ do { \ int remaining = timeout; \ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ while (!(condition)) { \ - loop_poll_events(loop, remaining); \ + LOOP_PROCESS_EVENTS(loop, queue, remaining); \ if (remaining == 0) { \ break; \ } else if (remaining > 0) { \ @@ -52,6 +53,16 @@ typedef struct loop { } \ } while (0) +#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \ + do { \ + if (queue && !queue_empty(queue)) { \ + queue_process_events(queue); \ + } else { \ + loop_poll_events(loop, timeout); \ + } \ + } while (0) + + #ifdef INCLUDE_GENERATED_DECLARATIONS # include "event/loop.h.generated.h" #endif diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 2b1f1ae096..81d4e690c3 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -22,7 +22,7 @@ #define TERM_TIMEOUT 1000000000 #define KILL_TIMEOUT (TERM_TIMEOUT * 2) -#define CLOSE_PROC_STREAM(proc, stream) \ +#define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ stream_close(proc->stream, NULL); \ @@ -30,19 +30,18 @@ } while (0) -bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL +bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL { - proc->loop = loop; if (proc->in) { - uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); } if (proc->out) { - uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); } if (proc->err) { - uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); } bool success; @@ -77,6 +76,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->in) { stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -84,6 +84,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->out) { stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -91,6 +92,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL if (proc->err) { stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; proc->refcount++; @@ -99,7 +101,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL proc->internal_exit_cb = on_process_exit; proc->internal_close_cb = decref; proc->refcount++; - kl_push(WatcherPtr, loop->children, proc); + kl_push(WatcherPtr, proc->loop->children, proc); return true; } @@ -113,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL } // Wait until all children exit - LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children)); + LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children)); pty_process_teardown(loop); } @@ -150,16 +152,24 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL /// indistinguishable from the process returning -1 by itself. Which /// is possible on some OS. Returns -2 if an user has interruped the /// wait. -int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL +int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1) { // The default status is -1, which represents a timeout int status = -1; bool interrupted = false; + if (!proc->refcount) { + LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0); + return proc->status; + } + + if (!events) { + events = proc->events; + } // Increase refcount to stop the exit callback from being called(and possibly // being freed) before we have a chance to get the status. proc->refcount++; - LOOP_POLL_EVENTS_UNTIL(proc->loop, ms, + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms, // Until... got_int || // interrupted by the user proc->refcount == 1); // job exited @@ -171,12 +181,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL got_int = false; process_stop(proc); if (ms == -1) { - // We can only return, if all streams/handles are closed and the job - + // We can only return if all streams/handles are closed and the job // exited. - LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1); + LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1, + proc->refcount == 1); } else { - loop_poll_events(proc->loop, 0); + LOOP_PROCESS_EVENTS(proc->loop, events, 0); } } @@ -185,6 +195,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL // resources status = interrupted ? -2 : proc->status; decref(proc); + if (events) { + // the decref call created an exit event, process it now + queue_process_events(events); + } } else { proc->refcount--; } @@ -250,6 +264,18 @@ static void children_kill_cb(uv_timer_t *handle) } } +static void process_close_event(void **argv) +{ + Process *proc = argv[0]; + shell_free_argv(proc->argv); + if (proc->type == kProcessTypePty) { + xfree(((PtyProcess *)proc)->term_name); + } + if (proc->cb) { + proc->cb(proc, proc->status, proc->data); + } +} + static void decref(Process *proc) { if (--proc->refcount != 0) { @@ -264,16 +290,9 @@ static void decref(Process *proc) break; } } - assert(node); kl_shift_at(WatcherPtr, loop->children, node); - shell_free_argv(proc->argv); - if (proc->type == kProcessTypePty) { - xfree(((PtyProcess *)proc)->term_name); - } - if (proc->cb) { - proc->cb(proc, proc->status, proc->data); - } + CREATE_EVENT(proc->events, process_close_event, 1, proc); } static void process_close(Process *proc) @@ -293,28 +312,27 @@ static void process_close(Process *proc) } } -static void on_process_exit(Process *proc) +static void process_close_handles(void **argv) { - if (exiting) { - on_process_exit_event((Event) {.data = proc}); - } else { - loop_push_event(proc->loop, - (Event) {.handler = on_process_exit_event, .data = proc}, false); - } + Process *proc = argv[0]; + process_close_streams(proc); + process_close(proc); +} +static void on_process_exit(Process *proc) +{ Loop *loop = proc->loop; - if (loop->children_stop_requests && !--loop->children_stop_requests) { + if (proc->stopped_time && loop->children_stop_requests + && !--loop->children_stop_requests) { // Stop the timer if no more stop requests are pending DLOG("Stopping process kill timer"); uv_timer_stop(&loop->children_kill_timer); } -} - -static void on_process_exit_event(Event event) -{ - Process *proc = event.data; - process_close_streams(proc); - process_close(proc); + // Process handles are closed in the next event loop tick. This is done to + // give libuv more time to read data from the OS after the process exits(If + // process_close_streams is called with data still in the OS buffer, we lose + // it) + CREATE_EVENT(proc->events, process_close_handles, 1, proc); } static void on_process_stream_close(Stream *stream, void *data) diff --git a/src/nvim/event/process.h b/src/nvim/event/process.h index 5c84a7d1d0..45edc46b95 100644 --- a/src/nvim/event/process.h +++ b/src/nvim/event/process.h @@ -26,14 +26,16 @@ struct process { process_exit_cb cb; internal_process_cb internal_exit_cb, internal_close_cb; bool closed, term_sent; + Queue *events; }; -static inline Process process_init(ProcessType type, void *data) +static inline Process process_init(Loop *loop, ProcessType type, void *data) { return (Process) { .type = type, .data = data, - .loop = NULL, + .loop = loop, + .events = NULL, .pid = 0, .status = 0, .refcount = 0, diff --git a/src/nvim/event/pty_process.c b/src/nvim/event/pty_process.c index 1e24d7c919..8eef72f12f 100644 --- a/src/nvim/event/pty_process.c +++ b/src/nvim/event/pty_process.c @@ -33,17 +33,18 @@ # include "event/pty_process.c.generated.h" #endif -static const unsigned int KILL_RETRIES = 5; -static const unsigned int KILL_TIMEOUT = 2; // seconds - bool pty_process_spawn(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL { + static struct termios termios; + if (!termios.c_cflag) { + init_termios(&termios); + } + Process *proc = (Process *)ptyproc; + assert(!proc->err); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0}; - struct termios termios; - init_termios(&termios); uv_disable_stdio_inheritance(); int master; int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize); @@ -73,9 +74,6 @@ bool pty_process_spawn(PtyProcess *ptyproc) if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) { goto error; } - if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) { - goto error; - } ptyproc->tty_fd = master; proc->pid = pid; @@ -83,19 +81,8 @@ bool pty_process_spawn(PtyProcess *ptyproc) error: close(master); - - // terminate spawned process - kill(pid, SIGTERM); - int status, child; - unsigned int try = 0; - while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) { - sleep(KILL_TIMEOUT); - } - if (child != pid) { - kill(pid, SIGKILL); - waitpid(pid, NULL, 0); - } - + kill(pid, SIGKILL); + waitpid(pid, NULL, 0); return false; } @@ -152,7 +139,6 @@ static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL { - memset(termios, 0, sizeof(struct termios)); // Taken from pangoterm termios->c_iflag = ICRNL|IXON; termios->c_oflag = OPOST|ONLCR; diff --git a/src/nvim/event/pty_process.h b/src/nvim/event/pty_process.h index a12b5489c5..446d7fd3c8 100644 --- a/src/nvim/event/pty_process.h +++ b/src/nvim/event/pty_process.h @@ -13,10 +13,10 @@ typedef struct pty_process { int tty_fd; } PtyProcess; -static inline PtyProcess pty_process_init(void *data) +static inline PtyProcess pty_process_init(Loop *loop, void *data) { PtyProcess rv; - rv.process = process_init(kProcessTypePty, data); + rv.process = process_init(loop, kProcessTypePty, data); rv.term_name = NULL; rv.width = 80; rv.height = 24; diff --git a/src/nvim/event/queue.c b/src/nvim/event/queue.c new file mode 100644 index 0000000000..19eca14144 --- /dev/null +++ b/src/nvim/event/queue.c @@ -0,0 +1,208 @@ +// Queue for selective async event processing. Instances of this queue support a +// parent/child relationship with the following properties: +// +// - pushing a node to a child queue will push a corresponding link node to the +// parent queue +// - removing a link node from a parent queue will remove the next node +// in the linked child queue +// - removing a node from a child queue will remove the corresponding link node +// in the parent queue +// +// These properties allow neovim to organize and process events from different +// sources with a certain degree of control. Here's how the queue is used: +// +// +----------------+ +// | Main loop | +// +----------------+ +// ^ +// | +// +----------------+ +// +-------------->| Event loop |<------------+ +// | +--+-------------+ | +// | ^ ^ | +// | | | | +// +-----------+ +-----------+ +---------+ +---------+ +// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 | +// +-----------+ +-----------+ +---------+ +---------+ +// +// +// In the above diagram, the lower boxes represents event emitters, each with +// it's own private queue that have the event loop queue as the parent. +// +// When idle, the main loop spins the event loop which queues events from many +// sources(channels, jobs, user...). Each event emitter pushes events to its own +// private queue which is propagated to the event loop queue. When the main loop +// consumes an event, the corresponding event is removed from the emitter's +// queue. +// +// The main reason for this queue hierarchy is to allow focusing on a single +// event emitter while blocking the main loop. For example, if the `jobwait` +// vimscript function is called on job1, the main loop will temporarily stop +// polling the event loop queue and poll job1 queue instead. Same with channels, +// when calling `rpcrequest`, we want to temporarily stop processing events from +// other sources and focus on a specific channel. + +#include <assert.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stdint.h> + + +#include <uv.h> + +#include "nvim/event/queue.h" +#include "nvim/memory.h" +#include "nvim/os/time.h" + +typedef struct queue_item QueueItem; +struct queue_item { + union { + Queue *queue; + struct { + Event event; + QueueItem *parent; + } item; + } data; + bool link; // this is just a link to a node in a child queue + QUEUE node; +}; + +struct queue { + Queue *parent; + QUEUE headtail; + put_callback put_cb; + void *data; +}; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.c.generated.h" +#endif + +static Event NILEVENT = {.handler = NULL, .argv = {NULL}}; + +Queue *queue_new_parent(put_callback put_cb, void *data) +{ + return queue_new(NULL, put_cb, data); +} + +Queue *queue_new_child(Queue *parent) + FUNC_ATTR_NONNULL_ALL +{ + assert(!parent->parent); + return queue_new(parent, NULL, NULL); +} + +static Queue *queue_new(Queue *parent, put_callback put_cb, void *data) +{ + Queue *rv = xmalloc(sizeof(Queue)); + QUEUE_INIT(&rv->headtail); + rv->parent = parent; + rv->put_cb = put_cb; + rv->data = data; + return rv; +} + +void queue_free(Queue *queue) +{ + assert(queue); + if (queue->parent) { + while (!QUEUE_EMPTY(&queue->headtail)) { + QUEUE *q = QUEUE_HEAD(&queue->headtail); + QueueItem *item = queue_node_data(q); + assert(!item->link); + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + QUEUE_REMOVE(q); + xfree(item); + } + } + + xfree(queue); +} + +Event queue_get(Queue *queue) +{ + return queue_empty(queue) ? NILEVENT : queue_remove(queue); +} + +void queue_put_event(Queue *queue, Event event) +{ + assert(queue); + assert(queue->parent); // don't push directly to the parent queue + queue_push(queue, event); + if (queue->parent->put_cb) { + queue->parent->put_cb(queue->parent, queue->parent->data); + } +} + +void queue_process_events(Queue *queue) +{ + assert(queue); + while (!queue_empty(queue)) { + Event event = queue_get(queue); + if (event.handler) { + event.handler(event.argv); + } + } +} + +bool queue_empty(Queue *queue) +{ + assert(queue); + return QUEUE_EMPTY(&queue->headtail); +} + +void queue_replace_parent(Queue *queue, Queue *new_parent) +{ + assert(queue_empty(queue)); + queue->parent = new_parent; +} + +static Event queue_remove(Queue *queue) +{ + assert(!queue_empty(queue)); + QUEUE *h = QUEUE_HEAD(&queue->headtail); + QUEUE_REMOVE(h); + QueueItem *item = queue_node_data(h); + Event rv; + + if (item->link) { + assert(!queue->parent); + // remove the next node in the linked queue + Queue *linked = item->data.queue; + assert(!queue_empty(linked)); + QueueItem *child = + queue_node_data(QUEUE_HEAD(&linked->headtail)); + QUEUE_REMOVE(&child->node); + rv = child->data.item.event; + xfree(child); + } else { + assert(queue->parent); + assert(!queue_empty(queue->parent)); + // remove the corresponding link node in the parent queue + QUEUE_REMOVE(&item->data.item.parent->node); + xfree(item->data.item.parent); + rv = item->data.item.event; + } + + xfree(item); + return rv; +} + +static void queue_push(Queue *queue, Event event) +{ + QueueItem *item = xmalloc(sizeof(QueueItem)); + item->link = false; + item->data.item.event = event; + QUEUE_INSERT_TAIL(&queue->headtail, &item->node); + // push link node to the parent queue + item->data.item.parent = xmalloc(sizeof(QueueItem)); + item->data.item.parent->link = true; + item->data.item.parent->data.queue = queue; + QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node); +} + +static QueueItem *queue_node_data(QUEUE *q) +{ + return QUEUE_DATA(q, QueueItem, node); +} diff --git a/src/nvim/event/queue.h b/src/nvim/event/queue.h new file mode 100644 index 0000000000..85fc59f8b2 --- /dev/null +++ b/src/nvim/event/queue.h @@ -0,0 +1,19 @@ +#ifndef NVIM_EVENT_QUEUE_H +#define NVIM_EVENT_QUEUE_H + +#include <uv.h> + +#include "nvim/event/defs.h" +#include "nvim/lib/queue.h" + +typedef struct queue Queue; +typedef void (*put_callback)(Queue *queue, void *data); + +#define queue_put(q, h, ...) \ + queue_put_event(q, event_create(1, h, __VA_ARGS__)); + + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "event/queue.h.generated.h" +#endif +#endif // NVIM_EVENT_QUEUE_H diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index 9d2439ac2b..0a720bb852 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -49,6 +49,7 @@ void rstream_init(Stream *stream, size_t bufsize) /// /// @param stream The `Stream` instance void rstream_start(Stream *stream, stream_read_cb cb) + FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; if (stream->uvstream) { @@ -62,6 +63,7 @@ void rstream_start(Stream *stream, stream_read_cb cb) /// /// @param stream The `Stream` instance void rstream_stop(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_read_stop(stream->uvstream); @@ -112,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(uvstream); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); } return; } @@ -122,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(stream->buffer, nread); - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -156,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(&stream->uv.idle); - invoke_read_cb(stream, true); + invoke_read_cb(stream, 0, true); return; } @@ -164,12 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(stream->buffer, nread); stream->fpos += nread; - invoke_read_cb(stream, false); + invoke_read_cb(stream, nread, false); } -static void invoke_read_cb(Stream *stream, bool eof) +static void read_event(void **argv) { + Stream *stream = argv[0]; if (stream->read_cb) { - stream->read_cb(stream, stream->buffer, stream->data, eof); + size_t count = (uintptr_t)argv[1]; + bool eof = (uintptr_t)argv[2]; + stream->read_cb(stream, stream->buffer, count, stream->data, eof); } } + +static void invoke_read_cb(Stream *stream, size_t count, bool eof) +{ + CREATE_EVENT(stream->events, read_event, 3, stream, + (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); +} diff --git a/src/nvim/event/signal.c b/src/nvim/event/signal.c index 63133b4f57..11ce15a882 100644 --- a/src/nvim/event/signal.c +++ b/src/nvim/event/signal.c @@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data) watcher->uv.data = watcher; watcher->data = data; watcher->cb = NULL; + watcher->events = loop->fast_events; } void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum) @@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void signal_event(void **argv) +{ + SignalWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->uv.signum, watcher->data); +} + static void signal_watcher_cb(uv_signal_t *handle, int signum) { SignalWatcher *watcher = handle->data; - watcher->cb(watcher, signum, watcher->data); + CREATE_EVENT(watcher->events, signal_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/signal.h b/src/nvim/event/signal.h index c269fa9d95..e32608acc0 100644 --- a/src/nvim/event/signal.h +++ b/src/nvim/event/signal.h @@ -14,6 +14,7 @@ struct signal_watcher { void *data; signal_cb cb; signal_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index bdc632abf0..347e464d25 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher, watcher->stream->data = watcher; watcher->cb = NULL; watcher->close_cb = NULL; + watcher->events = NULL; } int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) @@ -113,6 +114,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) } int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) + FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; @@ -142,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb) uv_close((uv_handle_t *)watcher->stream, close_cb); } +static void connection_event(void **argv) +{ + SocketWatcher *watcher = argv[0]; + int status = (int)(uintptr_t)(argv[1]); + watcher->cb(watcher, status, watcher->data); +} + static void connection_cb(uv_stream_t *handle, int status) { SocketWatcher *watcher = handle->data; - watcher->cb(watcher, status, watcher->data); + CREATE_EVENT(watcher->events, connection_event, 2, watcher, + (void *)(uintptr_t)status); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/socket.h b/src/nvim/event/socket.h index 17fd39f33b..ad59fdbe3a 100644 --- a/src/nvim/event/socket.h +++ b/src/nvim/event/socket.h @@ -30,6 +30,7 @@ struct socket_watcher { void *data; socket_cb cb; socket_close_cb close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 959b532146..6caad6fdcc 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -32,6 +32,7 @@ int stream_set_blocking(int fd, bool blocking) void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, void *data) + FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -55,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, if (stream->uvstream) { stream->uvstream->data = stream; + loop = stream->uvstream->loop->data; } stream->data = data; @@ -69,16 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->internal_close_cb = NULL; stream->closed = false; stream->buffer = NULL; + stream->events = NULL; } void stream_close(Stream *stream, stream_close_cb on_stream_close) + FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); - - if (stream->buffer) { - rbuffer_free(stream->buffer); - } - stream->closed = true; stream->close_cb = on_stream_close; @@ -88,6 +87,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close) } void stream_close_handle(Stream *stream) + FUNC_ATTR_NONNULL_ALL { if (stream->uvstream) { uv_close((uv_handle_t *)stream->uvstream, close_cb); @@ -99,6 +99,9 @@ void stream_close_handle(Stream *stream) static void close_cb(uv_handle_t *handle) { Stream *stream = handle->data; + if (stream->buffer) { + rbuffer_free(stream->buffer); + } if (stream->close_cb) { stream->close_cb(stream, stream->data); } diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index 37410b2036..c6baac0db7 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -14,10 +14,14 @@ typedef struct stream Stream; /// /// @param stream The Stream instance /// @param rbuffer The associated RBuffer instance +/// @param count Number of bytes to read. This must be respected if keeping +/// the order of events is a requirement. This is because events +/// may be queued and only processed later when more data is copied +/// into to the buffer, so one read may starve another. /// @param data User-defined data /// @param eof If the stream reached EOF. -typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data, - bool eof); +typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof); /// Type of function called when the Stream has information about a write /// request. @@ -47,6 +51,7 @@ struct stream { size_t pending_reqs; void *data, *internal_data; bool closed; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/time.c b/src/nvim/event/time.c index ce33cdfc10..7bf333bcea 100644 --- a/src/nvim/event/time.c +++ b/src/nvim/event/time.c @@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data) uv_timer_init(&loop->uv, &watcher->uv); watcher->uv.data = watcher; watcher->data = data; + watcher->events = loop->fast_events; } void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout, @@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb) uv_close((uv_handle_t *)&watcher->uv, close_cb); } +static void time_event(void **argv) +{ + TimeWatcher *watcher = argv[0]; + watcher->cb(watcher, watcher->data); +} + static void time_watcher_cb(uv_timer_t *handle) FUNC_ATTR_NONNULL_ALL { TimeWatcher *watcher = handle->data; - watcher->cb(watcher, watcher->data); + CREATE_EVENT(watcher->events, time_event, 1, watcher); } static void close_cb(uv_handle_t *handle) diff --git a/src/nvim/event/time.h b/src/nvim/event/time.h index ee50e53d11..7882b2b627 100644 --- a/src/nvim/event/time.h +++ b/src/nvim/event/time.h @@ -12,6 +12,7 @@ struct time_watcher { uv_timer_t uv; void *data; time_cb cb, close_cb; + Queue *events; }; #ifdef INCLUDE_GENERATED_DECLARATIONS diff --git a/src/nvim/event/uv_process.h b/src/nvim/event/uv_process.h index a17f1446b3..5ee73044b5 100644 --- a/src/nvim/event/uv_process.h +++ b/src/nvim/event/uv_process.h @@ -12,10 +12,10 @@ typedef struct uv_process { uv_stdio_container_t uvstdio[3]; } UvProcess; -static inline UvProcess uv_process_init(void *data) +static inline UvProcess uv_process_init(Loop *loop, void *data) { UvProcess rv; - rv.process = process_init(kProcessTypeUv, data); + rv.process = process_init(loop, kProcessTypeUv, data); return rv; } diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 5fcb724fe3..8028e35e6b 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -118,6 +118,7 @@ WBuffer *wstream_new_buffer(char *data, size_t size, size_t refcount, wbuffer_data_finalizer cb) + FUNC_ATTR_NONNULL_ARG(1) { WBuffer *rv = xmalloc(sizeof(WBuffer)); rv->size = size; @@ -151,6 +152,7 @@ static void write_cb(uv_write_t *req, int status) } void wstream_release_wbuffer(WBuffer *buffer) + FUNC_ATTR_NONNULL_ALL { if (!--buffer->refcount) { if (buffer->cb) { diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c index 785db1dbd1..03116d454f 100644 --- a/src/nvim/ex_getln.c +++ b/src/nvim/ex_getln.c @@ -62,6 +62,7 @@ #include "nvim/tag.h" #include "nvim/window.h" #include "nvim/ui.h" +#include "nvim/os/input.h" #include "nvim/os/os.h" #include "nvim/event/loop.h" @@ -298,14 +299,14 @@ getcmdline ( /* Get a character. Ignore K_IGNORE, it should not do anything, such * as stop completion. */ - loop_enable_deferred_events(&loop); + input_enable_events(); do { c = safe_vgetc(); } while (c == K_IGNORE); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); continue; } diff --git a/src/nvim/func_attr.h b/src/nvim/func_attr.h index 519f61c763..c31d21ec6d 100644 --- a/src/nvim/func_attr.h +++ b/src/nvim/func_attr.h @@ -179,7 +179,7 @@ #endif #ifdef DEFINE_FUNC_ATTRIBUTES - #define FUNC_ATTR_DEFERRED + #define FUNC_ATTR_ASYNC #define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC #define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x) #define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y) diff --git a/src/nvim/globals.h b/src/nvim/globals.h index e4dcad9afb..68cb923e42 100644 --- a/src/nvim/globals.h +++ b/src/nvim/globals.h @@ -898,14 +898,6 @@ EXTERN FILE *scriptout INIT(= NULL); /* stream to write script to */ /* volatile because it is used in signal handler catch_sigint(). */ EXTERN volatile int got_int INIT(= FALSE); /* set to TRUE when interrupt signal occurred */ -EXTERN int disable_breakcheck INIT(= 0); // > 0 if breakchecks should be - // ignored. FIXME(tarruda): Hacky - // way to run functions that would - // result in *_breakcheck calls - // while events that would normally - // be deferred are being processed - // immediately. Ref: - // neovim/neovim#2371 EXTERN int bangredo INIT(= FALSE); /* set to TRUE with ! command */ EXTERN int searchcmdlen; /* length of previous search cmd */ EXTERN int reg_do_extmatch INIT(= 0); /* Used when compiling regexp: diff --git a/src/nvim/lib/queue.h b/src/nvim/lib/queue.h new file mode 100644 index 0000000000..fe02b454ea --- /dev/null +++ b/src/nvim/lib/queue.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef QUEUE_H_ +#define QUEUE_H_ + +typedef void *QUEUE[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) +#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Public macros. */ +#define QUEUE_DATA(ptr, type, field) \ + ((type *) ((char *) (ptr) - ((char *) &((type *) 0)->field))) + +#define QUEUE_FOREACH(q, h) \ + for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) + +#define QUEUE_EMPTY(q) \ + ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) + +#define QUEUE_HEAD(q) \ + (QUEUE_NEXT(q)) + +#define QUEUE_INIT(q) \ + do { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } \ + while (0) + +#define QUEUE_ADD(h, n) \ + do { \ + QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ + QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV(h) = QUEUE_PREV(n); \ + QUEUE_PREV_NEXT(h) = (h); \ + } \ + while (0) + +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } \ + while (0) + +#define QUEUE_INSERT_HEAD(h, q) \ + do { \ + QUEUE_NEXT(q) = QUEUE_NEXT(h); \ + QUEUE_PREV(q) = (h); \ + QUEUE_NEXT_PREV(q) = (q); \ + QUEUE_NEXT(h) = (q); \ + } \ + while (0) + +#define QUEUE_INSERT_TAIL(h, q) \ + do { \ + QUEUE_NEXT(q) = (h); \ + QUEUE_PREV(q) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(q) = (q); \ + QUEUE_PREV(h) = (q); \ + } \ + while (0) + +#define QUEUE_REMOVE(q) \ + do { \ + QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ + QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ + } \ + while (0) + +#endif /* QUEUE_H_ */ diff --git a/src/nvim/main.c b/src/nvim/main.c index bfaeada6de..e11db16c61 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -157,11 +157,11 @@ void event_init(void) void event_teardown(void) { - if (!loop.deferred_events) { + if (!loop.events) { return; } - loop_process_all_events(&loop); + queue_process_events(loop.events); input_stop(); channel_teardown(); process_teardown(&loop); diff --git a/src/nvim/map.c b/src/nvim/map.c index 5d83020619..ed7bda4cce 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -116,5 +116,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER) MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER) -#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false} +#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .async = false} MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER) diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index eee662dd9c..0e3b8200c9 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -68,6 +68,7 @@ typedef struct { uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; kvec_t(WBuffer *) delayed_notifications; + Queue *events; } Channel; typedef struct { @@ -123,14 +124,14 @@ void channel_teardown(void) uint64_t channel_from_process(char **argv) { Channel *channel = register_channel(kChannelTypeProc); - channel->data.process.uvproc = uv_process_init(channel); + channel->data.process.uvproc = uv_process_init(&loop, channel); Process *proc = &channel->data.process.uvproc.process; proc->argv = argv; proc->in = &channel->data.process.in; proc->out = &channel->data.process.out; proc->err = &channel->data.process.err; proc->cb = process_exit; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); decref(channel); return 0; @@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id, ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); channel->pending_requests++; - LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned); + LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned); (void)kv_pop(channel->call_stack); channel->pending_requests--; @@ -327,7 +328,8 @@ static void channel_from_stdio(void) wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL); } -static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, + void *data, bool eof) { while (rbuffer_size(rbuf)) { char buf[256]; @@ -342,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data) decref(data); } -static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof) +static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, + bool eof) { Channel *channel = data; incref(channel); @@ -450,31 +453,31 @@ static void handle_request(Channel *channel, msgpack_object *request) method->via.bin.size); } else { handler.fn = msgpack_rpc_handle_missing_method; - handler.defer = false; + handler.async = true; } Array args = ARRAY_DICT_INIT; if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { handler.fn = msgpack_rpc_handle_invalid_arguments; - handler.defer = false; + handler.async = true; } - bool defer = (!kv_size(channel->call_stack) && handler.defer); RequestEvent *event_data = xmalloc(sizeof(RequestEvent)); event_data->channel = channel; event_data->handler = handler; event_data->args = args; event_data->request_id = request_id; incref(channel); - loop_push_event(&loop, (Event) { - .handler = on_request_event, - .data = event_data - }, defer); + if (handler.async) { + on_request_event((void **)&event_data); + } else { + queue_put(channel->events, on_request_event, 1, event_data); + } } -static void on_request_event(Event event) +static void on_request_event(void **argv) { - RequestEvent *e = event.data; + RequestEvent *e = argv[0]; Channel *channel = e->channel; MsgpackRpcRequestHandler handler = e->handler; Array args = e->args; @@ -649,9 +652,8 @@ static void close_channel(Channel *channel) case kChannelTypeStdio: stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.out, NULL); - loop_push_event(&loop, - (Event) { .handler = on_stdio_close, .data = channel }, false); - break; + queue_put(loop.fast_events, exit_event, 1, channel); + return; default: abort(); } @@ -659,9 +661,9 @@ static void close_channel(Channel *channel) decref(channel); } -static void on_stdio_close(Event e) +static void exit_event(void **argv) { - decref(e.data); + decref(argv[0]); if (!exiting) { mch_exit(0); @@ -683,6 +685,7 @@ static void free_channel(Channel *channel) pmap_free(cstr_t)(channel->subscribed_events); kv_destroy(channel->call_stack); kv_destroy(channel->delayed_notifications); + queue_free(channel->events); xfree(channel); } @@ -694,6 +697,7 @@ static void close_cb(Stream *stream, void *data) static Channel *register_channel(ChannelType type) { Channel *rv = xmalloc(sizeof(Channel)); + rv->events = queue_new_child(loop.events); rv->type = type; rv->refcount = 1; rv->closed = false; diff --git a/src/nvim/msgpack_rpc/defs.h b/src/nvim/msgpack_rpc/defs.h index 0492a65290..d97cf28ca1 100644 --- a/src/nvim/msgpack_rpc/defs.h +++ b/src/nvim/msgpack_rpc/defs.h @@ -11,9 +11,8 @@ typedef struct { uint64_t request_id, Array args, Error *error); - bool defer; // Should the call be deferred to the main loop? This should - // be true if the function mutates editor data structures such - // as buffers, windows, tabs, or if it executes vimscript code. + bool async; // function is always safe to run immediately instead of being + // put in a request queue for handling when nvim waits for input. } MsgpackRpcRequestHandler; /// Initializes the msgpack-rpc method table diff --git a/src/nvim/msgpack_rpc/remote_ui.c b/src/nvim/msgpack_rpc/remote_ui.c index e582bf9550..3334b0e6af 100644 --- a/src/nvim/msgpack_rpc/remote_ui.c +++ b/src/nvim/msgpack_rpc/remote_ui.c @@ -28,7 +28,7 @@ void remote_ui_init(void) connected_uis = pmap_new(uint64_t)(); // Add handler for "attach_ui" String method = cstr_as_string("ui_attach"); - MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .defer = false}; + MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .async = true}; msgpack_rpc_add_method_handler(method, handler); method = cstr_as_string("ui_detach"); handler.fn = remote_ui_detach; diff --git a/src/nvim/normal.c b/src/nvim/normal.c index 95e1c3d113..5b35af9209 100644 --- a/src/nvim/normal.c +++ b/src/nvim/normal.c @@ -63,6 +63,7 @@ #include "nvim/window.h" #include "nvim/event/loop.h" #include "nvim/os/time.h" +#include "nvim/os/input.h" /* * The Visual area is remembered for reselection. @@ -487,12 +488,12 @@ normal_cmd ( /* * Get the command character from the user. */ - loop_enable_deferred_events(&loop); + input_enable_events(); c = safe_vgetc(); - loop_disable_deferred_events(&loop); + input_disable_events(); if (c == K_EVENT) { - loop_process_event(&loop); + queue_process_events(loop.events); return; } diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index b0e0f57e60..09f162f79d 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -33,6 +33,7 @@ static Stream read_stream = {.closed = true}; static RBuffer *input_buffer = NULL; static bool input_eof = false; static int global_fd = 0; +static int events_enabled = 0; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/input.c.generated.h" @@ -110,8 +111,8 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } - // If there are deferred events, return the keys directly - if (loop_has_deferred_events(&loop)) { + // If there are events, return the keys directly + if (pending_events()) { return push_event_key(buf, maxlen); } @@ -131,11 +132,21 @@ bool os_char_avail(void) // Check for CTRL-C typed by reading all available characters. void os_breakcheck(void) { - if (!disable_breakcheck && !got_int) { + if (!got_int) { loop_poll_events(&loop, 0); } } +void input_enable_events(void) +{ + events_enabled++; +} + +void input_disable_events(void) +{ + events_enabled--; +} + /// Test whether a file descriptor refers to a terminal. /// /// @param fd File descriptor. @@ -281,7 +292,7 @@ static bool input_poll(int ms) prof_inchar_enter(); } - LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, ms, input_ready() || input_eof); if (do_profiling == PROF_YES && ms) { prof_inchar_exit(); @@ -305,7 +316,8 @@ static InbufPollResult inbuf_poll(int ms) return input_eof ? kInputEof : kInputNone; } -static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof) +static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, + bool at_eof) { if (at_eof) { input_eof = true; @@ -358,7 +370,7 @@ static bool input_ready(void) { return typebuf_was_filled || // API call filled typeahead rbuffer_size(input_buffer) || // Input buffer filled - loop_has_deferred_events(&loop); // Events must be processed + pending_events(); // Events must be processed } // Exit because of an input read error. @@ -369,3 +381,8 @@ static void read_error_exit(void) STRCPY(IObuff, _("Vim: Error reading input, exiting...\n")); preserve_exit(); } + +static bool pending_events(void) +{ + return events_enabled && !queue_empty(loop.events); +} diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index b9c5db4261..2d97c4bf4f 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -205,13 +205,15 @@ static int do_os_system(char **argv, xstrlcpy(prog, argv[0], MAXPATHL); Stream in, out, err; - UvProcess uvproc = uv_process_init(&buf); + UvProcess uvproc = uv_process_init(&loop, &buf); Process *proc = &uvproc.process; + Queue *events = queue_new_child(loop.events); + proc->events = events; proc->argv = argv; proc->in = input != NULL ? &in : NULL; proc->out = &out; proc->err = &err; - if (!process_spawn(&loop, proc)) { + if (!process_spawn(proc)) { loop_poll_events(&loop, 0); // Failed, probably due to `sh` not being executable if (!silent) { @@ -219,14 +221,22 @@ static int do_os_system(char **argv, msg_outtrans((char_u *)prog); msg_putchar('\n'); } + queue_free(events); return -1; } + // We want to deal with stream events as fast a possible while queueing + // process events, so reset everything to NULL. It prevents closing the + // streams while there's still data in the OS buffer(due to the process + // exiting before all data is read). if (input != NULL) { + proc->in->events = NULL; wstream_init(proc->in, 0); } + proc->out->events = NULL; rstream_init(proc->out, 0); rstream_start(proc->out, data_cb); + proc->err->events = NULL; rstream_init(proc->err, 0); rstream_start(proc->err, data_cb); @@ -247,7 +257,7 @@ static int do_os_system(char **argv, // the UI ui_busy_start(); ui_flush(); - int status = process_wait(proc, -1); + int status = process_wait(proc, -1, NULL); ui_busy_stop(); // prepare the out parameters if requested @@ -267,6 +277,9 @@ static int do_os_system(char **argv, } } + assert(queue_empty(events)); + queue_free(events); + return status; } @@ -285,7 +298,8 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired) buf->data = xrealloc(buf->data, buf->cap); } -static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void system_data_cb(Stream *stream, RBuffer *buf, size_t count, + void *data, bool eof) { DynamicBuffer *dbuf = data; @@ -295,7 +309,8 @@ static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) dbuf->len += nread; } -static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, + bool eof) { size_t cnt; char *ptr = rbuffer_read_ptr(buf, &cnt); diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 6de3435c4c..7158721433 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -115,16 +115,6 @@ static void deadly_signal(int signum) static void on_signal(SignalWatcher *handle, int signum, void *data) { assert(signum >= 0); - loop_push_event(&loop, (Event) { - .handler = on_signal_event, - .data = (void *)(uintptr_t)signum - }, false); -} - -static void on_signal_event(Event event) -{ - int signum = (int)(uintptr_t)event.data; - switch (signum) { #ifdef SIGPWR case SIGPWR: @@ -148,4 +138,3 @@ static void on_signal_event(Event event) break; } } - diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index 6b5d4359db..ee17938afc 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput) if (milliseconds > INT_MAX) { milliseconds = INT_MAX; } - LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int); + LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, (int)milliseconds, got_int); } else { os_microdelay(milliseconds * 1000); } diff --git a/src/nvim/terminal.c b/src/nvim/terminal.c index 47fef692db..b9bc4c6d78 100644 --- a/src/nvim/terminal.c +++ b/src/nvim/terminal.c @@ -69,6 +69,7 @@ #include "nvim/fileio.h" #include "nvim/event/loop.h" #include "nvim/event/time.h" +#include "nvim/os/input.h" #include "nvim/api/private/helpers.h" #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -324,7 +325,7 @@ void terminal_resize(Terminal *term, uint16_t width, uint16_t height) invalidate_terminal(term, -1, -1); } -void terminal_enter(bool process_deferred) +void terminal_enter(void) { Terminal *term = curbuf->terminal; assert(term && "should only be called when curbuf has a terminal"); @@ -353,15 +354,9 @@ void terminal_enter(bool process_deferred) bool got_bs = false; // True if the last input was <C-\> while (term->buf == curbuf) { - if (process_deferred) { - loop_enable_deferred_events(&loop); - } - + input_enable_events(); c = safe_vgetc(); - - if (process_deferred) { - loop_disable_deferred_events(&loop); - } + input_disable_events(); switch (c) { case K_LEFTMOUSE: @@ -381,7 +376,7 @@ void terminal_enter(bool process_deferred) break; case K_EVENT: - loop_process_event(&loop); + queue_process_events(loop.events); break; case Ctrl_N: @@ -427,7 +422,13 @@ void terminal_destroy(Terminal *term) term->buf->terminal = NULL; } term->buf = NULL; - pmap_del(ptr_t)(invalidated_terminals, term); + if (pmap_has(ptr_t)(invalidated_terminals, term)) { + // flush any pending changes to the buffer + block_autocmds(); + refresh_terminal(term); + unblock_autocmds(); + pmap_del(ptr_t)(invalidated_terminals, term); + } for (size_t i = 0 ; i < term->sb_current; i++) { xfree(term->sb_buffer[i]); } @@ -883,48 +884,47 @@ static void invalidate_terminal(Terminal *term, int start_row, int end_row) } } +static void refresh_terminal(Terminal *term) +{ + // TODO(SplinterOfChaos): Find the condition that makes term->buf invalid. + bool valid = true; + if (!term->buf || !(valid = buf_valid(term->buf))) { + // destroyed by `close_buffer`. Dont do anything else + if (!valid) { + term->buf = NULL; + } + return; + } + bool pending_resize = term->pending_resize; + WITH_BUFFER(term->buf, { + refresh_size(term); + refresh_scrollback(term); + refresh_screen(term); + redraw_buf_later(term->buf, NOT_VALID); + }); + adjust_topline(term, pending_resize); +} // libuv timer callback. This will enqueue on_refresh to be processed as an // event. static void refresh_timer_cb(TimeWatcher *watcher, void *data) { - loop_push_event(&loop, (Event) {.handler = on_refresh}, false); - refresh_pending = false; -} - -// Refresh all invalidated terminals -static void on_refresh(Event event) -{ if (exiting) { // bad things can happen if we redraw when exiting, and there's no need to // update the buffer. - return; + goto end; } Terminal *term; void *stub; (void)(stub); // don't process autocommands while updating terminal buffers block_autocmds(); map_foreach(invalidated_terminals, term, stub, { - // TODO(SplinterOfChaos): Find the condition that makes term->buf invalid. - bool valid = true; - if (!term->buf || !(valid = buf_valid(term->buf))) { - // destroyed by `close_buffer`. Dont do anything else - if (!valid) { - term->buf = NULL; - } - continue; - } - bool pending_resize = term->pending_resize; - WITH_BUFFER(term->buf, { - refresh_size(term); - refresh_scrollback(term); - refresh_screen(term); - redraw_buf_later(term->buf, NOT_VALID); - }); - adjust_topline(term, pending_resize); + refresh_terminal(term); }); pmap_clear(ptr_t)(invalidated_terminals); unblock_autocmds(); redraw(true); +end: + refresh_pending = false; } static void refresh_size(Terminal *term) diff --git a/src/nvim/tui/term_input.inl b/src/nvim/tui/term_input.inl index 0a84a3688b..c396557160 100644 --- a/src/nvim/tui/term_input.inl +++ b/src/nvim/tui/term_input.inl @@ -206,9 +206,10 @@ static bool handle_forced_escape(TermInput *input) return false; } -static void restart_reading(Event event); +static void restart_reading(void **argv); -static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) +static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, + bool eof) { TermInput *input = data; @@ -226,8 +227,7 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) // ls *.md | xargs nvim input->in_fd = 2; stream_close(&input->read_stream, NULL); - loop_push_event(&loop, - (Event) { .data = input, .handler = restart_reading }, false); + queue_put(loop.fast_events, restart_reading, 1, input); } else { input_done(); } @@ -272,9 +272,9 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof) rbuffer_reset(input->read_stream.buffer); } -static void restart_reading(Event event) +static void restart_reading(void **argv) { - TermInput *input = event.data; + TermInput *input = argv[0]; rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input); rstream_start(&input->read_stream, read_cb); } diff --git a/src/nvim/tui/tui.c b/src/nvim/tui/tui.c index 28870b5abb..460fdb81a8 100644 --- a/src/nvim/tui/tui.c +++ b/src/nvim/tui/tui.c @@ -205,24 +205,13 @@ static void tui_stop(UI *ui) xfree(ui); } -static void try_resize(Event ev) +static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) { - UI *ui = ev.data; + UI *ui = data; update_size(ui); ui_refresh(); } -static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data) -{ - got_winch = true; - // Queue the event because resizing can result in recursive event_poll calls - // FIXME(blueyed): TUI does not resize properly when not deferred. Why? #2322 - loop_push_event(&loop, (Event) { - .data = data, - .handler = try_resize - }, true); -} - static bool attrs_differ(HlAttrs a1, HlAttrs a2) { return a1.foreground != a2.foreground || a1.background != a2.background diff --git a/src/nvim/ui.c b/src/nvim/ui.c index 7e155f9b4f..ad875367c9 100644 --- a/src/nvim/ui.c +++ b/src/nvim/ui.c @@ -214,9 +214,9 @@ void ui_detach(UI *ui) shift_index++; } - ui_count--; - // schedule a refresh - loop_push_event(&loop, (Event) { .handler = refresh }, false); + if (--ui_count) { + ui_refresh(); + } } void ui_clear(void) @@ -485,11 +485,3 @@ static void ui_mode_change(void) UI_CALL(mode_change, mode); conceal_check_cursur_line(); } - -static void refresh(Event event) -{ - if (ui_count) { - ui_refresh(); - } -} - diff --git a/test/functional/clipboard/clipboard_provider_spec.lua b/test/functional/clipboard/clipboard_provider_spec.lua index 30903e8b57..0550d22fa6 100644 --- a/test/functional/clipboard/clipboard_provider_spec.lua +++ b/test/functional/clipboard/clipboard_provider_spec.lua @@ -369,6 +369,7 @@ describe('clipboard usage', function() [2] = {foreground = Screen.colors.Blue}, [3] = {bold = true, foreground = Screen.colors.SeaGreen}}, {{bold = true, foreground = Screen.colors.Blue}}) + feed('<cr>') -- clear out of Press ENTER screen end) it('can paste "* to the commandline', function() diff --git a/test/unit/fixtures/queue.c b/test/unit/fixtures/queue.c new file mode 100644 index 0000000000..bbb6274b21 --- /dev/null +++ b/test/unit/fixtures/queue.c @@ -0,0 +1,16 @@ +#include <string.h> +#include <stdlib.h> +#include "nvim/event/queue.h" +#include "queue.h" + + +void ut_queue_put(Queue *queue, const char *str) +{ + queue_put(queue, NULL, 1, str); +} + +const char *ut_queue_get(Queue *queue) +{ + Event event = queue_get(queue); + return event.argv[0]; +} diff --git a/test/unit/fixtures/queue.h b/test/unit/fixtures/queue.h new file mode 100644 index 0000000000..ae949c9f29 --- /dev/null +++ b/test/unit/fixtures/queue.h @@ -0,0 +1,4 @@ +#include "nvim/event/queue.h" + +void ut_queue_put(Queue *queue, const char *str); +const char *ut_queue_get(Queue *queue); diff --git a/test/unit/queue_spec.lua b/test/unit/queue_spec.lua new file mode 100644 index 0000000000..9326c1cad6 --- /dev/null +++ b/test/unit/queue_spec.lua @@ -0,0 +1,123 @@ +local helpers = require("test.unit.helpers") + +local ffi = helpers.ffi +local eq = helpers.eq + +local queue = helpers.cimport("./test/unit/fixtures/queue.h") + +describe('queue', function() + local parent, child1, child2, child3 + + local function put(q, str) + queue.ut_queue_put(q, str) + end + + local function get(q) + return ffi.string(queue.ut_queue_get(q)) + end + + local function free(q) + queue.queue_free(q) + end + + before_each(function() + parent = queue.queue_new_parent(ffi.NULL, ffi.NULL) + child1 = queue.queue_new_child(parent) + child2 = queue.queue_new_child(parent) + child3 = queue.queue_new_child(parent) + put(child1, 'c1i1') + put(child1, 'c1i2') + put(child2, 'c2i1') + put(child1, 'c1i3') + put(child2, 'c2i2') + put(child2, 'c2i3') + put(child2, 'c2i4') + put(child3, 'c3i1') + put(child3, 'c3i2') + end) + + it('removing from parent removes from child', function() + eq('c1i1', get(parent)) + eq('c1i2', get(parent)) + eq('c2i1', get(parent)) + eq('c1i3', get(parent)) + eq('c2i2', get(parent)) + eq('c2i3', get(parent)) + eq('c2i4', get(parent)) + end) + + it('removing from child removes from parent', function() + eq('c2i1', get(child2)) + eq('c2i2', get(child2)) + eq('c1i1', get(child1)) + eq('c1i2', get(parent)) + eq('c1i3', get(parent)) + eq('c2i3', get(parent)) + eq('c2i4', get(parent)) + end) + + it('removing from child at the beginning of parent', function() + eq('c1i1', get(child1)) + eq('c1i2', get(child1)) + eq('c2i1', get(parent)) + end) + + it('removing from parent after get from parent and put to child', function() + eq('c1i1', get(parent)) + eq('c1i2', get(parent)) + eq('c2i1', get(parent)) + eq('c1i3', get(parent)) + eq('c2i2', get(parent)) + eq('c2i3', get(parent)) + eq('c2i4', get(parent)) + eq('c3i1', get(parent)) + put(child1, 'c1i11') + put(child1, 'c1i22') + eq('c3i2', get(parent)) + eq('c1i11', get(parent)) + eq('c1i22', get(parent)) + end) + + it('removing from parent after get and put to child', function() + eq('c1i1', get(child1)) + eq('c1i2', get(child1)) + eq('c2i1', get(child2)) + eq('c1i3', get(child1)) + eq('c2i2', get(child2)) + eq('c2i3', get(child2)) + eq('c2i4', get(child2)) + eq('c3i1', get(child3)) + eq('c3i2', get(parent)) + put(child1, 'c1i11') + put(child2, 'c2i11') + put(child1, 'c1i12') + eq('c2i11', get(child2)) + eq('c1i11', get(parent)) + eq('c1i12', get(parent)) + end) + + it('put after removing from child at the end of parent', function() + eq('c3i1', get(child3)) + eq('c3i2', get(child3)) + put(child1, 'c1i11') + put(child2, 'c2i11') + eq('c1i1', get(parent)) + eq('c1i2', get(parent)) + eq('c2i1', get(parent)) + eq('c1i3', get(parent)) + eq('c2i2', get(parent)) + eq('c2i3', get(parent)) + eq('c2i4', get(parent)) + eq('c1i11', get(parent)) + eq('c2i11', get(parent)) + end) + + it('removes from parent queue when child is freed', function() + free(child2) + eq('c1i1', get(parent)) + eq('c1i2', get(parent)) + eq('c1i3', get(parent)) + eq('c3i1', get(child3)) + eq('c3i2', get(child3)) + end) +end) |