diff options
author | Björn Linse <bjorn.linse@gmail.com> | 2016-05-12 13:18:04 +0200 |
---|---|---|
committer | Björn Linse <bjorn.linse@gmail.com> | 2016-08-20 10:25:33 +0200 |
commit | 215922120c43163f4e1cc00851bd1b86890d3a28 (patch) | |
tree | 60e3d96d8e31ea8d72d197ee740a4af89e5735cf | |
parent | 1b825a9ada4d89059645bc7a458e1e8d931c6161 (diff) | |
download | rneovim-215922120c43163f4e1cc00851bd1b86890d3a28.tar.gz rneovim-215922120c43163f4e1cc00851bd1b86890d3a28.tar.bz2 rneovim-215922120c43163f4e1cc00851bd1b86890d3a28.zip |
stream: set data together with callback
-rw-r--r-- | src/nvim/eval.c | 4 | ||||
-rw-r--r-- | src/nvim/event/process.c | 12 | ||||
-rw-r--r-- | src/nvim/event/rstream.c | 17 | ||||
-rw-r--r-- | src/nvim/event/socket.c | 4 | ||||
-rw-r--r-- | src/nvim/event/stream.c | 9 | ||||
-rw-r--r-- | src/nvim/event/stream.h | 3 | ||||
-rw-r--r-- | src/nvim/event/wstream.c | 17 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 21 | ||||
-rw-r--r-- | src/nvim/os/input.c | 6 | ||||
-rw-r--r-- | src/nvim/os/shell.c | 8 | ||||
-rw-r--r-- | src/nvim/tui/input.c | 12 |
11 files changed, 54 insertions, 59 deletions
diff --git a/src/nvim/eval.c b/src/nvim/eval.c index d936c9572a..2a6adbdc8b 100644 --- a/src/nvim/eval.c +++ b/src/nvim/eval.c @@ -22178,11 +22178,11 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) wstream_init(proc->in, 0); if (proc->out) { rstream_init(proc->out, 0); - rstream_start(proc->out, on_job_stdout); + rstream_start(proc->out, on_job_stdout, data); } if (proc->err) { rstream_init(proc->err, 0); - rstream_start(proc->err, on_job_stderr); + rstream_start(proc->err, on_job_stderr, data); } pmap_put(uint64_t)(jobs, data->id, data); rettv->vval.v_number = data->id; diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 317e40e43a..f507e3d71d 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -25,7 +25,7 @@ #define CLOSE_PROC_STREAM(proc, stream) \ do { \ if (proc->stream && !proc->stream->closed) { \ - stream_close(proc->stream, NULL); \ + stream_close(proc->stream, NULL, NULL); \ } \ } while (0) @@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL return false; } - void *data = proc->data; - if (proc->in) { - stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); + stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe); proc->in->events = proc->events; proc->in->internal_data = proc; proc->in->internal_close_cb = on_process_stream_close; @@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->out) { - stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); + stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe); proc->out->events = proc->events; proc->out->internal_data = proc; proc->out->internal_close_cb = on_process_stream_close; @@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL } if (proc->err) { - stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); + stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe); proc->err->events = proc->events; proc->err->internal_data = proc; proc->err->internal_close_cb = on_process_stream_close; @@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb) { // Stream callback could miss EOF handling if a child keeps the stream // open. - stream->read_cb(stream, stream->buffer, 0, stream->data, true); + stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); } break; } diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c index a520143064..5126dfd84e 100644 --- a/src/nvim/event/rstream.c +++ b/src/nvim/event/rstream.c @@ -17,21 +17,19 @@ # include "event/rstream.c.generated.h" #endif -void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, - void *data) +void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); rstream_init(stream, bufsize); } -void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, - void *data) +void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); rstream_init(stream, bufsize); } @@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize) /// Starts watching for events from a `Stream` instance. /// /// @param stream The `Stream` instance -void rstream_start(Stream *stream, stream_read_cb cb) +void rstream_start(Stream *stream, stream_read_cb cb, void *data) FUNC_ATTR_NONNULL_ARG(1) { stream->read_cb = cb; + stream->cb_data = data; if (stream->uvstream) { uv_read_start(stream->uvstream, alloc_cb, read_cb); } else { @@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data) { Stream *stream = data; assert(stream->read_cb); - rstream_start(stream, stream->read_cb); + rstream_start(stream, stream->read_cb, stream->cb_data); } // Callbacks used by libuv @@ -179,7 +178,7 @@ static void read_event(void **argv) if (stream->read_cb) { size_t count = (uintptr_t)argv[1]; bool eof = (uintptr_t)argv[2]; - stream->read_cb(stream, stream->buffer, count, stream->data, eof); + stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); } stream->pending_reqs--; if (stream->closed && !stream->pending_reqs) { diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c index cdaf40849b..8f9327f3d4 100644 --- a/src/nvim/event/socket.c +++ b/src/nvim/event/socket.c @@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb) return 0; } -int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) +int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { uv_stream_t *client; @@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) return result; } - stream_init(NULL, stream, -1, client, data); + stream_init(NULL, stream, -1, client); return 0; } diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c index 33404158cf..26083c20f4 100644 --- a/src/nvim/event/stream.c +++ b/src/nvim/event/stream.c @@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking) return retval; } -void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, - void *data) +void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream) FUNC_ATTR_NONNULL_ARG(2) { stream->uvstream = uvstream; @@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->uvstream->data = stream; } - stream->data = data; stream->internal_data = NULL; stream->fpos = 0; stream->curmem = 0; @@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, stream->num_bytes = 0; } -void stream_close(Stream *stream, stream_close_cb on_stream_close) +void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) FUNC_ATTR_NONNULL_ARG(1) { assert(!stream->closed); stream->closed = true; stream->close_cb = on_stream_close; + stream->close_cb_data = data; if (!stream->pending_reqs) { stream_close_handle(stream); @@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle) rbuffer_free(stream->buffer); } if (stream->close_cb) { - stream->close_cb(stream, stream->data); + stream->close_cb(stream, stream->close_cb_data); } if (stream->internal_close_cb) { stream->internal_close_cb(stream, stream->internal_data); diff --git a/src/nvim/event/stream.h b/src/nvim/event/stream.h index ad4e24775b..a176fac1c0 100644 --- a/src/nvim/event/stream.h +++ b/src/nvim/event/stream.h @@ -44,13 +44,14 @@ struct stream { uv_file fd; stream_read_cb read_cb; stream_write_cb write_cb; + void *cb_data; stream_close_cb close_cb, internal_close_cb; + void *close_cb_data, *internal_data; size_t fpos; size_t curmem; size_t maxmem; size_t pending_reqs; size_t num_bytes; - void *data, *internal_data; bool closed; Queue *events; }; diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c index 8028e35e6b..fc7aad8eb9 100644 --- a/src/nvim/event/wstream.c +++ b/src/nvim/event/wstream.c @@ -22,19 +22,17 @@ typedef struct { # include "event/wstream.c.generated.h" #endif -void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, - void *data) +void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(loop, stream, fd, NULL, data); + stream_init(loop, stream, fd, NULL); wstream_init(stream, maxmem); } -void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, - void *data) +void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) { - stream_init(NULL, stream, -1, uvstream, data); + stream_init(NULL, stream, -1, uvstream); wstream_init(stream, maxmem); } @@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem) /// /// @param stream The `Stream` instance /// @param cb The callback -void wstream_set_write_cb(Stream *stream, stream_write_cb cb) - FUNC_ATTR_NONNULL_ALL +void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data) + FUNC_ATTR_NONNULL_ARG(1, 2) { stream->write_cb = cb; + stream->cb_data = data; } /// Queues data for writing to the backing file descriptor of a `Stream` @@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status) wstream_release_wbuffer(data->buffer); if (data->stream->write_cb) { - data->stream->write_cb(data->stream, data->stream->data, status); + data->stream->write_cb(data->stream, data->stream->cb_data, status); } data->stream->pending_reqs--; diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 5b249ee1c7..c3378783f2 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -136,9 +136,9 @@ uint64_t channel_from_process(char **argv) incref(channel); // process channels are only closed by the exit_cb wstream_init(proc->in, 0); rstream_init(proc->out, 0); - rstream_start(proc->out, parse_msgpack); + rstream_start(proc->out, parse_msgpack, channel); rstream_init(proc->err, 0); - rstream_start(proc->err, forward_stderr); + rstream_start(proc->err, forward_stderr, channel); return channel->id; } @@ -149,13 +149,13 @@ uint64_t channel_from_process(char **argv) void channel_from_connection(SocketWatcher *watcher) { Channel *channel = register_channel(kChannelTypeSocket); - socket_watcher_accept(watcher, &channel->data.stream, channel); + socket_watcher_accept(watcher, &channel->data.stream); incref(channel); // close channel only after the stream is closed channel->data.stream.internal_close_cb = close_cb; channel->data.stream.internal_data = channel; wstream_init(&channel->data.stream, 0); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); - rstream_start(&channel->data.stream, parse_msgpack); + rstream_start(&channel->data.stream, parse_msgpack, channel); } /// Sends event/arguments to channel @@ -317,11 +317,10 @@ void channel_from_stdio(void) Channel *channel = register_channel(kChannelTypeStdio); incref(channel); // stdio channels are only closed on exit // read stream - rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, - channel); - rstream_start(&channel->data.std.in, parse_msgpack); + rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); + rstream_start(&channel->data.std.in, parse_msgpack, channel); // write stream - wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL); + wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); } static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, @@ -637,7 +636,7 @@ static void close_channel(Channel *channel) switch (channel->type) { case kChannelTypeSocket: - stream_close(&channel->data.stream, NULL); + stream_close(&channel->data.stream, NULL, NULL); break; case kChannelTypeProc: if (!channel->data.process.uvproc.process.closed) { @@ -645,8 +644,8 @@ static void close_channel(Channel *channel) } break; case kChannelTypeStdio: - stream_close(&channel->data.std.in, NULL); - stream_close(&channel->data.std.out, NULL); + stream_close(&channel->data.std.in, NULL, NULL); + stream_close(&channel->data.std.out, NULL, NULL); queue_put(main_loop.fast_events, exit_event, 1, channel); return; default: diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a4e01b18cd..c0c73364c0 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -60,8 +60,8 @@ void input_start(int fd) } global_fd = fd; - rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL); - rstream_start(&read_stream, read_cb); + rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE); + rstream_start(&read_stream, read_cb, NULL); } void input_stop(void) @@ -71,7 +71,7 @@ void input_stop(void) } rstream_stop(&read_stream); - stream_close(&read_stream, NULL); + stream_close(&read_stream, NULL, NULL); } static void cursorhold_event(void **argv) diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c index 64c673930a..ba52b9f661 100644 --- a/src/nvim/os/shell.c +++ b/src/nvim/os/shell.c @@ -236,10 +236,10 @@ static int do_os_system(char **argv, } proc->out->events = NULL; rstream_init(proc->out, 0); - rstream_start(proc->out, data_cb); + rstream_start(proc->out, data_cb, &buf); proc->err->events = NULL; rstream_init(proc->err, 0); - rstream_start(proc->err, data_cb); + rstream_start(proc->err, data_cb, &buf); // write the input, if any if (input) { @@ -251,7 +251,7 @@ static int do_os_system(char **argv, return -1; } // close the input stream after everything is written - wstream_set_write_cb(&in, shell_write_cb); + wstream_set_write_cb(&in, shell_write_cb, NULL); } // invoke busy_start here so event_poll_until wont change the busy state for @@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer, static void shell_write_cb(Stream *stream, void *data, int status) { - stream_close(stream, NULL); + stream_close(stream, NULL, NULL); } diff --git a/src/nvim/tui/input.c b/src/nvim/tui/input.c index be256f3ebc..3ef4d34c9a 100644 --- a/src/nvim/tui/input.c +++ b/src/nvim/tui/input.c @@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop) int curflags = termkey_get_canonflags(input->tk); termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS); // setup input handle - rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input); + rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff); // initialize a timer handle for handling ESC with libtermkey time_watcher_init(loop, &input->timer_handle, input); } @@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input) uv_mutex_destroy(&input->key_buffer_mutex); uv_cond_destroy(&input->key_buffer_cond); time_watcher_close(&input->timer_handle, NULL); - stream_close(&input->read_stream, NULL); + stream_close(&input->read_stream, NULL, NULL); termkey_destroy(input->tk); } void term_input_start(TermInput *input) { - rstream_start(&input->read_stream, read_cb); + rstream_start(&input->read_stream, read_cb, input); } void term_input_stop(TermInput *input) @@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, // // ls *.md | xargs nvim input->in_fd = 2; - stream_close(&input->read_stream, NULL); + stream_close(&input->read_stream, NULL, NULL); queue_put(input->loop->fast_events, restart_reading, 1, input); } else { loop_schedule(&main_loop, event_create(1, input_done_event, 0)); @@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, static void restart_reading(void **argv) { TermInput *input = argv[0]; - rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input); - rstream_start(&input->read_stream, read_cb); + rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff); + rstream_start(&input->read_stream, read_cb, input); } |