aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-10-22 07:30:32 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-10-22 07:30:32 -0300
commit5cd9b6474287650790e97187058c81b176fbc7c9 (patch)
tree57a419ce9104adf84bd1152a01f9c1f75f4bef85 /src
parent6e268cd0d40a3652a68b486bdbb421d39295ab48 (diff)
parentf7fab4af863839a064a941a555b237b6eb789870 (diff)
downloadrneovim-5cd9b6474287650790e97187058c81b176fbc7c9.tar.gz
rneovim-5cd9b6474287650790e97187058c81b176fbc7c9.tar.bz2
rneovim-5cd9b6474287650790e97187058c81b176fbc7c9.zip
Merge PR #1316 'Refactor event deferral'
Diffstat (limited to 'src')
-rw-r--r--src/nvim/CMakeLists.txt44
-rw-r--r--src/nvim/api/buffer.c7
-rw-r--r--src/nvim/api/tabpage.c1
-rw-r--r--src/nvim/api/vim.c22
-rw-r--r--src/nvim/api/window.c5
-rw-r--r--src/nvim/eval.c58
-rw-r--r--src/nvim/func_attr.h1
-rw-r--r--src/nvim/lib/klist.h2
-rw-r--r--src/nvim/main.c2
-rw-r--r--src/nvim/map.c5
-rw-r--r--src/nvim/map.h4
-rw-r--r--src/nvim/msgpack_rpc/channel.c (renamed from src/nvim/os/channel.c)230
-rw-r--r--src/nvim/msgpack_rpc/channel.h (renamed from src/nvim/os/channel.h)8
-rw-r--r--src/nvim/msgpack_rpc/defs.h (renamed from src/nvim/os/msgpack_rpc.h)40
-rw-r--r--src/nvim/msgpack_rpc/helpers.c (renamed from src/nvim/os/msgpack_rpc_helpers.c)144
-rw-r--r--src/nvim/msgpack_rpc/helpers.h17
-rw-r--r--src/nvim/msgpack_rpc/server.c (renamed from src/nvim/os/server.c)9
-rw-r--r--src/nvim/msgpack_rpc/server.h7
-rw-r--r--src/nvim/os/event.c126
-rw-r--r--src/nvim/os/event.h21
-rw-r--r--src/nvim/os/event_defs.h25
-rw-r--r--src/nvim/os/input.c43
-rw-r--r--src/nvim/os/job.c116
-rw-r--r--src/nvim/os/msgpack_rpc.c188
-rw-r--r--src/nvim/os/msgpack_rpc_helpers.h16
-rw-r--r--src/nvim/os/provider.c2
-rw-r--r--src/nvim/os/rstream.c58
-rw-r--r--src/nvim/os/server.h7
-rw-r--r--src/nvim/os/server_defs.h7
-rw-r--r--src/nvim/os/shell.c31
-rw-r--r--src/nvim/os/signal.c81
-rw-r--r--src/nvim/os/time.c38
-rw-r--r--src/nvim/os_unix.c6
33 files changed, 617 insertions, 754 deletions
diff --git a/src/nvim/CMakeLists.txt b/src/nvim/CMakeLists.txt
index 208df31596..a77e5e27a0 100644
--- a/src/nvim/CMakeLists.txt
+++ b/src/nvim/CMakeLists.txt
@@ -3,7 +3,7 @@ include(CheckLibraryExists)
set(GENERATED_DIR ${PROJECT_BINARY_DIR}/src/nvim/auto)
set(DISPATCH_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/msgpack-gen.lua)
file(GLOB API_HEADERS api/*.h)
-set(MSGPACK_RPC_HEADER ${PROJECT_SOURCE_DIR}/src/nvim/os/msgpack_rpc.h)
+file(GLOB MSGPACK_RPC_HEADERS msgpack_rpc/*.h)
set(MSGPACK_DISPATCH ${GENERATED_DIR}/msgpack_dispatch.c)
set(HEADER_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/gendeclarations.lua)
set(GENERATED_INCLUDES_DIR ${PROJECT_BINARY_DIR}/include)
@@ -19,12 +19,14 @@ file(MAKE_DIRECTORY ${GENERATED_DIR})
file(MAKE_DIRECTORY ${GENERATED_DIR}/os)
file(MAKE_DIRECTORY ${GENERATED_DIR}/api)
file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private)
+file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR})
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private)
+file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc)
-file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c)
+file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c)
file(GLOB_RECURSE NEOVIM_HEADERS *.h)
foreach(sfile ${NEOVIM_SOURCES})
@@ -36,8 +38,7 @@ endforeach()
list(REMOVE_ITEM NEOVIM_SOURCES ${to_remove})
-set(CONV_SRCS
- api.c
+set(CONV_SOURCES
arabic.c
cursor.c
garray.c
@@ -46,31 +47,24 @@ set(CONV_SRCS
map.c
memory.c
misc2.c
- map.c
profile.c
- os/env.c
- os/event.c
- os/job.c
- os/mem.c
- os/rstream.c
- os/signal.c
- os/users.c
- os/provider.c
- os/uv_helpers.c
- os/wstream.c
- os/msgpack_rpc.c
tempfile.c
- api/buffer.c
- api/private/helpers.c
- api/private/handle.c
- api/tabpage.c
- api/window.c
- api/vim.h
- api/vim.c
)
+foreach(sfile ${CONV_SOURCES})
+ if(NOT EXISTS "${PROJECT_SOURCE_DIR}/src/nvim/${sfile}")
+ message(FATAL_ERROR "${sfile} doesn't exist(it was added to CONV_SOURCES)")
+ endif()
+endforeach()
+
+file(GLOB_RECURSE EXTRA_CONV_SOURCES os/*.c api/*.c msgpack_rpc/*.c)
+foreach(sfile ${EXTRA_CONV_SOURCES})
+ file(RELATIVE_PATH f "${PROJECT_SOURCE_DIR}/src/nvim" "${sfile}")
+ list(APPEND CONV_SOURCES ${f})
+endforeach()
+
set_source_files_properties(
- ${CONV_SRCS} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion")
+ ${CONV_SOURCES} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion")
if(CMAKE_C_COMPILER_ID MATCHES "Clang")
if(DEFINED ENV{SANITIZE})
@@ -126,7 +120,7 @@ add_custom_command(OUTPUT ${MSGPACK_DISPATCH}
COMMAND ${LUA_PRG} ${DISPATCH_GENERATOR} ${API_HEADERS} ${MSGPACK_DISPATCH}
DEPENDS
${API_HEADERS}
- ${MSGPACK_RPC_HEADER}
+ ${MSGPACK_RPC_HEADERS}
${DISPATCH_GENERATOR}
)
diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c
index 4ff5845bd4..982003a31a 100644
--- a/src/nvim/api/buffer.c
+++ b/src/nvim/api/buffer.c
@@ -69,6 +69,7 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err)
/// @param line The new line.
/// @param[out] err Details of an error that may have occurred
void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
+ FUNC_ATTR_DEFERRED
{
Object l = STRING_OBJ(line);
Array array = {.items = &l, .size = 1};
@@ -81,6 +82,7 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
/// @param index The line index
/// @param[out] err Details of an error that may have occurred
void buffer_del_line(Buffer buffer, Integer index, Error *err)
+ FUNC_ATTR_DEFERRED
{
Array array = ARRAY_DICT_INIT;
buffer_set_line_slice(buffer, index, index, true, true, array, err);
@@ -163,6 +165,7 @@ void buffer_set_line_slice(Buffer buffer,
Boolean include_end,
ArrayOf(String) replacement,
Error *err)
+ FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@@ -314,6 +317,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@@ -349,6 +353,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@@ -399,6 +404,7 @@ String buffer_get_name(Buffer buffer, Error *err)
/// @param name The buffer name
/// @param[out] err Details of an error that may have occurred
void buffer_set_name(Buffer buffer, String name, Error *err)
+ FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@@ -444,6 +450,7 @@ void buffer_insert(Buffer buffer,
Integer lnum,
ArrayOf(String) lines,
Error *err)
+ FUNC_ATTR_DEFERRED
{
buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err);
}
diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c
index 3e5d00671a..cb06825731 100644
--- a/src/nvim/api/tabpage.c
+++ b/src/nvim/api/tabpage.c
@@ -62,6 +62,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The tab page handle
Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
tabpage_T *tab = find_tab_by_handle(tabpage, err);
diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c
index 5e0f3e0c32..c90e7039ce 100644
--- a/src/nvim/api/vim.c
+++ b/src/nvim/api/vim.c
@@ -10,7 +10,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/buffer.h"
-#include "nvim/os/channel.h"
+#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/provider.h"
#include "nvim/vim.h"
#include "nvim/buffer.h"
@@ -31,19 +31,12 @@
# include "api/vim.c.generated.h"
#endif
-/// Send keys to vim input buffer, simulating user input.
-///
-/// @param str The keys to send
-void vim_push_keys(String str)
-{
- abort();
-}
-
/// Executes an ex-mode command str
///
/// @param str The command str
/// @param[out] err Details of an error that may have occurred
void vim_command(String str, Error *err)
+ FUNC_ATTR_DEFERRED
{
// Run the command
try_start();
@@ -111,6 +104,7 @@ String vim_replace_termcodes(String str, Boolean from_part, Boolean do_lt,
/// @param[out] err Details of an error that may have occurred
/// @return The expanded object
Object vim_eval(String str, Error *err)
+ FUNC_ATTR_DEFERRED
{
Object rv;
// Evaluate the expression
@@ -230,6 +224,7 @@ String vim_get_current_line(Error *err)
/// @param line The line contents
/// @param[out] err Details of an error that may have occurred
void vim_set_current_line(String line, Error *err)
+ FUNC_ATTR_DEFERRED
{
buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err);
}
@@ -238,6 +233,7 @@ void vim_set_current_line(String line, Error *err)
///
/// @param[out] err Details of an error that may have occurred
void vim_del_current_line(Error *err)
+ FUNC_ATTR_DEFERRED
{
buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err);
}
@@ -259,6 +255,7 @@ Object vim_get_var(String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return the old value if any
Object vim_set_var(String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
return dict_set_value(&globvardict, name, value, err);
}
@@ -289,6 +286,7 @@ Object vim_get_option(String name, Error *err)
/// @param value The new option value
/// @param[out] err Details of an error that may have occurred
void vim_set_option(String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
set_option_to(NULL, SREQ_GLOBAL, name, value, err);
}
@@ -297,6 +295,7 @@ void vim_set_option(String name, Object value, Error *err)
///
/// @param str The message
void vim_out_write(String str)
+ FUNC_ATTR_DEFERRED
{
write_msg(str, false);
}
@@ -305,6 +304,7 @@ void vim_out_write(String str)
///
/// @param str The message
void vim_err_write(String str)
+ FUNC_ATTR_DEFERRED
{
write_msg(str, true);
}
@@ -314,6 +314,7 @@ void vim_err_write(String str)
///
/// @param str The message
void vim_report_error(String str)
+ FUNC_ATTR_DEFERRED
{
vim_err_write(str);
vim_err_write((String) {.data = "\n", .size = 1});
@@ -357,6 +358,7 @@ Buffer vim_get_current_buffer(void)
/// @param id The buffer handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_buffer(Buffer buffer, Error *err)
+ FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@@ -407,6 +409,7 @@ Window vim_get_current_window(void)
///
/// @param handle The window handle
void vim_set_current_window(Window window, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@@ -462,6 +465,7 @@ Tabpage vim_get_current_tabpage(void)
/// @param handle The tab page handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err)
+ FUNC_ATTR_DEFERRED
{
tabpage_T *tp = find_tab_by_handle(tabpage, err);
diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c
index 751518424b..fde1ebfa4c 100644
--- a/src/nvim/api/window.c
+++ b/src/nvim/api/window.c
@@ -52,6 +52,7 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err)
/// @param pos the (row, col) tuple representing the new position
/// @param[out] err Details of an error that may have occurred
void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@@ -111,6 +112,7 @@ Integer window_get_height(Window window, Error *err)
/// @param height the new height in rows
/// @param[out] err Details of an error that may have occurred
void window_set_height(Window window, Integer height, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@@ -154,6 +156,7 @@ Integer window_get_width(Window window, Error *err)
/// @param width the new width in columns
/// @param[out] err Details of an error that may have occurred
void window_set_width(Window window, Integer width, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@@ -199,6 +202,7 @@ Object window_get_var(Window window, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object window_set_var(Window window, String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@@ -234,6 +238,7 @@ Object window_get_option(Window window, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void window_set_option(Window window, String name, Object value, Error *err)
+ FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index e3bd37a03f..3fc4104258 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -18,6 +18,8 @@
#include <stdbool.h>
#include <math.h>
+#include "nvim/lib/klist.h"
+
#include "nvim/assert.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@@ -81,11 +83,12 @@
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/time.h"
-#include "nvim/os/channel.h"
+#include "nvim/msgpack_rpc/channel.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/os/dl.h"
#include "nvim/os/provider.h"
+#include "nvim/os/event.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@@ -443,6 +446,16 @@ static dictitem_T vimvars_var; /* variable used for v: */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
+// Memory pool for reusing JobEvent structures
+typedef struct {
+ Job *job;
+ RStream *rstream;
+ char *type;
+} JobEvent;
+#define JobEventFreer(x)
+KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer)
+kmempool_t(JobEventPool) *job_event_pool = NULL;
+
/*
* Initialize the global and v: variables.
*/
@@ -478,6 +491,7 @@ void eval_init(void)
set_vim_var_nr(VV_HLSEARCH, 1L);
set_reg_var(0); /* default for v:register is not 0 but '"' */
+ job_event_pool = kmp_init(JobEventPool);
}
#if defined(EXITFREE) || defined(PROTO)
@@ -19508,35 +19522,55 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
return ret;
}
+// JobActivity autocommands will execute vimscript code, so it must be executed
+// on Nvim main loop
+#define push_job_event(j, r, t) \
+ do { \
+ JobEvent *event_data = kmp_alloc(JobEventPool, job_event_pool); \
+ event_data->job = j; \
+ event_data->rstream = r; \
+ event_data->type = t; \
+ event_push((Event) { \
+ .handler = on_job_event, \
+ .data = event_data \
+ }); \
+ } while(0)
+
static void on_job_stdout(RStream *rstream, void *data, bool eof)
{
if (!eof) {
- on_job_data(rstream, data, eof, "stdout");
+ push_job_event(data, rstream, "stdout");
}
}
static void on_job_stderr(RStream *rstream, void *data, bool eof)
{
if (!eof) {
- on_job_data(rstream, data, eof, "stderr");
+ push_job_event(data, rstream, "stderr");
}
}
static void on_job_exit(Job *job, void *data)
{
- apply_job_autocmds(job, data, "exit", NULL);
- free(data);
+ push_job_event(data, NULL, "exit");
}
-static void on_job_data(RStream *rstream, void *data, bool eof, char *type)
+static void on_job_event(Event event)
{
- Job *job = data;
- uint32_t read_count = rstream_pending(rstream);
- char *str = xmalloc(read_count + 1);
+ JobEvent *data = event.data;
+ Job *job = data->job;
+ char *str = NULL;
+
+ if (data->rstream) {
+ // Read event
+ size_t read_count = rstream_pending(data->rstream);
+ str = xmalloc(read_count + 1);
- rstream_read(rstream, str, read_count);
- str[read_count] = NUL;
- apply_job_autocmds(job, job_data(job), type, str);
+ rstream_read(data->rstream, str, read_count);
+ str[read_count] = NUL;
+ }
+ apply_job_autocmds(job, job_data(job), data->type, str);
+ kmp_free(JobEventPool, job_event_pool, data);
}
static void apply_job_autocmds(Job *job, char *name, char *type, char *str)
diff --git a/src/nvim/func_attr.h b/src/nvim/func_attr.h
index c75d0ab312..519f61c763 100644
--- a/src/nvim/func_attr.h
+++ b/src/nvim/func_attr.h
@@ -179,6 +179,7 @@
#endif
#ifdef DEFINE_FUNC_ATTRIBUTES
+ #define FUNC_ATTR_DEFERRED
#define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC
#define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x)
#define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y)
diff --git a/src/nvim/lib/klist.h b/src/nvim/lib/klist.h
index e4a90fef33..f8dc7d4c43 100644
--- a/src/nvim/lib/klist.h
+++ b/src/nvim/lib/klist.h
@@ -39,6 +39,8 @@
static inline kmp_##name##_t *kmp_init_##name(void) { \
return xcalloc(1, sizeof(kmp_##name##_t)); \
} \
+ static inline void kmp_destroy_##name(kmp_##name##_t *mp) \
+ REAL_FATTR_UNUSED; \
static inline void kmp_destroy_##name(kmp_##name##_t *mp) { \
size_t k; \
for (k = 0; k < mp->n; ++k) { \
diff --git a/src/nvim/main.c b/src/nvim/main.c
index 128d1a784c..a63ffb4a31 100644
--- a/src/nvim/main.c
+++ b/src/nvim/main.c
@@ -59,7 +59,7 @@
#include "nvim/os/input.h"
#include "nvim/os/os.h"
#include "nvim/os/signal.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/private/helpers.h"
diff --git a/src/nvim/map.c b/src/nvim/map.c
index 24aa38d67d..3f485cb952 100644
--- a/src/nvim/map.c
+++ b/src/nvim/map.c
@@ -6,7 +6,7 @@
#include "nvim/map_defs.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
-#include "nvim/os/msgpack_rpc.h"
+#include "nvim/msgpack_rpc/defs.h"
#include "nvim/lib/khash.h"
@@ -108,4 +108,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER)
MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER)
-MAP_IMPL(String, rpc_method_handler_fn, DEFAULT_INITIALIZER)
+#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false}
+MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER)
diff --git a/src/nvim/map.h b/src/nvim/map.h
index 616516c3e1..5ade6dcf15 100644
--- a/src/nvim/map.h
+++ b/src/nvim/map.h
@@ -5,7 +5,7 @@
#include "nvim/map_defs.h"
#include "nvim/api/private/defs.h"
-#include "nvim/os/msgpack_rpc.h"
+#include "nvim/msgpack_rpc/defs.h"
#define MAP_DECLS(T, U) \
KHASH_DECLARE(T##_##U##_map, T, U) \
@@ -25,7 +25,7 @@ MAP_DECLS(cstr_t, uint64_t)
MAP_DECLS(cstr_t, ptr_t)
MAP_DECLS(ptr_t, ptr_t)
MAP_DECLS(uint64_t, ptr_t)
-MAP_DECLS(String, rpc_method_handler_fn)
+MAP_DECLS(String, MsgpackRpcRequestHandler)
#define map_new(T, U) map_##T##_##U##_new
#define map_free(T, U) map_##T##_##U##_free
diff --git a/src/nvim/os/channel.c b/src/nvim/msgpack_rpc/channel.c
index 959fbc6e73..a1ab12f7c3 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -5,9 +5,11 @@
#include <uv.h>
#include <msgpack.h>
+#include "nvim/lib/klist.h"
+
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
-#include "nvim/os/channel.h"
+#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
@@ -15,8 +17,7 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
-#include "nvim/os/msgpack_rpc.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
@@ -32,14 +33,14 @@
typedef struct {
uint64_t request_id;
- bool errored;
+ bool returned, errored;
Object result;
} ChannelCallFrame;
typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
- bool is_job, enabled;
+ bool is_job, closed;
msgpack_unpacker *unpacker;
union {
Job *job;
@@ -51,21 +52,32 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
- size_t rpc_call_level;
} Channel;
+typedef struct {
+ Channel *channel;
+ MsgpackRpcRequestHandler handler;
+ Array args;
+ uint64_t request_id;
+} RequestEvent;
+
+#define RequestEventFreer(x)
+KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer)
+kmempool_t(RequestEventPool) *request_event_pool = NULL;
+
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/channel.c.generated.h"
+# include "msgpack_rpc/channel.c.generated.h"
#endif
/// Initializes the module
void channel_init(void)
{
+ request_event_pool = kmp_init(RequestEventPool);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@@ -104,12 +116,12 @@ uint64_t channel_from_job(char **argv)
channel,
job_out,
job_err,
- NULL,
+ job_exit,
0,
&status);
if (status <= 0) {
- close_channel(channel);
+ free_channel(channel);
return 0;
}
@@ -128,8 +140,7 @@ void channel_from_stream(uv_stream_t *stream)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
+ channel);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@@ -142,7 +153,7 @@ bool channel_exists(uint64_t id)
{
Channel *channel;
return (channel = pmap_get(uint64_t)(channels, id)) != NULL
- && channel->enabled;
+ && !channel->closed;
}
/// Sends event/arguments to channel
@@ -157,7 +168,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
Channel *channel = NULL;
if (id > 0) {
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_free_array(args);
return false;
}
@@ -183,7 +194,7 @@ Object channel_send_call(uint64_t id,
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
api_free_array(args);
return NIL;
@@ -203,22 +214,11 @@ Object channel_send_call(uint64_t id,
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
- EventSource channel_source = channel->is_job
- ? job_event_source(channel->data.job)
- : rstream_event_source(channel->data.streams.read);
- EventSource sources[] = {channel_source, NULL};
-
// Push the frame
- ChannelCallFrame frame = {request_id, false, NIL};
+ ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
- size_t size = kv_size(channel->call_stack);
-
- do {
- event_poll(-1, sources);
- } while (
- // Continue running if ...
- channel->enabled && // the channel is still enabled
- kv_size(channel->call_stack) >= size); // the call didn't return
+ event_poll_until(-1, frame.returned);
+ (void)kv_pop(channel->call_stack);
if (frame.errored) {
api_set_error(err, Exception, "%s", frame.result.data.string.data);
@@ -236,7 +236,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort();
}
@@ -258,7 +258,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort();
}
@@ -273,12 +273,11 @@ bool channel_close(uint64_t id)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
return false;
}
- channel_kill(channel);
- channel->enabled = false;
+ close_channel(channel);
return true;
}
@@ -291,8 +290,7 @@ static void channel_from_stdio(void)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
+ channel);
rstream_set_file(channel->data.streams.read, 0);
rstream_start(channel->data.streams.read);
// write stream
@@ -320,23 +318,20 @@ static void job_err(RStream *rstream, void *data, bool eof)
}
}
+static void job_exit(Job *job, void *data)
+{
+ free_channel((Channel *)data);
+}
+
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
Channel *channel = data;
- channel->rpc_call_level++;
if (eof) {
- char buf[256];
- snprintf(buf,
- sizeof(buf),
- "Before returning from a RPC call, channel %" PRIu64 " was "
- "closed by the client",
- channel->id);
- call_set_error(channel, buf);
goto end;
}
- uint32_t count = rstream_pending(rstream);
+ size_t count = rstream_pending(rstream);
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
count,
rstream);
@@ -355,7 +350,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
MSGPACK_UNPACK_SUCCESS) {
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
if (is_valid_rpc_response(&unpacked.data, channel)) {
- call_stack_pop(&unpacked.data, channel);
+ complete_call(&unpacked.data, channel);
} else {
char buf[256];
snprintf(buf,
@@ -371,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
- // Perform the call
- WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
- // write the response
- if (!channel_write(channel, resp)) {
- goto end;
- }
+ handle_request(channel, &unpacked.data);
}
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
@@ -398,13 +388,92 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
}
end:
- channel->rpc_call_level--;
- if (!channel->enabled && !kv_size(channel->call_stack)) {
- // Now it's safe to destroy the channel
- close_channel(channel);
+ if (eof && !channel->is_job && !kv_size(channel->call_stack)) {
+ // The free_channel call is deferred for jobs because it's possible that
+ // job_stderr will called after this. For non-job channels, this is the
+ // last callback so it must be freed now.
+ free_channel(channel);
}
}
+static void handle_request(Channel *channel, msgpack_object *request)
+ FUNC_ATTR_NONNULL_ALL
+{
+ uint64_t request_id;
+ Error error = ERROR_INIT;
+ msgpack_rpc_validate(&request_id, request, &error);
+
+ if (error.set) {
+ // Validation failed, send response with error
+ channel_write(channel,
+ serialize_response(request_id, &error, NIL, &out_buffer));
+ return;
+ }
+
+ // Retrieve the request handler
+ MsgpackRpcRequestHandler handler;
+ msgpack_object method = request->via.array.ptr[2];
+
+ if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
+ handler = msgpack_rpc_get_handler_for(method.via.bin.ptr,
+ method.via.bin.size);
+ } else {
+ handler.fn = msgpack_rpc_handle_missing_method;
+ handler.defer = false;
+ }
+
+ Array args;
+ msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
+
+ if (kv_size(channel->call_stack) || !handler.defer) {
+ call_request_handler(channel, handler, args, request_id);
+ return;
+ }
+
+ // Defer calling the request handler.
+ RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
+ event_data->channel = channel;
+ event_data->handler = handler;
+ event_data->args = args;
+ event_data->request_id = request_id;
+ event_push((Event) {
+ .handler = on_request_event,
+ .data = event_data
+ });
+}
+
+static void on_request_event(Event event)
+{
+ RequestEvent *e = event.data;
+ call_request_handler(e->channel, e->handler, e->args, e->request_id);
+ kmp_free(RequestEventPool, request_event_pool, e);
+}
+
+static void call_request_handler(Channel *channel,
+ MsgpackRpcRequestHandler handler,
+ Array args,
+ uint64_t request_id)
+{
+ Error error = ERROR_INIT;
+ Object result = handler.fn(channel->id, request_id, args, &error);
+ // send the response
+ msgpack_packer response;
+ msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
+
+ if (error.set) {
+ ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
+ error.msg,
+ request_id);
+ channel_write(channel,
+ serialize_response(request_id, &error, NIL, &out_buffer));
+ }
+
+ DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
+ request_id);
+ channel_write(channel,
+ serialize_response(request_id, &error, result, &out_buffer));
+}
+
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;
@@ -501,26 +570,11 @@ static void unsubscribe(Channel *channel, char *event)
free(event_string);
}
+/// Close the channel streams/job. The channel resources will be freed by
+/// free_channel later.
static void close_channel(Channel *channel)
{
- pmap_del(uint64_t)(channels, channel->id);
- msgpack_unpacker_free(channel->unpacker);
-
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
- channel_kill(channel);
-
- free(channel);
-}
-
-static void channel_kill(Channel *channel)
-{
+ channel->closed = true;
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@@ -537,6 +591,22 @@ static void channel_kill(Channel *channel)
}
}
+static void free_channel(Channel *channel)
+{
+ pmap_del(uint64_t)(channels, channel->id);
+ msgpack_unpacker_free(channel->unpacker);
+
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
+ free(channel);
+}
+
static void close_cb(uv_handle_t *handle)
{
free(handle->data);
@@ -546,8 +616,7 @@ static void close_cb(uv_handle_t *handle)
static Channel *register_channel(void)
{
Channel *rv = xmalloc(sizeof(Channel));
- rv->enabled = true;
- rv->rpc_call_level = 0;
+ rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
@@ -574,9 +643,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
kv_size(channel->call_stack) - 1)->request_id;
}
-static void call_stack_pop(msgpack_object *obj, Channel *channel)
+static void complete_call(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ ChannelCallFrame *frame = kv_A(channel->call_stack,
+ kv_size(channel->call_stack) - 1);
+ frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
if (frame->errored) {
@@ -589,10 +660,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
static void call_set_error(Channel *channel, char *msg)
{
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
- ChannelCallFrame *frame = kv_pop(channel->call_stack);
+ ChannelCallFrame *frame = kv_A(channel->call_stack, i);
+ frame->returned = true;
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
- channel->enabled = false;
+ close_channel(channel);
}
diff --git a/src/nvim/os/channel.h b/src/nvim/msgpack_rpc/channel.h
index bb409bfde9..df742fe368 100644
--- a/src/nvim/os/channel.h
+++ b/src/nvim/msgpack_rpc/channel.h
@@ -1,5 +1,5 @@
-#ifndef NVIM_OS_CHANNEL_H
-#define NVIM_OS_CHANNEL_H
+#ifndef NVIM_MSGPACK_RPC_CHANNEL_H
+#define NVIM_MSGPACK_RPC_CHANNEL_H
#include <stdbool.h>
#include <uv.h>
@@ -10,6 +10,6 @@
#define METHOD_MAXLEN 512
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/channel.h.generated.h"
+# include "msgpack_rpc/channel.h.generated.h"
#endif
-#endif // NVIM_OS_CHANNEL_H
+#endif // NVIM_MSGPACK_RPC_CHANNEL_H
diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/msgpack_rpc/defs.h
index 3476d791ea..13067fb7b4 100644
--- a/src/nvim/os/msgpack_rpc.h
+++ b/src/nvim/msgpack_rpc/defs.h
@@ -1,29 +1,23 @@
-#ifndef NVIM_OS_MSGPACK_RPC_H
-#define NVIM_OS_MSGPACK_RPC_H
-
-#include <stdint.h>
+#ifndef NVIM_MSGPACK_RPC_DEFS_H
+#define NVIM_MSGPACK_RPC_DEFS_H
#include <msgpack.h>
-#include "nvim/func_attr.h"
-#include "nvim/api/private/defs.h"
-#include "nvim/os/wstream.h"
-
-typedef enum {
- kUnpackResultOk, /// Successfully parsed a document
- kUnpackResultFail, /// Got unexpected input
- kUnpackResultNeedMore /// Need more data
-} UnpackResult;
/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
/// functions of this type.
-typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
- msgpack_object *req,
- Error *error);
-
+typedef struct {
+ Object (*fn)(uint64_t channel_id,
+ uint64_t request_id,
+ Array args,
+ Error *error);
+ bool defer; // Should the call be deferred to the main loop? This should
+ // be true if the function mutates editor data structures such
+ // as buffers, windows, tabs, or if it executes vimscript code.
+} MsgpackRpcRequestHandler;
/// Initializes the msgpack-rpc method table
-void msgpack_rpc_init(void);
+void msgpack_rpc_init_method_table(void);
void msgpack_rpc_init_function_metadata(Dictionary *metadata);
@@ -43,9 +37,7 @@ Object msgpack_rpc_dispatch(uint64_t channel_id,
Error *error)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc.h.generated.h"
-#endif
-
-#endif // NVIM_OS_MSGPACK_RPC_H
+MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name,
+ size_t name_len)
+ FUNC_ATTR_NONNULL_ARG(1);
+#endif // NVIM_MSGPACK_RPC_DEFS_H
diff --git a/src/nvim/os/msgpack_rpc_helpers.c b/src/nvim/msgpack_rpc/helpers.c
index b14de8245c..6be221b912 100644
--- a/src/nvim/os/msgpack_rpc_helpers.c
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -1,14 +1,18 @@
#include <stdint.h>
#include <stdbool.h>
+#include <inttypes.h>
#include <msgpack.h>
-#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/api/private/helpers.h"
+#include "nvim/msgpack_rpc/helpers.h"
+#include "nvim/msgpack_rpc/defs.h"
#include "nvim/vim.h"
+#include "nvim/log.h"
#include "nvim/memory.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc_helpers.c.generated.h"
+# include "msgpack_rpc/helpers.c.generated.h"
#endif
static msgpack_zone zone;
@@ -136,10 +140,13 @@ bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
case MSGPACK_OBJECT_EXT:
switch (obj->via.ext.type) {
case kObjectTypeBuffer:
+ arg->type = kObjectTypeBuffer;
return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
case kObjectTypeWindow:
+ arg->type = kObjectTypeWindow;
return msgpack_rpc_to_window(obj, &arg->data.window);
case kObjectTypeTabpage:
+ arg->type = kObjectTypeTabpage;
return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
}
default:
@@ -287,3 +294,136 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
msgpack_rpc_from_object(result.items[i].value, res);
}
}
+
+/// Finishes the msgpack-rpc call with an error message.
+///
+/// @param msg The error message
+/// @param res A packer that contains the response
+void msgpack_rpc_error(char *msg, msgpack_packer *res)
+ FUNC_ATTR_NONNULL_ALL
+{
+ size_t len = strlen(msg);
+
+ // error message
+ msgpack_pack_bin(res, len);
+ msgpack_pack_bin_body(res, msg, len);
+ // Nil result
+ msgpack_pack_nil(res);
+}
+
+/// Handler executed when an invalid method name is passed
+Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
+ uint64_t request_id,
+ Array args,
+ Error *error)
+{
+ snprintf(error->msg, sizeof(error->msg), "Invalid method name");
+ error->set = true;
+ return NIL;
+}
+
+/// Serializes a msgpack-rpc request or notification(id == 0)
+WBuffer *serialize_request(uint64_t request_id,
+ String method,
+ Array args,
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
+ FUNC_ATTR_NONNULL_ARG(4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, request_id ? 4 : 3);
+ msgpack_pack_int(&pac, request_id ? 0 : 2);
+
+ if (request_id) {
+ msgpack_pack_uint64(&pac, request_id);
+ }
+
+ msgpack_pack_bin(&pac, method.size);
+ msgpack_pack_bin_body(&pac, method.data, method.size);
+ msgpack_rpc_from_array(args, &pac);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ refcount,
+ free);
+ api_free_array(args);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+/// Serializes a msgpack-rpc response
+WBuffer *serialize_response(uint64_t response_id,
+ Error *err,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+ FUNC_ATTR_NONNULL_ARG(2, 4)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&pac, 4);
+ msgpack_pack_int(&pac, 1);
+ msgpack_pack_uint64(&pac, response_id);
+
+ if (err->set) {
+ // error represented by a [type, message] array
+ msgpack_pack_array(&pac, 2);
+ msgpack_rpc_from_integer(err->type, &pac);
+ msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
+ // Nil result
+ msgpack_pack_nil(&pac);
+ } else {
+ // Nil error
+ msgpack_pack_nil(&pac);
+ // Return value
+ msgpack_rpc_from_object(arg, &pac);
+ }
+
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ 1, // responses only go though 1 channel
+ free);
+ api_free_object(arg);
+ msgpack_sbuffer_clear(sbuffer);
+ return rv;
+}
+
+void msgpack_rpc_validate(uint64_t *response_id,
+ msgpack_object *req,
+ Error *err)
+{
+ // response id not known yet
+
+ *response_id = 0;
+ // Validate the basic structure of the msgpack-rpc payload
+ if (req->type != MSGPACK_OBJECT_ARRAY) {
+ api_set_error(err, Validation, _("Request is not an array"));
+ }
+
+ if (req->via.array.size != 4) {
+ api_set_error(err, Validation, _("Request array size should be 4"));
+ }
+
+ if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ api_set_error(err, Validation, _("Id must be a positive integer"));
+ }
+
+ // Set the response id, which is the same as the request
+ *response_id = req->via.array.ptr[1].via.u64;
+
+ if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ api_set_error(err, Validation, _("Message type must be an integer"));
+ }
+
+ if (req->via.array.ptr[0].via.u64 != 0) {
+ api_set_error(err, Validation, _("Message type must be 0"));
+ }
+
+ if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
+ && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
+ api_set_error(err, Validation, _("Method must be a string"));
+ }
+
+ if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
+ api_set_error(err, Validation, _("Paremeters must be an array"));
+ }
+}
diff --git a/src/nvim/msgpack_rpc/helpers.h b/src/nvim/msgpack_rpc/helpers.h
new file mode 100644
index 0000000000..bf161d54e0
--- /dev/null
+++ b/src/nvim/msgpack_rpc/helpers.h
@@ -0,0 +1,17 @@
+#ifndef NVIM_MSGPACK_RPC_HELPERS_H
+#define NVIM_MSGPACK_RPC_HELPERS_H
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include <msgpack.h>
+
+#include "nvim/os/wstream.h"
+#include "nvim/api/private/defs.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/helpers.h.generated.h"
+#endif
+
+#endif // NVIM_MSGPACK_RPC_HELPERS_H
+
diff --git a/src/nvim/os/server.c b/src/nvim/msgpack_rpc/server.c
index 9f7f5b34da..087ba24111 100644
--- a/src/nvim/os/server.c
+++ b/src/nvim/msgpack_rpc/server.c
@@ -5,8 +5,8 @@
#include <uv.h>
-#include "nvim/os/channel.h"
-#include "nvim/os/server.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/os.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
@@ -46,7 +46,7 @@ typedef struct {
static PMap(cstr_t) *servers = NULL;
#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/server.c.generated.h"
+# include "msgpack_rpc/server.c.generated.h"
#endif
/// Initializes the module
@@ -119,7 +119,8 @@ int server_start(const char *endpoint)
ip_end = strchr(addr, NUL);
}
- uint32_t addr_len = ip_end - addr;
+ // (ip_end - addr) is always > 0, so convert to size_t
+ size_t addr_len = (size_t)(ip_end - addr);
if (addr_len > sizeof(ip) - 1) {
// Maximum length of an IP address buffer is 15(eg: 255.255.255.255)
diff --git a/src/nvim/msgpack_rpc/server.h b/src/nvim/msgpack_rpc/server.h
new file mode 100644
index 0000000000..f1a6703938
--- /dev/null
+++ b/src/nvim/msgpack_rpc/server.h
@@ -0,0 +1,7 @@
+#ifndef NVIM_MSGPACK_RPC_SERVER_H
+#define NVIM_MSGPACK_RPC_SERVER_H
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "msgpack_rpc/server.h.generated.h"
+#endif
+#endif // NVIM_MSGPACK_RPC_SERVER_H
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index a460b2db96..2dee529452 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -7,8 +7,10 @@
#include "nvim/os/event.h"
#include "nvim/os/input.h"
-#include "nvim/os/channel.h"
-#include "nvim/os/server.h"
+#include "nvim/msgpack_rpc/defs.h"
+#include "nvim/msgpack_rpc/channel.h"
+#include "nvim/msgpack_rpc/server.h"
+#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
@@ -25,25 +27,22 @@ KLIST_INIT(Event, Event, _destroy_event)
typedef struct {
bool timed_out;
- int32_t ms;
+ int ms;
uv_timer_t *timer;
} TimerData;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
-static klist_t(Event) *deferred_events, *immediate_events;
-// NULL-terminated array of event sources that we should process immediately.
-//
-// Events from sources that are not contained in this array are processed
-// later when `event_process` is called
-static EventSource *immediate_sources = NULL;
+static klist_t(Event) *pending_events;
void event_init(void)
{
+ // early msgpack-rpc initialization
+ msgpack_rpc_init_method_table();
+ msgpack_rpc_helpers_init();
// Initialize the event queues
- deferred_events = kl_init(Event);
- immediate_events = kl_init(Event);
+ pending_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@@ -52,9 +51,8 @@ void event_init(void)
signal_init();
// Jobs
job_init();
- // Channels
+ // finish mspgack-rpc initialization
channel_init();
- // Servers
server_init();
// Providers
provider_init();
@@ -68,8 +66,7 @@ void event_teardown(void)
}
// Wait for some event
-bool event_poll(int32_t ms, EventSource sources[])
- FUNC_ATTR_NONNULL_ARG(2)
+void event_poll(int ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@@ -100,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[])
run_mode = UV_RUN_NOWAIT;
}
- size_t processed_events;
-
- do {
- // Run one event loop iteration, blocking for events if run_mode is
- // UV_RUN_ONCE
- processed_events = loop(run_mode, sources);
- } while (
- // Continue running if ...
- !processed_events && // we didn't process any immediate events
- !event_has_deferred() && // no events are waiting to be processed
- run_mode != UV_RUN_NOWAIT && // ms != 0
- !timer_data.timed_out); // we didn't get a timeout
+ loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
@@ -123,68 +109,29 @@ bool event_poll(int32_t ms, EventSource sources[])
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
- processed_events += loop(UV_RUN_NOWAIT, sources);
+ loop(UV_RUN_NOWAIT);
}
-
- return !timer_data.timed_out && (processed_events || event_has_deferred());
}
bool event_has_deferred(void)
{
- return !kl_empty(deferred_events);
+ return !kl_empty(pending_events);
}
// Queue an event
void event_push(Event event)
{
- bool defer = true;
-
- if (immediate_sources) {
- size_t i;
- EventSource src;
-
- for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
- if (src == event.source) {
- defer = false;
- break;
- }
- }
- }
-
- *kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
+ *kl_pushp(Event, pending_events) = event;
}
-void event_process(void)
-{
- process_from(deferred_events);
-}
-// Runs the appropriate action for each queued event
-static size_t process_from(klist_t(Event) *queue)
+void event_process(void)
{
- size_t count = 0;
Event event;
- while (kl_shift(Event, queue, &event) == 0) {
- switch (event.type) {
- case kEventSignal:
- signal_handle(event);
- break;
- case kEventRStreamData:
- rstream_read_event(event);
- break;
- case kEventJobExit:
- job_exit_event(event);
- break;
- default:
- abort();
- }
- count++;
+ while (kl_shift(Event, pending_events, &event) == 0) {
+ event.handler(event);
}
-
- DLOG("Processed %u events", count);
-
- return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@@ -202,42 +149,9 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
-static void requeue_deferred_events(void)
+static void loop(uv_run_mode run_mode)
{
- size_t remaining = deferred_events->size;
-
- DLOG("Number of deferred events: %u", remaining);
-
- while (remaining--) {
- // Re-push each deferred event to ensure it will be in the right queue
- Event event;
- kl_shift(Event, deferred_events, &event);
- event_push(event);
- DLOG("Re-queueing event");
- }
-
- DLOG("Number of deferred events: %u", deferred_events->size);
-}
-
-static size_t loop(uv_run_mode run_mode, EventSource *sources)
-{
- size_t count;
- immediate_sources = sources;
- // It's possible that some events from the immediate sources are waiting
- // in the deferred queue. If so, move them to the immediate queue so they
- // will be processed in order of arrival by the next `process_from` call.
- requeue_deferred_events();
- count = process_from(immediate_events);
-
- if (count) {
- // No need to enter libuv, events were already processed
- return count;
- }
-
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
- immediate_sources = NULL;
- count = process_from(immediate_events);
- return count;
}
diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h
index 29e304adc8..f8139e978d 100644
--- a/src/nvim/os/event.h
+++ b/src/nvim/os/event.h
@@ -6,6 +6,27 @@
#include "nvim/os/event_defs.h"
#include "nvim/os/job_defs.h"
+#include "nvim/os/time.h"
+
+// Poll for events until a condition is true or a timeout has passed
+#define event_poll_until(timeout, condition) \
+ do { \
+ int remaining = timeout; \
+ uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
+ while (!(condition)) { \
+ event_poll(remaining); \
+ if (remaining == 0) { \
+ break; \
+ } else if (remaining > 0) { \
+ uint64_t now = os_hrtime(); \
+ remaining -= (int) ((now - before) / 1000000); \
+ before = now; \
+ if (remaining <= 0) { \
+ break; \
+ } \
+ } \
+ } \
+ } while (0)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.h.generated.h"
diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h
index dbee3e2ba7..2dd9403d9f 100644
--- a/src/nvim/os/event_defs.h
+++ b/src/nvim/os/event_defs.h
@@ -6,25 +6,12 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
-typedef void * EventSource;
+typedef struct event Event;
+typedef void (*event_handler)(Event event);
-typedef enum {
- kEventSignal,
- kEventRStreamData,
- kEventJobExit
-} EventType;
-
-typedef struct {
- EventSource source;
- EventType type;
- union {
- int signum;
- struct {
- RStream *ptr;
- bool eof;
- } rstream;
- Job *job;
- } data;
-} Event;
+struct event {
+ void *data;
+ event_handler handler;
+};
#endif // NVIM_OS_EVENT_DEFS_H
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index a18d735ce6..b7eba47d5e 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -7,7 +7,6 @@
#include "nvim/api/private/defs.h"
#include "nvim/os/input.h"
#include "nvim/os/event.h"
-#include "nvim/os/signal.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/ascii.h"
@@ -48,10 +47,7 @@ void input_init(void)
}
read_buffer = rbuffer_new(READ_BUFFER_SIZE);
- read_stream = rstream_new(read_cb,
- read_buffer,
- NULL,
- NULL);
+ read_stream = rstream_new(read_cb, read_buffer, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@@ -76,7 +72,7 @@ void input_stop(void)
}
// Low level input function.
-int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
+int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
{
InbufPollResult result;
@@ -90,7 +86,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
return 0;
}
} else {
- if ((result = inbuf_poll(p_ut)) == kInputNone) {
+ if ((result = inbuf_poll((int)p_ut)) == kInputNone) {
if (trigger_cursorhold() && maxlen >= 3
&& !typebuf_changed(tb_change_cnt)) {
buf[0] = K_SPECIAL;
@@ -120,7 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
}
convert_input();
- return rbuffer_read(input_buffer, (char *)buf, maxlen);
+ // Safe to convert rbuffer_read to int, it will never overflow since
+ // we use relatively small buffers.
+ return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
}
// Check if a character is available for reading
@@ -167,23 +165,14 @@ void input_buffer_restore(String str)
free(str.data);
}
-static bool input_poll(int32_t ms)
+static bool input_poll(int ms)
{
- if (embedded_mode) {
- EventSource input_sources[] = { signal_event_source(), NULL };
- return event_poll(ms, input_sources);
- }
-
- EventSource input_sources[] = {
- rstream_event_source(read_stream),
- NULL
- };
-
- return input_ready() || event_poll(ms, input_sources) || input_ready();
+ event_poll_until(ms, input_ready());
+ return input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
-static InbufPollResult inbuf_poll(int32_t ms)
+static InbufPollResult inbuf_poll(int ms)
{
if (typebuf_was_filled || rbuffer_pending(input_buffer)) {
return kInputAvail;
@@ -235,7 +224,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof)
static void convert_input(void)
{
- if (!rbuffer_available(input_buffer)) {
+ if (embedded_mode || !rbuffer_available(input_buffer)) {
// No input buffer space
return;
}
@@ -273,9 +262,9 @@ static void convert_input(void)
char *inbuf = rbuffer_read_ptr(input_buffer);
size_t count = rbuffer_pending(input_buffer), consume_count = 0;
- for (int i = count - 1; i >= 0; i--) {
+ for (int i = (int)count - 1; i >= 0; i--) {
if (inbuf[i] == 3) {
- consume_count = i + 1;
+ consume_count = (size_t)i;
break;
}
}
@@ -304,6 +293,10 @@ static int push_event_key(uint8_t *buf, int maxlen)
// Check if there's pending input
static bool input_ready(void)
{
- return rstream_pending(read_stream) > 0 || eof;
+ return typebuf_was_filled || // API call filled typeahead
+ event_has_deferred() || // Events must be processed
+ (!embedded_mode && (
+ rstream_pending(read_stream) > 0 || // Stdin input
+ eof)); // Stdin closed
}
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 2ca1023290..4c01829159 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -12,9 +12,7 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
-#include "nvim/os/time.h"
#include "nvim/os/shell.h"
-#include "nvim/os/signal.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/term.h"
@@ -99,25 +97,28 @@ void job_teardown(void)
// their status with `wait` or handling SIGCHLD. libuv does that
// automatically (and then calls `exit_cb`) but we have to give it a chance
// by running the loop one more time
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL) {
- continue;
- }
+ job = table[i];
// Still alive
- while (is_alive(job) && remaining_tries--) {
+ while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_poll(0);
+ // It's possible that the event_poll call removed the job from the table,
+ // reset 'job' so the next iteration won't run in that case.
+ job = table[i];
}
- if (is_alive(job)) {
+ if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
+ // Last run to ensure all children were removed
+ event_poll(0);
}
/// Tries to start a new job.
@@ -213,14 +214,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
- job->err = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
+ job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -277,47 +272,33 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
int old_mode = cur_tmode;
settmode(TMODE_COOK);
- EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
-
- // keep track of the elapsed time if ms > 0
- uint64_t before = (ms > 0) ? os_hrtime() : 0;
-
- while (1) {
- // check if the job has exited (and the status is available).
- if (job->pending_refs == 0) {
- break;
- }
-
- event_poll(ms, sources);
-
- // we'll assume that a user frantically hitting interrupt doesn't like
- // the current job. Signal that it has to be killed.
- if (got_int) {
- job_stop(job);
- }
-
- if (ms == 0) {
- break;
- }
-
- // check if the poll timed out, if not, decrease the ms to wait for the
- // next run
- if (ms > 0) {
- uint64_t now = os_hrtime();
- ms -= (int) ((now - before) / 1000000);
- before = now;
-
- // if the time elapsed is greater than the `ms` wait time, break
- if (ms <= 0) {
- break;
- }
- }
+ // Increase pending_refs to stop the exit_cb from being called, which
+ // could result in the job being freed before we have a chance
+ // to get the status.
+ job->pending_refs++;
+ event_poll_until(ms,
+ // Until...
+ got_int || // interrupted by the user
+ job->pending_refs == 1); // job exited
+ job->pending_refs--;
+
+ // we'll assume that a user frantically hitting interrupt doesn't like
+ // the current job. Signal that it has to be killed.
+ if (got_int) {
+ job_stop(job);
+ event_poll(0);
}
settmode(old_mode);
- // return -1 for a timeout, the job status otherwise
- return (job->pending_refs) ? -1 : (int) job->status;
+ if (!job->pending_refs) {
+ int status = (int) job->status;
+ job_exit_callback(job);
+ return status;
+ }
+
+ // return -1 for a timeout
+ return -1;
}
/// Close the pipe used to write to the job.
@@ -369,14 +350,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
-/// Runs the read callback associated with the job exit event
-///
-/// @param event Object containing data necessary to invoke the callback
-void job_exit_event(Event event)
-{
- job_exit_callback(event.data.job);
-}
-
/// Get the job id
///
/// @param job A pointer to the job
@@ -395,11 +368,6 @@ void *job_data(Job *job)
return job->data;
}
-EventSource job_event_source(Job *job)
-{
- return job;
-}
-
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@@ -470,7 +438,7 @@ static void read_cb(RStream *rstream, void *data, bool eof)
}
if (eof && --job->pending_refs == 0) {
- emit_exit_event(job);
+ job_exit_callback(job);
}
}
@@ -481,20 +449,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
job->status = status;
if (--job->pending_refs == 0) {
- emit_exit_event(job);
+ job_exit_callback(job);
}
}
-static void emit_exit_event(Job *job)
-{
- Event event = {
- .source = job_event_source(job),
- .type = kEventJobExit,
- .data.job = job
- };
- event_push(event);
-}
-
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
deleted file mode 100644
index 55bc006ad1..0000000000
--- a/src/nvim/os/msgpack_rpc.c
+++ /dev/null
@@ -1,188 +0,0 @@
-#include <stdint.h>
-#include <stdbool.h>
-#include <inttypes.h>
-
-#include <msgpack.h>
-
-#include "nvim/vim.h"
-#include "nvim/log.h"
-#include "nvim/memory.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/msgpack_rpc.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
-#include "nvim/api/private/helpers.h"
-#include "nvim/func_attr.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc.c.generated.h"
-#endif
-
-/// Validates the basic structure of the msgpack-rpc call and fills `res`
-/// with the basic response structure.
-///
-/// @param channel_id The channel id
-/// @param req The parsed request object
-/// @param res A packer that contains the response
-WBuffer *msgpack_rpc_call(uint64_t channel_id,
- msgpack_object *req,
- msgpack_sbuffer *sbuffer)
- FUNC_ATTR_NONNULL_ARG(2)
- FUNC_ATTR_NONNULL_ARG(3)
-{
- uint64_t response_id;
- Error error = ERROR_INIT;
- msgpack_rpc_validate(&response_id, req, &error);
-
- if (error.set) {
- return serialize_response(response_id, &error, NIL, sbuffer);
- }
-
- // dispatch the call
- Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
- // send the response
- msgpack_packer response;
- msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
-
- if (error.set) {
- ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
- error.msg,
- response_id);
- return serialize_response(response_id, &error, NIL, sbuffer);
- }
-
- DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
- response_id);
- return serialize_response(response_id, &error, rv, sbuffer);
-}
-
-/// Finishes the msgpack-rpc call with an error message.
-///
-/// @param msg The error message
-/// @param res A packer that contains the response
-void msgpack_rpc_error(char *msg, msgpack_packer *res)
- FUNC_ATTR_NONNULL_ALL
-{
- size_t len = strlen(msg);
-
- // error message
- msgpack_pack_bin(res, len);
- msgpack_pack_bin_body(res, msg, len);
- // Nil result
- msgpack_pack_nil(res);
-}
-
-/// Handler executed when an invalid method name is passed
-Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
- msgpack_object *req,
- Error *error)
-{
- snprintf(error->msg, sizeof(error->msg), "Invalid method name");
- error->set = true;
- return NIL;
-}
-
-/// Serializes a msgpack-rpc request or notification(id == 0)
-WBuffer *serialize_request(uint64_t request_id,
- String method,
- Array args,
- msgpack_sbuffer *sbuffer,
- size_t refcount)
- FUNC_ATTR_NONNULL_ARG(4)
-{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, request_id ? 4 : 3);
- msgpack_pack_int(&pac, request_id ? 0 : 2);
-
- if (request_id) {
- msgpack_pack_uint64(&pac, request_id);
- }
-
- msgpack_pack_bin(&pac, method.size);
- msgpack_pack_bin_body(&pac, method.data, method.size);
- msgpack_rpc_from_array(args, &pac);
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- refcount,
- free);
- api_free_array(args);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
-}
-
-/// Serializes a msgpack-rpc response
-WBuffer *serialize_response(uint64_t response_id,
- Error *err,
- Object arg,
- msgpack_sbuffer *sbuffer)
- FUNC_ATTR_NONNULL_ARG(2, 4)
-{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, 4);
- msgpack_pack_int(&pac, 1);
- msgpack_pack_uint64(&pac, response_id);
-
- if (err->set) {
- // error represented by a [type, message] array
- msgpack_pack_array(&pac, 2);
- msgpack_rpc_from_integer(err->type, &pac);
- msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
- // Nil result
- msgpack_pack_nil(&pac);
- } else {
- // Nil error
- msgpack_pack_nil(&pac);
- // Return value
- msgpack_rpc_from_object(arg, &pac);
- }
-
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- 1, // responses only go though 1 channel
- free);
- api_free_object(arg);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
-}
-
-static void msgpack_rpc_validate(uint64_t *response_id,
- msgpack_object *req,
- Error *err)
-{
- // response id not known yet
-
- *response_id = 0;
- // Validate the basic structure of the msgpack-rpc payload
- if (req->type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Request is not an array"));
- }
-
- if (req->via.array.size != 4) {
- api_set_error(err, Validation, _("Request array size should be 4"));
- }
-
- if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Id must be a positive integer"));
- }
-
- // Set the response id, which is the same as the request
- *response_id = req->via.array.ptr[1].via.u64;
-
- if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
- api_set_error(err, Validation, _("Message type must be an integer"));
- }
-
- if (req->via.array.ptr[0].via.u64 != 0) {
- api_set_error(err, Validation, _("Message type must be 0"));
- }
-
- if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
- && req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
- api_set_error(err, Validation, _("Method must be a string"));
- }
-
- if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
- api_set_error(err, Validation, _("Paremeters must be an array"));
- }
-}
diff --git a/src/nvim/os/msgpack_rpc_helpers.h b/src/nvim/os/msgpack_rpc_helpers.h
deleted file mode 100644
index aede6b1587..0000000000
--- a/src/nvim/os/msgpack_rpc_helpers.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H
-#define NVIM_OS_MSGPACK_RPC_HELPERS_H
-
-#include <stdint.h>
-#include <stdbool.h>
-
-#include <msgpack.h>
-
-#include "nvim/api/private/defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/msgpack_rpc_helpers.h.generated.h"
-#endif
-
-#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H
-
diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c
index d4fffaa053..414c8841fa 100644
--- a/src/nvim/os/provider.c
+++ b/src/nvim/os/provider.c
@@ -8,7 +8,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
-#include "nvim/os/channel.h"
+#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/shell.h"
#include "nvim/os/os.h"
#include "nvim/log.h"
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 8f1c30de50..8cfd9d1b75 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -8,8 +8,6 @@
#include "nvim/os/uv_helpers.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@@ -33,7 +31,6 @@ struct rstream {
uv_file fd;
rstream_cb cb;
bool free_handle;
- EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count)
void rbuffer_produced(RBuffer *rbuffer, size_t count)
{
rbuffer->wpos += count;
- DLOG("Received %u bytes from RStream(address: %p, source: %p)",
- (size_t)cnt,
- rbuffer->rstream,
- rstream_event_source(rbuffer->rstream));
+ DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream);
rbuffer_relocate(rbuffer);
if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
// The last read filled the buffer, stop reading for now
rstream_stop(rbuffer->rstream);
- DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Buffer for RStream(%p) is full, stopping it", rstream);
}
}
@@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer)
/// for reading with `rstream_read`
/// @param buffer RBuffer instance to associate with the RStream
/// @param data Some state to associate with the `RStream` instance
-/// @param source_override Replacement for the default source used in events
-/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
-RStream * rstream_new(rstream_cb cb,
- RBuffer *buffer,
- void *data,
- EventSource source_override)
+RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = buffer;
@@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb,
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
- rv->source_override = source_override ? source_override : rv;
return rv;
}
@@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)
return rbuffer_read(rstream->buffer, buffer, count);
}
-/// Runs the read callback associated with the rstream
-///
-/// @param event Object containing data necessary to invoke the callback
-void rstream_read_event(Event event)
-{
- RStream *rstream = event.data.rstream.ptr;
-
- rstream->cb(rstream, rstream->data, event.data.rstream.eof);
-}
-
-EventSource rstream_event_source(RStream *rstream)
-{
- return rstream->source_override;
-}
-
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
- DLOG("Closing RStream(address: %p, source: %p)",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Closing RStream(%p)", rstream);
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- emit_read_event(rstream, true);
+ rstream->cb(rstream, rstream->data, true);
}
return;
}
@@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
- emit_read_event(rstream, false);
+ rstream->cb(rstream, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- emit_read_event(rstream, true);
return;
}
@@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(rstream->buffer, nread);
rstream->fpos += nread;
- emit_read_event(rstream, false);
}
static void close_cb(uv_handle_t *handle)
@@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static void emit_read_event(RStream *rstream, bool eof)
-{
- Event event = {
- .source = rstream_event_source(rstream),
- .type = kEventRStreamData,
- .data.rstream = {
- .ptr = rstream,
- .eof = eof
- }
- };
- event_push(event);
-}
-
static void rbuffer_relocate(RBuffer *rbuffer)
{
// Move data ...
diff --git a/src/nvim/os/server.h b/src/nvim/os/server.h
deleted file mode 100644
index 43592a91e4..0000000000
--- a/src/nvim/os/server.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_SERVER_H
-#define NVIM_OS_SERVER_H
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/server.h.generated.h"
-#endif
-#endif // NVIM_OS_SERVER_H
diff --git a/src/nvim/os/server_defs.h b/src/nvim/os/server_defs.h
deleted file mode 100644
index 08cdf55428..0000000000
--- a/src/nvim/os/server_defs.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_SERVER_DEFS_H
-#define NVIM_OS_SERVER_DEFS_H
-
-typedef struct server Server;
-
-#endif // NVIM_OS_SERVER_DEFS_H
-
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 453cc6d605..d5464f7975 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -1,4 +1,5 @@
#include <string.h>
+#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
@@ -7,6 +8,7 @@
#include "nvim/ascii.h"
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
+#include "nvim/os/event.h"
#include "nvim/os/job.h"
#include "nvim/os/rstream.h"
#include "nvim/os/shell.h"
@@ -58,11 +60,11 @@ typedef struct {
/// `shell_free_argv` when no longer needed.
char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt)
{
- int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
+ size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *)));
// Split 'shell'
- int i = tokenize(p_sh, rv);
+ size_t i = tokenize(p_sh, rv);
if (extra_shell_opt != NULL) {
// Push a copy of `extra_shell_opt`
@@ -212,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)
// Keep running the loop until all three handles are completely closed
while (pdata.exited < expected_exits) {
- uv_run(uv_default_loop(), UV_RUN_ONCE);
+ event_poll(0);
if (got_int) {
// Forward SIGINT to the shell
@@ -356,9 +358,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof)
/// @param argv The vector that will be filled with copies of the parsed
/// words. It can be NULL if the caller only needs to count words.
/// @return The number of words parsed.
-static int tokenize(const char_u *str, char **argv)
+static size_t tokenize(const char_u *str, char **argv)
{
- int argc = 0, len;
+ size_t argc = 0, len;
char_u *p = (char_u *) str;
while (*p != NUL) {
@@ -383,11 +385,11 @@ static int tokenize(const char_u *str, char **argv)
///
/// @param str A pointer to the first character of the word
/// @return The offset from `str` at which the word ends.
-static int word_length(const char_u *str)
+static size_t word_length(const char_u *str)
{
const char_u *p = str;
bool inquote = false;
- int length = 0;
+ size_t length = 0;
// Move `p` to the end of shell word by advancing the pointer while it's
// inside a quote or it's a non-whitespace character
@@ -418,15 +420,15 @@ static void write_selection(uv_write_t *req)
// TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and
// only after filled we should start allocating memory(skip unnecessary
// allocations for small writes)
- int buflen = BUFFER_LENGTH;
+ size_t buflen = BUFFER_LENGTH;
pdata->wbuffer = (char *)xmalloc(buflen);
uv_buf_t uvbuf;
linenr_T lnum = curbuf->b_op_start.lnum;
- int off = 0;
- int written = 0;
+ size_t off = 0;
+ size_t written = 0;
char_u *lp = ml_get(lnum);
- int l;
- int len;
+ size_t l;
+ size_t len;
for (;;) {
l = strlen((char *)lp + written);
@@ -443,7 +445,7 @@ static void write_selection(uv_write_t *req)
pdata->wbuffer[off++] = NUL;
} else {
char_u *s = vim_strchr(lp + written, NL);
- len = s == NULL ? l : s - (lp + written);
+ len = s == NULL ? l : (size_t)(s - (lp + written));
while (off + len >= buflen) {
// Resize the buffer
buflen *= 2;
@@ -584,6 +586,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
ProcessData *data = (ProcessData *)proc->data;
data->exited++;
- data->exit_status = status;
+ assert(status <= INT_MAX);
+ data->exit_status = (int)status;
uv_close((uv_handle_t *)proc, NULL);
}
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index 2f93cfb08a..36f7b37c48 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -12,8 +12,6 @@
#include "nvim/memory.h"
#include "nvim/misc1.h"
#include "nvim/misc2.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/os/signal.h"
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
@@ -72,45 +70,6 @@ void signal_accept_deadly(void)
rejecting_deadly = false;
}
-void signal_handle(Event event)
-{
- int signum = event.data.signum;
-
- switch (signum) {
- case SIGINT:
- got_int = true;
- break;
-#ifdef SIGPWR
- case SIGPWR:
- // Signal of a power failure(eg batteries low), flush the swap files to
- // be safe
- ml_sync_all(false, false);
- break;
-#endif
- case SIGPIPE:
- // Ignore
- break;
- case SIGWINCH:
- shell_resized();
- break;
- case SIGTERM:
- case SIGQUIT:
- case SIGHUP:
- if (!rejecting_deadly) {
- deadly_signal(signum);
- }
- break;
- default:
- fprintf(stderr, "Invalid signal %d", signum);
- break;
- }
-}
-
-EventSource signal_event_source(void)
-{
- return &sint;
-}
-
static char * signal_name(int signum)
{
switch (signum) {
@@ -154,20 +113,32 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
- if (rejecting_deadly) {
- if (signum == SIGINT) {
+ switch (signum) {
+ case SIGINT:
got_int = true;
- }
-
- return;
+ break;
+#ifdef SIGPWR
+ case SIGPWR:
+ // Signal of a power failure(eg batteries low), flush the swap files to
+ // be safe
+ ml_sync_all(false, false);
+ break;
+#endif
+ case SIGPIPE:
+ // Ignore
+ break;
+ case SIGWINCH:
+ shell_resized();
+ break;
+ case SIGTERM:
+ case SIGQUIT:
+ case SIGHUP:
+ if (!rejecting_deadly) {
+ deadly_signal(signum);
+ }
+ break;
+ default:
+ fprintf(stderr, "Invalid signal %d", signum);
+ break;
}
-
- Event event = {
- .source = signal_event_source(),
- .type = kEventSignal,
- .data = {
- .signum = signum
- }
- };
- event_push(event);
}
diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c
index e3b76ac833..a4871ef499 100644
--- a/src/nvim/os/time.c
+++ b/src/nvim/os/time.c
@@ -1,3 +1,4 @@
+#include <assert.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
@@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput)
}
}
-static void microdelay(uint64_t microseconds)
-{
- uint64_t hrtime;
- int64_t ns = microseconds * 1000; // convert to nanoseconds
-
- uv_mutex_lock(&delay_mutex);
-
- while (ns > 0) {
- hrtime = uv_hrtime();
- if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT)
- break;
- ns -= uv_hrtime() - hrtime;
- }
-
- uv_mutex_unlock(&delay_mutex);
-}
-
/// Portable version of POSIX localtime_r()
///
/// @return NULL in case of error
@@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL
time_t rawtime = time(NULL);
return os_localtime_r(&rawtime, result);
}
+
+static void microdelay(uint64_t microseconds)
+{
+ uint64_t elapsed = 0;
+ uint64_t ns = microseconds * 1000; // convert to nanoseconds
+ uint64_t base = uv_hrtime();
+
+ uv_mutex_lock(&delay_mutex);
+
+ while (elapsed < ns) {
+ if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed)
+ == UV_ETIMEDOUT)
+ break;
+ uint64_t now = uv_hrtime();
+ elapsed += now - base;
+ base = now;
+ }
+
+ uv_mutex_unlock(&delay_mutex);
+}
diff --git a/src/nvim/os_unix.c b/src/nvim/os_unix.c
index 52f57f8262..0ad15bc433 100644
--- a/src/nvim/os_unix.c
+++ b/src/nvim/os_unix.c
@@ -54,8 +54,8 @@
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
#include "nvim/os/job.h"
-#include "nvim/os/msgpack_rpc.h"
-#include "nvim/os/msgpack_rpc_helpers.h"
+#include "nvim/msgpack_rpc/helpers.h"
+#include "nvim/msgpack_rpc/defs.h"
#if defined(HAVE_SYS_IOCTL_H)
# include <sys/ioctl.h>
@@ -166,8 +166,6 @@ void mch_init(void)
mac_conv_init();
#endif
- msgpack_rpc_init();
- msgpack_rpc_helpers_init();
event_init();
}