From c13c50b752dca322a5ec77dea6188c9e3694549b Mon Sep 17 00:00:00 2001 From: bfredl Date: Thu, 30 May 2024 12:59:02 +0200 Subject: refactor(io): separate types for read and write streams This is a structural refactor with no logical changes, yet. Done in preparation for simplifying rstream/rbuffer which will require more state inline in RStream. The initial idea was to have RStream and WStream as sub-types symetrically but that doesn't work, as sockets are both reading and writing. Also there is very little write-specific state to start with, so the benefit of a separate WStream struct is a lot smaller. Just document what fields in `Stream` are write specific. --- src/nvim/event/process.c | 46 ++++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 22 deletions(-) (limited to 'src/nvim/event/process.c') diff --git a/src/nvim/event/process.c b/src/nvim/event/process.c index 7460e92766..710376cd62 100644 --- a/src/nvim/event/process.c +++ b/src/nvim/event/process.c @@ -8,7 +8,9 @@ #include "nvim/event/loop.h" #include "nvim/event/multiqueue.h" #include "nvim/event/process.h" +#include "nvim/event/rstream.h" #include "nvim/event/stream.h" +#include "nvim/event/wstream.h" #include "nvim/globals.h" #include "nvim/log.h" #include "nvim/main.h" @@ -51,15 +53,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0); } else { - proc->out.closed = true; + proc->out.s.closed = true; } if (err) { - uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); + uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0); } else { - proc->err.closed = true; + proc->err.s.closed = true; } #ifdef USE_GCOV @@ -82,10 +84,10 @@ int process_spawn(Process *proc, bool in, bool out, bool err) uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); } if (out) { - uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL); } if (err) { - uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); + uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL); } if (proc->type == kProcessTypeUv) { @@ -106,16 +108,16 @@ int process_spawn(Process *proc, bool in, bool out, bool err) } if (out) { - stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe); - proc->out.internal_data = proc; - proc->out.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe); + proc->out.s.internal_data = proc; + proc->out.s.internal_close_cb = on_process_stream_close; proc->refcount++; } if (err) { - stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe); - proc->err.internal_data = proc; - proc->err.internal_close_cb = on_process_stream_close; + stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe); + proc->err.s.internal_data = proc; + proc->err.s.internal_close_cb = on_process_stream_close; proc->refcount++; } @@ -148,9 +150,9 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL { - stream_may_close(&proc->in); - stream_may_close(&proc->out); - stream_may_close(&proc->err); + wstream_may_close(&proc->in); + rstream_may_close(&proc->out); + rstream_may_close(&proc->err); } /// Synchronously wait for a process to finish @@ -337,10 +339,10 @@ static void process_close(Process *proc) /// /// @param proc Process, for which an output stream should be flushed. /// @param stream Stream to flush. -static void flush_stream(Process *proc, Stream *stream) +static void flush_stream(Process *proc, RStream *stream) FUNC_ATTR_NONNULL_ARG(1) { - if (!stream || stream->closed) { + if (!stream || stream->s.closed) { return; } @@ -350,7 +352,7 @@ static void flush_stream(Process *proc, Stream *stream) // keeps sending data, we only accept as much data as the system buffer size. // Otherwise this would block cleanup/teardown. int system_buffer_size = 0; - int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, + int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe, &system_buffer_size); if (err) { system_buffer_size = (int)rbuffer_capacity(stream->buffer); @@ -359,14 +361,14 @@ static void flush_stream(Process *proc, Stream *stream) size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; // Read remaining data. - while (!stream->closed && stream->num_bytes < max_bytes) { + while (!stream->s.closed && stream->num_bytes < max_bytes) { // Remember number of bytes before polling size_t num_bytes = stream->num_bytes; // Poll for data and process the generated events. loop_poll_events(proc->loop, 0); - if (stream->events) { - multiqueue_process_events(stream->events); + if (stream->s.events) { + multiqueue_process_events(stream->s.events); } // Stream can be closed if it is empty. @@ -374,7 +376,7 @@ static void flush_stream(Process *proc, Stream *stream) if (stream->read_cb && !stream->did_eof) { // Stream callback could miss EOF handling if a child keeps the stream // open. But only send EOF if we haven't already. - stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); + stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true); } break; } -- cgit