diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-10-22 07:30:32 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-10-22 07:30:32 -0300 |
commit | 5cd9b6474287650790e97187058c81b176fbc7c9 (patch) | |
tree | 57a419ce9104adf84bd1152a01f9c1f75f4bef85 /src | |
parent | 6e268cd0d40a3652a68b486bdbb421d39295ab48 (diff) | |
parent | f7fab4af863839a064a941a555b237b6eb789870 (diff) | |
download | rneovim-5cd9b6474287650790e97187058c81b176fbc7c9.tar.gz rneovim-5cd9b6474287650790e97187058c81b176fbc7c9.tar.bz2 rneovim-5cd9b6474287650790e97187058c81b176fbc7c9.zip |
Merge PR #1316 'Refactor event deferral'
Diffstat (limited to 'src')
33 files changed, 617 insertions, 754 deletions
diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt index 208df31596..a77e5e27a0 100644 --- a/src/nvim/CMakeLists.txt +++ b/src/nvim/CMakeLists.txt @@ -3,7 +3,7 @@ include(CheckLibraryExists) set(GENERATED_DIR ${PROJECT_BINARY_DIR}/src/nvim/auto) set(DISPATCH_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/msgpack-gen.lua) file(GLOB API_HEADERS api/*.h) -set(MSGPACK_RPC_HEADER ${PROJECT_SOURCE_DIR}/src/nvim/os/msgpack_rpc.h) +file(GLOB MSGPACK_RPC_HEADERS msgpack_rpc/*.h) set(MSGPACK_DISPATCH ${GENERATED_DIR}/msgpack_dispatch.c) set(HEADER_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/gendeclarations.lua) set(GENERATED_INCLUDES_DIR ${PROJECT_BINARY_DIR}/include) @@ -19,12 +19,14 @@ file(MAKE_DIRECTORY ${GENERATED_DIR}) file(MAKE_DIRECTORY ${GENERATED_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api) file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private) +file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc) -file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c) +file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c) file(GLOB_RECURSE NEOVIM_HEADERS *.h) foreach(sfile ${NEOVIM_SOURCES}) @@ -36,8 +38,7 @@ endforeach() list(REMOVE_ITEM NEOVIM_SOURCES ${to_remove}) -set(CONV_SRCS - api.c +set(CONV_SOURCES arabic.c cursor.c garray.c @@ -46,31 +47,24 @@ set(CONV_SRCS map.c memory.c misc2.c - map.c profile.c - os/env.c - os/event.c - os/job.c - os/mem.c - os/rstream.c - os/signal.c - os/users.c - os/provider.c - os/uv_helpers.c - os/wstream.c - os/msgpack_rpc.c tempfile.c - api/buffer.c - api/private/helpers.c - api/private/handle.c - api/tabpage.c - api/window.c - api/vim.h - api/vim.c ) +foreach(sfile ${CONV_SOURCES}) + if(NOT EXISTS "${PROJECT_SOURCE_DIR}/src/nvim/${sfile}") + message(FATAL_ERROR "${sfile} doesn't exist(it was added to CONV_SOURCES)") + endif() +endforeach() + +file(GLOB_RECURSE EXTRA_CONV_SOURCES os/*.c api/*.c msgpack_rpc/*.c) +foreach(sfile ${EXTRA_CONV_SOURCES}) + file(RELATIVE_PATH f "${PROJECT_SOURCE_DIR}/src/nvim" "${sfile}") + list(APPEND CONV_SOURCES ${f}) +endforeach() + set_source_files_properties( - ${CONV_SRCS} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion") + ${CONV_SOURCES} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion") if(CMAKE_C_COMPILER_ID MATCHES "Clang") if(DEFINED ENV{SANITIZE}) @@ -126,7 +120,7 @@ add_custom_command(OUTPUT ${MSGPACK_DISPATCH} COMMAND ${LUA_PRG} ${DISPATCH_GENERATOR} ${API_HEADERS} ${MSGPACK_DISPATCH} DEPENDS ${API_HEADERS} - ${MSGPACK_RPC_HEADER} + ${MSGPACK_RPC_HEADERS} ${DISPATCH_GENERATOR} ) diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c index 4ff5845bd4..982003a31a 100644 --- a/src/nvim/api/buffer.c +++ b/src/nvim/api/buffer.c @@ -69,6 +69,7 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err) /// @param line The new line. /// @param[out] err Details of an error that may have occurred void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) + FUNC_ATTR_DEFERRED { Object l = STRING_OBJ(line); Array array = {.items = &l, .size = 1}; @@ -81,6 +82,7 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err) /// @param index The line index /// @param[out] err Details of an error that may have occurred void buffer_del_line(Buffer buffer, Integer index, Error *err) + FUNC_ATTR_DEFERRED { Array array = ARRAY_DICT_INIT; buffer_set_line_slice(buffer, index, index, true, true, array, err); @@ -163,6 +165,7 @@ void buffer_set_line_slice(Buffer buffer, Boolean include_end, ArrayOf(String) replacement, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -314,6 +317,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object buffer_set_var(Buffer buffer, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -349,6 +353,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void buffer_set_option(Buffer buffer, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -399,6 +404,7 @@ String buffer_get_name(Buffer buffer, Error *err) /// @param name The buffer name /// @param[out] err Details of an error that may have occurred void buffer_set_name(Buffer buffer, String name, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -444,6 +450,7 @@ void buffer_insert(Buffer buffer, Integer lnum, ArrayOf(String) lines, Error *err) + FUNC_ATTR_DEFERRED { buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err); } diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c index 3e5d00671a..cb06825731 100644 --- a/src/nvim/api/tabpage.c +++ b/src/nvim/api/tabpage.c @@ -62,6 +62,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The tab page handle Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { tabpage_T *tab = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c index 5e0f3e0c32..c90e7039ce 100644 --- a/src/nvim/api/vim.c +++ b/src/nvim/api/vim.c @@ -10,7 +10,7 @@ #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/buffer.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/provider.h" #include "nvim/vim.h" #include "nvim/buffer.h" @@ -31,19 +31,12 @@ # include "api/vim.c.generated.h" #endif -/// Send keys to vim input buffer, simulating user input. -/// -/// @param str The keys to send -void vim_push_keys(String str) -{ - abort(); -} - /// Executes an ex-mode command str /// /// @param str The command str /// @param[out] err Details of an error that may have occurred void vim_command(String str, Error *err) + FUNC_ATTR_DEFERRED { // Run the command try_start(); @@ -111,6 +104,7 @@ String vim_replace_termcodes(String str, Boolean from_part, Boolean do_lt, /// @param[out] err Details of an error that may have occurred /// @return The expanded object Object vim_eval(String str, Error *err) + FUNC_ATTR_DEFERRED { Object rv; // Evaluate the expression @@ -230,6 +224,7 @@ String vim_get_current_line(Error *err) /// @param line The line contents /// @param[out] err Details of an error that may have occurred void vim_set_current_line(String line, Error *err) + FUNC_ATTR_DEFERRED { buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err); } @@ -238,6 +233,7 @@ void vim_set_current_line(String line, Error *err) /// /// @param[out] err Details of an error that may have occurred void vim_del_current_line(Error *err) + FUNC_ATTR_DEFERRED { buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err); } @@ -259,6 +255,7 @@ Object vim_get_var(String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return the old value if any Object vim_set_var(String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { return dict_set_value(&globvardict, name, value, err); } @@ -289,6 +286,7 @@ Object vim_get_option(String name, Error *err) /// @param value The new option value /// @param[out] err Details of an error that may have occurred void vim_set_option(String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { set_option_to(NULL, SREQ_GLOBAL, name, value, err); } @@ -297,6 +295,7 @@ void vim_set_option(String name, Object value, Error *err) /// /// @param str The message void vim_out_write(String str) + FUNC_ATTR_DEFERRED { write_msg(str, false); } @@ -305,6 +304,7 @@ void vim_out_write(String str) /// /// @param str The message void vim_err_write(String str) + FUNC_ATTR_DEFERRED { write_msg(str, true); } @@ -314,6 +314,7 @@ void vim_err_write(String str) /// /// @param str The message void vim_report_error(String str) + FUNC_ATTR_DEFERRED { vim_err_write(str); vim_err_write((String) {.data = "\n", .size = 1}); @@ -357,6 +358,7 @@ Buffer vim_get_current_buffer(void) /// @param id The buffer handle /// @param[out] err Details of an error that may have occurred void vim_set_current_buffer(Buffer buffer, Error *err) + FUNC_ATTR_DEFERRED { buf_T *buf = find_buffer_by_handle(buffer, err); @@ -407,6 +409,7 @@ Window vim_get_current_window(void) /// /// @param handle The window handle void vim_set_current_window(Window window, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -462,6 +465,7 @@ Tabpage vim_get_current_tabpage(void) /// @param handle The tab page handle /// @param[out] err Details of an error that may have occurred void vim_set_current_tabpage(Tabpage tabpage, Error *err) + FUNC_ATTR_DEFERRED { tabpage_T *tp = find_tab_by_handle(tabpage, err); diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c index 751518424b..fde1ebfa4c 100644 --- a/src/nvim/api/window.c +++ b/src/nvim/api/window.c @@ -52,6 +52,7 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err) /// @param pos the (row, col) tuple representing the new position /// @param[out] err Details of an error that may have occurred void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -111,6 +112,7 @@ Integer window_get_height(Window window, Error *err) /// @param height the new height in rows /// @param[out] err Details of an error that may have occurred void window_set_height(Window window, Integer height, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -154,6 +156,7 @@ Integer window_get_width(Window window, Error *err) /// @param width the new width in columns /// @param[out] err Details of an error that may have occurred void window_set_width(Window window, Integer width, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -199,6 +202,7 @@ Object window_get_var(Window window, String name, Error *err) /// @param[out] err Details of an error that may have occurred /// @return The old value Object window_set_var(Window window, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); @@ -234,6 +238,7 @@ Object window_get_option(Window window, String name, Error *err) /// @param value The option value /// @param[out] err Details of an error that may have occurred void window_set_option(Window window, String name, Object value, Error *err) + FUNC_ATTR_DEFERRED { win_T *win = find_window_by_handle(window, err); diff --git a/src/nvim/eval.c b/src/nvim/eval.c index e3bd37a03f..3fc4104258 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -18,6 +18,8 @@ #include <stdbool.h> #include <math.h> +#include "nvim/lib/klist.h" + #include "nvim/assert.h" #include "nvim/vim.h" #include "nvim/ascii.h" @@ -81,11 +83,12 @@ #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/time.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" #include "nvim/os/dl.h" #include "nvim/os/provider.h" +#include "nvim/os/event.h" #define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */ @@ -443,6 +446,16 @@ static dictitem_T vimvars_var; /* variable used for v: */ #define FNE_CHECK_START 2 /* find_name_end(): check name starts with valid character */ +// Memory pool for reusing JobEvent structures +typedef struct { + Job *job; + RStream *rstream; + char *type; +} JobEvent; +#define JobEventFreer(x) +KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer) +kmempool_t(JobEventPool) *job_event_pool = NULL; + /* * Initialize the global and v: variables. */ @@ -478,6 +491,7 @@ void eval_init(void) set_vim_var_nr(VV_HLSEARCH, 1L); set_reg_var(0); /* default for v:register is not 0 but '"' */ + job_event_pool = kmp_init(JobEventPool); } #if defined(EXITFREE) || defined(PROTO) @@ -19508,35 +19522,55 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags) return ret; } +// JobActivity autocommands will execute vimscript code, so it must be executed +// on Nvim main loop +#define push_job_event(j, r, t) \ + do { \ + JobEvent *event_data = kmp_alloc(JobEventPool, job_event_pool); \ + event_data->job = j; \ + event_data->rstream = r; \ + event_data->type = t; \ + event_push((Event) { \ + .handler = on_job_event, \ + .data = event_data \ + }); \ + } while(0) + static void on_job_stdout(RStream *rstream, void *data, bool eof) { if (!eof) { - on_job_data(rstream, data, eof, "stdout"); + push_job_event(data, rstream, "stdout"); } } static void on_job_stderr(RStream *rstream, void *data, bool eof) { if (!eof) { - on_job_data(rstream, data, eof, "stderr"); + push_job_event(data, rstream, "stderr"); } } static void on_job_exit(Job *job, void *data) { - apply_job_autocmds(job, data, "exit", NULL); - free(data); + push_job_event(data, NULL, "exit"); } -static void on_job_data(RStream *rstream, void *data, bool eof, char *type) +static void on_job_event(Event event) { - Job *job = data; - uint32_t read_count = rstream_pending(rstream); - char *str = xmalloc(read_count + 1); + JobEvent *data = event.data; + Job *job = data->job; + char *str = NULL; + + if (data->rstream) { + // Read event + size_t read_count = rstream_pending(data->rstream); + str = xmalloc(read_count + 1); - rstream_read(rstream, str, read_count); - str[read_count] = NUL; - apply_job_autocmds(job, job_data(job), type, str); + rstream_read(data->rstream, str, read_count); + str[read_count] = NUL; + } + apply_job_autocmds(job, job_data(job), data->type, str); + kmp_free(JobEventPool, job_event_pool, data); } static void apply_job_autocmds(Job *job, char *name, char *type, char *str) diff --git a/src/nvim/func_attr.h b/src/nvim/func_attr.h index c75d0ab312..519f61c763 100644 --- a/src/nvim/func_attr.h +++ b/src/nvim/func_attr.h @@ -179,6 +179,7 @@ #endif #ifdef DEFINE_FUNC_ATTRIBUTES + #define FUNC_ATTR_DEFERRED #define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC #define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x) #define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y) diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h index e4a90fef33..f8dc7d4c43 100644 --- a/src/nvim/lib/klist.h +++ b/src/nvim/lib/klist.h @@ -39,6 +39,8 @@ static inline kmp_##name##_t *kmp_init_##name(void) { \ return xcalloc(1, sizeof(kmp_##name##_t)); \ } \ + static inline void kmp_destroy_##name(kmp_##name##_t *mp) \ + REAL_FATTR_UNUSED; \ static inline void kmp_destroy_##name(kmp_##name##_t *mp) { \ size_t k; \ for (k = 0; k < mp->n; ++k) { \ diff --git a/src/nvim/main.c b/src/nvim/main.c index 128d1a784c..a63ffb4a31 100644 --- a/src/nvim/main.c +++ b/src/nvim/main.c @@ -59,7 +59,7 @@ #include "nvim/os/input.h" #include "nvim/os/os.h" #include "nvim/os/signal.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/api/private/defs.h" #include "nvim/api/private/helpers.h" diff --git a/src/nvim/map.c b/src/nvim/map.c index 24aa38d67d..3f485cb952 100644 --- a/src/nvim/map.c +++ b/src/nvim/map.c @@ -6,7 +6,7 @@ #include "nvim/map_defs.h" #include "nvim/vim.h" #include "nvim/memory.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #include "nvim/lib/khash.h" @@ -108,4 +108,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER) MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER) MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER) -MAP_IMPL(String, rpc_method_handler_fn, DEFAULT_INITIALIZER) +#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false} +MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER) diff --git a/src/nvim/map.h b/src/nvim/map.h index 616516c3e1..5ade6dcf15 100644 --- a/src/nvim/map.h +++ b/src/nvim/map.h @@ -5,7 +5,7 @@ #include "nvim/map_defs.h" #include "nvim/api/private/defs.h" -#include "nvim/os/msgpack_rpc.h" +#include "nvim/msgpack_rpc/defs.h" #define MAP_DECLS(T, U) \ KHASH_DECLARE(T##_##U##_map, T, U) \ @@ -25,7 +25,7 @@ MAP_DECLS(cstr_t, uint64_t) MAP_DECLS(cstr_t, ptr_t) MAP_DECLS(ptr_t, ptr_t) MAP_DECLS(uint64_t, ptr_t) -MAP_DECLS(String, rpc_method_handler_fn) +MAP_DECLS(String, MsgpackRpcRequestHandler) #define map_new(T, U) map_##T##_##U##_new #define map_free(T, U) map_##T##_##U##_free diff --git a/src/nvim/os/channel.c b/src/nvim/msgpack_rpc/channel.c index 959fbc6e73..a1ab12f7c3 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -5,9 +5,11 @@ #include <uv.h> #include <msgpack.h> +#include "nvim/lib/klist.h" + #include "nvim/api/private/helpers.h" #include "nvim/api/vim.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/event.h" #include "nvim/os/rstream.h" #include "nvim/os/rstream_defs.h" @@ -15,8 +17,7 @@ #include "nvim/os/wstream_defs.h" #include "nvim/os/job.h" #include "nvim/os/job_defs.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/vim.h" #include "nvim/ascii.h" #include "nvim/memory.h" @@ -32,14 +33,14 @@ typedef struct { uint64_t request_id; - bool errored; + bool returned, errored; Object result; } ChannelCallFrame; typedef struct { uint64_t id; PMap(cstr_t) *subscribed_events; - bool is_job, enabled; + bool is_job, closed; msgpack_unpacker *unpacker; union { Job *job; @@ -51,21 +52,32 @@ typedef struct { } data; uint64_t next_request_id; kvec_t(ChannelCallFrame *) call_stack; - size_t rpc_call_level; } Channel; +typedef struct { + Channel *channel; + MsgpackRpcRequestHandler handler; + Array args; + uint64_t request_id; +} RequestEvent; + +#define RequestEventFreer(x) +KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer) +kmempool_t(RequestEventPool) *request_event_pool = NULL; + static uint64_t next_id = 1; static PMap(uint64_t) *channels = NULL; static PMap(cstr_t) *event_strings = NULL; static msgpack_sbuffer out_buffer; #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.c.generated.h" +# include "msgpack_rpc/channel.c.generated.h" #endif /// Initializes the module void channel_init(void) { + request_event_pool = kmp_init(RequestEventPool); channels = pmap_new(uint64_t)(); event_strings = pmap_new(cstr_t)(); msgpack_sbuffer_init(&out_buffer); @@ -104,12 +116,12 @@ uint64_t channel_from_job(char **argv) channel, job_out, job_err, - NULL, + job_exit, 0, &status); if (status <= 0) { - close_channel(channel); + free_channel(channel); return 0; } @@ -128,8 +140,7 @@ void channel_from_stream(uv_stream_t *stream) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -142,7 +153,7 @@ bool channel_exists(uint64_t id) { Channel *channel; return (channel = pmap_get(uint64_t)(channels, id)) != NULL - && channel->enabled; + && !channel->closed; } /// Sends event/arguments to channel @@ -157,7 +168,7 @@ bool channel_send_event(uint64_t id, char *name, Array args) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_free_array(args); return false; } @@ -183,7 +194,7 @@ Object channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); api_free_array(args); return NIL; @@ -203,22 +214,11 @@ Object channel_send_call(uint64_t id, // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - // Push the frame - ChannelCallFrame frame = {request_id, false, NIL}; + ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); - size_t size = kv_size(channel->call_stack); - - do { - event_poll(-1, sources); - } while ( - // Continue running if ... - channel->enabled && // the channel is still enabled - kv_size(channel->call_stack) >= size); // the call didn't return + event_poll_until(-1, frame.returned); + (void)kv_pop(channel->call_stack); if (frame.errored) { api_set_error(err, Exception, "%s", frame.result.data.string.data); @@ -236,7 +236,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -258,7 +258,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { abort(); } @@ -273,12 +273,11 @@ bool channel_close(uint64_t id) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { return false; } - channel_kill(channel); - channel->enabled = false; + close_channel(channel); return true; } @@ -291,8 +290,7 @@ static void channel_from_stdio(void) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_file(channel->data.streams.read, 0); rstream_start(channel->data.streams.read); // write stream @@ -320,23 +318,20 @@ static void job_err(RStream *rstream, void *data, bool eof) } } +static void job_exit(Job *job, void *data) +{ + free_channel((Channel *)data); +} + static void parse_msgpack(RStream *rstream, void *data, bool eof) { Channel *channel = data; - channel->rpc_call_level++; if (eof) { - char buf[256]; - snprintf(buf, - sizeof(buf), - "Before returning from a RPC call, channel %" PRIu64 " was " - "closed by the client", - channel->id); - call_set_error(channel, buf); goto end; } - uint32_t count = rstream_pending(rstream); + size_t count = rstream_pending(rstream); DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", count, rstream); @@ -355,7 +350,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) MSGPACK_UNPACK_SUCCESS) { if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { if (is_valid_rpc_response(&unpacked.data, channel)) { - call_stack_pop(&unpacked.data, channel); + complete_call(&unpacked.data, channel); } else { char buf[256]; snprintf(buf, @@ -371,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) goto end; } - // Perform the call - WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer); - // write the response - if (!channel_write(channel, resp)) { - goto end; - } + handle_request(channel, &unpacked.data); } if (result == MSGPACK_UNPACK_NOMEM_ERROR) { @@ -398,13 +388,92 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } end: - channel->rpc_call_level--; - if (!channel->enabled && !kv_size(channel->call_stack)) { - // Now it's safe to destroy the channel - close_channel(channel); + if (eof && !channel->is_job && !kv_size(channel->call_stack)) { + // The free_channel call is deferred for jobs because it's possible that + // job_stderr will called after this. For non-job channels, this is the + // last callback so it must be freed now. + free_channel(channel); } } +static void handle_request(Channel *channel, msgpack_object *request) + FUNC_ATTR_NONNULL_ALL +{ + uint64_t request_id; + Error error = ERROR_INIT; + msgpack_rpc_validate(&request_id, request, &error); + + if (error.set) { + // Validation failed, send response with error + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + return; + } + + // Retrieve the request handler + MsgpackRpcRequestHandler handler; + msgpack_object method = request->via.array.ptr[2]; + + if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) { + handler = msgpack_rpc_get_handler_for(method.via.bin.ptr, + method.via.bin.size); + } else { + handler.fn = msgpack_rpc_handle_missing_method; + handler.defer = false; + } + + Array args; + msgpack_rpc_to_array(request->via.array.ptr + 3, &args); + + if (kv_size(channel->call_stack) || !handler.defer) { + call_request_handler(channel, handler, args, request_id); + return; + } + + // Defer calling the request handler. + RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool); + event_data->channel = channel; + event_data->handler = handler; + event_data->args = args; + event_data->request_id = request_id; + event_push((Event) { + .handler = on_request_event, + .data = event_data + }); +} + +static void on_request_event(Event event) +{ + RequestEvent *e = event.data; + call_request_handler(e->channel, e->handler, e->args, e->request_id); + kmp_free(RequestEventPool, request_event_pool, e); +} + +static void call_request_handler(Channel *channel, + MsgpackRpcRequestHandler handler, + Array args, + uint64_t request_id) +{ + Error error = ERROR_INIT; + Object result = handler.fn(channel->id, request_id, args, &error); + // send the response + msgpack_packer response; + msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); + + if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + request_id); + channel_write(channel, + serialize_response(request_id, &error, NIL, &out_buffer)); + } + + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + request_id); + channel_write(channel, + serialize_response(request_id, &error, result, &out_buffer)); +} + static bool channel_write(Channel *channel, WBuffer *buffer) { bool success; @@ -501,26 +570,11 @@ static void unsubscribe(Channel *channel, char *event) free(event_string); } +/// Close the channel streams/job. The channel resources will be freed by +/// free_channel later. static void close_channel(Channel *channel) { - pmap_del(uint64_t)(channels, channel->id); - msgpack_unpacker_free(channel->unpacker); - - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); - channel_kill(channel); - - free(channel); -} - -static void channel_kill(Channel *channel) -{ + channel->closed = true; if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -537,6 +591,22 @@ static void channel_kill(Channel *channel) } } +static void free_channel(Channel *channel) +{ + pmap_del(uint64_t)(channels, channel->id); + msgpack_unpacker_free(channel->unpacker); + + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + free(channel); +} + static void close_cb(uv_handle_t *handle) { free(handle->data); @@ -546,8 +616,7 @@ static void close_cb(uv_handle_t *handle) static Channel *register_channel(void) { Channel *rv = xmalloc(sizeof(Channel)); - rv->enabled = true; - rv->rpc_call_level = 0; + rv->closed = false; rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->id = next_id++; rv->subscribed_events = pmap_new(cstr_t)(); @@ -574,9 +643,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) kv_size(channel->call_stack) - 1)->request_id; } -static void call_stack_pop(msgpack_object *obj, Channel *channel) +static void complete_call(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, + kv_size(channel->call_stack) - 1); + frame->returned = true; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; if (frame->errored) { @@ -589,10 +660,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) static void call_set_error(Channel *channel, char *msg) { for (size_t i = 0; i < kv_size(channel->call_stack); i++) { - ChannelCallFrame *frame = kv_pop(channel->call_stack); + ChannelCallFrame *frame = kv_A(channel->call_stack, i); + frame->returned = true; frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } - channel->enabled = false; + close_channel(channel); } diff --git a/src/nvim/os/channel.h b/src/nvim/msgpack_rpc/channel.h index bb409bfde9..df742fe368 100644 --- a/src/nvim/os/channel.h +++ b/src/nvim/msgpack_rpc/channel.h @@ -1,5 +1,5 @@ -#ifndef NVIM_OS_CHANNEL_H -#define NVIM_OS_CHANNEL_H +#ifndef NVIM_MSGPACK_RPC_CHANNEL_H +#define NVIM_MSGPACK_RPC_CHANNEL_H #include <stdbool.h> #include <uv.h> @@ -10,6 +10,6 @@ #define METHOD_MAXLEN 512 #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/channel.h.generated.h" +# include "msgpack_rpc/channel.h.generated.h" #endif -#endif // NVIM_OS_CHANNEL_H +#endif // NVIM_MSGPACK_RPC_CHANNEL_H diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/msgpack_rpc/defs.h index 3476d791ea..13067fb7b4 100644 --- a/src/nvim/os/msgpack_rpc.h +++ b/src/nvim/msgpack_rpc/defs.h @@ -1,29 +1,23 @@ -#ifndef NVIM_OS_MSGPACK_RPC_H -#define NVIM_OS_MSGPACK_RPC_H - -#include <stdint.h> +#ifndef NVIM_MSGPACK_RPC_DEFS_H +#define NVIM_MSGPACK_RPC_DEFS_H #include <msgpack.h> -#include "nvim/func_attr.h" -#include "nvim/api/private/defs.h" -#include "nvim/os/wstream.h" - -typedef enum { - kUnpackResultOk, /// Successfully parsed a document - kUnpackResultFail, /// Got unexpected input - kUnpackResultNeedMore /// Need more data -} UnpackResult; /// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores /// functions of this type. -typedef Object (*rpc_method_handler_fn)(uint64_t channel_id, - msgpack_object *req, - Error *error); - +typedef struct { + Object (*fn)(uint64_t channel_id, + uint64_t request_id, + Array args, + Error *error); + bool defer; // Should the call be deferred to the main loop? This should + // be true if the function mutates editor data structures such + // as buffers, windows, tabs, or if it executes vimscript code. +} MsgpackRpcRequestHandler; /// Initializes the msgpack-rpc method table -void msgpack_rpc_init(void); +void msgpack_rpc_init_method_table(void); void msgpack_rpc_init_function_metadata(Dictionary *metadata); @@ -43,9 +37,7 @@ Object msgpack_rpc_dispatch(uint64_t channel_id, Error *error) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3); - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_H +MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name, + size_t name_len) + FUNC_ATTR_NONNULL_ARG(1); +#endif // NVIM_MSGPACK_RPC_DEFS_H diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/msgpack_rpc/helpers.c index b14de8245c..6be221b912 100644 --- a/src/nvim/os/msgpack_rpc_helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -1,14 +1,18 @@ #include <stdint.h> #include <stdbool.h> +#include <inttypes.h> #include <msgpack.h> -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/api/private/helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" #include "nvim/vim.h" +#include "nvim/log.h" #include "nvim/memory.h" #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.c.generated.h" +# include "msgpack_rpc/helpers.c.generated.h" #endif static msgpack_zone zone; @@ -136,10 +140,13 @@ bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg) case MSGPACK_OBJECT_EXT: switch (obj->via.ext.type) { case kObjectTypeBuffer: + arg->type = kObjectTypeBuffer; return msgpack_rpc_to_buffer(obj, &arg->data.buffer); case kObjectTypeWindow: + arg->type = kObjectTypeWindow; return msgpack_rpc_to_window(obj, &arg->data.window); case kObjectTypeTabpage: + arg->type = kObjectTypeTabpage; return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage); } default: @@ -287,3 +294,136 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res) msgpack_rpc_from_object(result.items[i].value, res); } } + +/// Finishes the msgpack-rpc call with an error message. +/// +/// @param msg The error message +/// @param res A packer that contains the response +void msgpack_rpc_error(char *msg, msgpack_packer *res) + FUNC_ATTR_NONNULL_ALL +{ + size_t len = strlen(msg); + + // error message + msgpack_pack_bin(res, len); + msgpack_pack_bin_body(res, msg, len); + // Nil result + msgpack_pack_nil(res); +} + +/// Handler executed when an invalid method name is passed +Object msgpack_rpc_handle_missing_method(uint64_t channel_id, + uint64_t request_id, + Array args, + Error *error) +{ + snprintf(error->msg, sizeof(error->msg), "Invalid method name"); + error->set = true; + return NIL; +} + +/// Serializes a msgpack-rpc request or notification(id == 0) +WBuffer *serialize_request(uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) + FUNC_ATTR_NONNULL_ARG(4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, request_id ? 4 : 3); + msgpack_pack_int(&pac, request_id ? 0 : 2); + + if (request_id) { + msgpack_pack_uint64(&pac, request_id); + } + + msgpack_pack_bin(&pac, method.size); + msgpack_pack_bin_body(&pac, method.data, method.size); + msgpack_rpc_from_array(args, &pac); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + api_free_array(args); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +/// Serializes a msgpack-rpc response +WBuffer *serialize_response(uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) + FUNC_ATTR_NONNULL_ARG(2, 4) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_pack_array(&pac, 4); + msgpack_pack_int(&pac, 1); + msgpack_pack_uint64(&pac, response_id); + + if (err->set) { + // error represented by a [type, message] array + msgpack_pack_array(&pac, 2); + msgpack_rpc_from_integer(err->type, &pac); + msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); + // Nil result + msgpack_pack_nil(&pac); + } else { + // Nil error + msgpack_pack_nil(&pac); + // Return value + msgpack_rpc_from_object(arg, &pac); + } + + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + api_free_object(arg); + msgpack_sbuffer_clear(sbuffer); + return rv; +} + +void msgpack_rpc_validate(uint64_t *response_id, + msgpack_object *req, + Error *err) +{ + // response id not known yet + + *response_id = 0; + // Validate the basic structure of the msgpack-rpc payload + if (req->type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Request is not an array")); + } + + if (req->via.array.size != 4) { + api_set_error(err, Validation, _("Request array size should be 4")); + } + + if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Id must be a positive integer")); + } + + // Set the response id, which is the same as the request + *response_id = req->via.array.ptr[1].via.u64; + + if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + api_set_error(err, Validation, _("Message type must be an integer")); + } + + if (req->via.array.ptr[0].via.u64 != 0) { + api_set_error(err, Validation, _("Message type must be 0")); + } + + if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN + && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { + api_set_error(err, Validation, _("Method must be a string")); + } + + if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { + api_set_error(err, Validation, _("Paremeters must be an array")); + } +} diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h new file mode 100644 index 0000000000..bf161d54e0 --- /dev/null +++ b/src/nvim/msgpack_rpc/helpers.h @@ -0,0 +1,17 @@ +#ifndef NVIM_MSGPACK_RPC_HELPERS_H +#define NVIM_MSGPACK_RPC_HELPERS_H + +#include <stdint.h> +#include <stdbool.h> + +#include <msgpack.h> + +#include "nvim/os/wstream.h" +#include "nvim/api/private/defs.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/helpers.h.generated.h" +#endif + +#endif // NVIM_MSGPACK_RPC_HELPERS_H + diff --git a/src/nvim/os/server.c b/src/nvim/msgpack_rpc/server.c index 9f7f5b34da..087ba24111 100644 --- a/src/nvim/os/server.c +++ b/src/nvim/msgpack_rpc/server.c @@ -5,8 +5,8 @@ #include <uv.h> -#include "nvim/os/channel.h" -#include "nvim/os/server.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" #include "nvim/os/os.h" #include "nvim/ascii.h" #include "nvim/vim.h" @@ -46,7 +46,7 @@ typedef struct { static PMap(cstr_t) *servers = NULL; #ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.c.generated.h" +# include "msgpack_rpc/server.c.generated.h" #endif /// Initializes the module @@ -119,7 +119,8 @@ int server_start(const char *endpoint) ip_end = strchr(addr, NUL); } - uint32_t addr_len = ip_end - addr; + // (ip_end - addr) is always > 0, so convert to size_t + size_t addr_len = (size_t)(ip_end - addr); if (addr_len > sizeof(ip) - 1) { // Maximum length of an IP address buffer is 15(eg: 255.255.255.255) diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h new file mode 100644 index 0000000000..f1a6703938 --- /dev/null +++ b/src/nvim/msgpack_rpc/server.h @@ -0,0 +1,7 @@ +#ifndef NVIM_MSGPACK_RPC_SERVER_H +#define NVIM_MSGPACK_RPC_SERVER_H + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "msgpack_rpc/server.h.generated.h" +#endif +#endif // NVIM_MSGPACK_RPC_SERVER_H diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index a460b2db96..2dee529452 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -7,8 +7,10 @@ #include "nvim/os/event.h" #include "nvim/os/input.h" -#include "nvim/os/channel.h" -#include "nvim/os/server.h" +#include "nvim/msgpack_rpc/defs.h" +#include "nvim/msgpack_rpc/channel.h" +#include "nvim/msgpack_rpc/server.h" +#include "nvim/msgpack_rpc/helpers.h" #include "nvim/os/provider.h" #include "nvim/os/signal.h" #include "nvim/os/rstream.h" @@ -25,25 +27,22 @@ KLIST_INIT(Event, Event, _destroy_event) typedef struct { bool timed_out; - int32_t ms; + int ms; uv_timer_t *timer; } TimerData; #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.c.generated.h" #endif -static klist_t(Event) *deferred_events, *immediate_events; -// NULL-terminated array of event sources that we should process immediately. -// -// Events from sources that are not contained in this array are processed -// later when `event_process` is called -static EventSource *immediate_sources = NULL; +static klist_t(Event) *pending_events; void event_init(void) { + // early msgpack-rpc initialization + msgpack_rpc_init_method_table(); + msgpack_rpc_helpers_init(); // Initialize the event queues - deferred_events = kl_init(Event); - immediate_events = kl_init(Event); + pending_events = kl_init(Event); // Initialize input events input_init(); // Timer to wake the event loop if a timeout argument is passed to @@ -52,9 +51,8 @@ void event_init(void) signal_init(); // Jobs job_init(); - // Channels + // finish mspgack-rpc initialization channel_init(); - // Servers server_init(); // Providers provider_init(); @@ -68,8 +66,7 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms, EventSource sources[]) - FUNC_ATTR_NONNULL_ARG(2) +void event_poll(int ms) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -100,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[]) run_mode = UV_RUN_NOWAIT; } - size_t processed_events; - - do { - // Run one event loop iteration, blocking for events if run_mode is - // UV_RUN_ONCE - processed_events = loop(run_mode, sources); - } while ( - // Continue running if ... - !processed_events && // we didn't process any immediate events - !event_has_deferred() && // no events are waiting to be processed - run_mode != UV_RUN_NOWAIT && // ms != 0 - !timer_data.timed_out); // we didn't get a timeout + loop(run_mode); if (!(--recursive)) { // Again, only stop when we leave the top-level invocation @@ -123,68 +109,29 @@ bool event_poll(int32_t ms, EventSource sources[]) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - processed_events += loop(UV_RUN_NOWAIT, sources); + loop(UV_RUN_NOWAIT); } - - return !timer_data.timed_out && (processed_events || event_has_deferred()); } bool event_has_deferred(void) { - return !kl_empty(deferred_events); + return !kl_empty(pending_events); } // Queue an event void event_push(Event event) { - bool defer = true; - - if (immediate_sources) { - size_t i; - EventSource src; - - for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { - if (src == event.source) { - defer = false; - break; - } - } - } - - *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; + *kl_pushp(Event, pending_events) = event; } -void event_process(void) -{ - process_from(deferred_events); -} -// Runs the appropriate action for each queued event -static size_t process_from(klist_t(Event) *queue) +void event_process(void) { - size_t count = 0; Event event; - while (kl_shift(Event, queue, &event) == 0) { - switch (event.type) { - case kEventSignal: - signal_handle(event); - break; - case kEventRStreamData: - rstream_read_event(event); - break; - case kEventJobExit: - job_exit_event(event); - break; - default: - abort(); - } - count++; + while (kl_shift(Event, pending_events, &event) == 0) { + event.handler(event); } - - DLOG("Processed %u events", count); - - return count; } // Set a flag in the `event_poll` loop for signaling of a timeout @@ -202,42 +149,9 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static void requeue_deferred_events(void) +static void loop(uv_run_mode run_mode) { - size_t remaining = deferred_events->size; - - DLOG("Number of deferred events: %u", remaining); - - while (remaining--) { - // Re-push each deferred event to ensure it will be in the right queue - Event event; - kl_shift(Event, deferred_events, &event); - event_push(event); - DLOG("Re-queueing event"); - } - - DLOG("Number of deferred events: %u", deferred_events->size); -} - -static size_t loop(uv_run_mode run_mode, EventSource *sources) -{ - size_t count; - immediate_sources = sources; - // It's possible that some events from the immediate sources are waiting - // in the deferred queue. If so, move them to the immediate queue so they - // will be processed in order of arrival by the next `process_from` call. - requeue_deferred_events(); - count = process_from(immediate_events); - - if (count) { - // No need to enter libuv, events were already processed - return count; - } - DLOG("Enter event loop"); uv_run(uv_default_loop(), run_mode); DLOG("Exit event loop"); - immediate_sources = NULL; - count = process_from(immediate_events); - return count; } diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h index 29e304adc8..f8139e978d 100644 --- a/src/nvim/os/event.h +++ b/src/nvim/os/event.h @@ -6,6 +6,27 @@ #include "nvim/os/event_defs.h" #include "nvim/os/job_defs.h" +#include "nvim/os/time.h" + +// Poll for events until a condition is true or a timeout has passed +#define event_poll_until(timeout, condition) \ + do { \ + int remaining = timeout; \ + uint64_t before = (remaining > 0) ? os_hrtime() : 0; \ + while (!(condition)) { \ + event_poll(remaining); \ + if (remaining == 0) { \ + break; \ + } else if (remaining > 0) { \ + uint64_t now = os_hrtime(); \ + remaining -= (int) ((now - before) / 1000000); \ + before = now; \ + if (remaining <= 0) { \ + break; \ + } \ + } \ + } \ + } while (0) #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.h.generated.h" diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index dbee3e2ba7..2dd9403d9f 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,25 +6,12 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" -typedef void * EventSource; +typedef struct event Event; +typedef void (*event_handler)(Event event); -typedef enum { - kEventSignal, - kEventRStreamData, - kEventJobExit -} EventType; - -typedef struct { - EventSource source; - EventType type; - union { - int signum; - struct { - RStream *ptr; - bool eof; - } rstream; - Job *job; - } data; -} Event; +struct event { + void *data; + event_handler handler; +}; #endif // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a18d735ce6..b7eba47d5e 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -7,7 +7,6 @@ #include "nvim/api/private/defs.h" #include "nvim/os/input.h" #include "nvim/os/event.h" -#include "nvim/os/signal.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" #include "nvim/ascii.h" @@ -48,10 +47,7 @@ void input_init(void) } read_buffer = rbuffer_new(READ_BUFFER_SIZE); - read_stream = rstream_new(read_cb, - read_buffer, - NULL, - NULL); + read_stream = rstream_new(read_cb, read_buffer, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -76,7 +72,7 @@ void input_stop(void) } // Low level input function. -int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) +int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt) { InbufPollResult result; @@ -90,7 +86,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) return 0; } } else { - if ((result = inbuf_poll(p_ut)) == kInputNone) { + if ((result = inbuf_poll((int)p_ut)) == kInputNone) { if (trigger_cursorhold() && maxlen >= 3 && !typebuf_changed(tb_change_cnt)) { buf[0] = K_SPECIAL; @@ -120,7 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt) } convert_input(); - return rbuffer_read(input_buffer, (char *)buf, maxlen); + // Safe to convert rbuffer_read to int, it will never overflow since + // we use relatively small buffers. + return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen); } // Check if a character is available for reading @@ -167,23 +165,14 @@ void input_buffer_restore(String str) free(str.data); } -static bool input_poll(int32_t ms) +static bool input_poll(int ms) { - if (embedded_mode) { - EventSource input_sources[] = { signal_event_source(), NULL }; - return event_poll(ms, input_sources); - } - - EventSource input_sources[] = { - rstream_event_source(read_stream), - NULL - }; - - return input_ready() || event_poll(ms, input_sources) || input_ready(); + event_poll_until(ms, input_ready()); + return input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c -static InbufPollResult inbuf_poll(int32_t ms) +static InbufPollResult inbuf_poll(int ms) { if (typebuf_was_filled || rbuffer_pending(input_buffer)) { return kInputAvail; @@ -235,7 +224,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof) static void convert_input(void) { - if (!rbuffer_available(input_buffer)) { + if (embedded_mode || !rbuffer_available(input_buffer)) { // No input buffer space return; } @@ -273,9 +262,9 @@ static void convert_input(void) char *inbuf = rbuffer_read_ptr(input_buffer); size_t count = rbuffer_pending(input_buffer), consume_count = 0; - for (int i = count - 1; i >= 0; i--) { + for (int i = (int)count - 1; i >= 0; i--) { if (inbuf[i] == 3) { - consume_count = i + 1; + consume_count = (size_t)i; break; } } @@ -304,6 +293,10 @@ static int push_event_key(uint8_t *buf, int maxlen) // Check if there's pending input static bool input_ready(void) { - return rstream_pending(read_stream) > 0 || eof; + return typebuf_was_filled || // API call filled typeahead + event_has_deferred() || // Events must be processed + (!embedded_mode && ( + rstream_pending(read_stream) > 0 || // Stdin input + eof)); // Stdin closed } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 2ca1023290..4c01829159 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -12,9 +12,7 @@ #include "nvim/os/wstream_defs.h" #include "nvim/os/event.h" #include "nvim/os/event_defs.h" -#include "nvim/os/time.h" #include "nvim/os/shell.h" -#include "nvim/os/signal.h" #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/term.h" @@ -99,25 +97,28 @@ void job_teardown(void) // their status with `wait` or handling SIGCHLD. libuv does that // automatically (and then calls `exit_cb`) but we have to give it a chance // by running the loop one more time - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL) { - continue; - } + job = table[i]; // Still alive - while (is_alive(job) && remaining_tries--) { + while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits - uv_run(uv_default_loop(), UV_RUN_NOWAIT); + event_poll(0); + // It's possible that the event_poll call removed the job from the table, + // reset 'job' so the next iteration won't run in that case. + job = table[i]; } - if (is_alive(job)) { + if (job && is_alive(job)) { uv_process_kill(&job->proc, SIGKILL); } } + // Last run to ensure all children were removed + event_poll(0); } /// Tries to start a new job. @@ -213,14 +214,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); - job->err = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -277,47 +272,33 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int old_mode = cur_tmode; settmode(TMODE_COOK); - EventSource sources[] = {job_event_source(job), signal_event_source(), NULL}; - - // keep track of the elapsed time if ms > 0 - uint64_t before = (ms > 0) ? os_hrtime() : 0; - - while (1) { - // check if the job has exited (and the status is available). - if (job->pending_refs == 0) { - break; - } - - event_poll(ms, sources); - - // we'll assume that a user frantically hitting interrupt doesn't like - // the current job. Signal that it has to be killed. - if (got_int) { - job_stop(job); - } - - if (ms == 0) { - break; - } - - // check if the poll timed out, if not, decrease the ms to wait for the - // next run - if (ms > 0) { - uint64_t now = os_hrtime(); - ms -= (int) ((now - before) / 1000000); - before = now; - - // if the time elapsed is greater than the `ms` wait time, break - if (ms <= 0) { - break; - } - } + // Increase pending_refs to stop the exit_cb from being called, which + // could result in the job being freed before we have a chance + // to get the status. + job->pending_refs++; + event_poll_until(ms, + // Until... + got_int || // interrupted by the user + job->pending_refs == 1); // job exited + job->pending_refs--; + + // we'll assume that a user frantically hitting interrupt doesn't like + // the current job. Signal that it has to be killed. + if (got_int) { + job_stop(job); + event_poll(0); } settmode(old_mode); - // return -1 for a timeout, the job status otherwise - return (job->pending_refs) ? -1 : (int) job->status; + if (!job->pending_refs) { + int status = (int) job->status; + job_exit_callback(job); + return status; + } + + // return -1 for a timeout + return -1; } /// Close the pipe used to write to the job. @@ -369,14 +350,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Runs the read callback associated with the job exit event -/// -/// @param event Object containing data necessary to invoke the callback -void job_exit_event(Event event) -{ - job_exit_callback(event.data.job); -} - /// Get the job id /// /// @param job A pointer to the job @@ -395,11 +368,6 @@ void *job_data(Job *job) return job->data; } -EventSource job_event_source(Job *job) -{ - return job; -} - static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -470,7 +438,7 @@ static void read_cb(RStream *rstream, void *data, bool eof) } if (eof && --job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } @@ -481,20 +449,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) job->status = status; if (--job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } -static void emit_exit_event(Job *job) -{ - Event event = { - .source = job_event_source(job), - .type = kEventJobExit, - .data.job = job - }; - event_push(event); -} - static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c deleted file mode 100644 index 55bc006ad1..0000000000 --- a/src/nvim/os/msgpack_rpc.c +++ /dev/null @@ -1,188 +0,0 @@ -#include <stdint.h> -#include <stdbool.h> -#include <inttypes.h> - -#include <msgpack.h> - -#include "nvim/vim.h" -#include "nvim/log.h" -#include "nvim/memory.h" -#include "nvim/os/wstream.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" -#include "nvim/api/private/helpers.h" -#include "nvim/func_attr.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc.c.generated.h" -#endif - -/// Validates the basic structure of the msgpack-rpc call and fills `res` -/// with the basic response structure. -/// -/// @param channel_id The channel id -/// @param req The parsed request object -/// @param res A packer that contains the response -WBuffer *msgpack_rpc_call(uint64_t channel_id, - msgpack_object *req, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2) - FUNC_ATTR_NONNULL_ARG(3) -{ - uint64_t response_id; - Error error = ERROR_INIT; - msgpack_rpc_validate(&response_id, req, &error); - - if (error.set) { - return serialize_response(response_id, &error, NIL, sbuffer); - } - - // dispatch the call - Object rv = msgpack_rpc_dispatch(channel_id, req, &error); - // send the response - msgpack_packer response; - msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); - - if (error.set) { - ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", - error.msg, - response_id); - return serialize_response(response_id, &error, NIL, sbuffer); - } - - DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", - response_id); - return serialize_response(response_id, &error, rv, sbuffer); -} - -/// Finishes the msgpack-rpc call with an error message. -/// -/// @param msg The error message -/// @param res A packer that contains the response -void msgpack_rpc_error(char *msg, msgpack_packer *res) - FUNC_ATTR_NONNULL_ALL -{ - size_t len = strlen(msg); - - // error message - msgpack_pack_bin(res, len); - msgpack_pack_bin_body(res, msg, len); - // Nil result - msgpack_pack_nil(res); -} - -/// Handler executed when an invalid method name is passed -Object msgpack_rpc_handle_missing_method(uint64_t channel_id, - msgpack_object *req, - Error *error) -{ - snprintf(error->msg, sizeof(error->msg), "Invalid method name"); - error->set = true; - return NIL; -} - -/// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) - FUNC_ATTR_NONNULL_ARG(4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); - - if (request_id) { - msgpack_pack_uint64(&pac, request_id); - } - - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -/// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) - FUNC_ATTR_NONNULL_ARG(2, 4) -{ - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); - - if (err->set) { - // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); - // Nil result - msgpack_pack_nil(&pac); - } else { - // Nil error - msgpack_pack_nil(&pac); - // Return value - msgpack_rpc_from_object(arg, &pac); - } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; -} - -static void msgpack_rpc_validate(uint64_t *response_id, - msgpack_object *req, - Error *err) -{ - // response id not known yet - - *response_id = 0; - // Validate the basic structure of the msgpack-rpc payload - if (req->type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Request is not an array")); - } - - if (req->via.array.size != 4) { - api_set_error(err, Validation, _("Request array size should be 4")); - } - - if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Id must be a positive integer")); - } - - // Set the response id, which is the same as the request - *response_id = req->via.array.ptr[1].via.u64; - - if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - api_set_error(err, Validation, _("Message type must be an integer")); - } - - if (req->via.array.ptr[0].via.u64 != 0) { - api_set_error(err, Validation, _("Message type must be 0")); - } - - if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN - && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) { - api_set_error(err, Validation, _("Method must be a string")); - } - - if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) { - api_set_error(err, Validation, _("Paremeters must be an array")); - } -} diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h deleted file mode 100644 index aede6b1587..0000000000 --- a/src/nvim/os/msgpack_rpc_helpers.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H -#define NVIM_OS_MSGPACK_RPC_HELPERS_H - -#include <stdint.h> -#include <stdbool.h> - -#include <msgpack.h> - -#include "nvim/api/private/defs.h" - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/msgpack_rpc_helpers.h.generated.h" -#endif - -#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H - diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c index d4fffaa053..414c8841fa 100644 --- a/src/nvim/os/provider.c +++ b/src/nvim/os/provider.c @@ -8,7 +8,7 @@ #include "nvim/api/vim.h" #include "nvim/api/private/helpers.h" #include "nvim/api/private/defs.h" -#include "nvim/os/channel.h" +#include "nvim/msgpack_rpc/channel.h" #include "nvim/os/shell.h" #include "nvim/os/os.h" #include "nvim/log.h" diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 8f1c30de50..8cfd9d1b75 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -8,8 +8,6 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/ascii.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -33,7 +31,6 @@ struct rstream { uv_file fd; rstream_cb cb; bool free_handle; - EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count) void rbuffer_produced(RBuffer *rbuffer, size_t count) { rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rbuffer->rstream, - rstream_event_source(rbuffer->rstream)); + DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream); rbuffer_relocate(rbuffer); if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { // The last read filled the buffer, stop reading for now rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); + DLOG("Buffer for RStream(%p) is full, stopping it", rstream); } } @@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer) /// for reading with `rstream_read` /// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance -/// @param source_override Replacement for the default source used in events -/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, - RBuffer *buffer, - void *data, - EventSource source_override) +RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = buffer; @@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb, rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; - rv->source_override = source_override ? source_override : rv; return rv; } @@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count) return rbuffer_read(rstream->buffer, buffer, count); } -/// Runs the read callback associated with the rstream -/// -/// @param event Object containing data necessary to invoke the callback -void rstream_read_event(Event event) -{ - RStream *rstream = event.data.rstream.ptr; - - rstream->cb(rstream, rstream->data, event.data.rstream.eof); -} - -EventSource rstream_event_source(RStream *rstream) -{ - return rstream->source_override; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { - DLOG("Closing RStream(address: %p, source: %p)", - rstream, - rstream_event_source(rstream)); + DLOG("Closing RStream(%p)", rstream); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - emit_read_event(rstream, true); + rstream->cb(rstream, rstream->data, true); } return; } @@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - emit_read_event(rstream, false); + rstream->cb(rstream, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - emit_read_event(rstream, true); return; } @@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - emit_read_event(rstream, false); } static void close_cb(uv_handle_t *handle) @@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static void emit_read_event(RStream *rstream, bool eof) -{ - Event event = { - .source = rstream_event_source(rstream), - .type = kEventRStreamData, - .data.rstream = { - .ptr = rstream, - .eof = eof - } - }; - event_push(event); -} - static void rbuffer_relocate(RBuffer *rbuffer) { // Move data ... diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h deleted file mode 100644 index 43592a91e4..0000000000 --- a/src/nvim/os/server.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_H -#define NVIM_OS_SERVER_H - -#ifdef INCLUDE_GENERATED_DECLARATIONS -# include "os/server.h.generated.h" -#endif -#endif // NVIM_OS_SERVER_H diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h deleted file mode 100644 index 08cdf55428..0000000000 --- a/src/nvim/os/server_defs.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef NVIM_OS_SERVER_DEFS_H -#define NVIM_OS_SERVER_DEFS_H - -typedef struct server Server; - -#endif // NVIM_OS_SERVER_DEFS_H - diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 453cc6d605..d5464f7975 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -1,4 +1,5 @@ #include <string.h> +#include <assert.h> #include <stdbool.h> #include <stdlib.h> @@ -7,6 +8,7 @@ #include "nvim/ascii.h" #include "nvim/lib/kvec.h" #include "nvim/log.h" +#include "nvim/os/event.h" #include "nvim/os/job.h" #include "nvim/os/rstream.h" #include "nvim/os/shell.h" @@ -58,11 +60,11 @@ typedef struct { /// `shell_free_argv` when no longer needed. char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt) { - int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); + size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL); char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *))); // Split 'shell' - int i = tokenize(p_sh, rv); + size_t i = tokenize(p_sh, rv); if (extra_shell_opt != NULL) { // Push a copy of `extra_shell_opt` @@ -212,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg) // Keep running the loop until all three handles are completely closed while (pdata.exited < expected_exits) { - uv_run(uv_default_loop(), UV_RUN_ONCE); + event_poll(0); if (got_int) { // Forward SIGINT to the shell @@ -356,9 +358,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof) /// @param argv The vector that will be filled with copies of the parsed /// words. It can be NULL if the caller only needs to count words. /// @return The number of words parsed. -static int tokenize(const char_u *str, char **argv) +static size_t tokenize(const char_u *str, char **argv) { - int argc = 0, len; + size_t argc = 0, len; char_u *p = (char_u *) str; while (*p != NUL) { @@ -383,11 +385,11 @@ static int tokenize(const char_u *str, char **argv) /// /// @param str A pointer to the first character of the word /// @return The offset from `str` at which the word ends. -static int word_length(const char_u *str) +static size_t word_length(const char_u *str) { const char_u *p = str; bool inquote = false; - int length = 0; + size_t length = 0; // Move `p` to the end of shell word by advancing the pointer while it's // inside a quote or it's a non-whitespace character @@ -418,15 +420,15 @@ static void write_selection(uv_write_t *req) // TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and // only after filled we should start allocating memory(skip unnecessary // allocations for small writes) - int buflen = BUFFER_LENGTH; + size_t buflen = BUFFER_LENGTH; pdata->wbuffer = (char *)xmalloc(buflen); uv_buf_t uvbuf; linenr_T lnum = curbuf->b_op_start.lnum; - int off = 0; - int written = 0; + size_t off = 0; + size_t written = 0; char_u *lp = ml_get(lnum); - int l; - int len; + size_t l; + size_t len; for (;;) { l = strlen((char *)lp + written); @@ -443,7 +445,7 @@ static void write_selection(uv_write_t *req) pdata->wbuffer[off++] = NUL; } else { char_u *s = vim_strchr(lp + written, NL); - len = s == NULL ? l : s - (lp + written); + len = s == NULL ? l : (size_t)(s - (lp + written)); while (off + len >= buflen) { // Resize the buffer buflen *= 2; @@ -584,6 +586,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) { ProcessData *data = (ProcessData *)proc->data; data->exited++; - data->exit_status = status; + assert(status <= INT_MAX); + data->exit_status = (int)status; uv_close((uv_handle_t *)proc, NULL); } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 2f93cfb08a..36f7b37c48 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -12,8 +12,6 @@ #include "nvim/memory.h" #include "nvim/misc1.h" #include "nvim/misc2.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/os/signal.h" static uv_signal_t sint, spipe, shup, squit, sterm, swinch; @@ -72,45 +70,6 @@ void signal_accept_deadly(void) rejecting_deadly = false; } -void signal_handle(Event event) -{ - int signum = event.data.signum; - - switch (signum) { - case SIGINT: - got_int = true; - break; -#ifdef SIGPWR - case SIGPWR: - // Signal of a power failure(eg batteries low), flush the swap files to - // be safe - ml_sync_all(false, false); - break; -#endif - case SIGPIPE: - // Ignore - break; - case SIGWINCH: - shell_resized(); - break; - case SIGTERM: - case SIGQUIT: - case SIGHUP: - if (!rejecting_deadly) { - deadly_signal(signum); - } - break; - default: - fprintf(stderr, "Invalid signal %d", signum); - break; - } -} - -EventSource signal_event_source(void) -{ - return &sint; -} - static char * signal_name(int signum) { switch (signum) { @@ -154,20 +113,32 @@ static void deadly_signal(int signum) static void signal_cb(uv_signal_t *handle, int signum) { - if (rejecting_deadly) { - if (signum == SIGINT) { + switch (signum) { + case SIGINT: got_int = true; - } - - return; + break; +#ifdef SIGPWR + case SIGPWR: + // Signal of a power failure(eg batteries low), flush the swap files to + // be safe + ml_sync_all(false, false); + break; +#endif + case SIGPIPE: + // Ignore + break; + case SIGWINCH: + shell_resized(); + break; + case SIGTERM: + case SIGQUIT: + case SIGHUP: + if (!rejecting_deadly) { + deadly_signal(signum); + } + break; + default: + fprintf(stderr, "Invalid signal %d", signum); + break; } - - Event event = { - .source = signal_event_source(), - .type = kEventSignal, - .data = { - .signum = signum - } - }; - event_push(event); } diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c index e3b76ac833..a4871ef499 100644 --- a/src/nvim/os/time.c +++ b/src/nvim/os/time.c @@ -1,3 +1,4 @@ +#include <assert.h> #include <stdint.h> #include <stdbool.h> #include <time.h> @@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput) } } -static void microdelay(uint64_t microseconds) -{ - uint64_t hrtime; - int64_t ns = microseconds * 1000; // convert to nanoseconds - - uv_mutex_lock(&delay_mutex); - - while (ns > 0) { - hrtime = uv_hrtime(); - if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT) - break; - ns -= uv_hrtime() - hrtime; - } - - uv_mutex_unlock(&delay_mutex); -} - /// Portable version of POSIX localtime_r() /// /// @return NULL in case of error @@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL time_t rawtime = time(NULL); return os_localtime_r(&rawtime, result); } + +static void microdelay(uint64_t microseconds) +{ + uint64_t elapsed = 0; + uint64_t ns = microseconds * 1000; // convert to nanoseconds + uint64_t base = uv_hrtime(); + + uv_mutex_lock(&delay_mutex); + + while (elapsed < ns) { + if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed) + == UV_ETIMEDOUT) + break; + uint64_t now = uv_hrtime(); + elapsed += now - base; + base = now; + } + + uv_mutex_unlock(&delay_mutex); +} diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c index 52f57f8262..0ad15bc433 100644 --- a/src/nvim/os_unix.c +++ b/src/nvim/os_unix.c @@ -54,8 +54,8 @@ #include "nvim/os/shell.h" #include "nvim/os/signal.h" #include "nvim/os/job.h" -#include "nvim/os/msgpack_rpc.h" -#include "nvim/os/msgpack_rpc_helpers.h" +#include "nvim/msgpack_rpc/helpers.h" +#include "nvim/msgpack_rpc/defs.h" #if defined(HAVE_SYS_IOCTL_H) # include <sys/ioctl.h> @@ -166,8 +166,6 @@ void mch_init(void) mac_conv_init(); #endif - msgpack_rpc_init(); - msgpack_rpc_helpers_init(); event_init(); } |