aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/os')
-rw-r--r--src/nvim/os/event.c177
-rw-r--r--src/nvim/os/event.h35
-rw-r--r--src/nvim/os/event_defs.h17
-rw-r--r--src/nvim/os/input.c40
-rw-r--r--src/nvim/os/job.c467
-rw-r--r--src/nvim/os/job.h21
-rw-r--r--src/nvim/os/job_defs.h63
-rw-r--r--src/nvim/os/job_private.h118
-rw-r--r--src/nvim/os/os.h1
-rw-r--r--src/nvim/os/pipe_process.c110
-rw-r--r--src/nvim/os/pipe_process.h7
-rw-r--r--src/nvim/os/pty_process.c257
-rw-r--r--src/nvim/os/pty_process.h7
-rw-r--r--src/nvim/os/rstream.c253
-rw-r--r--src/nvim/os/rstream.h13
-rw-r--r--src/nvim/os/rstream_defs.h20
-rw-r--r--src/nvim/os/shell.c62
-rw-r--r--src/nvim/os/signal.c51
-rw-r--r--src/nvim/os/signal.h2
-rw-r--r--src/nvim/os/stream.c30
-rw-r--r--src/nvim/os/time.c4
-rw-r--r--src/nvim/os/uv_helpers.c98
-rw-r--r--src/nvim/os/uv_helpers.h13
-rw-r--r--src/nvim/os/wstream.c243
-rw-r--r--src/nvim/os/wstream.h13
-rw-r--r--src/nvim/os/wstream_defs.h19
26 files changed, 77 insertions, 2064 deletions
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
deleted file mode 100644
index 56874b495d..0000000000
--- a/src/nvim/os/event.c
+++ /dev/null
@@ -1,177 +0,0 @@
-#include <assert.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/event.h"
-#include "nvim/os/input.h"
-#include "nvim/msgpack_rpc/defs.h"
-#include "nvim/msgpack_rpc/channel.h"
-#include "nvim/msgpack_rpc/server.h"
-#include "nvim/msgpack_rpc/helpers.h"
-#include "nvim/os/signal.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/job.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-#include "nvim/misc2.h"
-#include "nvim/ui.h"
-#include "nvim/screen.h"
-#include "nvim/terminal.h"
-
-#include "nvim/lib/klist.h"
-
-// event will be cleaned up after it gets processed
-#define _destroy_event(x) // do nothing
-KLIST_INIT(Event, Event, _destroy_event)
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/event.c.generated.h"
-#endif
-// deferred_events: Events that should be processed as the K_EVENT special key
-// immediate_events: Events that should be processed after exiting libuv event
-// loop(to avoid recursion), but before returning from
-// `event_poll`
-static klist_t(Event) *deferred_events = NULL, *immediate_events = NULL;
-static int deferred_events_allowed = 0;
-
-void event_init(void)
-{
- // Initialize the event queues
- deferred_events = kl_init(Event);
- immediate_events = kl_init(Event);
- // early msgpack-rpc initialization
- msgpack_rpc_init_method_table();
- msgpack_rpc_helpers_init();
- // Initialize input events
- input_init();
- // Timer to wake the event loop if a timeout argument is passed to
- // `event_poll`
- // Signals
- signal_init();
- // Jobs
- job_init();
- // finish mspgack-rpc initialization
- channel_init();
- server_init();
- terminal_init();
-}
-
-void event_teardown(void)
-{
- if (!deferred_events) {
- // Not initialized(possibly a --version invocation)
- return;
- }
-
- process_events_from(immediate_events);
- process_events_from(deferred_events);
- input_stop();
- channel_teardown();
- job_teardown();
- server_teardown();
- signal_teardown();
- terminal_teardown();
-
- // this last `uv_run` will return after all handles are stopped, it will
- // also take care of finishing any uv_close calls made by other *_teardown
- // functions.
- do {
- uv_run(uv_default_loop(), UV_RUN_DEFAULT);
- } while (uv_loop_close(uv_default_loop()));
-}
-
-// Wait for some event
-void event_poll(int ms)
-{
- static int recursive = 0;
-
- if (recursive++) {
- abort(); // Should not re-enter uv_run
- }
-
- uv_run_mode run_mode = UV_RUN_ONCE;
- uv_timer_t timer;
-
- if (ms > 0) {
- uv_timer_init(uv_default_loop(), &timer);
- // Use a repeating timeout of ms milliseconds to make sure
- // we do not block indefinitely for I/O.
- uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
- } else if (ms == 0) {
- // For ms == 0, we need to do a non-blocking event poll by
- // setting the run mode to UV_RUN_NOWAIT.
- run_mode = UV_RUN_NOWAIT;
- }
-
- loop(run_mode);
-
- if (ms > 0) {
- // Ensure the timer handle is closed and run the event loop
- // once more to let libuv perform it's cleanup
- uv_timer_stop(&timer);
- uv_close((uv_handle_t *)&timer, NULL);
- loop(UV_RUN_NOWAIT);
- }
-
- recursive--; // Can re-enter uv_run now
-
- // In case this is run before event_init, don't process any events.
- if (immediate_events) {
- process_events_from(immediate_events);
- }
-}
-
-bool event_has_deferred(void)
-{
- return deferred_events_allowed && !kl_empty(deferred_events);
-}
-
-void event_enable_deferred(void)
-{
- ++deferred_events_allowed;
-}
-
-void event_disable_deferred(void)
-{
- --deferred_events_allowed;
-}
-
-// Queue an event
-void event_push(Event event, bool deferred)
-{
- // Sometimes libuv will run pending callbacks(timer for example) before
- // blocking for a poll. If this happens and the callback pushes a event to one
- // of the queues, the event would only be processed after the poll
- // returns(user hits a key for example). To avoid this scenario, we call
- // uv_stop when a event is enqueued.
- uv_stop(uv_default_loop());
- kl_push(Event, deferred ? deferred_events : immediate_events, event);
-}
-
-void event_process(void)
-{
- process_events_from(deferred_events);
-}
-
-static void process_events_from(klist_t(Event) *queue)
-{
- while (!kl_empty(queue)) {
- Event event = kl_shift(Event, queue);
- event.handler(event);
- }
-}
-
-static void timer_cb(uv_timer_t *handle)
-{
-}
-
-static void loop(uv_run_mode run_mode)
-{
- DLOG("Enter event loop");
- uv_run(uv_default_loop(), run_mode);
- DLOG("Exit event loop");
-}
diff --git a/src/nvim/os/event.h b/src/nvim/os/event.h
deleted file mode 100644
index db02b38c7f..0000000000
--- a/src/nvim/os/event.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#ifndef NVIM_OS_EVENT_H
-#define NVIM_OS_EVENT_H
-
-#include <stdint.h>
-#include <stdbool.h>
-
-#include "nvim/os/event_defs.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/time.h"
-
-// Poll for events until a condition or timeout
-#define event_poll_until(timeout, condition) \
- do { \
- int remaining = timeout; \
- uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
- while (!(condition)) { \
- event_poll(remaining); \
- if (remaining == 0) { \
- break; \
- } else if (remaining > 0) { \
- uint64_t now = os_hrtime(); \
- remaining -= (int) ((now - before) / 1000000); \
- before = now; \
- if (remaining <= 0) { \
- break; \
- } \
- } \
- } \
- } while (0)
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/event.h.generated.h"
-#endif
-
-#endif // NVIM_OS_EVENT_H
diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h
deleted file mode 100644
index 2dd9403d9f..0000000000
--- a/src/nvim/os/event_defs.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef NVIM_OS_EVENT_DEFS_H
-#define NVIM_OS_EVENT_DEFS_H
-
-#include <stdbool.h>
-
-#include "nvim/os/job_defs.h"
-#include "nvim/os/rstream_defs.h"
-
-typedef struct event Event;
-typedef void (*event_handler)(Event event);
-
-struct event {
- void *data;
- event_handler handler;
-};
-
-#endif // NVIM_OS_EVENT_DEFS_H
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 726335bd9a..b0e0f57e60 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -6,9 +6,8 @@
#include "nvim/api/private/defs.h"
#include "nvim/os/input.h"
-#include "nvim/os/event.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/rstream.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/rstream.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/ui.h"
@@ -30,8 +29,8 @@ typedef enum {
kInputEof
} InbufPollResult;
-static RStream *read_stream = NULL;
-static RBuffer *read_buffer = NULL, *input_buffer = NULL;
+static Stream read_stream = {.closed = true};
+static RBuffer *input_buffer = NULL;
static bool input_eof = false;
static int global_fd = 0;
@@ -54,26 +53,23 @@ int input_global_fd(void)
void input_start(int fd)
{
- if (read_stream) {
+ if (!read_stream.closed) {
return;
}
global_fd = fd;
- read_buffer = rbuffer_new(READ_BUFFER_SIZE);
- read_stream = rstream_new(read_cb, read_buffer, NULL);
- rstream_set_file(read_stream, fd);
- rstream_start(read_stream);
+ rstream_init_fd(&loop, &read_stream, fd, READ_BUFFER_SIZE, NULL);
+ rstream_start(&read_stream, read_cb);
}
void input_stop(void)
{
- if (!read_stream) {
+ if (read_stream.closed) {
return;
}
- rstream_stop(read_stream);
- rstream_free(read_stream);
- read_stream = NULL;
+ rstream_stop(&read_stream);
+ stream_close(&read_stream, NULL);
}
// Low level input function
@@ -115,7 +111,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
}
// If there are deferred events, return the keys directly
- if (event_has_deferred()) {
+ if (loop_has_deferred_events(&loop)) {
return push_event_key(buf, maxlen);
}
@@ -136,7 +132,7 @@ bool os_char_avail(void)
void os_breakcheck(void)
{
if (!disable_breakcheck && !got_int) {
- event_poll(0);
+ loop_poll_events(&loop, 0);
}
}
@@ -285,7 +281,7 @@ static bool input_poll(int ms)
prof_inchar_enter();
}
- event_poll_until(ms, input_ready() || input_eof);
+ LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof);
if (do_profiling == PROF_YES && ms) {
prof_inchar_exit();
@@ -309,16 +305,16 @@ static InbufPollResult inbuf_poll(int ms)
return input_eof ? kInputEof : kInputNone;
}
-static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool at_eof)
+static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof)
{
if (at_eof) {
input_eof = true;
}
- assert(rbuffer_space(input_buffer) >= rbuffer_size(read_buffer));
- RBUFFER_UNTIL_EMPTY(read_buffer, ptr, len) {
+ assert(rbuffer_space(input_buffer) >= rbuffer_size(buf));
+ RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
(void)rbuffer_write(input_buffer, ptr, len);
- rbuffer_consumed(read_buffer, len);
+ rbuffer_consumed(buf, len);
}
}
@@ -362,7 +358,7 @@ static bool input_ready(void)
{
return typebuf_was_filled || // API call filled typeahead
rbuffer_size(input_buffer) || // Input buffer filled
- event_has_deferred(); // Events must be processed
+ loop_has_deferred_events(&loop); // Events must be processed
}
// Exit because of an input read error.
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
deleted file mode 100644
index f9bde21361..0000000000
--- a/src/nvim/os/job.c
+++ /dev/null
@@ -1,467 +0,0 @@
-#include <stdint.h>
-#include <stdbool.h>
-
-#include <uv.h>
-
-#include "nvim/os/uv_helpers.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/os/rstream.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/event.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/time.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-
-#ifdef HAVE_SYS_WAIT_H
-# include <sys/wait.h>
-#endif
-
-// {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit
-// before we send SIGNAL to it
-#define TERM_TIMEOUT 1000000000
-#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
-#define JOB_BUFFER_SIZE 0xFFFF
-
-#define close_job_stream(job, stream, type) \
- do { \
- if (job->stream) { \
- type##stream_free(job->stream); \
- job->stream = NULL; \
- if (!uv_is_closing((uv_handle_t *)job->proc_std##stream)) { \
- uv_close((uv_handle_t *)job->proc_std##stream, close_cb); \
- } \
- } \
- } while (0)
-
-#define close_job_in(job) close_job_stream(job, in, w)
-#define close_job_out(job) close_job_stream(job, out, r)
-#define close_job_err(job) close_job_stream(job, err, r)
-
-Job *table[MAX_RUNNING_JOBS] = {NULL};
-size_t stop_requests = 0;
-uv_timer_t job_stop_timer;
-uv_signal_t schld;
-
-// Some helpers shared in this module
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/job.c.generated.h"
-#endif
-// Callbacks for libuv
-
-/// Initializes job control resources
-void job_init(void)
-{
- uv_disable_stdio_inheritance();
- uv_timer_init(uv_default_loop(), &job_stop_timer);
- uv_signal_init(uv_default_loop(), &schld);
- uv_signal_start(&schld, chld_handler, SIGCHLD);
-}
-
-/// Releases job control resources and terminates running jobs
-void job_teardown(void)
-{
- // Stop all jobs
- for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
- Job *job;
- if ((job = table[i]) != NULL) {
- uv_kill(job->pid, SIGTERM);
- job->term_sent = true;
- job_stop(job);
- }
- }
-
- // Wait until all jobs are closed
- event_poll_until(-1, !stop_requests);
- uv_signal_stop(&schld);
- uv_close((uv_handle_t *)&schld, NULL);
- // Close the timer
- uv_close((uv_handle_t *)&job_stop_timer, NULL);
-}
-
-/// Tries to start a new job.
-///
-/// @param[out] status The job id if the job started successfully, 0 if the job
-/// table is full, -1 if the program could not be executed.
-/// @return The job pointer if the job started successfully, NULL otherwise
-Job *job_start(JobOptions opts, int *status)
-{
- int i;
- Job *job;
-
- // Search for a free slot in the table
- for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if (table[i] == NULL) {
- break;
- }
- }
-
- if (i == MAX_RUNNING_JOBS) {
- // No free slots
- shell_free_argv(opts.argv);
- *status = 0;
- return NULL;
- }
-
- job = xmalloc(sizeof(Job));
- // Initialize
- job->id = i + 1;
- *status = job->id;
- job->status = -1;
- job->refcount = 1;
- job->stopped_time = 0;
- job->term_sent = false;
- job->in = NULL;
- job->out = NULL;
- job->err = NULL;
- job->opts = opts;
- job->closed = false;
-
- process_init(job);
-
- if (opts.writable) {
- handle_set_job((uv_handle_t *)job->proc_stdin, job);
- job->refcount++;
- }
-
- if (opts.stdout_cb) {
- handle_set_job((uv_handle_t *)job->proc_stdout, job);
- job->refcount++;
- }
-
- if (opts.stderr_cb) {
- handle_set_job((uv_handle_t *)job->proc_stderr, job);
- job->refcount++;
- }
-
- // Spawn the job
- if (!process_spawn(job)) {
- if (opts.writable) {
- uv_close((uv_handle_t *)job->proc_stdin, close_cb);
- }
- if (opts.stdout_cb) {
- uv_close((uv_handle_t *)job->proc_stdout, close_cb);
- }
- if (opts.stderr_cb) {
- uv_close((uv_handle_t *)job->proc_stderr, close_cb);
- }
- process_close(job);
- event_poll(0);
- // Manually invoke the close_cb to free the job resources
- *status = -1;
- return NULL;
- }
-
- if (opts.writable) {
- job->in = wstream_new(opts.maxmem);
- wstream_set_stream(job->in, job->proc_stdin);
- }
-
- // Start the readable streams
- if (opts.stdout_cb) {
- job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- rstream_set_stream(job->out, job->proc_stdout);
- rstream_start(job->out);
- }
-
- if (opts.stderr_cb) {
- job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
- rstream_set_stream(job->err, job->proc_stderr);
- rstream_start(job->err);
- }
- // Save the job to the table
- table[i] = job;
-
- return job;
-}
-
-/// Finds a job instance by id
-///
-/// @param id The job id
-/// @return the Job instance
-Job *job_find(int id)
-{
- Job *job;
-
- if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
- || job->stopped_time) {
- return NULL;
- }
-
- return job;
-}
-
-/// Terminates a job. This is a non-blocking operation, but if the job exists
-/// it's guaranteed to succeed(SIGKILL will eventually be sent)
-///
-/// @param job The Job instance
-void job_stop(Job *job)
-{
- if (job->stopped_time) {
- return;
- }
-
- job->stopped_time = os_hrtime();
- if (job->opts.pty) {
- // close all streams for pty jobs to send SIGHUP to the process
- job_close_streams(job);
- pty_process_close_master(job);
- } else {
- // Close the job's stdin. If the job doesn't close its own stdout/stderr,
- // they will be closed when the job exits(possibly due to being terminated
- // after a timeout)
- close_job_in(job);
- }
-
- if (!stop_requests++) {
- // When there's at least one stop request pending, start a timer that
- // will periodically check if a signal should be send to a to the job
- DLOG("Starting job kill timer");
- uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100);
- }
-}
-
-/// job_wait - synchronously wait for a job to finish
-///
-/// @param job The job instance
-/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
-/// waiting until the job quits.
-/// @return returns the status code of the exited job. -1 if the job is
-/// still running and the `timeout` has expired. Note that this is
-/// indistinguishable from the process returning -1 by itself. Which
-/// is possible on some OS. Returns -2 if the job was interrupted.
-int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
-{
- // The default status is -1, which represents a timeout
- int status = -1;
- bool interrupted = false;
-
- // Increase refcount to stop the job from being freed before we have a
- // chance to get the status.
- job->refcount++;
- event_poll_until(ms,
- // Until...
- got_int || // interrupted by the user
- job->refcount == 1); // job exited
-
- // we'll assume that a user frantically hitting interrupt doesn't like
- // the current job. Signal that it has to be killed.
- if (got_int) {
- interrupted = true;
- got_int = false;
- job_stop(job);
- if (ms == -1) {
- // We can only return, if all streams/handles are closed and the job
- // exited.
- event_poll_until(-1, job->refcount == 1);
- } else {
- event_poll(0);
- }
- }
-
- if (job->refcount == 1) {
- // Job exited, collect status and manually invoke close_cb to free the job
- // resources
- status = interrupted ? -2 : job->status;
- job_close_streams(job);
- job_decref(job);
- } else {
- job->refcount--;
- }
-
- return status;
-}
-
-/// Close the pipe used to write to the job.
-///
-/// This can be used for example to indicate to the job process that no more
-/// input is coming, and that it should shut down cleanly.
-///
-/// It has no effect when the input pipe doesn't exist or was already
-/// closed.
-///
-/// @param job The job instance
-void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_in(job);
-}
-
-// Close the job stdout stream.
-void job_close_out(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_out(job);
-}
-
-// Close the job stderr stream.
-void job_close_err(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- close_job_out(job);
-}
-
-/// All writes that complete after calling this function will be reported
-/// to `cb`.
-///
-/// Use this function to be notified about the status of an in-flight write.
-///
-/// @see {wstream_set_write_cb}
-///
-/// @param job The job instance
-/// @param cb The function that will be called on write completion or
-/// failure. It will be called with the job as the `data` argument.
-void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL
-{
- wstream_set_write_cb(job->in, cb, job);
-}
-
-/// Writes data to the job's stdin. This is a non-blocking operation, it
-/// returns when the write request was sent.
-///
-/// @param job The Job instance
-/// @param buffer The buffer which contains the data to be written
-/// @return true if the write request was successfully sent, false if writing
-/// to the job stream failed (possibly because the OS buffer is full)
-bool job_write(Job *job, WBuffer *buffer)
-{
- return wstream_write(job->in, buffer);
-}
-
-/// Get the job id
-///
-/// @param job A pointer to the job
-/// @return The job id
-int job_id(Job *job)
-{
- return job->id;
-}
-
-// Get the job pid
-int job_pid(Job *job)
-{
- return job->pid;
-}
-
-/// Get data associated with a job
-///
-/// @param job A pointer to the job
-/// @return The job data
-void *job_data(Job *job)
-{
- return job->opts.data;
-}
-
-/// Resize the window for a pty job
-bool job_resize(Job *job, uint16_t width, uint16_t height)
-{
- if (!job->opts.pty) {
- return false;
- }
- pty_process_resize(job, width, height);
- return true;
-}
-
-void job_close_streams(Job *job)
-{
- close_job_in(job);
- close_job_out(job);
- close_job_err(job);
-}
-
-JobOptions *job_opts(Job *job)
-{
- return &job->opts;
-}
-
-/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
-/// that didn't die from SIGTERM after a while(exit_timeout is 0).
-static void job_stop_timer_cb(uv_timer_t *handle)
-{
- Job *job;
- uint64_t now = os_hrtime();
-
- for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL || !job->stopped_time) {
- continue;
- }
-
- uint64_t elapsed = now - job->stopped_time;
-
- if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
- ILOG("Sending SIGTERM to job(id: %d)", job->id);
- uv_kill(job->pid, SIGTERM);
- job->term_sent = true;
- } else if (elapsed >= KILL_TIMEOUT) {
- ILOG("Sending SIGKILL to job(id: %d)", job->id);
- uv_kill(job->pid, SIGKILL);
- process_close(job);
- }
- }
-}
-
-// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
-static void read_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
-{
- Job *job = data;
-
- if (rstream == job->out) {
- job->opts.stdout_cb(rstream, buf, data, eof);
- if (eof) {
- close_job_out(job);
- }
- } else {
- job->opts.stderr_cb(rstream, buf, data, eof);
- if (eof) {
- close_job_err(job);
- }
- }
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- job_decref(handle_get_job(handle));
-}
-
-static void job_exited(Event event)
-{
- Job *job = event.data;
- process_close(job);
-}
-
-static void chld_handler(uv_signal_t *handle, int signum)
-{
- int stat = 0;
- int pid;
-
- do {
- pid = waitpid(-1, &stat, WNOHANG);
- } while (pid < 0 && errno == EINTR);
-
- if (pid <= 0) {
- return;
- }
-
- Job *job = NULL;
- // find the job corresponding to the exited pid
- for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) != NULL && job->pid == pid) {
- if (WIFEXITED(stat)) {
- job->status = WEXITSTATUS(stat);
- } else if (WIFSIGNALED(stat)) {
- job->status = WTERMSIG(stat);
- }
- if (exiting) {
- // don't enqueue more events when exiting
- process_close(job);
- } else {
- event_push((Event) {.handler = job_exited, .data = job}, false);
- }
- break;
- }
- }
-}
-
diff --git a/src/nvim/os/job.h b/src/nvim/os/job.h
deleted file mode 100644
index e0ca615626..0000000000
--- a/src/nvim/os/job.h
+++ /dev/null
@@ -1,21 +0,0 @@
-// Job is a short name we use to refer to child processes that run in parallel
-// with the editor, probably executing long-running tasks and sending updates
-// asynchronously. Communication happens through anonymous pipes connected to
-// the job's std{in,out,err}. They are more like bash/zsh co-processes than the
-// usual shell background job. The name 'Job' was chosen because it applies to
-// the concept while being significantly shorter.
-#ifndef NVIM_OS_JOB_H
-#define NVIM_OS_JOB_H
-
-#include <stdint.h>
-#include <stdbool.h>
-
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/job.h.generated.h"
-#endif
-#endif // NVIM_OS_JOB_H
diff --git a/src/nvim/os/job_defs.h b/src/nvim/os/job_defs.h
deleted file mode 100644
index 7fee900ac0..0000000000
--- a/src/nvim/os/job_defs.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef NVIM_OS_JOB_DEFS_H
-#define NVIM_OS_JOB_DEFS_H
-
-#include <uv.h>
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream_defs.h"
-
-#define MAX_RUNNING_JOBS 100
-typedef struct job Job;
-
-/// Function called when the job reads data
-///
-/// @param id The job id
-/// @param data Some data associated with the job by the caller
-typedef void (*job_exit_cb)(Job *job, int status, void *data);
-
-// Job startup options
-// job_exit_cb Callback that will be invoked when the job exits
-// maxmem Maximum amount of memory used by the job WStream
-typedef struct {
- // Argument vector for the process. The first item is the
- // executable to run.
- // [consumed]
- char **argv;
- // Caller data that will be associated with the job
- void *data;
- // If true the job stdin will be available for writing with job_write,
- // otherwise it will be redirected to /dev/null
- bool writable;
- // Callback that will be invoked when data is available on stdout. If NULL
- // stdout will be redirected to /dev/null.
- rstream_cb stdout_cb;
- // Callback that will be invoked when data is available on stderr. If NULL
- // stderr will be redirected to /dev/null.
- rstream_cb stderr_cb;
- // Callback that will be invoked when the job has exited and will not send
- // data
- job_exit_cb exit_cb;
- // Maximum memory used by the job's WStream
- size_t maxmem;
- // Connect the job to a pseudo terminal
- bool pty;
- // Initial window dimensions if the job is connected to a pseudo terminal
- uint16_t width, height;
- // Value for the $TERM environment variable. A default value of "ansi" is
- // assumed if NULL
- char *term_name;
-} JobOptions;
-
-#define JOB_OPTIONS_INIT ((JobOptions) { \
- .argv = NULL, \
- .data = NULL, \
- .writable = true, \
- .stdout_cb = NULL, \
- .stderr_cb = NULL, \
- .exit_cb = NULL, \
- .maxmem = 0, \
- .pty = false, \
- .width = 80, \
- .height = 24, \
- .term_name = NULL \
- })
-#endif // NVIM_OS_JOB_DEFS_H
diff --git a/src/nvim/os/job_private.h b/src/nvim/os/job_private.h
deleted file mode 100644
index 983106d918..0000000000
--- a/src/nvim/os/job_private.h
+++ /dev/null
@@ -1,118 +0,0 @@
-#ifndef NVIM_OS_JOB_PRIVATE_H
-#define NVIM_OS_JOB_PRIVATE_H
-
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/pipe_process.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/os/shell.h"
-#include "nvim/log.h"
-#include "nvim/memory.h"
-
-struct job {
- // Job id the index in the job table plus one.
- int id;
- // Process id
- int pid;
- // Exit status code of the job process
- int status;
- // Number of references to the job. The job resources will only be freed by
- // close_cb when this is 0
- int refcount;
- // Time when job_stop was called for the job.
- uint64_t stopped_time;
- // If SIGTERM was already sent to the job(only send one before SIGKILL)
- bool term_sent;
- // Readable streams(std{out,err})
- RStream *out, *err;
- // Writable stream(stdin)
- WStream *in;
- // Libuv streams representing stdin/stdout/stderr
- uv_stream_t *proc_stdin, *proc_stdout, *proc_stderr;
- // Extra data set by the process spawner
- void *process;
- // If process_close has been called on this job
- bool closed;
- // Startup options
- JobOptions opts;
-};
-
-extern Job *table[];
-extern size_t stop_requests;
-extern uv_timer_t job_stop_timer;
-
-static inline bool process_spawn(Job *job)
-{
- return job->opts.pty ? pty_process_spawn(job) : pipe_process_spawn(job);
-}
-
-static inline void process_init(Job *job)
-{
- if (job->opts.pty) {
- pty_process_init(job);
- } else {
- pipe_process_init(job);
- }
-}
-
-static inline void process_close(Job *job)
-{
- if (job->closed) {
- return;
- }
- job->closed = true;
- if (job->opts.pty) {
- pty_process_close(job);
- } else {
- pipe_process_close(job);
- }
-}
-
-static inline void process_destroy(Job *job)
-{
- if (job->opts.pty) {
- pty_process_destroy(job);
- } else {
- pipe_process_destroy(job);
- }
-}
-
-static inline void job_exit_callback(Job *job)
-{
- // Free the slot now, 'exit_cb' may want to start another job to replace
- // this one
- table[job->id - 1] = NULL;
-
- if (job->opts.exit_cb) {
- // Invoke the exit callback
- job->opts.exit_cb(job, job->status, job->opts.data);
- }
-
- if (stop_requests && !--stop_requests) {
- // Stop the timer if no more stop requests are pending
- DLOG("Stopping job kill timer");
- uv_timer_stop(&job_stop_timer);
- }
-}
-
-static inline void job_decref(Job *job)
-{
- if (--job->refcount == 0) {
- // Invoke the exit_cb
- job_exit_callback(job);
- // Free all memory allocated for the job
- xfree(job->proc_stdin->data);
- xfree(job->proc_stdout->data);
- xfree(job->proc_stderr->data);
- shell_free_argv(job->opts.argv);
- process_destroy(job);
- xfree(job);
- }
-}
-
-
-#endif // NVIM_OS_JOB_PRIVATE_H
diff --git a/src/nvim/os/os.h b/src/nvim/os/os.h
index 3dd099890c..69bd1ff4fd 100644
--- a/src/nvim/os/os.h
+++ b/src/nvim/os/os.h
@@ -12,7 +12,6 @@
# include "os/mem.h.generated.h"
# include "os/env.h.generated.h"
# include "os/users.h.generated.h"
-# include "os/stream.h.generated.h"
#endif
#endif // NVIM_OS_OS_H
diff --git a/src/nvim/os/pipe_process.c b/src/nvim/os/pipe_process.c
deleted file mode 100644
index 2ac305e967..0000000000
--- a/src/nvim/os/pipe_process.c
+++ /dev/null
@@ -1,110 +0,0 @@
-#include <stdbool.h>
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/uv_helpers.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pipe_process.h"
-#include "nvim/memory.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pipe_process.c.generated.h"
-#endif
-
-typedef struct {
- // Structures for process spawning/management used by libuv
- uv_process_t proc;
- uv_process_options_t proc_opts;
- uv_stdio_container_t stdio[3];
- uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
-} UvProcess;
-
-void pipe_process_init(Job *job)
-{
- UvProcess *pipeproc = xmalloc(sizeof(UvProcess));
- pipeproc->proc_opts.file = job->opts.argv[0];
- pipeproc->proc_opts.args = job->opts.argv;
- pipeproc->proc_opts.stdio = pipeproc->stdio;
- pipeproc->proc_opts.stdio_count = 3;
- pipeproc->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
- pipeproc->proc_opts.exit_cb = exit_cb;
- pipeproc->proc_opts.cwd = NULL;
- pipeproc->proc_opts.env = NULL;
- pipeproc->proc.data = NULL;
- pipeproc->proc_stdin.data = NULL;
- pipeproc->proc_stdout.data = NULL;
- pipeproc->proc_stderr.data = NULL;
-
- // Initialize the job std{in,out,err}
- pipeproc->stdio[0].flags = UV_IGNORE;
- pipeproc->stdio[1].flags = UV_IGNORE;
- pipeproc->stdio[2].flags = UV_IGNORE;
-
- handle_set_job((uv_handle_t *)&pipeproc->proc, job);
-
- if (job->opts.writable) {
- uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdin, 0);
- pipeproc->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
- pipeproc->stdio[0].data.stream = (uv_stream_t *)&pipeproc->proc_stdin;
- }
-
- if (job->opts.stdout_cb) {
- uv_pipe_init(uv_default_loop(), &pipeproc->proc_stdout, 0);
- pipeproc->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- pipeproc->stdio[1].data.stream = (uv_stream_t *)&pipeproc->proc_stdout;
- }
-
- if (job->opts.stderr_cb) {
- uv_pipe_init(uv_default_loop(), &pipeproc->proc_stderr, 0);
- pipeproc->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
- pipeproc->stdio[2].data.stream = (uv_stream_t *)&pipeproc->proc_stderr;
- }
-
- job->proc_stdin = (uv_stream_t *)&pipeproc->proc_stdin;
- job->proc_stdout = (uv_stream_t *)&pipeproc->proc_stdout;
- job->proc_stderr = (uv_stream_t *)&pipeproc->proc_stderr;
- job->process = pipeproc;
-}
-
-void pipe_process_destroy(Job *job)
-{
- UvProcess *pipeproc = job->process;
- xfree(pipeproc->proc.data);
- xfree(pipeproc);
- job->process = NULL;
-}
-
-bool pipe_process_spawn(Job *job)
-{
- UvProcess *pipeproc = job->process;
-
- if (uv_spawn(uv_default_loop(), &pipeproc->proc, &pipeproc->proc_opts) != 0) {
- return false;
- }
-
- job->pid = pipeproc->proc.pid;
- return true;
-}
-
-void pipe_process_close(Job *job)
-{
- UvProcess *pipeproc = job->process;
- uv_close((uv_handle_t *)&pipeproc->proc, close_cb);
-}
-
-static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
-{
- Job *job = handle_get_job((uv_handle_t *)proc);
- job->status = (int)status;
- pipe_process_close(job);
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- Job *job = handle_get_job(handle);
- job_close_streams(job);
- job_decref(job);
-}
diff --git a/src/nvim/os/pipe_process.h b/src/nvim/os/pipe_process.h
deleted file mode 100644
index 17a4255ddc..0000000000
--- a/src/nvim/os/pipe_process.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_PIPE_PROCESS_H
-#define NVIM_OS_PIPE_PROCESS_H
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pipe_process.h.generated.h"
-#endif
-#endif // NVIM_OS_PIPE_PROCESS_H
diff --git a/src/nvim/os/pty_process.c b/src/nvim/os/pty_process.c
deleted file mode 100644
index ff0bcfb6de..0000000000
--- a/src/nvim/os/pty_process.c
+++ /dev/null
@@ -1,257 +0,0 @@
-// Some of the code came from pangoterm and libuv
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <unistd.h>
-#include <termios.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/ioctl.h>
-
-// forkpty is not in POSIX, so headers are platform-specific
-#if defined(__FreeBSD__)
-# include <libutil.h>
-#elif defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__)
-# include <util.h>
-#else
-# include <pty.h>
-#endif
-
-#include <uv.h>
-
-#include "nvim/func_attr.h"
-#include "nvim/os/job.h"
-#include "nvim/os/job_defs.h"
-#include "nvim/os/job_private.h"
-#include "nvim/os/pty_process.h"
-#include "nvim/memory.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pty_process.c.generated.h"
-#endif
-
-typedef struct {
- struct winsize winsize;
- uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
- int tty_fd;
-} PtyProcess;
-
-void pty_process_init(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- PtyProcess *ptyproc = xmalloc(sizeof(PtyProcess));
- ptyproc->tty_fd = -1;
-
- if (job->opts.writable) {
- uv_pipe_init(uv_default_loop(), &ptyproc->proc_stdin, 0);
- ptyproc->proc_stdin.data = NULL;
- }
-
- if (job->opts.stdout_cb) {
- uv_pipe_init(uv_default_loop(), &ptyproc->proc_stdout, 0);
- ptyproc->proc_stdout.data = NULL;
- }
-
- if (job->opts.stderr_cb) {
- uv_pipe_init(uv_default_loop(), &ptyproc->proc_stderr, 0);
- ptyproc->proc_stderr.data = NULL;
- }
-
- job->proc_stdin = (uv_stream_t *)&ptyproc->proc_stdin;
- job->proc_stdout = (uv_stream_t *)&ptyproc->proc_stdout;
- job->proc_stderr = (uv_stream_t *)&ptyproc->proc_stderr;
- job->process = ptyproc;
-}
-
-void pty_process_destroy(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- xfree(job->opts.term_name);
- xfree(job->process);
- job->process = NULL;
-}
-
-static bool set_pipe_duplicating_descriptor(int fd, uv_pipe_t *pipe)
- FUNC_ATTR_NONNULL_ALL
-{
- int fd_dup = dup(fd);
- if (fd_dup < 0) {
- ELOG("Failed to dup descriptor %d: %s", fd, strerror(errno));
- return false;
- }
- int uv_result = uv_pipe_open(pipe, fd_dup);
- if (uv_result) {
- ELOG("Failed to set pipe to descriptor %d: %s",
- fd_dup, uv_strerror(uv_result));
- close(fd_dup);
- return false;
- }
- return true;
-}
-
-static const unsigned int KILL_RETRIES = 5;
-static const unsigned int KILL_TIMEOUT = 2; // seconds
-
-bool pty_process_spawn(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- int master;
- PtyProcess *ptyproc = job->process;
- ptyproc->winsize = (struct winsize){job->opts.height, job->opts.width, 0, 0};
- struct termios termios;
- init_termios(&termios);
- uv_disable_stdio_inheritance();
-
- int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
-
- if (pid < 0) {
- return false;
- } else if (pid == 0) {
- init_child(job);
- abort();
- }
-
- // make sure the master file descriptor is non blocking
- int master_status_flags = fcntl(master, F_GETFL);
- if (master_status_flags == -1) {
- ELOG("Failed to get master descriptor status flags: %s", strerror(errno));
- goto error;
- }
- if (fcntl(master, F_SETFL, master_status_flags | O_NONBLOCK) == -1) {
- ELOG("Failed to make master descriptor non-blocking: %s", strerror(errno));
- goto error;
- }
-
- if (job->opts.writable
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdin)) {
- goto error;
- }
-
- if (job->opts.stdout_cb
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stdout)) {
- goto error;
- }
-
- if (job->opts.stderr_cb
- && !set_pipe_duplicating_descriptor(master, &ptyproc->proc_stderr)) {
- goto error;
- }
-
- ptyproc->tty_fd = master;
- job->pid = pid;
- return true;
-
-error:
- close(master);
-
- // terminate spawned process
- kill(pid, SIGTERM);
- int status, child;
- unsigned int try = 0;
- while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) {
- sleep(KILL_TIMEOUT);
- }
- if (child != pid) {
- kill(pid, SIGKILL);
- }
-
- return false;
-}
-
-void pty_process_close(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- pty_process_close_master(job);
- job_close_streams(job);
- job_decref(job);
-}
-
-void pty_process_close_master(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- PtyProcess *ptyproc = job->process;
- if (ptyproc->tty_fd >= 0) {
- close(ptyproc->tty_fd);
- ptyproc->tty_fd = -1;
- }
-}
-
-void pty_process_resize(Job *job, uint16_t width, uint16_t height)
- FUNC_ATTR_NONNULL_ALL
-{
- PtyProcess *ptyproc = job->process;
- ptyproc->winsize = (struct winsize){height, width, 0, 0};
- ioctl(ptyproc->tty_fd, TIOCSWINSZ, &ptyproc->winsize);
-}
-
-static void init_child(Job *job) FUNC_ATTR_NONNULL_ALL
-{
- unsetenv("COLUMNS");
- unsetenv("LINES");
- unsetenv("TERMCAP");
- unsetenv("COLORTERM");
- unsetenv("COLORFGBG");
-
- signal(SIGCHLD, SIG_DFL);
- signal(SIGHUP, SIG_DFL);
- signal(SIGINT, SIG_DFL);
- signal(SIGQUIT, SIG_DFL);
- signal(SIGTERM, SIG_DFL);
- signal(SIGALRM, SIG_DFL);
-
- setenv("TERM", job->opts.term_name ? job->opts.term_name : "ansi", 1);
- execvp(job->opts.argv[0], job->opts.argv);
- fprintf(stderr, "execvp failed: %s\n", strerror(errno));
-}
-
-static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
-{
- memset(termios, 0, sizeof(struct termios));
- // Taken from pangoterm
- termios->c_iflag = ICRNL|IXON;
- termios->c_oflag = OPOST|ONLCR;
-#ifdef TAB0
- termios->c_oflag |= TAB0;
-#endif
- termios->c_cflag = CS8|CREAD;
- termios->c_lflag = ISIG|ICANON|IEXTEN|ECHO|ECHOE|ECHOK;
-
- cfsetspeed(termios, 38400);
-
-#ifdef IUTF8
- termios->c_iflag |= IUTF8;
-#endif
-#ifdef NL0
- termios->c_oflag |= NL0;
-#endif
-#ifdef CR0
- termios->c_oflag |= CR0;
-#endif
-#ifdef BS0
- termios->c_oflag |= BS0;
-#endif
-#ifdef VT0
- termios->c_oflag |= VT0;
-#endif
-#ifdef FF0
- termios->c_oflag |= FF0;
-#endif
-#ifdef ECHOCTL
- termios->c_lflag |= ECHOCTL;
-#endif
-#ifdef ECHOKE
- termios->c_lflag |= ECHOKE;
-#endif
-
- termios->c_cc[VINTR] = 0x1f & 'C';
- termios->c_cc[VQUIT] = 0x1f & '\\';
- termios->c_cc[VERASE] = 0x7f;
- termios->c_cc[VKILL] = 0x1f & 'U';
- termios->c_cc[VEOF] = 0x1f & 'D';
- termios->c_cc[VEOL] = _POSIX_VDISABLE;
- termios->c_cc[VEOL2] = _POSIX_VDISABLE;
- termios->c_cc[VSTART] = 0x1f & 'Q';
- termios->c_cc[VSTOP] = 0x1f & 'S';
- termios->c_cc[VSUSP] = 0x1f & 'Z';
- termios->c_cc[VREPRINT] = 0x1f & 'R';
- termios->c_cc[VWERASE] = 0x1f & 'W';
- termios->c_cc[VLNEXT] = 0x1f & 'V';
- termios->c_cc[VMIN] = 1;
- termios->c_cc[VTIME] = 0;
-}
diff --git a/src/nvim/os/pty_process.h b/src/nvim/os/pty_process.h
deleted file mode 100644
index 62fcd1671f..0000000000
--- a/src/nvim/os/pty_process.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef NVIM_OS_PTY_PROCESS_H
-#define NVIM_OS_PTY_PROCESS_H
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/pty_process.h.generated.h"
-#endif
-#endif // NVIM_OS_PTY_PROCESS_H
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
deleted file mode 100644
index af84288f0f..0000000000
--- a/src/nvim/os/rstream.c
+++ /dev/null
@@ -1,253 +0,0 @@
-#include <assert.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/uv_helpers.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/rstream.h"
-#include "nvim/ascii.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-#include "nvim/log.h"
-#include "nvim/misc1.h"
-
-struct rstream {
- void *data;
- uv_buf_t uvbuf;
- size_t fpos;
- RBuffer *buffer;
- uv_stream_t *stream;
- uv_idle_t *fread_idle;
- uv_handle_type file_type;
- uv_file fd;
- rstream_cb cb;
- bool free_handle;
-};
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/rstream.c.generated.h"
-#endif
-
-/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
-/// necessary for reading from a libuv stream.
-///
-/// @param cb A function that will be called whenever some data is available
-/// for reading with `rstream_read`
-/// @param buffer RBuffer instance to associate with the RStream
-/// @param data Some state to associate with the `RStream` instance
-/// @return The newly-allocated `RStream` instance
-RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
-{
- RStream *rv = xmalloc(sizeof(RStream));
- buffer->data = rv;
- buffer->full_cb = on_rbuffer_full;
- buffer->nonfull_cb = on_rbuffer_nonfull;
- rv->buffer = buffer;
- rv->fpos = 0;
- rv->data = data;
- rv->cb = cb;
- rv->stream = NULL;
- rv->fread_idle = NULL;
- rv->free_handle = false;
- rv->file_type = UV_UNKNOWN_HANDLE;
-
- return rv;
-}
-
-static void on_rbuffer_full(RBuffer *buf, void *data)
-{
- rstream_stop(data);
-}
-
-static void on_rbuffer_nonfull(RBuffer *buf, void *data)
-{
- rstream_start(data);
-}
-
-/// Frees all memory allocated for a RStream instance
-///
-/// @param rstream The `RStream` instance
-void rstream_free(RStream *rstream)
-{
- if (rstream->free_handle) {
- if (rstream->fread_idle != NULL) {
- uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
- } else {
- uv_close((uv_handle_t *)rstream->stream, close_cb);
- }
- }
-
- rbuffer_free(rstream->buffer);
- xfree(rstream);
-}
-
-/// Sets the underlying `uv_stream_t` instance
-///
-/// @param rstream The `RStream` instance
-/// @param stream The new `uv_stream_t` instance
-void rstream_set_stream(RStream *rstream, uv_stream_t *stream)
-{
- handle_set_rstream((uv_handle_t *)stream, rstream);
- rstream->stream = stream;
-}
-
-/// Sets the underlying file descriptor that will be read from. Only pipes
-/// and regular files are supported for now.
-///
-/// @param rstream The `RStream` instance
-/// @param file The file descriptor
-void rstream_set_file(RStream *rstream, uv_file file)
-{
- rstream->file_type = uv_guess_handle(file);
-
- if (rstream->free_handle) {
- // If this is the second time we're calling this function, free the
- // previously allocated memory
- if (rstream->fread_idle != NULL) {
- uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
- rstream->fread_idle = NULL;
- } else {
- uv_close((uv_handle_t *)rstream->stream, close_cb);
- rstream->stream = NULL;
- }
- }
-
- if (rstream->file_type == UV_FILE) {
- // Non-blocking file reads are simulated with an idle handle that reads
- // in chunks of rstream->buffer_size, giving time for other events to
- // be processed between reads.
- rstream->fread_idle = xmalloc(sizeof(uv_idle_t));
- uv_idle_init(uv_default_loop(), rstream->fread_idle);
- rstream->fread_idle->data = NULL;
- handle_set_rstream((uv_handle_t *)rstream->fread_idle, rstream);
- } else {
- // Only pipes are supported for now
- assert(rstream->file_type == UV_NAMED_PIPE
- || rstream->file_type == UV_TTY);
- rstream->stream = xmalloc(sizeof(uv_pipe_t));
- uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0);
- uv_pipe_open((uv_pipe_t *)rstream->stream, file);
- rstream->stream->data = NULL;
- handle_set_rstream((uv_handle_t *)rstream->stream, rstream);
- }
-
- rstream->fd = file;
- rstream->free_handle = true;
-}
-
-/// Starts watching for events from a `RStream` instance.
-///
-/// @param rstream The `RStream` instance
-void rstream_start(RStream *rstream)
-{
- if (rstream->file_type == UV_FILE) {
- uv_idle_start(rstream->fread_idle, fread_idle_cb);
- } else {
- uv_read_start(rstream->stream, alloc_cb, read_cb);
- }
-}
-
-/// Stops watching for events from a `RStream` instance.
-///
-/// @param rstream The `RStream` instance
-void rstream_stop(RStream *rstream)
-{
- if (rstream->file_type == UV_FILE) {
- uv_idle_stop(rstream->fread_idle);
- } else {
- uv_read_stop(rstream->stream);
- }
-}
-
-// Callbacks used by libuv
-
-// Called by libuv to allocate memory for reading.
-static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
-{
- RStream *rstream = handle_get_rstream(handle);
- buf->base = rbuffer_write_ptr(rstream->buffer, &buf->len);
-}
-
-// Callback invoked by libuv after it copies the data into the buffer provided
-// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
-// 0-length buffer.
-static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
-{
- RStream *rstream = handle_get_rstream((uv_handle_t *)stream);
-
- if (cnt <= 0) {
- if (cnt != UV_ENOBUFS
- // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
- // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
- //
- // We don't need to do anything with the RBuffer because the next call
- // to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
- // won't be called)
- && cnt != 0) {
- DLOG("Closing RStream(%p) because of %s(%zd)", rstream,
- uv_strerror((int)cnt), cnt);
- // Read error or EOF, either way stop the stream and invoke the callback
- // with eof == true
- uv_read_stop(stream);
- rstream->cb(rstream, rstream->buffer, rstream->data, true);
- }
- return;
- }
-
- // at this point we're sure that cnt is positive, no error occurred
- size_t nread = (size_t)cnt;
- // Data was already written, so all we need is to update 'wpos' to reflect
- // the space actually used in the buffer.
- rbuffer_produced(rstream->buffer, nread);
- rstream->cb(rstream, rstream->buffer, rstream->data, false);
-}
-
-// Called by the by the 'idle' handle to emulate a reading event
-static void fread_idle_cb(uv_idle_t *handle)
-{
- uv_fs_t req;
- RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
-
- rstream->uvbuf.base = rbuffer_write_ptr(rstream->buffer, &rstream->uvbuf.len);
-
- // the offset argument to uv_fs_read is int64_t, could someone really try
- // to read more than 9 quintillion (9e18) bytes?
- // upcast is meant to avoid tautological condition warning on 32 bits
- uintmax_t fpos_intmax = rstream->fpos;
- if (fpos_intmax > INT64_MAX) {
- ELOG("stream offset overflow");
- preserve_exit();
- }
-
- // Synchronous read
- uv_fs_read(
- uv_default_loop(),
- &req,
- rstream->fd,
- &rstream->uvbuf,
- 1,
- (int64_t) rstream->fpos,
- NULL);
-
- uv_fs_req_cleanup(&req);
-
- if (req.result <= 0) {
- uv_idle_stop(rstream->fread_idle);
- rstream->cb(rstream, rstream->buffer, rstream->data, true);
- return;
- }
-
- // no errors (req.result (ssize_t) is positive), it's safe to cast.
- size_t nread = (size_t) req.result;
- rbuffer_produced(rstream->buffer, nread);
- rstream->fpos += nread;
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- xfree(handle->data);
- xfree(handle);
-}
diff --git a/src/nvim/os/rstream.h b/src/nvim/os/rstream.h
deleted file mode 100644
index 3e24724573..0000000000
--- a/src/nvim/os/rstream.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#ifndef NVIM_OS_RSTREAM_H
-#define NVIM_OS_RSTREAM_H
-
-#include <stdbool.h>
-#include <stdint.h>
-#include <uv.h>
-#include "nvim/os/event_defs.h"
-#include "nvim/os/rstream_defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/rstream.h.generated.h"
-#endif
-#endif // NVIM_OS_RSTREAM_H
diff --git a/src/nvim/os/rstream_defs.h b/src/nvim/os/rstream_defs.h
deleted file mode 100644
index 45dced0b62..0000000000
--- a/src/nvim/os/rstream_defs.h
+++ /dev/null
@@ -1,20 +0,0 @@
-#ifndef NVIM_OS_RSTREAM_DEFS_H
-#define NVIM_OS_RSTREAM_DEFS_H
-
-#include <stdbool.h>
-
-#include "nvim/rbuffer.h"
-
-typedef struct rstream RStream;
-
-/// Type of function called when the RStream receives data
-///
-/// @param rstream The RStream instance
-/// @param rbuffer The associated RBuffer instance
-/// @param data State associated with the RStream instance
-/// @param eof If the stream reached EOF.
-typedef void (*rstream_cb)(RStream *rstream, RBuffer *buf, void *data,
- bool eof);
-
-#endif // NVIM_OS_RSTREAM_DEFS_H
-
diff --git a/src/nvim/os/shell.c b/src/nvim/os/shell.c
index 48174533a6..e0d67d4951 100644
--- a/src/nvim/os/shell.c
+++ b/src/nvim/os/shell.c
@@ -8,9 +8,9 @@
#include "nvim/ascii.h"
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
-#include "nvim/os/event.h"
-#include "nvim/os/job.h"
-#include "nvim/os/rstream.h"
+#include "nvim/event/loop.h"
+#include "nvim/event/uv_process.h"
+#include "nvim/event/rstream.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
#include "nvim/types.h"
@@ -189,7 +189,7 @@ static int do_os_system(char **argv,
{
// the output buffer
DynamicBuffer buf = DYNAMIC_BUFFER_INIT;
- rstream_cb data_cb = system_data_cb;
+ stream_read_cb data_cb = system_data_cb;
if (nread) {
*nread = 0;
}
@@ -204,17 +204,15 @@ static int do_os_system(char **argv,
char prog[MAXPATHL];
xstrlcpy(prog, argv[0], MAXPATHL);
- int status;
- JobOptions opts = JOB_OPTIONS_INIT;
- opts.argv = argv;
- opts.data = &buf;
- opts.writable = input != NULL;
- opts.stdout_cb = data_cb;
- opts.stderr_cb = data_cb;
- opts.exit_cb = NULL;
- Job *job = job_start(opts, &status);
-
- if (status <= 0) {
+ Stream in, out, err;
+ UvProcess uvproc = uv_process_init(&buf);
+ Process *proc = &uvproc.process;
+ proc->argv = argv;
+ proc->in = input != NULL ? &in : NULL;
+ proc->out = &out;
+ proc->err = &err;
+ if (!process_spawn(&loop, proc)) {
+ loop_poll_events(&loop, 0);
// Failed, probably due to `sh` not being executable
if (!silent) {
MSG_PUTS(_("\nCannot execute "));
@@ -224,28 +222,32 @@ static int do_os_system(char **argv,
return -1;
}
+ if (input != NULL) {
+ wstream_init(proc->in, 0);
+ }
+ rstream_init(proc->out, 0);
+ rstream_start(proc->out, data_cb);
+ rstream_init(proc->err, 0);
+ rstream_start(proc->err, data_cb);
+
// write the input, if any
if (input) {
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
- if (!job_write(job, input_buffer)) {
- // couldn't write, stop the job and tell the user about it
- job_stop(job);
+ if (!wstream_write(&in, input_buffer)) {
+ // couldn't write, stop the process and tell the user about it
+ process_stop(proc);
return -1;
}
// close the input stream after everything is written
- job_write_cb(job, shell_write_cb);
- } else {
- // close the input stream, let the process know that no more input is
- // coming
- job_close_in(job);
+ wstream_set_write_cb(&in, shell_write_cb);
}
// invoke busy_start here so event_poll_until wont change the busy state for
// the UI
ui_busy_start();
ui_flush();
- status = job_wait(job, -1);
+ int status = process_wait(proc, -1);
ui_busy_stop();
// prepare the out parameters if requested
@@ -283,10 +285,9 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
buf->data = xrealloc(buf->data, buf->cap);
}
-static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
+static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
- Job *job = data;
- DynamicBuffer *dbuf = job_data(job);
+ DynamicBuffer *dbuf = data;
size_t nread = buf->size;
dynamic_buffer_ensure(dbuf, dbuf->len + nread + 1);
@@ -294,7 +295,7 @@ static void system_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
dbuf->len += nread;
}
-static void out_data_cb(RStream *rstream, RBuffer *buf, void *data, bool eof)
+static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
{
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
size_t written = write_output(ptr, len, false,
@@ -470,8 +471,7 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
return (size_t)(output - start);
}
-static void shell_write_cb(WStream *wstream, void *data, int status)
+static void shell_write_cb(Stream *stream, void *data, int status)
{
- Job *job = data;
- job_close_in(job);
+ stream_close(stream, NULL);
}
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index f824543003..6de3435c4c 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -11,12 +11,13 @@
#include "nvim/memory.h"
#include "nvim/misc1.h"
#include "nvim/misc2.h"
+#include "nvim/event/signal.h"
#include "nvim/os/signal.h"
-#include "nvim/os/event.h"
+#include "nvim/event/loop.h"
-static uv_signal_t spipe, shup, squit, sterm;
+static SignalWatcher spipe, shup, squit, sterm;
#ifdef SIGPWR
-static uv_signal_t spwr;
+static SignalWatcher spwr;
#endif
static bool rejecting_deadly;
@@ -27,40 +28,40 @@ static bool rejecting_deadly;
void signal_init(void)
{
- uv_signal_init(uv_default_loop(), &spipe);
- uv_signal_init(uv_default_loop(), &shup);
- uv_signal_init(uv_default_loop(), &squit);
- uv_signal_init(uv_default_loop(), &sterm);
- uv_signal_start(&spipe, signal_cb, SIGPIPE);
- uv_signal_start(&shup, signal_cb, SIGHUP);
- uv_signal_start(&squit, signal_cb, SIGQUIT);
- uv_signal_start(&sterm, signal_cb, SIGTERM);
+ signal_watcher_init(&loop, &spipe, NULL);
+ signal_watcher_init(&loop, &shup, NULL);
+ signal_watcher_init(&loop, &squit, NULL);
+ signal_watcher_init(&loop, &sterm, NULL);
+ signal_watcher_start(&spipe, on_signal, SIGPIPE);
+ signal_watcher_start(&shup, on_signal, SIGHUP);
+ signal_watcher_start(&squit, on_signal, SIGQUIT);
+ signal_watcher_start(&sterm, on_signal, SIGTERM);
#ifdef SIGPWR
- uv_signal_init(uv_default_loop(), &spwr);
- uv_signal_start(&spwr, signal_cb, SIGPWR);
+ signal_watcher_init(&loop, &spwr, NULL);
+ signal_watcher_start(&spwr, on_signal, SIGPWR);
#endif
}
void signal_teardown(void)
{
signal_stop();
- uv_close((uv_handle_t *)&spipe, NULL);
- uv_close((uv_handle_t *)&shup, NULL);
- uv_close((uv_handle_t *)&squit, NULL);
- uv_close((uv_handle_t *)&sterm, NULL);
+ signal_watcher_close(&spipe, NULL);
+ signal_watcher_close(&shup, NULL);
+ signal_watcher_close(&squit, NULL);
+ signal_watcher_close(&sterm, NULL);
#ifdef SIGPWR
- uv_close((uv_handle_t *)&spwr, NULL);
+ signal_watcher_close(&spwr, NULL);
#endif
}
void signal_stop(void)
{
- uv_signal_stop(&spipe);
- uv_signal_stop(&shup);
- uv_signal_stop(&squit);
- uv_signal_stop(&sterm);
+ signal_watcher_stop(&spipe);
+ signal_watcher_stop(&shup);
+ signal_watcher_stop(&squit);
+ signal_watcher_stop(&sterm);
#ifdef SIGPWR
- uv_signal_stop(&spwr);
+ signal_watcher_stop(&spwr);
#endif
}
@@ -111,10 +112,10 @@ static void deadly_signal(int signum)
preserve_exit();
}
-static void signal_cb(uv_signal_t *handle, int signum)
+static void on_signal(SignalWatcher *handle, int signum, void *data)
{
assert(signum >= 0);
- event_push((Event) {
+ loop_push_event(&loop, (Event) {
.handler = on_signal_event,
.data = (void *)(uintptr_t)signum
}, false);
diff --git a/src/nvim/os/signal.h b/src/nvim/os/signal.h
index 927437b2db..5d8cc6f661 100644
--- a/src/nvim/os/signal.h
+++ b/src/nvim/os/signal.h
@@ -1,8 +1,6 @@
#ifndef NVIM_OS_SIGNAL_H
#define NVIM_OS_SIGNAL_H
-#include "nvim/os/event_defs.h"
-
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/signal.h.generated.h"
#endif
diff --git a/src/nvim/os/stream.c b/src/nvim/os/stream.c
deleted file mode 100644
index 0c448872c3..0000000000
--- a/src/nvim/os/stream.c
+++ /dev/null
@@ -1,30 +0,0 @@
-// Functions for working with stdio streams (as opposed to RStream/WStream).
-
-#include <stdio.h>
-#include <stdbool.h>
-
-#include <uv.h>
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/stream.c.generated.h"
-#endif
-
-/// Sets the stream associated with `fd` to "blocking" mode.
-///
-/// @return `0` on success, or `-errno` on failure.
-int stream_set_blocking(int fd, bool blocking)
-{
- // Private loop to avoid conflict with existing watcher(s):
- // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
- uv_loop_t loop;
- uv_pipe_t stream;
- uv_loop_init(&loop);
- uv_pipe_init(&loop, &stream, 0);
- uv_pipe_open(&stream, fd);
- int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
- uv_close((uv_handle_t *)&stream, NULL);
- uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt.
- uv_loop_close(&loop);
- return retval;
-}
-
diff --git a/src/nvim/os/time.c b/src/nvim/os/time.c
index 590dfba797..6b5d4359db 100644
--- a/src/nvim/os/time.c
+++ b/src/nvim/os/time.c
@@ -7,7 +7,7 @@
#include <uv.h>
#include "nvim/os/time.h"
-#include "nvim/os/event.h"
+#include "nvim/event/loop.h"
#include "nvim/vim.h"
static uv_mutex_t delay_mutex;
@@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput)
if (milliseconds > INT_MAX) {
milliseconds = INT_MAX;
}
- event_poll_until((int)milliseconds, got_int);
+ LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int);
} else {
os_microdelay(milliseconds * 1000);
}
diff --git a/src/nvim/os/uv_helpers.c b/src/nvim/os/uv_helpers.c
deleted file mode 100644
index 89687bdac7..0000000000
--- a/src/nvim/os/uv_helpers.c
+++ /dev/null
@@ -1,98 +0,0 @@
-#include <assert.h>
-#include <uv.h>
-
-#include "nvim/os/uv_helpers.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-
-/// Common structure that will always be assigned to the `data` field of
-/// libuv handles. It has fields for many types of pointers, and allow a single
-/// handle to contain data from many sources
-typedef struct {
- WStream *wstream;
- RStream *rstream;
- Job *job;
-} HandleData;
-
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/uv_helpers.c.generated.h"
-#endif
-
-/// Gets the RStream instance associated with a libuv handle
-///
-/// @param handle libuv handle
-/// @return the RStream pointer
-RStream *handle_get_rstream(uv_handle_t *handle)
-{
- RStream *rv = init(handle)->rstream;
- assert(rv != NULL);
- return rv;
-}
-
-/// Associates a RStream instance with a libuv handle
-///
-/// @param handle libuv handle
-/// @param rstream the RStream pointer
-void handle_set_rstream(uv_handle_t *handle, RStream *rstream)
-{
- init(handle)->rstream = rstream;
-}
-
-/// Gets the WStream instance associated with a libuv handle
-///
-/// @param handle libuv handle
-/// @return the WStream pointer
-WStream *handle_get_wstream(uv_handle_t *handle)
-{
- WStream *rv = init(handle)->wstream;
- assert(rv != NULL);
- return rv;
-}
-
-/// Associates a WStream instance with a libuv handle
-///
-/// @param handle libuv handle
-/// @param wstream the WStream pointer
-void handle_set_wstream(uv_handle_t *handle, WStream *wstream)
-{
- HandleData *data = init(handle);
- data->wstream = wstream;
-}
-
-/// Gets the Job instance associated with a libuv handle
-///
-/// @param handle libuv handle
-/// @return the Job pointer
-Job *handle_get_job(uv_handle_t *handle)
-{
- Job *rv = init(handle)->job;
- assert(rv != NULL);
- return rv;
-}
-
-/// Associates a Job instance with a libuv handle
-///
-/// @param handle libuv handle
-/// @param job the Job pointer
-void handle_set_job(uv_handle_t *handle, Job *job)
-{
- init(handle)->job = job;
-}
-
-static HandleData *init(uv_handle_t *handle)
-{
- HandleData *rv;
-
- if (handle->data == NULL) {
- rv = xmalloc(sizeof(HandleData));
- rv->rstream = NULL;
- rv->wstream = NULL;
- rv->job = NULL;
- handle->data = rv;
- } else {
- rv = handle->data;
- }
-
- return rv;
-}
diff --git a/src/nvim/os/uv_helpers.h b/src/nvim/os/uv_helpers.h
deleted file mode 100644
index b49656bcb8..0000000000
--- a/src/nvim/os/uv_helpers.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#ifndef NVIM_OS_UV_HELPERS_H
-#define NVIM_OS_UV_HELPERS_H
-
-#include <uv.h>
-
-#include "nvim/os/wstream_defs.h"
-#include "nvim/os/rstream_defs.h"
-#include "nvim/os/job_defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/uv_helpers.h.generated.h"
-#endif
-#endif // NVIM_OS_UV_HELPERS_H
diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c
deleted file mode 100644
index 73896c381d..0000000000
--- a/src/nvim/os/wstream.c
+++ /dev/null
@@ -1,243 +0,0 @@
-#include <assert.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <stdlib.h>
-
-#include <uv.h>
-
-#include "nvim/os/uv_helpers.h"
-#include "nvim/os/wstream.h"
-#include "nvim/os/wstream_defs.h"
-#include "nvim/vim.h"
-#include "nvim/memory.h"
-
-#define DEFAULT_MAXMEM 1024 * 1024 * 10
-
-struct wstream {
- uv_stream_t *stream;
- // Memory currently used by pending buffers
- size_t curmem;
- // Maximum memory used by this instance
- size_t maxmem;
- // Number of pending requests
- size_t pending_reqs;
- bool freed, free_handle;
- // (optional) Write callback and data
- wstream_cb cb;
- void *data;
-};
-
-struct wbuffer {
- size_t size, refcount;
- char *data;
- wbuffer_data_finalizer cb;
-};
-
-typedef struct {
- WStream *wstream;
- WBuffer *buffer;
- uv_write_t uv_req;
-} WRequest;
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/wstream.c.generated.h"
-#endif
-
-/// Creates a new WStream instance. A WStream encapsulates all the boilerplate
-/// necessary for writing to a libuv stream.
-///
-/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0,
-/// a default value of 10mb will be used.
-/// @return The newly-allocated `WStream` instance
-WStream * wstream_new(size_t maxmem)
-{
- if (!maxmem) {
- maxmem = DEFAULT_MAXMEM;
- }
-
- WStream *rv = xmalloc(sizeof(WStream));
- rv->maxmem = maxmem;
- rv->stream = NULL;
- rv->curmem = 0;
- rv->pending_reqs = 0;
- rv->freed = false;
- rv->free_handle = false;
- rv->cb = NULL;
-
- return rv;
-}
-
-/// Frees all memory allocated for a WStream instance
-///
-/// @param wstream The `WStream` instance
-void wstream_free(WStream *wstream) {
- if (!wstream->pending_reqs) {
- if (wstream->free_handle) {
- uv_close((uv_handle_t *)wstream->stream, close_cb);
- } else {
- handle_set_wstream((uv_handle_t *)wstream->stream, NULL);
- xfree(wstream);
- }
- } else {
- wstream->freed = true;
- }
-}
-
-/// Sets the underlying `uv_stream_t` instance
-///
-/// @param wstream The `WStream` instance
-/// @param stream The new `uv_stream_t` instance
-void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
-{
- handle_set_wstream((uv_handle_t *)stream, wstream);
- wstream->stream = stream;
-}
-
-/// Sets the underlying file descriptor that will be written to. Only pipes
-/// are supported for now.
-///
-/// @param wstream The `WStream` instance
-/// @param file The file descriptor
-void wstream_set_file(WStream *wstream, uv_file file)
-{
- assert(uv_guess_handle(file) == UV_NAMED_PIPE ||
- uv_guess_handle(file) == UV_TTY);
- wstream->stream = xmalloc(sizeof(uv_pipe_t));
- uv_pipe_init(uv_default_loop(), (uv_pipe_t *)wstream->stream, 0);
- uv_pipe_open((uv_pipe_t *)wstream->stream, file);
- wstream->stream->data = NULL;
- handle_set_wstream((uv_handle_t *)wstream->stream, wstream);
- wstream->free_handle = true;
-}
-
-/// Sets a callback that will be called on completion of a write request,
-/// indicating failure/success.
-///
-/// This affects all requests currently in-flight as well. Overwrites any
-/// possible earlier callback.
-///
-/// @note This callback will not fire if the write request couldn't even be
-/// queued properly (i.e.: when `wstream_write() returns an error`).
-///
-/// @param wstream The `WStream` instance
-/// @param cb The callback
-/// @param data User-provided data that will be passed to `cb`
-void wstream_set_write_cb(WStream *wstream, wstream_cb cb, void *data)
- FUNC_ATTR_NONNULL_ARG(1)
-{
- wstream->cb = cb;
- wstream->data = data;
-}
-
-/// Queues data for writing to the backing file descriptor of a `WStream`
-/// instance. This will fail if the write would cause the WStream use more
-/// memory than specified by `maxmem`.
-///
-/// @param wstream The `WStream` instance
-/// @param buffer The buffer which contains data to be written
-/// @return false if the write failed
-bool wstream_write(WStream *wstream, WBuffer *buffer)
-{
- // This should not be called after a wstream was freed
- assert(!wstream->freed);
-
- if (wstream->curmem > wstream->maxmem) {
- goto err;
- }
-
- wstream->curmem += buffer->size;
-
- WRequest *data = xmalloc(sizeof(WRequest));
- data->wstream = wstream;
- data->buffer = buffer;
- data->uv_req.data = data;
-
- uv_buf_t uvbuf;
- uvbuf.base = buffer->data;
- uvbuf.len = buffer->size;
-
- if (uv_write(&data->uv_req, wstream->stream, &uvbuf, 1, write_cb)) {
- xfree(data);
- goto err;
- }
-
- wstream->pending_reqs++;
- return true;
-
-err:
- wstream_release_wbuffer(buffer);
- return false;
-}
-
-/// Creates a WBuffer object for holding output data. Instances of this
-/// object can be reused across WStream instances, and the memory is freed
-/// automatically when no longer needed(it tracks the number of references
-/// internally)
-///
-/// @param data Data stored by the WBuffer
-/// @param size The size of the data array
-/// @param refcount The number of references for the WBuffer. This will be used
-/// by WStream instances to decide when a WBuffer should be freed.
-/// @param cb Pointer to function that will be responsible for freeing
-/// the buffer data(passing 'free' will work as expected).
-/// @return The allocated WBuffer instance
-WBuffer *wstream_new_buffer(char *data,
- size_t size,
- size_t refcount,
- wbuffer_data_finalizer cb)
-{
- WBuffer *rv = xmalloc(sizeof(WBuffer));
- rv->size = size;
- rv->refcount = refcount;
- rv->cb = cb;
- rv->data = data;
-
- return rv;
-}
-
-static void write_cb(uv_write_t *req, int status)
-{
- WRequest *data = req->data;
-
- data->wstream->curmem -= data->buffer->size;
-
- wstream_release_wbuffer(data->buffer);
-
- if (data->wstream->cb) {
- data->wstream->cb(data->wstream,
- data->wstream->data,
- status);
- }
-
- data->wstream->pending_reqs--;
-
- if (data->wstream->freed && data->wstream->pending_reqs == 0) {
- // Last pending write, free the wstream;
- if (data->wstream->free_handle) {
- uv_close((uv_handle_t *)data->wstream->stream, close_cb);
- } else {
- xfree(data->wstream);
- }
- }
-
- xfree(data);
-}
-
-void wstream_release_wbuffer(WBuffer *buffer)
-{
- if (!--buffer->refcount) {
- if (buffer->cb) {
- buffer->cb(buffer->data);
- }
-
- xfree(buffer);
- }
-}
-
-static void close_cb(uv_handle_t *handle)
-{
- xfree(handle_get_wstream(handle));
- xfree(handle->data);
- xfree(handle);
-}
-
diff --git a/src/nvim/os/wstream.h b/src/nvim/os/wstream.h
deleted file mode 100644
index d0e9bef93a..0000000000
--- a/src/nvim/os/wstream.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#ifndef NVIM_OS_WSTREAM_H
-#define NVIM_OS_WSTREAM_H
-
-#include <stdint.h>
-#include <stdbool.h>
-#include <uv.h>
-
-#include "nvim/os/wstream_defs.h"
-
-#ifdef INCLUDE_GENERATED_DECLARATIONS
-# include "os/wstream.h.generated.h"
-#endif
-#endif // NVIM_OS_WSTREAM_H
diff --git a/src/nvim/os/wstream_defs.h b/src/nvim/os/wstream_defs.h
deleted file mode 100644
index cfa0bf0b60..0000000000
--- a/src/nvim/os/wstream_defs.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#ifndef NVIM_OS_WSTREAM_DEFS_H
-#define NVIM_OS_WSTREAM_DEFS_H
-
-typedef struct wbuffer WBuffer;
-typedef struct wstream WStream;
-typedef void (*wbuffer_data_finalizer)(void *data);
-
-/// Type of function called when the WStream has information about a write
-/// request.
-///
-/// @param wstream The `WStream` instance
-/// @param data User-defined data
-/// @param status 0 on success, anything else indicates failure
-typedef void (*wstream_cb)(WStream *wstream,
- void *data,
- int status);
-
-#endif // NVIM_OS_WSTREAM_DEFS_H
-