aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/api/buffer.c24
-rw-r--r--src/nvim/api/private/defs.h4
-rw-r--r--src/nvim/api/private/helpers.c21
-rw-r--r--src/nvim/api/private/helpers.h43
-rw-r--r--src/nvim/api/tabpage.c10
-rw-r--r--src/nvim/api/vim.c6
-rw-r--r--src/nvim/api/window.c28
-rw-r--r--src/nvim/edit.c2
-rw-r--r--src/nvim/eval.c32
-rw-r--r--src/nvim/ex_getln.c6
-rw-r--r--src/nvim/lib/kvec.h48
-rw-r--r--src/nvim/message.c2
-rw-r--r--src/nvim/normal.c2
-rw-r--r--src/nvim/os/channel.c89
-rw-r--r--src/nvim/os/channel.h2
-rw-r--r--src/nvim/os/event.c87
-rw-r--r--src/nvim/os/input.c6
-rw-r--r--src/nvim/os/job.c146
-rw-r--r--src/nvim/os/job.h2
-rw-r--r--src/nvim/os/msgpack_rpc.c27
-rw-r--r--src/nvim/os/msgpack_rpc.h19
-rw-r--r--src/nvim/os/rstream.c38
-rw-r--r--src/nvim/os/signal.c2
-rw-r--r--src/nvim/os/wstream.c21
-rw-r--r--src/nvim/os/wstream_defs.h1
25 files changed, 419 insertions, 249 deletions
diff --git a/src/nvim/api/buffer.c b/src/nvim/api/buffer.c
index adcdc6da94..034ced184a 100644
--- a/src/nvim/api/buffer.c
+++ b/src/nvim/api/buffer.c
@@ -31,7 +31,7 @@
/// @return The line count
Integer buffer_get_length(Buffer buffer, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return 0;
@@ -100,7 +100,7 @@ StringArray buffer_get_slice(Buffer buffer,
Error *err)
{
StringArray rv = ARRAY_DICT_INIT;
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;
@@ -160,7 +160,7 @@ void buffer_set_slice(Buffer buffer,
StringArray replacement,
Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@@ -283,7 +283,7 @@ end:
/// @return The variable value
Object buffer_get_var(Buffer buffer, String name, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@@ -301,7 +301,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err)
/// @return The old value
Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@@ -318,7 +318,7 @@ Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
/// @return The option value
Object buffer_get_option(Buffer buffer, String name, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@@ -336,7 +336,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@@ -353,7 +353,7 @@ void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
Integer buffer_get_number(Buffer buffer, Error *err)
{
Integer rv = 0;
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;
@@ -370,7 +370,7 @@ Integer buffer_get_number(Buffer buffer, Error *err)
String buffer_get_name(Buffer buffer, Error *err)
{
String rv = STRING_INIT;
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf || buf->b_ffname == NULL) {
return rv;
@@ -386,7 +386,7 @@ String buffer_get_name(Buffer buffer, Error *err)
/// @param[out] err Details of an error that may have occurred
void buffer_set_name(Buffer buffer, String name, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@@ -416,7 +416,7 @@ void buffer_set_name(Buffer buffer, String name, Error *err)
Boolean buffer_is_valid(Buffer buffer)
{
Error stub = {.set = false};
- return find_buffer(buffer, &stub) != NULL;
+ return find_buffer_by_handle(buffer, &stub) != NULL;
}
/// Inserts a sequence of lines to a buffer at a certain index
@@ -440,7 +440,7 @@ void buffer_insert(Buffer buffer, Integer lnum, StringArray lines, Error *err)
Position buffer_get_mark(Buffer buffer, String name, Error *err)
{
Position rv = POSITION_INIT;
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;
diff --git a/src/nvim/api/private/defs.h b/src/nvim/api/private/defs.h
index ab806941f4..ee0fc02c4d 100644
--- a/src/nvim/api/private/defs.h
+++ b/src/nvim/api/private/defs.h
@@ -49,14 +49,14 @@ typedef struct {
typedef struct {
Object *items;
- size_t size;
+ size_t size, capacity;
} Array;
typedef struct key_value_pair KeyValuePair;
typedef struct {
KeyValuePair *items;
- size_t size;
+ size_t size, capacity;
} Dictionary;
typedef enum {
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c
index 264ac7a507..30301e9368 100644
--- a/src/nvim/api/private/helpers.c
+++ b/src/nvim/api/private/helpers.c
@@ -288,12 +288,7 @@ Object vim_to_object(typval_T *obj)
return rv;
}
-/// Finds the pointer for a window number
-///
-/// @param window the window number
-/// @param[out] err Details of an error that may have occurred
-/// @return the window pointer
-buf_T *find_buffer(Buffer buffer, Error *err)
+buf_T *find_buffer_by_handle(Buffer buffer, Error *err)
{
buf_T *rv = handle_get_buffer(buffer);
@@ -304,12 +299,7 @@ buf_T *find_buffer(Buffer buffer, Error *err)
return rv;
}
-/// Finds the pointer for a window number
-///
-/// @param window the window number
-/// @param[out] err Details of an error that may have occurred
-/// @return the window pointer
-win_T * find_window(Window window, Error *err)
+win_T * find_window_by_handle(Window window, Error *err)
{
win_T *rv = handle_get_window(window);
@@ -320,12 +310,7 @@ win_T * find_window(Window window, Error *err)
return rv;
}
-/// Finds the pointer for a tabpage number
-///
-/// @param tabpage the tabpage number
-/// @param[out] err Details of an error that may have occurred
-/// @return the tabpage pointer
-tabpage_T * find_tab(Tabpage tabpage, Error *err)
+tabpage_T * find_tab_by_handle(Tabpage tabpage, Error *err)
{
tabpage_T *rv = handle_get_tabpage(tabpage);
diff --git a/src/nvim/api/private/helpers.h b/src/nvim/api/private/helpers.h
index 68ab4ff614..e1e1a35490 100644
--- a/src/nvim/api/private/helpers.h
+++ b/src/nvim/api/private/helpers.h
@@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
+#include "nvim/lib/kvec.h"
#define set_api_error(message, err) \
do { \
@@ -13,6 +14,48 @@
err->set = true; \
} while (0)
+#define BOOL_OBJ(b) ((Object) { \
+ .type = kObjectTypeBoolean, \
+ .data.boolean = b \
+ })
+
+#define INTEGER_OBJ(i) ((Object) { \
+ .type = kObjectTypeInteger, \
+ .data.integer = i \
+ })
+
+#define STRING_OBJ(s) ((Object) { \
+ .type = kObjectTypeString, \
+ .data.string = cstr_to_string(s) \
+ })
+
+#define STRINGL_OBJ(d, s) ((Object) { \
+ .type = kObjectTypeString, \
+ .data.string = (String) { \
+ .size = s, \
+ .data = xmemdup(d, s) \
+ }})
+
+#define ARRAY_OBJ(a) ((Object) { \
+ .type = kObjectTypeArray, \
+ .data.array = a \
+ })
+
+#define DICTIONARY_OBJ(d) ((Object) { \
+ .type = kObjectTypeDictionary, \
+ .data.dictionary = d \
+ })
+
+#define NIL ((Object) {.type = kObjectTypeNil})
+
+#define PUT(dict, k, v) \
+ kv_push(KeyValuePair, \
+ dict, \
+ ((KeyValuePair) {.key = cstr_to_string(k), .value = v}))
+
+#define ADD(array, item) \
+ kv_push(Object, array, item)
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "api/private/helpers.h.generated.h"
#endif
diff --git a/src/nvim/api/tabpage.c b/src/nvim/api/tabpage.c
index 8d92b01cf9..535722c087 100644
--- a/src/nvim/api/tabpage.c
+++ b/src/nvim/api/tabpage.c
@@ -16,7 +16,7 @@
WindowArray tabpage_get_windows(Tabpage tabpage, Error *err)
{
WindowArray rv = ARRAY_DICT_INIT;
- tabpage_T *tab = find_tab(tabpage, err);
+ tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return rv;
@@ -53,7 +53,7 @@ WindowArray tabpage_get_windows(Tabpage tabpage, Error *err)
/// @return The variable value
Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
{
- tabpage_T *tab = find_tab(tabpage, err);
+ tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return (Object) OBJECT_INIT;
@@ -71,7 +71,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
/// @return The tab page handle
Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
{
- tabpage_T *tab = find_tab(tabpage, err);
+ tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return (Object) OBJECT_INIT;
@@ -88,7 +88,7 @@ Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
Window tabpage_get_window(Tabpage tabpage, Error *err)
{
Window rv = 0;
- tabpage_T *tab = find_tab(tabpage, err);
+ tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return rv;
@@ -117,6 +117,6 @@ Window tabpage_get_window(Tabpage tabpage, Error *err)
Boolean tabpage_is_valid(Tabpage tabpage)
{
Error stub = {.set = false};
- return find_tab(tabpage, &stub) != NULL;
+ return find_tab_by_handle(tabpage, &stub) != NULL;
}
diff --git a/src/nvim/api/vim.c b/src/nvim/api/vim.c
index b99311a61a..e7261e1096 100644
--- a/src/nvim/api/vim.c
+++ b/src/nvim/api/vim.c
@@ -291,7 +291,7 @@ Buffer vim_get_current_buffer(void)
/// @param[out] err Details of an error that may have occurred
void vim_set_current_buffer(Buffer buffer, Error *err)
{
- buf_T *buf = find_buffer(buffer, err);
+ buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@@ -348,7 +348,7 @@ Window vim_get_current_window(void)
/// @param handle The window handle
void vim_set_current_window(Window window, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@@ -407,7 +407,7 @@ Tabpage vim_get_current_tabpage(void)
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err)
{
- tabpage_T *tp = find_tab(tabpage, err);
+ tabpage_T *tp = find_tab_by_handle(tabpage, err);
if (!tp) {
return;
diff --git a/src/nvim/api/window.c b/src/nvim/api/window.c
index 2b81afdb0c..548d8171a5 100644
--- a/src/nvim/api/window.c
+++ b/src/nvim/api/window.c
@@ -19,7 +19,7 @@
/// @return The buffer handle
Buffer window_get_buffer(Window window, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@@ -36,7 +36,7 @@ Buffer window_get_buffer(Window window, Error *err)
Position window_get_cursor(Window window, Error *err)
{
Position rv = {.row = 0, .col = 0};
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (win) {
rv.row = win->w_cursor.lnum;
@@ -53,7 +53,7 @@ Position window_get_cursor(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_cursor(Window window, Position pos, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@@ -89,7 +89,7 @@ void window_set_cursor(Window window, Position pos, Error *err)
/// @return the height in rows
Integer window_get_height(Window window, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@@ -106,7 +106,7 @@ Integer window_get_height(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_height(Window window, Integer height, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@@ -132,7 +132,7 @@ void window_set_height(Window window, Integer height, Error *err)
/// @return the width in columns
Integer window_get_width(Window window, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@@ -149,7 +149,7 @@ Integer window_get_width(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_width(Window window, Integer width, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@@ -176,7 +176,7 @@ void window_set_width(Window window, Integer width, Error *err)
/// @return The variable value
Object window_get_var(Window window, String name, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@@ -194,7 +194,7 @@ Object window_get_var(Window window, String name, Error *err)
/// @return The old value
Object window_set_var(Window window, String name, Object value, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@@ -211,7 +211,7 @@ Object window_set_var(Window window, String name, Object value, Error *err)
/// @return The option value
Object window_get_option(Window window, String name, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@@ -229,7 +229,7 @@ Object window_get_option(Window window, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_option(Window window, String name, Object value, Error *err)
{
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@@ -246,7 +246,7 @@ void window_set_option(Window window, String name, Object value, Error *err)
Position window_get_position(Window window, Error *err)
{
Position rv;
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (win) {
rv.col = win->w_wincol;
@@ -264,7 +264,7 @@ Position window_get_position(Window window, Error *err)
Tabpage window_get_tabpage(Window window, Error *err)
{
Tabpage rv = 0;
- win_T *win = find_window(window, err);
+ win_T *win = find_window_by_handle(window, err);
if (win) {
rv = win_find_tabpage(win)->handle;
@@ -280,6 +280,6 @@ Tabpage window_get_tabpage(Window window, Error *err)
Boolean window_is_valid(Window window)
{
Error stub = {.set = false};
- return find_window(window, &stub) != NULL;
+ return find_window_by_handle(window, &stub) != NULL;
}
diff --git a/src/nvim/edit.c b/src/nvim/edit.c
index 5a70964d57..c0a99baf0f 100644
--- a/src/nvim/edit.c
+++ b/src/nvim/edit.c
@@ -939,7 +939,7 @@ doESCkey:
break;
case K_EVENT:
- event_process();
+ event_process(true);
break;
case K_HOME: /* <Home> */
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index 1de2b7bbf1..0df0c1e4f4 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -70,6 +70,7 @@
#include "nvim/os/rstream_defs.h"
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
+#include "nvim/api/private/helpers.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@@ -10471,11 +10472,13 @@ static void f_job_start(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
- rettv->vval.v_number = job_start(argv,
- xstrdup((char *)argvars[0].vval.v_string),
- on_job_stdout,
- on_job_stderr,
- on_job_exit);
+ job_start(argv,
+ xstrdup((char *)argvars[0].vval.v_string),
+ on_job_stdout,
+ on_job_stderr,
+ on_job_exit,
+ true,
+ &rettv->vval.v_number);
if (rettv->vval.v_number <= 0) {
if (rettv->vval.v_number == 0) {
@@ -10502,19 +10505,21 @@ static void f_job_stop(typval_T *argvars, typval_T *rettv)
return;
}
- if (!job_stop(argvars[0].vval.v_number)) {
+ Job *job = job_find(argvars[0].vval.v_number);
+
+ if (!job) {
// Probably an invalid job id
EMSG(_(e_invjob));
return;
}
+ job_stop(job);
rettv->vval.v_number = 1;
}
// "jobwrite()" function
static void f_job_write(typval_T *argvars, typval_T *rettv)
{
- bool res;
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
@@ -10529,16 +10534,17 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
return;
}
- res = job_write(argvars[0].vval.v_number,
- xstrdup((char *)argvars[1].vval.v_string),
- strlen((char *)argvars[1].vval.v_string));
+ Job *job = job_find(argvars[0].vval.v_number);
- if (!res) {
+ if (!job) {
// Invalid job id
EMSG(_(e_invjob));
}
- rettv->vval.v_number = 1;
+ WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
+ strlen((char *)argvars[1].vval.v_string),
+ free);
+ rettv->vval.v_number = job_write(job, buf);
}
/*
@@ -12550,7 +12556,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv)
if (!channel_send_event((uint64_t)argvars[0].vval.v_number,
(char *)argvars[1].vval.v_string,
- &argvars[2])) {
+ vim_to_object(&argvars[2]))) {
EMSG2(_(e_invarg2), "Channel doesn't exist");
return;
}
diff --git a/src/nvim/ex_getln.c b/src/nvim/ex_getln.c
index 02c82eaf21..9b09abb1b7 100644
--- a/src/nvim/ex_getln.c
+++ b/src/nvim/ex_getln.c
@@ -758,7 +758,7 @@ getcmdline (
*/
switch (c) {
case K_EVENT:
- event_process();
+ event_process(true);
// Force a redraw even though the command line didn't change
shell_resized();
goto cmdline_not_changed;
@@ -1873,8 +1873,8 @@ redraw:
}
if (IS_SPECIAL(c1)) {
- // Process pending events
- event_process();
+ // Process deferred events
+ event_process(true);
// Ignore other special key codes
continue;
}
diff --git a/src/nvim/lib/kvec.h b/src/nvim/lib/kvec.h
index fe17afb7c2..982b5d6f1c 100644
--- a/src/nvim/lib/kvec.h
+++ b/src/nvim/lib/kvec.h
@@ -53,39 +53,39 @@ int main() {
#define kv_roundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
-#define kvec_t(type) struct { size_t n, m; type *a; }
-#define kv_init(v) ((v).n = (v).m = 0, (v).a = 0)
-#define kv_destroy(v) free((v).a)
-#define kv_A(v, i) ((v).a[(i)])
-#define kv_pop(v) ((v).a[--(v).n])
-#define kv_size(v) ((v).n)
-#define kv_max(v) ((v).m)
+#define kvec_t(type) struct { size_t size, capacity; type *items; }
+#define kv_init(v) ((v).size = (v).capacity = 0, (v).items = 0)
+#define kv_destroy(v) free((v).items)
+#define kv_A(v, i) ((v).items[(i)])
+#define kv_pop(v) ((v).items[--(v).size])
+#define kv_size(v) ((v).size)
+#define kv_max(v) ((v).capacity)
-#define kv_resize(type, v, s) ((v).m = (s), (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m))
+#define kv_resize(type, v, s) ((v).capacity = (s), (v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity))
#define kv_copy(type, v1, v0) do { \
- if ((v1).m < (v0).n) kv_resize(type, v1, (v0).n); \
- (v1).n = (v0).n; \
- memcpy((v1).a, (v0).a, sizeof(type) * (v0).n); \
+ if ((v1).capacity < (v0).size) kv_resize(type, v1, (v0).size); \
+ (v1).size = (v0).size; \
+ memcpy((v1).items, (v0).items, sizeof(type) * (v0).size); \
} while (0) \
#define kv_push(type, v, x) do { \
- if ((v).n == (v).m) { \
- (v).m = (v).m? (v).m<<1 : 2; \
- (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m); \
+ if ((v).size == (v).capacity) { \
+ (v).capacity = (v).capacity? (v).capacity<<1 : 8; \
+ (v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity); \
} \
- (v).a[(v).n++] = (x); \
+ (v).items[(v).size++] = (x); \
} while (0)
-#define kv_pushp(type, v) (((v).n == (v).m)? \
- ((v).m = ((v).m? (v).m<<1 : 2), \
- (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
- : 0), ((v).a + ((v).n++))
+#define kv_pushp(type, v) (((v).size == (v).capacity)? \
+ ((v).capacity = ((v).capacity? (v).capacity<<1 : 8), \
+ (v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity), 0) \
+ : 0), ((v).items + ((v).size++))
-#define kv_a(type, v, i) (((v).m <= (size_t)(i)? \
- ((v).m = (v).n = (i) + 1, kv_roundup32((v).m), \
- (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
- : (v).n <= (size_t)(i)? (v).n = (i) + 1 \
- : 0), (v).a[(i)])
+#define kv_a(type, v, i) (((v).capacity <= (size_t)(i)? \
+ ((v).capacity = (v).size = (i) + 1, kv_roundup32((v).capacity), \
+ (v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity), 0) \
+ : (v).size <= (size_t)(i)? (v).size = (i) + 1 \
+ : 0), (v).items[(i)])
#endif
diff --git a/src/nvim/message.c b/src/nvim/message.c
index 281270155a..1571e8678f 100644
--- a/src/nvim/message.c
+++ b/src/nvim/message.c
@@ -2064,7 +2064,7 @@ static int do_more_prompt(int typed_char)
toscroll = 0;
switch (c) {
case K_EVENT:
- event_process();
+ event_process(true);
break;
case BS: /* scroll one line back */
case K_BS:
diff --git a/src/nvim/normal.c b/src/nvim/normal.c
index f4f03f4ff8..9fb096f7d2 100644
--- a/src/nvim/normal.c
+++ b/src/nvim/normal.c
@@ -7368,5 +7368,5 @@ static void nv_cursorhold(cmdarg_T *cap)
static void nv_event(cmdarg_T *cap)
{
- event_process();
+ event_process(true);
}
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 9a692cf9fe..653f09756a 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -24,7 +24,7 @@ typedef struct {
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
- int job_id;
+ Job *job;
struct {
RStream *read;
WStream *write;
@@ -68,11 +68,26 @@ void channel_teardown()
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
-void channel_from_job(char **argv)
+bool channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
- channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL);
+
+ int status;
+ channel->data.job = job_start(argv,
+ channel,
+ job_out,
+ job_err,
+ job_exit,
+ true,
+ &status);
+
+ if (status <= 0) {
+ close_channel(channel);
+ return false;
+ }
+
+ return true;
}
/// Creates an API channel from a libuv stream representing a tcp or
@@ -101,12 +116,13 @@ void channel_from_stream(uv_stream_t *stream)
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @return True if the data was sent successfully, false otherwise.
-bool channel_send_event(uint64_t id, char *type, typval_T *data)
+bool channel_send_event(uint64_t id, char *type, Object data)
{
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ msgpack_rpc_free_object(data);
return false;
}
send_event(channel, type, data);
@@ -126,7 +142,7 @@ void channel_subscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
- return;
+ abort();
}
char *event_string = pmap_get(cstr_t)(event_strings, event);
@@ -148,7 +164,7 @@ void channel_unsubscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
- return;
+ abort();
}
unsubscribe(channel, event);
@@ -165,6 +181,11 @@ static void job_err(RStream *rstream, void *data, bool eof)
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
+static void job_exit(Job *job, void *data)
+{
+ // TODO(tarruda): what should be done here?
+}
+
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
Channel *channel = data;
@@ -183,30 +204,57 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
+ UnpackResult result;
+ msgpack_packer response;
// Deserialize everything we can.
- while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
+ while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
+ == kUnpackResultOk) {
// Each object is a new msgpack-rpc request and requires an empty response
- msgpack_packer response;
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
- wstream_new_buffer(channel->sbuffer->data,
+ wstream_new_buffer(xmemdup(channel->sbuffer->data,
+ channel->sbuffer->size),
channel->sbuffer->size,
- true));
+ free));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
+
+ if (result == kUnpackResultFail) {
+ // See src/msgpack/unpack_template.h in msgpack source tree for
+ // causes for this error(search for 'goto _failed')
+ //
+ // A not so uncommon cause for this might be deserializing objects with
+ // a high nesting level: msgpack will break when it's internal parse stack
+ // size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
+ msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
+ msgpack_pack_array(&response, 4);
+ msgpack_pack_int(&response, 1);
+ msgpack_pack_int(&response, 0);
+ msgpack_rpc_error("Invalid msgpack payload. "
+ "This error can also happen when deserializing "
+ "an object with high level of nesting",
+ &response);
+ wstream_write(channel->data.streams.write,
+ wstream_new_buffer(xmemdup(channel->sbuffer->data,
+ channel->sbuffer->size),
+ channel->sbuffer->size,
+ free));
+ // Clear the buffer for future calls
+ msgpack_sbuffer_clear(channel->sbuffer);
+ }
}
-static void send_event(Channel *channel, char *type, typval_T *data)
+static void send_event(Channel *channel, char *type, Object data)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
}
-static void broadcast_event(char *type, typval_T *data)
+static void broadcast_event(char *type, Object data)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
@@ -219,6 +267,7 @@ static void broadcast_event(char *type, typval_T *data)
});
if (!kv_size(subscribed)) {
+ msgpack_rpc_free_object(data);
goto end;
}
@@ -255,7 +304,9 @@ static void close_channel(Channel *channel)
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
- job_stop(channel->data.job_id);
+ if (channel->data.job) {
+ job_stop(channel->data.job);
+ }
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
@@ -278,17 +329,17 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static WBuffer *serialize_event(char *type, typval_T *data)
+static WBuffer *serialize_event(char *type, Object data)
{
String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
- Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
- msgpack_rpc_notification(event_type, event_data, &packer);
- WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data,
+ msgpack_rpc_notification(event_type, data, &packer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data,
+ msgpack_event_buffer.size),
msgpack_event_buffer.size,
- true);
- msgpack_rpc_free_object(event_data);
+ free);
+ msgpack_rpc_free_object(data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return rv;
diff --git a/src/nvim/os/channel.h b/src/nvim/os/channel.h
index 240461d22e..f12d54cede 100644
--- a/src/nvim/os/channel.h
+++ b/src/nvim/os/channel.h
@@ -2,8 +2,8 @@
#define NVIM_OS_CHANNEL_H
#include <uv.h>
-#include <msgpack.h>
+#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
#define EVENT_MAXLEN 512
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index 2ebf28f436..6723b97e0c 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -21,17 +21,22 @@
#define _destroy_event(x) // do nothing
KLIST_INIT(Event, Event, _destroy_event)
+typedef struct {
+ bool timed_out;
+ int32_t ms;
+ uv_timer_t *timer;
+} TimerData;
+
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
-static klist_t(Event) *event_queue;
-static uv_timer_t timer;
-static uv_prepare_t timer_prepare;
+static klist_t(Event) *deferred_events, *immediate_events;
void event_init()
{
- // Initialize the event queue
- event_queue = kl_init(Event);
+ // Initialize the event queues
+ deferred_events = kl_init(Event);
+ immediate_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@@ -44,9 +49,6 @@ void event_init()
channel_init();
// Servers
server_init();
- uv_timer_init(uv_default_loop(), &timer);
- // This prepare handle that actually starts the timer
- uv_prepare_init(uv_default_loop(), &timer_prepare);
}
void event_teardown()
@@ -59,7 +61,6 @@ void event_teardown()
// Wait for some event
bool event_poll(int32_t ms)
{
- bool timed_out;
uv_run_mode run_mode = UV_RUN_ONCE;
if (input_ready()) {
@@ -67,15 +68,26 @@ bool event_poll(int32_t ms)
return true;
}
- input_start();
- timed_out = false;
+ static int recursive = 0;
+
+ if (!(recursive++)) {
+ // Only needs to start the libuv handle the first time we enter here
+ input_start();
+ }
+
+ uv_timer_t timer;
+ uv_prepare_t timer_prepare;
+ TimerData timer_data = {.ms = ms, .timed_out = false, .timer = &timer};
if (ms > 0) {
+ uv_timer_init(uv_default_loop(), &timer);
+ // This prepare handle that actually starts the timer
+ uv_prepare_init(uv_default_loop(), &timer_prepare);
// Timeout passed as argument to the timer
- timer.data = &timed_out;
+ timer.data = &timer_data;
// We only start the timer after the loop is running, for that we
// use a prepare handle(pass the interval as data to it)
- timer_prepare.data = &ms;
+ timer_prepare.data = &timer_data;
uv_prepare_start(&timer_prepare, timer_prepare_cb);
} else if (ms == 0) {
// For ms == 0, we need to do a non-blocking event poll by
@@ -87,40 +99,51 @@ bool event_poll(int32_t ms)
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
uv_run(uv_default_loop(), run_mode);
+ // Process immediate events outside uv_run since libuv event loop not
+ // support recursion(processing events may cause a recursive event_poll
+ // call)
+ event_process(false);
} while (
// Continue running if ...
!input_ready() && // we have no input
- kl_empty(event_queue) && // no events are waiting to be processed
+ !event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
- !timed_out); // we didn't get a timeout
+ !timer_data.timed_out); // we didn't get a timeout
- input_stop();
+ if (!(--recursive)) {
+ // Again, only stop when we leave the top-level invocation
+ input_stop();
+ }
if (ms > 0) {
- // Stop the timer
- uv_timer_stop(&timer);
+ // Ensure the timer-related handles are closed and run the event loop
+ // once more to let libuv perform it's cleanup
+ uv_close((uv_handle_t *)&timer, NULL);
+ uv_close((uv_handle_t *)&timer_prepare, NULL);
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ event_process(false);
}
- return input_ready() || event_is_pending();
+ return input_ready() || event_has_deferred();
}
-bool event_is_pending()
+bool event_has_deferred()
{
- return !kl_empty(event_queue);
+ return !kl_empty(get_queue(true));
}
// Push an event to the queue
-void event_push(Event event)
+void event_push(Event event, bool deferred)
{
- *kl_pushp(Event, event_queue) = event;
+ *kl_pushp(Event, get_queue(deferred)) = event;
}
// Runs the appropriate action for each queued event
-void event_process()
+void event_process(bool deferred)
{
Event event;
- while (kl_shift(Event, event_queue, &event) == 0) {
+ while (kl_shift(Event, get_queue(deferred), &event) == 0) {
switch (event.type) {
case kEventSignal:
signal_handle(event);
@@ -140,11 +163,19 @@ void event_process()
// Set a flag in the `event_poll` loop for signaling of a timeout
static void timer_cb(uv_timer_t *handle)
{
- *((bool *)handle->data) = true;
+ TimerData *data = handle->data;
+ data->timed_out = true;
}
static void timer_prepare_cb(uv_prepare_t *handle)
{
- uv_timer_start(&timer, timer_cb, *(uint32_t *)timer_prepare.data, 0);
- uv_prepare_stop(&timer_prepare);
+ TimerData *data = handle->data;
+ assert(data->ms > 0);
+ uv_timer_start(data->timer, timer_cb, (uint32_t)data->ms, 0);
+ uv_prepare_stop(handle);
+}
+
+static klist_t(Event) *get_queue(bool deferred)
+{
+ return deferred ? deferred_events : immediate_events;
}
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 3e9751a4db..6e42cba4ad 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -67,7 +67,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
{
InbufPollResult result;
- if (event_is_pending()) {
+ if (event_has_deferred()) {
// Return pending event bytes
return push_event_key(buf, maxlen);
}
@@ -91,8 +91,8 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
}
}
- // If there are pending events, return the keys directly
- if (event_is_pending()) {
+ // If there are deferred events, return the keys directly
+ if (event_has_deferred()) {
return push_event_key(buf, maxlen);
}
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index f9f94158ae..b369004e47 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -37,6 +37,7 @@ struct job {
int pending_closes;
// If the job was already stopped
bool stopped;
+ bool defer;
// Data associated with the job
void *data;
// Callbacks
@@ -128,14 +129,18 @@ void job_teardown()
/// @param stderr_cb Callback that will be invoked when data is available
/// on stderr
/// @param exit_cb Callback that will be invoked when the job exits
-/// @return The job id if the job started successfully. If the the first item /
-/// of `argv`(the program) could not be executed, -1 will be returned.
-// 0 will be returned if the job table is full.
-int job_start(char **argv,
- void *data,
- rstream_cb stdout_cb,
- rstream_cb stderr_cb,
- job_exit_cb job_exit_cb)
+/// @param defer If the job callbacks invocation should be deferred to vim
+/// main loop
+/// @param[out] The job id if the job started successfully, 0 if the job table
+/// is full, -1 if the program could not be executed.
+/// @return The job pointer if the job started successfully, NULL otherwise
+Job *job_start(char **argv,
+ void *data,
+ rstream_cb stdout_cb,
+ rstream_cb stderr_cb,
+ job_exit_cb job_exit_cb,
+ bool defer,
+ int *status)
{
int i;
Job *job;
@@ -149,12 +154,14 @@ int job_start(char **argv,
if (i == MAX_RUNNING_JOBS) {
// No free slots
- return 0;
+ *status = 0;
+ return NULL;
}
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
+ *status = job->id;
job->pending_refs = 3;
job->pending_closes = 4;
job->data = data;
@@ -175,6 +182,7 @@ int job_start(char **argv,
job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL;
+ job->defer = defer;
// Initialize the job std{in,out,err}
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
@@ -192,7 +200,8 @@ int job_start(char **argv,
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
free_job(job);
- return -1;
+ *status = -1;
+ return NULL;
}
// Give all handles a reference to the job
@@ -204,8 +213,8 @@ int job_start(char **argv,
job->in = wstream_new(JOB_WRITE_MAXMEM);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -219,77 +228,64 @@ int job_start(char **argv,
}
job_count++;
- return job->id;
+ return job;
}
-/// Terminates a job. This is a non-blocking operation, but if the job exists
-/// it's guaranteed to succeed(SIGKILL will eventually be sent)
+/// Finds a job instance by id
///
/// @param id The job id
-/// @return true if the stop request was successfully sent, false if the job
-/// id is invalid(probably because it has already stopped)
-bool job_stop(int id)
+/// @return the Job instance
+Job *job_find(int id)
{
- Job *job = find_job(id);
+ Job *job;
- if (job == NULL || job->stopped) {
- return false;
+ if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
+ || job->stopped) {
+ return NULL;
}
- job->stopped = true;
+ return job;
+}
- return true;
+/// Terminates a job. This is a non-blocking operation, but if the job exists
+/// it's guaranteed to succeed(SIGKILL will eventually be sent)
+///
+/// @param job The Job instance
+void job_stop(Job *job)
+{
+ job->stopped = true;
}
/// Writes data to the job's stdin. This is a non-blocking operation, it
/// returns when the write request was sent.
///
-/// @param id The job id
-/// @param data Buffer containing the data to be written
-/// @param len Size of the data
-/// @return true if the write request was successfully sent, false if the job
-/// id is invalid(probably because it has already stopped)
-bool job_write(int id, char *data, uint32_t len)
+/// @param job The Job instance
+/// @param buffer The buffer which contains the data to be written
+/// @return true if the write request was successfully sent, false if writing
+/// to the job stream failed (possibly because the OS buffer is full)
+bool job_write(Job *job, WBuffer *buffer)
{
- Job *job = find_job(id);
-
- if (job == NULL || job->stopped) {
- free(data);
- return false;
- }
-
- if (!wstream_write(job->in, wstream_new_buffer(data, len, false))) {
- job_stop(job->id);
- return false;
- }
+ return wstream_write(job->in, buffer);
+}
- return true;
+/// Sets the `defer` flag for a Job instance
+///
+/// @param rstream The Job id
+/// @param defer The new value for the flag
+void job_set_defer(Job *job, bool defer)
+{
+ job->defer = defer;
+ rstream_set_defer(job->out, defer);
+ rstream_set_defer(job->err, defer);
}
+
/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
void job_exit_event(Event event)
{
- Job *job = event.data.job;
-
- // Free the slot now, 'exit_cb' may want to start another job to replace
- // this one
- table[job->id - 1] = NULL;
-
- if (job->exit_cb) {
- // Invoke the exit callback
- job->exit_cb(job, job->data);
- }
-
- // Free the job resources
- free_job(job);
-
- // Stop polling job status if this was the last
- job_count--;
- if (job_count == 0) {
- uv_prepare_stop(&job_prepare);
- }
+ job_exit_callback(event.data.job);
}
/// Get the job id
@@ -310,18 +306,30 @@ void *job_data(Job *job)
return job->data;
}
-static bool is_alive(Job *job)
+static void job_exit_callback(Job *job)
{
- return uv_process_kill(&job->proc, 0) == 0;
-}
+ // Free the slot now, 'exit_cb' may want to start another job to replace
+ // this one
+ table[job->id - 1] = NULL;
-static Job * find_job(int id)
-{
- if (id <= 0 || id > MAX_RUNNING_JOBS) {
- return NULL;
+ if (job->exit_cb) {
+ // Invoke the exit callback
+ job->exit_cb(job, job->data);
+ }
+
+ // Free the job resources
+ free_job(job);
+
+ // Stop polling job status if this was the last
+ job_count--;
+ if (job_count == 0) {
+ uv_prepare_stop(&job_prepare);
}
+}
- return table[id - 1];
+static bool is_alive(Job *job)
+{
+ return uv_process_kill(&job->proc, 0) == 0;
}
static void free_job(Job *job)
@@ -385,7 +393,7 @@ static void emit_exit_event(Job *job)
Event event;
event.type = kEventJobExit;
event.data.job = job;
- event_push(event);
+ event_push(event, true);
}
static void close_cb(uv_handle_t *handle)
diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h
index f48218ffe7..e0ca615626 100644
--- a/src/nvim/os/job.h
+++ b/src/nvim/os/job.h
@@ -12,6 +12,8 @@
#include "nvim/os/rstream_defs.h"
#include "nvim/os/event_defs.h"
+#include "nvim/os/wstream.h"
+#include "nvim/os/wstream_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/job.h.generated.h"
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
index 932a7717fd..63e1245028 100644
--- a/src/nvim/os/msgpack_rpc.c
+++ b/src/nvim/os/msgpack_rpc.c
@@ -79,11 +79,13 @@ void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
"Request array size is %u, it should be 4",
req->via.array.size);
msgpack_rpc_error(error_msg, res);
+ return;
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Id must be a positive integer", res);
+ return;
}
// Set the response id, which is the same as the request
@@ -398,6 +400,31 @@ void msgpack_rpc_free_dictionary(Dictionary value)
free(value.items);
}
+UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
+ msgpack_unpacked* result)
+{
+ if (result->zone != NULL) {
+ msgpack_zone_free(result->zone);
+ }
+
+ int res = msgpack_unpacker_execute(unpacker);
+
+ if (res > 0) {
+ result->zone = msgpack_unpacker_release_zone(unpacker);
+ result->data = msgpack_unpacker_data(unpacker);
+ msgpack_unpacker_reset(unpacker);
+ return kUnpackResultOk;
+ }
+
+ if (res < 0) {
+ // Since we couldn't parse it, destroy the data consumed so far
+ msgpack_unpacker_reset(unpacker);
+ return kUnpackResultFail;
+ }
+
+ return kUnpackResultNeedMore;
+}
+
REMOTE_FUNCS_IMPL(Buffer, buffer)
REMOTE_FUNCS_IMPL(Window, window)
REMOTE_FUNCS_IMPL(Tabpage, tabpage)
diff --git a/src/nvim/os/msgpack_rpc.h b/src/nvim/os/msgpack_rpc.h
index c8f243e2cf..baabff20aa 100644
--- a/src/nvim/os/msgpack_rpc.h
+++ b/src/nvim/os/msgpack_rpc.h
@@ -9,6 +9,12 @@
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
+typedef enum {
+ kUnpackResultOk, /// Successfully parsed a document
+ kUnpackResultFail, /// Got unexpected input
+ kUnpackResultNeedMore /// Need more data
+} UnpackResult;
+
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
@@ -40,6 +46,19 @@ void msgpack_rpc_dispatch(uint64_t id,
msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
+/// Try to unpack a msgpack document from the data in the unpacker buffer. This
+/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
+/// the called know if the unpacking failed due to bad input or due to missing
+/// data.
+///
+/// @param unpacker The unpacker containing the parse buffer
+/// @param result The result which will contain the parsed object
+/// @return kUnpackResultOk : An object was parsed
+/// kUnpackResultFail : Got bad input
+/// kUnpackResultNeedMore: Need more data
+UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
+ msgpack_unpacked* result);
+
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 1025201f7a..81714f7bae 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -24,7 +24,7 @@ struct rstream {
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
- bool reading, free_handle, async;
+ bool reading, free_handle, defer;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -38,21 +38,19 @@ struct rstream {
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
-/// @param async Flag that specifies if the callback should only be called
-/// outside libuv event loop(When processing async events with
-/// KE_EVENT). Only the RStream instance reading user input should set
-/// this to false
+/// @param defer Flag that specifies if callback invocation should be deferred
+/// to vim main loop(as a KE_EVENT special key)
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
- bool async)
+ bool defer)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
- rv->async = async;
+ rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
@@ -213,6 +211,15 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
+/// Sets the `defer` flag for a a RStream instance
+///
+/// @param rstream The RStream instance
+/// @param defer The new value for the flag
+void rstream_set_defer(RStream *rstream, bool defer)
+{
+ rstream->defer = defer;
+}
+
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
@@ -333,16 +340,9 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof)
{
- if (rstream->async) {
- Event event;
-
- event.type = kEventRStreamData;
- event.data.rstream.ptr = rstream;
- event.data.rstream.eof = eof;
- event_push(event);
- } else {
- // Invoke the callback passing in the number of bytes available and data
- // associated with the stream
- rstream->cb(rstream, rstream->data, eof);
- }
+ Event event;
+ event.type = kEventRStreamData;
+ event.data.rstream.ptr = rstream;
+ event.data.rstream.eof = eof;
+ event_push(event, rstream->defer);
}
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index 85aa8ae5cb..cfdc8821a4 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -159,5 +159,5 @@ static void signal_cb(uv_signal_t *handle, int signum)
.signum = signum
}
};
- event_push(event);
+ event_push(event, true);
}
diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c
index c2ed05b78f..9a908a4348 100644
--- a/src/nvim/os/wstream.c
+++ b/src/nvim/os/wstream.c
@@ -21,8 +21,9 @@ struct wstream {
};
struct wbuffer {
- size_t refcount, size;
+ size_t size, refcount;
char *data;
+ wbuffer_data_finalizer cb;
};
typedef struct {
@@ -90,7 +91,7 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
// This should not be called after a wstream was freed
assert(!wstream->freed);
- if (wstream->curmem + buffer->size > wstream->maxmem) {
+ if (wstream->curmem > wstream->maxmem) {
return false;
}
@@ -116,19 +117,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
-/// @param copy If true, the data will be copied into the WBuffer
+/// @param cb Pointer to function that will be responsible for freeing
+/// the buffer data(passing 'free' will work as expected).
/// @return The allocated WBuffer instance
-WBuffer *wstream_new_buffer(char *data, size_t size, bool copy)
+WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
rv->refcount = 0;
-
- if (copy) {
- rv->data = xmemdup(data, size);
- } else {
- rv->data = data;
- }
+ rv->cb = cb;
+ rv->data = data;
return rv;
}
@@ -141,8 +139,7 @@ static void write_cb(uv_write_t *req, int status)
data->wstream->curmem -= data->buffer->size;
if (!--data->buffer->refcount) {
- // Free the data written to the stream
- free(data->buffer->data);
+ data->buffer->cb(data->buffer->data);
free(data->buffer);
}
diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h
index a7565c9bc7..1bf61ffce1 100644
--- a/src/nvim/os/wstream_defs.h
+++ b/src/nvim/os/wstream_defs.h
@@ -3,6 +3,7 @@
typedef struct wbuffer WBuffer;
typedef struct wstream WStream;
+typedef void (*wbuffer_data_finalizer)(void *data);
#endif // NVIM_OS_WSTREAM_DEFS_H