aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjörn Linse <bjorn.linse@gmail.com>2016-05-12 13:18:04 +0200
committerBjörn Linse <bjorn.linse@gmail.com>2016-08-20 10:25:33 +0200
commit215922120c43163f4e1cc00851bd1b86890d3a28 (patch)
tree60e3d96d8e31ea8d72d197ee740a4af89e5735cf
parent1b825a9ada4d89059645bc7a458e1e8d931c6161 (diff)
downloadrneovim-215922120c43163f4e1cc00851bd1b86890d3a28.tar.gz
rneovim-215922120c43163f4e1cc00851bd1b86890d3a28.tar.bz2
rneovim-215922120c43163f4e1cc00851bd1b86890d3a28.zip
stream: set data together with callback
-rw-r--r--src/nvim/eval.c4
-rw-r--r--src/nvim/event/process.c12
-rw-r--r--src/nvim/event/rstream.c17
-rw-r--r--src/nvim/event/socket.c4
-rw-r--r--src/nvim/event/stream.c9
-rw-r--r--src/nvim/event/stream.h3
-rw-r--r--src/nvim/event/wstream.c17
-rw-r--r--src/nvim/msgpack_rpc/channel.c21
-rw-r--r--src/nvim/os/input.c6
-rw-r--r--src/nvim/os/shell.c8
-rw-r--r--src/nvim/tui/input.c12
11 files changed, 54 insertions, 59 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c
index d936c9572a..2a6adbdc8b 100644
--- a/src/nvim/eval.c
+++ b/src/nvim/eval.c
@@ -22178,11 +22178,11 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
wstream_init(proc->in, 0);
if (proc->out) {
rstream_init(proc->out, 0);
- rstream_start(proc->out, on_job_stdout);
+ rstream_start(proc->out, on_job_stdout, data);
}
if (proc->err) {
rstream_init(proc->err, 0);
- rstream_start(proc->err, on_job_stderr);
+ rstream_start(proc->err, on_job_stderr, data);
}
pmap_put(uint64_t)(jobs, data->id, data);
rettv->vval.v_number = data->id;
diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c
index 317e40e43a..f507e3d71d 100644
--- a/src/nvim/event/process.c
+++ b/src/nvim/event/process.c
@@ -25,7 +25,7 @@
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
- stream_close(proc->stream, NULL); \
+ stream_close(proc->stream, NULL, NULL); \
} \
} while (0)
@@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return false;
}
- void *data = proc->data;
-
if (proc->in) {
- stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
+ stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
@@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->out) {
- stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
+ stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
@@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->err) {
- stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
+ stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
@@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb) {
// Stream callback could miss EOF handling if a child keeps the stream
// open.
- stream->read_cb(stream, stream->buffer, 0, stream->data, true);
+ stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
}
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
index a520143064..5126dfd84e 100644
--- a/src/nvim/event/rstream.c
+++ b/src/nvim/event/rstream.c
@@ -17,21 +17,19 @@
# include "event/rstream.c.generated.h"
#endif
-void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
- void *data)
+void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
rstream_init(stream, bufsize);
}
-void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
- void *data)
+void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
rstream_init(stream, bufsize);
}
@@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
-void rstream_start(Stream *stream, stream_read_cb cb)
+void rstream_start(Stream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
+ stream->cb_data = data;
if (stream->uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb);
} else {
@@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
Stream *stream = data;
assert(stream->read_cb);
- rstream_start(stream, stream->read_cb);
+ rstream_start(stream, stream->read_cb, stream->cb_data);
}
// Callbacks used by libuv
@@ -179,7 +178,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
- stream->read_cb(stream, stream->buffer, count, stream->data, eof);
+ stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
if (stream->closed && !stream->pending_reqs) {
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
index cdaf40849b..8f9327f3d4 100644
--- a/src/nvim/event/socket.c
+++ b/src/nvim/event/socket.c
@@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0;
}
-int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
+int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
return result;
}
- stream_init(NULL, stream, -1, client, data);
+ stream_init(NULL, stream, -1, client);
return 0;
}
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
index 33404158cf..26083c20f4 100644
--- a/src/nvim/event/stream.c
+++ b/src/nvim/event/stream.c
@@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
return retval;
}
-void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
- void *data)
+void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->uvstream->data = stream;
}
- stream->data = data;
stream->internal_data = NULL;
stream->fpos = 0;
stream->curmem = 0;
@@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->num_bytes = 0;
}
-void stream_close(Stream *stream, stream_close_cb on_stream_close)
+void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
stream->closed = true;
stream->close_cb = on_stream_close;
+ stream->close_cb_data = data;
if (!stream->pending_reqs) {
stream_close_handle(stream);
@@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
rbuffer_free(stream->buffer);
}
if (stream->close_cb) {
- stream->close_cb(stream, stream->data);
+ stream->close_cb(stream, stream->close_cb_data);
}
if (stream->internal_close_cb) {
stream->internal_close_cb(stream, stream->internal_data);
diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h
index ad4e24775b..a176fac1c0 100644
--- a/src/nvim/event/stream.h
+++ b/src/nvim/event/stream.h
@@ -44,13 +44,14 @@ struct stream {
uv_file fd;
stream_read_cb read_cb;
stream_write_cb write_cb;
+ void *cb_data;
stream_close_cb close_cb, internal_close_cb;
+ void *close_cb_data, *internal_data;
size_t fpos;
size_t curmem;
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
- void *data, *internal_data;
bool closed;
Queue *events;
};
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
index 8028e35e6b..fc7aad8eb9 100644
--- a/src/nvim/event/wstream.c
+++ b/src/nvim/event/wstream.c
@@ -22,19 +22,17 @@ typedef struct {
# include "event/wstream.c.generated.h"
#endif
-void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
- void *data)
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL, data);
+ stream_init(loop, stream, fd, NULL);
wstream_init(stream, maxmem);
}
-void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
- void *data)
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream, data);
+ stream_init(NULL, stream, -1, uvstream);
wstream_init(stream, maxmem);
}
@@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
///
/// @param stream The `Stream` instance
/// @param cb The callback
-void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
- FUNC_ATTR_NONNULL_ALL
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
+ FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream->write_cb = cb;
+ stream->cb_data = data;
}
/// Queues data for writing to the backing file descriptor of a `Stream`
@@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
wstream_release_wbuffer(data->buffer);
if (data->stream->write_cb) {
- data->stream->write_cb(data->stream, data->stream->data, status);
+ data->stream->write_cb(data->stream, data->stream->cb_data, status);
}
data->stream->pending_reqs--;
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 5b249ee1c7..c3378783f2 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -136,9 +136,9 @@ uint64_t channel_from_process(char **argv)
incref(channel); // process channels are only closed by the exit_cb
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
- rstream_start(proc->out, parse_msgpack);
+ rstream_start(proc->out, parse_msgpack, channel);
rstream_init(proc->err, 0);
- rstream_start(proc->err, forward_stderr);
+ rstream_start(proc->err, forward_stderr, channel);
return channel->id;
}
@@ -149,13 +149,13 @@ uint64_t channel_from_process(char **argv)
void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = register_channel(kChannelTypeSocket);
- socket_watcher_accept(watcher, &channel->data.stream, channel);
+ socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
- rstream_start(&channel->data.stream, parse_msgpack);
+ rstream_start(&channel->data.stream, parse_msgpack, channel);
}
/// Sends event/arguments to channel
@@ -317,11 +317,10 @@ void channel_from_stdio(void)
Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit
// read stream
- rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
- channel);
- rstream_start(&channel->data.std.in, parse_msgpack);
+ rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
+ rstream_start(&channel->data.std.in, parse_msgpack, channel);
// write stream
- wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
+ wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
@@ -637,7 +636,7 @@ static void close_channel(Channel *channel)
switch (channel->type) {
case kChannelTypeSocket:
- stream_close(&channel->data.stream, NULL);
+ stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
if (!channel->data.process.uvproc.process.closed) {
@@ -645,8 +644,8 @@ static void close_channel(Channel *channel)
}
break;
case kChannelTypeStdio:
- stream_close(&channel->data.std.in, NULL);
- stream_close(&channel->data.std.out, NULL);
+ stream_close(&channel->data.std.in, NULL, NULL);
+ stream_close(&channel->data.std.out, NULL, NULL);
queue_put(main_loop.fast_events, exit_event, 1, channel);
return;
default:
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index a4e01b18cd..c0c73364c0 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -60,8 +60,8 @@ void input_start(int fd)
}
global_fd = fd;
- rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL);
- rstream_start(&read_stream, read_cb);
+ rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE);
+ rstream_start(&read_stream, read_cb, NULL);
}
void input_stop(void)
@@ -71,7 +71,7 @@ void input_stop(void)
}
rstream_stop(&read_stream);
- stream_close(&read_stream, NULL);
+ stream_close(&read_stream, NULL, NULL);
}
static void cursorhold_event(void **argv)
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 64c673930a..ba52b9f661 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -236,10 +236,10 @@ static int do_os_system(char **argv,
}
proc->out->events = NULL;
rstream_init(proc->out, 0);
- rstream_start(proc->out, data_cb);
+ rstream_start(proc->out, data_cb, &buf);
proc->err->events = NULL;
rstream_init(proc->err, 0);
- rstream_start(proc->err, data_cb);
+ rstream_start(proc->err, data_cb, &buf);
// write the input, if any
if (input) {
@@ -251,7 +251,7 @@ static int do_os_system(char **argv,
return -1;
}
// close the input stream after everything is written
- wstream_set_write_cb(&in, shell_write_cb);
+ wstream_set_write_cb(&in, shell_write_cb, NULL);
}
// invoke busy_start here so event_poll_until wont change the busy state for
@@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status)
{
- stream_close(stream, NULL);
+ stream_close(stream, NULL, NULL);
}
diff --git a/src/nvim/tui/input.c b/src/nvim/tui/input.c
index be256f3ebc..3ef4d34c9a 100644
--- a/src/nvim/tui/input.c
+++ b/src/nvim/tui/input.c
@@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop)
int curflags = termkey_get_canonflags(input->tk);
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
// setup input handle
- rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input);
+ rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff);
// initialize a timer handle for handling ESC with libtermkey
time_watcher_init(loop, &input->timer_handle, input);
}
@@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input)
uv_mutex_destroy(&input->key_buffer_mutex);
uv_cond_destroy(&input->key_buffer_cond);
time_watcher_close(&input->timer_handle, NULL);
- stream_close(&input->read_stream, NULL);
+ stream_close(&input->read_stream, NULL, NULL);
termkey_destroy(input->tk);
}
void term_input_start(TermInput *input)
{
- rstream_start(&input->read_stream, read_cb);
+ rstream_start(&input->read_stream, read_cb, input);
}
void term_input_stop(TermInput *input)
@@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
//
// ls *.md | xargs nvim
input->in_fd = 2;
- stream_close(&input->read_stream, NULL);
+ stream_close(&input->read_stream, NULL, NULL);
queue_put(input->loop->fast_events, restart_reading, 1, input);
} else {
loop_schedule(&main_loop, event_create(1, input_done_event, 0));
@@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
static void restart_reading(void **argv)
{
TermInput *input = argv[0];
- rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input);
- rstream_start(&input->read_stream, read_cb);
+ rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff);
+ rstream_start(&input->read_stream, read_cb, input);
}