diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 13 | ||||
-rw-r--r-- | src/nvim/os/event.c | 98 | ||||
-rw-r--r-- | src/nvim/os/event_defs.h | 13 | ||||
-rw-r--r-- | src/nvim/os/input.c | 18 | ||||
-rw-r--r-- | src/nvim/os/job.c | 55 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 58 | ||||
-rw-r--r-- | src/nvim/os/signal.c | 81 |
7 files changed, 65 insertions, 271 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 83e7900a54..d31e404c23 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -126,8 +126,7 @@ void channel_from_stream(uv_stream_t *stream) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -201,17 +200,12 @@ Object channel_send_call(uint64_t id, // Send the msgpack-rpc request send_request(channel, request_id, method_name, args); - EventSource channel_source = channel->is_job - ? job_event_source(channel->data.job) - : rstream_event_source(channel->data.streams.read); - EventSource sources[] = {channel_source, NULL}; - // Push the frame ChannelCallFrame frame = {request_id, false, false, NIL}; kv_push(ChannelCallFrame *, channel->call_stack, &frame); do { - event_poll(-1, sources); + event_poll(-1); } while (!frame.returned); (void)kv_pop(channel->call_stack); @@ -286,8 +280,7 @@ static void channel_from_stdio(void) // read stream channel->data.streams.read = rstream_new(parse_msgpack, rbuffer_new(CHANNEL_BUFFER_SIZE), - channel, - NULL); + channel); rstream_set_file(channel->data.streams.read, 0); rstream_start(channel->data.streams.read); // write stream diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 43c02b13b2..00920fc5cf 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -34,12 +34,7 @@ typedef struct { #ifdef INCLUDE_GENERATED_DECLARATIONS # include "os/event.c.generated.h" #endif -static klist_t(Event) *deferred_events, *immediate_events; -// NULL-terminated array of event sources that we should process immediately. -// -// Events from sources that are not contained in this array are processed -// later when `event_process` is called -static EventSource *immediate_sources = NULL; +static klist_t(Event) *pending_events; void event_init(void) { @@ -47,8 +42,7 @@ void event_init(void) msgpack_rpc_init_method_table(); msgpack_rpc_helpers_init(); // Initialize the event queues - deferred_events = kl_init(Event); - immediate_events = kl_init(Event); + pending_events = kl_init(Event); // Initialize input events input_init(); // Timer to wake the event loop if a timeout argument is passed to @@ -72,8 +66,7 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms, EventSource sources[]) - FUNC_ATTR_NONNULL_ARG(2) +bool event_poll(int32_t ms) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -104,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[]) run_mode = UV_RUN_NOWAIT; } - size_t processed_events; - - do { - // Run one event loop iteration, blocking for events if run_mode is - // UV_RUN_ONCE - processed_events = loop(run_mode, sources); - } while ( - // Continue running if ... - !processed_events && // we didn't process any immediate events - !event_has_deferred() && // no events are waiting to be processed - run_mode != UV_RUN_NOWAIT && // ms != 0 - !timer_data.timed_out); // we didn't get a timeout + loop(run_mode); if (!(--recursive)) { // Again, only stop when we leave the top-level invocation @@ -127,56 +109,31 @@ bool event_poll(int32_t ms, EventSource sources[]) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - processed_events += loop(UV_RUN_NOWAIT, sources); + loop(UV_RUN_NOWAIT); } - return !timer_data.timed_out && (processed_events || event_has_deferred()); + return !timer_data.timed_out && event_has_deferred(); } bool event_has_deferred(void) { - return !kl_empty(deferred_events); + return !kl_empty(pending_events); } // Queue an event void event_push(Event event) { - bool defer = true; - - if (immediate_sources) { - size_t i; - EventSource src; - - for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { - if (src == event.source) { - defer = false; - break; - } - } - } - - *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; + *kl_pushp(Event, pending_events) = event; } -void event_process(void) -{ - process_from(deferred_events); -} -// Runs the appropriate action for each queued event -static size_t process_from(klist_t(Event) *queue) +void event_process(void) { - size_t count = 0; Event event; - while (kl_shift(Event, queue, &event) == 0) { + while (kl_shift(Event, pending_events, &event) == 0) { event.handler(event); - count++; } - - DLOG("Processed %u events", count); - - return count; } // Set a flag in the `event_poll` loop for signaling of a timeout @@ -194,42 +151,9 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static void requeue_deferred_events(void) +static void loop(uv_run_mode run_mode) { - size_t remaining = deferred_events->size; - - DLOG("Number of deferred events: %u", remaining); - - while (remaining--) { - // Re-push each deferred event to ensure it will be in the right queue - Event event; - kl_shift(Event, deferred_events, &event); - event_push(event); - DLOG("Re-queueing event"); - } - - DLOG("Number of deferred events: %u", deferred_events->size); -} - -static size_t loop(uv_run_mode run_mode, EventSource *sources) -{ - size_t count; - immediate_sources = sources; - // It's possible that some events from the immediate sources are waiting - // in the deferred queue. If so, move them to the immediate queue so they - // will be processed in order of arrival by the next `process_from` call. - requeue_deferred_events(); - count = process_from(immediate_events); - - if (count) { - // No need to enter libuv, events were already processed - return count; - } - DLOG("Enter event loop"); uv_run(uv_default_loop(), run_mode); DLOG("Exit event loop"); - immediate_sources = NULL; - count = process_from(immediate_events); - return count; } diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index 553d4e3125..2dd9403d9f 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,21 +6,12 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" -typedef void * EventSource; typedef struct event Event; typedef void (*event_handler)(Event event); struct event { - EventSource source; + void *data; event_handler handler; - union { - int signum; - struct { - RStream *ptr; - bool eof; - } rstream; - Job *job; - } data; -}; +}; #endif // NVIM_OS_EVENT_DEFS_H diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index a18d735ce6..d718bf95da 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -7,7 +7,6 @@ #include "nvim/api/private/defs.h" #include "nvim/os/input.h" #include "nvim/os/event.h" -#include "nvim/os/signal.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" #include "nvim/ascii.h" @@ -48,10 +47,7 @@ void input_init(void) } read_buffer = rbuffer_new(READ_BUFFER_SIZE); - read_stream = rstream_new(read_cb, - read_buffer, - NULL, - NULL); + read_stream = rstream_new(read_cb, read_buffer, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -170,16 +166,10 @@ void input_buffer_restore(String str) static bool input_poll(int32_t ms) { if (embedded_mode) { - EventSource input_sources[] = { signal_event_source(), NULL }; - return event_poll(ms, input_sources); + return event_poll(ms); } - EventSource input_sources[] = { - rstream_event_source(read_stream), - NULL - }; - - return input_ready() || event_poll(ms, input_sources) || input_ready(); + return input_ready() || event_poll(ms) || input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c @@ -235,7 +225,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof) static void convert_input(void) { - if (!rbuffer_available(input_buffer)) { + if (embedded_mode || !rbuffer_available(input_buffer)) { // No input buffer space return; } diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index d0ac82c047..091da5d213 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -14,7 +14,6 @@ #include "nvim/os/event_defs.h" #include "nvim/os/time.h" #include "nvim/os/shell.h" -#include "nvim/os/signal.h" #include "nvim/vim.h" #include "nvim/memory.h" #include "nvim/term.h" @@ -103,21 +102,24 @@ void job_teardown(void) // Prepare to start shooting for (i = 0; i < MAX_RUNNING_JOBS; i++) { - if ((job = table[i]) == NULL) { - continue; - } + job = table[i]; // Still alive - while (is_alive(job) && remaining_tries--) { + while (job && is_alive(job) && remaining_tries--) { os_delay(50, 0); // Acknowledge child exits uv_run(uv_default_loop(), UV_RUN_NOWAIT); + // It's possible that the uv_run call removed the job from the table, + // reset 'job' so the next iteration won't run in that case. + job = table[i]; } - if (is_alive(job)) { + if (job && is_alive(job)) { uv_process_kill(&job->proc, SIGKILL); } } + // Last run to ensure all children were removed + uv_run(uv_default_loop(), UV_RUN_NOWAIT); } /// Tries to start a new job. @@ -213,14 +215,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); - job->err = rstream_new(read_cb, - rbuffer_new(JOB_BUFFER_SIZE), - job, - job_event_source(job)); + job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); + job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -277,8 +273,6 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int old_mode = cur_tmode; settmode(TMODE_COOK); - EventSource sources[] = {job_event_source(job), signal_event_source(), NULL}; - // keep track of the elapsed time if ms > 0 uint64_t before = (ms > 0) ? os_hrtime() : 0; @@ -288,7 +282,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL break; } - event_poll(ms, sources); + event_poll(ms); // we'll assume that a user frantically hitting interrupt doesn't like // the current job. Signal that it has to be killed. @@ -369,14 +363,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Runs the read callback associated with the job exit event -/// -/// @param event Object containing data necessary to invoke the callback -void job_exit_event(Event event) -{ - job_exit_callback(event.data.job); -} - /// Get the job id /// /// @param job A pointer to the job @@ -395,11 +381,6 @@ void *job_data(Job *job) return job->data; } -EventSource job_event_source(Job *job) -{ - return job; -} - static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -470,7 +451,7 @@ static void read_cb(RStream *rstream, void *data, bool eof) } if (eof && --job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } @@ -481,20 +462,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) job->status = status; if (--job->pending_refs == 0) { - emit_exit_event(job); + job_exit_callback(job); } } -static void emit_exit_event(Job *job) -{ - Event event = { - .source = job_event_source(job), - .handler = job_exit_event, - .data.job = job - }; - event_push(event); -} - static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index b95098cc52..8cfd9d1b75 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -8,8 +8,6 @@ #include "nvim/os/uv_helpers.h" #include "nvim/os/rstream_defs.h" #include "nvim/os/rstream.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/ascii.h" #include "nvim/vim.h" #include "nvim/memory.h" @@ -33,7 +31,6 @@ struct rstream { uv_file fd; rstream_cb cb; bool free_handle; - EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count) void rbuffer_produced(RBuffer *rbuffer, size_t count) { rbuffer->wpos += count; - DLOG("Received %u bytes from RStream(address: %p, source: %p)", - (size_t)cnt, - rbuffer->rstream, - rstream_event_source(rbuffer->rstream)); + DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream); rbuffer_relocate(rbuffer); if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) { // The last read filled the buffer, stop reading for now rstream_stop(rbuffer->rstream); - DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", - rstream, - rstream_event_source(rstream)); + DLOG("Buffer for RStream(%p) is full, stopping it", rstream); } } @@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer) /// for reading with `rstream_read` /// @param buffer RBuffer instance to associate with the RStream /// @param data Some state to associate with the `RStream` instance -/// @param source_override Replacement for the default source used in events -/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance -RStream * rstream_new(rstream_cb cb, - RBuffer *buffer, - void *data, - EventSource source_override) +RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = buffer; @@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb, rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; - rv->source_override = source_override ? source_override : rv; return rv; } @@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count) return rbuffer_read(rstream->buffer, buffer, count); } -/// Runs the read callback associated with the rstream -/// -/// @param event Object containing data necessary to invoke the callback -void rstream_read_event(Event event) -{ - RStream *rstream = event.data.rstream.ptr; - - rstream->cb(rstream, rstream->data, event.data.rstream.eof); -} - -EventSource rstream_event_source(RStream *rstream) -{ - return rstream->source_override; -} - // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { - DLOG("Closing RStream(address: %p, source: %p)", - rstream, - rstream_event_source(rstream)); + DLOG("Closing RStream(%p)", rstream); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); - emit_read_event(rstream, true); + rstream->cb(rstream, rstream->data, true); } return; } @@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rbuffer_produced(rstream->buffer, nread); - emit_read_event(rstream, false); + rstream->cb(rstream, rstream->data, false); } // Called by the by the 'idle' handle to emulate a reading event @@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle) if (req.result <= 0) { uv_idle_stop(rstream->fread_idle); - emit_read_event(rstream, true); return; } @@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle) size_t nread = (size_t) req.result; rbuffer_produced(rstream->buffer, nread); rstream->fpos += nread; - emit_read_event(rstream, false); } static void close_cb(uv_handle_t *handle) @@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle) free(handle); } -static void emit_read_event(RStream *rstream, bool eof) -{ - Event event = { - .source = rstream_event_source(rstream), - .handler = rstream_read_event, - .data.rstream = { - .ptr = rstream, - .eof = eof - } - }; - event_push(event); -} - static void rbuffer_relocate(RBuffer *rbuffer) { // Move data ... diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index b330c7f788..36f7b37c48 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -12,8 +12,6 @@ #include "nvim/memory.h" #include "nvim/misc1.h" #include "nvim/misc2.h" -#include "nvim/os/event_defs.h" -#include "nvim/os/event.h" #include "nvim/os/signal.h" static uv_signal_t sint, spipe, shup, squit, sterm, swinch; @@ -72,45 +70,6 @@ void signal_accept_deadly(void) rejecting_deadly = false; } -void signal_handle(Event event) -{ - int signum = event.data.signum; - - switch (signum) { - case SIGINT: - got_int = true; - break; -#ifdef SIGPWR - case SIGPWR: - // Signal of a power failure(eg batteries low), flush the swap files to - // be safe - ml_sync_all(false, false); - break; -#endif - case SIGPIPE: - // Ignore - break; - case SIGWINCH: - shell_resized(); - break; - case SIGTERM: - case SIGQUIT: - case SIGHUP: - if (!rejecting_deadly) { - deadly_signal(signum); - } - break; - default: - fprintf(stderr, "Invalid signal %d", signum); - break; - } -} - -EventSource signal_event_source(void) -{ - return &sint; -} - static char * signal_name(int signum) { switch (signum) { @@ -154,20 +113,32 @@ static void deadly_signal(int signum) static void signal_cb(uv_signal_t *handle, int signum) { - if (rejecting_deadly) { - if (signum == SIGINT) { + switch (signum) { + case SIGINT: got_int = true; - } - - return; + break; +#ifdef SIGPWR + case SIGPWR: + // Signal of a power failure(eg batteries low), flush the swap files to + // be safe + ml_sync_all(false, false); + break; +#endif + case SIGPIPE: + // Ignore + break; + case SIGWINCH: + shell_resized(); + break; + case SIGTERM: + case SIGQUIT: + case SIGHUP: + if (!rejecting_deadly) { + deadly_signal(signum); + } + break; + default: + fprintf(stderr, "Invalid signal %d", signum); + break; } - - Event event = { - .source = signal_event_source(), - .handler = signal_handle, - .data = { - .signum = signum - } - }; - event_push(event); } |